diff --git a/crates/arti-client/Cargo.toml b/crates/arti-client/Cargo.toml
index 949be675358efc08ddb8e9359891e92215f3a4d6..c3c103286c8e78cfec139e583d854690afa9a73b 100644
--- a/crates/arti-client/Cargo.toml
+++ b/crates/arti-client/Cargo.toml
@@ -49,6 +49,7 @@ postage = { version = "0.4", default-features = false, features = ["futures-trai
 tracing = "0.1.18"
 serde = { version = "1.0.103", features = ["derive"] }
 thiserror = "1"
+pin-project = "1"
 
 [dev-dependencies]
 tor-rtcompat = { path="../tor-rtcompat", version = "0.1.0", features=["tokio", "native-tls" ] }
diff --git a/crates/arti-client/src/client.rs b/crates/arti-client/src/client.rs
index f2c7f5c20f0a3b92f83149e365fdb2cc255d94ce..187a03267445a551dce9b38b8a1eec8deb1a83d0 100644
--- a/crates/arti-client/src/client.rs
+++ b/crates/arti-client/src/client.rs
@@ -23,11 +23,13 @@ use futures::task::SpawnExt;
 use std::convert::TryInto;
 use std::net::IpAddr;
 use std::result::Result as StdResult;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex, Weak};
 use std::time::Duration;
 
 use crate::err::ErrorDetail;
 use crate::{status, util, TorClientBuilder};
+use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
 use tracing::{debug, error, info, warn};
 
 /// An active client session on the Tor network.
@@ -83,6 +85,12 @@ pub struct TorClient<R: Runtime> {
     /// bootstrapping. If this is `false`, we will just call `wait_for_bootstrap`
     /// instead.
     should_bootstrap: BootstrapBehavior,
+
+    /// Handles to periodic background tasks, useful for suspending them later.
+    periodic_task_handles: Vec<TaskHandle>,
+
+    /// Shared boolean for whether we're currently in "dormant mode" or not.
+    dormant: Arc<AtomicBool>,
 }
 
 /// Preferences for whether a [`TorClient`] should bootstrap on its own or not.
@@ -103,6 +111,17 @@ pub enum BootstrapBehavior {
     Manual,
 }
 
+/// What level of sleep to put a Tor client into.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+#[non_exhaustive]
+pub enum DormantMode {
+    /// The client functions as normal, and background tasks run periodically.
+    Normal,
+    /// Background tasks are suspended, conserving CPU usage. Attempts to use the client will
+    /// wake it back up again.
+    Soft,
+}
+
 /// Preferences for how to route a stream over the Tor network.
 #[derive(Debug, Default, Clone)]
 pub struct StreamPrefs {
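
With `DormantMode` in place, an application can park an idle client and wake it up later. A minimal sketch of the intended call pattern; the lifecycle hook names are hypothetical, and only the `set_dormant` calls come from this patch:

```rust
use arti_client::{DormantMode, TorClient};
use tor_rtcompat::Runtime;

// Hypothetical lifecycle hooks, e.g. wired to a mobile platform's
// background/foreground callbacks.
fn on_backgrounded<R: Runtime>(client: &TorClient<R>) {
    // Suspend the periodic background tasks to save CPU.
    client.set_dormant(DormantMode::Soft);
}

fn on_foregrounded<R: Runtime>(client: &TorClient<R>) {
    // Wake the tasks back up. Using the client also wakes it
    // implicitly, as the `wait_for_bootstrap` change below shows.
    client.set_dormant(DormantMode::Normal);
}
```
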
@@ -354,6 +373,8 @@ impl<R: Runtime> TorClient<R> {
             .build(runtime.clone(), Arc::clone(&circmgr), dir_cfg)
             .map_err(crate::Error::into_detail)?;
 
+        let mut periodic_task_handles = vec![];
+
         let conn_status = chanmgr.bootstrap_events();
         let dir_status = dirmgr.bootstrap_events();
         runtime
@@ -364,9 +385,12 @@ impl<R: Runtime> TorClient<R> {
             ))
             .map_err(|e| ErrorDetail::from_spawn("top-level status reporter", e))?;
 
+        let (expiry_sched, expiry_handle) = TaskSchedule::new(runtime.clone());
+        periodic_task_handles.push(expiry_handle);
+
         runtime
             .spawn(continually_expire_channels(
-                runtime.clone(),
+                expiry_sched,
                 Arc::downgrade(&chanmgr),
             ))
             .map_err(|e| ErrorDetail::from_spawn("channel expiration task", e))?;
@@ -381,25 +405,34 @@ impl<R: Runtime> TorClient<R> {
             ))
             .map_err(|e| ErrorDetail::from_spawn("circmgr parameter updater", e))?;
 
+        let (persist_sched, persist_handle) = TaskSchedule::new(runtime.clone());
+        periodic_task_handles.push(persist_handle);
+
         runtime
             .spawn(update_persistent_state(
-                runtime.clone(),
+                persist_sched,
                 Arc::downgrade(&circmgr),
                 statemgr.clone(),
             ))
             .map_err(|e| ErrorDetail::from_spawn("persistent state updater", e))?;
 
+        let (timeout_sched, timeout_handle) = TaskSchedule::new(runtime.clone());
+        periodic_task_handles.push(timeout_handle);
+
         runtime
             .spawn(continually_launch_timeout_testing_circuits(
-                runtime.clone(),
+                timeout_sched,
                 Arc::downgrade(&circmgr),
                 Arc::downgrade(&dirmgr),
             ))
             .map_err(|e| ErrorDetail::from_spawn("timeout-probe circuit launcher", e))?;
 
+        let (preempt_sched, preempt_handle) = TaskSchedule::new(runtime.clone());
+        periodic_task_handles.push(preempt_handle);
+
         runtime
             .spawn(continually_preemptively_build_circuits(
-                runtime.clone(),
+                preempt_sched,
                 Arc::downgrade(&circmgr),
                 Arc::downgrade(&dirmgr),
             ))
@@ -420,6 +453,8 @@ impl<R: Runtime> TorClient<R> {
             status_receiver,
             bootstrap_in_progress: Arc::new(AsyncMutex::new(())),
             should_bootstrap: autobootstrap,
+            periodic_task_handles,
+            dormant: Arc::new(AtomicBool::new(false)),
         })
     }
 
@@ -499,6 +534,10 @@ impl<R: Runtime> TorClient<R> {
                 self.bootstrap_in_progress.lock().await;
             }
         }
+        // NOTE(eta): will need to be changed when hard dormant mode is introduced
+        if self.dormant.load(Ordering::SeqCst) {
+            self.set_dormant(DormantMode::Normal);
+        }
         Ok(())
     }
 
@@ -860,6 +899,32 @@ impl<R: Runtime> TorClient<R> {
     pub fn bootstrap_events(&self) -> status::BootstrapEvents {
         self.status_receiver.clone()
     }
+
+    /// Change the client's current dormant mode, putting background tasks to sleep
+    /// or waking them up as appropriate.
+    ///
+    /// This can be used to conserve CPU usage if you aren't planning on using the
+    /// client for a while, especially on mobile platforms.
+    ///
+    /// See the [`DormantMode`] documentation for more details.
+    pub fn set_dormant(&self, mode: DormantMode) {
+        let is_dormant = matches!(mode, DormantMode::Soft);
+
+        // Do an atomic compare-exchange. If it succeeds, we just flipped `self.dormant`.
+        if self
+            .dormant
+            .compare_exchange(!is_dormant, is_dormant, Ordering::SeqCst, Ordering::SeqCst)
+            .is_ok()
+        {
+            for task in self.periodic_task_handles.iter() {
+                if is_dormant {
+                    task.cancel();
+                } else {
+                    task.fire();
+                }
+            }
+        }
+    }
 }
 
 /// Alias for TorError::from(Error)
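
`set_dormant` relies on a compare-exchange so that exactly one caller wins each transition, and the task handles are only toggled on a real state change. A standalone sketch of that idiom (the names are hypothetical, the atomic calls are std):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Flip `flag` from `!target` to `target`; returns true only for the
/// caller that actually performed the transition.
fn transition(flag: &AtomicBool, target: bool) -> bool {
    flag.compare_exchange(!target, target, Ordering::SeqCst, Ordering::SeqCst)
        .is_ok()
}

fn main() {
    let dormant = AtomicBool::new(false);
    assert!(transition(&dormant, true)); // false -> true: we flipped it
    assert!(!transition(&dormant, true)); // already true: no-op
    assert!(transition(&dormant, false)); // true -> false: we flipped it
}
```
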
@@ -922,14 +987,14 @@ async fn keep_circmgr_params_updated<R: Runtime>(
 ///
 /// This is a daemon task: it runs indefinitely in the background.
 async fn update_persistent_state<R: Runtime>(
-    runtime: R,
+    mut sched: TaskSchedule<R>,
     circmgr: Weak<tor_circmgr::CircMgr<R>>,
     statemgr: FsStateMgr,
 ) {
     // TODO: Consider moving this function into tor-circmgr after we have more
     // experience with the state system.
 
-    loop {
+    while sched.next().await.is_some() {
         if let Some(circmgr) = Weak::upgrade(&circmgr) {
             use tor_persist::LockStatus::*;
 
@@ -968,7 +1033,7 @@ async fn update_persistent_state<R: Runtime>(
         // we should be updating more frequently when the data is volatile
         // or has important info to save, and not at all when there are no
         // changes.
-        runtime.sleep(Duration::from_secs(60)).await;
+        sched.fire_in(Duration::from_secs(60));
     }
 
     error!("State update task is exiting prematurely.");
@@ -987,27 +1052,31 @@ async fn update_persistent_state<R: Runtime>(
 /// see [`tor_circmgr::CircMgr::launch_timeout_testing_circuit_if_appropriate`]
 /// for more information.
 async fn continually_launch_timeout_testing_circuits<R: Runtime>(
-    rt: R,
+    mut sched: TaskSchedule<R>,
     circmgr: Weak<tor_circmgr::CircMgr<R>>,
     dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
 ) {
-    while let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
-        if let Some(netdir) = dm.latest_netdir() {
-            if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
-                warn!("Problem launching a timeout testing circuit: {}", e);
+    while sched.next().await.is_some() {
+        if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
+            if let Some(netdir) = dm.latest_netdir() {
+                if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
+                    warn!("Problem launching a timeout testing circuit: {}", e);
+                }
+                let delay = netdir
+                    .params()
+                    .cbt_testing_delay
+                    .try_into()
+                    .expect("Out-of-bounds value from BoundedInt32");
+
+                drop((cm, dm));
+                sched.fire_in(delay);
+            } else {
+                // TODO(eta): ideally, this should wait until we successfully bootstrap using
+                // the bootstrap status API
+                sched.fire_in(Duration::from_secs(10));
             }
-            let delay = netdir
-                .params()
-                .cbt_testing_delay
-                .try_into()
-                .expect("Out-of-bounds value from BoundedInt32");
-
-            drop((cm, dm));
-            rt.sleep(delay).await;
         } else {
-            // TODO(eta): ideally, this should wait until we successfully bootstrap using
-            // the bootstrap status API
-            rt.sleep(Duration::from_secs(10)).await;
+            return;
         }
     }
 }
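
The converted daemons now share one shape: wait for the schedule to yield, upgrade the `Weak` manager reference, do a round of work, and rearm with `fire_in`. A minimal sketch of that pattern (the manager type and work function are placeholders):

```rust
use std::sync::Weak;
use std::time::Duration;

use futures::StreamExt;
use tor_rtcompat::scheduler::TaskSchedule;
use tor_rtcompat::Runtime;

/// Skeleton of a periodic daemon task driven by a `TaskSchedule`.
async fn daemon_task<R: Runtime, Mgr>(mut sched: TaskSchedule<R>, mgr: Weak<Mgr>) {
    // `next()` yields `None` once every `TaskHandle` is dropped, and
    // yields nothing while the schedule is cancelled (dormant mode).
    while sched.next().await.is_some() {
        if let Some(mgr) = Weak::upgrade(&mgr) {
            do_work(&mgr).await;
            // Drop the strong reference before rearming, so the daemon
            // never keeps the manager alive between rounds.
            drop(mgr);
            sched.fire_in(Duration::from_secs(60));
        } else {
            // The owner is gone; shut down for good.
            return;
        }
    }
}

async fn do_work<Mgr>(_mgr: &Mgr) { /* hypothetical per-round work */ }
```
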
@@ -1024,19 +1093,23 @@ async fn continually_launch_timeout_testing_circuits<R: Runtime>(
 /// This would be better handled entirely within `tor-circmgr`, like
 /// other daemon tasks.
 async fn continually_preemptively_build_circuits<R: Runtime>(
-    rt: R,
+    mut sched: TaskSchedule<R>,
     circmgr: Weak<tor_circmgr::CircMgr<R>>,
     dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
 ) {
-    while let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
-        if let Some(netdir) = dm.latest_netdir() {
-            cm.launch_circuits_preemptively(DirInfo::Directory(&netdir))
-                .await;
-            rt.sleep(Duration::from_secs(10)).await;
+    while sched.next().await.is_some() {
+        if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
+            if let Some(netdir) = dm.latest_netdir() {
+                cm.launch_circuits_preemptively(DirInfo::Directory(&netdir))
+                    .await;
+                sched.fire_in(Duration::from_secs(10));
+            } else {
+                // TODO(eta): ideally, this should wait until we successfully bootstrap using
+                // the bootstrap status API
+                sched.fire_in(Duration::from_secs(10));
+            }
         } else {
-            // TODO(eta): ideally, this should wait until we successfully bootstrap using
-            // the bootstrap status API
-            rt.sleep(Duration::from_secs(10)).await;
+            return;
         }
     }
 }
@@ -1046,8 +1119,11 @@
 /// Exit when we find that `chanmgr` is dropped
 ///
 /// This is a daemon task that runs indefinitely in the background
-async fn continually_expire_channels<R: Runtime>(rt: R, chanmgr: Weak<tor_chanmgr::ChanMgr<R>>) {
-    loop {
+async fn continually_expire_channels<R: Runtime>(
+    mut sched: TaskSchedule<R>,
+    chanmgr: Weak<tor_chanmgr::ChanMgr<R>>,
+) {
+    while sched.next().await.is_some() {
         let delay = if let Some(cm) = Weak::upgrade(&chanmgr) {
             cm.expire_channels()
         } else {
@@ -1055,7 +1131,7 @@ async fn continually_expire_channels<R: Runtime>(rt: R, chanmgr: Weak<tor_chanmg
             return;
         };
         // This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
-        rt.sleep(Duration::from_secs(delay.as_secs())).await;
+        sched.fire_in(Duration::from_secs(delay.as_secs()));
     }
 }
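
The "underestimate" comment above is because rebuilding the delay with `Duration::from_secs(delay.as_secs())` drops the sub-second part, so the task can wake up to a second early and simply find nothing to expire yet. A std-only illustration:

```rust
use std::time::Duration;

fn main() {
    let delay = Duration::from_millis(1500);
    // Truncating to whole seconds loses the fractional part; waking
    // early is harmless because the task just sleeps again.
    assert_eq!(Duration::from_secs(delay.as_secs()), Duration::from_secs(1));
}
```
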
diff --git a/crates/arti-client/src/lib.rs b/crates/arti-client/src/lib.rs
index 8e16156db864506243b7d31248559219df305b97..c7b3ddedacfcb1ec2b0ee24f21f95acb0704e000 100644
--- a/crates/arti-client/src/lib.rs
+++ b/crates/arti-client/src/lib.rs
@@ -225,7 +225,7 @@ pub mod status;
 
 pub use address::{DangerouslyIntoTorAddr, IntoTorAddr, TorAddr, TorAddrError};
 pub use builder::TorClientBuilder;
-pub use client::{BootstrapBehavior, StreamPrefs, TorClient};
+pub use client::{BootstrapBehavior, DormantMode, StreamPrefs, TorClient};
 pub use config::TorClientConfig;
 
 pub use tor_circmgr::isolation;
diff --git a/crates/tor-rtcompat/src/lib.rs b/crates/tor-rtcompat/src/lib.rs
index 3d8605a9bf1b8e0871598f46eb30247a150d0d78..33f3ed9f464384976f28cce4e51f077714f3f9ae 100644
--- a/crates/tor-rtcompat/src/lib.rs
+++ b/crates/tor-rtcompat/src/lib.rs
@@ -175,6 +175,7 @@ pub mod task;
 
 mod compound;
 mod opaque;
+pub mod scheduler;
 mod timer;
 mod traits;
 
@@ -405,6 +406,9 @@ pub mod cond {
 ///
 /// (This is a macro so that it can repeat the closure as multiple separate
 /// expressions, so it can take on two different types, if needed.)
+//
+// NOTE(eta): changing this #[cfg] can affect tests inside this crate that use
+// this macro, like in scheduler.rs
 #[macro_export]
 #[cfg(all(
     any(feature = "native-tls", feature = "rustls"),
diff --git a/crates/tor-rtcompat/src/scheduler.rs b/crates/tor-rtcompat/src/scheduler.rs
new file mode 100644
index 0000000000000000000000000000000000000000..5512b4ee3e46d8d268640afda84a0e1f8824500b
--- /dev/null
+++ b/crates/tor-rtcompat/src/scheduler.rs
@@ -0,0 +1,280 @@
+//! Utilities for dealing with periodic recurring tasks.
+
+use crate::SleepProvider;
+use futures::channel::mpsc;
+use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
+use futures::{Stream, StreamExt};
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::time::{Duration, Instant};
+
+use pin_project::pin_project;
+
+/// A command sent from task handles to schedule objects.
+#[derive(Copy, Clone)]
+enum SchedulerCommand {
+    /// Run the task now.
+    Fire,
+    /// Run the task at the provided `Instant`.
+    FireAt(Instant),
+    /// Cancel a pending execution, if there is one.
+    Cancel,
+}
+
+/// A remotely-controllable trigger for recurring tasks.
+///
+/// This implements [`Stream`], and is intended to be used in a `while` loop; you should
+/// wrap your recurring task in a `while schedule.next().await.is_some()` or similar.
+#[pin_project(project = TaskScheduleP)]
+pub struct TaskSchedule<R: SleepProvider> {
+    /// If we're waiting for a deadline to expire, the future for that.
+    sleep: Option<Pin<Box<R::SleepFuture>>>,
+    /// Receiver of scheduler commands from handles.
+    rx: UnboundedReceiver<SchedulerCommand>,
+    /// Runtime.
+    rt: R,
+    /// Whether or not to yield a result immediately when polled, once.
+    ///
+    /// This is used to avoid having to create a `SleepFuture` with zero duration,
+    /// which is potentially a bit wasteful.
+    instant_fire: bool,
+}
+
+/// A handle used to control a [`TaskSchedule`].
+#[derive(Clone)]
+pub struct TaskHandle {
+    /// Sender of scheduler commands to the corresponding schedule.
+    tx: UnboundedSender<SchedulerCommand>,
+}
+
+impl<R: SleepProvider> TaskSchedule<R> {
+    /// Create a new schedule, and corresponding handle.
+    pub fn new(rt: R) -> (Self, TaskHandle) {
+        let (tx, rx) = mpsc::unbounded();
+        (
+            Self {
+                sleep: None,
+                rx,
+                rt,
+                // Start off ready.
+                instant_fire: true,
+            },
+            TaskHandle { tx },
+        )
+    }
+
+    /// Trigger the schedule after `dur`.
+    pub fn fire_in(&mut self, dur: Duration) {
+        self.instant_fire = false;
+        self.sleep = Some(Box::pin(self.rt.sleep(dur)));
+    }
+}
+
+impl TaskHandle {
+    /// Trigger this handle's corresponding schedule now.
+    ///
+    /// Returns `true` if the schedule still exists, and `false` otherwise.
+    pub fn fire(&self) -> bool {
+        self.tx.unbounded_send(SchedulerCommand::Fire).is_ok()
+    }
+
+    /// Trigger this handle's corresponding schedule at `instant`.
+    ///
+    /// Returns `true` if the schedule still exists, and `false` otherwise.
+    pub fn fire_at(&self, instant: Instant) -> bool {
+        self.tx
+            .unbounded_send(SchedulerCommand::FireAt(instant))
+            .is_ok()
+    }
+
+    /// Cancel a pending firing of the handle's corresponding schedule.
+    ///
+    /// Returns `true` if the schedule still exists, and `false` otherwise.
+    pub fn cancel(&self) -> bool {
+        self.tx.unbounded_send(SchedulerCommand::Cancel).is_ok()
+    }
+}
+
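
The new API in one place: create a schedule/handle pair, drive the schedule from a spawned task, and keep the handle to pause or resume it. A hedged sketch over any `Runtime` (the function and job body are illustrative; `spawn` comes from `futures::task::SpawnExt`, which `Runtime` provides):

```rust
use std::time::Duration;

use futures::task::SpawnExt;
use futures::StreamExt;
use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule};
use tor_rtcompat::Runtime;

/// Spawn a self-rearming periodic job; the returned handle can pause it
/// (`cancel`), run it early (`fire`), or schedule it (`fire_at`).
fn start_job<R: Runtime>(rt: &R) -> TaskHandle {
    let (mut sched, handle) = TaskSchedule::new(rt.clone());
    rt.spawn(async move {
        while sched.next().await.is_some() {
            // ... one round of periodic work would go here ...
            sched.fire_in(Duration::from_secs(30));
        }
        // `next()` returned `None`: every handle was dropped.
    })
    .expect("failed to spawn periodic job");
    handle
}
```
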
+// NOTE(eta): implemented on the *pin projection*, not the original type, because we don't want
+// to require `R: Unpin`. Accordingly, all the fields are mutable references.
+impl<R: SleepProvider> TaskScheduleP<'_, R> {
+    /// Handle an internal command.
+    fn handle_command(&mut self, cmd: SchedulerCommand) {
+        match cmd {
+            SchedulerCommand::Fire => {
+                *self.instant_fire = true;
+                *self.sleep = None;
+            }
+            SchedulerCommand::FireAt(instant) => {
+                let now = self.rt.now();
+                let dur = instant.saturating_duration_since(now);
+                *self.instant_fire = false;
+                *self.sleep = Some(Box::pin(self.rt.sleep(dur)));
+            }
+            SchedulerCommand::Cancel => {
+                *self.instant_fire = false;
+                *self.sleep = None;
+            }
+        }
+    }
+}
+
+impl<R: SleepProvider> Stream for TaskSchedule<R> {
+    type Item = ();
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        while let Poll::Ready(maybe_cmd) = this.rx.poll_next_unpin(cx) {
+            match maybe_cmd {
+                Some(c) => this.handle_command(c),
+                None => {
+                    // All task handles dropped; return end of stream.
+                    return Poll::Ready(None);
+                }
+            }
+        }
+        if *this.instant_fire {
+            *this.instant_fire = false;
+            return Poll::Ready(Some(()));
+        }
+        if this
+            .sleep
+            .as_mut()
+            .map(|x| x.as_mut().poll(cx).is_ready())
+            .unwrap_or(false)
+        {
+            *this.sleep = None;
+            return Poll::Ready(Some(()));
+        }
+        Poll::Pending
+    }
+}
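
`handle_command` turns `FireAt(instant)` into a relative sleep with `saturating_duration_since`, so a deadline that is already in the past becomes a zero-length sleep instead of an underflow panic. A std-only illustration:

```rust
use std::time::{Duration, Instant};

fn main() {
    let now = Instant::now();
    let future = now + Duration::from_secs(5);

    // Measuring "time since" a later instant saturates to zero, so a
    // past deadline fires as soon as the zero-length sleep is polled.
    assert_eq!(now.saturating_duration_since(future), Duration::ZERO);
    // A future deadline yields the remaining wait time as usual.
    assert!(future.saturating_duration_since(now) <= Duration::from_secs(5));
}
```
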
+
+// test_with_all_runtimes! only exists if these features are satisfied.
+#[cfg(all(
+    test,
+    any(feature = "native-tls", feature = "rustls"),
+    any(feature = "tokio", feature = "async-std"),
+))]
+mod test {
+    use crate::scheduler::TaskSchedule;
+    use crate::{test_with_all_runtimes, SleepProvider};
+    use futures::FutureExt;
+    use futures::StreamExt;
+    use std::time::{Duration, Instant};
+
+    #[test]
+    fn it_fires_immediately() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, _hdl) = TaskSchedule::new(rt);
+            assert!(sch.next().now_or_never().is_some());
+        });
+    }
+
+    #[test]
+    #[allow(clippy::unwrap_used)]
+    fn it_dies_if_dropped() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt);
+            drop(hdl);
+            assert!(sch.next().now_or_never().unwrap().is_none());
+        });
+    }
+
+    #[test]
+    fn it_fires_on_demand() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt);
+            assert!(sch.next().now_or_never().is_some());
+
+            assert!(sch.next().now_or_never().is_none());
+            assert!(hdl.fire());
+            assert!(sch.next().now_or_never().is_some());
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+
+    #[test]
+    fn it_cancels_instant_firings() {
+        // NOTE(eta): this test very much assumes that unbounded channels will
+        // transmit things instantly. If it breaks, that's probably why.
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt);
+            assert!(sch.next().now_or_never().is_some());
+
+            assert!(sch.next().now_or_never().is_none());
+            assert!(hdl.fire());
+            assert!(hdl.cancel());
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+
+    #[test]
+    fn it_fires_after_self_reschedule() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, _hdl) = TaskSchedule::new(rt);
+            assert!(sch.next().now_or_never().is_some());
+
+            sch.fire_in(Duration::from_millis(100));
+
+            assert!(sch.next().now_or_never().is_none());
+            assert!(sch.next().await.is_some());
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+
+    #[test]
+    fn it_fires_after_external_reschedule() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt);
+            assert!(sch.next().now_or_never().is_some());
+
+            hdl.fire_at(Instant::now() + Duration::from_millis(100));
+
+            assert!(sch.next().now_or_never().is_none());
+            assert!(sch.next().await.is_some());
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+
+    #[test]
+    fn it_cancels_delayed_firings() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
+            assert!(sch.next().now_or_never().is_some());
+
+            hdl.fire_at(Instant::now() + Duration::from_millis(100));
+
+            assert!(sch.next().now_or_never().is_none());
+
+            rt.sleep(Duration::from_millis(50)).await;
+
+            assert!(sch.next().now_or_never().is_none());
+
+            hdl.cancel();
+
+            assert!(sch.next().now_or_never().is_none());
+
+            rt.sleep(Duration::from_millis(100)).await;
+
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+
+    #[test]
+    fn last_fire_wins() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
+            assert!(sch.next().now_or_never().is_some());
+
+            hdl.fire_at(Instant::now() + Duration::from_millis(100));
+            hdl.fire();
+
+            assert!(sch.next().now_or_never().is_some());
+            assert!(sch.next().now_or_never().is_none());
+
+            rt.sleep(Duration::from_millis(150)).await;
+
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+}
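
One case the suite leaves implicit: `fire_at` with a deadline already in the past. Because of the saturating conversion in `handle_command`, it should complete on the next `await`. A sketch of such a test in the same style, which would sit inside the same `mod test`; it is not part of the patch, and the `Instant` subtraction assumes the process has been alive longer than the offset:

```rust
#[test]
fn it_fires_for_past_deadlines() {
    test_with_all_runtimes!(|rt| async move {
        let (mut sch, hdl) = TaskSchedule::new(rt);
        // Drain the initial instant firing.
        assert!(sch.next().now_or_never().is_some());

        // A past deadline saturates to a zero-length sleep...
        hdl.fire_at(Instant::now() - Duration::from_millis(100));

        // ...so the very next await should complete promptly.
        assert!(sch.next().await.is_some());
    });
}
```
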