Commit 34c10fea authored by Nick Mathewson's avatar Nick Mathewson 🦀
Browse files

Use an mpsc::unbounded() channel in GuardMgr.

The advantage here is that we no longer have to use a futures-aware
Mutex, or a blocking send operation, and therefore can simplify a
bunch of the GuardMgr APIs to no longer be async.  That'll avoid
having to propagate the asyncness up the stack.

The disadvantage is that unbounded channels are just that: nothing
in the channel prevents us from overfilling it.  Fortunately, the
process that consumes from the channel shouldn't block much, and
the channel only gets filled when we're planning a circuit path.
parent dcca0ec3
Loading
Loading
Loading
Loading
+4 −5
Original line number Diff line number Diff line
@@ -8,10 +8,9 @@ use crate::GuardMgrInner;

use futures::{
    channel::{mpsc, oneshot},
    lock::Mutex,
    stream::{self, StreamExt},
};
use std::sync::Weak;
use std::sync::{Mutex, Weak};

/// A message sent by to the [`report_status_events()`] task.
#[derive(Debug)]
@@ -48,7 +47,7 @@ pub(crate) type MsgResult = Result<Msg, futures::channel::oneshot::Canceled>;
pub(crate) async fn report_status_events(
    runtime: impl tor_rtcompat::SleepProvider,
    inner: Weak<Mutex<GuardMgrInner>>,
    ctrl: mpsc::Receiver<MsgResult>,
    ctrl: mpsc::UnboundedReceiver<MsgResult>,
) {
    // Multiplexes a bunch of one-shot receivers to tell us about guard
    // status outcomes.
@@ -72,7 +71,7 @@ pub(crate) async fn report_status_events(
            Some(Ok(Msg::Status(id, status))) => {
                // We've got a report about a guard status.
                if let Some(inner) = inner.upgrade() {
                    let mut inner = inner.lock().await;
                    let mut inner = inner.lock().expect("Poisoned lock");
                    inner.handle_msg(id, status, &runtime);
                } else {
                    // The guard manager has gone away.
@@ -110,7 +109,7 @@ pub(crate) async fn run_periodic<R: tor_rtcompat::SleepProvider>(
) {
    loop {
        let delay = if let Some(inner) = inner.upgrade() {
            let mut inner = inner.lock().await;
            let mut inner = inner.lock().expect("Poisoned lock");
            let wallclock = runtime.wallclock();
            let now = runtime.now();
            inner.run_periodic_events(wallclock, now)
+18 −24
Original line number Diff line number Diff line
@@ -129,13 +129,11 @@
//     filtered

use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::task::{SpawnError, SpawnExt};
use futures::SinkExt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime};
use tracing::{debug, info, trace, warn};

@@ -169,13 +167,6 @@ pub struct GuardMgr<R: Runtime> {
    runtime: R,

    /// Internal state for the guard manager.
    // TODO: I wish I could use a regular mutex rather than a
    // futures::lock::Mutex, but I don't see how that's feasible.  We
    // need to get access to inner.ctrl and then send over it, which
    // means we need an async mutex.
    //
    // Conceivably, I could move ctrl out to GuardMgr, and then put it
    // under a sync::Mutex.  Is that smart?
    inner: Arc<Mutex<GuardMgrInner>>,
}

@@ -213,7 +204,14 @@ struct GuardMgrInner {

    /// A mpsc channel, used to tell the task running in
    /// [`daemon::report_status_events`] about a new event to monitor.
    ctrl: mpsc::Sender<daemon::MsgResult>,
    ///
    /// This uses an `UnboundedSener` so that we don't have to await
    /// while sending the message, which in turn allows the GuardMgr
    /// API to be simpler.  The risk, however, is that there's no
    /// backpressure in the event that the task running
    /// [`daemon::report_status_events`] fails to read from this
    /// channel.
    ctrl: mpsc::UnboundedSender<daemon::MsgResult>,

    /// Information about guards that we've given out, but where we have
    /// not yet heard whether the guard was successful.
@@ -249,7 +247,7 @@ impl<R: Runtime> GuardMgr<R> {
    where
        S: StateMgr + Send + Sync + 'static,
    {
        let (ctrl, rcv) = mpsc::channel(32);
        let (ctrl, rcv) = mpsc::unbounded();
        let default_storage = state_mgr.create_handle("default_guards");
        let active_guards = default_storage.load()?.unwrap_or_else(GuardSet::new);
        let inner = Arc::new(Mutex::new(GuardMgrInner {
@@ -280,7 +278,7 @@ impl<R: Runtime> GuardMgr<R> {
    /// Return true if we were able to save, and false if we couldn't
    /// get the lock.
    pub async fn update_persistent_state(&self) -> Result<bool, GuardMgrError> {
        let inner = self.inner.lock().await;
        let inner = self.inner.lock().expect("Poisoned lock");
        if inner.default_storage.try_lock()? {
            trace!("Flushing guard state to disk.");
            inner.default_storage.store(&inner.active_guards)?;
@@ -302,7 +300,7 @@ impl<R: Runtime> GuardMgr<R> {
        trace!("Updating guard state from network directory");
        let now = self.runtime.wallclock();

        let mut inner = self.inner.lock().await;
        let mut inner = self.inner.lock().expect("Poisoned lock");

        inner.update(now, Some(netdir));
    }
@@ -321,7 +319,7 @@ impl<R: Runtime> GuardMgr<R> {
    /// things somehow. (TODO)
    pub async fn note_internet_activity(&self) {
        let now = self.runtime.now();
        let mut inner = self.inner.lock().await;
        let mut inner = self.inner.lock().expect("Poisoned lock");
        inner.last_time_on_internet = Some(now);
    }

@@ -345,7 +343,7 @@ impl<R: Runtime> GuardMgr<R> {
        };

        let now = self.runtime.wallclock();
        let mut inner = self.inner.lock().await;
        let mut inner = self.inner.lock().expect("Poisoned lock");

        let restrictive_filter = frac_permitted < inner.params.filter_threshold;

@@ -394,7 +392,7 @@ impl<R: Runtime> GuardMgr<R> {
        let now = self.runtime.now();
        let wallclock = self.runtime.wallclock();

        let mut inner = self.inner.lock().await;
        let mut inner = self.inner.lock().expect("Poisoned lock");

        // (I am not 100% sure that we need to consider_all_retries here, but
        // it should _probably_ not hurt.)
@@ -419,12 +417,9 @@ impl<R: Runtime> GuardMgr<R> {

        inner.active_guards.record_attempt(&guard_id, now);

        // Have to do this while not holding lock, since it awaits.
        // TODO: I wish this function didn't have to be async.
        inner
            .ctrl
            .send(Ok(daemon::Msg::Observe(rcv)))
            .await
            .unbounded_send(Ok(daemon::Msg::Observe(rcv)))
            .expect("Guard observer task exited prematurely");

        Ok((guard_id, monitor, usable))
@@ -437,11 +432,10 @@ impl<R: Runtime> GuardMgr<R> {
        let (snd, rcv) = futures::channel::oneshot::channel();
        let pingmsg = daemon::Msg::Ping(snd);
        {
            let mut inner = self.inner.lock().await;
            let inner = self.inner.lock().expect("Poisoned lock");
            inner
                .ctrl
                .send(Ok(pingmsg))
                .await
                .unbounded_send(Ok(pingmsg))
                .expect("Guard observer task exited permaturely.");
        }
        let _ = rcv.await;