Commit 16ec1d21 authored by Nick Mathewson's avatar Nick Mathewson 🤹
Browse files

Allow type of timeout estimator to change at runtime.

This is a big change, but it does simplify the type of Builder a
little, and isolates locking across different (potential) timeout
estimator types.
parent fe85f44f
Loading
Loading
Loading
Loading
+52 −55
Original line number Diff line number Diff line
//! Facilities to build circuits directly, instead of via a circuit manager.

use crate::path::{OwnedPath, TorPath};
use crate::timeouts::{pareto::ParetoTimeoutEstimator, Action, TimeoutEstimator};
use crate::timeouts::{self, pareto::ParetoTimeoutEstimator, Action};
use crate::{Error, Result};
use async_trait::async_trait;
use futures::channel::oneshot;
@@ -130,30 +130,21 @@ impl Buildable for Arc<ClientCirc> {
///
/// In general, you should not need to construct or use this object yourself,
/// unless you are choosing your own paths.
struct Builder<
    R: Runtime,
    C: Buildable + Sync + Send + 'static,
    T: TimeoutEstimator + Send + Sync + 'static,
> {
struct Builder<R: Runtime, C: Buildable + Sync + Send + 'static> {
    /// The runtime used by this circuit builder.
    runtime: R,
    /// A channel manager that this circuit builder uses to make channels.
    chanmgr: Arc<ChanMgr<R>>,
    /// An estimator to determine the correct timeouts for circuit building.
    timeouts: T,
    timeouts: timeouts::Estimator,
    /// We don't actually hold any clientcircs, so we need to put this
    /// type here so the compiler won't freak out.
    _phantom: std::marker::PhantomData<C>,
}

impl<
        R: Runtime,
        C: Buildable + Sync + Send + 'static,
        T: TimeoutEstimator + Send + Sync + 'static,
    > Builder<R, C, T>
{
impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
    /// Construct a new [`Builder`].
    fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: T) -> Self {
    fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: timeouts::Estimator) -> Self {
        Builder {
            runtime,
            chanmgr,
@@ -271,7 +262,7 @@ impl<
/// unless you are choosing your own paths.
pub struct CircuitBuilder<R: Runtime> {
    /// The underlying [`Builder`] object
    builder: Arc<Builder<R, Arc<ClientCirc>, ParetoTimeoutEstimator>>,
    builder: Arc<Builder<R, Arc<ClientCirc>>>,
    /// Configuration for how to choose paths for circuits.
    path_config: crate::PathConfig,
    /// State-manager object to use in storing current state.
@@ -300,6 +291,7 @@ impl<R: Runtime> CircuitBuilder<R> {
                ParetoTimeoutEstimator::default()
            }
        };
        let timeouts = timeouts::Estimator::new(timeouts);

        CircuitBuilder {
            builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
@@ -313,16 +305,14 @@ impl<R: Runtime> CircuitBuilder<R> {
    pub fn save_state(&self) -> Result<()> {
        // TODO: someday we'll want to only do this if there is something
        // changed.
        let state = self.builder.timeouts.build_state();
        self.storage.store(&state)?;
        Ok(())
        self.builder.timeouts.save_state(&self.storage)
    }

    /// Reconfigure this builder using the latest set of network parameters.
    ///
    /// (NOTE: for now, this only affects circuit timeout estimation.)
    pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
        self.builder.timeouts.update_params(p.into());
        self.builder.timeouts.update_params(p);
    }

    /// DOCDOC
@@ -421,6 +411,7 @@ where
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::timeouts::TimeoutEstimator;
    use futures::channel::oneshot;
    use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
    use std::sync::Mutex;
@@ -602,24 +593,28 @@ mod test {
        }
    }
    impl TimeoutEstimator for Arc<Mutex<TimeoutRecorder>> {
        fn note_hop_completed(&self, hop: u8, delay: Duration, is_last: bool) {
        fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
            if !is_last {
                return;
            }

            let mut h = self.lock().unwrap();
            h.hist.push((true, hop, delay));
            let mut this = self.lock().unwrap();
            this.hist.push((true, hop, delay));
        }
        fn note_circ_timeout(&self, hop: u8, delay: Duration) {
            let mut h = self.lock().unwrap();
            h.hist.push((false, hop, delay));
        fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
            let mut this = self.lock().unwrap();
            this.hist.push((false, hop, delay));
        }
        fn timeouts(&self, _action: &Action) -> (Duration, Duration) {
        fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
            (Duration::from_secs(3), Duration::from_secs(100))
        }
        fn learning_timeouts(&self) -> bool {
            false
        }
        fn update_params(&mut self, _params: &tor_netdir::params::NetParameters) {}

        fn build_state(&mut self) -> Option<crate::timeouts::pareto::ParetoTimeoutState> {
            None
        }
    }

    /// Testing only: create a bogus circuit target
@@ -649,8 +644,11 @@ mod test {
            ]);
            let chanmgr = Arc::new(ChanMgr::new(rt.clone()));
            let timeouts = Arc::new(Mutex::new(TimeoutRecorder::new()));
            let builder: Builder<_, Mutex<FakeCirc>, _> =
                Builder::new(rt.clone(), chanmgr, Arc::clone(&timeouts));
            let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
                rt.clone(),
                chanmgr,
                timeouts::Estimator::new(Arc::clone(&timeouts)),
            );
            let builder = Arc::new(builder);
            let rng =
                StdRng::from_rng(rand::thread_rng()).expect("couldn't construct temporary rng");
@@ -677,15 +675,14 @@ mod test {
            );

            {
                let mut h = timeouts.lock().unwrap();
                assert_eq!(h.hist.len(), 2);
                assert!(h.hist[0].0); // completed
                assert_eq!(h.hist[0].1, 0); // last hop completed
                let timeouts = timeouts.lock().unwrap();
                assert_eq!(timeouts.hist.len(), 2);
                assert!(timeouts.hist[0].0); // completed
                assert_eq!(timeouts.hist[0].1, 0); // last hop completed
                                                   // TODO: test time elapsed, once wait_for is more reliable.
                assert!(h.hist[1].0); // completed
                assert_eq!(h.hist[1].1, 2); // last hop completed
                assert!(timeouts.hist[1].0); // completed
                assert_eq!(timeouts.hist[1].1, 2); // last hop completed
                                                   // TODO: test time elapsed, once wait_for is more reliable.
                h.hist.clear();
            }

            // Try a very long timeout.
@@ -700,11 +697,10 @@ mod test {
            assert!(outcome.is_err());

            {
                let mut h = timeouts.lock().unwrap();
                assert_eq!(h.hist.len(), 1);
                assert!(!h.hist[0].0);
                assert_eq!(h.hist[0].1, 2);
                h.hist.clear();
                let timeouts = timeouts.lock().unwrap();
                assert_eq!(timeouts.hist.len(), 3);
                assert!(!timeouts.hist[2].0);
                assert_eq!(timeouts.hist[2].1, 2);
            }

            // Now try a recordable timeout.
@@ -721,26 +717,27 @@ mod test {
                rt.advance(Duration::from_millis(100)).await;
            }
            {
                let h = timeouts.lock().unwrap();
                dbg!(&h.hist);
                let timeouts = timeouts.lock().unwrap();
                dbg!(&timeouts.hist);
                assert!(timeouts.hist.len() >= 4);
                // First we notice a circuit timeout after 2 hops
                assert!(!h.hist[0].0);
                assert_eq!(h.hist[0].1, 2);
                assert!(!timeouts.hist[3].0);
                assert_eq!(timeouts.hist[3].1, 2);
                // TODO: check timeout more closely.
                assert!(h.hist[0].2 < Duration::from_secs(100));
                assert!(h.hist[0].2 >= Duration::from_secs(3));
                assert!(timeouts.hist[3].2 < Duration::from_secs(100));
                assert!(timeouts.hist[3].2 >= Duration::from_secs(3));

                // This test is not reliable under test coverage; see arti#149.
                #[cfg(not(tarpaulin))]
                {
                    assert_eq!(h.hist.len(), 2);
                    assert_eq!(timeouts.hist.len(), 5);
                    // Then we notice a circuit completing at its third hop.
                    assert!(h.hist[1].0);
                    assert_eq!(h.hist[1].1, 2);
                    assert!(timeouts.hist[4].0);
                    assert_eq!(timeouts.hist[4].1, 2);
                    // TODO: check timeout more closely.
                    assert!(h.hist[1].2 < Duration::from_secs(100));
                    assert!(h.hist[1].2 >= Duration::from_secs(5));
                    assert!(h.hist[0].2 < h.hist[1].2);
                    assert!(timeouts.hist[4].2 < Duration::from_secs(100));
                    assert!(timeouts.hist[4].2 >= Duration::from_secs(5));
                    assert!(timeouts.hist[3].2 < timeouts.hist[4].2);
                }
            }
            HOP3_DELAY.store(300, SeqCst); // undo previous run.
+16 −3
Original line number Diff line number Diff line
@@ -10,8 +10,11 @@

use std::time::Duration;

pub(crate) mod estimator;
pub(crate) mod pareto;

pub(crate) use estimator::Estimator;

/// An object that calculates circuit timeout thresholds from the history
/// of circuit build times.
pub(crate) trait TimeoutEstimator {
@@ -23,7 +26,7 @@ pub(crate) trait TimeoutEstimator {
    /// circuit.
    ///
    /// If this is the last hop of the circuit, then `is_last` is true.
    fn note_hop_completed(&self, hop: u8, delay: Duration, is_last: bool);
    fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool);

    /// Record that a circuit failed to complete because it took too long.
    ///
@@ -32,7 +35,7 @@ pub(crate) trait TimeoutEstimator {
    ///
    /// The `delay` number is the amount of time after we first launched the
    /// circuit.
    fn note_circ_timeout(&self, hop: u8, delay: Duration);
    fn note_circ_timeout(&mut self, hop: u8, delay: Duration);

    /// Return the current estimation for how long we should wait for a given
    /// [`Action`] to complete.
@@ -43,11 +46,21 @@ pub(crate) trait TimeoutEstimator {
    /// building it in order see how long it takes.  After `abandon`
    /// has elapsed since circuit launch, the circuit should be
    /// abandoned completely.
    fn timeouts(&self, action: &Action) -> (Duration, Duration);
    fn timeouts(&mut self, action: &Action) -> (Duration, Duration);

    /// Return true if we're currently trying to learn more timeouts
    /// by launching testing circuits.
    fn learning_timeouts(&self) -> bool;

    /// Replace the network parameters used by this estimator (if any)
    /// with ones derived from `params`.
    fn update_params(&mut self, params: &tor_netdir::params::NetParameters);

    /// Construct a new ParetoTimeoutState to represent the current state
    /// of this estimator, if it is possible to store the state to disk.
    ///
    /// TODO: change the type used for the state.
    fn build_state(&mut self) -> Option<pareto::ParetoTimeoutState>;
}

/// A possible action for which we can try to estimate a timeout.
+131 −0
Original line number Diff line number Diff line
//! Declarations for a [`TimeoutEstimator`] type that can change implementation.

use crate::timeouts::{Action, TimeoutEstimator};
use std::sync::Mutex;
use std::time::Duration;
use tor_netdir::params::NetParameters;

/// A timeout estimator that can change its inner implementation and share its
/// implementation among multiple threads.
pub(crate) struct Estimator {
    /// The estimator we're currently using.
    inner: Mutex<Box<dyn TimeoutEstimator + Send + 'static>>,
}

impl Estimator {
    /// Construct a new estimator from some variant.
    pub(crate) fn new(est: impl TimeoutEstimator + Send + 'static) -> Self {
        Self {
            inner: Mutex::new(Box::new(est)),
        }
    }

    /// Record that a given circuit hop has completed.
    ///
    /// The `hop` number is a zero-indexed value for which hop just completed.
    ///
    /// The `delay` value is the amount of time after we first launched the
    /// circuit.
    ///
    /// If this is the last hop of the circuit, then `is_last` is true.
    pub(crate) fn note_hop_completed(&self, hop: u8, delay: Duration, is_last: bool) {
        let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");

        inner.note_hop_completed(hop, delay, is_last);
    }

    /// Record that a circuit failed to complete because it took too long.
    ///
    /// The `hop` number is a the number of hops that were successfully
    /// completed.
    ///
    /// The `delay` number is the amount of time after we first launched the
    /// circuit.
    pub(crate) fn note_circ_timeout(&self, hop: u8, delay: Duration) {
        let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
        inner.note_circ_timeout(hop, delay);
    }

    /// Return the current estimation for how long we should wait for a given
    /// [`Action`] to complete.
    ///
    /// This function should return a 2-tuple of `(timeout, abandon)`
    /// durations.  After `timeout` has elapsed since circuit launch,
    /// the circuit should no longer be used, but we should still keep
    /// building it in order see how long it takes.  After `abandon`
    /// has elapsed since circuit launch, the circuit should be
    /// abandoned completely.
    pub(crate) fn timeouts(&self, action: &Action) -> (Duration, Duration) {
        let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");

        inner.timeouts(action)
    }

    /// Return true if we're currently trying to learn more timeouts
    /// by launching testing circuits.
    pub(crate) fn learning_timeouts(&self) -> bool {
        let inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
        inner.learning_timeouts()
    }

    /// Replace the network parameters used by this estimator (if any)
    /// with ones derived from `params`.
    pub(crate) fn update_params(&self, params: &NetParameters) {
        let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
        inner.update_params(params);
    }

    /// Store any state associated with this timeout esimator into `storage`.
    pub(crate) fn save_state(&self, storage: &crate::TimeoutStateHandle) -> crate::Result<()> {
        let state = {
            let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
            inner.build_state()
        };
        if let Some(state) = state {
            storage.store(&state)?;
        }
        Ok(())
    }
}

/*
/// An enum that can hold an estimator state.
enum EstimatorInner {
    Pareto(ParetoTimeoutEstimator),
}

impl TimeoutEstimatorImpl for EstimatorInner {
    fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
        match self {
            EstimatorInner::Pareto(mut p) => p.note_hop_completed(hop, delay, is_last)
        }
    }

    fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
        match self {
            EstimatorInner::Pareto(mut p) => p.note_circ_timeout(hop, delay)
        }
    }

    fn timeouts(&mut self, action: &Action) -> (Duration, Duration) {
        match self {
            EstimatorInner::Pareto(mut p) => p.timeouts(action)
        }
    }

    fn learning_timeouts(&self) -> bool {
        match self {
            EstimatorInner::Pareto(p) => p.learning_timeouts()
        }
    }

    fn update_params(&mut self, params: &tor_netdir::NetParameters) {
        match self {
            EstimatorInner::Pareto(mut p) => p.update_params(params),
        }
    }


}

*/
+90 −143

File changed.

Preview size limit exceeded, changes collapsed.

+9 −0
Original line number Diff line number Diff line
@@ -293,6 +293,15 @@ impl Default for NetParameters {
}

impl NetParameters {
    /// Construct a new NetParameters from a given list of key=value parameters.
    ///
    /// Unrecognized parameters are ignored.
    pub fn from_map(p: &tor_netdoc::doc::netstatus::NetParams<i32>) -> Self {
        let mut params = NetParameters::default();
        let _ = params.saturating_update(p.iter());
        params
    }

    /// Replace a list of parameters, using the logic of
    /// `set_saturating`.
    ///