Commit b0fd7cff authored by Nick Mathewson's avatar Nick Mathewson 🥔
Browse files

Merge remote-tracking branch 'origin/mr/102'

parents da450086 e8419abd
Loading
Loading
Loading
Loading
+25 −7
Original line number Diff line number Diff line
@@ -437,7 +437,6 @@ mod test {
    }

    #[test]
    #[ignore]
    // TODO: re-enable this test after arti#149 is fixed. For now, it
    // is not reliable enough.
    fn test_double_timeout() {
@@ -448,22 +447,29 @@ mod test {
            d1 >= d2 && d1 <= d2 + Duration::from_millis(500)
        }

        test_with_all_runtimes!(|rt| async move {
            let rt = tor_rtmock::MockSleepRuntime::new(rt);
        test_with_all_runtimes!(|rto| async move {
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Try a future that's ready immediately.
            let x = double_timeout(&rt, async { Ok(3_u32) }, t1, t10).await;
            assert!(x.is_ok());
            assert_eq!(x.unwrap(), 3_u32);

            eprintln!("acquiesce after test1");
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Try a future that's ready after a short delay.
            let rt_clone = rt.clone();
            // (We only want the short delay to fire, not any of the other timeouts.)
            rt_clone.block_advance("manually controlling advances");
            let x = rt
                .wait_for(double_timeout(
                    &rt,
                    async move {
                        dbg!("A");
                        rt_clone.sleep(Duration::from_millis(0)).await;
                        let sl = rt_clone.sleep(Duration::from_millis(100));
                        rt_clone.allow_one_advance(Duration::from_millis(100));
                        sl.await;
                        dbg!("B");
                        Ok(4_u32)
                    },
@@ -475,16 +481,22 @@ mod test {
            assert!(x.is_ok());
            assert_eq!(x.unwrap(), 4_u32);

            eprintln!("acquiesce after test2");
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Try a future that passes the first timeout, and make sure that
            // it keeps running after it times out.
            let rt_clone = rt.clone();
            let (snd, rcv) = oneshot::channel();
            let start = rt.now();
            rt.block_advance("manually controlling advances");
            let x = rt
                .wait_for(double_timeout(
                    &rt,
                    async move {
                        rt_clone.sleep(Duration::from_secs(2)).await;
                        let sl = rt_clone.sleep(Duration::from_secs(2));
                        rt_clone.allow_one_advance(Duration::from_secs(2));
                        sl.await;
                        snd.send(()).unwrap();
                        Ok(4_u32)
                    },
@@ -498,10 +510,16 @@ mod test {
            let waited = rt.wait_for(rcv).await;
            assert_eq!(waited, Ok(()));

            eprintln!("acquiesce after test3");
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Try a future that times out and gets abandoned.
            let rt_clone = rt.clone();
            rt.block_advance("manually controlling advances");
            let (snd, rcv) = oneshot::channel();
            let start = rt.now();
            // Let it hit the first timeout...
            rt.allow_one_advance(Duration::from_secs(1));
            let x = rt
                .wait_for(double_timeout(
                    &rt,
@@ -516,13 +534,13 @@ mod test {
                .await;
            assert!(matches!(x, Err(Error::CircTimeout)));
            let end = rt.now();
            // ...and let it hit the second, too.
            rt.allow_one_advance(Duration::from_secs(9));
            let waited = rt.wait_for(rcv).await;
            assert!(waited.is_err());
            let end2 = rt.now();
            assert!(duration_close_to(end - start, Duration::from_secs(1)));
            dbg!(end2, start, end2 - start);
            // This test is not reliable under test coverage; see arti#149.
            #[cfg(not(tarpaulin))]
            assert!(duration_close_to(end2 - start, Duration::from_secs(10)));
        });
    }
+3 −1
Original line number Diff line number Diff line
//! Implement traits from [`crate::mgr`] for the circuit types we use.

use crate::mgr::{self};
use crate::mgr::{self, MockablePlan};
use crate::path::OwnedPath;
use crate::usage::{SupportedCircUsage, TargetCircUsage};
use crate::{DirInfo, Error, Result};
@@ -41,6 +41,8 @@ pub(crate) struct Plan {
    guard_usable: Option<tor_guardmgr::GuardUsable>,
}

impl MockablePlan for Plan {}

#[async_trait]
impl<R: Runtime> crate::mgr::AbstractCircBuilder for crate::build::CircuitBuilder<R> {
    type Circ = ClientCirc;
+51 −7
Original line number Diff line number Diff line
@@ -112,6 +112,15 @@ pub(crate) trait AbstractCirc: Debug {
    fn usable(&self) -> bool;
}

/// A plan for an `AbstractCircBuilder` that can maybe be mutated by tests.
///
/// You should implement this trait using all default methods for all code that isn't test code.
pub(crate) trait MockablePlan {
    /// Add a reason string that was passed to `SleepProvider::block_advance()` to this object
    /// so that it knows what to pass to `::release_advance()`.
    fn add_blocked_advance_reason(&mut self, _reason: String) {}
}

/// An object that knows how to build circuits.
///
/// AbstractCircBuilder creates circuits in two phases.  First, a plan is
@@ -133,7 +142,9 @@ pub(crate) trait AbstractCircBuilder: Send + Sync {
    // TODO: It would be nice to have this parameterized on a lifetime,
    // and have that lifetime depend on the lifetime of the directory.
    // But I don't think that rust can do that.
    type Plan: Send;

    // HACK(eta): I don't like the fact that `MockablePlan` is necessary here.
    type Plan: Send + MockablePlan;

    // TODO: I'd like to have a Dir type here to represent
    // create::DirInfo, but that would need to be parameterized too,
@@ -816,7 +827,9 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
    ) -> std::result::Result<Arc<B::Circ>, RetryError<Box<Error>>> {
        // Get or make a stream of futures to wait on.
        let wait_on_stream = match act {
            Action::Open(c) => return Ok(c),
            Action::Open(c) => {
                return Ok(c);
            }
            Action::Wait(f) => f,
            Action::Build(plans) => {
                let futures = FuturesUnordered::new();
@@ -963,7 +976,7 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
    ) -> Shared<oneshot::Receiver<PendResult<B>>> {
        let _ = usage; // Currently unused.
        let CircBuildPlan {
            plan,
            mut plan,
            sender,
            pending,
        } = plan;
@@ -972,9 +985,18 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
        let runtime = self.runtime.clone();
        let runtime_copy = self.runtime.clone();

        let tid = rand::random::<u64>();
        // We release this block when the circuit builder task terminates.
        let reason = format!("circuit builder task {}", tid);
        runtime.block_advance(reason.clone());
        // During tests, the `FakeBuilder` will need to release the block in order to fake a timeout
        // correctly.
        plan.add_blocked_advance_reason(reason);

        runtime
            .spawn(async move {
                let outcome = self.builder.build_circuit(plan).await;

                let (new_spec, reply) = match outcome {
                    Err(e) => (None, Err(e)),
                    Ok((new_spec, circ)) => {
@@ -1014,7 +1036,9 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
                    // specifically intended for a request a little more time
                    // to finish, before we offer it this circuit instead.
                    let briefly = self.request_timing.request_loyalty;
                    runtime_copy.sleep(briefly).await;
                    let sl = runtime_copy.sleep(briefly);
                    runtime_copy.allow_one_advance(briefly);
                    sl.await;

                    let pending = {
                        let list = self.circs.lock().expect("poisoned lock");
@@ -1024,6 +1048,7 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
                        let _ = pending_request.notify.clone().try_send(reply.clone());
                    }
                }
                runtime_copy.release_advance(format!("circuit builder task {}", tid));
            })
            .expect("Couldn't spawn circuit-building task");

@@ -1210,10 +1235,19 @@ mod test {
        Fail,
        Delay(Duration),
        Timeout,
        TimeoutReleaseAdvance(String),
        NoPlan,
        WrongSpec(FakeSpec),
    }

    impl MockablePlan for FakePlan {
        fn add_blocked_advance_reason(&mut self, reason: String) {
            if let FakeOp::Timeout = self.op {
                self.op = FakeOp::TimeoutReleaseAdvance(reason);
            }
        }
    }

    const FAKE_CIRC_DELAY: Duration = Duration::from_millis(30);

    static DI_EMPTY: [tor_netdir::fallback::FallbackDir; 0] = [];
@@ -1242,16 +1276,23 @@ mod test {

        async fn build_circuit(&self, plan: FakePlan) -> Result<(FakeSpec, Arc<FakeCirc>)> {
            let op = plan.op;
            self.runtime.sleep(FAKE_CIRC_DELAY).await;
            let sl = self.runtime.sleep(FAKE_CIRC_DELAY);
            self.runtime.allow_one_advance(FAKE_CIRC_DELAY);
            sl.await;
            match op {
                FakeOp::Succeed => Ok((plan.spec, Arc::new(FakeCirc { id: FakeId::next() }))),
                FakeOp::WrongSpec(s) => Ok((s, Arc::new(FakeCirc { id: FakeId::next() }))),
                FakeOp::Fail => Err(Error::PendingFailed),
                FakeOp::Delay(d) => {
                    self.runtime.sleep(d).await;
                    let sl = self.runtime.sleep(d);
                    self.runtime.allow_one_advance(d);
                    sl.await;
                    Err(Error::PendingFailed)
                }
                FakeOp::Timeout => {
                FakeOp::Timeout => unreachable!(), // should be converted to the below
                FakeOp::TimeoutReleaseAdvance(reason) => {
                    eprintln!("releasing advance to fake a timeout");
                    self.runtime.release_advance(reason);
                    let () = futures::future::pending().await;
                    unreachable!()
                }
@@ -1514,6 +1555,9 @@ mod test {
                CircuitTiming::default(),
            ));

            // This test doesn't exercise any timeout behaviour.
            rt.block_advance("test doesn't require advancing");

            let (c1, c2) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&ports, di()),
+16 −0
Original line number Diff line number Diff line
@@ -68,6 +68,22 @@ pub trait SleepProvider {
    fn wallclock(&self) -> SystemTime {
        SystemTime::now()
    }

    /// Signify that a test running under mock time shouldn't advance time yet, with a given
    /// unique reason string. This is useful for making sure (mock) time doesn't advance while
    /// things that might require some (real-world) time to complete do so, such as spawning a task
    /// on another thread.
    ///
    /// Call `release_advance` with the same reason string in order to unblock.
    fn block_advance<T: Into<String>>(&self, _reason: T) {}

    /// Signify that the reason to withhold time advancing provided in a call to `block_advance` no
    /// longer exists, and it's fine to move time forward if nothing else is blocking advances.
    fn release_advance<T: Into<String>>(&self, _reason: T) {}

    /// Allow a test running under mock time to advance time by the provided duration, even if the
    /// above `block_advance` API has been used.
    fn allow_one_advance(&self, _dur: Duration) {}
}

/// Trait for a runtime that can block on a future.
+55 −39
Original line number Diff line number Diff line
@@ -47,12 +47,32 @@ impl<R: Runtime> MockSleepRuntime<R> {
    pub fn jump_to(&self, new_wallclock: SystemTime) {
        self.sleep.jump_to(new_wallclock);
    }
    /// Advance time one millisecond at a time until the provided
    /// future is ready.
    /// Run a future under mock time, advancing time forward where necessary until it completes.
    /// Users of this function should read the whole of this documentation before using!
    ///
    /// The returned future will run `fut`, expecting it to create `Sleeping` futures (as returned
    /// by `MockSleepProvider::sleep()` and similar functions). When all such created futures have
    /// been polled (indicating the future is waiting on them), time will be advanced in order that
    /// the first (or only) of said futures returns `Ready`. This process then repeats until `fut`
    /// returns `Ready` itself (as in, the returned wrapper future will wait for all created
    /// `Sleeping` futures to be polled, and advance time again).
    ///
    /// **Note:** The above described algorithm interacts poorly with futures that spawn
    /// asynchronous background tasks, or otherwise expect work to complete in the background
    /// before time is advanced. These futures will need to make use of the
    /// `SleepProvider::block_advance` (and similar) APIs in order to prevent time advancing while
    /// said tasks complete; see the documentation for those APIs for more detail.
    ///
    /// # Panics
    ///
    /// Panics if another `WaitFor` future is already running. (If two ran simultaneously, they
    /// would both try and advance the same mock time clock, which would be bad.)
    pub fn wait_for<F: futures::Future>(&self, fut: F) -> WaitFor<F> {
        if self.sleep.has_waitfor_waker() {
            panic!("attempted to call MockSleepRuntime::wait_for while another WaitFor is active");
        }
        WaitFor {
            sleep: self.sleep.clone(),
            yielding: 0,
            fut,
        }
    }
@@ -102,16 +122,23 @@ impl<R: Runtime> SleepProvider for MockSleepRuntime<R> {
    fn wallclock(&self) -> SystemTime {
        self.sleep.wallclock()
    }
    fn block_advance<T: Into<String>>(&self, reason: T) {
        self.sleep.block_advance(reason)
    }
    fn release_advance<T: Into<String>>(&self, reason: T) {
        self.sleep.release_advance(reason)
    }
    fn allow_one_advance(&self, dur: Duration) {
        self.sleep.allow_one_advance(dur)
    }
}

/// A future that advances time until another future is ready to complete.
#[pin_project]
pub struct WaitFor<F: Future> {
pub struct WaitFor<F> {
    /// A reference to the sleep provider that's simulating time for us.
    #[pin]
    sleep: MockSleepProvider,
    /// Nonzero if we just found that this inner future is pending, and we
    /// should yield to give other futures a chance to run.
    yielding: u8,
    /// The future that we're waiting for.
    #[pin]
    fut: F,
@@ -124,42 +151,31 @@ impl<F: Future> Future for WaitFor<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        eprintln!("waitfor poll");
        let mut this = self.project();
        this.sleep.register_waitfor_waker(cx.waker().clone());

        if *this.yielding > 0 {
            *this.yielding -= 1;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        if let Poll::Ready(r) = this.fut.poll(cx) {
            eprintln!("waitfor done!");
            this.sleep.clear_waitfor_waker();
            return Poll::Ready(r);
        }
        eprintln!("waitfor poll complete");

        // TODO: This increment is unpleasantly short, and slows down
        // the tests that run this future.  But if I increase it, this
        // future doesn't yield enough for other futures to run, and
        // some of the tests in tor-circmgr give bad results.
        //
        // We should resolve this issue; see ticket #149.
        #[cfg(tarpaulin)]
        let high_bound = Duration::from_micros(100);
        #[cfg(tarpaulin)]
        let yield_count = 100;
        #[cfg(not(tarpaulin))]
        let high_bound = Duration::from_millis(1);
        #[cfg(not(tarpaulin))]
        let yield_count = 3;

        let low_bound = Duration::from_micros(10);
        let duration = this
            .sleep
            .time_until_next_timeout()
            .map(|dur| (dur / 10).clamp(low_bound, high_bound))
            .unwrap_or(low_bound);

        if this.sleep.should_advance() {
            if let Some(duration) = this.sleep.time_until_next_timeout() {
                eprintln!("Advancing by {:?}", duration);
                this.sleep.advance_noyield(duration);
        *this.yielding = yield_count;
        cx.waker().wake_by_ref();
            } else {
                // If we get here, something's probably wedged and the test isn't going to complete
                // anyway: we were expecting to advance in order to make progress, but we can't.
                // If we don't panic, the test will just run forever, which is really annoying, so
                // just panic and fail quickly.
                panic!("WaitFor told to advance, but didn't have any duration to advance by");
            }
        } else {
            eprintln!("waiting for sleepers to advance");
        }
        Poll::Pending
    }
}
Loading