Loading Cargo.lock +1 −0 Original line number Diff line number Diff line Loading @@ -3314,6 +3314,7 @@ dependencies = [ "thiserror", "tor-circmgr", "tor-error", "tor-linkspec", "tor-llcrypto", "tor-netdoc", "tor-proto", Loading crates/tor-circmgr/src/build.rs +25 −5 Original line number Diff line number Diff line Loading @@ -80,7 +80,10 @@ async fn create_common<RT: Runtime, CT: ChanTarget>( peer: OwnedChanTarget::from_chan_target(target), cause, })?; let (pending_circ, reactor) = chan.new_circ().await?; let (pending_circ, reactor) = chan.new_circ().await.map_err(|error| Error::Protocol { error, peer: None, // we don't blame the peer, because new_circ() does no networking. })?; rt.spawn(async { let _ = reactor.run().await; Loading @@ -99,7 +102,12 @@ impl Buildable for ClientCirc { params: &CircParameters, ) -> Result<Self> { let circ = create_common(chanmgr, rt, ct).await?; Ok(circ.create_firsthop_fast(params).await?) circ.create_firsthop_fast(params) .await .map_err(|error| Error::Protocol { peer: Some(ct.clone()), error, }) } async fn create<RT: Runtime>( chanmgr: &ChanMgr<RT>, Loading @@ -108,7 +116,12 @@ impl Buildable for ClientCirc { params: &CircParameters, ) -> Result<Self> { let circ = create_common(chanmgr, rt, ct).await?; Ok(circ.create_firsthop_ntor(ct, params.clone()).await?) circ.create_firsthop_ntor(ct, params.clone()) .await .map_err(|error| Error::Protocol { peer: Some(OwnedChanTarget::from_chan_target(ct)), error, }) } async fn extend<RT: Runtime>( &self, Loading @@ -116,8 +129,15 @@ impl Buildable for ClientCirc { ct: &OwnedCircTarget, params: &CircParameters, ) -> Result<()> { self.extend_ntor(ct, params).await?; Ok(()) self.extend_ntor(ct, params) .await .map_err(|error| Error::Protocol { error, // We can't know who caused the error, since it may have been // the hop we were extending from, or the hop we were extending // to. peer: None, }) } } Loading crates/tor-circmgr/src/err.rs +25 −5 Original line number Diff line number Diff line Loading @@ -76,8 +76,16 @@ pub enum Error { }, /// Protocol issue while building a circuit. #[error("Problem building a circuit: {0}")] Protocol(#[from] tor_proto::Error), #[error("Problem building a circuit with {peer:?}")] Protocol { /// The peer that created the protocol error. /// /// This is set to None if we can't blame a single party. peer: Option<OwnedChanTarget>, /// The underlying error. #[source] error: tor_proto::Error, }, /// We have an expired consensus #[error("Consensus is expired")] Loading Loading @@ -143,10 +151,10 @@ impl HasKind for Error { .map(|e| e.kind()) .unwrap_or(EK::Internal), E::CircCanceled => EK::TransientFailure, E::Protocol(e) => e.kind(), E::Protocol { error, .. } => error.kind(), E::State(e) => e.kind(), E::GuardMgr(e) => e.kind(), E::Guard(_) => EK::NoPath, E::Guard(e) => e.kind(), E::ExpiredConsensus => EK::DirectoryExpired, E::Spawn { cause, .. } => cause.kind(), } Loading Loading @@ -179,11 +187,23 @@ impl Error { E::Guard(_) => 40, E::RequestFailed(_) => 40, E::Channel { .. } => 40, E::Protocol(_) => 45, E::Protocol { .. } => 45, E::ExpiredConsensus => 50, E::Spawn { .. } => 90, E::State(_) => 90, E::Bug(_) => 100, } } /// Return a list of the peers to "blame" for this error, if there are any. pub fn peers(&self) -> Vec<&OwnedChanTarget> { match self { Error::RequestFailed(errors) => errors.sources().flat_map(|e| e.peers()).collect(), Error::Channel { peer, .. } => vec![peer], Error::Protocol { peer: Some(peer), .. } => vec![peer], _ => vec![], } } } crates/tor-circmgr/src/lib.rs +14 −3 Original line number Diff line number Diff line Loading @@ -85,6 +85,8 @@ pub use config::{ use crate::preemptive::PreemptiveCircuitPredictor; use usage::TargetCircUsage; pub use tor_guardmgr::{ExternalFailure, GuardId}; /// A Result type as returned from this crate. pub type Result<T> = std::result::Result<T, Error>; Loading @@ -105,13 +107,13 @@ const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts"; #[non_exhaustive] pub enum DirInfo<'a> { /// A list of fallbacks, for use when we don't know a network directory. Fallbacks(&'a [FallbackDir]), Fallbacks(&'a [&'a FallbackDir]), /// A complete network directory Directory(&'a NetDir), } impl<'a> From<&'a [FallbackDir]> for DirInfo<'a> { fn from(v: &'a [FallbackDir]) -> DirInfo<'a> { impl<'a> From<&'a [&'a FallbackDir]> for DirInfo<'a> { fn from(v: &'a [&'a FallbackDir]) -> DirInfo<'a> { DirInfo::Fallbacks(v) } } Loading Loading @@ -430,6 +432,15 @@ impl<R: Runtime> CircMgr<R> { Ok(()) } /// Record that a failure occurred on a circuit with a given guard, in a way /// that makes us unwilling to use that guard for future circuits. pub fn note_external_failure(&self, id: &GuardId, external_failure: ExternalFailure) { self.mgr .peek_builder() .guardmgr() .note_external_failure(id, external_failure); } } impl<R: Runtime> Drop for CircMgr<R> { Loading crates/tor-circmgr/src/mgr.rs +39 −8 Original line number Diff line number Diff line Loading @@ -780,11 +780,27 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> { } } Err(e) => { // We couldn't pick the action! This is unusual; wait // a little while before we try again. // We couldn't pick the action! info!("Couldn't pick action for circuit attempt {}: {}", n, &e); let small_delay = Duration::from_millis(50); let wait_for_action = match &e { Error::Guard(tor_guardmgr::PickGuardError::AllGuardsDown { retry_at: Some(instant), }) => { // If we failed because all guards are down, that's fine: we just wait until // the next guard is retriable. instant.saturating_duration_since(self.runtime.now()) + small_delay } _ => { // Any other errors are pretty unusual; wait a little while, then try again. small_delay } }; retry_err.push(e); let wait_for_action = Duration::from_millis(50); if remaining < wait_for_action { // We can't wait long enough. Call this failed now. break; } self.runtime .sleep(std::cmp::min(remaining, wait_for_action)) .await; Loading Loading @@ -869,12 +885,27 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> { // Okay, we need to launch circuits here. let parallelism = std::cmp::max(1, self.builder.launch_parallelism(usage)); let mut plans = Vec::new(); let mut last_err = None; for _ in 0..parallelism { let (pending, plan) = self.plan_by_usage(dir, usage)?; match self.plan_by_usage(dir, usage) { Ok((pending, plan)) => { list.add_pending_circ(pending); plans.push(plan); } Err(e) => { debug!("Unable to make a plan for {:?}: {}", usage, e); last_err = Some(e); } } } if !plans.is_empty() { Ok(Action::Build(plans)) } else if let Some(last_err) = last_err { Err(last_err) } else { // we didn't even try to plan anything! Err(internal!("no plans were built, but no errors were found").into()) } } /// Execute an action returned by pick-action, and return the Loading Loading @@ -1432,7 +1463,7 @@ mod test { const FAKE_CIRC_DELAY: Duration = Duration::from_millis(30); static DI_EMPTY: [tor_netdir::fallback::FallbackDir; 0] = []; static DI_EMPTY: [&tor_netdir::fallback::FallbackDir; 0] = []; fn di() -> DirInfo<'static> { DI_EMPTY[..].into() Loading Loading
Cargo.lock +1 −0 Original line number Diff line number Diff line Loading @@ -3314,6 +3314,7 @@ dependencies = [ "thiserror", "tor-circmgr", "tor-error", "tor-linkspec", "tor-llcrypto", "tor-netdoc", "tor-proto", Loading
crates/tor-circmgr/src/build.rs +25 −5 Original line number Diff line number Diff line Loading @@ -80,7 +80,10 @@ async fn create_common<RT: Runtime, CT: ChanTarget>( peer: OwnedChanTarget::from_chan_target(target), cause, })?; let (pending_circ, reactor) = chan.new_circ().await?; let (pending_circ, reactor) = chan.new_circ().await.map_err(|error| Error::Protocol { error, peer: None, // we don't blame the peer, because new_circ() does no networking. })?; rt.spawn(async { let _ = reactor.run().await; Loading @@ -99,7 +102,12 @@ impl Buildable for ClientCirc { params: &CircParameters, ) -> Result<Self> { let circ = create_common(chanmgr, rt, ct).await?; Ok(circ.create_firsthop_fast(params).await?) circ.create_firsthop_fast(params) .await .map_err(|error| Error::Protocol { peer: Some(ct.clone()), error, }) } async fn create<RT: Runtime>( chanmgr: &ChanMgr<RT>, Loading @@ -108,7 +116,12 @@ impl Buildable for ClientCirc { params: &CircParameters, ) -> Result<Self> { let circ = create_common(chanmgr, rt, ct).await?; Ok(circ.create_firsthop_ntor(ct, params.clone()).await?) circ.create_firsthop_ntor(ct, params.clone()) .await .map_err(|error| Error::Protocol { peer: Some(OwnedChanTarget::from_chan_target(ct)), error, }) } async fn extend<RT: Runtime>( &self, Loading @@ -116,8 +129,15 @@ impl Buildable for ClientCirc { ct: &OwnedCircTarget, params: &CircParameters, ) -> Result<()> { self.extend_ntor(ct, params).await?; Ok(()) self.extend_ntor(ct, params) .await .map_err(|error| Error::Protocol { error, // We can't know who caused the error, since it may have been // the hop we were extending from, or the hop we were extending // to. peer: None, }) } } Loading
crates/tor-circmgr/src/err.rs +25 −5 Original line number Diff line number Diff line Loading @@ -76,8 +76,16 @@ pub enum Error { }, /// Protocol issue while building a circuit. #[error("Problem building a circuit: {0}")] Protocol(#[from] tor_proto::Error), #[error("Problem building a circuit with {peer:?}")] Protocol { /// The peer that created the protocol error. /// /// This is set to None if we can't blame a single party. peer: Option<OwnedChanTarget>, /// The underlying error. #[source] error: tor_proto::Error, }, /// We have an expired consensus #[error("Consensus is expired")] Loading Loading @@ -143,10 +151,10 @@ impl HasKind for Error { .map(|e| e.kind()) .unwrap_or(EK::Internal), E::CircCanceled => EK::TransientFailure, E::Protocol(e) => e.kind(), E::Protocol { error, .. } => error.kind(), E::State(e) => e.kind(), E::GuardMgr(e) => e.kind(), E::Guard(_) => EK::NoPath, E::Guard(e) => e.kind(), E::ExpiredConsensus => EK::DirectoryExpired, E::Spawn { cause, .. } => cause.kind(), } Loading Loading @@ -179,11 +187,23 @@ impl Error { E::Guard(_) => 40, E::RequestFailed(_) => 40, E::Channel { .. } => 40, E::Protocol(_) => 45, E::Protocol { .. } => 45, E::ExpiredConsensus => 50, E::Spawn { .. } => 90, E::State(_) => 90, E::Bug(_) => 100, } } /// Return a list of the peers to "blame" for this error, if there are any. pub fn peers(&self) -> Vec<&OwnedChanTarget> { match self { Error::RequestFailed(errors) => errors.sources().flat_map(|e| e.peers()).collect(), Error::Channel { peer, .. } => vec![peer], Error::Protocol { peer: Some(peer), .. } => vec![peer], _ => vec![], } } }
crates/tor-circmgr/src/lib.rs +14 −3 Original line number Diff line number Diff line Loading @@ -85,6 +85,8 @@ pub use config::{ use crate::preemptive::PreemptiveCircuitPredictor; use usage::TargetCircUsage; pub use tor_guardmgr::{ExternalFailure, GuardId}; /// A Result type as returned from this crate. pub type Result<T> = std::result::Result<T, Error>; Loading @@ -105,13 +107,13 @@ const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts"; #[non_exhaustive] pub enum DirInfo<'a> { /// A list of fallbacks, for use when we don't know a network directory. Fallbacks(&'a [FallbackDir]), Fallbacks(&'a [&'a FallbackDir]), /// A complete network directory Directory(&'a NetDir), } impl<'a> From<&'a [FallbackDir]> for DirInfo<'a> { fn from(v: &'a [FallbackDir]) -> DirInfo<'a> { impl<'a> From<&'a [&'a FallbackDir]> for DirInfo<'a> { fn from(v: &'a [&'a FallbackDir]) -> DirInfo<'a> { DirInfo::Fallbacks(v) } } Loading Loading @@ -430,6 +432,15 @@ impl<R: Runtime> CircMgr<R> { Ok(()) } /// Record that a failure occurred on a circuit with a given guard, in a way /// that makes us unwilling to use that guard for future circuits. pub fn note_external_failure(&self, id: &GuardId, external_failure: ExternalFailure) { self.mgr .peek_builder() .guardmgr() .note_external_failure(id, external_failure); } } impl<R: Runtime> Drop for CircMgr<R> { Loading
crates/tor-circmgr/src/mgr.rs +39 −8 Original line number Diff line number Diff line Loading @@ -780,11 +780,27 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> { } } Err(e) => { // We couldn't pick the action! This is unusual; wait // a little while before we try again. // We couldn't pick the action! info!("Couldn't pick action for circuit attempt {}: {}", n, &e); let small_delay = Duration::from_millis(50); let wait_for_action = match &e { Error::Guard(tor_guardmgr::PickGuardError::AllGuardsDown { retry_at: Some(instant), }) => { // If we failed because all guards are down, that's fine: we just wait until // the next guard is retriable. instant.saturating_duration_since(self.runtime.now()) + small_delay } _ => { // Any other errors are pretty unusual; wait a little while, then try again. small_delay } }; retry_err.push(e); let wait_for_action = Duration::from_millis(50); if remaining < wait_for_action { // We can't wait long enough. Call this failed now. break; } self.runtime .sleep(std::cmp::min(remaining, wait_for_action)) .await; Loading Loading @@ -869,12 +885,27 @@ impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> { // Okay, we need to launch circuits here. let parallelism = std::cmp::max(1, self.builder.launch_parallelism(usage)); let mut plans = Vec::new(); let mut last_err = None; for _ in 0..parallelism { let (pending, plan) = self.plan_by_usage(dir, usage)?; match self.plan_by_usage(dir, usage) { Ok((pending, plan)) => { list.add_pending_circ(pending); plans.push(plan); } Err(e) => { debug!("Unable to make a plan for {:?}: {}", usage, e); last_err = Some(e); } } } if !plans.is_empty() { Ok(Action::Build(plans)) } else if let Some(last_err) = last_err { Err(last_err) } else { // we didn't even try to plan anything! Err(internal!("no plans were built, but no errors were found").into()) } } /// Execute an action returned by pick-action, and return the Loading Loading @@ -1432,7 +1463,7 @@ mod test { const FAKE_CIRC_DELAY: Duration = Duration::from_millis(30); static DI_EMPTY: [tor_netdir::fallback::FallbackDir; 0] = []; static DI_EMPTY: [&tor_netdir::fallback::FallbackDir; 0] = []; fn di() -> DirInfo<'static> { DI_EMPTY[..].into() Loading