Commit 6b206002 authored by gabi-250's avatar gabi-250 🤸
Browse files

proto: Replace CircuitRx{Sender,Receiver} with new channel type

This is needed for relays as part of #2490.

Note that changing this type affects the client implementation too (i.e.
clients will start prioritizing inbound DESTROY, discarding any queued
data without forwarding it to their local streams). But that's okay,
because it will generally only affect misbehaving clients, and clients
unlucky enough to encounter a hibernating relay.
parent 4f5571fb
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -863,9 +863,11 @@ impl Channel {
        // TODO: blocking is risky, but so is unbounded.
        let (sender, receiver) =
            MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?;
        let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver);
        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();

        let (tx, rx) = oneshot::channel();

        self.send_control(CtrlMsg::AllocateCircuit {
            created_sender: createdsender,
            sender,
@@ -911,6 +913,7 @@ impl Channel {
        // TODO: blocking is risky, but so is unbounded.
        let (sender, receiver) =
            MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?;
        let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver);
        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();

        let (tx, rx) = oneshot::channel();
+1 −11
Original line number Diff line number Diff line
@@ -19,17 +19,7 @@ pub use unique_id::UniqId;
use crate::ccparams::CongestionControlParams;
use crate::stream::flow_ctrl::params::FlowCtrlParameters;

use tor_cell::chancell::msg::AnyChanMsg;
use tor_memquota::mq_queue::{self, MpscSpec};

/// The following two MPSCs take any channel message as the receiving end can be either a client or
/// a relay circuit reactor. The reactor itself will convert into its restricted message set. On
/// error, the circuit will shutdown as it will be considered a protocol violation.
///
/// MPSC queue for inbound data on its way from channel to circuit, sender
pub(crate) type CircuitRxSender = mq_queue::Sender<AnyChanMsg, MpscSpec>;
/// MPSC queue for inbound data on its way from channel to circuit, receiver
pub(crate) type CircuitRxReceiver = mq_queue::Receiver<AnyChanMsg, MpscSpec>;
pub(crate) use circ_sender::{CircuitRxReceiver, CircuitRxSender};

/// Estimated upper bound for the likely number of hops.
pub(crate) const HOPS: usize = 6;
+1 −0
Original line number Diff line number Diff line
@@ -153,6 +153,7 @@ impl CreateRequestHandler {
        let time_provider = DynTimeProvider::new(runtime.clone());
        let account = memquota.as_raw_account();
        let (sender, receiver) = MpscSpec::new(10_000_000).new_mq(time_provider, account)?;
        let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver);

        // TODO(relay): Do we really want a client padding machine here?
        let (padding_ctrl, padding_stream) =