Commit e8419abd authored by eta's avatar eta
Browse files

Overhaul the way WaitFor and the MockSleepProvider work

Instead of racily advancing time forward, this commit attempts to rework
WaitFor so that it advances time once every sleeper future that has been
created has been polled (this is done by handing the MockSleepRuntime a
Waker with which to wake up the WaitFor).

The mechanics described above work well enough for the double timeout
test, but fail in the presence of code that spawns asynchronous /
background tasks that must make progress before time is advanced for the
test to work properly. In order to deal with these cases, a set of APIs
is introduced in order to block time from being advanced until some
code has run, and a carveout is added in order to permit small advances
in time where required.

(In some cases, code needed to be hacked up a bit in order to be made
properly testable using these APIs; the `MockablePlan` trait included in
here is somewhat unfortunate.)

This should fix #149.
parent 650c5a35
Loading
Loading
Loading
Loading
+25 −7
Original line number Diff line number Diff line
@@ -434,7 +434,6 @@ mod test {
    }

    #[test]
    #[ignore]
    // TODO: re-enable this test after arti#149 is fixed. For now, it
    // is not reliable enough.
    fn test_double_timeout() {
@@ -445,22 +444,29 @@ mod test {
            d1 >= d2 && d1 <= d2 + Duration::from_millis(500)
        }

        test_with_all_runtimes!(|rt| async move {
            let rt = tor_rtmock::MockSleepRuntime::new(rt);
        test_with_all_runtimes!(|rto| async move {
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Try a future that's ready immediately.
            let x = double_timeout(&rt, async { Ok(3_u32) }, t1, t10).await;
            assert!(x.is_ok());
            assert_eq!(x.unwrap(), 3_u32);

            eprintln!("acquiesce after test1");
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Try a future that's ready after a short delay.
            let rt_clone = rt.clone();
            // (We only want the short delay to fire, not any of the other timeouts.)
            rt_clone.block_advance("manually controlling advances");
            let x = rt
                .wait_for(double_timeout(
                    &rt,
                    async move {
                        dbg!("A");
                        rt_clone.sleep(Duration::from_millis(0)).await;
                        let sl = rt_clone.sleep(Duration::from_millis(100));
                        rt_clone.allow_one_advance(Duration::from_millis(100));
                        sl.await;
                        dbg!("B");
                        Ok(4_u32)
                    },
@@ -472,16 +478,22 @@ mod test {
            assert!(x.is_ok());
            assert_eq!(x.unwrap(), 4_u32);

            eprintln!("acquiesce after test2");
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Try a future that passes the first timeout, and make sure that
            // it keeps running after it times out.
            let rt_clone = rt.clone();
            let (snd, rcv) = oneshot::channel();
            let start = rt.now();
            rt.block_advance("manually controlling advances");
            let x = rt
                .wait_for(double_timeout(
                    &rt,
                    async move {
                        rt_clone.sleep(Duration::from_secs(2)).await;
                        let sl = rt_clone.sleep(Duration::from_secs(2));
                        rt_clone.allow_one_advance(Duration::from_secs(2));
                        sl.await;
                        snd.send(()).unwrap();
                        Ok(4_u32)
                    },
@@ -495,10 +507,16 @@ mod test {
            let waited = rt.wait_for(rcv).await;
            assert_eq!(waited, Ok(()));

            eprintln!("acquiesce after test3");
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Try a future that times out and gets abandoned.
            let rt_clone = rt.clone();
            rt.block_advance("manually controlling advances");
            let (snd, rcv) = oneshot::channel();
            let start = rt.now();
            // Let it hit the first timeout...
            rt.allow_one_advance(Duration::from_secs(1));
            let x = rt
                .wait_for(double_timeout(
                    &rt,
@@ -513,13 +531,13 @@ mod test {
                .await;
            assert!(matches!(x, Err(Error::CircTimeout)));
            let end = rt.now();
            // ...and let it hit the second, too.
            rt.allow_one_advance(Duration::from_secs(9));
            let waited = rt.wait_for(rcv).await;
            assert!(waited.is_err());
            let end2 = rt.now();
            assert!(duration_close_to(end - start, Duration::from_secs(1)));
            dbg!(end2, start, end2 - start);
            // This test is not reliable under test coverage; see arti#149.
            #[cfg(not(tarpaulin))]
            assert!(duration_close_to(end2 - start, Duration::from_secs(10)));
        });
    }
+3 −1
Original line number Diff line number Diff line
//! Implement traits from [`crate::mgr`] for the circuit types we use.

use crate::mgr::{self};
use crate::mgr::{self, MockablePlan};
use crate::path::OwnedPath;
use crate::usage::{SupportedCircUsage, TargetCircUsage};
use crate::{DirInfo, Error, Result};
@@ -41,6 +41,8 @@ pub(crate) struct Plan {
    guard_usable: Option<tor_guardmgr::GuardUsable>,
}

// `Plan` is not test code, so it keeps every default method of `MockablePlan`.
impl MockablePlan for Plan {}

#[async_trait]
impl<R: Runtime> crate::mgr::AbstractCircBuilder for crate::build::CircuitBuilder<R> {
    type Circ = ClientCirc;
+51 −7
Original line number Diff line number Diff line
@@ -112,6 +112,15 @@ pub(crate) trait AbstractCirc: Debug {
    fn usable(&self) -> bool;
}

/// A plan for an `AbstractCircBuilder` that tests may need to mutate.
///
/// Code that isn't test code should implement this trait with an empty `impl` block,
/// keeping every default method.
pub(crate) trait MockablePlan {
    /// Record a reason string that was passed to `SleepProvider::block_advance()`, so that
    /// this object knows what to pass to `SleepProvider::release_advance()`.
    ///
    /// The default implementation ignores `_reason` and does nothing.
    fn add_blocked_advance_reason(&mut self, _reason: String) {}
}

/// An object that knows how to build circuits.
///
/// AbstractCircBuilder creates circuits in two phases.  First, a plan is
@@ -133,7 +142,9 @@ pub(crate) trait AbstractCircBuilder: Send + Sync {
    // TODO: It would be nice to have this parameterized on a lifetime,
    // and have that lifetime depend on the lifetime of the directory.
    // But I don't think that rust can do that.
    type Plan: Send;

    // HACK(eta): I don't like the fact that `MockablePlan` is necessary here.
    type Plan: Send + MockablePlan;

    // TODO: I'd like to have a Dir type here to represent
    // create::DirInfo, but that would need to be parameterized too,
@@ -816,7 +827,9 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
    ) -> std::result::Result<Arc<B::Circ>, RetryError<Box<Error>>> {
        // Get or make a stream of futures to wait on.
        let wait_on_stream = match act {
            Action::Open(c) => return Ok(c),
            Action::Open(c) => {
                return Ok(c);
            }
            Action::Wait(f) => f,
            Action::Build(plans) => {
                let futures = FuturesUnordered::new();
@@ -963,7 +976,7 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
    ) -> Shared<oneshot::Receiver<PendResult<B>>> {
        let _ = usage; // Currently unused.
        let CircBuildPlan {
            plan,
            mut plan,
            sender,
            pending,
        } = plan;
@@ -972,9 +985,18 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
        let runtime = self.runtime.clone();
        let runtime_copy = self.runtime.clone();

        let tid = rand::random::<u64>();
        // We release this block when the circuit builder task terminates.
        let reason = format!("circuit builder task {}", tid);
        runtime.block_advance(reason.clone());
        // During tests, the `FakeBuilder` will need to release the block in order to fake a timeout
        // correctly.
        plan.add_blocked_advance_reason(reason);

        runtime
            .spawn(async move {
                let outcome = self.builder.build_circuit(plan).await;

                let (new_spec, reply) = match outcome {
                    Err(e) => (None, Err(e)),
                    Ok((new_spec, circ)) => {
@@ -1014,7 +1036,9 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
                    // specifically intended for a request a little more time
                    // to finish, before we offer it this circuit instead.
                    let briefly = self.request_timing.request_loyalty;
                    runtime_copy.sleep(briefly).await;
                    let sl = runtime_copy.sleep(briefly);
                    runtime_copy.allow_one_advance(briefly);
                    sl.await;

                    let pending = {
                        let list = self.circs.lock().expect("poisoned lock");
@@ -1024,6 +1048,7 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
                        let _ = pending_request.notify.clone().try_send(reply.clone());
                    }
                }
                runtime_copy.release_advance(format!("circuit builder task {}", tid));
            })
            .expect("Couldn't spawn circuit-building task");

@@ -1210,10 +1235,19 @@ mod test {
        Fail,
        Delay(Duration),
        Timeout,
        TimeoutReleaseAdvance(String),
        NoPlan,
        WrongSpec(FakeSpec),
    }

    // Test-only hook: lets a `FakePlan` remember which advance block it must release.
    impl MockablePlan for FakePlan {
        /// Turn a `FakeOp::Timeout` into `FakeOp::TimeoutReleaseAdvance(reason)`.
        ///
        /// Plans carrying any other operation are left untouched and `reason` is dropped.
        fn add_blocked_advance_reason(&mut self, reason: String) {
            if matches!(self.op, FakeOp::Timeout) {
                self.op = FakeOp::TimeoutReleaseAdvance(reason);
            }
        }
    }

    const FAKE_CIRC_DELAY: Duration = Duration::from_millis(30);

    static DI_EMPTY: [tor_netdir::fallback::FallbackDir; 0] = [];
@@ -1242,16 +1276,23 @@ mod test {

        async fn build_circuit(&self, plan: FakePlan) -> Result<(FakeSpec, Arc<FakeCirc>)> {
            let op = plan.op;
            self.runtime.sleep(FAKE_CIRC_DELAY).await;
            let sl = self.runtime.sleep(FAKE_CIRC_DELAY);
            self.runtime.allow_one_advance(FAKE_CIRC_DELAY);
            sl.await;
            match op {
                FakeOp::Succeed => Ok((plan.spec, Arc::new(FakeCirc { id: FakeId::next() }))),
                FakeOp::WrongSpec(s) => Ok((s, Arc::new(FakeCirc { id: FakeId::next() }))),
                FakeOp::Fail => Err(Error::PendingFailed),
                FakeOp::Delay(d) => {
                    self.runtime.sleep(d).await;
                    let sl = self.runtime.sleep(d);
                    self.runtime.allow_one_advance(d);
                    sl.await;
                    Err(Error::PendingFailed)
                }
                FakeOp::Timeout => {
                FakeOp::Timeout => unreachable!(), // should be converted to the below
                FakeOp::TimeoutReleaseAdvance(reason) => {
                    eprintln!("releasing advance to fake a timeout");
                    self.runtime.release_advance(reason);
                    let () = futures::future::pending().await;
                    unreachable!()
                }
@@ -1514,6 +1555,9 @@ mod test {
                CircuitTiming::default(),
            ));

            // This test doesn't exercise any timeout behaviour.
            rt.block_advance("test doesn't require advancing");

            let (c1, c2) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&ports, di()),
+16 −0
Original line number Diff line number Diff line
@@ -68,6 +68,22 @@ pub trait SleepProvider {
    fn wallclock(&self) -> SystemTime {
        SystemTime::now()
    }

    /// Signify that a test running under mock time shouldn't advance time yet, tagging the
    /// request with a unique reason string.
    ///
    /// This is useful for making sure (mock) time doesn't advance while things that might
    /// require some (real-world) time to complete do so, such as spawning a task on another
    /// thread.
    ///
    /// Call [`release_advance`](Self::release_advance) with the same reason string in order to
    /// unblock.
    ///
    /// The default implementation does nothing.
    fn block_advance<T: Into<String>>(&self, _reason: T) {}

    /// Signify that the reason for withholding time advances given in an earlier call to
    /// [`block_advance`](Self::block_advance) no longer exists, and that it's fine to move time
    /// forward if nothing else is blocking advances.
    ///
    /// The default implementation does nothing.
    fn release_advance<T: Into<String>>(&self, _reason: T) {}

    /// Allow a test running under mock time to advance time by the provided duration, even if
    /// [`block_advance`](Self::block_advance) has been used to block advances.
    ///
    /// The default implementation does nothing.
    fn allow_one_advance(&self, _dur: Duration) {}
}

/// Trait for a runtime that can block on a future.
+55 −39
Original line number Diff line number Diff line
@@ -47,12 +47,32 @@ impl<R: Runtime> MockSleepRuntime<R> {
    pub fn jump_to(&self, new_wallclock: SystemTime) {
        self.sleep.jump_to(new_wallclock);
    }
    /// Advance time one millisecond at a time until the provided
    /// future is ready.
    /// Run a future under mock time, advancing time forward where necessary until it completes.
    /// Users of this function should read the whole of this documentation before using!
    ///
    /// The returned future will run `fut`, expecting it to create `Sleeping` futures (as returned
    /// by `MockSleepProvider::sleep()` and similar functions). When all such created futures have
    /// been polled (indicating the future is waiting on them), time will be advanced in order that
    /// the first (or only) of said futures returns `Ready`. This process then repeats until `fut`
    /// returns `Ready` itself (as in, the returned wrapper future will wait for all created
    /// `Sleeping` futures to be polled, and advance time again).
    ///
    /// **Note:** The above described algorithm interacts poorly with futures that spawn
    /// asynchronous background tasks, or otherwise expect work to complete in the background
    /// before time is advanced. These futures will need to make use of the
    /// `SleepProvider::block_advance` (and similar) APIs in order to prevent time advancing while
    /// said tasks complete; see the documentation for those APIs for more detail.
    ///
    /// # Panics
    ///
    /// Panics if another `WaitFor` future is already running. (If two ran simultaneously, they
    /// would both try and advance the same mock time clock, which would be bad.)
    pub fn wait_for<F: futures::Future>(&self, fut: F) -> WaitFor<F> {
        if self.sleep.has_waitfor_waker() {
            panic!("attempted to call MockSleepRuntime::wait_for while another WaitFor is active");
        }
        WaitFor {
            sleep: self.sleep.clone(),
            yielding: 0,
            fut,
        }
    }
@@ -102,16 +122,23 @@ impl<R: Runtime> SleepProvider for MockSleepRuntime<R> {
    fn wallclock(&self) -> SystemTime {
        self.sleep.wallclock()
    }
    // Forward the block request, unchanged, to the inner `sleep` provider.
    fn block_advance<T: Into<String>>(&self, reason: T) {
        self.sleep.block_advance(reason)
    }
    // Forward the release request, unchanged, to the inner `sleep` provider.
    fn release_advance<T: Into<String>>(&self, reason: T) {
        self.sleep.release_advance(reason)
    }
    // Forward the one-off advance allowance, unchanged, to the inner `sleep` provider.
    fn allow_one_advance(&self, dur: Duration) {
        self.sleep.allow_one_advance(dur)
    }
}

/// A future that advances time until another future is ready to complete.
#[pin_project]
pub struct WaitFor<F: Future> {
pub struct WaitFor<F> {
    /// A reference to the sleep provider that's simulating time for us.
    #[pin]
    sleep: MockSleepProvider,
    /// Nonzero if we just found that this inner future is pending, and we
    /// should yield to give other futures a chance to run.
    yielding: u8,
    /// The future that we're waiting for.
    #[pin]
    fut: F,
@@ -124,42 +151,31 @@ impl<F: Future> Future for WaitFor<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        eprintln!("waitfor poll");
        let mut this = self.project();
        this.sleep.register_waitfor_waker(cx.waker().clone());

        if *this.yielding > 0 {
            *this.yielding -= 1;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        if let Poll::Ready(r) = this.fut.poll(cx) {
            eprintln!("waitfor done!");
            this.sleep.clear_waitfor_waker();
            return Poll::Ready(r);
        }
        eprintln!("waitfor poll complete");

        // TODO: This increment is unpleasantly short, and slows down
        // the tests that run this future.  But if I increase it, this
        // future doesn't yield enough for other futures to run, and
        // some of the tests in tor-circmgr give bad results.
        //
        // We should resolve this issue; see ticket #149.
        #[cfg(tarpaulin)]
        let high_bound = Duration::from_micros(100);
        #[cfg(tarpaulin)]
        let yield_count = 100;
        #[cfg(not(tarpaulin))]
        let high_bound = Duration::from_millis(1);
        #[cfg(not(tarpaulin))]
        let yield_count = 3;

        let low_bound = Duration::from_micros(10);
        let duration = this
            .sleep
            .time_until_next_timeout()
            .map(|dur| (dur / 10).clamp(low_bound, high_bound))
            .unwrap_or(low_bound);

        if this.sleep.should_advance() {
            if let Some(duration) = this.sleep.time_until_next_timeout() {
                eprintln!("Advancing by {:?}", duration);
                this.sleep.advance_noyield(duration);
        *this.yielding = yield_count;
        cx.waker().wake_by_ref();
            } else {
                // If we get here, something's probably wedged and the test isn't going to complete
                // anyway: we were expecting to advance in order to make progress, but we can't.
                // If we don't panic, the test will just run forever, which is really annoying, so
                // just panic and fail quickly.
                panic!("WaitFor told to advance, but didn't have any duration to advance by");
            }
        } else {
            eprintln!("waiting for sleepers to advance");
        }
        Poll::Pending
    }
}
Loading