Commit 30e77785 authored by eta's avatar eta
Browse files

Merge branch 'task-scheduler-2' into 'main'

Implement a periodic task scheduler, and a basic dormant mode

See merge request !429
parents 791394cd ee47a166
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -49,6 +49,7 @@ postage = { version = "0.4", default-features = false, features = ["futures-trai
tracing = "0.1.18"
serde = { version = "1.0.103", features = ["derive"] }
thiserror = "1"
pin-project = "1"

[dev-dependencies]
tor-rtcompat = { path="../tor-rtcompat", version = "0.1.0", features=["tokio", "native-tls" ] }
+111 −35
Original line number Diff line number Diff line
@@ -23,11 +23,13 @@ use futures::task::SpawnExt;
use std::convert::TryInto;
use std::net::IpAddr;
use std::result::Result as StdResult;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;

use crate::err::ErrorDetail;
use crate::{status, util, TorClientBuilder};
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
use tracing::{debug, error, info, warn};

/// An active client session on the Tor network.
@@ -83,6 +85,12 @@ pub struct TorClient<R: Runtime> {
    /// bootstrapping. If this is `false`, we will just call `wait_for_bootstrap`
    /// instead.
    should_bootstrap: BootstrapBehavior,

    /// Handles to periodic background tasks, useful for suspending them later.
    periodic_task_handles: Vec<TaskHandle>,

    /// Shared boolean for whether we're currently in "dormant mode" or not.
    dormant: Arc<AtomicBool>,
}

/// Preferences for whether a [`TorClient`] should bootstrap on its own or not.
@@ -103,6 +111,17 @@ pub enum BootstrapBehavior {
    Manual,
}

/// What level of sleep to put a Tor client into.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum DormantMode {
    /// The client functions as normal, and background tasks run periodically.
    Normal,
    /// Background tasks are suspended, conserving CPU usage. Attempts to use the client will
    /// wake it back up again.
    Soft,
}

/// Preferences for how to route a stream over the Tor network.
#[derive(Debug, Default, Clone)]
pub struct StreamPrefs {
@@ -354,6 +373,8 @@ impl<R: Runtime> TorClient<R> {
            .build(runtime.clone(), Arc::clone(&circmgr), dir_cfg)
            .map_err(crate::Error::into_detail)?;

        let mut periodic_task_handles = vec![];

        let conn_status = chanmgr.bootstrap_events();
        let dir_status = dirmgr.bootstrap_events();
        runtime
@@ -364,9 +385,12 @@ impl<R: Runtime> TorClient<R> {
            ))
            .map_err(|e| ErrorDetail::from_spawn("top-level status reporter", e))?;

        let (expiry_sched, expiry_handle) = TaskSchedule::new(runtime.clone());
        periodic_task_handles.push(expiry_handle);

        runtime
            .spawn(continually_expire_channels(
                runtime.clone(),
                expiry_sched,
                Arc::downgrade(&chanmgr),
            ))
            .map_err(|e| ErrorDetail::from_spawn("channel expiration task", e))?;
@@ -381,25 +405,34 @@ impl<R: Runtime> TorClient<R> {
            ))
            .map_err(|e| ErrorDetail::from_spawn("circmgr parameter updater", e))?;

        let (persist_sched, persist_handle) = TaskSchedule::new(runtime.clone());
        periodic_task_handles.push(persist_handle);

        runtime
            .spawn(update_persistent_state(
                runtime.clone(),
                persist_sched,
                Arc::downgrade(&circmgr),
                statemgr.clone(),
            ))
            .map_err(|e| ErrorDetail::from_spawn("persistent state updater", e))?;

        let (timeout_sched, timeout_handle) = TaskSchedule::new(runtime.clone());
        periodic_task_handles.push(timeout_handle);

        runtime
            .spawn(continually_launch_timeout_testing_circuits(
                runtime.clone(),
                timeout_sched,
                Arc::downgrade(&circmgr),
                Arc::downgrade(&dirmgr),
            ))
            .map_err(|e| ErrorDetail::from_spawn("timeout-probe circuit launcher", e))?;

        let (preempt_sched, preempt_handle) = TaskSchedule::new(runtime.clone());
        periodic_task_handles.push(preempt_handle);

        runtime
            .spawn(continually_preemptively_build_circuits(
                runtime.clone(),
                preempt_sched,
                Arc::downgrade(&circmgr),
                Arc::downgrade(&dirmgr),
            ))
@@ -420,6 +453,8 @@ impl<R: Runtime> TorClient<R> {
            status_receiver,
            bootstrap_in_progress: Arc::new(AsyncMutex::new(())),
            should_bootstrap: autobootstrap,
            periodic_task_handles,
            dormant: Arc::new(AtomicBool::new(false)),
        })
    }

@@ -499,6 +534,10 @@ impl<R: Runtime> TorClient<R> {
                self.bootstrap_in_progress.lock().await;
            }
        }
        // NOTE(eta): will need to be changed when hard dormant mode is introduced
        if self.dormant.load(Ordering::SeqCst) {
            self.set_dormant(DormantMode::Normal);
        }
        Ok(())
    }

@@ -860,6 +899,32 @@ impl<R: Runtime> TorClient<R> {
    pub fn bootstrap_events(&self) -> status::BootstrapEvents {
        self.status_receiver.clone()
    }

    /// Change the client's current dormant mode, putting background tasks to sleep
    /// or waking them up as appropriate.
    ///
    /// This can be used to conserve CPU usage if you aren't planning on using the
    /// client for a while, especially on mobile platforms.
    ///
    /// See the [`DormantMode`] documentation for more details.
    pub fn set_dormant(&self, mode: DormantMode) {
        let is_dormant = matches!(mode, DormantMode::Soft);

        // Do an atomic compare-exchange. If it succeeds, we just flipped `self.dormant`.
        if self
            .dormant
            .compare_exchange(!is_dormant, is_dormant, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            for task in self.periodic_task_handles.iter() {
                if is_dormant {
                    task.cancel();
                } else {
                    task.fire();
                }
            }
        }
    }
}

/// Alias for TorError::from(Error)
@@ -922,14 +987,14 @@ async fn keep_circmgr_params_updated<R: Runtime>(
///
/// This is a daemon task: it runs indefinitely in the background.
async fn update_persistent_state<R: Runtime>(
    runtime: R,
    mut sched: TaskSchedule<R>,
    circmgr: Weak<tor_circmgr::CircMgr<R>>,
    statemgr: FsStateMgr,
) {
    // TODO: Consider moving this function into tor-circmgr after we have more
    // experience with the state system.

    loop {
    while sched.next().await.is_some() {
        if let Some(circmgr) = Weak::upgrade(&circmgr) {
            use tor_persist::LockStatus::*;

@@ -968,7 +1033,7 @@ async fn update_persistent_state<R: Runtime>(
        // we should be updating more frequently when the data is volatile
        // or has important info to save, and not at all when there are no
        // changes.
        runtime.sleep(Duration::from_secs(60)).await;
        sched.fire_in(Duration::from_secs(60));
    }

    error!("State update task is exiting prematurely.");
@@ -987,11 +1052,12 @@ async fn update_persistent_state<R: Runtime>(
/// see [`tor_circmgr::CircMgr::launch_timeout_testing_circuit_if_appropriate`]
/// for more information.
async fn continually_launch_timeout_testing_circuits<R: Runtime>(
    rt: R,
    mut sched: TaskSchedule<R>,
    circmgr: Weak<tor_circmgr::CircMgr<R>>,
    dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
) {
    while let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
    while sched.next().await.is_some() {
        if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
            if let Some(netdir) = dm.latest_netdir() {
                if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
                    warn!("Problem launching a timeout testing circuit: {}", e);
@@ -1003,11 +1069,14 @@ async fn continually_launch_timeout_testing_circuits<R: Runtime>(
                    .expect("Out-of-bounds value from BoundedInt32");

                drop((cm, dm));
            rt.sleep(delay).await;
                sched.fire_in(delay);
            } else {
                // TODO(eta): ideally, this should wait until we successfully bootstrap using
                //            the bootstrap status API
            rt.sleep(Duration::from_secs(10)).await;
                sched.fire_in(Duration::from_secs(10));
            }
        } else {
            return;
        }
    }
}
@@ -1024,19 +1093,23 @@ async fn continually_launch_timeout_testing_circuits<R: Runtime>(
/// This would be better handled entirely within `tor-circmgr`, like
/// other daemon tasks.
async fn continually_preemptively_build_circuits<R: Runtime>(
    rt: R,
    mut sched: TaskSchedule<R>,
    circmgr: Weak<tor_circmgr::CircMgr<R>>,
    dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
) {
    while let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
    while sched.next().await.is_some() {
        if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
            if let Some(netdir) = dm.latest_netdir() {
                cm.launch_circuits_preemptively(DirInfo::Directory(&netdir))
                    .await;
            rt.sleep(Duration::from_secs(10)).await;
                sched.fire_in(Duration::from_secs(10));
            } else {
                // TODO(eta): ideally, this should wait until we successfully bootstrap using
                //            the bootstrap status API
            rt.sleep(Duration::from_secs(10)).await;
                sched.fire_in(Duration::from_secs(10));
            }
        } else {
            return;
        }
    }
}
@@ -1046,8 +1119,11 @@ async fn continually_preemptively_build_circuits<R: Runtime>(
/// Exist when we find that `chanmgr` is dropped
///
/// This is a daemon task that runs indefinitely in the background
async fn continually_expire_channels<R: Runtime>(rt: R, chanmgr: Weak<tor_chanmgr::ChanMgr<R>>) {
    loop {
async fn continually_expire_channels<R: Runtime>(
    mut sched: TaskSchedule<R>,
    chanmgr: Weak<tor_chanmgr::ChanMgr<R>>,
) {
    while sched.next().await.is_some() {
        let delay = if let Some(cm) = Weak::upgrade(&chanmgr) {
            cm.expire_channels()
        } else {
@@ -1055,7 +1131,7 @@ async fn continually_expire_channels<R: Runtime>(rt: R, chanmgr: Weak<tor_chanmg
            return;
        };
        // This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
        rt.sleep(Duration::from_secs(delay.as_secs())).await;
        sched.fire_in(Duration::from_secs(delay.as_secs()));
    }
}

+1 −1
Original line number Diff line number Diff line
@@ -225,7 +225,7 @@ pub mod status;

pub use address::{DangerouslyIntoTorAddr, IntoTorAddr, TorAddr, TorAddrError};
pub use builder::TorClientBuilder;
pub use client::{BootstrapBehavior, StreamPrefs, TorClient};
pub use client::{BootstrapBehavior, DormantMode, StreamPrefs, TorClient};
pub use config::TorClientConfig;

pub use tor_circmgr::isolation;
+4 −0
Original line number Diff line number Diff line
@@ -175,6 +175,7 @@ pub mod task;

mod compound;
mod opaque;
pub mod scheduler;
mod timer;
mod traits;

@@ -405,6 +406,9 @@ pub mod cond {
///
/// (This is a macro so that it can repeat the closure as multiple separate
/// expressions, so it can take on two different types, if needed.)
//
// NOTE(eta): changing this #[cfg] can affect tests inside this crate that use
//            this macro, like in scheduler.rs
#[macro_export]
#[cfg(all(
    any(feature = "native-tls", feature = "rustls"),
+280 −0
Original line number Diff line number Diff line
//! Utilities for dealing with periodic recurring tasks.

use crate::SleepProvider;
use futures::channel::mpsc;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::{Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use pin_project::pin_project;

/// A command sent from task handles to schedule objects.
#[derive(Copy, Clone)]
enum SchedulerCommand {
    /// Run the task now.
    Fire,
    /// Run the task at the provided `Instant`.
    FireAt(Instant),
    /// Cancel a pending execution, if there is one.
    Cancel,
}

/// A remotely-controllable trigger for recurring tasks.
///
/// This implements [`Stream`], and is intended to be used in a `while` loop; you should
/// wrap your recurring task in a `while schedule.next().await.is_some()` or similar.
#[pin_project(project = TaskScheduleP)]
pub struct TaskSchedule<R: SleepProvider> {
    /// If we're waiting for a deadline to expire, the future for that.
    sleep: Option<Pin<Box<R::SleepFuture>>>,
    /// Receiver of scheduler commands from handles.
    rx: UnboundedReceiver<SchedulerCommand>,
    /// Runtime.
    rt: R,
    /// Whether or not to yield a result immediately when polled, once.
    ///
    /// This is used to avoid having to create a `SleepFuture` with zero duration,
    /// which is potentially a bit wasteful.
    instant_fire: bool,
}

/// A handle used to control a [`TaskSchedule`].
#[derive(Clone)]
pub struct TaskHandle {
    /// Sender of scheduler commands to the corresponding schedule.
    tx: UnboundedSender<SchedulerCommand>,
}

impl<R: SleepProvider> TaskSchedule<R> {
    /// Create a new schedule, and corresponding handle.
    pub fn new(rt: R) -> (Self, TaskHandle) {
        let (tx, rx) = mpsc::unbounded();
        (
            Self {
                sleep: None,
                rx,
                rt,
                // Start off ready.
                instant_fire: true,
            },
            TaskHandle { tx },
        )
    }

    /// Trigger the schedule after `dur`.
    pub fn fire_in(&mut self, dur: Duration) {
        self.instant_fire = false;
        self.sleep = Some(Box::pin(self.rt.sleep(dur)));
    }
}

impl TaskHandle {
    /// Trigger this handle's corresponding schedule now.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn fire(&self) -> bool {
        self.tx.unbounded_send(SchedulerCommand::Fire).is_ok()
    }
    /// Trigger this handle's corresponding schedule at `instant`.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn fire_at(&self, instant: Instant) -> bool {
        self.tx
            .unbounded_send(SchedulerCommand::FireAt(instant))
            .is_ok()
    }
    /// Cancel a pending firing of the handle's corresponding schedule.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn cancel(&self) -> bool {
        self.tx.unbounded_send(SchedulerCommand::Cancel).is_ok()
    }
}

// NOTE(eta): implemented on the *pin projection*, not the original type, because we don't want
//            to require `R: Unpin`. Accordingly, all the fields are mutable references.
impl<R: SleepProvider> TaskScheduleP<'_, R> {
    /// Handle an internal command.
    fn handle_command(&mut self, cmd: SchedulerCommand) {
        match cmd {
            SchedulerCommand::Fire => {
                *self.instant_fire = true;
                *self.sleep = None;
            }
            SchedulerCommand::FireAt(instant) => {
                let now = self.rt.now();
                let dur = instant.saturating_duration_since(now);
                *self.instant_fire = false;
                *self.sleep = Some(Box::pin(self.rt.sleep(dur)));
            }
            SchedulerCommand::Cancel => {
                *self.instant_fire = false;
                *self.sleep = None;
            }
        }
    }
}

impl<R: SleepProvider> Stream for TaskSchedule<R> {
    type Item = ();

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        while let Poll::Ready(maybe_cmd) = this.rx.poll_next_unpin(cx) {
            match maybe_cmd {
                Some(c) => this.handle_command(c),
                None => {
                    // All task handles dropped; return end of stream.
                    return Poll::Ready(None);
                }
            }
        }
        if *this.instant_fire {
            *this.instant_fire = false;
            return Poll::Ready(Some(()));
        }
        if this
            .sleep
            .as_mut()
            .map(|x| x.as_mut().poll(cx).is_ready())
            .unwrap_or(false)
        {
            *this.sleep = None;
            return Poll::Ready(Some(()));
        }
        Poll::Pending
    }
}

// test_with_all_runtimes! only exists if these features are satisfied.
#[cfg(all(
    test,
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
))]
mod test {
    use crate::scheduler::TaskSchedule;
    use crate::{test_with_all_runtimes, SleepProvider};
    use futures::FutureExt;
    use futures::StreamExt;
    use std::time::{Duration, Instant};

    #[test]
    fn it_fires_immediately() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, _hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());
        });
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn it_dies_if_dropped() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            drop(hdl);
            assert!(sch.next().now_or_never().unwrap().is_none());
        });
    }

    #[test]
    fn it_fires_on_demand() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.fire());
            assert!(sch.next().now_or_never().is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    #[test]
    fn it_cancels_instant_firings() {
        // NOTE(eta): this test very much assumes that unbounded channels will
        //            transmit things instantly. If it breaks, that's probably why.
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.fire());
            assert!(hdl.cancel());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    #[test]
    fn it_fires_after_self_reschedule() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, _hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            sch.fire_in(Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    #[test]
    fn it_fires_after_external_reschedule() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    #[test]
    fn it_cancels_delayed_firings() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(50)).await;

            assert!(sch.next().now_or_never().is_none());

            hdl.cancel();

            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(100)).await;

            assert!(sch.next().now_or_never().is_none());
        });
    }

    #[test]
    fn last_fire_wins() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));
            hdl.fire();

            assert!(sch.next().now_or_never().is_some());
            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(150)).await;

            assert!(sch.next().now_or_never().is_none());
        });
    }
}