Loading crates/arti-client/src/client.rs +12 −6 Original line number Diff line number Diff line Loading @@ -9,7 +9,7 @@ use crate::address::IntoTorAddr; use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig}; use tor_circmgr::{DirInfo, IsolationToken, StreamIsolationBuilder, TargetPort}; use tor_config::MutCfg; use tor_dirmgr::DirEvent; use tor_dirmgr::{event::FlagListener, DirEvent}; use tor_persist::{FsStateMgr, StateMgr}; use tor_proto::circuit::ClientCirc; use tor_proto::stream::{DataStream, IpVersionPreference, StreamParameters}; Loading Loading @@ -59,7 +59,7 @@ pub struct TorClient<R: Runtime> { /// them on-demand. circmgr: Arc<tor_circmgr::CircMgr<R>>, /// Directory manager for keeping our directory material up to date. dirmgr: Arc<tor_dirmgr::DirMgr<R>>, dirmgr: Arc<dyn tor_dirmgr::DirProvider<R, EventStream = FlagListener<DirEvent>>>, /// Location on disk where we store persistent data. statemgr: FsStateMgr, /// Client address configuration Loading Loading @@ -474,8 +474,12 @@ impl<R: Runtime> TorClient<R> { self.dirmgr.bootstrap().await?; self.circmgr .update_network_parameters(self.dirmgr.netdir()?.params()); self.circmgr.update_network_parameters( self.dirmgr .latest_netdir() .ok_or(ErrorDetail::DirMgr(tor_dirmgr::Error::DirectoryNotPresent))? .params(), ); Ok(()) } Loading Loading @@ -779,7 +783,9 @@ impl<R: Runtime> TorClient<R> { /// This function is unstable. It is only enabled if the crate was /// built with the `experimental-api` feature. #[cfg(feature = "experimental-api")] pub fn dirmgr(&self) -> Arc<tor_dirmgr::DirMgr<R>> { pub fn dirmgr( &self, ) -> Arc<dyn tor_dirmgr::DirProvider<R, EventStream = FlagListener<DirEvent>>> { Arc::clone(&self.dirmgr) } Loading @@ -802,7 +808,7 @@ impl<R: Runtime> TorClient<R> { self.wait_for_bootstrap().await?; let dir = self .dirmgr .opt_netdir() .latest_netdir() .ok_or(ErrorDetail::BootstrapRequired { action: "launch a circuit", })?; Loading crates/tor-dirmgr/src/lib.rs +68 −0 Original line number Diff line number Diff line Loading @@ -76,6 +76,7 @@ use tor_circmgr::CircMgr; use tor_netdir::NetDir; use tor_netdoc::doc::netstatus::ConsensusFlavor; use async_trait::async_trait; use futures::{channel::oneshot, task::SpawnExt}; use tor_rtcompat::{Runtime, SleepProviderExt}; use tracing::{debug, info, trace, warn}; Loading @@ -99,6 +100,73 @@ pub use tor_netdir::fallback::{FallbackDir, FallbackDirBuilder}; /// A Result as returned by this crate. pub type Result<T> = std::result::Result<T, Error>; /// Trait for DirMgr implementations #[async_trait] // NOTE: I am not yet clear in how to determine wether Send + Sync is satisfied? pub trait DirProvider<R: Runtime>: Send + Sync { /// Stream of events produced by a DirProvider. type EventStream: futures::stream::Stream<Item = DirEvent>; /// Return a handle to our latest directory, if we have one. fn latest_netdir(&self) -> Option<Arc<NetDir>>; /// Return a new asynchronous stream that will receive notification /// whenever the consensus has changed. /// /// Multiple events may be batched up into a single item: each time /// this stream yields an event, all you can assume is that the event has /// occurred at least once. fn events(&self) -> Self::EventStream; /// Try to change our configuration to `new_config`. /// /// Actual behavior will depend on the value of `how`. fn reconfigure( &self, new_config: &DirMgrConfig, how: tor_config::Reconfigure, ) -> std::result::Result<(), tor_config::ReconfigureError>; /// Bootstrap a `DirProvider` that hasn't been bootstrapped yet. async fn bootstrap(&self) -> Result<()>; /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes /// in the latest directory's bootstrap status. /// /// Note that this stream can be lossy: the caller will not necessarily /// observe every event on the stream fn bootstrap_events(&self) -> event::DirBootstrapEvents; } #[async_trait] impl<R: Runtime> DirProvider<R> for DirMgr<R> { type EventStream = event::FlagListener<DirEvent>; fn latest_netdir(&self) -> Option<Arc<NetDir>> { self.opt_netdir() } fn events(&self) -> Self::EventStream { self.events.subscribe() } fn reconfigure( &self, new_config: &DirMgrConfig, how: tor_config::Reconfigure, ) -> std::result::Result<(), tor_config::ReconfigureError> { self.reconfigure(new_config, how) } async fn bootstrap(&self) -> Result<()> { self.bootstrap().await } fn bootstrap_events(&self) -> event::DirBootstrapEvents { self.bootstrap_events() } } /// A directory manager to download, fetch, and cache a Tor directory. /// /// A DirMgr can operate in three modes: Loading Loading
crates/arti-client/src/client.rs +12 −6 Original line number Diff line number Diff line Loading @@ -9,7 +9,7 @@ use crate::address::IntoTorAddr; use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig}; use tor_circmgr::{DirInfo, IsolationToken, StreamIsolationBuilder, TargetPort}; use tor_config::MutCfg; use tor_dirmgr::DirEvent; use tor_dirmgr::{event::FlagListener, DirEvent}; use tor_persist::{FsStateMgr, StateMgr}; use tor_proto::circuit::ClientCirc; use tor_proto::stream::{DataStream, IpVersionPreference, StreamParameters}; Loading Loading @@ -59,7 +59,7 @@ pub struct TorClient<R: Runtime> { /// them on-demand. circmgr: Arc<tor_circmgr::CircMgr<R>>, /// Directory manager for keeping our directory material up to date. dirmgr: Arc<tor_dirmgr::DirMgr<R>>, dirmgr: Arc<dyn tor_dirmgr::DirProvider<R, EventStream = FlagListener<DirEvent>>>, /// Location on disk where we store persistent data. statemgr: FsStateMgr, /// Client address configuration Loading Loading @@ -474,8 +474,12 @@ impl<R: Runtime> TorClient<R> { self.dirmgr.bootstrap().await?; self.circmgr .update_network_parameters(self.dirmgr.netdir()?.params()); self.circmgr.update_network_parameters( self.dirmgr .latest_netdir() .ok_or(ErrorDetail::DirMgr(tor_dirmgr::Error::DirectoryNotPresent))? .params(), ); Ok(()) } Loading Loading @@ -779,7 +783,9 @@ impl<R: Runtime> TorClient<R> { /// This function is unstable. It is only enabled if the crate was /// built with the `experimental-api` feature. #[cfg(feature = "experimental-api")] pub fn dirmgr(&self) -> Arc<tor_dirmgr::DirMgr<R>> { pub fn dirmgr( &self, ) -> Arc<dyn tor_dirmgr::DirProvider<R, EventStream = FlagListener<DirEvent>>> { Arc::clone(&self.dirmgr) } Loading @@ -802,7 +808,7 @@ impl<R: Runtime> TorClient<R> { self.wait_for_bootstrap().await?; let dir = self .dirmgr .opt_netdir() .latest_netdir() .ok_or(ErrorDetail::BootstrapRequired { action: "launch a circuit", })?; Loading
crates/tor-dirmgr/src/lib.rs +68 −0 Original line number Diff line number Diff line Loading @@ -76,6 +76,7 @@ use tor_circmgr::CircMgr; use tor_netdir::NetDir; use tor_netdoc::doc::netstatus::ConsensusFlavor; use async_trait::async_trait; use futures::{channel::oneshot, task::SpawnExt}; use tor_rtcompat::{Runtime, SleepProviderExt}; use tracing::{debug, info, trace, warn}; Loading @@ -99,6 +100,73 @@ pub use tor_netdir::fallback::{FallbackDir, FallbackDirBuilder}; /// A Result as returned by this crate. pub type Result<T> = std::result::Result<T, Error>; /// Trait for DirMgr implementations #[async_trait] // NOTE: I am not yet clear in how to determine wether Send + Sync is satisfied? pub trait DirProvider<R: Runtime>: Send + Sync { /// Stream of events produced by a DirProvider. type EventStream: futures::stream::Stream<Item = DirEvent>; /// Return a handle to our latest directory, if we have one. fn latest_netdir(&self) -> Option<Arc<NetDir>>; /// Return a new asynchronous stream that will receive notification /// whenever the consensus has changed. /// /// Multiple events may be batched up into a single item: each time /// this stream yields an event, all you can assume is that the event has /// occurred at least once. fn events(&self) -> Self::EventStream; /// Try to change our configuration to `new_config`. /// /// Actual behavior will depend on the value of `how`. fn reconfigure( &self, new_config: &DirMgrConfig, how: tor_config::Reconfigure, ) -> std::result::Result<(), tor_config::ReconfigureError>; /// Bootstrap a `DirProvider` that hasn't been bootstrapped yet. async fn bootstrap(&self) -> Result<()>; /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes /// in the latest directory's bootstrap status. /// /// Note that this stream can be lossy: the caller will not necessarily /// observe every event on the stream fn bootstrap_events(&self) -> event::DirBootstrapEvents; } #[async_trait] impl<R: Runtime> DirProvider<R> for DirMgr<R> { type EventStream = event::FlagListener<DirEvent>; fn latest_netdir(&self) -> Option<Arc<NetDir>> { self.opt_netdir() } fn events(&self) -> Self::EventStream { self.events.subscribe() } fn reconfigure( &self, new_config: &DirMgrConfig, how: tor_config::Reconfigure, ) -> std::result::Result<(), tor_config::ReconfigureError> { self.reconfigure(new_config, how) } async fn bootstrap(&self) -> Result<()> { self.bootstrap().await } fn bootstrap_events(&self) -> event::DirBootstrapEvents { self.bootstrap_events() } } /// A directory manager to download, fetch, and cache a Tor directory. /// /// A DirMgr can operate in three modes: Loading