Remove manual future polling from circuit reactor
Motivation
Currently, the circuit reactor main loop contains a hand-rolled future
implementation (using `poll_fn`), which polls various futures,
driving them to completion (almost) in lock-step to avoid blocking the reactor
loop on any of them.
IMO this implementation is hard to grok and quite fragile, and so could benefit from being refactored to use some of our existing abstractions for dealing with futures.
I consider this a prerequisite for #1803
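For readers who have not seen the pattern, here is a minimal sketch of what hand-rolled lock-step polling with `std::future::poll_fn` can look like. It is not the reactor's actual code: `Countdown`, `NoopWake`, `block_on`, and `run_lock_step` are hypothetical names, and the busy-looping executor exists only so the example is self-contained.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for a busy-polling demo executor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor (hypothetical): poll the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical stand-in for a reactor sub-task: ready after `left` pending polls.
struct Countdown {
    name: &'static str,
    left: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.left == 0 {
            Poll::Ready(self.name)
        } else {
            self.left -= 1;
            Poll::Pending
        }
    }
}

/// Drive two futures by hand inside one `poll_fn`, returning completion order.
fn run_lock_step() -> Vec<&'static str> {
    let mut a = Some(Countdown { name: "control", left: 2 });
    let mut b = Some(Countdown { name: "input", left: 1 });
    let mut done = Vec::new();

    block_on(poll_fn(|cx| {
        // Poll every unfinished future once per pass, so that none of them
        // can block the others. All bookkeeping is manual.
        for slot in [&mut a, &mut b] {
            let state = match slot.as_mut() {
                Some(fut) => Pin::new(fut).poll(cx),
                None => Poll::Pending, // already finished; nothing to poll
            };
            if let Poll::Ready(name) = state {
                done.push(name);
                *slot = None;
            }
        }
        if a.is_none() && b.is_none() {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }));

    done
}
```

Even in this toy version, the completion tracking and the "which futures are still live" state have to be maintained by hand, which is the part that tends to become fragile as more futures are added.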
Proposed refactoring steps (tentative)
Considerations:
- Need to be mindful about future cancellation
- Can use `prepare_send_from` to check `chan_sender` sink readiness. But `prepare_send_from` only guarantees the sink is ready for one message, whereas many of the reactor functions (`handle_cell`, `send_relay_cell`, etc.) handle multiple messages at once, and combine message processing with sending. We'll need to rewrite them to
  - separate message processing/generation from sending
  - process one message at a time
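To make the two rewrite goals above concrete, here is a small synchronous sketch of splitting message processing from sending. It is only an illustration under stated assumptions: `BoundedSink`, `process_incoming`, and `step` are hypothetical, `u8` and `String` stand in for `HopNum` and `AnyRelayMsgOuter`, and the real sink is asynchronous.

```rust
use std::collections::VecDeque;

/// Instructions for sending one relay cell.
/// Simplified stand-ins: `u8` for `HopNum`, `String` for `AnyRelayMsgOuter`.
#[derive(Debug, Clone, PartialEq)]
struct SendRelayCell {
    hop: u8,
    early: bool,
    msg: String,
}

/// Hypothetical toy model of the `chan_sender` sink: a bounded queue.
struct BoundedSink {
    capacity: usize,
    queue: VecDeque<SendRelayCell>,
}

impl BoundedSink {
    fn new(capacity: usize) -> Self {
        Self { capacity, queue: VecDeque::new() }
    }

    /// True if the sink can accept one more message.
    fn is_ready(&self) -> bool {
        self.queue.len() < self.capacity
    }

    fn send(&mut self, cell: SendRelayCell) {
        assert!(self.is_ready(), "caller must check readiness first");
        self.queue.push_back(cell);
    }
}

/// Processing only: decides what to send, but never touches the sink.
/// Returns `None` when the incoming message needs no reply.
fn process_incoming(hop: u8, incoming: &str) -> Option<SendRelayCell> {
    match incoming {
        "ping" => Some(SendRelayCell { hop, early: false, msg: "pong".into() }),
        _ => None,
    }
}

/// Handle at most one input per call, and only when the sink has room for
/// the (single) reply. Returns whether an input was consumed.
fn step(sink: &mut BoundedSink, inputs: &mut VecDeque<(u8, String)>) -> bool {
    if !sink.is_ready() {
        return false; // the input stays queued, so nothing is lost
    }
    let (hop, incoming) = match inputs.pop_front() {
        Some(item) => item,
        None => return false,
    };
    if let Some(cell) = process_incoming(hop, &incoming) {
        sink.send(cell);
    }
    true
}
```

With a sink of capacity 1 and two queued `"ping"` inputs, the first `step` consumes one input and enqueues one reply, and the second `step` returns `false` without touching the remaining input.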
Currently, `send_relay_cell` is called from multiple functions. After this
refactoring, it will only be called from `run_once()`, when the
`prepare_send_from` future resolves (i.e. when the `chan_sender`
sink is ready).
I propose we rewrite `run_once()` like so:
/// Instructions for sending a relay cell.
//
// The contents of this struct are passed to `send_relay_cell`
struct SendRelayCell {
    hop: HopNum,
    early: bool,
    msg: AnyRelayMsgOuter,
}

impl Reactor {
    async fn run_once(&mut self) -> StdResult<(), ReactorError> {
        if self.hops.is_empty() {
            self.wait_for_create().await?;

            return Ok(());
        }

        // XXX: continue reading from input even if chan_sender is not ready?
        let (msg, sendable) = self.chan_sender.prepare_send_from(async {
            select_biased! {
                // Select between receiving control messages,
                // `ChanCell`s from `input`, and ready messages
                // from self.hops[i].map.poll_ready_streams
            }

            // The above select_biased! resolves to a Result<Option<SendRelayCell>>,
            // where `SendRelayCell` contains the args needed by `send_relay_cell`
            // to encode and encrypt the cell for a given hop
        }).await?;

        // FLUSHING: `chan_sender` flushing is handled internally by
        // the `prepare_send_from` impl.

        if let Some(msg) = msg {
            // NOTE: send_relay_cell no longer writes to `self.chan_sender`
            // directly, but rather to the provided `sendable` ready-to-send
            // sink (which is actually a view of the `chan_sender` sink).
            self.send_relay_cell(sendable, msg);
        }

        Ok(())
    }
}
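The gating and priority order in the `run_once()` proposal above can be modelled with a small synchronous sketch. This is not the `prepare_send_from` or `select_biased!` API: `Event`, `Sources`, `next_biased`, and `poll_sources_once` are hypothetical, the queues stand in for the control channel, `input`, and the per-hop stream maps, and the `sink_has_room` flag stands in for sink readiness.

```rust
use std::collections::VecDeque;

/// What one loop iteration produced (hypothetical event type).
#[derive(Debug, PartialEq)]
enum Event {
    Control(String),
    Input(String),
    Stream { hop: usize, msg: String },
}

/// Toy model of the three kinds of input the reactor selects between.
struct Sources {
    control: VecDeque<String>,
    input: VecDeque<String>,
    /// One outbound queue per hop.
    streams: Vec<VecDeque<String>>,
}

impl Sources {
    /// Mimics a biased select: sources are always checked in the same order,
    /// so control messages win over input cells, which win over stream data.
    fn next_biased(&mut self) -> Option<Event> {
        if let Some(m) = self.control.pop_front() {
            return Some(Event::Control(m));
        }
        if let Some(m) = self.input.pop_front() {
            return Some(Event::Input(m));
        }
        for (hop, queue) in self.streams.iter_mut().enumerate() {
            if let Some(msg) = queue.pop_front() {
                return Some(Event::Stream { hop, msg });
            }
        }
        None
    }
}

/// One iteration: read from the sources only if the sink has room.
/// When the sink is full, no source is touched at all.
fn poll_sources_once(sink_has_room: bool, sources: &mut Sources) -> Option<Event> {
    if !sink_has_room {
        return None;
    }
    sources.next_biased()
}
```

Calling `poll_sources_once(false, ..)` leaves every queue untouched, including the control queue, which is the behaviour the `XXX` comment in the proposal asks about.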
Some observations:
- we stop reading from all of our inputs if `chan_sender` is not ready, including the control channel
- currently, it is possible for control messages to add unboundedly to `chan_sender`:

      /// Sender object used to actually send cells.
      ///
      /// NOTE: Control messages could potentially add unboundedly to this, although that's
      /// not likely to happen (and isn't triggereable from the network, either).

  After this refactoring, this won't be the case anymore, because we will avoid writing to `chan_sender` if it's not ready. So `chan_sender` probably won't need to be `SometimesUnboundedSink` anymore.
- `send_outbound()`:
  - replace `send_outbound()` with iterator-like structure that asynchronously pulls one message at a time from the streams of each hop (essentially an iterator over the outbound messages of each hop. Round robin, ensuring fairness across hops? Fairness across streams is guaranteed by `StreamMap`)
  - stop using `poll_ready_unpin_bool()` (its caller will be using `prepare_send_from()` to check if the sink is ready to receive a message)
  - remove the `send_relay_cell()` call (`send_relay_cell()` will be called by the caller of `prepare_send_from()` when the sink is ready)
- `control` message processing:
  - instead of calling `send_relay_cell()`, `handle_control()` will return `Option<SendRelayCell>` containing the args that need to be passed to `send_relay_cell()`, which will get called outside of `handle_control()`, when the `chan_sender` sink is ready
- `input` processing:
  - `handle_relay_cell()` will need some rethinking
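As a rough illustration of the round-robin idea floated for `send_outbound()`, here is a synchronous sketch of a puller that yields one message at a time and rotates its starting hop. It is an assumption-laden toy: `HopRoundRobin` and `next_message` are hypothetical names, plain queues stand in for each hop's `StreamMap`, and the real structure would need to be asynchronous.

```rust
use std::collections::VecDeque;

/// Hypothetical round-robin puller over per-hop outbound queues.
struct HopRoundRobin {
    /// One outbound queue per hop (stand-in for each hop's stream map).
    hops: Vec<VecDeque<String>>,
    /// Index of the hop to try first on the next call.
    next: usize,
}

impl HopRoundRobin {
    fn new(hops: Vec<VecDeque<String>>) -> Self {
        Self { hops, next: 0 }
    }

    /// Pull exactly one message, starting from the hop after the one that
    /// was served last, so a busy hop cannot starve the others.
    fn next_message(&mut self) -> Option<(usize, String)> {
        let n = self.hops.len();
        for offset in 0..n {
            let hop = (self.next + offset) % n;
            if let Some(msg) = self.hops[hop].pop_front() {
                self.next = (hop + 1) % n;
                return Some((hop, msg));
            }
        }
        None // every hop is empty (or there are no hops)
    }
}
```

With queues `[a1, a2, a3]`, `[]`, and `[c1]`, successive calls yield `a1`, `c1`, `a2`, `a3`: the third hop is served before the first hop gets a second turn. Because only one message is pulled per call, this shape fits a caller that has reserved room for a single message.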
The above glosses over many non-trivial details that will need to be ironed out.