diff --git a/Cargo.lock b/Cargo.lock index 6ca49916800b992cd2b1c9a2a72a3f638f4ab85d..d501a491a15fc11a50fa5e164ecc80784c608168 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,6 +148,7 @@ dependencies = [ "tor-config", "tor-dirmgr", "tor-error", + "tor-guardmgr", "tor-netdoc", "tor-persist", "tor-proto", @@ -3250,6 +3251,7 @@ dependencies = [ "futures-await-test", "humantime-serde", "itertools", + "once_cell", "pin-project", "rand 0.8.5", "retry-error", @@ -3361,6 +3363,7 @@ dependencies = [ "tor-consdiff", "tor-dirclient", "tor-error", + "tor-guardmgr", "tor-llcrypto", "tor-netdir", "tor-netdoc", @@ -3396,6 +3399,7 @@ name = "tor-guardmgr" version = "0.1.0" dependencies = [ "derive_builder", + "derive_more", "educe", "futures", "humantime-serde", diff --git a/crates/arti-client/Cargo.toml b/crates/arti-client/Cargo.toml index 9a16b926522849c3da467586d7f58e52aed259ae..9cd780ab1efc57a7defefc3ef6c5a14128dce5d4 100644 --- a/crates/arti-client/Cargo.toml +++ b/crates/arti-client/Cargo.toml @@ -36,6 +36,7 @@ tor-config = { path = "../tor-config", version = "0.1.0" } tor-chanmgr = { path = "../tor-chanmgr", version = "0.1.0" } tor-dirmgr = { path = "../tor-dirmgr", version = "0.1.0" } tor-error = { path = "../tor-error", version = "0.1.0" } +tor-guardmgr = { path = "../tor-guardmgr", version = "0.1.0" } tor-netdoc = { path = "../tor-netdoc", version = "0.1.0" } tor-persist = { path = "../tor-persist", version = "0.1.0" } tor-proto = { path = "../tor-proto", version = "0.1.0" } @@ -47,19 +48,14 @@ derive_more = "0.99" directories = "4" educe = "0.4.6" futures = "0.3.14" -postage = { version = "0.4", default-features = false, features = [ - "futures-traits", -] } +postage = { version = "0.4", default-features = false, features = ["futures-traits"] } tracing = "0.1.18" serde = { version = "1.0.103", features = ["derive"] } thiserror = "1" pin-project = "1" [dev-dependencies] -tor-rtcompat = { path = "../tor-rtcompat", version = "0.1.0", features = [ - "tokio", - "native-tls", -] } +tor-rtcompat = { path = "../tor-rtcompat", version = "0.1.0", features = ["tokio", "native-tls"] } tokio-crate = { package = "tokio", version = "1.7", features = [ "rt", "rt-multi-thread", diff --git a/crates/arti-client/src/config.rs b/crates/arti-client/src/config.rs index 98fe32936b2dea422a54c9605826735c2d5311bd..7997c27b3c2d68488e3db343213aaed2c598b33b 100644 --- a/crates/arti-client/src/config.rs +++ b/crates/arti-client/src/config.rs @@ -275,6 +275,12 @@ pub struct TorClientConfig { impl tor_circmgr::CircMgrConfig for TorClientConfig {} +impl AsRef<tor_guardmgr::fallback::FallbackList> for TorClientConfig { + fn as_ref(&self) -> &tor_guardmgr::fallback::FallbackList { + self.tor_network.fallback_caches() + } +} + impl Default for TorClientConfig { fn default() -> Self { Self::builder() diff --git a/crates/tor-circmgr/Cargo.toml b/crates/tor-circmgr/Cargo.toml index df95b0677a6e7f987bc863904445d65b60af63b5..4ca8d3fd329e55f8d810691854b133b1029095d1 100644 --- a/crates/tor-circmgr/Cargo.toml +++ b/crates/tor-circmgr/Cargo.toml @@ -6,9 +6,9 @@ edition = "2018" license = "MIT OR Apache-2.0" homepage = "https://gitlab.torproject.org/tpo/core/arti/-/wikis/home" description = "Manage a set of anonymous circuits over the Tor network" -keywords = [ "tor", "arti", "async" ] -categories = [ "network-programming", "cryptography" ] -repository="https://gitlab.torproject.org/tpo/core/arti.git/" +keywords = ["tor", "arti", "async"] +categories = ["network-programming", "cryptography"] +repository = "https://gitlab.torproject.org/tpo/core/arti.git/" [features] # Enable experimental APIs that are not yet officially supported. @@ -18,18 +18,18 @@ repository="https://gitlab.torproject.org/tpo/core/arti.git/" experimental-api = [] [dependencies] -tor-basic-utils = { path = "../tor-basic-utils", version = "0.1.0"} -tor-chanmgr = { path="../tor-chanmgr", version = "0.1.0"} -tor-config = { path="../tor-config", version = "0.1.0"} -tor-error = { path="../tor-error", version = "0.1.0"} -tor-guardmgr = { path="../tor-guardmgr", version = "0.1.0"} -tor-netdir = { path="../tor-netdir", version = "0.1.0"} -tor-netdoc = { path="../tor-netdoc", version = "0.1.0"} -tor-proto = { path="../tor-proto", version = "0.1.0"} -retry-error = { path="../retry-error", version = "0.1.0"} -tor-linkspec = { path="../tor-linkspec", version = "0.1.0"} -tor-persist = { path="../tor-persist", version = "0.1.0"} -tor-rtcompat = { path="../tor-rtcompat", version = "0.1.0"} +tor-basic-utils = { path = "../tor-basic-utils", version = "0.1.0" } +tor-chanmgr = { path = "../tor-chanmgr", version = "0.1.0" } +tor-config = { path = "../tor-config", version = "0.1.0" } +tor-error = { path = "../tor-error", version = "0.1.0" } +tor-guardmgr = { path = "../tor-guardmgr", version = "0.1.0" } +tor-netdir = { path = "../tor-netdir", version = "0.1.0" } +tor-netdoc = { path = "../tor-netdoc", version = "0.1.0" } +tor-proto = { path = "../tor-proto", version = "0.1.0" } +retry-error = { path = "../retry-error", version = "0.1.0" } +tor-linkspec = { path = "../tor-linkspec", version = "0.1.0" } +tor-persist = { path = "../tor-persist", version = "0.1.0" } +tor-rtcompat = { path = "../tor-rtcompat", version = "0.1.0" } async-trait = "0.1.2" bounded-vec-deque = "0.1" @@ -40,6 +40,7 @@ educe = "0.4.6" futures = "0.3.14" humantime-serde = "1.1.1" itertools = "0.10.1" +once_cell = "1" tracing = "0.1.18" pin-project = "1" rand = "0.8" @@ -50,9 +51,9 @@ weak-table = "0.3.0" [dev-dependencies] futures-await-test = "0.3.0" -tor-rtmock = { path="../tor-rtmock", version = "0.1.0"} -tor-guardmgr = { path="../tor-guardmgr", version = "0.1.0", features=["testing"]} -tor-llcrypto = { path="../tor-llcrypto", version = "0.1.0"} -tor-netdir = { path="../tor-netdir", version = "0.1.0", features=["testing"] } -tor-persist = { path="../tor-persist", version = "0.1.0", features=["testing"] } -tor-rtcompat = { path="../tor-rtcompat", version = "0.1.0", features=["tokio", "native-tls" ] } +tor-rtmock = { path = "../tor-rtmock", version = "0.1.0" } +tor-guardmgr = { path = "../tor-guardmgr", version = "0.1.0", features = ["testing"] } +tor-llcrypto = { path = "../tor-llcrypto", version = "0.1.0" } +tor-netdir = { path = "../tor-netdir", version = "0.1.0", features = ["testing"] } +tor-persist = { path = "../tor-persist", version = "0.1.0", features = ["testing"] } +tor-rtcompat = { path = "../tor-rtcompat", version = "0.1.0", features = ["tokio", "native-tls"] } diff --git a/crates/tor-circmgr/src/config.rs b/crates/tor-circmgr/src/config.rs index ce715514f1fe7dd25ba14e41f6ef0b6063c51bae..2d50d0fc891df1426030b1ea82855e9b94a54bc7 100644 --- a/crates/tor-circmgr/src/config.rs +++ b/crates/tor-circmgr/src/config.rs @@ -6,6 +6,7 @@ use tor_basic_utils::define_accessor_trait; use tor_config::ConfigBuildError; +use tor_guardmgr::fallback::FallbackList; use derive_builder::Builder; use serde::Deserialize; @@ -281,6 +282,7 @@ define_accessor_trait! { path_rules: PathConfig, circuit_timing: CircuitTiming, preemptive_circuits: PreemptiveCircuitConfig, + fallbacks: FallbackList, } } diff --git a/crates/tor-circmgr/src/lib.rs b/crates/tor-circmgr/src/lib.rs index 26cd7a86607e7ad7175495d1daac1454ea8b736e..4dcac3133ff13160fa0bd5744934d23a3dd0b8b4 100644 --- a/crates/tor-circmgr/src/lib.rs +++ b/crates/tor-circmgr/src/lib.rs @@ -51,7 +51,8 @@ #![deny(clippy::unwrap_used)] use tor_chanmgr::ChanMgr; -use tor_netdir::{fallback::FallbackDir, DirEvent, NetDir, NetDirProvider}; +use tor_linkspec::ChanTarget; +use tor_netdir::{DirEvent, NetDir, NetDirProvider}; use tor_proto::circuit::{CircParameters, ClientCirc, UniqId}; use tor_rtcompat::Runtime; @@ -75,6 +76,7 @@ mod usage; pub use err::Error; pub use isolation::IsolationToken; +use tor_guardmgr::fallback::FallbackList; pub use usage::{TargetPort, TargetPorts}; pub use config::{ @@ -86,7 +88,7 @@ use crate::isolation::StreamIsolation; use crate::preemptive::PreemptiveCircuitPredictor; use usage::TargetCircUsage; -pub use tor_guardmgr::{ExternalFailure, GuardId}; +pub use tor_guardmgr::{ExternalActivity, FirstHopId}; use tor_persist::{FsStateMgr, StateMgr}; use tor_rtcompat::scheduler::{TaskHandle, TaskSchedule}; @@ -110,13 +112,16 @@ const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts"; #[non_exhaustive] pub enum DirInfo<'a> { /// A list of fallbacks, for use when we don't know a network directory. - Fallbacks(&'a [&'a FallbackDir]), + Fallbacks(&'a FallbackList), /// A complete network directory Directory(&'a NetDir), + /// No information: we can only build one-hop paths: and that, only if the + /// guard manager knows some guards or fallbacks. + Nothing, } -impl<'a> From<&'a [&'a FallbackDir]> for DirInfo<'a> { - fn from(v: &'a [&'a FallbackDir]) -> DirInfo<'a> { +impl<'a> From<&'a FallbackList> for DirInfo<'a> { + fn from(v: &'a FallbackList) -> DirInfo<'a> { DirInfo::Fallbacks(v) } } @@ -143,8 +148,8 @@ impl<'a> DirInfo<'a> { } match self { - DirInfo::Fallbacks(_) => from_netparams(&NetParameters::default()), DirInfo::Directory(d) => from_netparams(d.params()), + _ => from_netparams(&NetParameters::default()), } } } @@ -183,7 +188,11 @@ impl<R: Runtime> CircMgr<R> { config.preemptive_circuits().clone(), ))); - let guardmgr = tor_guardmgr::GuardMgr::new(runtime.clone(), storage.clone())?; + let guardmgr = tor_guardmgr::GuardMgr::new( + runtime.clone(), + storage.clone(), + config.fallbacks().clone(), + )?; let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY); @@ -286,6 +295,11 @@ impl<R: Runtime> CircMgr<R> { return Ok(()); } + self.mgr + .peek_builder() + .guardmgr() + .replace_fallback_list(new_config.fallbacks().clone()); + let discard_circuits = !new_config .path_rules() .at_least_as_permissive_as(&old_path_rules); @@ -682,11 +696,31 @@ impl<R: Runtime> CircMgr<R> { /// Record that a failure occurred on a circuit with a given guard, in a way /// that makes us unwilling to use that guard for future circuits. - pub fn note_external_failure(&self, id: &GuardId, external_failure: ExternalFailure) { - self.mgr - .peek_builder() - .guardmgr() - .note_external_failure(id, external_failure); + /// + pub fn note_external_failure( + &self, + target: &impl ChanTarget, + external_failure: ExternalActivity, + ) { + self.mgr.peek_builder().guardmgr().note_external_failure( + target.ed_identity(), + target.rsa_identity(), + external_failure, + ); + } + + /// Record that a success occurred on a circuit with a given guard, in a way + /// that makes us possibly willing to use that guard for future circuits. + pub fn note_external_success( + &self, + target: &impl ChanTarget, + external_activity: ExternalActivity, + ) { + self.mgr.peek_builder().guardmgr().note_external_success( + target.ed_identity(), + target.rsa_identity(), + external_activity, + ); } } @@ -714,7 +748,8 @@ mod test { use tor_netdir::{MdReceiver, PartialNetDir}; use tor_netdoc::doc::netstatus::NetParams; // If it's just fallbackdir, we get the default parameters. - let di: DirInfo<'_> = (&[][..]).into(); + let fb = FallbackList::from([]); + let di: DirInfo<'_> = (&fb).into(); let p1 = di.circ_params(); assert!(!p1.extend_by_ed25519_id()); diff --git a/crates/tor-circmgr/src/mgr.rs b/crates/tor-circmgr/src/mgr.rs index 08ec587d9a86174a69da78af49d49486a70922bd..10e60614e0d22870a8a878789447c38ba8cd8daf 100644 --- a/crates/tor-circmgr/src/mgr.rs +++ b/crates/tor-circmgr/src/mgr.rs @@ -1335,9 +1335,11 @@ mod test { use crate::isolation::test::{assert_isoleq, IsolationTokenEq}; use crate::usage::{ExitPolicy, SupportedCircUsage}; use crate::{Error, StreamIsolation, TargetCircUsage, TargetPort}; + use once_cell::sync::Lazy; use std::collections::BTreeSet; use std::sync::atomic::{self, AtomicUsize}; use tor_error::bad_api_usage; + use tor_guardmgr::fallback::FallbackList; use tor_netdir::testnet; use tor_rtcompat::SleepProvider; use tor_rtmock::MockSleepRuntime; @@ -1463,10 +1465,10 @@ mod test { const FAKE_CIRC_DELAY: Duration = Duration::from_millis(30); - static DI_EMPTY: [&tor_netdir::fallback::FallbackDir; 0] = []; + static FALLBACKS_EMPTY: Lazy<FallbackList> = Lazy::new(|| [].into()); fn di() -> DirInfo<'static> { - DI_EMPTY[..].into() + (&*FALLBACKS_EMPTY).into() } #[async_trait] diff --git a/crates/tor-circmgr/src/path.rs b/crates/tor-circmgr/src/path.rs index b02afa40f7621061f26878f4898bb5dd61b61fb9..b25d8b6dccb52f3272086fa1dc7c9b9214e5be21 100644 --- a/crates/tor-circmgr/src/path.rs +++ b/crates/tor-circmgr/src/path.rs @@ -7,8 +7,9 @@ pub mod dirpath; pub mod exitpath; use tor_error::bad_api_usage; +use tor_guardmgr::fallback::FallbackDir; use tor_linkspec::{OwnedChanTarget, OwnedCircTarget}; -use tor_netdir::{fallback::FallbackDir, Relay}; +use tor_netdir::Relay; use std::convert::TryFrom; @@ -31,6 +32,8 @@ enum TorPathInner<'a> { /// A single-hop path for use with a directory cache, when we don't have /// a consensus. FallbackOneHop(&'a FallbackDir), + /// A single-hop path taken from an OwnedChanTarget. + OwnedOneHop(OwnedChanTarget), /// A multi-hop path, containing one or more relays. Path(Vec<Relay<'a>>), } @@ -52,6 +55,14 @@ impl<'a> TorPath<'a> { } } + /// Construct a new one-hop path for directory use from an arbitrarily + /// chosen channel target. + pub fn new_one_hop_owned<T: tor_linkspec::ChanTarget>(target: &T) -> Self { + Self { + inner: TorPathInner::OwnedOneHop(OwnedChanTarget::from_chan_target(target)), + } + } + /// Create a new multi-hop path with a given number of ordered relays. pub fn new_multihop(relays: impl IntoIterator<Item = Relay<'a>>) -> Self { Self { @@ -81,6 +92,7 @@ impl<'a> TorPath<'a> { match &self.inner { OneHop(_) => 1, FallbackOneHop(_) => 1, + OwnedOneHop(_) => 1, Path(p) => p.len(), } } @@ -103,6 +115,7 @@ impl<'a> TryFrom<&TorPath<'a>> for OwnedPath { Ok(match &p.inner { FallbackOneHop(h) => OwnedPath::ChannelOnly(OwnedChanTarget::from_chan_target(*h)), OneHop(h) => OwnedPath::Normal(vec![OwnedCircTarget::from_circ_target(h)]), + OwnedOneHop(owned) => OwnedPath::ChannelOnly(owned.clone()), Path(p) if !p.is_empty() => { OwnedPath::Normal(p.iter().map(OwnedCircTarget::from_circ_target).collect()) } diff --git a/crates/tor-circmgr/src/path/dirpath.rs b/crates/tor-circmgr/src/path/dirpath.rs index bae973c38962dee30bf101bc5fcc1291efc2994e..62ff86e3264d9c14550648ade218c581a5d5671d 100644 --- a/crates/tor-circmgr/src/path/dirpath.rs +++ b/crates/tor-circmgr/src/path/dirpath.rs @@ -1,11 +1,12 @@ //! Code to construct paths to a directory for non-anonymous downloads use super::TorPath; use crate::{DirInfo, Error, Result}; +use tor_error::bad_api_usage; use tor_guardmgr::{GuardMgr, GuardMonitor, GuardUsable}; use tor_netdir::{Relay, WeightRole}; use tor_rtcompat::Runtime; -use rand::{seq::SliceRandom, Rng}; +use rand::Rng; /// A PathBuilder that can connect to a directory. #[non_exhaustive] @@ -32,11 +33,31 @@ impl DirPathBuilder { guards: Option<&GuardMgr<RT>>, ) -> Result<(TorPath<'a>, Option<GuardMonitor>, Option<GuardUsable>)> { match (netdir, guards) { - (DirInfo::Fallbacks(f), _) => { - let relay = f.choose(rng); - if let Some(r) = relay { - return Ok((TorPath::new_fallback_one_hop(r), None, None)); - } + (dirinfo, Some(guardmgr)) => { + // We use a guardmgr whenever we have one, regardless of whether + // there's a netdir. + // + // That way, we prefer our guards (if they're up) before we default to the fallback directories. + let netdir = match dirinfo { + DirInfo::Directory(netdir) => { + guardmgr.update_network(netdir); // possibly unnecessary. + Some(netdir) + } + _ => None, + }; + + let guard_usage = tor_guardmgr::GuardUsageBuilder::default() + .kind(tor_guardmgr::GuardUsageKind::OneHopDirectory) + .build() + .expect("Unable to build directory guard usage"); + let (guard, mon, usable) = guardmgr.select_guard(guard_usage, netdir)?; + return Ok((TorPath::new_one_hop_owned(&guard), Some(mon), Some(usable))); + } + + // In the following cases, we don't have a guardmgr, so we'll use the provided information if we can. + (DirInfo::Fallbacks(f), None) => { + let relay = f.choose(rng)?; + return Ok((TorPath::new_fallback_one_hop(relay), None, None)); } (DirInfo::Directory(netdir), None) => { let relay = netdir.pick_relay(rng, WeightRole::BeginDir, Relay::is_dir_cache); @@ -44,18 +65,11 @@ impl DirPathBuilder { return Ok((TorPath::new_one_hop(r), None, None)); } } - (DirInfo::Directory(netdir), Some(guardmgr)) => { - // TODO: We might want to use the guardmgr even if - // we don't have a netdir. See arti#220. - guardmgr.update_network(netdir); // possibly unnecessary. - let guard_usage = tor_guardmgr::GuardUsageBuilder::default() - .kind(tor_guardmgr::GuardUsageKind::OneHopDirectory) - .build() - .expect("Unable to build directory guard usage"); - let (guard, mon, usable) = guardmgr.select_guard(guard_usage, Some(netdir))?; - if let Some(r) = guard.get_relay(netdir) { - return Ok((TorPath::new_one_hop(r), Some(mon), Some(usable))); - } + (DirInfo::Nothing, None) => { + return Err(bad_api_usage!( + "Tried to build a one hop path with no directory, fallbacks, or guard manager" + ) + .into()); } } Err(Error::NoPath( @@ -72,8 +86,8 @@ mod test { use crate::path::assert_same_path_when_owned; use crate::test::OptDummyGuardMgr; use std::collections::HashSet; + use tor_guardmgr::fallback::{FallbackDir, FallbackList}; use tor_linkspec::ChanTarget; - use tor_netdir::fallback::FallbackDir; use tor_netdir::testnet; #[test] @@ -116,8 +130,8 @@ mod test { .build() .unwrap(), ]; - let fb: Vec<_> = fb_owned.iter().collect(); - let dirinfo = (&fb[..]).into(); + let fb: FallbackList = fb_owned.clone().into(); + let dirinfo = (&fb).into(); let mut rng = rand::thread_rng(); let guards: OptDummyGuardMgr<'_> = None; @@ -129,7 +143,7 @@ mod test { assert_same_path_when_owned(&p); if let crate::path::TorPathInner::FallbackOneHop(f) = p.inner { - assert!(std::ptr::eq(f, fb[0]) || std::ptr::eq(f, fb[1])); + assert!(f == &fb_owned[0] || f == &fb_owned[1]); } else { panic!("Generated the wrong kind of path."); } @@ -138,13 +152,19 @@ mod test { #[test] fn dirpath_no_fallbacks() { - let fb = vec![]; - let dirinfo = DirInfo::Fallbacks(&fb[..]); + let fb = FallbackList::from([]); + let dirinfo = DirInfo::Fallbacks(&fb); let mut rng = rand::thread_rng(); let guards: OptDummyGuardMgr<'_> = None; let err = DirPathBuilder::default().pick_path(&mut rng, dirinfo, guards); - assert!(matches!(err, Err(Error::NoPath(_)))); + dbg!(err.as_ref().err()); + assert!(matches!( + err, + Err(Error::Guard( + tor_guardmgr::PickGuardError::AllFallbacksDown { .. } + )) + )); } #[test] @@ -157,7 +177,7 @@ mod test { let mut rng = rand::thread_rng(); let dirinfo = (&netdir).into(); let statemgr = tor_persist::TestingStateMgr::new(); - let guards = tor_guardmgr::GuardMgr::new(rt.clone(), statemgr).unwrap(); + let guards = tor_guardmgr::GuardMgr::new(rt.clone(), statemgr, [].into()).unwrap(); guards.update_network(&netdir); let mut distinct_guards = HashSet::new(); @@ -168,7 +188,7 @@ mod test { let (path, mon, usable) = DirPathBuilder::new() .pick_path(&mut rng, dirinfo, Some(&guards)) .unwrap(); - if let crate::path::TorPathInner::OneHop(relay) = path.inner { + if let crate::path::TorPathInner::OwnedOneHop(relay) = path.inner { distinct_guards.insert(relay.ed_identity().clone()); mon.unwrap().succeeded(); assert!(usable.unwrap().await.unwrap()); diff --git a/crates/tor-circmgr/src/path/exitpath.rs b/crates/tor-circmgr/src/path/exitpath.rs index 17c2d50d7b9e654139ef4f5c216aa277710c06cb..df57dce9d869bf76beaeb1e2d2889065b5537f8b 100644 --- a/crates/tor-circmgr/src/path/exitpath.rs +++ b/crates/tor-circmgr/src/path/exitpath.rs @@ -125,13 +125,13 @@ impl<'a> ExitPathBuilder<'a> { now: SystemTime, ) -> Result<(TorPath<'a>, Option<GuardMonitor>, Option<GuardUsable>)> { let netdir = match netdir { - DirInfo::Fallbacks(_) => { + DirInfo::Directory(d) => d, + _ => { return Err(bad_api_usage!( - "Tried to build a multihop path using only a list of fallback caches" + "Tried to build a multihop path without a network directory" ) .into()) } - DirInfo::Directory(d) => d, }; let subnet_config = config.subnet_config(); let lifetime = netdir.lifetime(); @@ -389,7 +389,7 @@ mod test { let mut rng = rand::thread_rng(); let dirinfo = (&netdir).into(); let statemgr = tor_persist::TestingStateMgr::new(); - let guards = tor_guardmgr::GuardMgr::new(rt.clone(), statemgr).unwrap(); + let guards = tor_guardmgr::GuardMgr::new(rt.clone(), statemgr, [].into()).unwrap(); let config = PathConfig::default(); guards.update_network(&netdir); let port443 = TargetPort::ipv4(443); diff --git a/crates/tor-dirmgr/Cargo.toml b/crates/tor-dirmgr/Cargo.toml index 2778e7791da84f25d431da2ae5911c0c1df462fd..f5b03bbe67ed880a201365236709379f7fd1596d 100644 --- a/crates/tor-dirmgr/Cargo.toml +++ b/crates/tor-dirmgr/Cargo.toml @@ -33,6 +33,7 @@ tor-config = { path = "../tor-config", version = "0.1.0" } tor-consdiff = { path = "../tor-consdiff", version = "0.1.0" } tor-dirclient = { path = "../tor-dirclient", version = "0.1.0" } tor-error = { path = "../tor-error", version = "0.1.0" } +tor-guardmgr = { path = "../tor-guardmgr", version = "0.1.0" } tor-netdir = { path = "../tor-netdir", version = "0.1.0" } tor-netdoc = { path = "../tor-netdoc", version = "0.1.0" } tor-llcrypto = { path = "../tor-llcrypto", version = "0.1.0" } @@ -52,9 +53,7 @@ itertools = "0.10.1" tracing = "0.1.18" memmap2 = { version = "0.5.0", optional = true } once_cell = "1" -postage = { version = "0.4", default-features = false, features = [ - "futures-traits", -] } +postage = { version = "0.4", default-features = false, features = ["futures-traits"] } rand = "0.8" rusqlite = { version = "0.27.0", features = ["time"] } serde = { version = "1.0.103", features = ["derive"] } @@ -67,8 +66,5 @@ humantime-serde = "1.1.1" futures-await-test = "0.3.0" hex-literal = "0.3" tempfile = "3" -tor-rtcompat = { path = "../tor-rtcompat", version = "0.1.0", features = [ - "tokio", - "native-tls", -] } +tor-rtcompat = { path = "../tor-rtcompat", version = "0.1.0", features = ["tokio", "native-tls"] } float_eq = "0.7" diff --git a/crates/tor-dirmgr/src/bootstrap.rs b/crates/tor-dirmgr/src/bootstrap.rs index 28974905cdd4a0e828c45b7a8d7b05b228f3228b..7c0d7f316a86d211d0557c66a8b118c3c113f5ef 100644 --- a/crates/tor-dirmgr/src/bootstrap.rs +++ b/crates/tor-dirmgr/src/bootstrap.rs @@ -59,14 +59,9 @@ async fn fetch_single<R: Runtime>( } let circmgr = dirmgr.circmgr()?; let cur_netdir = dirmgr.opt_netdir(); - let config = dirmgr.config.get(); - let fbs; let dirinfo = match cur_netdir { Some(ref netdir) => netdir.as_ref().into(), - None => { - fbs = config.fallbacks().iter().collect::<Vec<_>>(); - fbs[..].into() - } + None => tor_circmgr::DirInfo::Nothing, }; let outcome = tor_dirclient::get_resource(request.as_requestable(), dirinfo, &dirmgr.runtime, circmgr) @@ -210,7 +205,12 @@ async fn download_attempt<R: Runtime>( Ok(text) => { let outcome = state.add_from_download(&text, &client_req, Some(&dirmgr.store)); match outcome { - Ok(b) => changed |= b, + Ok(b) => { + changed |= b; + if let Some(source) = source { + dirmgr.note_cache_success(&source); + } + } Err(e) => { warn!("error while adding directory info: {}", e); if let Some(source) = source { diff --git a/crates/tor-dirmgr/src/config.rs b/crates/tor-dirmgr/src/config.rs index ace0024c151fd7f197743ce20acd79fe8faf0172..7be49434e791ec73a319c71fabb3ae39cc6b7d74 100644 --- a/crates/tor-dirmgr/src/config.rs +++ b/crates/tor-dirmgr/src/config.rs @@ -12,19 +12,20 @@ use crate::retry::DownloadSchedule; use crate::storage::DynStore; use crate::{Authority, Result}; use tor_config::ConfigBuildError; -use tor_netdir::fallback::FallbackDir; use tor_netdoc::doc::netstatus; use derive_builder::Builder; -use std::path::PathBuf; - use serde::Deserialize; +use std::path::PathBuf; /// Configuration information about the Tor network itself; used as /// part of Arti's configuration. /// /// This type is immutable once constructed. To make one, use /// [`NetworkConfigBuilder`], or deserialize it from a string. +// +// TODO: We should move this type around, since the fallbacks part will no longer be used in +// dirmgr, but only in guardmgr. Probably this type belongs in `arti-client`. #[derive(Deserialize, Debug, Clone, Builder, Eq, PartialEq)] #[serde(deny_unknown_fields)] #[builder(build_fn(validate = "Self::validate", error = "ConfigBuildError"))] @@ -42,8 +43,8 @@ pub struct NetworkConfig { #[builder(default = "fallbacks::default_fallbacks()")] #[serde(rename = "fallback_caches")] #[builder_field_attr(serde(rename = "fallback_caches"))] - #[builder(setter(name = "fallback_caches"))] - pub(crate) fallbacks: Vec<FallbackDir>, + #[builder(setter(into, name = "fallback_caches"))] + pub(crate) fallbacks: tor_guardmgr::fallback::FallbackList, /// List of directory authorities which we expect to sign consensus /// documents. @@ -71,6 +72,11 @@ impl NetworkConfig { pub fn builder() -> NetworkConfigBuilder { NetworkConfigBuilder::default() } + + /// Return the list of fallback directory caches from this configuration. + pub fn fallback_caches(&self) -> &tor_guardmgr::fallback::FallbackList { + &self.fallbacks + } } impl NetworkConfigBuilder { @@ -230,7 +236,7 @@ impl DirMgrConfig { } /// Return the configured set of fallback directories - pub fn fallbacks(&self) -> &[FallbackDir] { + pub fn fallbacks(&self) -> &tor_guardmgr::fallback::FallbackList { &self.network_config.fallbacks } @@ -306,11 +312,11 @@ impl DownloadScheduleConfig { /// Helpers for initializing the fallback list. mod fallbacks { + use tor_guardmgr::fallback::{FallbackDir, FallbackList}; use tor_llcrypto::pk::{ed25519::Ed25519Identity, rsa::RsaIdentity}; - use tor_netdir::fallback::FallbackDir; /// Return a list of the default fallback directories shipped with /// arti. - pub(crate) fn default_fallbacks() -> Vec<super::FallbackDir> { + pub(crate) fn default_fallbacks() -> FallbackList { /// Build a fallback directory; panic if input is bad. fn fallback(rsa: &str, ed: &str, ports: &[&str]) -> FallbackDir { let rsa = RsaIdentity::from_hex(rsa).expect("Bad hex in built-in fallback list"); @@ -331,7 +337,7 @@ mod fallbacks { bld.build() .expect("Unable to build default fallback directory!?") } - include!("fallback_dirs.inc") + include!("fallback_dirs.inc").into() } } @@ -361,6 +367,8 @@ mod test { #[test] fn build_network() -> Result<()> { + use tor_guardmgr::fallback::FallbackDir; + let dflt = NetworkConfig::default(); // with nothing set, we get the default. diff --git a/crates/tor-dirmgr/src/lib.rs b/crates/tor-dirmgr/src/lib.rs index 707d6b3b614d963abab5f3723a68fa5f0488769f..d4f45587071d598c1344904d6b31f86a0e6e82f3 100644 --- a/crates/tor-dirmgr/src/lib.rs +++ b/crates/tor-dirmgr/src/lib.rs @@ -101,7 +101,7 @@ pub use docid::DocId; pub use err::Error; pub use event::{DirBootstrapEvents, DirBootstrapStatus, DirStatus}; pub use storage::DocumentText; -pub use tor_netdir::fallback::{FallbackDir, FallbackDirBuilder}; +pub use tor_guardmgr::fallback::{FallbackDir, FallbackDirBuilder}; /// A Result as returned by this crate. pub type Result<T> = std::result::Result<T, Error>; @@ -981,7 +981,7 @@ impl<R: Runtime> DirMgr<R> { /// Record that a problem has occurred because of a failure in an answer from `source`. fn note_cache_error(&self, source: &tor_dirclient::SourceInfo, problem: &Error) { - use tor_circmgr::{ExternalFailure, GuardId}; + use tor_circmgr::ExternalActivity; if !problem.indicates_cache_failure() { return; @@ -989,11 +989,20 @@ impl<R: Runtime> DirMgr<R> { if let Some(circmgr) = &self.circmgr { info!("Marking {:?} as failed: {}", source, problem); - let guard_id = GuardId::from_chan_target(source.cache_id()); - circmgr.note_external_failure(&guard_id, ExternalFailure::DirCache); + circmgr.note_external_failure(source.cache_id(), ExternalActivity::DirCache); circmgr.retire_circ(source.unique_circ_id()); } } + + /// Record that `source` has successfully given us some directory info. + fn note_cache_success(&self, source: &tor_dirclient::SourceInfo) { + use tor_circmgr::ExternalActivity; + + if let Some(circmgr) = &self.circmgr { + trace!("Marking {:?} as successful", source); + circmgr.note_external_success(source.cache_id(), ExternalActivity::DirCache); + } + } } /// A degree of readiness for a given directory state object. diff --git a/crates/tor-guardmgr/Cargo.toml b/crates/tor-guardmgr/Cargo.toml index 4579ee1aa14f1393e0ca47cc8204864cc1b9ac28..8a4b59bba272e1c314bed0641a05f2b9f0b43cbb 100644 --- a/crates/tor-guardmgr/Cargo.toml +++ b/crates/tor-guardmgr/Cargo.toml @@ -6,9 +6,9 @@ edition = "2018" license = "MIT OR Apache-2.0" homepage = "https://gitlab.torproject.org/tpo/core/arti/-/wikis/home" description = "Manage a set of guard relays for Tor network" -keywords = [ "tor", "arti", "async" ] -categories = [ "network-programming", "cryptography" ] -repository="https://gitlab.torproject.org/tpo/core/arti.git/" +keywords = ["tor", "arti", "async"] +categories = ["network-programming", "cryptography"] +repository = "https://gitlab.torproject.org/tpo/core/arti.git/" [features] default = [] @@ -18,18 +18,19 @@ default = [] testing = [] [dependencies] -tor-basic-utils = { path="../tor-basic-utils", version = "0.1.0"} -tor-config = { path="../tor-config", version = "0.1.0"} -tor-error = { path="../tor-error", version = "0.1.0"} -tor-netdir = { path="../tor-netdir", version = "0.1.0"} -tor-linkspec = { path="../tor-linkspec", version = "0.1.0"} -tor-llcrypto = { path="../tor-llcrypto", version = "0.1.0"} -tor-persist = { path="../tor-persist", version = "0.1.0"} -tor-proto = { path="../tor-proto", version = "0.1.0"} -tor-rtcompat = { path="../tor-rtcompat", version = "0.1.0"} -tor-units = { path="../tor-units", version = "0.1.0"} +tor-basic-utils = { path = "../tor-basic-utils", version = "0.1.0" } +tor-config = { path = "../tor-config", version = "0.1.0" } +tor-error = { path = "../tor-error", version = "0.1.0" } +tor-netdir = { path = "../tor-netdir", version = "0.1.0" } +tor-linkspec = { path = "../tor-linkspec", version = "0.1.0" } +tor-llcrypto = { path = "../tor-llcrypto", version = "0.1.0" } +tor-persist = { path = "../tor-persist", version = "0.1.0" } +tor-proto = { path = "../tor-proto", version = "0.1.0" } +tor-rtcompat = { path = "../tor-rtcompat", version = "0.1.0" } +tor-units = { path = "../tor-units", version = "0.1.0" } derive_builder = "0.11" +derive_more = "0.99" educe = "0.4.6" futures = "0.3.14" humantime-serde = "1.1.1" @@ -42,8 +43,8 @@ thiserror = "1" tracing = "0.1.18" [dev-dependencies] -tor-netdir = { path="../tor-netdir", version = "0.1.0", features=["testing"]} -tor-netdoc = { path="../tor-netdoc", version = "0.1.0"} -tor-persist = { path="../tor-persist", version = "0.1.0", features=["testing"]} -tor-rtcompat = { path="../tor-rtcompat", version = "0.1.0", features=["tokio", "native-tls"]} -tor-rtmock = { path="../tor-rtmock", version = "0.1.0"} +tor-netdir = { path = "../tor-netdir", version = "0.1.0", features = ["testing"] } +tor-netdoc = { path = "../tor-netdoc", version = "0.1.0" } +tor-persist = { path = "../tor-persist", version = "0.1.0", features = ["testing"] } +tor-rtcompat = { path = "../tor-rtcompat", version = "0.1.0", features = ["tokio", "native-tls"] } +tor-rtmock = { path = "../tor-rtmock", version = "0.1.0" } diff --git a/crates/tor-guardmgr/src/err.rs b/crates/tor-guardmgr/src/err.rs new file mode 100644 index 0000000000000000000000000000000000000000..ca0ec046292b05ce35f6d6e2a4577c5aa179541e --- /dev/null +++ b/crates/tor-guardmgr/src/err.rs @@ -0,0 +1,86 @@ +//! Declare error types for the `tor-guardmgr` crate. + +use futures::task::SpawnError; +use std::sync::Arc; +use std::time::Instant; +use tor_error::{ErrorKind, HasKind}; + +/// A error caused by a failure to pick a guard. +#[derive(Clone, Debug, thiserror::Error)] +#[non_exhaustive] +pub enum PickGuardError { + /// All members of the current sample were down. + #[error("All guards are down")] + AllGuardsDown { + /// The next time at which any guard will be retriable. + retry_at: Option<Instant>, + }, + + /// Some guards were running, but all of them were either blocked on pending + /// circuits at other guards, unusable for the provided purpose, or filtered + /// out. + #[error("No running guards were usable for the selected purpose")] + NoGuardsUsable, + + /// We have no usable fallback directories. + #[error("All fallback directories are down")] + AllFallbacksDown { + /// The next time at which any fallback directory will back available. + retry_at: Option<Instant>, + }, + + /// Tried to select guards or fallbacks from an empty list. + #[error("Tried to pick from an empty list")] + NoCandidatesAvailable, +} + +impl tor_error::HasKind for PickGuardError { + fn kind(&self) -> tor_error::ErrorKind { + use tor_error::ErrorKind as EK; + use PickGuardError as E; + match self { + E::AllFallbacksDown { .. } | E::AllGuardsDown { .. } => EK::TorAccessFailed, + E::NoGuardsUsable | E::NoCandidatesAvailable => EK::NoPath, + } + } +} + +/// An error caused while creating or updating a guard manager. +#[derive(Clone, Debug, thiserror::Error)] +#[non_exhaustive] +pub enum GuardMgrError { + /// An error manipulating persistent state + #[error("Problem accessing persistent state")] + State(#[from] tor_persist::Error), + + /// An error that occurred while trying to spawn a daemon task. + #[error("Unable to spawn {spawning}")] + Spawn { + /// What we were trying to spawn. + spawning: &'static str, + /// What happened when we tried to spawn it. + #[source] + cause: Arc<SpawnError>, + }, +} + +impl HasKind for GuardMgrError { + #[rustfmt::skip] // to preserve table in match + fn kind(&self) -> ErrorKind { + use GuardMgrError as G; + match self { + G::State(e) => e.kind(), + G::Spawn{ cause, .. } => cause.kind(), + } + } +} + +impl GuardMgrError { + /// Construct a new `GuardMgrError` from a `SpawnError`. + pub(crate) fn from_spawn(spawning: &'static str, err: SpawnError) -> GuardMgrError { + GuardMgrError::Spawn { + spawning, + cause: Arc::new(err), + } + } +} diff --git a/crates/tor-netdir/src/fallback.rs b/crates/tor-guardmgr/src/fallback.rs similarity index 87% rename from crates/tor-netdir/src/fallback.rs rename to crates/tor-guardmgr/src/fallback.rs index f4332b9f5483b21dbe43c0ec92ae9669cd854517..9e01e23f4e7b661d11f91a0ed83cb4ef2dd17e3d 100644 --- a/crates/tor-netdir/src/fallback.rs +++ b/crates/tor-guardmgr/src/fallback.rs @@ -10,6 +10,10 @@ //! The types in this module are re-exported from `arti-client` and //! `tor-dirmgr`: any changes here must be reflected there. +mod set; +mod status; + +use crate::ids::FallbackId; use derive_builder::Builder; use tor_config::ConfigBuildError; use tor_llcrypto::pk::ed25519::Ed25519Identity; @@ -18,6 +22,10 @@ use tor_llcrypto::pk::rsa::RsaIdentity; use serde::Deserialize; use std::net::SocketAddr; +pub use set::FallbackList; +pub(crate) use set::FallbackState; +use status::Status; + /// A directory whose location ships with Tor (or arti), and which we /// can use for bootstrapping when we don't know anything else about /// the network. @@ -42,6 +50,14 @@ impl FallbackDir { pub fn builder() -> FallbackDirBuilder { FallbackDirBuilder::default() } + + /// Return a copy of this FallbackDir as a [`FirstHop`](crate::FirstHop) + pub fn as_guard(&self) -> crate::FirstHop { + crate::FirstHop { + id: FallbackId::from_chan_target(self).into(), + orports: self.orports.clone(), + } + } } impl FallbackDirBuilder { diff --git a/crates/tor-guardmgr/src/fallback/set.rs b/crates/tor-guardmgr/src/fallback/set.rs new file mode 100644 index 0000000000000000000000000000000000000000..a9b03f9969b5ea62be958e67ceca5daf3063ba63 --- /dev/null +++ b/crates/tor-guardmgr/src/fallback/set.rs @@ -0,0 +1,386 @@ +//! Declare the [`FallbackState`] type, which is used to store a set of FallbackDir. + +use rand::seq::IteratorRandom; +use std::time::Instant; + +use super::{FallbackDir, Status}; +use crate::{ids::FallbackId, PickGuardError}; +use serde::Deserialize; + +/// A list of fallback directories. +/// +/// Fallback directories (represented by [`FallbackDir`]) are used by Tor +/// clients when they don't already have enough other directory information to +/// contact the network. +#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)] +#[serde(transparent)] +pub struct FallbackList { + /// The underlying fallbacks in this set. + fallbacks: Vec<FallbackDir>, +} + +impl<T: IntoIterator<Item = FallbackDir>> From<T> for FallbackList { + fn from(fallbacks: T) -> Self { + FallbackList { + fallbacks: fallbacks.into_iter().collect(), + } + } +} + +impl FallbackList { + /// Return the number of fallbacks in this list. + pub fn len(&self) -> usize { + self.fallbacks.len() + } + /// Return true if there are no fallbacks in this list. + pub fn is_empty(&self) -> bool { + self.fallbacks.is_empty() + } + /// Return a random member of this list. + pub fn choose<R: rand::Rng>(&self, rng: &mut R) -> Result<&FallbackDir, PickGuardError> { + // TODO: Return NoCandidatesAvailable when the fallback list is empty. + self.fallbacks + .iter() + .choose(rng) + .ok_or(PickGuardError::AllFallbacksDown { retry_at: None }) + } +} + +/// A set of fallback directories, in usable form. +#[derive(Debug, Clone)] +pub(crate) struct FallbackState { + /// The list of fallbacks in the set. + /// + /// We require that these are sorted and unique by (ED,RSA) keys. + fallbacks: Vec<Entry>, +} + +/// Wrapper type for FallbackDir converted into crate::Guard, and Status. +/// +/// Defines a sort order to ensure that we can look up fallback directories +/// by binary search on keys. +#[derive(Debug, Clone)] +pub(super) struct Entry { + /// The inner fallback directory. + /// + /// (TODO: We represent this as a `FirstHop`, which could technically hold a + /// guard as well. Ought to fix that.) + pub(super) fallback: crate::FirstHop, + /// The status for the fallback directory. + pub(super) status: Status, +} + +impl From<FallbackDir> for Entry { + fn from(fallback: FallbackDir) -> Self { + let fallback = fallback.as_guard(); + let status = Status::default(); + Entry { fallback, status } + } +} + +impl Entry { + /// Return the identity for this fallback entry. + fn id(&self) -> &FallbackId { + use crate::ids::FirstHopIdInner::*; + match &self.fallback.id().0 { + Fallback(id) => id, + _ => panic!("Somehow we constructed a fallback object with a non-fallback id!"), + } + } +} + +impl From<FallbackList> for FallbackState { + fn from(list: FallbackList) -> Self { + let mut fallbacks: Vec<Entry> = list.fallbacks.into_iter().map(|fb| fb.into()).collect(); + fallbacks.sort_by(|x, y| x.id().cmp(y.id())); + fallbacks.dedup_by(|x, y| x.id() == y.id()); + FallbackState { fallbacks } + } +} + +impl FallbackState { + /// Return a random member of this FallbackSet that's usable at `now`. + pub(crate) fn choose<R: rand::Rng>( + &self, + rng: &mut R, + now: Instant, + ) -> Result<&crate::FirstHop, PickGuardError> { + if self.fallbacks.is_empty() { + return Err(PickGuardError::NoCandidatesAvailable); + } + + self.fallbacks + .iter() + .filter(|ent| ent.status.usable_at(now)) + .choose(rng) + .map(|ent| &ent.fallback) + .ok_or_else(|| PickGuardError::AllFallbacksDown { + retry_at: self.next_retry(), + }) + } + + /// Return the next time at which any member of this set will become ready. + /// + /// Returns None if no elements are failing. + fn next_retry(&self) -> Option<Instant> { + self.fallbacks + .iter() + .filter_map(|ent| ent.status.next_retriable()) + .min() + } + + /// Return a reference to the entry whose identity is `id`, if there is one. + fn get(&self, id: &FallbackId) -> Option<&Entry> { + match self.fallbacks.binary_search_by(|e| e.id().cmp(id)) { + Ok(idx) => Some(&self.fallbacks[idx]), + Err(_) => None, + } + } + + /// Return a mutable reference to the entry whose identity is `id`, if there is one. + fn get_mut(&mut self, id: &FallbackId) -> Option<&mut Entry> { + match self.fallbacks.binary_search_by(|e| e.id().cmp(id)) { + Ok(idx) => Some(&mut self.fallbacks[idx]), + Err(_) => None, + } + } + + /// Return true if this set contains some entry with the given `id`. + pub(crate) fn contains(&self, id: &FallbackId) -> bool { + self.get(id).is_some() + } + + /// Record that a success has occurred for the fallback with the given + /// identity. + /// + /// Be aware that for fallbacks, we only count a successful directory + /// operation as a success: a circuit success is not enough. + pub(crate) fn note_success(&mut self, id: &FallbackId) { + if let Some(entry) = self.get_mut(id) { + entry.status.note_success(); + } + } + + /// Record that a failure has occurred for the fallback with the given + /// identity. + pub(crate) fn note_failure(&mut self, id: &FallbackId, now: Instant) { + if let Some(entry) = self.get_mut(id) { + entry.status.note_failure(now); + } + } + + /// Consume `other` and copy all of its fallback status entries into the corresponding entries for `self`. + pub(crate) fn take_status_from(&mut self, other: FallbackState) { + use itertools::EitherOrBoth::Both; + + itertools::merge_join_by( + self.fallbacks.iter_mut(), + other.fallbacks.into_iter(), + |a, b| a.fallback.id().cmp(b.fallback.id()), + ) + .for_each(|entry| { + if let Both(entry, other) = entry { + debug_assert_eq!(entry.fallback.id(), other.fallback.id()); + entry.status = other.status; + } + }); + } +} + +#[cfg(test)] +mod test { + #![allow(clippy::unwrap_used)] + use super::*; + use crate::FirstHopId; + + /// Construct a `FallbackDir` with random identity keys and addresses. + /// + /// Since there are 416 bits of random id here, the risk of collision is + /// negligible. + fn rand_fb() -> FallbackDir { + use rand::Rng; + let mut rng = rand::thread_rng(); + let ed: [u8; 32] = rng.gen(); + let rsa: [u8; 20] = rng.gen(); + let ip: u32 = rng.gen(); + FallbackDir::builder() + .ed_identity(ed.into()) + .rsa_identity(rsa.into()) + .orport(std::net::SocketAddrV4::new(ip.into(), 9090).into()) + .build() + .unwrap() + } + + #[test] + fn construct_fallback_set() { + use rand::seq::SliceRandom; + + // fabricate some fallbacks. + let fbs = vec![rand_fb(), rand_fb(), rand_fb(), rand_fb()]; + let fb_other = rand_fb(); + let id_other = FallbackId::from_chan_target(&fb_other); + + // basic case: construct a set + let list: FallbackList = fbs.clone().into(); + assert!(!list.is_empty()); + assert_eq!(list.len(), 4); + let mut set: FallbackState = list.clone().into(); + + // inspect the generated set + assert_eq!(set.fallbacks.len(), 4); + assert!(set.fallbacks[0].id() < set.fallbacks[1].id()); + assert!(set.fallbacks[1].id() < set.fallbacks[2].id()); + assert!(set.fallbacks[2].id() < set.fallbacks[3].id()); + + // use the constructed set a little. + for fb in fbs.iter() { + let id = FallbackId::from_chan_target(fb); + assert_eq!(set.get_mut(&id).unwrap().id(), &id); + } + assert!(set.get_mut(&id_other).is_none()); + + // Now try an input set with duplicates. + let mut redundant_fbs = fbs.clone(); + redundant_fbs.extend(fbs.clone()); + redundant_fbs.extend(fbs[0..2].iter().map(Clone::clone)); + redundant_fbs[..].shuffle(&mut rand::thread_rng()); + let list2 = redundant_fbs.into(); + assert_ne!(&list, &list2); + let set2: FallbackState = list2.into(); + + // It should have the same elements, in the same order. + assert_eq!(set.fallbacks.len(), set2.fallbacks.len()); + assert!(set + .fallbacks + .iter() + .zip(set2.fallbacks.iter()) + .all(|(ent1, ent2)| ent1.id() == ent2.id())); + } + + #[test] + fn set_choose() { + let fbs = vec![rand_fb(), rand_fb(), rand_fb(), rand_fb()]; + let list: FallbackList = fbs.into(); + let mut set: FallbackState = list.into(); + + let mut counts = [0_usize; 4]; + let mut rng = rand::thread_rng(); + let now = Instant::now(); + + fn lookup_idx(set: &FallbackState, id: &FirstHopId) -> Option<usize> { + if let FirstHopId(crate::ids::FirstHopIdInner::Fallback(id)) = id { + set.fallbacks.binary_search_by(|ent| ent.id().cmp(id)).ok() + } else { + None + } + } + // Basic case: everybody is up. + for _ in 0..100 { + let fb = set.choose(&mut rng, now).unwrap(); + let idx = lookup_idx(&set, fb.id()).unwrap(); + counts[idx] += 1; + } + assert!(counts.iter().all(|v| *v > 0)); + + // Mark somebody down and make sure they don't get chosen. + let ids: Vec<_> = set.fallbacks.iter().map(|fb| fb.id().clone()).collect(); + set.note_failure(&ids[2], now); + counts = [0; 4]; + for _ in 0..100 { + let fb = set.choose(&mut rng, now).unwrap(); + let idx = lookup_idx(&set, fb.id()).unwrap(); + counts[idx] += 1; + } + assert_eq!(counts.iter().filter(|v| **v > 0).count(), 3); + assert_eq!(counts[2], 0); + + // Mark everybody down; make sure we get the right error. + for id in ids.iter() { + set.note_failure(id, now); + } + assert!(matches!( + set.choose(&mut rng, now), + Err(PickGuardError::AllFallbacksDown { .. }) + )); + + // Construct an empty set; make sure we get the right error. + let empty_set = FallbackState::from(FallbackList::from(vec![])); + assert!(matches!( + empty_set.choose(&mut rng, now), + Err(PickGuardError::NoCandidatesAvailable) + )); + + // TODO: test restrictions and filters once they're implemented. + } + + #[test] + fn test_status() { + let fbs = vec![rand_fb(), rand_fb(), rand_fb(), rand_fb()]; + let list: FallbackList = fbs.clone().into(); + let mut set: FallbackState = list.into(); + let ids: Vec<_> = set.fallbacks.iter().map(|fb| fb.id().clone()).collect(); + + let now = Instant::now(); + + // There's no "next retry time" when everybody's up. + assert!(set.next_retry().is_none()); + + // Mark somebody down; try accessors. + set.note_failure(&ids[3], now); + assert!(set.fallbacks[3].status.next_retriable().unwrap() > now); + assert!(!set.fallbacks[3].status.usable_at(now)); + assert_eq!(set.next_retry(), set.fallbacks[3].status.next_retriable()); + + // Mark somebody else down; try accessors. + set.note_failure(&ids[0], now); + assert!(set.fallbacks[0].status.next_retriable().unwrap() > now); + assert!(!set.fallbacks[0].status.usable_at(now)); + assert_eq!( + set.next_retry().unwrap(), + std::cmp::min( + set.fallbacks[0].status.next_retriable().unwrap(), + set.fallbacks[3].status.next_retriable().unwrap() + ) + ); + + // Mark somebody as running; try accessors. + set.note_success(&ids[0]); + assert!(set.fallbacks[0].status.next_retriable().is_none()); + assert!(set.fallbacks[0].status.usable_at(now)); + + assert!(set.get_mut(&ids[0]).unwrap().status.usable_at(now)); + + for id in ids.iter() { + dbg!(id, set.get_mut(id).map(|e| e.id())); + } + + // Make a new set with slightly different members; make sure that we can copy stuff successfully. + let mut fbs2: Vec<_> = fbs + .into_iter() + // (Remove the fallback with id==ids[2]) + .filter(|fb| FallbackId::from_chan_target(fb) != ids[2]) + .collect(); + // add 2 new ones. + let fbs_new = [rand_fb(), rand_fb(), rand_fb()]; + fbs2.extend(fbs_new.clone()); + + let mut set2 = FallbackState::from(FallbackList::from(fbs2.clone())); + set2.take_status_from(set); // consumes set. + assert_eq!(set2.fallbacks.len(), 6); // Started with 4, added 3, removed 1. + + // Make sure that the status entries are correctly copied. + assert!(set2.get_mut(&ids[0]).unwrap().status.usable_at(now)); + assert!(set2.get_mut(&ids[1]).unwrap().status.usable_at(now)); + assert!(set2.get_mut(&ids[2]).is_none()); + assert!(!set2.get_mut(&ids[3]).unwrap().status.usable_at(now)); + + // Make sure that the new fbs are there. + for new_fb in fbs_new { + assert!(set2 + .get_mut(&FallbackId::from_chan_target(&new_fb)) + .unwrap() + .status + .usable_at(now)); + } + } +} diff --git a/crates/tor-guardmgr/src/fallback/status.rs b/crates/tor-guardmgr/src/fallback/status.rs new file mode 100644 index 0000000000000000000000000000000000000000..c5d4f5ba2b277ad8d06ec0a6f68007f7a8ce26c7 --- /dev/null +++ b/crates/tor-guardmgr/src/fallback/status.rs @@ -0,0 +1,102 @@ +//! Types and code to track the readiness status of a fallback directory. + +use std::time::{Duration, Instant}; +use tor_basic_utils::retry::RetryDelay; + +/// Status information about whether a [`FallbackDir`](super::FallbackDir) is +/// currently usable. +/// +/// This structure is used to track whether the fallback cache has recently +/// failed, and if so, when it can be retried. +#[derive(Debug, Clone)] +pub(crate) struct Status { + /// Used to decide how long to delay before retrying a fallback cache + /// that has failed. + delay: RetryDelay, + /// A time before which we should assume that this fallback cache is broken. + /// + /// If None, then this fallback cache is ready to use right away. + retry_at: Option<Instant>, +} + +/// Least amount of time we'll wait before retrying a fallback cache. +// +// TODO: we may want to make this configurable to a smaller value for chutney networks. +const FALLBACK_RETRY_FLOOR: Duration = Duration::from_secs(150); + +impl Default for Status { + fn default() -> Self { + Status { + delay: RetryDelay::from_duration(FALLBACK_RETRY_FLOOR), + retry_at: None, + } + } +} + +impl Status { + /// Return true if this `Status` is usable at the time `now`. + pub(crate) fn usable_at(&self, now: Instant) -> bool { + match self.retry_at { + Some(ready) => now >= ready, + None => true, + } + } + + /// Return the time at which this `Status` can next be retried. + /// + /// A return value of `None`, or of a time in the past, indicates that this + /// status can be used immediately. + pub(crate) fn next_retriable(&self) -> Option<Instant> { + self.retry_at + } + + /// Record that the associated fallback directory has been used successfully. + /// + /// This should only be done after successfully handling a whole reply from the + /// directory. + pub(crate) fn note_success(&mut self) { + self.retry_at = None; + self.delay = RetryDelay::from_duration(FALLBACK_RETRY_FLOOR); + } + + /// Record that the associated fallback directory has failed. + pub(crate) fn note_failure(&mut self, now: Instant) { + let mut rng = rand::thread_rng(); + self.retry_at = Some(now + self.delay.next_delay(&mut rng)); + } +} + +#[cfg(test)] +mod test { + #![allow(clippy::unwrap_used)] + use super::*; + + #[test] + fn status_basics() { + let now = Instant::now(); + + let mut status = Status::default(); + // newly created status is usable. + assert!(status.usable_at(now)); + + // no longer usable after a failure. + status.note_failure(now); + assert_eq!(status.next_retriable().unwrap(), now + FALLBACK_RETRY_FLOOR); + assert!(!status.usable_at(now)); + + // Not enough time has passed. + assert!(!status.usable_at(now + FALLBACK_RETRY_FLOOR / 2)); + + // Enough time has passed. + assert!(status.usable_at(now + FALLBACK_RETRY_FLOOR)); + + // Mark as failed again; the timeout will (probably) be longer. + status.note_failure(now + FALLBACK_RETRY_FLOOR); + assert!(status.next_retriable().unwrap() >= now + FALLBACK_RETRY_FLOOR * 2); + assert!(!status.usable_at(now + FALLBACK_RETRY_FLOOR)); + + // Mark as succeeded; it should be usable immediately. + status.note_success(); + assert!(status.usable_at(now + FALLBACK_RETRY_FLOOR)); + } +} diff --git a/crates/tor-guardmgr/src/guard.rs b/crates/tor-guardmgr/src/guard.rs index b48f1f5d0608cbee21691485a069a66af1e96567..5a66a67308546aca66854985c27768a85dae10f8 100644 --- a/crates/tor-guardmgr/src/guard.rs +++ b/crates/tor-guardmgr/src/guard.rs @@ -13,7 +13,8 @@ use std::time::{Duration, Instant, SystemTime}; use tracing::{trace, warn}; use crate::util::randomize_time; -use crate::{GuardId, GuardParams, GuardRestriction, GuardUsage}; +use crate::FirstHopId; +use crate::{ids::GuardId, GuardParams, GuardRestriction, GuardUsage}; use tor_persist::{Futureproof, JsonValue}; /// Tri-state to represent whether a guard is believed to be reachable or not. @@ -321,8 +322,8 @@ impl Guard { /// Return true if this guard obeys a single restriction. fn obeys_restriction(&self, r: &GuardRestriction) -> bool { match r { - GuardRestriction::AvoidId(ed) => &self.id.ed25519 != ed, - GuardRestriction::AvoidAllIds(ids) => !ids.contains(&self.id.ed25519), + GuardRestriction::AvoidId(ed) => &self.id.0.ed25519 != ed, + GuardRestriction::AvoidAllIds(ids) => !ids.contains(&self.id.0.ed25519), } } @@ -353,7 +354,7 @@ impl Guard { /// download another microdescriptor before we can be certain whether this /// guard is listed or not. pub(crate) fn listed_in(&self, netdir: &NetDir) -> Option<bool> { - netdir.id_pair_listed(&self.id.ed25519, &self.id.rsa) + netdir.id_pair_listed(&self.id.0.ed25519, &self.id.0.rsa) } /// Change this guard's status based on a newly received or newly @@ -370,11 +371,9 @@ impl Guard { // not. let listed_as_guard = match self.listed_in(netdir) { Some(true) => { + let id: FirstHopId = self.id.clone().into(); // Definitely listed. - let relay = self - .id - .get_relay(netdir) - .expect("Couldn't get a listed relay?!"); + let relay = id.get_relay(netdir).expect("Couldn't get a listed relay?!"); // Update address information. self.orports = relay.addrs().into(); // Check whether we can currently use it as a directory cache. @@ -570,13 +569,13 @@ impl Guard { /// We use this information to decide whether we are about to sample /// too much of the network as guards. pub(crate) fn get_weight(&self, dir: &NetDir) -> Option<RelayWeight> { - dir.weight_by_rsa_id(&self.id.rsa, tor_netdir::WeightRole::Guard) + dir.weight_by_rsa_id(&self.id.0.rsa, tor_netdir::WeightRole::Guard) } - /// Return a [`crate::Guard`] object to represent this guard. - pub(crate) fn get_external_rep(&self) -> crate::Guard { - crate::Guard { - id: self.id.clone(), + /// Return a [`FirstHop`](crate::FirstHop) object to represent this guard. + pub(crate) fn get_external_rep(&self) -> crate::FirstHop { + crate::FirstHop { + id: self.id.clone().into(), orports: self.orports.clone(), } } @@ -587,10 +586,10 @@ impl tor_linkspec::ChanTarget for Guard { &self.orports[..] } fn ed_identity(&self) -> &Ed25519Identity { - &self.id.ed25519 + &self.id.0.ed25519 } fn rsa_identity(&self) -> &RsaIdentity { - &self.id.rsa + &self.id.0.rsa } } @@ -703,8 +702,8 @@ mod test { let g = basic_guard(); assert_eq!(g.guard_id(), &id); - assert_eq!(g.ed_identity(), &id.ed25519); - assert_eq!(g.rsa_identity(), &id.rsa); + assert_eq!(g.ed_identity(), &id.0.ed25519); + assert_eq!(g.rsa_identity(), &id.0.rsa); assert_eq!(g.addrs(), &["127.0.0.7:7777".parse().unwrap()]); assert_eq!(g.reachable(), Reachable::Unknown); assert_eq!(g.reachable(), Reachable::default()); @@ -910,7 +909,8 @@ mod test { assert!(Some(guard22.added_at) <= Some(now)); // Can we still get the relay back? - let r = guard22.id.get_relay(&netdir).unwrap(); + let id: FirstHopId = guard22.id.clone().into(); + let r = id.get_relay(&netdir).unwrap(); assert_eq!(r.ed_identity(), relay22.ed_identity()); // Can we check on the guard's weight? @@ -923,7 +923,8 @@ mod test { vec![], now, ); - assert!(guard255.id.get_relay(&netdir).is_none()); + let id: FirstHopId = guard255.id.clone().into(); + assert!(id.get_relay(&netdir).is_none()); assert!(guard255.get_weight(&netdir).is_none()); } @@ -975,7 +976,8 @@ mod test { // Try a guard that is in netdir, but not netdir2. let mut guard22 = Guard::new(GuardId::new([22; 32].into(), [22; 20].into()), vec![], now); - let relay22 = guard22.id.get_relay(&netdir).unwrap(); + let id22: FirstHopId = guard22.id.clone().into(); + let relay22 = id22.get_relay(&netdir).unwrap(); assert_eq!(guard22.listed_in(&netdir), Some(true)); guard22.update_from_netdir(&netdir); assert_eq!(guard22.unlisted_since, None); // It's listed. diff --git a/crates/tor-guardmgr/src/ids.rs b/crates/tor-guardmgr/src/ids.rs new file mode 100644 index 0000000000000000000000000000000000000000..92ca66aedfeb10b54c892bce6d0b57b889375186 --- /dev/null +++ b/crates/tor-guardmgr/src/ids.rs @@ -0,0 +1,104 @@ +//! Identifier objects used to specify guards and/or fallbacks. + +use derive_more::AsRef; +use serde::{Deserialize, Serialize}; +use tor_llcrypto::pk; + +/// A pair of cryptographic identities used to distinguish a guard or fallback. +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Ord, PartialOrd)] +pub(crate) struct IdPair { + /// Ed25519 identity key for a guard + pub(crate) ed25519: pk::ed25519::Ed25519Identity, + /// RSA identity fingerprint for a guard + pub(crate) rsa: pk::rsa::RsaIdentity, +} + +/// An identifier for a fallback directory cache. +/// +/// This is a separate type from GuardId and FirstHopId to avoid confusion +/// about what kind of object we're identifying. +#[derive(Clone, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, AsRef)] +pub(crate) struct FallbackId(pub(crate) IdPair); + +impl FallbackId { + /// Return a new, manually constructed `FallbackId` + pub(crate) fn new(ed25519: pk::ed25519::Ed25519Identity, rsa: pk::rsa::RsaIdentity) -> Self { + Self(IdPair { ed25519, rsa }) + } + /// Extract a `FallbackId` from a ChanTarget object. + pub(crate) fn from_chan_target<T: tor_linkspec::ChanTarget>(target: &T) -> Self { + Self::new(*target.ed_identity(), *target.rsa_identity()) + } +} + +/// An identifier for a sampled guard. +/// +/// This is a separate type from GuardId and FirstHopId to avoid confusion +/// about what kind of object we're identifying. +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Ord, PartialOrd, AsRef)] +#[serde(transparent)] +pub(crate) struct GuardId(pub(crate) IdPair); + +impl GuardId { + /// Return a new, manually constructed `GuardId` + pub(crate) fn new(ed25519: pk::ed25519::Ed25519Identity, rsa: pk::rsa::RsaIdentity) -> Self { + Self(IdPair { ed25519, rsa }) + } + /// Extract a `GuardId` from a ChanTarget object. + pub(crate) fn from_chan_target<T: tor_linkspec::ChanTarget>(target: &T) -> Self { + Self::new(*target.ed_identity(), *target.rsa_identity()) + } +} + +/// Implementation type held inside of FirstHopId. +/// +/// This exists as a separate type from FirstHopId because Rust requires that a pub enum's variants are all public. +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] +pub(crate) enum FirstHopIdInner { + /// Identifies a guard. + Guard(GuardId), + /// Identifies a fallback. + Fallback(FallbackId), +} + +/// A unique cryptographic identifier for a selected guard or fallback +/// directory. +/// +/// (This is implemented internally using both of the guard's Ed25519 and RSA +/// identities.) +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] +pub struct FirstHopId(pub(crate) FirstHopIdInner); + +impl From<GuardId> for FirstHopId { + fn from(id: GuardId) -> Self { + Self(FirstHopIdInner::Guard(id)) + } +} +impl From<FallbackId> for FirstHopId { + fn from(id: FallbackId) -> Self { + Self(FirstHopIdInner::Fallback(id)) + } +} +impl AsRef<IdPair> for FirstHopId { + /// Return the inner IdPair for this object. + /// + /// Only use this when it's okay to erase the type information about + /// whether this identifies a guard or a fallback. + fn as_ref(&self) -> &IdPair { + match &self.0 { + FirstHopIdInner::Guard(id) => id.as_ref(), + FirstHopIdInner::Fallback(id) => id.as_ref(), + } + } +} + +impl FirstHopId { + /// Return the relay in `netdir` that corresponds to this ID, if there + /// is one. + // + // We have to define this function so it'll be public. + pub fn get_relay<'a>(&self, netdir: &'a tor_netdir::NetDir) -> Option<tor_netdir::Relay<'a>> { + let id = self.as_ref(); + netdir.by_id_pair(&id.ed25519, &id.rsa) + } +} diff --git a/crates/tor-guardmgr/src/lib.rs b/crates/tor-guardmgr/src/lib.rs index 8dc75e4d38c53985d3a732264bfe1169ccfb673f..fe661719da65139a177e311b7d22e37e30560d4a 100644 --- a/crates/tor-guardmgr/src/lib.rs +++ b/crates/tor-guardmgr/src/lib.rs @@ -132,7 +132,7 @@ use educe::Educe; use futures::channel::mpsc; -use futures::task::{SpawnError, SpawnExt}; +use futures::task::SpawnExt; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::convert::{TryFrom, TryInto}; @@ -141,26 +141,31 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant, SystemTime}; use tracing::{debug, info, trace, warn}; -use tor_error::{ErrorKind, HasKind}; use tor_llcrypto::pk; use tor_netdir::{params::NetParameters, NetDir, Relay}; use tor_persist::{DynStorageHandle, StateMgr}; use tor_rtcompat::Runtime; mod daemon; +mod err; +pub mod fallback; mod filter; mod guard; +mod ids; mod pending; mod sample; mod util; +pub use err::{GuardMgrError, PickGuardError}; pub use filter::GuardFilter; +pub use ids::FirstHopId; pub use pending::{GuardMonitor, GuardStatus, GuardUsable}; -pub use sample::PickGuardError; use pending::{PendingRequest, RequestId}; use sample::GuardSet; +use crate::ids::FirstHopIdInner; + /// A "guard manager" that selects and remembers a persistent set of /// guard nodes. /// @@ -228,6 +233,11 @@ struct GuardMgrInner { /// same guard. waiting: Vec<PendingRequest>, + /// A list of fallback directories used to access the directory system + /// when no other directory information is yet known. + // TODO: reconfigure when the configuration changes. + fallbacks: fallback::FallbackState, + /// Location in which to store persistent state. storage: DynStorageHandle<GuardSets>, } @@ -260,7 +270,11 @@ impl<R: Runtime> GuardMgr<R> { /// /// It won't be able to hand out any guards until /// [`GuardMgr::update_network`] has been called. - pub fn new<S>(runtime: R, state_mgr: S) -> Result<Self, GuardMgrError> + pub fn new<S>( + runtime: R, + state_mgr: S, + fallbacks: fallback::FallbackList, + ) -> Result<Self, GuardMgrError> where S: StateMgr + Send + Sync + 'static, { @@ -278,6 +292,7 @@ impl<R: Runtime> GuardMgr<R> { ctrl, pending: HashMap::new(), waiting: Vec::new(), + fallbacks: fallbacks.into(), storage, })); { @@ -364,6 +379,14 @@ impl<R: Runtime> GuardMgr<R> { inner.update(now, Some(netdir)); } + /// Replace the fallback list held by this GuardMgr with `new_list`. + pub fn replace_fallback_list(&self, list: fallback::FallbackList) { + let mut fallbacks: fallback::FallbackState = list.into(); + let mut inner = self.inner.lock().expect("Poisoned lock"); + std::mem::swap(&mut inner.fallbacks, &mut fallbacks); + inner.fallbacks.take_status_from(fallbacks); + } + /// Replace the current [`GuardFilter`] used by this `GuardMgr`. /// /// (Since there is only one kind of filter right now, there's no @@ -413,7 +436,7 @@ impl<R: Runtime> GuardMgr<R> { /// Select a guard for a given [`GuardUsage`]. /// - /// On success, we return a [`GuardId`] object to identify which + /// On success, we return a [`FirstHopId`] object to identify which /// guard we have picked, a [`GuardMonitor`] object that the /// caller can use to report whether its attempt to use the guard /// succeeded or failed, and a [`GuardUsable`] future that the @@ -440,7 +463,7 @@ impl<R: Runtime> GuardMgr<R> { &self, usage: GuardUsage, netdir: Option<&NetDir>, - ) -> Result<(Guard, GuardMonitor, GuardUsable), PickGuardError> { + ) -> Result<(FirstHop, GuardMonitor, GuardUsable), PickGuardError> { let now = self.runtime.now(); let wallclock = self.runtime.wallclock(); @@ -450,18 +473,11 @@ impl<R: Runtime> GuardMgr<R> { // it should _probably_ not hurt.) inner.guards.active_guards_mut().consider_all_retries(now); - let (origin, guard_id) = inner.select_guard_with_retries(&usage, netdir, wallclock)?; - let guard = inner - .guards - .active_guards() - .get(&guard_id) - .expect("Selected guard that wasn't in our sample!?") - .get_external_rep(); - - trace!(?guard_id, ?usage, "Guard selected"); + let (origin, guard) = inner.select_guard_with_expand(&usage, netdir, now, wallclock)?; + trace!(?guard, ?usage, "Guard selected"); - let (usable, usable_sender) = if origin.is_primary() { - (GuardUsable::new_primary(), None) + let (usable, usable_sender) = if origin.usable_immediately() { + (GuardUsable::new_usable_immediately(), None) } else { let (u, snd) = GuardUsable::new_uncertain(); (u, Some(snd)) @@ -488,26 +504,73 @@ impl<R: Runtime> GuardMgr<R> { }; let pending_request = - pending::PendingRequest::new(guard_id.clone(), usage, usable_sender, net_has_been_down); + pending::PendingRequest::new(guard.id.clone(), usage, usable_sender, net_has_been_down); inner.pending.insert(request_id, pending_request); - inner - .guards - .active_guards_mut() - .record_attempt(&guard_id, now); + match &guard.id.0 { + FirstHopIdInner::Guard(id) => inner.guards.active_guards_mut().record_attempt(id, now), + FirstHopIdInner::Fallback(_) => { + // We don't record attempts for fallbacks; we only care when + // they have failed. + } + } Ok((guard, monitor, usable)) } - /// Record that after we tried to connect to the guard described in `Guard` - pub fn note_external_failure(&self, guard: &GuardId, external_failure: ExternalFailure) { + /// Record that _after_ we built a circuit with a guard, something described + /// in `external_failure` went wrong with it. + pub fn note_external_failure( + &self, + ed_identity: &pk::ed25519::Ed25519Identity, + rsa_identity: &pk::rsa::RsaIdentity, + external_failure: ExternalActivity, + ) { let now = self.runtime.now(); let mut inner = self.inner.lock().expect("Poisoned lock"); - inner - .guards - .active_guards_mut() - .record_failure(guard, Some(external_failure), now); + for id in inner.lookup_ids(ed_identity, rsa_identity) { + match &id.0 { + FirstHopIdInner::Guard(id) => { + inner.guards.active_guards_mut().record_failure( + id, + Some(external_failure), + now, + ); + } + FirstHopIdInner::Fallback(id) => { + if external_failure == ExternalActivity::DirCache { + inner.fallbacks.note_failure(id, now); + } + } + } + } + } + + /// Record that _after_ we built a circuit with a guard, some activity + /// described in `external_activity` was successful with it. + pub fn note_external_success( + &self, + ed_identity: &pk::ed25519::Ed25519Identity, + rsa_identity: &pk::rsa::RsaIdentity, + external_activity: ExternalActivity, + ) { + let mut inner = self.inner.lock().expect("Poisoned lock"); + + for id in inner.lookup_ids(ed_identity, rsa_identity) { + match &id.0 { + FirstHopIdInner::Guard(_) => { + // TODO: Nothing to do in this case yet; there is not yet a + // separate "directory succeeding" status for guards. But there + // should be, eventually. This is part of #329. + } + FirstHopIdInner::Fallback(id) => { + if external_activity == ExternalActivity::DirCache { + inner.fallbacks.note_success(id); + } + } + } + } } /// Ensure that the message queue is flushed before proceeding to @@ -527,12 +590,12 @@ impl<R: Runtime> GuardMgr<R> { } } -/// A reason for marking a guard as failed that can't be observed from inside -/// the `GuardMgr` code. -#[derive(Copy, Clone, Debug)] +/// An activity that can succeed or fail, and whose success or failure can be +/// attributed to a guard. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] #[non_exhaustive] -pub enum ExternalFailure { - /// This guard has somehow failed to operate as a good directory cache. +pub enum ExternalActivity { + /// The activity of using the guard as a directory cache. DirCache, } @@ -654,8 +717,17 @@ impl GuardMgrInner { // If there was a pending request matching this RequestId, great! let guard_id = pending.guard_id(); trace!(?guard_id, ?status, "Received report of guard status"); - match status { - GuardStatus::Success => { + + match (status, &guard_id.0) { + (GuardStatus::Failure, FirstHopIdInner::Fallback(id)) => { + // We used a fallback, and we weren't able to build a circuit through it. + self.fallbacks.note_failure(id, runtime.now()); + } + (_, FirstHopIdInner::Fallback(_)) => { + // We don't record any other kind of circuit activity if we + // took the entry from the fallback list. + } + (GuardStatus::Success, FirstHopIdInner::Guard(id)) => { // If we had gone too long without any net activity when we // gave out this guard, and now we're seeing a circuit // succeed, tell the primary guards that they might be @@ -666,7 +738,7 @@ impl GuardMgrInner { // The guard succeeded. Tell the GuardSet. self.guards.active_guards_mut().record_success( - guard_id, + id, &self.params, runtime.wallclock(), ); @@ -683,22 +755,20 @@ impl GuardMgrInner { self.waiting.push(pending); } } - GuardStatus::Failure => { + (GuardStatus::Failure, FirstHopIdInner::Guard(id)) => { self.guards .active_guards_mut() - .record_failure(guard_id, None, runtime.now()); + .record_failure(id, None, runtime.now()); pending.reply(false); } - GuardStatus::AttemptAbandoned => { - self.guards - .active_guards_mut() - .record_attempt_abandoned(guard_id); + (GuardStatus::AttemptAbandoned, FirstHopIdInner::Guard(id)) => { + self.guards.active_guards_mut().record_attempt_abandoned(id); pending.reply(false); } - GuardStatus::Indeterminate => { + (GuardStatus::Indeterminate, FirstHopIdInner::Guard(id)) => { self.guards .active_guards_mut() - .record_indeterminate_result(guard_id); + .record_indeterminate_result(id); pending.reply(false); } }; @@ -728,12 +798,17 @@ impl GuardMgrInner { /// Return None if we can't yet give an answer about whether such /// a circuit is usable. fn guard_usability_status(&self, pending: &PendingRequest, now: Instant) -> Option<bool> { - self.guards.active_guards().circ_usability_status( - pending.guard_id(), - pending.usage(), - &self.params, - now, - ) + match &pending.guard_id().0 { + FirstHopIdInner::Guard(id) => self.guards.active_guards().circ_usability_status( + id, + pending.usage(), + &self.params, + now, + ), + // Fallback circuits are usable immediately, since we don't have to wait to + // see whether any _other_ circuit succeeds or fails. + FirstHopIdInner::Fallback(_) => Some(true), + } } /// For requests that have been "waiting" for an answer for too long, @@ -783,6 +858,36 @@ impl GuardMgrInner { std::mem::swap(&mut waiting, &mut self.waiting); } + /// Return every currently extant FirstHopId for a guard or fallback + /// directory matching the provided keys. + /// + /// # TODO + /// + /// This function should probably not exist; it's only used so that dirmgr + /// can report successes or failures, since by the time it observes them it + /// doesn't know whether its circuit came from a guard or a fallback. To + /// solve that, we'll need CircMgr to record and report which one it was + /// using, which will take some more plumbing. + fn lookup_ids( + &self, + ed_identity: &pk::ed25519::Ed25519Identity, + rsa_identity: &pk::rsa::RsaIdentity, + ) -> Vec<FirstHopId> { + let mut vec = Vec::with_capacity(2); + + let id = ids::GuardId::new(*ed_identity, *rsa_identity); + if self.guards.active_guards().contains(&id) { + vec.push(id.into()); + } + + let id = ids::FallbackId::new(*ed_identity, *rsa_identity); + if self.fallbacks.contains(&id) { + vec.push(id.into()); + } + + vec + } + /// Run any periodic events that update guard status, and return a /// duration after which periodic events should next be run. pub(crate) fn run_periodic_events(&mut self, wallclock: SystemTime, now: Instant) -> Duration { @@ -791,38 +896,83 @@ impl GuardMgrInner { Duration::from_secs(1) // TODO: Too aggressive. } - /// Try to select a guard, expanding the sample or marking guards retriable - /// if the first attempts fail. - fn select_guard_with_retries( + /// Try to select a guard, expanding the sample if the first attempt fails. + fn select_guard_with_expand( &mut self, usage: &GuardUsage, netdir: Option<&NetDir>, - now: SystemTime, - ) -> Result<(sample::ListKind, GuardId), PickGuardError> { + now: Instant, + wallclock: SystemTime, + ) -> Result<(sample::ListKind, FirstHop), PickGuardError> { // Try to find a guard. - let res1 = self.guards.active_guards().pick_guard(usage, &self.params); - if let Ok(s) = res1 { - return Ok(s); - } + let first_error = match self.select_guard_once(usage) { + Ok(res1) => return Ok(res1), + Err(e) => { + trace!("Couldn't select guard on first attempt: {}", e); + e + } + }; // That didn't work. If we have a netdir, expand the sample and try again. if let Some(dir) = netdir { trace!("No guards available, trying to extend the sample."); - self.update(now, Some(dir)); + self.update(wallclock, Some(dir)); if self .guards .active_guards_mut() - .extend_sample_as_needed(now, &self.params, dir) + .extend_sample_as_needed(wallclock, &self.params, dir) { self.guards .active_guards_mut() .select_primary_guards(&self.params); - return self.guards.active_guards().pick_guard(usage, &self.params); + match self.select_guard_once(usage) { + Ok(res) => return Ok(res), + Err(e) => { + trace!("Couldn't select guard after expanding sample: {}", e); + } + } } } - // Couldn't extend the sample; return the original error. - res1 + // Okay, that didn't work either. If we were asked for a directory + // guard, then we may be able to use a fallback. + if usage.kind == GuardUsageKind::OneHopDirectory { + return self.select_fallback(now); + } + + // Couldn't extend the sample or use a fallback; return the original error. + Err(first_error) + } + + /// Helper: try to pick a single guard, without retrying on failure. + fn select_guard_once( + &self, + usage: &GuardUsage, + ) -> Result<(sample::ListKind, FirstHop), PickGuardError> { + let (source, id) = self + .guards + .active_guards() + .pick_guard(usage, &self.params)?; + let guard = self + .guards + .active_guards() + .get(&id) + .expect("Selected guard that wasn't in our sample!?") + .get_external_rep(); + + Ok((source, guard)) + } + + /// Helper: Select a fallback directory. + /// + /// Called when we have no guard information to use. Return values are as + /// for [`GuardMgr::select_guard()`] + fn select_fallback( + &self, + now: Instant, + ) -> Result<(sample::ListKind, FirstHop), PickGuardError> { + let fallback = self.fallbacks.choose(&mut rand::thread_rng(), now)?; + Ok((sample::ListKind::Fallback, fallback.clone())) } } @@ -918,48 +1068,18 @@ impl TryFrom<&NetParameters> for GuardParams { } } -/// A unique cryptographic identifier for a selected guard. -/// -/// (This is implemented internally using both of the guard's Ed25519 -/// and RSA identities.) -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] -pub struct GuardId { - /// Ed25519 identity key for a a guard - ed25519: pk::ed25519::Ed25519Identity, - /// RSA identity fingerprint for a a guard - rsa: pk::rsa::RsaIdentity, -} - -impl GuardId { - /// Return a new, manually constructed GuardId - fn new(ed25519: pk::ed25519::Ed25519Identity, rsa: pk::rsa::RsaIdentity) -> Self { - Self { ed25519, rsa } - } - - /// Extract a GuardId from a ChanTarget object. - pub fn from_chan_target<T: tor_linkspec::ChanTarget>(target: &T) -> Self { - GuardId::new(*target.ed_identity(), *target.rsa_identity()) - } - - /// Return the relay in `netdir` that corresponds to this ID, if there - /// is one. - pub fn get_relay<'a>(&self, netdir: &'a NetDir) -> Option<Relay<'a>> { - netdir.by_id_pair(&self.ed25519, &self.rsa) - } -} - -/// Representation of a guard, as returned by [`GuardMgr::select_guard()`]. +/// Representation of a guard or fallback, as returned by [`GuardMgr::select_guard()`]. #[derive(Debug, Clone, Eq, PartialEq)] -pub struct Guard { +pub struct FirstHop { /// The guard's identities - id: GuardId, + id: FirstHopId, /// The addresses at which the guard can be contacted. orports: Vec<SocketAddr>, } -impl Guard { +impl FirstHop { /// Return the identities of this guard. - pub fn id(&self) -> &GuardId { + pub fn id(&self) -> &FirstHopId { &self.id } /// Look up this guard in `netdir`. @@ -969,15 +1089,15 @@ impl Guard { } // This is somewhat redundant with the implementation in crate::guard::Guard. -impl tor_linkspec::ChanTarget for Guard { +impl tor_linkspec::ChanTarget for FirstHop { fn addrs(&self) -> &[SocketAddr] { &self.orports[..] } fn ed_identity(&self) -> &pk::ed25519::Ed25519Identity { - &self.id.ed25519 + &self.id.as_ref().ed25519 } fn rsa_identity(&self) -> &pk::rsa::RsaIdentity { - &self.id.rsa + &self.id.as_ref().rsa } } @@ -1046,46 +1166,6 @@ pub enum GuardRestriction { AvoidAllIds(HashSet<pk::ed25519::Ed25519Identity>), } -/// An error caused while creating or updating a guard manager. -#[derive(Clone, Debug, thiserror::Error)] -#[non_exhaustive] -pub enum GuardMgrError { - /// An error manipulating persistent state - #[error("Problem accessing persistent state")] - State(#[from] tor_persist::Error), - - /// An error that occurred while trying to spawn a daemon task. - #[error("Unable to spawn {spawning}")] - Spawn { - /// What we were trying to spawn. - spawning: &'static str, - /// What happened when we tried to spawn it. - #[source] - cause: Arc<SpawnError>, - }, -} - -impl HasKind for GuardMgrError { - #[rustfmt::skip] // to preserve table in match - fn kind(&self) -> ErrorKind { - use GuardMgrError as G; - match self { - G::State(e) => e.kind(), - G::Spawn{ cause, .. } => cause.kind(), - } - } -} - -impl GuardMgrError { - /// Construct a new `GuardMgrError` from a `SpawnError`. - fn from_spawn(spawning: &'static str, err: SpawnError) -> GuardMgrError { - GuardMgrError::Spawn { - spawning, - cause: Arc::new(err), - } - } -} - #[cfg(test)] mod test { #![allow(clippy::unwrap_used)] @@ -1105,7 +1185,7 @@ mod test { let statemgr = TestingStateMgr::new(); let have_lock = statemgr.try_lock().unwrap(); assert!(have_lock.held()); - let guardmgr = GuardMgr::new(rt, statemgr.clone()).unwrap(); + let guardmgr = GuardMgr::new(rt, statemgr.clone(), [].into()).unwrap(); let (con, mds) = testnet::construct_network().unwrap(); let override_p = "guard-min-filtered-sample-size=5 guard-n-primary-guards=2" .parse() @@ -1142,7 +1222,7 @@ mod test { drop(guardmgr); // Try reloading from the state... - let guardmgr2 = GuardMgr::new(rt.clone(), statemgr.clone()).unwrap(); + let guardmgr2 = GuardMgr::new(rt.clone(), statemgr.clone(), [].into()).unwrap(); guardmgr2.update_network(&netdir); // Since the guard was confirmed, we should get the same one this time! @@ -1208,7 +1288,7 @@ mod test { let (guard, _mon, _usable) = guardmgr.select_guard(u, Some(&netdir)).unwrap(); // Make sure that the filter worked. - assert_eq!(guard.id().rsa.as_bytes()[0] % 4, 0); + assert_eq!(guard.id().as_ref().rsa.as_bytes()[0] % 4, 0); }); } } diff --git a/crates/tor-guardmgr/src/pending.rs b/crates/tor-guardmgr/src/pending.rs index fff7e2cce20342069233d9935ec37c8b1150985c..7941eab853ba84d7e935d48583e96d5ea9455b3d 100644 --- a/crates/tor-guardmgr/src/pending.rs +++ b/crates/tor-guardmgr/src/pending.rs @@ -7,7 +7,7 @@ //! then the circuit manager can't know whether to use a circuit built //! through that guard until the guard manager tells it. This is //! handled via [`GuardUsable`]. -use crate::{daemon, GuardId}; +use crate::{daemon, FirstHopId}; use educe::Educe; use futures::{ @@ -49,6 +49,8 @@ pub struct GuardUsable { /// /// If absent, then the guard is ready immediately and no waiting /// is needed. + // + // TODO: use a type that makes the case here more distinguishable. #[pin] u: Option<oneshot::Receiver<bool>>, } @@ -65,16 +67,22 @@ impl Future for GuardUsable { } impl GuardUsable { - /// Create a new GuardUsable for a primary guard. + /// Create a new GuardUsable for a primary guard or a fallback directory. /// - /// (Circuits built through primary guards are usable immediately, - /// so we don't need a way to report that this guard is usable.) - pub(crate) fn new_primary() -> Self { + /// (Circuits built through these are usable immediately, independently of + /// whether other guards succeed or fail, so we don't need a way to report + /// whether such guards/fallbacks are usable.) + pub(crate) fn new_usable_immediately() -> Self { GuardUsable { u: None } } - /// Create a new GuardUsable for a guard with undecided usability - /// status. + /// Create a new GuardUsable for a guard with undecided usability status. + /// + /// (We use this constructor when a circuit is built through a non-primary + /// guard, and there are other guards _we would prefer to use, if they turn + /// out to work_. If such a circuit succeeds, the caller must still use + /// this `GuardUsable` to wait until the `GuardMgr` sees whether the + /// more-preferred guards have succeeded or failed.) pub(crate) fn new_uncertain() -> (Self, oneshot::Sender<bool>) { let (snd, rcv) = oneshot::channel(); (GuardUsable { u: Some(rcv) }, snd) @@ -255,7 +263,7 @@ impl RequestId { #[derive(Debug)] pub(crate) struct PendingRequest { /// Identity of the guard that we gave out. - guard_id: GuardId, + guard_id: FirstHopId, /// The usage for which this guard was requested. /// /// We need this information because, if we find that a better guard @@ -282,7 +290,7 @@ pub(crate) struct PendingRequest { impl PendingRequest { /// Create a new PendingRequest. pub(crate) fn new( - guard_id: GuardId, + guard_id: FirstHopId, usage: crate::GuardUsage, usable: Option<oneshot::Sender<bool>>, net_has_been_down: bool, @@ -297,7 +305,7 @@ impl PendingRequest { } /// Return the Id of the guard we gave out. - pub(crate) fn guard_id(&self) -> &GuardId { + pub(crate) fn guard_id(&self) -> &FirstHopId { &self.guard_id } diff --git a/crates/tor-guardmgr/src/sample.rs b/crates/tor-guardmgr/src/sample.rs index d620335e4fc159a62917cd3b5970e365f266ae4d..844519853acbe1088dd5933716b2b40d478b2fc8 100644 --- a/crates/tor-guardmgr/src/sample.rs +++ b/crates/tor-guardmgr/src/sample.rs @@ -3,7 +3,9 @@ use crate::filter::GuardFilter; use crate::guard::{Guard, NewlyConfirmed, Reachable}; -use crate::{ExternalFailure, GuardId, GuardParams, GuardUsage, GuardUsageKind}; +use crate::{ + ids::GuardId, ExternalActivity, GuardParams, GuardUsage, GuardUsageKind, PickGuardError, +}; use tor_netdir::{NetDir, Relay}; use itertools::Itertools; @@ -89,6 +91,8 @@ pub(crate) enum ListKind { Confirmed, /// A non-primary, non-confirmed guard. Sample, + /// Not a guard at all, but a fallback directory. + Fallback, } impl ListKind { @@ -96,6 +100,16 @@ impl ListKind { pub(crate) fn is_primary(&self) -> bool { self == &ListKind::Primary } + + /// Return true if this guard's origin indicates that you can use successful + /// circuits built through it immediately without waiting for any other + /// circuits to succeed or fail. + pub(crate) fn usable_immediately(&self) -> bool { + match self { + ListKind::Primary | ListKind::Fallback => true, + ListKind::Confirmed | ListKind::Sample => false, + } + } } impl GuardSet { @@ -236,7 +250,12 @@ impl GuardSet { // Note: Could implement Borrow instead, but I don't think it'll // matter. let id = GuardId::from_chan_target(relay); - self.guards.contains_key(&id) + self.contains(&id) + } + + /// Return true if `id` is a member of this set. + pub(crate) fn contains(&self, id: &GuardId) -> bool { + self.guards.contains_key(id) } /// If there are not enough filter-permitted usable guards in this @@ -594,7 +613,7 @@ impl GuardSet { pub(crate) fn record_failure( &mut self, guard_id: &GuardId, - how: Option<ExternalFailure>, + how: Option<ExternalActivity>, now: Instant, ) { // TODO: For now, we treat failures of guards for circuit building the @@ -781,41 +800,13 @@ impl<'a> From<GuardSample<'a>> for GuardSet { } } -/// A error caused by a failure to pick a guard. -#[derive(Clone, Debug, thiserror::Error)] -#[non_exhaustive] -pub enum PickGuardError { - /// All members of the current sample were down. - #[error("All guards are down")] - AllGuardsDown { - /// The next time at which any guard will be retriable. - retry_at: Option<Instant>, - }, - - /// Some guards were running, but all of them were either blocked on pending - /// circuits at other guards, unusable for the provided purpose, or filtered - /// out. - #[error("No running guards were usable for the selected purpose")] - NoGuardsUsable, -} - -impl tor_error::HasKind for PickGuardError { - fn kind(&self) -> tor_error::ErrorKind { - use tor_error::ErrorKind as EK; - use PickGuardError as E; - match self { - E::AllGuardsDown { .. } => EK::TorAccessFailed, - E::NoGuardsUsable => EK::NoPath, - } - } -} - #[cfg(test)] mod test { #![allow(clippy::unwrap_used)] use tor_netdoc::doc::netstatus::{RelayFlags, RelayWeight}; use super::*; + use crate::FirstHopId; use std::time::Duration; fn netdir() -> NetDir { @@ -877,7 +868,8 @@ mod test { // make sure all the guards are okay. for (g, guard) in &guards.guards { - let relay = g.get_relay(&netdir).unwrap(); + let id: FirstHopId = g.clone().into(); + let relay = id.get_relay(&netdir).unwrap(); assert!(relay.is_flagged_guard()); assert!(relay.is_dir_cache()); assert!(guards.contains_relay(&relay)); @@ -1235,7 +1227,7 @@ mod test { use tor_netdir::testnet; let netdir2 = testnet::construct_custom_netdir(|idx, bld| { - if idx == p_id1.ed25519.as_bytes()[0] as usize { + if idx == p_id1.0.ed25519.as_bytes()[0] as usize { bld.omit_md = true; } }) diff --git a/crates/tor-llcrypto/src/pk/ed25519.rs b/crates/tor-llcrypto/src/pk/ed25519.rs index 85af8832b8bf3591199b71d6c9e0743d887604ce..082bbf268051a2311e1d56e83c9d465c88dfb41c 100644 --- a/crates/tor-llcrypto/src/pk/ed25519.rs +++ b/crates/tor-llcrypto/src/pk/ed25519.rs @@ -24,7 +24,7 @@ pub use ed25519_dalek::{ExpandedSecretKey, Keypair, PublicKey, SecretKey, Signat /// validation. /// * This type hasn't checked whether the bytes here actually _are_ a /// valid Ed25519 public key. -#[derive(Clone, Copy, Hash)] +#[derive(Clone, Copy, Hash, PartialOrd, Ord)] #[allow(clippy::derive_hash_xor_eq)] pub struct Ed25519Identity { /// A raw unchecked Ed25519 public key. diff --git a/crates/tor-netdir/src/lib.rs b/crates/tor-netdir/src/lib.rs index 2db0a5982fccce477240a2e3bbfcd73873b3f6c7..e0fd9c9e7188302a2f0406281c4e612616c93ada 100644 --- a/crates/tor-netdir/src/lib.rs +++ b/crates/tor-netdir/src/lib.rs @@ -56,7 +56,6 @@ #![deny(clippy::unwrap_used)] mod err; -pub mod fallback; pub mod params; #[cfg(test)] mod testing; diff --git a/doc/semver_status.md b/doc/semver_status.md index 42603e32e268f744a6679a20c4b80b5aa23d02f1..985dcabc54bc8e8b61066e52606b6f737fe38390 100644 --- a/doc/semver_status.md +++ b/doc/semver_status.md @@ -41,6 +41,7 @@ arti: tor-llcrypto: new-api: Added RsaIdentity::from\_hex(). + new-api: Ed25519Identity implements PartialOrd. arti-client: @@ -70,6 +71,8 @@ tor-circmgr: api-break: isolation completely revised + api-break: config must now implement AsRef<FallbackList> + tor-netdoc: new-api (experimental only): Can modify the set of relays in an unverified @@ -77,6 +80,16 @@ tor-netdoc: api-break: changed the return type of GenericRouterStatus::version() +tor-netdir: + + api-break: moved FallbackDir to guardmgr. + +tor-guardmgr: + + new-api: moved FallbackDir from netdir. + + api-break: FallbackDir required in constructor. + tor-proto: new-api: ClientCirc path accessors.