diff --git a/Cargo.lock b/Cargo.lock index f40d923ca9822e8ff4a0af0ed7d2be3ca8a1fe5d..9086a5f404369f0cd2db86019311423610282d52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3129,6 +3129,7 @@ name = "tor-basic-utils" version = "0.1.0" dependencies = [ "educe", + "rand 0.8.5", ] [[package]] diff --git a/crates/tor-basic-utils/Cargo.toml b/crates/tor-basic-utils/Cargo.toml index e9ec806110616c5905f98c694e7d6806081b734e..9ae19c4c5f4debef4dfc11db195c6994718303ab 100644 --- a/crates/tor-basic-utils/Cargo.toml +++ b/crates/tor-basic-utils/Cargo.toml @@ -7,10 +7,12 @@ license = "MIT OR Apache-2.0" homepage = "https://gitlab.torproject.org/tpo/core/arti/-/wikis/home" description = "General helpers used by Tor" keywords = ["tor", "arti"] -categories = ["rust-patterns"] # We must put *something* here and this will do +# We must put *something* here and this will do +categories = ["rust-patterns"] repository = "https://gitlab.torproject.org/tpo/core/arti.git/" [dependencies] +rand = "0.8" [dev-dependencies] educe = "0.4.6" diff --git a/crates/tor-basic-utils/src/lib.rs b/crates/tor-basic-utils/src/lib.rs index 811cb686690c1181c1655e81a732212a1e5f8e09..2c5ec8cd7996c1686a1eca386ad5b054002f0c24 100644 --- a/crates/tor-basic-utils/src/lib.rs +++ b/crates/tor-basic-utils/src/lib.rs @@ -41,6 +41,8 @@ use std::fmt; +pub mod retry; + // ---------------------------------------------------------------------- /// Function with the signature of `Debug::fmt` that just prints `".."` diff --git a/crates/tor-basic-utils/src/retry.rs b/crates/tor-basic-utils/src/retry.rs new file mode 100644 index 0000000000000000000000000000000000000000..b85bc203b8c03e4cb4c3ef48ce92c124e8b3799a --- /dev/null +++ b/crates/tor-basic-utils/src/retry.rs @@ -0,0 +1,159 @@ +//! An implementation of the "decorrelated jitter" algorithm for scheduling retries. +//! +//! See [`RetryDelay`] for more information. + +use std::time::Duration; + +use rand::Rng; + +/// An implementation for retrying a remote operation based on a [decorrelated +/// jitter] schedule. +/// +/// The algorithm used here has several desirable properties: +/// * It is randomized, so that multiple timeouts don't have a danger of +/// getting synchronized with each other and hammering the same servers all +/// at once. +/// * It tends on average to wait longer and longer over time, so that if the +/// server is down, it won't get pummeled by a zillion failing clients +/// when it comes back up. +/// * It has a chance of retrying promptly, which results in better client +/// performance on average. +/// +/// For a more full specification, see [`dir-spec.txt`]. +/// +/// [decorrelated jitter]: +/// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ +/// [`dir-spec.txt`]: https://spec.torproject.org/dir-spec +#[derive(Clone, Debug)] +pub struct RetryDelay { + /// The last delay that this retry delay returned (in msec), or 0 + /// if this never returned a delay. + last_delay_ms: u32, + /// The lowest allowable delay (in msec). + low_bound_ms: u32, +} + +/// Lowest possible lower bound, in milliseconds. +// We're doing this in MS, and Tor does it in seconds, so I'm +// multiplying the minimum by 1000 here. +const MIN_LOW_BOUND: u32 = 1000; + +/// Largest possible lower bound, in milliseconds. +const MAX_LOW_BOUND: u32 = std::u32::MAX - 1; + +/// Maximum amount to multiply the previous delay by. +const MAX_DELAY_MULT: u32 = 3; + +impl RetryDelay { + /// Construct a new RetryDelay from a given base delay in + /// milliseconds. + /// + /// The base delay defines the lowest possible interval that can + /// be returned. + /// + /// # Limitations + /// + /// If the base delay is less than 1000, a base delay of 1000 is + /// used instead, since that's what the C tor implementation does. + pub fn from_msec(base_delay_msec: u32) -> Self { + let low_bound_ms = base_delay_msec.clamp(MIN_LOW_BOUND, MAX_LOW_BOUND); + RetryDelay { + last_delay_ms: 0, + low_bound_ms, + } + } + + /// Construct a new RetryDelay from a given base delay. + /// + /// See from_msec for more information. + pub fn from_duration(d: Duration) -> Self { + let msec = d.as_millis(); + let msec = std::cmp::min(msec, u128::from(MAX_LOW_BOUND)) as u32; + RetryDelay::from_msec(msec) + } + + /// Helper: Return a lower and upper bound for the next delay to + /// be yielded. + /// + /// Values are in milliseconds. + fn delay_bounds(&self) -> (u32, u32) { + let low = self.low_bound_ms; + let high = std::cmp::max( + // We don't need a saturating_add here, since low is always + // less than high, so low cannot be equal to u32::MAX. + low + 1, + self.last_delay_ms.saturating_mul(MAX_DELAY_MULT), + ); + (low, high) + } + + /// Return the next delay to be used (in milliseconds), according + /// to a given random number generator. + fn next_delay_msec<R: Rng>(&mut self, rng: &mut R) -> u32 { + let (low, high) = self.delay_bounds(); + assert!(low < high); + + let val = rng.gen_range(low..high); + self.last_delay_ms = val; + val + } + + /// Return the next delay to be used (as a [`Duration`]), + /// according to a given random number generator. + pub fn next_delay<R: Rng>(&mut self, rng: &mut R) -> Duration { + Duration::from_millis(u64::from(self.next_delay_msec(rng))) + } +} + +impl Default for RetryDelay { + fn default() -> Self { + RetryDelay::from_msec(0) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn init() { + let rd = RetryDelay::from_msec(2000); + assert_eq!(rd.last_delay_ms, 0); + assert_eq!(rd.low_bound_ms, 2000); + + let rd = RetryDelay::from_msec(0); + assert_eq!(rd.last_delay_ms, 0); + assert_eq!(rd.low_bound_ms, 1000); + + let rd = RetryDelay::from_duration(Duration::new(1, 500_000_000)); + assert_eq!(rd.last_delay_ms, 0); + assert_eq!(rd.low_bound_ms, 1500); + } + + #[test] + fn bounds() { + let mut rd = RetryDelay::from_msec(1000); + assert_eq!(rd.delay_bounds(), (1000, 1001)); + rd.last_delay_ms = 1500; + assert_eq!(rd.delay_bounds(), (1000, 4500)); + rd.last_delay_ms = 3_000_000_000; + assert_eq!(rd.delay_bounds(), (1000, std::u32::MAX)); + } + + #[test] + fn rng() { + let mut rd = RetryDelay::from_msec(50); + let real_low_bound = std::cmp::max(50, MIN_LOW_BOUND); + + let mut rng = rand::thread_rng(); + for _ in 1..100 { + let (b_lo, b_hi) = rd.delay_bounds(); + assert!(b_lo == real_low_bound); + assert!(b_hi > b_lo); + let delay = rd.next_delay(&mut rng).as_millis() as u32; + assert_eq!(delay, rd.last_delay_ms); + assert!(delay >= b_lo); + assert!(delay < b_hi); + } + } +} diff --git a/crates/tor-dirmgr/src/retry.rs b/crates/tor-dirmgr/src/retry.rs index 2262c86d674f1d77b5a02596bd73505c0076386a..ff6c75968547fcc7662de82690eec62aadcef7ed 100644 --- a/crates/tor-dirmgr/src/retry.rs +++ b/crates/tor-dirmgr/src/retry.rs @@ -1,113 +1,14 @@ -//! Implement a timer for retrying a single failed fetch or object, -//! using the [decorrelated jitter] algorithm. +//! Configure timers for a timer for retrying a single failed fetch or object. //! -//! For a more full specification, see [`dir-spec.txt`]. -//! -//! [decorrelated jitter]: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ -//! [`dir-spec.txt`]: https://spec.torproject.org/dir-spec +//! For a more information on the algorithm, see +//! [`RetryDelay`](tor_basic_utils::retry::RetryDelay). -use rand::Rng; use std::convert::TryInto; use std::num::{NonZeroU32, NonZeroU8}; use std::time::Duration; use serde::Deserialize; - -/// An implementation for retrying downloads based on a decorrelated jitter -/// schedule. -/// -/// The algorithm used here has several desirable properties: -/// * It is randomized, so that multiple timeouts don't have a -/// danger of getting synchronized with each other and hammering the -/// same directory servers all at once. -/// * It tends on average to wait longer and longer over time, so -/// that if the directory server is down, it won't get pummeled by -/// a zillion failing clients when it comes back up. -/// * It has a chance of retrying promptly, which results in better -/// client performance on average. -pub struct RetryDelay { - /// The last delay that this retry delay returned (in msec), or 0 - /// if this never returned a delay. - last_delay_ms: u32, - /// The lowest allowable delay (in msec). - low_bound_ms: u32, -} - -/// Lowest possible lower bound, in milliseconds. -// We're doing this in MS, and Tor does it in seconds, so I'm -// multiplying the minimum by 1000 here. -const MIN_LOW_BOUND: u32 = 1000; - -/// Largest possible lower bound, in milliseconds. -const MAX_LOW_BOUND: u32 = std::u32::MAX - 1; - -/// Maximum amount to multiply the previous delay by. -const MAX_DELAY_MULT: u32 = 3; - -impl RetryDelay { - /// Construct a new RetryDelay from a given base delay in - /// milliseconds. - /// - /// The base delay defines the lowest possible interval that can - /// be returned. - /// - /// # Limitations - /// - /// If the base delay is less than 1000, a base delay of 1000 is - /// used instead, since that's what the C tor implementation does. - pub fn from_msec(base_delay_msec: u32) -> Self { - let low_bound_ms = base_delay_msec.clamp(MIN_LOW_BOUND, MAX_LOW_BOUND); - RetryDelay { - last_delay_ms: 0, - low_bound_ms, - } - } - - /// Construct a new RetryDelay from a given base delay. - /// - /// See from_msec for more information. - pub fn from_duration(d: Duration) -> Self { - let msec = d.as_millis(); - let msec = std::cmp::min(msec, u128::from(MAX_LOW_BOUND)) as u32; - RetryDelay::from_msec(msec) - } - - /// Helper: Return a lower and upper bound for the next delay to - /// be yielded. - fn delay_bounds(&self) -> (u32, u32) { - let low = self.low_bound_ms; - let high = std::cmp::max( - // We don't need a saturating_add here, since low is always - // less than high, so low cannot be equal to u32::MAX. - low + 1, - self.last_delay_ms.saturating_mul(MAX_DELAY_MULT), - ); - (low, high) - } - - /// Return the next delay to be used (in milliseconds), according - /// to a given random number generator. - pub fn next_delay_msec<R: Rng>(&mut self, rng: &mut R) -> u32 { - let (low, high) = self.delay_bounds(); - assert!(low < high); - - let val = rng.gen_range(low..high); - self.last_delay_ms = val; - val - } - - /// Return the next delay to be used (as a [`Duration`]), - /// according to a given random number generator. - pub fn next_delay<R: Rng>(&mut self, rng: &mut R) -> Duration { - Duration::from_millis(u64::from(self.next_delay_msec(rng))) - } -} - -impl Default for RetryDelay { - fn default() -> Self { - RetryDelay::from_msec(0) - } -} +use tor_basic_utils::retry::RetryDelay; /// Configuration for how many times to retry a download, with what /// frequency. @@ -194,70 +95,31 @@ impl DownloadSchedule { mod test { use super::*; - #[test] - fn init() { - let rd = RetryDelay::from_msec(2000); - assert_eq!(rd.last_delay_ms, 0); - assert_eq!(rd.low_bound_ms, 2000); - - let rd = RetryDelay::from_msec(0); - assert_eq!(rd.last_delay_ms, 0); - assert_eq!(rd.low_bound_ms, 1000); - - let rd = RetryDelay::from_duration(Duration::new(1, 500_000_000)); - assert_eq!(rd.last_delay_ms, 0); - assert_eq!(rd.low_bound_ms, 1500); - } - - #[test] - fn bounds() { - let mut rd = RetryDelay::from_msec(1000); - assert_eq!(rd.delay_bounds(), (1000, 1001)); - rd.last_delay_ms = 1500; - assert_eq!(rd.delay_bounds(), (1000, 4500)); - rd.last_delay_ms = 3_000_000_000; - assert_eq!(rd.delay_bounds(), (1000, std::u32::MAX)); - } - - #[test] - fn rng() { - let mut rd = RetryDelay::from_msec(50); - let real_low_bound = std::cmp::max(50, MIN_LOW_BOUND); - - let mut rng = rand::thread_rng(); - for _ in 1..100 { - let (b_lo, b_hi) = rd.delay_bounds(); - assert!(b_lo == real_low_bound); - assert!(b_hi > b_lo); - let delay = rd.next_delay(&mut rng).as_millis() as u32; - assert_eq!(delay, rd.last_delay_ms); - assert!(delay >= b_lo); - assert!(delay < b_hi); - } - } - #[test] fn config() { // default configuration is 3 tries, 1000 msec initial delay let cfg = DownloadSchedule::default(); + let one_sec = Duration::from_secs(1); + let zero_sec = Duration::from_secs(0); + let mut rng = rand::thread_rng(); assert_eq!(cfg.n_attempts(), 3); let v: Vec<_> = cfg.attempts().collect(); assert_eq!(&v[..], &[0, 1, 2]); - let sched = cfg.schedule(); - assert_eq!(sched.last_delay_ms, 0); - assert_eq!(sched.low_bound_ms, 1000); + assert_eq!(cfg.initial_delay, one_sec); + let mut sched = cfg.schedule(); + assert_eq!(sched.next_delay(&mut rng), one_sec); // Try a zero-attempt schedule, and have it get remapped to 1,1 - let cfg = DownloadSchedule::new(0, Duration::new(0, 0), 0); + let cfg = DownloadSchedule::new(0, zero_sec, 0); assert_eq!(cfg.n_attempts(), 1); assert_eq!(cfg.parallelism(), 1); let v: Vec<_> = cfg.attempts().collect(); assert_eq!(&v[..], &[0]); - let sched = cfg.schedule(); - assert_eq!(sched.last_delay_ms, 0); - assert_eq!(sched.low_bound_ms, 1000); + assert_eq!(cfg.initial_delay, zero_sec); + let mut sched = cfg.schedule(); + assert_eq!(sched.next_delay(&mut rng), one_sec); } }