Commit 9a94f72e authored by eta's avatar eta
Browse files

Add tests & address review commentary

parent ca421399
Loading
Loading
Loading
Loading
+10 −9
Original line number Diff line number Diff line
@@ -57,7 +57,7 @@ use tor_rtcompat::Runtime;
use futures::task::SpawnExt;
use std::convert::TryInto;
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
use std::time::{Duration, Instant};
use tracing::{debug, warn};

pub mod build;
@@ -155,7 +155,7 @@ pub struct CircMgr<R: Runtime> {
    /// The underlying circuit manager object that implements our behavior.
    mgr: Arc<mgr::AbstractCircMgr<build::CircuitBuilder<R>, R>>,
    /// A preemptive circuit predictor, for, uh, building circuits preemptively.
    preemptive: Arc<Mutex<PreemptiveCircuitPredictor>>,
    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
}

impl<R: Runtime> CircMgr<R> {
@@ -194,7 +194,7 @@ impl<R: Runtime> CircMgr<R> {
        let mgr = mgr::AbstractCircMgr::new(builder, runtime.clone(), circuit_timing);
        let circmgr = Arc::new(CircMgr {
            mgr: Arc::new(mgr),
            preemptive,
            predictor: preemptive,
        });

        runtime.spawn(continually_expire_circuits(
@@ -267,13 +267,14 @@ impl<R: Runtime> CircMgr<R> {
        isolation: StreamIsolation,
    ) -> Result<Arc<ClientCirc>> {
        self.expire_circuits();
        let time = Instant::now();
        {
            let mut predictive = self.preemptive.lock().expect("preemptive lock poisoned");
            let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
            if ports.is_empty() {
                predictive.note_usage(None);
                predictive.note_usage(None, time);
            } else {
                for port in ports.iter() {
                    predictive.note_usage(Some(*port));
                    predictive.note_usage(Some(*port), time);
                }
            }
        }
@@ -295,9 +296,9 @@ impl<R: Runtime> CircMgr<R> {
        if self.mgr.n_circs() >= PREEMPTIVE_CIRCUIT_THRESHOLD {
            return;
        }
        debug!("Launching circuits preemptively.");
        debug!("Checking preemptive circuit predictions.");
        let circs = {
            let preemptive = self.preemptive.lock().expect("preemptive lock poisoned");
            let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
            preemptive.predict()
        };

@@ -307,7 +308,7 @@ impl<R: Runtime> CircMgr<R> {
        let results = futures::future::join_all(futures).await;
        for (i, result) in results.iter().enumerate() {
            match result {
                Ok(_) => debug!("Build succeeded for {:?}", circs[i]),
                Ok(_) => debug!("Circuit exists (or was created) for {:?}", circs[i]),
                Err(e) => warn!("Failed to build preemptive circuit {:?}: {}", circs[i], e),
            }
        }
+155 −7
Original line number Diff line number Diff line
@@ -94,7 +94,7 @@ pub(crate) trait AbstractSpec: Clone + Debug {
    /// `usage`.
    ///
    /// By default, this calls `abstract_spec_find_supported`.
    fn find_supported<'a, 'b, C>(
    fn find_supported<'a, 'b, C: AbstractCirc>(
        list: impl Iterator<Item = &'b mut OpenEntry<Self, C>>,
        usage: &Self::Usage,
    ) -> Vec<&'b mut OpenEntry<Self, C>> {
@@ -107,11 +107,11 @@ pub(crate) trait AbstractSpec: Clone + Debug {
///
/// This returns the all circuits in `list` for which `circuit.spec.supports(usage)` returns
/// `true`.
pub(crate) fn abstract_spec_find_supported<'a, 'b, S: AbstractSpec, C>(
pub(crate) fn abstract_spec_find_supported<'a, 'b, S: AbstractSpec, C: AbstractCirc>(
    list: impl Iterator<Item = &'b mut OpenEntry<S, C>>,
    usage: &S::Usage,
) -> Vec<&'b mut OpenEntry<S, C>> {
    list.filter(|circ| circ.spec.supports(usage)).collect()
    list.filter(|circ| circ.supports(usage)).collect()
}

/// Minimal abstract view of a circuit.
@@ -249,6 +249,7 @@ pub(crate) trait AbstractCircBuilder: Send + Sync {
/// All circuits start out "unused" and become "dirty" when their spec
/// is first restricted -- that is, when they are first handed out to be
/// used for a request.
#[derive(Debug, Clone, PartialEq)]
enum ExpirationInfo {
    /// The circuit has never been used.
    Unused {
@@ -278,9 +279,10 @@ impl ExpirationInfo {
}

/// An entry for an open circuit held by an `AbstractCircMgr`.
#[derive(PartialEq, Debug, Clone)]
pub(crate) struct OpenEntry<S, C> {
    /// Current AbstractCircSpec for this circuit's permitted usages.
    pub(crate) spec: S,
    spec: S,
    /// The circuit under management.
    circ: Arc<C>,
    /// When does this circuit expire?
@@ -640,7 +642,7 @@ pub(crate) struct AbstractCircMgr<B: AbstractCircBuilder, R: Runtime> {
    builder: B,
    /// An asynchronous runtime to use for launching tasks and
    /// checking timeouts.
    pub(crate) runtime: R,
    runtime: R,
    /// A CircList to manage our list of circuits, requests, and
    /// pending circuits.
    circs: sync::Mutex<CircList<B>>,
@@ -1145,9 +1147,11 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::Error;
    use crate::usage::{ExitPolicy, SupportedCircUsage};
    use crate::{Error, StreamIsolation, TargetCircUsage, TargetPort};
    use std::collections::BTreeSet;
    use std::sync::atomic::{self, AtomicUsize};
    use tor_netdir::testnet;
    use tor_rtcompat::SleepProvider;
    use tor_rtmock::MockSleepRuntime;
    use tracing::trace;
@@ -1165,7 +1169,7 @@ mod test {
        }
    }

    #[derive(Debug)]
    #[derive(Debug, PartialEq, Clone)]
    struct FakeCirc {
        id: FakeId,
    }
@@ -1784,4 +1788,148 @@ mod test {
            assert!(Arc::ptr_eq(&imap2, &imap1));
        });
    }

    /// Returns three exit policies; one that permits nothing, one that permits ports 80
    /// and 443 only, and one that permits all ports.
    fn get_exit_policies() -> (ExitPolicy, ExitPolicy, ExitPolicy) {
        // FIXME(eta): the below is copypasta; would be nice to have a better way of
        //             constructing ExitPolicy objects for testing maybe
        let network = testnet::construct_netdir()
            .unwrap()
            .unwrap_if_sufficient()
            .unwrap();

        // Nodes with ID 0x0a through 0x13 and 0x1e through 0x27 are
        // exits.  Odd-numbered ones allow only ports 80 and 443;
        // even-numbered ones allow all ports.
        let id_noexit = [0x05; 32].into();
        let id_webexit = [0x11; 32].into();
        let id_fullexit = [0x20; 32].into();

        let not_exit = network.by_id(&id_noexit).unwrap();
        let web_exit = network.by_id(&id_webexit).unwrap();
        let full_exit = network.by_id(&id_fullexit).unwrap();

        let ep_none = ExitPolicy::from_relay(&not_exit);
        let ep_web = ExitPolicy::from_relay(&web_exit);
        let ep_full = ExitPolicy::from_relay(&full_exit);
        (ep_none, ep_web, ep_full)
    }

    #[test]
    fn test_find_supported() {
        let (ep_none, ep_web, ep_full) = get_exit_policies();
        let fake_circ = Arc::new(FakeCirc { id: FakeId::next() });
        let expiration = ExpirationInfo::Unused {
            use_before: Instant::now() + Duration::from_secs(60 * 60),
        };

        let mut entry_none = OpenEntry::new(
            SupportedCircUsage::Exit {
                policy: ep_none,
                isolation: None,
            },
            Arc::clone(&fake_circ),
            expiration.clone(),
        );
        let mut entry_none_c = entry_none.clone();
        let mut entry_web = OpenEntry::new(
            SupportedCircUsage::Exit {
                policy: ep_web,
                isolation: None,
            },
            Arc::clone(&fake_circ),
            expiration.clone(),
        );
        let mut entry_web_c = entry_web.clone();
        let mut entry_full = OpenEntry::new(
            SupportedCircUsage::Exit {
                policy: ep_full,
                isolation: None,
            },
            Arc::clone(&fake_circ),
            expiration,
        );
        let mut entry_full_c = entry_full.clone();

        let usage_web = TargetCircUsage::Exit {
            ports: vec![TargetPort::ipv4(80)],
            isolation: StreamIsolation::no_isolation(),
        };
        let empty: Vec<&OpenEntry<SupportedCircUsage, FakeCirc>> = vec![];

        assert_eq!(
            SupportedCircUsage::find_supported(vec![&mut entry_none].into_iter(), &usage_web),
            empty
        );

        // HACK(eta): We have to faff around with clones and such because
        //            `abstract_spec_find_supported` has a silly signature that involves `&mut`
        //            refs, which we can't have more than one of.

        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_web
            ),
            vec![&mut entry_web_c]
        );

        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
                &usage_web
            ),
            vec![&mut entry_web_c, &mut entry_full_c]
        );

        // Test preemptive circuit usage:

        let usage_preemptive_web = TargetCircUsage::Preemptive {
            port: Some(TargetPort::ipv4(80)),
        };
        let usage_preemptive_dns = TargetCircUsage::Preemptive { port: None };

        // shouldn't return anything unless there are >=2 circuits

        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none].into_iter(),
                &usage_preemptive_web
            ),
            empty
        );

        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none].into_iter(),
                &usage_preemptive_dns
            ),
            empty
        );

        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_preemptive_web
            ),
            empty
        );

        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_preemptive_dns
            ),
            vec![&mut entry_none_c, &mut entry_web_c]
        );

        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
                &usage_preemptive_web
            ),
            vec![&mut entry_web_c, &mut entry_full_c]
        );
    }
}
+76 −3
Original line number Diff line number Diff line
@@ -37,8 +37,81 @@ impl PreemptiveCircuitPredictor {
            .collect()
    }

    /// Note the use of a new port.
    pub(crate) fn note_usage(&mut self, port: Option<TargetPort>) {
        self.usages.insert(port, Instant::now());
    /// Note the use of a new port at the provided `time`.
    pub(crate) fn note_usage(&mut self, port: Option<TargetPort>, time: Instant) {
        self.usages.insert(port, time);
    }
}

#[cfg(test)]
mod test {
    use crate::{PreemptiveCircuitPredictor, TargetCircUsage, TargetPort};
    use std::time::{Duration, Instant};

    #[test]
    fn predicts_starting_ports() {
        let predictor = PreemptiveCircuitPredictor::new(vec![]);

        let mut results = predictor.predict();
        results.sort();
        assert_eq!(
            predictor.predict(),
            vec![TargetCircUsage::Preemptive { port: None }]
        );

        let predictor =
            PreemptiveCircuitPredictor::new(vec![TargetPort::ipv4(80), TargetPort::ipv6(80)]);

        let mut results = predictor.predict();
        results.sort();
        assert_eq!(
            results,
            vec![
                TargetCircUsage::Preemptive { port: None },
                TargetCircUsage::Preemptive {
                    port: Some(TargetPort::ipv4(80))
                },
                TargetCircUsage::Preemptive {
                    port: Some(TargetPort::ipv6(80))
                },
            ]
        )
    }

    #[test]
    fn predicts_used_ports() {
        let mut predictor = PreemptiveCircuitPredictor::new(vec![]);

        assert_eq!(
            predictor.predict(),
            vec![TargetCircUsage::Preemptive { port: None }]
        );

        predictor.note_usage(Some(TargetPort::ipv4(1234)), Instant::now());

        let mut results = predictor.predict();
        results.sort();
        assert_eq!(
            results,
            vec![
                TargetCircUsage::Preemptive { port: None },
                TargetCircUsage::Preemptive {
                    port: Some(TargetPort::ipv4(1234))
                }
            ]
        );
    }

    #[test]
    fn does_not_predict_old_ports() {
        let mut predictor = PreemptiveCircuitPredictor::new(vec![]);
        let more_than_an_hour_ago = Instant::now() - Duration::from_secs(60 * 60 + 1);

        predictor.note_usage(Some(TargetPort::ipv4(2345)), more_than_an_hour_ago);

        assert_eq!(
            predictor.predict(),
            vec![TargetCircUsage::Preemptive { port: None }]
        );
    }
}
+28 −15
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@ use tor_netdir::Relay;
use tor_netdoc::types::policy::PortPolicy;
use tor_rtcompat::Runtime;

use crate::mgr::{abstract_spec_find_supported, OpenEntry};
use crate::mgr::{abstract_spec_find_supported, AbstractCirc, OpenEntry};
use crate::{Error, Result};

/// An exit policy, as supported by the last hop of a circuit.
@@ -27,7 +27,7 @@ pub(crate) struct ExitPolicy {
///
/// Ordinarily, this is a TCP port, plus a flag to indicate whether we
/// must support IPv4 or IPv6.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)]
pub struct TargetPort {
    /// True if this is a request to connect to an IPv6 address
    ipv6: bool,
@@ -106,7 +106,7 @@ impl TargetPort {
//
// This type is re-exported by `arti-client`: any changes to it must be
// reflected in `arti-client`'s version.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct IsolationToken(u64);

#[allow(clippy::new_without_default)]
@@ -143,7 +143,7 @@ impl IsolationToken {
///
/// If two streams are isolated from one another, they may not share
/// a circuit.
#[derive(Copy, Clone, Eq, Debug, PartialEq, derive_builder::Builder)]
#[derive(Copy, Clone, Eq, Debug, PartialEq, PartialOrd, Ord, derive_builder::Builder)]
pub struct StreamIsolation {
    /// Any isolation token set on the stream.
    #[builder(default = "IsolationToken::no_isolation()")]
@@ -207,18 +207,10 @@ impl ExitPolicy {
///
/// This type should stay internal to the circmgr crate for now: we'll probably
/// want to refactor it a lot.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum TargetCircUsage {
    /// Use for BEGINDIR-based non-anonymous directory connections
    Dir,
    /// Built preemptively, to reduce the likelihood that clients have to wait for a circuit
    /// to be available.
    Preemptive {
        /// A port the circuit has to allow, if specified.
        ///
        /// If this is `None`, we just want a circuit capable of doing DNS resolution.
        port: Option<TargetPort>,
    },
    /// Use to exit to one or more ports.
    Exit {
        /// List of ports the circuit has to allow.
@@ -231,6 +223,19 @@ pub(crate) enum TargetCircUsage {
    },
    /// For a circuit is only used for the purpose of building it.
    TimeoutTesting,
    /// For internal usage only: build a circuit preemptively, to reduce wait times.
    ///
    /// # Warning
    ///
    /// This **MUST NOT** be used by code outside of the preemptive circuit predictor. In
    /// particular, this usage doesn't support stream isolation, so using it to ask for
    /// circuits (for example, by passing it to `get_or_launch`) could be unsafe!
    Preemptive {
        /// A port the circuit has to allow, if specified.
        ///
        /// If this is `None`, we just want a circuit capable of doing DNS resolution.
        port: Option<TargetPort>,
    },
}

/// The purposes for which a circuit is usable.
@@ -347,7 +352,12 @@ impl crate::mgr::AbstractSpec for SupportedCircUsage {
                i1.map(|i1| i1.may_share_circuit(i2)).unwrap_or(true)
                    && p2.iter().all(|port| p1.allows_port(*port))
            }
            (Exit { policy, .. }, TargetCircUsage::Preemptive { port }) => {
            (Exit { policy, isolation }, TargetCircUsage::Preemptive { port }) => {
                if isolation.is_some() {
                    // If the circuit has a stream isolation token, we might not be able to use it
                    // for new streams that don't share it.
                    return false;
                }
                if let Some(p) = port {
                    policy.allows_port(*p)
                } else {
@@ -364,6 +374,9 @@ impl crate::mgr::AbstractSpec for SupportedCircUsage {

        match (self, usage) {
            (Dir, TargetCircUsage::Dir) => Ok(()),
            // This usage is only used to create circuits preemptively, and doesn't actually
            // correspond to any streams; accordingly, we don't need to modify the circuit's
            // acceptable usage at all.
            (Exit { .. }, TargetCircUsage::Preemptive { .. }) => Ok(()),
            (
                Exit {
@@ -385,7 +398,7 @@ impl crate::mgr::AbstractSpec for SupportedCircUsage {
        }
    }

    fn find_supported<'a, 'b, C>(
    fn find_supported<'a, 'b, C: AbstractCirc>(
        list: impl Iterator<Item = &'b mut OpenEntry<Self, C>>,
        usage: &TargetCircUsage,
    ) -> Vec<&'b mut OpenEntry<Self, C>> {