Remove manual future polling from circuit reactor

Motivation

Currently, the circuit reactor main loop contains a hand-rolled future implementation (using poll_fn), which polls various futures, driving them to completion (almost) in lock-step to avoid blocking the reactor loop on any of them.

IMO this implementation is hard to grok and quite fragile, and so could benefit from being refactored to use some of our existing abstractions for dealing with futures.

I consider this a prerequisite for #1803

Proposed refactoring steps (tentative)

Considerations:

  • Need to be mindful about future cancellation
  • We can use prepare_send_from to check chan_sender sink readiness. However, prepare_send_from only guarantees that the sink is ready for one message, whereas many of the reactor functions (handle_cell, send_relay_cell, etc.) handle multiple messages at once and combine message processing with sending. We'll need to rewrite them to:
    • separate message processing/generation from sending
    • process one message at a time

Currently, send_relay_cell is called from multiple functions. After this refactoring, it will only be called from run_once(), when the prepare_send_from future resolves (i.e. when the chan_sender sink is ready).

I propose we rewrite run_once() like so:

/// Instructions for sending a relay cell.
//
// The contents of this struct are passed to `send_relay_cell`
struct SendRelayCell {
    hop: HopNum,
    early: bool,
    msg: AnyRelayMsgOuter,
}

impl Reactor {
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
        if self.hops.is_empty() {
            self.wait_for_create().await?;

            return Ok(());
        }

        // XXX: continue reading from input even if chan_sender is not ready?
        let (msg, sendable) = self.chan_sender.prepare_send_from(async {
            select_biased! {
                // Select between receiving control messages,
                // `ChanCell`s from `input`, and ready messages
                // from self.hops[i].map.poll_ready_streams
            }
            // The above select_biased! resolves to a Result<Option<SendRelayCell>>,
            // where `SendRelayCell` contains the args needed by `send_relay_cell`
            // to encode and encrypt the cell for a given hop
        }).await?;

        // FLUSHING: `chan_sender` flushing is handled internally by
        // the `prepare_send_from` impl.

        if let Some(msg) = msg {
            // NOTE: send_relay_cell no longer writes to `self.chan_sender`
            // directly, but rather to the provided `sendable` ready-to-send
            // sink (which is actually a view of the `chan_sender` sink).
            self.send_relay_cell(sendable, msg);
        }

        Ok(())
    }
}
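
To make the control flow above easier to reason about, here is a toy, synchronous model of it. This is only a sketch under loud assumptions: `BoundedSink`, `CtrlMsg`, and the simplified `SendRelayCell` fields are hypothetical stand-ins rather than the real reactor types, and a plain capacity check stands in for awaiting `prepare_send_from`. It shows the two properties the rewrite is after: processing returns instructions instead of sending, and no input is read while the sink is not ready.

```rust
use std::collections::VecDeque;

/// Simplified, hypothetical version of `SendRelayCell`
/// (the real one carries a `HopNum` and an `AnyRelayMsgOuter`).
#[derive(Debug, PartialEq)]
struct SendRelayCell {
    hop: u8,
    early: bool,
    msg: String,
}

/// Hypothetical control messages; the real reactor has its own enum.
enum CtrlMsg {
    SendMsg { hop: u8, msg: String },
    Shutdown,
}

/// Toy bounded sink standing in for `chan_sender`.
struct BoundedSink {
    cap: usize,
    queue: VecDeque<SendRelayCell>,
}

impl BoundedSink {
    /// Stand-in for "the `prepare_send_from` future would resolve now".
    fn is_ready(&self) -> bool {
        self.queue.len() < self.cap
    }
}

/// Processing only: decides what (if anything) should be sent,
/// but never touches the sink.
fn handle_control(msg: CtrlMsg) -> Option<SendRelayCell> {
    match msg {
        CtrlMsg::SendMsg { hop, msg } => Some(SendRelayCell { hop, early: false, msg }),
        CtrlMsg::Shutdown => None,
    }
}

/// One reactor step. Returns `true` if an input message was consumed.
/// Input is only read when the sink has room, and at most one
/// message is processed per call.
fn run_once(sink: &mut BoundedSink, input: &mut VecDeque<CtrlMsg>) -> bool {
    if !sink.is_ready() {
        return false;
    }
    match input.pop_front() {
        Some(ctrl) => {
            // The only place where anything is written to the sink.
            if let Some(cell) = handle_control(ctrl) {
                sink.queue.push_back(cell);
            }
            true
        }
        None => false,
    }
}
```

With a sink of capacity 1 and two queued `SendMsg` control messages, the first `run_once` call consumes one message and fills the sink; the second call returns without touching the input, so the queue of pending control messages cannot grow the sink beyond its bound.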

Some observations:

  • we stop reading from all of our inputs if chan_sender is not ready, including the control channel

  • currently, it is possible for control messages to add unboundedly to chan_sender:

    /// Sender object used to actually send cells.
    ///
    /// NOTE: Control messages could potentially add unboundedly to this, although that's
    ///       not likely to happen (and isn't triggerable from the network, either).

    After this refactoring, this won't be the case anymore, because we will avoid writing to chan_sender if it's not ready. So chan_sender probably won't need to be SometimesUnboundedSink anymore.

  • send_outbound():

    • replace send_outbound() with an iterator-like structure that asynchronously pulls one message at a time from the streams of each hop (essentially an iterator over the outbound messages of each hop). Open question: should it go round robin to ensure fairness across hops? Fairness across streams is already guaranteed by StreamMap.
    • stop using poll_ready_unpin_bool() (its caller will be using prepare_send_from() to check if the sink is ready to receive a message)
    • remove the send_relay_cell() call (send_relay_cell() will be called by the caller of prepare_send_from() when the sink is ready)
  • control message processing:

    • instead of calling send_relay_cell(), handle_control() will return Option<SendRelayCell> containing the args that need to be passed to send_relay_cell(), which will get called outside of handle_control(), when the chan_sender sink is ready
  • input processing:

    • handle_relay_cell() will need some rethinking
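
As a rough illustration of the send_outbound() replacement described above, here is one way the "one message at a time, round robin across hops" idea could look, if round robin turns out to be the policy we want. It is a synchronous sketch: `Hop`, its `ready` queue, and `OutboundRoundRobin` are hypothetical names, and the `VecDeque` stands in for whatever the hop's stream map reports as ready. The real version would be poll-based rather than a plain function call.

```rust
use std::collections::VecDeque;

/// Hypothetical per-hop state: a queue of outbound messages that are
/// ready to be sent (stand-in for the ready streams of a hop).
struct Hop {
    ready: VecDeque<String>,
}

/// Round-robin cursor over the hops of a circuit.
struct OutboundRoundRobin {
    /// Index of the hop to try first on the next call.
    next_hop: usize,
}

impl OutboundRoundRobin {
    /// Pull at most one ready message, starting the search at the hop
    /// after the one served last, so a busy hop cannot starve the others.
    /// Returns the hop index together with the message.
    fn next(&mut self, hops: &mut [Hop]) -> Option<(usize, String)> {
        let n = hops.len();
        for offset in 0..n {
            let idx = (self.next_hop + offset) % n;
            if let Some(msg) = hops[idx].ready.pop_front() {
                self.next_hop = (idx + 1) % n;
                return Some((idx, msg));
            }
        }
        // No hop has anything ready (or there are no hops at all).
        None
    }
}
```

Because each call yields a single message, the caller can pair it with one readiness check on the sink, which matches the one-message guarantee of prepare_send_from().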

The above glosses over many non-trivial details that will need to be ironed out.

Edited by gabi-250