Commit 2ac2a476 authored by opara's avatar opara 🙃
Browse files

tor-proto: fix flow control for half-streams

This moves the window-based flow control for half-streams out of the
`HalfStream` and into the `HalfStreamWindowFlowCtrl` object.

Now that it's applied only in `HalfStreamWindowFlowCtrl` and not
generally for all half-streams, we no longer apply window-based flow
control to half-streams when they're really using xon/xoff-based flow
control.
parent c7086451
Loading
Loading
Loading
Loading
+0 −3
Original line number Diff line number Diff line
@@ -667,9 +667,6 @@ impl CircHopOutbound {

        // We need to handle SENDME/XON/XOFF messages here, not in the stream's recv() method, or
        // else we'd never notice them if the stream isn't reading.
        //
        // TODO: this logic is the same as `HalfStream::handle_msg`; we should refactor this if
        // possible
        match msg.cmd() {
            RelayCmd::SENDME => {
                ent.put_for_incoming_sendme(msg)?;
+46 −10
Original line number Diff line number Diff line
@@ -2,9 +2,12 @@

use tor_cell::relaycell::flow_ctrl::{Xoff, Xon, XonKbpsEwma};
use tor_cell::relaycell::msg::{AnyRelayMsg, Sendme};
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
use tor_cell::relaycell::{RelayCmd, RelayMsg, UnparsedRelayMsg};

use crate::congestion::sendme::{self, StreamSendWindow};
use crate::congestion::sendme::{
    self, StreamRecvWindow, StreamSendWindow, cmd_counts_towards_windows,
};
use crate::stream::RECV_WINDOW_INIT;
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, HalfStreamFlowCtrlHooks};
use crate::{Error, Result};

@@ -75,24 +78,57 @@ impl FlowCtrlHooks for WindowFlowCtrl {

/// State for window-based flow control on a half-stream.
#[derive(Debug)]
pub(crate) struct HalfStreamWindowFlowCtrl {}
pub(crate) struct HalfStreamWindowFlowCtrl {
    /// The original [`WindowFlowCtrl`] from the full stream.
    ///
    /// We keep this since we need to continue validating any incoming messages.
    inner: WindowFlowCtrl,
    /// The stream's receive window.
    ///
    /// When it was a full-stream, the receive window was tracked by the `DataStream`.
    /// But since the `DataStream` has gone away, we need to track it ourselves.
    recv_window: StreamRecvWindow,
}

impl HalfStreamWindowFlowCtrl {
    /// Returns a new sendme-window-based state for a half-stream.
    pub(crate) fn new(_flow_ctrl: WindowFlowCtrl) -> Self {
        // XXXX
        Self {}
    pub(crate) fn new(flow_ctrl: WindowFlowCtrl) -> Self {
        Self {
            inner: flow_ctrl,
            // FIXME(eta): we don't copy the receive window, instead just creating a new one,
            //             so a malicious peer can send us slightly more data than they should
            //             be able to; see arti#230.
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
        }
    }
}

impl HalfStreamFlowCtrlHooks for HalfStreamWindowFlowCtrl {
    fn handle_incoming_dropped(&mut self, msg_count: u16) -> Result<()> {
        // XXXX
        todo!()
        self.recv_window.decrement_n(msg_count)
    }

    fn handle_incoming_msg(&mut self, msg: UnparsedRelayMsg) -> Result<Option<UnparsedRelayMsg>> {
        // XXXX
        todo!()
        match msg.cmd() {
            RelayCmd::SENDME => {
                self.inner.put_for_incoming_sendme(msg)?;
                Ok(None)
            }
            RelayCmd::XON => {
                self.inner.handle_incoming_xon(msg)?;
                Ok(None)
            }
            RelayCmd::XOFF => {
                self.inner.handle_incoming_xoff(msg)?;
                Ok(None)
            }
            cmd if cmd_counts_towards_windows(cmd) => {
                // Discard the returned bool since we aren't sending any more SENDMEs.
                let _ = self.recv_window.take()?;
                Ok(Some(msg))
            }
            // Nothing to do here.
            _ => Ok(Some(msg)),
        }
    }
}
+28 −9
Original line number Diff line number Diff line
@@ -36,7 +36,7 @@ use std::sync::Arc;
use postage::watch;
use tor_cell::relaycell::flow_ctrl::{FlowCtrlVersion, Xoff, Xon, XonKbpsEwma};
use tor_cell::relaycell::msg::AnyRelayMsg;
use tor_cell::relaycell::{RelayMsg, UnparsedRelayMsg};
use tor_cell::relaycell::{RelayCmd, RelayMsg, UnparsedRelayMsg};
use tracing::trace;

use super::reader::DrainRateRequest;
@@ -229,25 +229,44 @@ impl FlowCtrlHooks for XonXoffFlowCtrl {

/// State for XON/XOFF flow control on a half-stream.
#[derive(Debug)]
pub(crate) struct HalfStreamXonXoffFlowCtrl {}
pub(crate) struct HalfStreamXonXoffFlowCtrl {
    /// The original [`XonXoffFlowCtrl`] from the full stream.
    ///
    /// We keep this since we need to continue validating any incoming messages
    /// and continue applying the sidechannel mitigations.
    inner: XonXoffFlowCtrl,
}

impl HalfStreamXonXoffFlowCtrl {
    /// Returns a new xon/xoff-based state for a half-stream.
    pub(crate) fn new(_flow_ctrl: XonXoffFlowCtrl) -> Self {
        // XXXX
        Self {}
    pub(crate) fn new(flow_ctrl: XonXoffFlowCtrl) -> Self {
        Self { inner: flow_ctrl }
    }
}

impl HalfStreamFlowCtrlHooks for HalfStreamXonXoffFlowCtrl {
    fn handle_incoming_dropped(&mut self, _msg_count: u16) -> Result<()> {
        // XXXX
        todo!()
        // Nothing to do here.
        Ok(())
    }

    fn handle_incoming_msg(&mut self, msg: UnparsedRelayMsg) -> Result<Option<UnparsedRelayMsg>> {
        // XXXX
        todo!()
        match msg.cmd() {
            RelayCmd::SENDME => {
                self.inner.put_for_incoming_sendme(msg)?;
                Ok(None)
            }
            RelayCmd::XON => {
                self.inner.handle_incoming_xon(msg)?;
                Ok(None)
            }
            RelayCmd::XOFF => {
                self.inner.handle_incoming_xoff(msg)?;
                Ok(None)
            }
            // Nothing to do here.
            _ => Ok(Some(msg)),
        }
    }
}

+6 −9
Original line number Diff line number Diff line
@@ -2,11 +2,9 @@

mod halfstream;

use crate::congestion::sendme;
use crate::stream::RECV_WINDOW_INIT;
use crate::stream::StreamMpscReceiver;
use crate::stream::cmdcheck::AnyCmdChecker;
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamFlowCtrl};
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, HalfStreamFlowCtrlHooks, StreamFlowCtrl};
use crate::stream::queue::StreamQueueSender;
use crate::util::stream_poll_set::{KeyAlreadyInsertedError, StreamPollSet};
use crate::{Error, Result};
@@ -460,13 +458,12 @@ impl StreamMap {
                        ..
                    },
            } = ent;
            // FIXME(eta): we don't copy the receive window, instead just creating a new one,
            //             so a malicious peer can send us slightly more data than they should
            //             be able to; see arti#230.
            let mut recv_window = sendme::StreamRecvWindow::new(RECV_WINDOW_INIT);
            recv_window.decrement_n(dropped)?;

            let mut flow_ctrl = flow_ctrl.half_stream();
            flow_ctrl.handle_incoming_dropped(dropped)?;

            // TODO: would be nice to avoid new_ref.
            let half_stream = HalfStream::new(flow_ctrl, recv_window, cmd_checker);
            let half_stream = HalfStream::new(flow_ctrl, cmd_checker);
            let explicitly_dropped = why == TR::StreamTargetClosed;

            let prev = self.closed_streams.insert(
+18 −48
Original line number Diff line number Diff line
@@ -4,10 +4,9 @@
//! we might still receive some cells.

use crate::Result;
use crate::congestion::sendme::{StreamRecvWindow, cmd_counts_towards_windows};
use crate::stream::cmdcheck::{AnyCmdChecker, StreamStatus};
use crate::stream::flow_ctrl::state::{FlowCtrlHooks, StreamFlowCtrl};
use tor_cell::relaycell::{RelayCmd, UnparsedRelayMsg};
use crate::stream::flow_ctrl::state::{HalfStreamFlowCtrl, HalfStreamFlowCtrlHooks};
use tor_cell::relaycell::UnparsedRelayMsg;

/// Type to track state of half-closed streams.
///
@@ -22,24 +21,16 @@ pub(crate) struct HalfStream {
    /// Flow control for this stream.
    ///
    /// Used to process incoming flow control messages (SENDME, XON, etc).
    flow_control: StreamFlowCtrl,
    /// Receive window for this stream. Used to detect whether we get too
    /// many data cells.
    recvw: StreamRecvWindow,
    flow_control: HalfStreamFlowCtrl,
    /// Object to tell us which cells to accept on this stream.
    cmd_checker: AnyCmdChecker,
}

impl HalfStream {
    /// Create a new half-closed stream.
    pub(crate) fn new(
        flow_control: StreamFlowCtrl,
        recvw: StreamRecvWindow,
        cmd_checker: AnyCmdChecker,
    ) -> Self {
    pub(crate) fn new(flow_control: HalfStreamFlowCtrl, cmd_checker: AnyCmdChecker) -> Self {
        HalfStream {
            flow_control,
            recvw,
            cmd_checker,
        }
    }
@@ -53,29 +44,12 @@ impl HalfStream {
    pub(crate) fn handle_msg(&mut self, msg: UnparsedRelayMsg) -> Result<StreamStatus> {
        use StreamStatus::*;

        // We handle SENDME/XON/XOFF separately, and don't give it to the checker.
        //
        // TODO: this logic is the same as `CircHop::deliver_msg_to_stream`; we should refactor this
        // if possible
        match msg.cmd() {
            RelayCmd::SENDME => {
                self.flow_control.put_for_incoming_sendme(msg)?;
                return Ok(Open);
            }
            RelayCmd::XON => {
                self.flow_control.handle_incoming_xon(msg)?;
                return Ok(Open);
            }
            RelayCmd::XOFF => {
                self.flow_control.handle_incoming_xoff(msg)?;
        let Some(msg) = self.flow_control.handle_incoming_msg(msg)? else {
            // The flow control code consumed the message,
            // which means that it was a flow control message.
            // We don't give flow control messages to the checker below.
            return Ok(Open);
            }
            _ => {}
        }

        if cmd_counts_towards_windows(msg.cmd()) {
            self.recvw.take()?;
        }
        };

        let status = self.cmd_checker.check_msg(&msg)?;
        self.cmd_checker.consume_checked_msg(msg)?;
@@ -99,10 +73,9 @@ mod test {
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use crate::{
        client::stream::OutboundDataCmdChecker,
        congestion::sendme::{StreamRecvWindow, StreamSendWindow},
    };
    use crate::stream::RECV_WINDOW_INIT;
    use crate::stream::flow_ctrl::state::StreamFlowCtrl;
    use crate::{client::stream::OutboundDataCmdChecker, congestion::sendme::StreamSendWindow};
    use rand::{CryptoRng, Rng};
    use tor_basic_utils::test_rng::testing_rng;
    use tor_cell::relaycell::{
@@ -132,8 +105,7 @@ mod test {
        let sendw = StreamSendWindow::new(450);

        let mut hs = HalfStream::new(
            StreamFlowCtrl::new_window(sendw),
            StreamRecvWindow::new(20),
            StreamFlowCtrl::new_window(sendw).half_stream(),
            OutboundDataCmdChecker::new_any(),
        );

@@ -156,8 +128,7 @@ mod test {

    fn hs_new() -> HalfStream {
        HalfStream::new(
            StreamFlowCtrl::new_window(StreamSendWindow::new(20)),
            StreamRecvWindow::new(20),
            StreamFlowCtrl::new_window(StreamSendWindow::new(20)).half_stream(),
            OutboundDataCmdChecker::new_any(),
        )
    }
@@ -171,9 +142,9 @@ mod test {
        hs.handle_msg(to_unparsed(&mut rng, msg::Connected::new_empty().into()))
            .unwrap();

        // 20 data cells are okay.
        // `RECV_WINDOW_INIT` (500) data cells are okay.
        let m = msg::Data::new(&b"this offer is unrepeatable"[..]).unwrap();
        for _ in 0_u8..20 {
        for _ in 0_u16..RECV_WINDOW_INIT {
            assert!(
                hs.handle_msg(to_unparsed(&mut rng, m.clone().into()))
                    .is_ok()
@@ -216,8 +187,7 @@ mod test {
                .unwrap();
        }
        let mut hs = HalfStream::new(
            StreamFlowCtrl::new_window(StreamSendWindow::new(20)),
            StreamRecvWindow::new(20),
            StreamFlowCtrl::new_window(StreamSendWindow::new(20)).half_stream(),
            cmd_checker,
        );
        let e = hs