Commit 371437d3 authored by eta's avatar eta 💚
Browse files

Refactor tor_proto::channel::Reactor to use an UnboundedSender

There wasn't any good reason for tor-proto's channel reactor to use a
shedload of oneshot channels instead of just an mpsc UnboundedSender,
and the whole `CtrlResult` thing made even less sense.

Straighten this code out by replacing all of that machinery with a
simple UnboundedSender, instead.

(part of #218)
parent e9cbf8c5
......@@ -59,7 +59,7 @@ mod handshake;
mod reactor;
mod unique_id;
use crate::channel::reactor::{CtrlMsg, CtrlResult};
use crate::channel::reactor::CtrlMsg;
pub use crate::channel::unique_id::UniqId;
use crate::circuit;
use crate::circuit::celltypes::CreateResponse;
......@@ -135,10 +135,7 @@ struct ChannelImpl {
// it all the time.
circmap: Weak<Mutex<circmap::CircMap>>,
/// A stream used to send control messages to the Reactor.
sendctrl: mpsc::Sender<CtrlResult>,
/// A oneshot sender used to tell the Reactor task to shut down.
sendclosed: Option<oneshot::Sender<CtrlMsg>>,
control: mpsc::UnboundedSender<CtrlMsg>,
/// Context for allocating unique circuit log identifiers.
circ_unique_id_ctx: unique_id::CircUniqIdContext,
}
......@@ -209,15 +206,13 @@ impl Channel {
use circmap::{CircIdRange, CircMap};
let circmap = Arc::new(Mutex::new(CircMap::new(CircIdRange::High)));
let (sendctrl, recvctrl) = mpsc::channel::<CtrlResult>(128);
let (sendclosed, recvclosed) = oneshot::channel::<CtrlMsg>();
let (control_tx, control_rx) = mpsc::unbounded();
let inner = ChannelImpl {
tls: tls_sink,
link_protocol,
circmap: Arc::downgrade(&circmap),
sendctrl,
sendclosed: Some(sendclosed),
control: control_tx,
circ_unique_id_ctx: unique_id::CircUniqIdContext::new(),
};
let inner = Mutex::new(inner);
......@@ -233,8 +228,7 @@ impl Channel {
let reactor = reactor::Reactor::new(
&Arc::clone(&channel),
circmap,
recvctrl,
recvclosed,
control_rx,
tls_stream,
unique_id,
);
......@@ -346,27 +340,25 @@ impl Channel {
// TODO: blocking is risky, but so is unbounded.
let (sender, receiver) = mpsc::channel(128);
let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
let (send_circ_destroy, recv_circ_destroy) = oneshot::channel();
let (circ_unique_id, id) = {
let (circ_unique_id, id, reactor_tx) = {
let mut inner = self.inner.lock().await;
inner
.sendctrl
.send(Ok(CtrlMsg::Register(recv_circ_destroy)))
.await
.map_err(|_| Error::InternalError("Can't queue circuit closer".into()))?;
if let Some(circmap) = inner.circmap.upgrade() {
let my_unique_id = self.unique_id;
let circ_unique_id = inner.circ_unique_id_ctx.next(my_unique_id);
let mut cmap = circmap.lock().await;
(circ_unique_id, cmap.add_ent(rng, createdsender, sender)?)
(
circ_unique_id,
cmap.add_ent(rng, createdsender, sender)?,
inner.control.clone(),
)
} else {
return Err(Error::ChannelClosed);
}
};
trace!("{}: Allocated CircId {}", circ_unique_id, id);
let destroy_handle = CircDestroyHandle::new(id, send_circ_destroy);
let destroy_handle = CircDestroyHandle::new(id, reactor_tx);
Ok(circuit::PendingClientCirc::new(
id,
......@@ -411,9 +403,8 @@ impl ChannelImpl {
/// Shut down this channel's reactor; causes all circuits using
/// this channel to become unusable.
fn shutdown_reactor(&mut self) {
if let Some(sender) = self.sendclosed.take() {
let _ignore = sender.send(CtrlMsg::Shutdown);
}
// FIXME(eta): this shouldn't be required
let _ = self.control.unbounded_send(CtrlMsg::Shutdown);
}
}
......@@ -422,26 +413,20 @@ impl ChannelImpl {
pub(crate) struct CircDestroyHandle {
/// The circuit ID in question
id: CircId,
/// A oneshot sender to tell the reactor. This has to be a oneshot,
/// so that we can send to it on drop.
sender: Option<oneshot::Sender<CtrlMsg>>,
/// A sender to tell the reactor.
sender: mpsc::UnboundedSender<CtrlMsg>,
}
impl CircDestroyHandle {
/// Create a new CircDestroyHandle
fn new(id: CircId, sender: oneshot::Sender<CtrlMsg>) -> Self {
CircDestroyHandle {
id,
sender: Some(sender),
}
fn new(id: CircId, sender: mpsc::UnboundedSender<CtrlMsg>) -> Self {
CircDestroyHandle { id, sender }
}
}
impl Drop for CircDestroyHandle {
fn drop(&mut self) {
if let Some(s) = self.sender.take() {
let _ignore_cancel = s.send(CtrlMsg::CloseCircuit(self.id));
}
let _ignore_cancel = self.sender.unbounded_send(CtrlMsg::CloseCircuit(self.id));
}
}
......@@ -463,7 +448,7 @@ pub(crate) mod test {
pub(crate) struct FakeChanHandle {
pub(crate) cells: mpsc::Receiver<ChanCell>,
circmap: Arc<Mutex<circmap::CircMap>>,
ignore_control_msgs: mpsc::Receiver<CtrlResult>,
ignore_control_msgs: mpsc::UnboundedReceiver<CtrlMsg>,
}
/// Make a new fake reactor-less channel. For testing only, obviously.
......@@ -471,7 +456,7 @@ pub(crate) mod test {
/// This function is used for testing _circuits_, not channels.
pub(crate) fn fake_channel() -> (Arc<Channel>, FakeChanHandle) {
let (cell_send, cell_recv) = mpsc::channel(64);
let (ctrl_send, ctrl_recv) = mpsc::channel(64);
let (control_tx, control_rx) = mpsc::unbounded();
let cell_send = cell_send.sink_map_err(|_| {
tor_cell::Error::InternalError("Error from mpsc stream while testing".into())
......@@ -484,8 +469,7 @@ pub(crate) mod test {
link_protocol: 4,
tls: Box::new(cell_send),
circmap: Arc::downgrade(&circmap),
sendctrl: ctrl_send,
sendclosed: None,
control: control_tx,
circ_unique_id_ctx: unique_id::CircUniqIdContext::new(),
};
let channel = Channel {
......@@ -498,7 +482,7 @@ pub(crate) mod test {
let handle = FakeChanHandle {
cells: cell_recv,
circmap,
ignore_control_msgs: ctrl_recv,
ignore_control_msgs: control_rx,
};
(Arc::new(channel), handle)
......
......@@ -14,7 +14,7 @@ use crate::{Error, Result};
use tor_cell::chancell::msg::{Destroy, DestroyReason};
use tor_cell::chancell::{msg::ChanMsg, ChanCell, CircId};
use futures::channel::{mpsc, oneshot};
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::select_biased;
use futures::sink::SinkExt;
......@@ -31,25 +31,10 @@ use tracing::{debug, trace};
pub(super) enum CtrlMsg {
/// Shut down the reactor.
Shutdown,
/// Register a new one-shot receiver that can send a CtrlMsg to the
/// reactor.
Register(oneshot::Receiver<CtrlMsg>),
/// Tell the reactor that a given circuit has gone away.
CloseCircuit(CircId),
}
/// Type returned by a oneshot channel for a CtrlMsg.
///
/// TODO: copy documentation from circuit::reactor if we don't unify
/// these types somehow.
pub(super) type CtrlResult = std::result::Result<CtrlMsg, oneshot::Canceled>;
/// A stream to multiplex over a bunch of oneshot CtrlMsg replies.
///
/// TODO: copy documentation from circuit::reactor if we don't unify
/// these types somehow.
type OneshotStream = stream::FuturesUnordered<oneshot::Receiver<CtrlMsg>>;
/// Object to handle incoming cells and background tasks on a channel.
///
/// This type is returned when you finish a channel; you need to spawn a
......@@ -64,7 +49,7 @@ where
///
/// TODO: copy documentation from circuit::reactor if we don't unify
/// these types somehow.
control: stream::Fuse<stream::Select<mpsc::Receiver<CtrlResult>, OneshotStream>>,
control: mpsc::UnboundedReceiver<CtrlMsg>,
/// A Stream from which we can read ChanCells. This should be backed
/// by a TLS connection.
input: stream::Fuse<T>,
......@@ -95,16 +80,12 @@ where
pub(super) fn new(
channel: &Arc<super::Channel>,
circmap: Arc<Mutex<CircMap>>,
control: mpsc::Receiver<CtrlResult>,
closeflag: oneshot::Receiver<CtrlMsg>,
control: mpsc::UnboundedReceiver<CtrlMsg>,
input: T,
unique_id: UniqId,
) -> Self {
let oneshots = stream::FuturesUnordered::new();
oneshots.push(closeflag);
let control = stream::select(control, oneshots);
Reactor {
control: control.fuse(),
control,
input: input.fuse(),
channel: Arc::downgrade(channel),
circs: circmap,
......@@ -150,11 +131,9 @@ where
// we got a control message!
ctrl = self.control.next() => {
match ctrl {
Some(Ok(CtrlMsg::Shutdown)) =>
return Err(ReactorError::Shutdown),
Some(Ok(msg)) => self.handle_control(msg).await?,
Some(Err(_)) => (), // sender cancelled; ignore.
None => panic!() // should be impossible.
Some(CtrlMsg::Shutdown) => return Err(ReactorError::Shutdown),
Some(msg) => self.handle_control(msg).await?,
None => return Err(ReactorError::Shutdown),
}
}
// we got a cell or a close.
......@@ -176,18 +155,11 @@ where
trace!("{}: reactor received {:?}", self.unique_id, msg);
match msg {
CtrlMsg::Shutdown => panic!(), // was handled in reactor loop.
CtrlMsg::Register(ch) => self.register(ch),
CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
}
Ok(())
}
/// Ensure that we get a message on self.control when `ch` fires.
fn register(&mut self, ch: oneshot::Receiver<CtrlMsg>) {
let (_, stream) = self.control.get_mut().get_mut();
stream.push(ch);
}
/// Helper: process a cell on a channel. Most cell types get ignored
/// or rejected; a few get delivered to circuits.
async fn handle_cell(&mut self, cell: ChanCell) -> Result<()> {
......@@ -438,8 +410,6 @@ pub(crate) mod test {
let (pending, _circr) = chan.new_circ(&mut rng).await.unwrap();
reactor.run_once().await.unwrap();
let id = pending.peek_circid().await;
{
......@@ -473,8 +443,6 @@ pub(crate) mod test {
let circparams = CircParameters::default();
reactor.run_once().await.unwrap();
let id = pending.peek_circid().await;
{
......@@ -587,6 +555,7 @@ pub(crate) mod test {
#[async_test]
async fn deliver_relay() {
use crate::circuit::celltypes::ClientCircChanMsg;
use futures::channel::oneshot;
use tor_cell::chancell::msg;
let (_chan, mut reactor, _output, mut input) = new_reactor();
......@@ -664,6 +633,7 @@ pub(crate) mod test {
#[async_test]
async fn deliver_destroy() {
use crate::circuit::celltypes::*;
use futures::channel::oneshot;
use tor_cell::chancell::msg;
let (_chan, mut reactor, _output, mut input) = new_reactor();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment