From ee47a16697d93cfd49a0b44669831713dbe7f63f Mon Sep 17 00:00:00 2001
From: eta <tor@eta.st>
Date: Thu, 24 Mar 2022 14:07:40 +0000
Subject: [PATCH] tor-rtcompat/scheduler: add unit tests, FireIn -> FireAt

Addressing review comments: added some unit tests for the new scheduler
type, and made FireIn use an Instant instead (making it FireAt).
---
 crates/tor-rtcompat/src/lib.rs       |   3 +
 crates/tor-rtcompat/src/scheduler.rs | 146 +++++++++++++++++++++++++--
 2 files changed, 142 insertions(+), 7 deletions(-)

diff --git a/crates/tor-rtcompat/src/lib.rs b/crates/tor-rtcompat/src/lib.rs
index 2d05b14b56..33f3ed9f46 100644
--- a/crates/tor-rtcompat/src/lib.rs
+++ b/crates/tor-rtcompat/src/lib.rs
@@ -406,6 +406,9 @@ pub mod cond {
 ///
 /// (This is a macro so that it can repeat the closure as multiple separate
 /// expressions, so it can take on two different types, if needed.)
+//
+// NOTE(eta): changing this #[cfg] can affect tests inside this crate that use
+//            this macro, like in scheduler.rs
 #[macro_export]
 #[cfg(all(
     any(feature = "native-tls", feature = "rustls"),
diff --git a/crates/tor-rtcompat/src/scheduler.rs b/crates/tor-rtcompat/src/scheduler.rs
index 8eb7c22919..5512b4ee3e 100644
--- a/crates/tor-rtcompat/src/scheduler.rs
+++ b/crates/tor-rtcompat/src/scheduler.rs
@@ -7,7 +7,7 @@ use futures::{Stream, StreamExt};
 use std::future::Future;
 use std::pin::Pin;
 use std::task::{Context, Poll};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use pin_project::pin_project;
 
@@ -16,8 +16,8 @@ use pin_project::pin_project;
 enum SchedulerCommand {
     /// Run the task now.
     Fire,
-    /// Run the task after the provided duration.
-    FireIn(Duration),
+    /// Run the task at the provided `Instant`.
+    FireAt(Instant),
     /// Cancel a pending execution, if there is one.
     Cancel,
 }
@@ -78,12 +78,12 @@ impl TaskHandle {
     pub fn fire(&self) -> bool {
         self.tx.unbounded_send(SchedulerCommand::Fire).is_ok()
     }
-    /// Trigger this handle's corresponding schedule after `dur`.
+    /// Trigger this handle's corresponding schedule at `instant`.
     ///
     /// Returns `true` if the schedule still exists, and `false` otherwise.
-    pub fn fire_in(&self, dur: Duration) -> bool {
+    pub fn fire_at(&self, instant: Instant) -> bool {
         self.tx
-            .unbounded_send(SchedulerCommand::FireIn(dur))
+            .unbounded_send(SchedulerCommand::FireAt(instant))
             .is_ok()
     }
     /// Cancel a pending firing of the handle's corresponding schedule.
@@ -104,7 +104,9 @@ impl<R: SleepProvider> TaskScheduleP<'_, R> {
                 *self.instant_fire = true;
                 *self.sleep = None;
             }
-            SchedulerCommand::FireIn(dur) => {
+            SchedulerCommand::FireAt(instant) => {
+                let now = self.rt.now();
+                let dur = instant.saturating_duration_since(now);
                 *self.instant_fire = false;
                 *self.sleep = Some(Box::pin(self.rt.sleep(dur)));
             }
@@ -146,3 +148,133 @@ impl<R: SleepProvider> Stream for TaskSchedule<R> {
         Poll::Pending
     }
 }
+
+// test_with_all_runtimes! only exists if these features are satisfied.
+#[cfg(all(
+    test,
+    any(feature = "native-tls", feature = "rustls"),
+    any(feature = "tokio", feature = "async-std"),
+))]
+mod test {
+    use crate::scheduler::TaskSchedule;
+    use crate::{test_with_all_runtimes, SleepProvider};
+    use futures::FutureExt;
+    use futures::StreamExt;
+    use std::time::{Duration, Instant};
+
+    #[test]
+    fn it_fires_immediately() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, _hdl) = TaskSchedule::new(rt);
+            assert!(sch.next().now_or_never().is_some());
+        });
+    }
+
+    #[test]
+    #[allow(clippy::unwrap_used)]
+    fn it_dies_if_dropped() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt);
+            drop(hdl);
+            assert!(sch.next().now_or_never().unwrap().is_none());
+        });
+    }
+
+    #[test]
+    fn it_fires_on_demand() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt);
+            assert!(sch.next().now_or_never().is_some());
+
+            assert!(sch.next().now_or_never().is_none());
+            assert!(hdl.fire());
+            assert!(sch.next().now_or_never().is_some());
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+
+    #[test]
+    fn it_cancels_instant_firings() {
+        // NOTE(eta): this test very much assumes that unbounded channels will
+        //            transmit things instantly. If it breaks, that's probably why.
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt);
+            assert!(sch.next().now_or_never().is_some());
+
+            assert!(sch.next().now_or_never().is_none());
+            assert!(hdl.fire());
+            assert!(hdl.cancel());
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+
+    #[test]
+    fn it_fires_after_self_reschedule() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, _hdl) = TaskSchedule::new(rt);
+            assert!(sch.next().now_or_never().is_some());
+
+            sch.fire_in(Duration::from_millis(100));
+
+            assert!(sch.next().now_or_never().is_none());
+            assert!(sch.next().await.is_some());
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+
+    #[test]
+    fn it_fires_after_external_reschedule() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt);
+            assert!(sch.next().now_or_never().is_some());
+
+            hdl.fire_at(Instant::now() + Duration::from_millis(100));
+
+            assert!(sch.next().now_or_never().is_none());
+            assert!(sch.next().await.is_some());
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+
+    #[test]
+    fn it_cancels_delayed_firings() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
+            assert!(sch.next().now_or_never().is_some());
+
+            hdl.fire_at(Instant::now() + Duration::from_millis(100));
+
+            assert!(sch.next().now_or_never().is_none());
+
+            rt.sleep(Duration::from_millis(50)).await;
+
+            assert!(sch.next().now_or_never().is_none());
+
+            hdl.cancel();
+
+            assert!(sch.next().now_or_never().is_none());
+
+            rt.sleep(Duration::from_millis(100)).await;
+
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+
+    #[test]
+    fn last_fire_wins() {
+        test_with_all_runtimes!(|rt| async move {
+            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
+            assert!(sch.next().now_or_never().is_some());
+
+            hdl.fire_at(Instant::now() + Duration::from_millis(100));
+            hdl.fire();
+
+            assert!(sch.next().now_or_never().is_some());
+            assert!(sch.next().now_or_never().is_none());
+
+            rt.sleep(Duration::from_millis(150)).await;
+
+            assert!(sch.next().now_or_never().is_none());
+        });
+    }
+}
-- 
GitLab