Commit b27ffe14 authored by Ian Jackson's avatar Ian Jackson 💬
Browse files

Merge branch 'dir-provider-redux' into 'main'

Alternative DirProvider setup

See merge request !347
parents f7009e23 241fbd87
Loading
Loading
Loading
Loading
+62 −7
Original line number Diff line number Diff line
@@ -3,14 +3,47 @@
#![allow(missing_docs, clippy::missing_docs_in_private_items)]

use crate::{err::ErrorDetail, BootstrapBehavior, Result, TorClient, TorClientConfig};
use std::sync::Arc;
use tor_dirmgr::DirMgrConfig;
use tor_rtcompat::Runtime;

/// An object that knows how to construct some kind of DirProvider.
///
/// Note that this type is only actually exposed when the `experimental-api`
/// feature is enabled.
#[allow(unreachable_pub)]
pub trait DirProviderBuilder<R: Runtime> {
    fn build(
        &self,
        runtime: R,
        circmgr: Arc<tor_circmgr::CircMgr<R>>,
        config: DirMgrConfig,
    ) -> Result<Arc<dyn tor_dirmgr::DirProvider + Send + Sync + 'static>>;
}

/// A DirProviderBuilder that constructs a regular DirMgr.
#[derive(Clone, Debug)]
struct DirMgrBuilder {}

impl<R: Runtime> DirProviderBuilder<R> for DirMgrBuilder {
    fn build(
        &self,
        runtime: R,
        circmgr: Arc<tor_circmgr::CircMgr<R>>,
        config: DirMgrConfig,
    ) -> Result<Arc<dyn tor_dirmgr::DirProvider + Send + Sync + 'static>> {
        let dirmgr = tor_dirmgr::DirMgr::create_unbootstrapped(config, runtime, circmgr)
            .map_err(ErrorDetail::from)?;
        Ok(Arc::new(dirmgr))
    }
}

/// An object for constructing a [`TorClient`].
///
/// Returned by [`TorClient::builder()`].
#[derive(Debug, Clone)]
#[derive(Clone)]
#[must_use]
pub struct TorClientBuilder<R> {
pub struct TorClientBuilder<R: Runtime> {
    /// The runtime for the client to use
    runtime: R,
    /// The client's configuration.
@@ -18,15 +51,21 @@ pub struct TorClientBuilder<R> {
    /// How the client should behave when it is asked to do something on the Tor
    /// network before `bootstrap()` is called.
    bootstrap_behavior: BootstrapBehavior,
    /// Optional object to construct a DirProvider.
    ///
    /// Wrapped in an Arc so that we don't need to force DirProviderBuilder to
    /// implement Clone.
    dirmgr_builder: Arc<dyn DirProviderBuilder<R>>,
}

impl<R> TorClientBuilder<R> {
impl<R: Runtime> TorClientBuilder<R> {
    /// Construct a new TorClientBuilder with the given runtime.
    pub(crate) fn new(runtime: R) -> Self {
        Self {
            runtime,
            config: TorClientConfig::default(),
            bootstrap_behavior: BootstrapBehavior::default(),
            dirmgr_builder: Arc::new(DirMgrBuilder {}),
        }
    }

@@ -46,9 +85,20 @@ impl<R> TorClientBuilder<R> {
        self.bootstrap_behavior = bootstrap_behavior;
        self
    }

    /// Override the default function used to construct the directory provider.
    ///
    /// Only available when compiled with the `experimental-api` feature: this
    /// code is unstable.
    #[cfg(all(feature = "experimental-api", feature = "error_detail"))]
    pub fn dirmgr_builder<B>(mut self, builder: Arc<dyn DirProviderBuilder<R>>) -> Self
    where
        B: DirProviderBuilder<R> + 'static,
    {
        self.dirmgr_builder = builder;
        self
    }

impl<R: Runtime> TorClientBuilder<R> {
    /// Create a `TorClient` from this builder, without automatically launching
    /// the bootstrap process.
    ///
@@ -65,7 +115,12 @@ impl<R: Runtime> TorClientBuilder<R> {
    /// process (for example, you might wish to avoid initiating network
    /// connections until explicit user confirmation is given).
    pub fn create_unbootstrapped(self) -> Result<TorClient<R>> {
        TorClient::create_inner(self.runtime, self.config, self.bootstrap_behavior)
        TorClient::create_inner(
            self.runtime,
            self.config,
            self.bootstrap_behavior,
            self.dirmgr_builder.as_ref(),
        )
        .map_err(ErrorDetail::into)
    }

+21 −17
Original line number Diff line number Diff line
@@ -55,7 +55,7 @@ pub struct TorClient<R: Runtime> {
    /// them on-demand.
    circmgr: Arc<tor_circmgr::CircMgr<R>>,
    /// Directory manager for keeping our directory material up to date.
    dirmgr: Arc<tor_dirmgr::DirMgr<R>>,
    dirmgr: Arc<dyn tor_dirmgr::DirProvider + Send + Sync>,
    /// Location on disk where we store persistent data.
    statemgr: FsStateMgr,
    /// Client address configuration
@@ -337,6 +337,7 @@ impl<R: Runtime> TorClient<R> {
        runtime: R,
        config: TorClientConfig,
        autobootstrap: BootstrapBehavior,
        dirmgr_builder: &dyn crate::builder::DirProviderBuilder<R>,
    ) -> StdResult<Self, ErrorDetail> {
        let circ_cfg = config.get_circmgr_config()?;
        let dir_cfg = config.get_dirmgr_config()?;
@@ -352,11 +353,10 @@ impl<R: Runtime> TorClient<R> {
        let circmgr =
            tor_circmgr::CircMgr::new(circ_cfg, statemgr.clone(), &runtime, Arc::clone(&chanmgr))
                .map_err(ErrorDetail::CircMgrSetup)?;
        let dirmgr = tor_dirmgr::DirMgr::create_unbootstrapped(
            dir_cfg,
            runtime.clone(),
            Arc::clone(&circmgr),
        )?;

        let dirmgr = dirmgr_builder
            .build(runtime.clone(), Arc::clone(&circmgr), dir_cfg)
            .map_err(crate::Error::into_detail)?;

        let conn_status = chanmgr.bootstrap_events();
        let dir_status = dirmgr.bootstrap_events();
@@ -471,8 +471,12 @@ impl<R: Runtime> TorClient<R> {

        self.dirmgr.bootstrap().await?;

        self.circmgr
            .update_network_parameters(self.dirmgr.netdir()?.params());
        self.circmgr.update_network_parameters(
            self.dirmgr
                .latest_netdir()
                .ok_or(ErrorDetail::DirMgr(tor_dirmgr::Error::DirectoryNotPresent))?
                .params(),
        );

        // Since we succeeded, disarm the unlock guard.
        unlock_guard.disarm();
@@ -779,7 +783,7 @@ impl<R: Runtime> TorClient<R> {
    /// This function is unstable. It is only enabled if the crate was
    /// built with the `experimental-api` feature.
    #[cfg(feature = "experimental-api")]
    pub fn dirmgr(&self) -> Arc<tor_dirmgr::DirMgr<R>> {
    pub fn dirmgr(&self) -> Arc<dyn tor_dirmgr::DirProvider + Send + Sync> {
        Arc::clone(&self.dirmgr)
    }

@@ -813,7 +817,7 @@ impl<R: Runtime> TorClient<R> {
        self.wait_for_bootstrap().await?;
        let dir = self
            .dirmgr
            .opt_netdir()
            .latest_netdir()
            .ok_or(ErrorDetail::BootstrapRequired {
                action: "launch a circuit",
            })?;
@@ -879,7 +883,7 @@ where
async fn keep_circmgr_params_updated<R: Runtime>(
    mut events: impl futures::Stream<Item = DirEvent> + Unpin,
    circmgr: Weak<tor_circmgr::CircMgr<R>>,
    dirmgr: Weak<tor_dirmgr::DirMgr<R>>,
    dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
) {
    use DirEvent::*;
    while let Some(event) = events.next().await {
@@ -887,7 +891,7 @@ async fn keep_circmgr_params_updated<R: Runtime>(
            NewConsensus => {
                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
                    let netdir = dm
                        .netdir()
                        .latest_netdir()
                        .expect("got new consensus event, without a netdir?");
                    cm.update_network_parameters(netdir.params());
                    cm.update_network(&netdir);
@@ -899,7 +903,7 @@ async fn keep_circmgr_params_updated<R: Runtime>(
            NewDescriptors => {
                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
                    let netdir = dm
                        .netdir()
                        .latest_netdir()
                        .expect("got new descriptors event, without a netdir?");
                    cm.update_network(&netdir);
                } else {
@@ -988,10 +992,10 @@ async fn update_persistent_state<R: Runtime>(
async fn continually_launch_timeout_testing_circuits<R: Runtime>(
    rt: R,
    circmgr: Weak<tor_circmgr::CircMgr<R>>,
    dirmgr: Weak<tor_dirmgr::DirMgr<R>>,
    dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
) {
    while let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
        if let Some(netdir) = dm.opt_netdir() {
        if let Some(netdir) = dm.latest_netdir() {
            if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
                warn!("Problem launching a timeout testing circuit: {}", e);
            }
@@ -1025,10 +1029,10 @@ async fn continually_launch_timeout_testing_circuits<R: Runtime>(
async fn continually_preemptively_build_circuits<R: Runtime>(
    rt: R,
    circmgr: Weak<tor_circmgr::CircMgr<R>>,
    dirmgr: Weak<tor_dirmgr::DirMgr<R>>,
    dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
) {
    while let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
        if let Some(netdir) = dm.opt_netdir() {
        if let Some(netdir) = dm.latest_netdir() {
            cm.launch_circuits_preemptively(DirInfo::Directory(&netdir))
                .await;
            rt.sleep(Duration::from_secs(10)).await;
+7 −0
Original line number Diff line number Diff line
@@ -206,6 +206,13 @@ impl Error {
    }
}

impl Error {
    /// Consume this error and return the underlying error detail object.
    pub(crate) fn into_detail(self) -> ErrorDetail {
        *self.detail
    }
}

impl ErrorDetail {
    /// Construct a new `Error` from a `SpawnError`.
    pub(crate) fn from_spawn(spawning: &'static str, err: SpawnError) -> ErrorDetail {
+3 −0
Original line number Diff line number Diff line
@@ -240,3 +240,6 @@ pub use err::ErrorDetail;

/// Alias for the [`Result`] type corresponding to the high-level [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

#[cfg(feature = "experimental-api")]
pub use builder::DirProviderBuilder;
+2 −2
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@ use std::{borrow::Cow, fmt, time::SystemTime};
use derive_more::Display;
use futures::{Stream, StreamExt};
use tor_chanmgr::{ConnBlockage, ConnStatus, ConnStatusEvents};
use tor_dirmgr::{DirBootstrapEvents, DirBootstrapStatus};
use tor_dirmgr::DirBootstrapStatus;
use tracing::debug;

/// Information about how ready a [`crate::TorClient`] is to handle requests.
@@ -157,7 +157,7 @@ impl fmt::Display for BootstrapStatus {
pub(crate) async fn report_status(
    mut sender: postage::watch::Sender<BootstrapStatus>,
    conn_status: ConnStatusEvents,
    dir_status: DirBootstrapEvents,
    dir_status: impl Stream<Item = DirBootstrapStatus> + Unpin,
) {
    /// Internal enumeration to combine incoming status changes.
    enum Event {
Loading