diff --git a/crates/tor-proto/src/channel.rs b/crates/tor-proto/src/channel.rs index dbf8d7fcaf8c8d99a8603d4467c9621a5b44c98d..650d22dba5c0827c8b7cf357b4d81953c1cf27e8 100644 --- a/crates/tor-proto/src/channel.rs +++ b/crates/tor-proto/src/channel.rs @@ -863,9 +863,11 @@ impl Channel { // TODO: blocking is risky, but so is unbounded. let (sender, receiver) = MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?; + let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver); let (createdsender, createdreceiver) = oneshot::channel::(); let (tx, rx) = oneshot::channel(); + self.send_control(CtrlMsg::AllocateCircuit { created_sender: createdsender, sender, @@ -911,6 +913,7 @@ impl Channel { // TODO: blocking is risky, but so is unbounded. let (sender, receiver) = MpscSpec::new(128).new_mq(time_prov.clone(), memquota.as_raw_account())?; + let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver); let (createdsender, createdreceiver) = oneshot::channel::(); let (tx, rx) = oneshot::channel(); diff --git a/crates/tor-proto/src/channel/circmap.rs b/crates/tor-proto/src/channel/circmap.rs index ab284cc19f19b0b446483cb506b703c1c7208605..1801070eb3227e8435e19e1158fbc24b25b34f8f 100644 --- a/crates/tor-proto/src/channel/circmap.rs +++ b/crates/tor-proto/src/channel/circmap.rs @@ -364,7 +364,8 @@ mod test { #![allow(clippy::needless_pass_by_value)] //! use super::*; - use crate::{client::circuit::padding::new_padding, fake_mpsc}; + use crate::circuit::test::fake_mpsc; + use crate::client::circuit::padding::new_padding; use tor_basic_utils::test_rng::testing_rng; use tor_rtcompat::DynTimeProvider; diff --git a/crates/tor-proto/src/channel/reactor.rs b/crates/tor-proto/src/channel/reactor.rs index 9e6c3f65598360103c2e3a8005c418c25f02939c..f43cb9c2032c8f76d244a244eb75389c2b4801c5 100644 --- a/crates/tor-proto/src/channel/reactor.rs +++ b/crates/tor-proto/src/channel/reactor.rs @@ -899,9 +899,9 @@ pub(crate) mod test { #![allow(clippy::unwrap_used)] use super::*; use crate::channel::{Canonicity, ChannelMode, ClosedUnexpectedly, UniqId}; + use crate::circuit::test::fake_mpsc; use crate::client::circuit::CircParameters; use crate::client::circuit::padding::new_padding; - use crate::fake_mpsc; use crate::peer::{PeerAddr, PeerInfo}; use crate::util::{DummyTimeoutEstimator, fake_mq}; use futures::sink::SinkExt; diff --git a/crates/tor-proto/src/circuit.rs b/crates/tor-proto/src/circuit.rs index 85186c50e778b3fae194bd4ac9424f5fb0986470..b221b4bdfed7b174752e973a9d6a76f817730431 100644 --- a/crates/tor-proto/src/circuit.rs +++ b/crates/tor-proto/src/circuit.rs @@ -4,6 +4,7 @@ pub(crate) mod cell_sender; pub(crate) mod celltypes; +pub(crate) mod circ_sender; pub(crate) mod circhop; pub(crate) mod create; pub(crate) mod padding; @@ -18,17 +19,7 @@ pub use unique_id::UniqId; use crate::ccparams::CongestionControlParams; use crate::stream::flow_ctrl::params::FlowCtrlParameters; -use tor_cell::chancell::msg::AnyChanMsg; -use tor_memquota::mq_queue::{self, MpscSpec}; - -/// The following two MPSCs take any channel message as the receiving end can be either a client or -/// a relay circuit reactor. The reactor itself will convert into its restricted message set. On -/// error, the circuit will shutdown as it will be considered a protocol violation. -/// -/// MPSC queue for inbound data on its way from channel to circuit, sender -pub(crate) type CircuitRxSender = mq_queue::Sender; -/// MPSC queue for inbound data on its way from channel to circuit, receiver -pub(crate) type CircuitRxReceiver = mq_queue::Receiver; +pub(crate) use circ_sender::{CircuitRxReceiver, CircuitRxSender}; /// Estimated upper bound for the likely number of hops. pub(crate) const HOPS: usize = 6; @@ -106,6 +97,8 @@ pub(crate) mod test { #[cfg(feature = "relay")] use crate::relay::{CircNetParameters, CongestionControlNetParams}; + pub(crate) use super::circ_sender::test::fake_mpsc; + /// Return a new [`CircNetParameters`] using default values for unit tests. They are based on /// consensus defaults but should not be considered to be accurate from the one used on the /// production network. diff --git a/crates/tor-proto/src/circuit/circ_sender.rs b/crates/tor-proto/src/circuit/circ_sender.rs new file mode 100644 index 0000000000000000000000000000000000000000..fea32aec85fb6abac39e8a3a41a59bb789af291a --- /dev/null +++ b/crates/tor-proto/src/circuit/circ_sender.rs @@ -0,0 +1,371 @@ +//! Implements a sender and a [`Stream`] type for sending messages +//! from a [channel](crate::channel) to a circuit, +//! prioritizing the delivery of `DESTROY` messages. +//! +//! [`CircuitRxSender`] and [`CircuitRxReceiver`] take any channel message, +//! because the receiving end can be either a client or a relay circuit reactor. +//! The reactor itself will convert into its restricted message set. + +use std::pin::Pin; +use std::task::{self, Context, Poll}; + +use futures::{FutureExt as _, SinkExt as _, Stream, StreamExt as _}; +use oneshot_fused_workaround as oneshot; +use tor_basic_utils::assert_val_impl_trait; +use tor_cell::chancell::msg::{AnyChanMsg, Destroy}; +use tor_memquota::mq_queue::{self, ChannelSpec, MpscSpec}; + +/// The sending end of the SPSC queue for inbound data on its way from channel to circuit +/// +/// A [`CircuitRxSender`] sender is closed for sending as soon as the first +/// `DESTROY` message is sent, and will discard any unflushed cells +/// from its underlying [`mq_queue`], by dropping it. +/// +/// ## No [`Sink`](futures::Sink) implementation +/// +/// This type intentionally does not implement [`Sink`](futures::Sink). +/// Instead it provides a [`send()`](CircuitRxSender::send) function +/// similar to [`SinkExt::send`](futures::SinkExt::send). +/// +/// The reason for doing it this way is because we cannot provide +/// a correct `Sink::poll:ready()` implementation +/// that wouldn't block DESTROY cells from being sent +/// when our underlying MPSC sender is full: +/// `SinkExt::send()` calls `poll_ready()` followed by `start_send()`, +/// so in order for our `poll_ready()` implementation to not block DESTROY +/// on the MPSC queue's readiness, it would need to know whether +/// the cell that will be sent via `start_send()` is a DESTROY or not, +/// but that's not possible because of the way the `Sink`/`SinkExt` traits +/// are designed. +#[derive(Debug)] +pub(crate) struct CircuitRxSender(Option); + +/// The inner state of a [`CircuitRxSender`]. +#[derive(Debug)] +struct CircuitRxSenderInner { + /// Sender for sending `DESTROY` to [`CircuitRxReceiver`] + destroy_tx: oneshot::Sender, + /// Sender for sending all other [`AnyChanMsg`]s to [`CircuitRxReceiver`] + cell_tx: mq_queue::Sender, +} + +/// The receiving end of the SPSC queue for inbound data on its way from channel to circuit +/// +/// A [`CircuitRxReceiver`] stream ends as soon as the first `DESTROY` message +/// is received, causing the stream to discard any unflushed cells +/// from its underlying [`mq_queue`], by dropping it. +#[derive(Debug)] +pub(crate) struct CircuitRxReceiver(Option); + +/// The inner state of a [`CircuitRxReceiver`]. +#[derive(Debug)] +struct CircuitRxReceiverInner { + /// Receiver for receiving `DESTROY` from [`CircuitRxSender`] + destroy_rx: oneshot::Receiver, + /// Receiver for receiving all other [`AnyChanMsg`]s from [`CircuitRxReceiver`] + cell_rx: mq_queue::Receiver, +} + +/// Wrap the sender and receiver of an [`mq_queue`] channel +/// into [`CircuitRxSender`] and [`CircuitRxReceiver`]. +/// +/// The returned channel will ensure any DESTROY messages sent +/// over the [`CircuitRxSender`] will be delivered +/// by the [`CircuitRxReceiver`] immediately, +/// ahead of any other messages that might already be queued, +/// which will be discarded. +/// +/// We are fine with the resulting data loss, because inbound DESTROY +/// can be indicative of malicious activity on the circuit. +/// We choose to err on the safe side, and free up the resources associated +/// with such circuits as soon as possible. +/// DESTROY messages are also sent by relays when they're about to hibernate, +/// and by clients once they've decided to stop using a circuit. +/// In the latter case, the lack of an `RELAY_COMMAND_END_ACK` +/// does mean that this prioritization can cause data loss +/// (if the client closes the circuit immediately after END-ing a stream). +/// However, this is a deficiency in the protocol, +/// and not something we want to fix by implementing custom flushing logic +/// in the reactor. See torspec#196 and the discussion in #2490. +/// +/// Note: the underlying buffer of the [`mq_queue`] will only be freed +/// once both the [`CircuitRxSender`] and [`CircuitRxReceiver`] are dropped; +/// in other words, after a `DESTROY` cell has been obtained from the [`CircuitRxReceiver`], +/// via its [`Stream`] implementation +pub(crate) fn channel( + cell_tx: mq_queue::Sender, + cell_rx: mq_queue::Receiver, +) -> (CircuitRxSender, CircuitRxReceiver) { + let (destroy_tx, destroy_rx) = oneshot::channel(); + let sender = CircuitRxSender(Some(CircuitRxSenderInner { + destroy_tx, + cell_tx, + })); + + let receiver = CircuitRxReceiver(Some(CircuitRxReceiverInner { + destroy_rx, + cell_rx, + })); + + (sender, receiver) +} + +impl Stream for CircuitRxReceiver { + type Item = AnyChanMsg; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Some(inner) = self.0.as_mut() else { + return Poll::Ready(None); + }; + + // It's important that destroy_rx is fused, + // because we call poll_unpin() unconditionally below. + assert_val_impl_trait!(inner.destroy_rx, futures_util::future::FusedFuture); + + // First, check if we have a DESTROY message ready + let destroy_cell = match inner.destroy_rx.poll_unpin(cx) { + Poll::Ready(destroy) => { + // If destroy.is_err(), it means the CircuitRxSender was dropped, + // but there may be more data buffered in the underlying mpsc, + // so we need to continue polling cell_rx. + // + // This is important, because we want to preserve the behavior + // of the mq_queue, whose Receiver will continue yielding queued + // messages even after the Sender is dropped. + destroy.ok() + } + Poll::Pending => { + // No DESTROY message yet, so it's time to poll the non-priority + // message queue + None + } + }; + + if let Some(destroy) = destroy_cell { + // Drop the inner state, closing this stream + self.0 = None; + return Poll::Ready(Some(AnyChanMsg::Destroy(destroy))); + } + + let res = task::ready!(inner.cell_rx.poll_next_unpin(cx)); + + // Our CircuitRxSender impl will never send DESTROY messages + // on the cell_rx queue (they're always sent via the oneshot channel) + debug_assert!(!matches!(res, Some(AnyChanMsg::Destroy(_)))); + + Poll::Ready(res) + } +} + +/// Error returned when trying to write to a [`CircuitRxSender`] +#[derive(thiserror::Error, Clone, Debug)] +pub(crate) enum SendError { + /// The underlying MPSC channel rejected the message + #[error("{0}")] + Channel(#[from] mq_queue::SendError<::SendError>), + + /// The receiver has dropped + /// + // Note: technically, there are two "Disconnected" variants: + // this one, for the oneshot channel, and a second, hidden variant + // inside mq_queue:SendError, for the mq_queue one. + // + // It would be nice if we only had one variant covering both cases, + // but this will have to do for now. + #[error("the receiver has dropped")] + Disconnected, + + /// The sender is closed + /// + /// Returned if the [`CircuitRxSender`] is used after a DESTROY cell has been written to it. + #[error("sender has closed")] + Closed, +} + +impl CircuitRxSender { + /// Send a cell down this channel + /// + /// If the sender is already closed (i.e., if we have already sent DESTROY), + /// this will return an error. + /// + // In practice, we never write more than 1 DESTROY cell to this sender, + // because the channel reactor removes the circuit (and corresponding CircuitRxSender) + // from its circ map after the first DESTROY. + pub(crate) async fn send(&mut self, msg: AnyChanMsg) -> Result<(), SendError> { + if let AnyChanMsg::Destroy(d) = msg { + let inner = self.take_inner()?; + + if inner.destroy_tx.send(d).is_err() { + return Err(SendError::Disconnected); + } + + Ok(()) + } else { + self.borrow_for_sending()?.cell_tx.send(msg).await?; + Ok(()) + } + } + + /// Borrow the [`CircuitRxSenderInner`] state for sending. + /// + /// Returns an error if the sender is closed. + fn borrow_for_sending(&mut self) -> Result<&mut CircuitRxSenderInner, SendError> { + self.0.as_mut().ok_or_else(|| SendError::Closed) + } + + /// Take the inner [`CircuitRxSenderInner`], closing the sender. + /// + /// Returns an error if the sender is already closed. + fn take_inner(&mut self) -> Result { + self.0.take().ok_or_else(|| SendError::Closed) + } +} + +#[cfg(test)] +pub(crate) mod test { + // @@ begin test lint list maintained by maint/add_warning @@ + #![allow(clippy::bool_assert_comparison)] + #![allow(clippy::clone_on_copy)] + #![allow(clippy::dbg_macro)] + #![allow(clippy::mixed_attributes_style)] + #![allow(clippy::print_stderr)] + #![allow(clippy::print_stdout)] + #![allow(clippy::single_char_pattern)] + #![allow(clippy::unwrap_used)] + #![allow(clippy::unchecked_time_subtraction)] + #![allow(clippy::useless_vec)] + #![allow(clippy::needless_pass_by_value)] + //! + + use super::*; + + use tor_cell::chancell::msg::{self, DestroyReason}; + use tor_rtmock::MockRuntime; + + use std::task::Waker; + + /// Make an MPSC queue, of the type we use to send cells + /// from the channel reactor to the circuit reactor, + /// but a fake one for testing + #[cfg(test)] + pub(crate) fn fake_mpsc(buffer: usize) -> (CircuitRxSender, CircuitRxReceiver) { + let (tx, rx) = crate::fake_mpsc(buffer); + + crate::circuit::circ_sender::channel(tx, rx) + } + + /// A DESTROY message + fn destroy_msg(reason: DestroyReason) -> AnyChanMsg { + AnyChanMsg::Destroy(msg::Destroy::new(reason)) + } + + /// A RELAY message + fn relay_msg() -> AnyChanMsg { + AnyChanMsg::Relay(msg::Relay::new(b"hello")) + } + + macro_rules! assert_eos { + ($tx:expr, $rx:expr) => {{ + assert!($rx.next().await.is_none()); + // Cannot send any more cells once the sender is closed + let err = $tx.send(relay_msg()).await.unwrap_err(); + assert!(matches!(err, SendError::Closed)); + }}; + } + + /// The buffer size to use for the fake MPSC queues + const BUFFER_SIZE: usize = 16; + + #[test] + fn destroy_skips_queue() { + MockRuntime::test_with_various(|_rt| async move { + let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE); + + tx.send(relay_msg()).await.unwrap(); + tx.send(destroy_msg(DestroyReason::HIBERNATING)) + .await + .unwrap(); + + // Destroy skips the queue + let destroy = rx.next().await.unwrap(); + + assert!(matches!(destroy, AnyChanMsg::Destroy(_))); + // And we've reached EOS + assert_eos!(tx, rx); + }); + } + + #[test] + fn destroy_on_empty_queue() { + MockRuntime::test_with_various(|_rt| async move { + let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE); + + tx.send(destroy_msg(DestroyReason::HIBERNATING)) + .await + .unwrap(); + let destroy = rx.next().await.unwrap(); + + assert!(matches!(destroy, AnyChanMsg::Destroy(_))); + // And we've reached EOS + assert_eos!(tx, rx); + }); + } + + #[test] + fn destroy_after_data() { + MockRuntime::test_with_various(|_rt| async move { + let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE); + + for _ in 0..3 { + tx.send(relay_msg()).await.unwrap(); + } + + for _ in 0..3 { + let data = rx.next().await.unwrap(); + assert!(matches!(data, AnyChanMsg::Relay(_))); + } + + let mut noop_cx = Context::from_waker(Waker::noop()); + // The queue is now empty + assert!(rx.poll_next_unpin(&mut noop_cx).is_pending()); + + tx.send(destroy_msg(DestroyReason::PROTOCOL)).await.unwrap(); + + let destroy = rx.next().await.unwrap(); + assert!(matches!(destroy, AnyChanMsg::Destroy(_))); + // And we've reached EOS + assert_eos!(tx, rx); + }); + } + + #[test] + fn destroy_full_queue() { + MockRuntime::test_with_various(|_rt| async move { + let (mut tx, mut rx) = fake_mpsc(BUFFER_SIZE); + + // Fill the queue with data... + loop { + let fut = Box::pin(tx.send(relay_msg())); + match futures::poll!(fut) { + Poll::Pending => { + // Full, time to break + break; + } + Poll::Ready(res) => { + let () = res.unwrap(); + } + } + } + // ...followed by a destroy + tx.send(destroy_msg(DestroyReason::INTERNAL)).await.unwrap(); + + // The destroy cell goes through even though the queue is full, + // ahead of all the queued data + let destroy = rx.next().await.unwrap(); + + assert!(matches!(destroy, AnyChanMsg::Destroy(_))); + // And we've reached EOS + assert_eos!(tx, rx); + }); + } +} diff --git a/crates/tor-proto/src/client/circuit.rs b/crates/tor-proto/src/client/circuit.rs index 61c3f84155b0dbad365384870cad960a6c92b8e3..2b015c0f32f64f226c28d9808a4399244d679879 100644 --- a/crates/tor-proto/src/client/circuit.rs +++ b/crates/tor-proto/src/client/circuit.rs @@ -85,9 +85,6 @@ use tor_memquota::derive_deftly_template_HasMemoryCost; use crate::crypto::handshake::ntor::NtorPublicKey; -#[cfg(test)] -use crate::stream::{StreamMpscReceiver, StreamMpscSender}; - pub use crate::crypto::binding::CircuitBinding; pub use path::{Path, PathEntry}; @@ -1011,6 +1008,7 @@ pub(crate) mod test { use crate::channel::test::{CodecResult, new_reactor}; use crate::circuit::CircuitRxSender; use crate::circuit::reactor::test::rmsg_to_ccmsg; + use crate::circuit::test::fake_mpsc; use crate::client::circuit::padding::new_padding; use crate::client::stream::DataStream; use crate::congestion::params::CongestionControlParams; @@ -1039,7 +1037,6 @@ pub(crate) mod test { }; use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg}; use tor_linkspec::OwnedCircTarget; - use tor_memquota::HasMemoryCost; use tor_rtcompat::Runtime; use tor_rtcompat::SpawnExt; use tracing::trace; @@ -1098,14 +1095,6 @@ pub(crate) mod test { const EXAMPLE_ED_ID: [u8; 32] = [6; 32]; const EXAMPLE_RSA_ID: [u8; 20] = [10; 20]; - /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing - #[cfg(test)] - pub(crate) fn fake_mpsc( - buffer: usize, - ) -> (StreamMpscSender, StreamMpscReceiver) { - crate::fake_mpsc(buffer) - } - /// return an example OwnedCircTarget that can get used for an ntor handshake. fn example_target() -> OwnedCircTarget { let mut builder = OwnedCircTarget::builder(); diff --git a/crates/tor-proto/src/relay/channel/create_handler.rs b/crates/tor-proto/src/relay/channel/create_handler.rs index 110961ceaf604d0475dbbab10f5f0ae6360ec7d1..f62c6e3889df3c4b720ab9ade1006583d859e39c 100644 --- a/crates/tor-proto/src/relay/channel/create_handler.rs +++ b/crates/tor-proto/src/relay/channel/create_handler.rs @@ -153,6 +153,7 @@ impl CreateRequestHandler { let time_provider = DynTimeProvider::new(runtime.clone()); let account = memquota.as_raw_account(); let (sender, receiver) = MpscSpec::new(10_000_000).new_mq(time_provider, account)?; + let (sender, receiver) = crate::circuit::circ_sender::channel(sender, receiver); // TODO(relay): Do we really want a client padding machine here? let (padding_ctrl, padding_stream) = diff --git a/crates/tor-proto/src/relay/reactor.rs b/crates/tor-proto/src/relay/reactor.rs index f0bdb19e7f61b935dec917bbc7486276b8d532f2..ee0e8c6649cf30fefb82283b07265d2efe948c1d 100644 --- a/crates/tor-proto/src/relay/reactor.rs +++ b/crates/tor-proto/src/relay/reactor.rs @@ -207,18 +207,18 @@ pub(crate) mod test { use super::*; use crate::circuit::reactor::test::{AllowAllStreamsFilter, rmsg_to_ccmsg}; + use crate::circuit::test::fake_mpsc; use crate::circuit::{CircParameters, CircuitRxSender}; use crate::client::circuit::padding::new_padding; use crate::congestion::test_utils::params::build_cc_vegas_params; use crate::crypto::cell::RelayCellBody; use crate::crypto::cell::{InboundRelayLayer, OutboundRelayLayer}; - use crate::fake_mpsc; use crate::memquota::SpecificAccount as _; use crate::relay::channel::test::{DummyChan, DummyChanProvider, working_dummy_channel}; use crate::stream::flow_ctrl::params::FlowCtrlParameters; use crate::stream::incoming::{IncomingStream, IncomingStreamRequestFilter}; - use futures::{AsyncReadExt as _, SinkExt as _, StreamExt as _}; + use futures::{AsyncReadExt as _, StreamExt as _}; use tracing_test::traced_test; use tor_cell::chancell::{ChanCell, ChanCmd, msg as chanmsg}; diff --git a/crates/tor-proto/src/streammap.rs b/crates/tor-proto/src/streammap.rs index f71eafff270f5262f03319782634f5d24a73abd4..a1283d48d00d9c0577efd79d030542053c3b545a 100644 --- a/crates/tor-proto/src/streammap.rs +++ b/crates/tor-proto/src/streammap.rs @@ -584,11 +584,20 @@ mod test { #![allow(clippy::needless_pass_by_value)] //! use super::*; - use crate::client::circuit::test::fake_mpsc; use crate::stream::queue::fake_stream_queue; + use crate::stream::{StreamMpscReceiver, StreamMpscSender}; use crate::{client::stream::OutboundDataCmdChecker, congestion::sendme::StreamSendWindow}; + use std::fmt::Debug; + use tor_memquota::HasMemoryCost; use web_time_compat::InstantExt; + /// Make an MPSC queue, of the type we use in Channels, but a fake one for testing + fn fake_mpsc( + buffer: usize, + ) -> (StreamMpscSender, StreamMpscReceiver) { + crate::fake_mpsc(buffer) + } + #[test] fn test_wrapping_next_stream_id() { let one = StreamId::new(1).unwrap();