From 7dd0e99db6fb49e1ce4d6cb0abce53dc5acbdfad Mon Sep 17 00:00:00 2001 From: Gabriela Moldovan Date: Wed, 13 May 2026 13:21:02 +0000 Subject: [PATCH 1/4] proto: Add a new channel -> circuit queue type This adds new a `CircuitRxSender`/`CircuitRxReceiver` queue type. The corresponding (`Sink`-link) sender and `Stream` implementations prioritize the delivery of `DESTROY` messages, which get delivered immediately, even if there are other messages queued in the underlying MPSC message queue. We are okay with the resulting data loss, because inbound DESTROY can be indicative of malicious activity on the circuit. We choose to err on the safe side, and free up the resources associated with such circuits as soon as possible. DESTROY messages are also sent by relays when they're about to hibernate, and by clients once they've decided to stop using a circuit. In the latter case, the lack of an `RELAY_COMMAND_END_ACK` does mean that this prioritization can cause data loss in cases where the client closes the circuit immediately after END-ing a stream. However, this is a deficiency in the protocol, and not something we want to fix by implementing custom flushing logic in the reactor. See torspec#196 and the discussion in #2490. Part of #2490 --- crates/tor-proto/src/circuit.rs | 1 + crates/tor-proto/src/circuit/circ_sender.rs | 371 ++++++++++++++++++++ 2 files changed, 372 insertions(+) create mode 100644 crates/tor-proto/src/circuit/circ_sender.rs diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index 85186c50e7..eb434dfcaa 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -4,6 +4,7 @@ pub(crate) mod cell_sender; pub(crate) mod celltypes; +pub(crate) mod circ_sender; pub(crate) mod circhop; pub(crate) mod create; pub(crate) mod padding; diff --git a/crates/tor-proto/src/circuit/circ_sender.rs b/crates/tor-proto/src/circuit/circ_sender.rs new file mode 100644 index 0000000000..fea32aec85 --- /dev/null +++ b/crates/tor-proto/src/circuit/circ_sender.rs @@ -0,0 +1,371 @@ +//! Implements a sender and a [`Stream`] type for sending messages +//! from a [channel](crate::channel) to a circuit, +//! prioritizing the delivery of `DESTROY` messages. +//! +//! [`CircuitRxSender`] and [`CircuitRxReceiver`] take any channel message, +//! because the receiving end can be either a client or a relay circuit reactor. +//! The reactor itself will convert into its restricted message set. + +use std::pin::Pin; +use std::task::{self, Context, Poll}; + +use futures::{FutureExt as _, SinkExt as _, Stream, StreamExt as _}; +use oneshot_fused_workaround as oneshot; +use tor_basic_utils::assert_val_impl_trait; +use tor_cell::chancell::msg::{AnyChanMsg, Destroy}; +use tor_memquota::mq_queue::{self, ChannelSpec, MpscSpec}; + +/// The sending end of the SPSC queue for inbound data on its way from channel to circuit +/// +/// A [`CircuitRxSender`] sender is closed for sending as soon as the first +/// `DESTROY` message is sent, and will discard any unflushed cells +/// from its underlying [`mq_queue`], by dropping it. +/// +/// ## No [`Sink`](futures::Sink) implementation +/// +/// This type intentionally does not implement [`Sink`](futures::Sink). +/// Instead it provides a [`send()`](CircuitRxSender::send) function +/// similar to [`SinkExt::send`](futures::SinkExt::send). +/// +/// The reason for doing it this way is because we cannot provide +/// a correct `Sink::poll:ready()` implementation +/// that wouldn't block DESTROY cells from being sent +/// when our underlying MPSC sender is full: +/// `SinkExt::send()` calls `poll_ready()` followed by `start_send()`, +/// so in order for our `poll_ready()` implementation to not block DESTROY +/// on the MPSC queue's readiness, it would need to know whether +/// the cell that will be sent via `start_send()` is a DESTROY or not, +/// but that's not possible because of the way the `Sink`/`SinkExt` traits +/// are designed. +#[derive(Debug)] +pub(crate) struct CircuitRxSender(Option); + +/// The inner state of a [`CircuitRxSender`]. +#[derive(Debug)] +struct CircuitRxSenderInner { + /// Sender for sending `DESTROY` to [`CircuitRxReceiver`] + destroy_tx: oneshot::Sender, + /// Sender for sending all other [`AnyChanMsg`]s to [`CircuitRxReceiver`] + cell_tx: mq_queue::Sender, +} + +/// The receiving end of the SPSC queue for inbound data on its way from channel to circuit +/// +/// A [`CircuitRxReceiver`] stream ends as soon as the first `DESTROY` message +/// is received, causing the stream to discard any unflushed cells +/// from its underlying [`mq_queue`], by dropping it. +#[derive(Debug)] +pub(crate) struct CircuitRxReceiver(Option); + +/// The inner state of a [`CircuitRxReceiver`]. +#[derive(Debug)] +struct CircuitRxReceiverInner { + /// Receiver for receiving `DESTROY` from [`CircuitRxSender`] + destroy_rx: oneshot::Receiver, + /// Receiver for receiving all other [`AnyChanMsg`]s from [`CircuitRxReceiver`] + cell_rx: mq_queue::Receiver, +} + +/// Wrap the sender and receiver of an [`mq_queue`] channel +/// into [`CircuitRxSender`] and [`CircuitRxReceiver`]. +/// +/// The returned channel will ensure any DESTROY messages sent +/// over the [`CircuitRxSender`] will be delivered +/// by the [`CircuitRxReceiver`] immediately, +/// ahead of any other messages that might already be queued, +/// which will be discarded. +/// +/// We are fine with the resulting data loss, because inbound DESTROY +/// can be indicative of malicious activity on the circuit. +/// We choose to err on the safe side, and free up the resources associated +/// with such circuits as soon as possible. +/// DESTROY messages are also sent by relays when they're about to hibernate, +/// and by clients once they've decided to stop using a circuit. +/// In the latter case, the lack of an `RELAY_COMMAND_END_ACK` +/// does mean that this prioritization can cause data loss +/// (if the client closes the circuit immediately after END-ing a stream). +/// However, this is a deficiency in the protocol, +/// and not something we want to fix by implementing custom flushing logic +/// in the reactor. See torspec#196 and the discussion in #2490. +/// +/// Note: the underlying buffer of the [`mq_queue`] will only be freed +/// once both the [`CircuitRxSender`] and [`CircuitRxReceiver`] are dropped; +/// in other words, after a `DESTROY` cell has been obtained from the [`CircuitRxReceiver`], +/// via its [`Stream`] implementation +pub(crate) fn channel( + cell_tx: mq_queue::Sender, + cell_rx: mq_queue::Receiver, +) -> (CircuitRxSender, CircuitRxReceiver) { + let (destroy_tx, destroy_rx) = oneshot::channel(); + let sender = CircuitRxSender(Some(CircuitRxSenderInner { + destroy_tx, + cell_tx, + })); + + let receiver = CircuitRxReceiver(Some(CircuitRxReceiverInner { + destroy_rx, + cell_rx, + })); + + (sender, receiver) +} + +impl Stream for CircuitRxReceiver { + type Item = AnyChanMsg; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Some(inner) = self.0.as_mut() else { + return Poll::Ready(None); + }; + + // It's important that destroy_rx is fused, + // because we call poll_unpin() unconditionally below. + assert_val_impl_trait!(inner.destroy_rx, futures_util::future::FusedFuture); + + // First, check if we have a DESTROY message ready + let destroy_cell = match inner.destroy_rx.poll_unpin(cx) { + Poll::Ready(destroy) => { + // If destroy.is_err(), it means the CircuitRxSender was dropped, + // but there may be more data buffered in the underlying mpsc, + // so we need to continue polling cell_rx. + // + // This is important, because we want to preserve the behavior + // of the mq_queue, whose Receiver will continue yielding queued + // messages even after the Sender is dropped. + destroy.ok() + } + Poll::Pending => { + // No DESTROY message yet, so it's time to poll the non-priority + // message queue + None + } + }; + + if let Some(destroy) = destroy_cell { + // Drop the inner state, closing this stream + self.0 = None; + return Poll::Ready(Some(AnyChanMsg::Destroy(destroy))); + } + + let res = task::ready!(inner.cell_rx.poll_next_unpin(cx)); + + // Our CircuitRxSender impl will never send DESTROY messages + // on the cell_rx queue (they're always sent via the oneshot channel) + debug_assert!(!matches!(res, Some(AnyChanMsg::Destroy(_)))); + + Poll::Ready(res) + } +} + +/// Error returned when trying to write to a [`CircuitRxSender`] +#[derive(thiserror::Error, Clone, Debug)] +pub(crate) enum SendError { + /// The underlying MPSC channel rejected the message + #[error("{0}")] + Channel(#[from] mq_queue::SendError<::SendError>), + + /// The receiver has dropped + /// + // Note: technically, there are two "Disconnected" variants: + // this one, for the oneshot channel, and a second, hidden variant + // inside mq_queue:SendError, for the mq_queue one. + // + // It would be nice if we only had one variant covering both cases, + // but this will have to do for now. + #[error("the receiver has dropped")] + Disconnected, + + /// The sender is closed + /// + /// Returned if the [`CircuitRxSender`] is used after a DESTROY cell has been written to it. + #[error("sender has closed")] + Closed, +} + +impl CircuitRxSender { + /// Send a cell down this channel + /// + /// If the sender is already closed (i.e., if we have already sent DESTROY), + /// this will return an error. + /// + // In practice, we never write more than 1 DESTROY cell to this sender, + // because the channel reactor removes the circuit (and corresponding CircuitRxSender) + // from its circ map after the first DESTROY. + pub(crate) async fn send(&mut self, msg: AnyChanMsg) -> Result<(), SendError> { + if let AnyChanMsg::Destroy(d) = msg { + let inner = self.take_inner()?; + + if inner.destroy_tx.send(d).is_err() { + return Err(SendError::Disconnected); + } + + Ok(()) + } else { + self.borrow_for_sending()?.cell_tx.send(msg).await?; + Ok(()) + } + } + + /// Borrow the [`CircuitRxSenderInner`] state for sending. + /// + /// Returns an error if the sender is closed. + fn borrow_for_sending(&mut self) -> Result<&mut CircuitRxSenderInner, SendError> { + self.0.as_mut().ok_or_else(|| SendError::Closed) + } + + /// Take the inner [`CircuitRxSenderInner`], closing the sender. + /// + /// Returns an error if the sender is already closed. + fn take_inner(&mut self) -> Result { + self.0.take().ok_or_else(|| SendError::Closed) + } +} + +#[cfg(test)] +pub(crate) mod test { + // @@ begin test lint list maintained by maint/add_warning @@ + #![allow(clippy::bool_assert_comparison)] + #![allow(clippy::clone_on_copy)] + #![allow(clippy::dbg_macro)] + #![allow(clippy::mixed_attributes_style)] + #![allow(clippy::print_stderr)] + #![allow(clippy::print_stdout)] + #![allow(clippy::single_char_pattern)] + #![allow(clippy::unwrap_used)] + #![allow(clippy::unchecked_time_subtraction)] + #![allow(clippy::useless_vec)] + #![allow(clippy::needless_pass_by_value)] + //! + + use super::*; + + use tor_cell::chancell::msg::{self, DestroyReason}; + use tor_rtmock::MockRuntime; + + use std::task::Waker; + + /// Make an MPSC queue, of the type we use to send cells + /// from the channel reactor to the circuit reactor, + /// but a fake one for testing + #[cfg(test)] + pub(crate) fn fake_mpsc(buffer: usize) -> (CircuitRxSender, CircuitRxReceiver) { + let (tx, rx) = crate::fake_mpsc(buffer); + + crate::circuit::circ_sender::channel(tx, rx) + } + + /// A DESTROY message + fn destroy_msg(reason: DestroyReason) -> AnyChanMsg { + AnyChanMsg::Destroy(msg::Destroy::new(reason)) + } + + /// A RELAY message + fn relay_msg() -> AnyChanMsg { + AnyChanMsg::Relay(msg::Relay::new(b"hello")) + } + + macro_rules! assert_eos { + ($tx:expr, $rx:expr) => {{ + assert!($rx.next().await.is_none()); + // Cannot send any more cells once the sender is closed + let err = $tx.send(relay_msg()).await.unwrap_err(); + assert!(matches!(err, SendError::Closed)); + }}; + } + + /// The buffer size to use for the fake MPSC queues + const BUFFER_SIZE: usize = 16; + + #[test] + fn destroy_skips_queue() { + MockRuntime::test_with_various(|_rt| async move { + let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE); + + tx.send(relay_msg()).await.unwrap(); + tx.send(destroy_msg(DestroyReason::HIBERNATING)) + .await + .unwrap(); + + // Destroy skips the queue + let destroy = rx.next().await.unwrap(); + + assert!(matches!(destroy, AnyChanMsg::Destroy(_))); + // And we've reached EOS + assert_eos!(tx, rx); + }); + } + + #[test] + fn destroy_on_empty_queue() { + MockRuntime::test_with_various(|_rt| async move { + let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE); + + tx.send(destroy_msg(DestroyReason::HIBERNATING)) + .await + .unwrap(); + let destroy = rx.next().await.unwrap(); + + assert!(matches!(destroy, AnyChanMsg::Destroy(_))); + // And we've reached EOS + assert_eos!(tx, rx); + }); + } + + #[test] + fn destroy_after_data() { + MockRuntime::test_with_various(|_rt| async move { + let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE); + + for _ in 0..3 { + tx.send(relay_msg()).await.unwrap(); + } + + for _ in 0..3 { + let data = rx.next().await.unwrap(); + assert!(matches!(data, AnyChanMsg::Relay(_))); + } + + let mut noop_cx = Context::from_waker(Waker::noop()); + // The queue is now empty + assert!(rx.poll_next_unpin(&mut noop_cx).is_pending()); + + tx.send(destroy_msg(DestroyReason::PROTOCOL)).await.unwrap(); + + let destroy = rx.next().await.unwrap(); + assert!(matches!(destroy, AnyChanMsg::Destroy(_))); + // And we've reached EOS + assert_eos!(tx, rx); + }); + } + + #[test] + fn destroy_full_queue() { + MockRuntime::test_with_various(|_rt| async move { + let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE); + + // Fill the queue with data... + loop { + let fut = Box::pin(tx.send(relay_msg())); + match futures::poll!(fut) { + Poll::Pending => { + // Full, time to break + break; + } + Poll::Ready(res) => { + let () = res.unwrap(); + } + } + } + // ...followed by a destroy + tx.send(destroy_msg(DestroyReason::INTERNAL)).await.unwrap(); + + // The destroy cell goes through even though the queue is full, + // ahead of all the queued data + let destroy = rx.next().await.unwrap(); + + assert!(matches!(destroy, AnyChanMsg::Destroy(_))); + // And we've reached EOS + assert_eos!(tx, rx); + }); + } +} -- GitLab From 4f5571fbd2d887c630ad02f36affaeb1db81db3a Mon Sep 17 00:00:00 2001 From: Gabriela Moldovan Date: Thu, 4 Jun 2026 15:56:56 +0000 Subject: [PATCH 2/4] proto: Remove now-unused import `CircuitRxSender` is no longer a `Sink`, so we don't need this import anymore. --- crates/tor-proto/src/relay/reactor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/tor-proto/src/relay/reactor.rs b/crates/tor-proto/src/relay/reactor.rs index f0bdb19e7f..3b0a135815 100644 --- a/crates/tor-proto/src/relay/reactor.rs +++ b/crates/tor-proto/src/relay/reactor.rs @@ -218,7 +218,7 @@ pub(crate) mod test { use crate::stream::flow_ctrl::params::FlowCtrlParameters; use crate::stream::incoming::{IncomingStream, IncomingStreamRequestFilter}; - use futures::{AsyncReadExt as _, SinkExt as _, StreamExt as _}; + use futures::{AsyncReadExt as _, StreamExt as _}; use tracing_test::traced_test; use tor_cell::chancell::{ChanCell, ChanCmd, msg as chanmsg}; -- GitLab From 6b206002d3aaadeee3d2442637c62d1b752b0960 Mon Sep 17 00:00:00 2001 From: Gabriela Moldovan Date: Wed, 13 May 2026 13:53:25 +0000 Subject: [PATCH 3/4] proto: Replace CircuitRx{Sender,Receiver} with new channel type This is needed for relays as part of #2490. Note that changing this type affects the client implementation too (i.e. clients will start prioritizing inbound DESTROY, discarding any queued data without forwarding it to their local streams). But that's okay, because it will generally only affect misbehaving clients, and clients unlucky enough to encounter a hibernating relay. --- crates/tor-proto/src/channel.rs | 3 +++ crates/tor-proto/src/circuit.rs | 12 +----------- crates/tor-proto/src/relay/channel/create_handler.rs | 1 + 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/crates/tor-proto/src/channel.rs b/crates/tor-proto/src/channel.rs index dbf8d7fcaf..650d22dba5 100644 --- a/crates/tor-proto/src/channel.rs +++ b/crates/tor-proto/src/channel.rs @@ -863,9 +863,11 @@ impl Channel { // TODO: blocking is risky, but so is unbounded. let (sender, receiver) = MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?; + let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver); let (createdsender, createdreceiver) = oneshot::channel::(); let (tx, rx) = oneshot::channel(); + self.send_control(CtrlMsg::AllocateCircuit { created_sender: createdsender, sender, @@ -911,6 +913,7 @@ impl Channel { // TODO: blocking is risky, but so is unbounded. let (sender, receiver) = MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?; + let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver); let (createdsender, createdreceiver) = oneshot::channel::(); let (tx, rx) = oneshot::channel(); diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index eb434dfcaa..894f52415a 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -19,17 +19,7 @@ pub use unique_id::UniqId; use crate::ccparams::CongestionControlParams; use crate::stream::flow_ctrl::params::FlowCtrlParameters; -use tor_cell::chancell::msg::AnyChanMsg; -use tor_memquota::mq_queue::{self, MpscSpec}; - -/// The following two MPSCs take any channel message as the receiving end can be either a client or -/// a relay circuit reactor. The reactor itself will convert into its restricted message set. On -/// error, the circuit will shutdown as it will be considered a protocol violation. -/// -/// MPSC queue for inbound data on its way from channel to circuit, sender -pub(crate) type CircuitRxSender = mq_queue::Sender; -/// MPSC queue for inbound data on its way from channel to circuit, receiver -pub(crate) type CircuitRxReceiver = mq_queue::Receiver; +pub(crate) use circ_sender::{CircuitRxReceiver, CircuitRxSender}; /// Estimated upper bound for the likely number of hops. pub(crate) const HOPS: usize = 6; diff --git a/crates/tor-proto/src/relay/channel/create_handler.rs b/crates/tor-proto/src/relay/channel/create_handler.rs index 110961ceaf..f62c6e3889 100644 --- a/crates/tor-proto/src/relay/channel/create_handler.rs +++ b/crates/tor-proto/src/relay/channel/create_handler.rs @@ -153,6 +153,7 @@ impl CreateRequestHandler { let time_provider = DynTimeProvider::new(runtime.clone()); let account = memquota.as_raw_account(); let (sender, receiver) = MpscSpec::new(10_000_000).new_mq(time_provider, account)?; + let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver); // TODO(relay): Do we really want a client padding machine here? let (padding_ctrl, padding_stream) = -- GitLab From d68fede4bcd2d28d4bb6add00193e6170dea97da Mon Sep 17 00:00:00 2001 From: Gabriela Moldovan Date: Wed, 13 May 2026 17:47:20 +0000 Subject: [PATCH 4/4] proto: Update the tests to use the new CircuitRx{Receiver,Sender}s --- crates/tor-proto/src/channel/circmap.rs | 3 ++- crates/tor-proto/src/channel/reactor.rs | 2 +- crates/tor-proto/src/circuit.rs | 2 ++ crates/tor-proto/src/client/circuit.rs | 13 +------------ crates/tor-proto/src/relay/reactor.rs | 2 +- crates/tor-proto/src/streammap.rs | 11 ++++++++++- 6 files changed, 17 insertions(+), 16 deletions(-) diff --git a/crates/tor-proto/src/channel/circmap.rs b/crates/tor-proto/src/channel/circmap.rs index ab284cc19f..1801070eb3 100644 --- a/crates/tor-proto/src/channel/circmap.rs +++ b/crates/tor-proto/src/channel/circmap.rs @@ -364,7 +364,8 @@ mod test { #![allow(clippy::needless_pass_by_value)] //! use super::*; - use crate::{client::circuit::padding::new_padding, fake_mpsc}; + use crate::circuit::test::fake_mpsc; + use crate::client::circuit::padding::new_padding; use tor_basic_utils::test_rng::testing_rng; use tor_rtcompat::DynTimeProvider; diff --git a/crates/tor-proto/src/channel/reactor.rs b/crates/tor-proto/src/channel/reactor.rs index 9e6c3f6559..f43cb9c203 100644 --- a/crates/tor-proto/src/channel/reactor.rs +++ b/crates/tor-proto/src/channel/reactor.rs @@ -899,9 +899,9 @@ pub(crate) mod test { #![allow(clippy::unwrap_used)] use super::*; use crate::channel::{Canonicity, ChannelMode, ClosedUnexpectedly, UniqId}; + use crate::circuit::test::fake_mpsc; use crate::client::circuit::CircParameters; use crate::client::circuit::padding::new_padding; - use crate::fake_mpsc; use crate::peer::{PeerAddr, PeerInfo}; use crate::util::{DummyTimeoutEstimator, fake_mq}; use futures::sink::SinkExt; diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index 894f52415a..b221b4bdfe 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -97,6 +97,8 @@ pub(crate) mod test { #[cfg(feature = "relay")] use crate::relay::{CircNetParameters, CongestionControlNetParams}; + pub(crate) use super::circ_sender::test::fake_mpsc; + /// Return a new [`CircNetParameters`] using default values for unit tests. They are based on /// consensus defaults but should not be considered to be accurate from the one used on the /// production network. diff --git a/crates/tor-proto/src/client/circuit.rs b/crates/tor-proto/src/client/circuit.rs index 61c3f84155..2b015c0f32 100644 --- a/crates/tor-proto/src/client/circuit.rs +++ b/crates/tor-proto/src/client/circuit.rs @@ -85,9 +85,6 @@ use tor_memquota::derive_deftly_template_HasMemoryCost; use crate::crypto::handshake::ntor::NtorPublicKey; -#[cfg(test)] -use crate::stream::{StreamMpscReceiver, StreamMpscSender}; - pub use crate::crypto::binding::CircuitBinding; pub use path::{Path, PathEntry}; @@ -1011,6 +1008,7 @@ pub(crate) mod test { use crate::channel::test::{CodecResult, new_reactor}; use crate::circuit::CircuitRxSender; use crate::circuit::reactor::test::rmsg_to_ccmsg; + use crate::circuit::test::fake_mpsc; use crate::client::circuit::padding::new_padding; use crate::client::stream::DataStream; use crate::congestion::params::CongestionControlParams; @@ -1039,7 +1037,6 @@ pub(crate) mod test { }; use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg}; use tor_linkspec::OwnedCircTarget; - use tor_memquota::HasMemoryCost; use tor_rtcompat::Runtime; use tor_rtcompat::SpawnExt; use tracing::trace; @@ -1098,14 +1095,6 @@ pub(crate) mod test { const EXAMPLE_ED_ID: [u8; 32] = [6; 32]; const EXAMPLE_RSA_ID: [u8; 20] = [10; 20]; - /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing - #[cfg(test)] - pub(crate) fn fake_mpsc( - buffer: usize, - ) -> (StreamMpscSender, StreamMpscReceiver) { - crate::fake_mpsc(buffer) - } - /// return an example OwnedCircTarget that can get used for an ntor handshake. fn example_target() -> OwnedCircTarget { let mut builder = OwnedCircTarget::builder(); diff --git a/crates/tor-proto/src/relay/reactor.rs b/crates/tor-proto/src/relay/reactor.rs index 3b0a135815..ee0e8c6649 100644 --- a/crates/tor-proto/src/relay/reactor.rs +++ b/crates/tor-proto/src/relay/reactor.rs @@ -207,12 +207,12 @@ pub(crate) mod test { use super::*; use crate::circuit::reactor::test::{AllowAllStreamsFilter, rmsg_to_ccmsg}; + use crate::circuit::test::fake_mpsc; use crate::circuit::{CircParameters, CircuitRxSender}; use crate::client::circuit::padding::new_padding; use crate::congestion::test_utils::params::build_cc_vegas_params; use crate::crypto::cell::RelayCellBody; use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer}; - use crate::fake_mpsc; use crate::memquota::SpecificAccount as _; use crate::relay::channel::test::{DummyChan, DummyChanProvider, working_dummy_channel}; use crate::stream::flow_ctrl::params::FlowCtrlParameters; diff --git a/crates/tor-proto/src/streammap.rs b/crates/tor-proto/src/streammap.rs index f71eafff27..a1283d48d0 100644 --- a/crates/tor-proto/src/streammap.rs +++ b/crates/tor-proto/src/streammap.rs @@ -584,11 +584,20 @@ mod test { #![allow(clippy::needless_pass_by_value)] //! use super::*; - use crate::client::circuit::test::fake_mpsc; use crate::stream::queue::fake_stream_queue; + use crate::stream::{StreamMpscReceiver, StreamMpscSender}; use crate::{client::stream::OutboundDataCmdChecker, congestion::sendme::StreamSendWindow}; + use std::fmt::Debug; + use tor_memquota::HasMemoryCost; use web_time_compat::InstantExt; + /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing + fn fake_mpsc( + buffer: usize, + ) -> (StreamMpscSender, StreamMpscReceiver) { + crate::fake_mpsc(buffer) + } + #[test] fn test_wrapping_next_stream_id() { let one = StreamId::new(1).unwrap(); -- GitLab