Loading crates/arti-client/src/client.rs +53 −36 Original line number Diff line number Diff line Loading @@ -103,30 +103,41 @@ pub enum BootstrapBehavior { } /// Preferences for how to route a stream over the Tor network. #[derive(Debug, Clone, Default)] pub struct StreamPrefs { #[derive(Debug, Clone)] pub struct StreamPrefs<T = IsolationToken> { /// What kind of IPv6/IPv4 we'd prefer, and how strongly. ip_ver_pref: IpVersionPreference, /// How should we isolate connection(s) ? isolation: StreamIsolationPreference, isolation: StreamIsolationPreference<T>, /// Whether to return the stream optimistically. optimistic_stream: bool, } impl<T> Default for StreamPrefs<T> { fn default() -> Self { StreamPrefs { ip_ver_pref: Default::default(), isolation: Default::default(), optimistic_stream: Default::default(), } } } /// Record of how we are isolating connections #[derive(Debug, Clone, Educe)] #[educe(Default)] enum StreamIsolationPreference { enum StreamIsolationPreference<T> { /// No additional isolation #[educe(Default)] None, /// Id of the isolation group the connection should be part of Explicit(IsolationToken), /// TODO Explicit(Arc<T>), /// Isolate every connection! EveryStream, } impl StreamPrefs { impl<T> StreamPrefs<T> { /// Construct a new StreamPrefs. pub fn new() -> Self { Self::default() Loading Loading @@ -216,22 +227,8 @@ impl StreamPrefs { /// [`TorClient::isolated_client`]. Connections made with an `isolated_client` (and its /// clones) will not share circuits with the original client, even if the same /// `isolation_group` is specified via the `ConnectionPrefs` in force. pub fn set_isolation_group(&mut self, isolation_group: IsolationToken) -> &mut Self { self.isolation = StreamIsolationPreference::Explicit(isolation_group); self } /// Indicate that connections with these preferences should have their own isolation group /// /// This is a convenience method which creates a fresh [`IsolationToken`] /// and sets it for these preferences. /// /// This connection preference is orthogonal to isolation established by /// [`TorClient::isolated_client`]. Connections made with an `isolated_client` (and its /// clones) will not share circuits with the original client, even if the same /// `isolation_group` is specified via the `ConnectionPrefs` in force. pub fn new_isolation_group(&mut self) -> &mut Self { self.isolation = StreamIsolationPreference::Explicit(IsolationToken::new()); pub fn set_isolation_group(&mut self, isolation_group: T) -> &mut Self { self.isolation = StreamIsolationPreference::Explicit(Arc::new(isolation_group)); self } Loading @@ -255,18 +252,35 @@ impl StreamPrefs { /// Return a token to describe which connections might use /// the same circuit as this one. fn isolation_group(&self) -> Option<IsolationToken> { fn isolation_group(&self) -> Option<Result<Arc<T>, IsolationToken>> { use StreamIsolationPreference as SIP; match self.isolation { SIP::None => None, SIP::Explicit(ig) => Some(ig), SIP::EveryStream => Some(IsolationToken::new()), SIP::Explicit(ref ig) => Some(Ok(ig.clone())), SIP::EveryStream => Some(Err(IsolationToken::new())), } } // TODO: Add some way to be IPFlexible, and require exit to support both. } impl StreamPrefs<IsolationToken> { /// Indicate that connections with these preferences should have their own isolation group /// /// This is a convenience method which creates a fresh [`IsolationToken`] /// and sets it for these preferences. /// /// This connection preference is orthogonal to isolation established by /// [`TorClient::isolated_client`]. Connections made with an `isolated_client` (and its /// clones) will not share circuits with the original client, even if the same /// `isolation_group` is specified via the `ConnectionPrefs` in force. pub fn new_isolation_group(&mut self) -> &mut Self { self.isolation = StreamIsolationPreference::Explicit(Arc::new(IsolationToken::new())); self } } #[cfg(feature = "tokio")] impl TorClient<PreferredRuntime> { /// Bootstrap a connection to the Tor network, using the provided `config`. /// Loading Loading @@ -659,10 +673,10 @@ impl<R: Runtime> TorClient<R> { /// Note that because Tor prefers to do DNS resolution on the remote /// side of the network, this function takes its address as a string. /// (See [`TorClient::connect()`] for more information.) pub async fn connect_with_prefs<A: IntoTorAddr>( pub async fn connect_with_prefs<A: IntoTorAddr, T: tor_circmgr::isolation::Isolation>( &self, target: A, prefs: &StreamPrefs, prefs: &StreamPrefs<T>, ) -> crate::Result<DataStream> { let addr = target.into_tor_addr().map_err(wrap_err)?; addr.enforce_config(&self.addrcfg.get())?; Loading Loading @@ -698,7 +712,7 @@ impl<R: Runtime> TorClient<R> { // // This function is private just because we're not sure we want to provide this API. // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/250#note_2771238 fn set_stream_prefs(&mut self, connect_prefs: StreamPrefs) { fn set_stream_prefs(&mut self, connect_prefs: StreamPrefs<IsolationToken>) { self.connect_prefs = connect_prefs; } Loading @@ -707,7 +721,7 @@ impl<R: Runtime> TorClient<R> { /// Connections made with e.g. [`connect`](TorClient::connect) on the returned handle will use /// `connect_prefs`. This is a convenience wrapper for `clone` and `set_connect_prefs`. #[must_use] pub fn clone_with_prefs(&self, connect_prefs: StreamPrefs) -> Self { pub fn clone_with_prefs(&self, connect_prefs: StreamPrefs<IsolationToken>) -> Self { let mut result = self.clone(); result.set_stream_prefs(connect_prefs); result Loading @@ -719,10 +733,10 @@ impl<R: Runtime> TorClient<R> { } /// On success, return a list of IP addresses, but use prefs. pub async fn resolve_with_prefs( pub async fn resolve_with_prefs<T: tor_circmgr::isolation::Isolation>( &self, hostname: &str, prefs: &StreamPrefs, prefs: &StreamPrefs<T>, ) -> crate::Result<Vec<IpAddr>> { let addr = (hostname, 1).into_tor_addr().map_err(wrap_err)?; addr.enforce_config(&self.addrcfg.get()).map_err(wrap_err)?; Loading Loading @@ -750,10 +764,10 @@ impl<R: Runtime> TorClient<R> { /// Perform a remote DNS reverse lookup with the provided IP address. /// /// On success, return a list of hostnames. pub async fn resolve_ptr_with_prefs( pub async fn resolve_ptr_with_prefs<T: tor_circmgr::isolation::Isolation>( &self, addr: IpAddr, prefs: &StreamPrefs, prefs: &StreamPrefs<T>, ) -> crate::Result<Vec<String>> { let circ = self.get_or_launch_exit_circ(&[], prefs).await?; Loading Loading @@ -802,10 +816,10 @@ impl<R: Runtime> TorClient<R> { /// Get or launch an exit-suitable circuit with a given set of /// exit ports. async fn get_or_launch_exit_circ( async fn get_or_launch_exit_circ<T: tor_circmgr::isolation::Isolation>( &self, exit_ports: &[TargetPort], prefs: &StreamPrefs, prefs: &StreamPrefs<T>, ) -> StdResult<ClientCirc, ErrorDetail> { self.wait_for_bootstrap().await?; let dir = self Loading @@ -821,7 +835,10 @@ impl<R: Runtime> TorClient<R> { b.owner_token(self.client_isolation); // Consider stream isolation too, if it's set. if let Some(tok) = prefs.isolation_group() { b.stream_token(tok); match tok { Ok(tok) => b.stream_token(tok), Err(tok) => b.stream_token(Arc::new(tok)), }; } // Failure should be impossible with this builder. b.build().expect("Failed to construct StreamIsolation") Loading crates/tor-circmgr/src/lib.rs +4 −0 Original line number Diff line number Diff line Loading @@ -72,6 +72,10 @@ mod usage; pub use err::Error; pub use usage::{IsolationToken, StreamIsolation, StreamIsolationBuilder, TargetPort, TargetPorts}; /// TODO pub mod isolation { pub use crate::usage::{Isolation, IsolationHelper, IsolationToken}; } pub use config::{ CircMgrConfig, CircMgrConfigBuilder, CircuitTiming, CircuitTimingBuilder, PathConfig, Loading crates/tor-circmgr/src/usage.rs +67 −9 Original line number Diff line number Diff line Loading @@ -100,6 +100,40 @@ impl Display for TargetPorts { } } use std::any::Any; pub trait AsAny { fn as_any(&self) -> &dyn Any; } impl<T: 'static> AsAny for T { fn as_any(&self) -> &dyn Any { self } } /// TODO pub trait Isolation: AsAny + std::fmt::Debug + Send + Sync + 'static { /// TODO fn isolated(&self, other: &dyn Isolation) -> bool; } impl<T: IsolationHelper + std::fmt::Debug + Send + Sync + 'static> Isolation for T { fn isolated(&self, other: &dyn Isolation) -> bool { if let Some(other) = AsAny::as_any(other).downcast_ref() { self.isolated_same_type(other) } else { false } } } /// TODO pub trait IsolationHelper { /// TODO fn isolated_same_type(&self, other: &Self) -> bool; } /// A token used to isolate unrelated streams on different circuits. /// /// When two streams are associated with different isolation tokens, they Loading Loading @@ -181,15 +215,21 @@ impl IsolationToken { } } impl IsolationHelper for IsolationToken { fn isolated_same_type(&self, other: &Self) -> bool { self == other } } /// A set of information about how a stream should be isolated. /// /// If two streams are isolated from one another, they may not share /// a circuit. #[derive(Copy, Clone, Eq, Debug, PartialEq, PartialOrd, Ord, derive_builder::Builder)] #[derive(Clone, Debug, derive_builder::Builder)] pub struct StreamIsolation { /// Any isolation token set on the stream. #[builder(default = "IsolationToken::no_isolation()")] stream_token: IsolationToken, #[builder(setter(strip_option), default)] stream_token: Option<Arc<dyn Isolation>>, /// Any additional isolation token set on an object that "owns" this /// stream. This is typically owned by a `TorClient`. #[builder(default = "IsolationToken::no_isolation()")] Loading @@ -213,7 +253,19 @@ impl StreamIsolation { /// Return true if this StreamIsolation can share a circuit with /// `other`. fn may_share_circuit(&self, other: &StreamIsolation) -> bool { self == other self.owner_token == other.owner_token && match (&self.stream_token, &other.stream_token) { (None, None) => true, (Some(this), Some(other)) => !this.isolated(other.as_ref()), _ => false, } } } impl Eq for StreamIsolation {} impl PartialEq for StreamIsolation { fn eq(&self, other: &Self) -> bool { self.may_share_circuit(other) } } Loading Loading @@ -249,7 +301,7 @@ impl ExitPolicy { /// /// This type should stay internal to the circmgr crate for now: we'll probably /// want to refactor it a lot. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) enum TargetCircUsage { /// Use for BEGINDIR-based non-anonymous directory connections Dir, Loading Loading @@ -352,7 +404,7 @@ impl TargetCircUsage { path, SupportedCircUsage::Exit { policy, isolation: Some(*isolation), isolation: Some(isolation.clone()), }, mon, usable, Loading Loading @@ -393,7 +445,9 @@ impl crate::mgr::AbstractSpec for SupportedCircUsage { isolation: i2, }, ) => { i1.map(|i1| i1.may_share_circuit(i2)).unwrap_or(true) i1.as_ref() .map(|i1| i1.may_share_circuit(i2)) .unwrap_or(true) && p2.iter().all(|port| p1.allows_port(*port)) } (Exit { policy, isolation }, TargetCircUsage::Preemptive { port, .. }) => { Loading Loading @@ -428,10 +482,14 @@ impl crate::mgr::AbstractSpec for SupportedCircUsage { .. }, TargetCircUsage::Exit { isolation: i2, .. }, ) if i1.map(|i1| i1.may_share_circuit(i2)).unwrap_or(true) => { ) if i1 .as_ref() .map(|i1| i1.may_share_circuit(i2)) .unwrap_or(true) => { // Once we have more complex isolation, this assignment // won't be correct. *i1 = Some(*i2); *i1 = Some(i2.clone()); Ok(()) } (Exit { .. }, TargetCircUsage::Exit { .. }) => { Loading Loading
crates/arti-client/src/client.rs +53 −36 Original line number Diff line number Diff line Loading @@ -103,30 +103,41 @@ pub enum BootstrapBehavior { } /// Preferences for how to route a stream over the Tor network. #[derive(Debug, Clone, Default)] pub struct StreamPrefs { #[derive(Debug, Clone)] pub struct StreamPrefs<T = IsolationToken> { /// What kind of IPv6/IPv4 we'd prefer, and how strongly. ip_ver_pref: IpVersionPreference, /// How should we isolate connection(s) ? isolation: StreamIsolationPreference, isolation: StreamIsolationPreference<T>, /// Whether to return the stream optimistically. optimistic_stream: bool, } impl<T> Default for StreamPrefs<T> { fn default() -> Self { StreamPrefs { ip_ver_pref: Default::default(), isolation: Default::default(), optimistic_stream: Default::default(), } } } /// Record of how we are isolating connections #[derive(Debug, Clone, Educe)] #[educe(Default)] enum StreamIsolationPreference { enum StreamIsolationPreference<T> { /// No additional isolation #[educe(Default)] None, /// Id of the isolation group the connection should be part of Explicit(IsolationToken), /// TODO Explicit(Arc<T>), /// Isolate every connection! EveryStream, } impl StreamPrefs { impl<T> StreamPrefs<T> { /// Construct a new StreamPrefs. pub fn new() -> Self { Self::default() Loading Loading @@ -216,22 +227,8 @@ impl StreamPrefs { /// [`TorClient::isolated_client`]. Connections made with an `isolated_client` (and its /// clones) will not share circuits with the original client, even if the same /// `isolation_group` is specified via the `ConnectionPrefs` in force. pub fn set_isolation_group(&mut self, isolation_group: IsolationToken) -> &mut Self { self.isolation = StreamIsolationPreference::Explicit(isolation_group); self } /// Indicate that connections with these preferences should have their own isolation group /// /// This is a convenience method which creates a fresh [`IsolationToken`] /// and sets it for these preferences. /// /// This connection preference is orthogonal to isolation established by /// [`TorClient::isolated_client`]. Connections made with an `isolated_client` (and its /// clones) will not share circuits with the original client, even if the same /// `isolation_group` is specified via the `ConnectionPrefs` in force. pub fn new_isolation_group(&mut self) -> &mut Self { self.isolation = StreamIsolationPreference::Explicit(IsolationToken::new()); pub fn set_isolation_group(&mut self, isolation_group: T) -> &mut Self { self.isolation = StreamIsolationPreference::Explicit(Arc::new(isolation_group)); self } Loading @@ -255,18 +252,35 @@ impl StreamPrefs { /// Return a token to describe which connections might use /// the same circuit as this one. fn isolation_group(&self) -> Option<IsolationToken> { fn isolation_group(&self) -> Option<Result<Arc<T>, IsolationToken>> { use StreamIsolationPreference as SIP; match self.isolation { SIP::None => None, SIP::Explicit(ig) => Some(ig), SIP::EveryStream => Some(IsolationToken::new()), SIP::Explicit(ref ig) => Some(Ok(ig.clone())), SIP::EveryStream => Some(Err(IsolationToken::new())), } } // TODO: Add some way to be IPFlexible, and require exit to support both. } impl StreamPrefs<IsolationToken> { /// Indicate that connections with these preferences should have their own isolation group /// /// This is a convenience method which creates a fresh [`IsolationToken`] /// and sets it for these preferences. /// /// This connection preference is orthogonal to isolation established by /// [`TorClient::isolated_client`]. Connections made with an `isolated_client` (and its /// clones) will not share circuits with the original client, even if the same /// `isolation_group` is specified via the `ConnectionPrefs` in force. pub fn new_isolation_group(&mut self) -> &mut Self { self.isolation = StreamIsolationPreference::Explicit(Arc::new(IsolationToken::new())); self } } #[cfg(feature = "tokio")] impl TorClient<PreferredRuntime> { /// Bootstrap a connection to the Tor network, using the provided `config`. /// Loading Loading @@ -659,10 +673,10 @@ impl<R: Runtime> TorClient<R> { /// Note that because Tor prefers to do DNS resolution on the remote /// side of the network, this function takes its address as a string. /// (See [`TorClient::connect()`] for more information.) pub async fn connect_with_prefs<A: IntoTorAddr>( pub async fn connect_with_prefs<A: IntoTorAddr, T: tor_circmgr::isolation::Isolation>( &self, target: A, prefs: &StreamPrefs, prefs: &StreamPrefs<T>, ) -> crate::Result<DataStream> { let addr = target.into_tor_addr().map_err(wrap_err)?; addr.enforce_config(&self.addrcfg.get())?; Loading Loading @@ -698,7 +712,7 @@ impl<R: Runtime> TorClient<R> { // // This function is private just because we're not sure we want to provide this API. // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/250#note_2771238 fn set_stream_prefs(&mut self, connect_prefs: StreamPrefs) { fn set_stream_prefs(&mut self, connect_prefs: StreamPrefs<IsolationToken>) { self.connect_prefs = connect_prefs; } Loading @@ -707,7 +721,7 @@ impl<R: Runtime> TorClient<R> { /// Connections made with e.g. [`connect`](TorClient::connect) on the returned handle will use /// `connect_prefs`. This is a convenience wrapper for `clone` and `set_connect_prefs`. #[must_use] pub fn clone_with_prefs(&self, connect_prefs: StreamPrefs) -> Self { pub fn clone_with_prefs(&self, connect_prefs: StreamPrefs<IsolationToken>) -> Self { let mut result = self.clone(); result.set_stream_prefs(connect_prefs); result Loading @@ -719,10 +733,10 @@ impl<R: Runtime> TorClient<R> { } /// On success, return a list of IP addresses, but use prefs. pub async fn resolve_with_prefs( pub async fn resolve_with_prefs<T: tor_circmgr::isolation::Isolation>( &self, hostname: &str, prefs: &StreamPrefs, prefs: &StreamPrefs<T>, ) -> crate::Result<Vec<IpAddr>> { let addr = (hostname, 1).into_tor_addr().map_err(wrap_err)?; addr.enforce_config(&self.addrcfg.get()).map_err(wrap_err)?; Loading Loading @@ -750,10 +764,10 @@ impl<R: Runtime> TorClient<R> { /// Perform a remote DNS reverse lookup with the provided IP address. /// /// On success, return a list of hostnames. pub async fn resolve_ptr_with_prefs( pub async fn resolve_ptr_with_prefs<T: tor_circmgr::isolation::Isolation>( &self, addr: IpAddr, prefs: &StreamPrefs, prefs: &StreamPrefs<T>, ) -> crate::Result<Vec<String>> { let circ = self.get_or_launch_exit_circ(&[], prefs).await?; Loading Loading @@ -802,10 +816,10 @@ impl<R: Runtime> TorClient<R> { /// Get or launch an exit-suitable circuit with a given set of /// exit ports. async fn get_or_launch_exit_circ( async fn get_or_launch_exit_circ<T: tor_circmgr::isolation::Isolation>( &self, exit_ports: &[TargetPort], prefs: &StreamPrefs, prefs: &StreamPrefs<T>, ) -> StdResult<ClientCirc, ErrorDetail> { self.wait_for_bootstrap().await?; let dir = self Loading @@ -821,7 +835,10 @@ impl<R: Runtime> TorClient<R> { b.owner_token(self.client_isolation); // Consider stream isolation too, if it's set. if let Some(tok) = prefs.isolation_group() { b.stream_token(tok); match tok { Ok(tok) => b.stream_token(tok), Err(tok) => b.stream_token(Arc::new(tok)), }; } // Failure should be impossible with this builder. b.build().expect("Failed to construct StreamIsolation") Loading
crates/tor-circmgr/src/lib.rs +4 −0 Original line number Diff line number Diff line Loading @@ -72,6 +72,10 @@ mod usage; pub use err::Error; pub use usage::{IsolationToken, StreamIsolation, StreamIsolationBuilder, TargetPort, TargetPorts}; /// TODO pub mod isolation { pub use crate::usage::{Isolation, IsolationHelper, IsolationToken}; } pub use config::{ CircMgrConfig, CircMgrConfigBuilder, CircuitTiming, CircuitTimingBuilder, PathConfig, Loading
crates/tor-circmgr/src/usage.rs +67 −9 Original line number Diff line number Diff line Loading @@ -100,6 +100,40 @@ impl Display for TargetPorts { } } use std::any::Any; pub trait AsAny { fn as_any(&self) -> &dyn Any; } impl<T: 'static> AsAny for T { fn as_any(&self) -> &dyn Any { self } } /// TODO pub trait Isolation: AsAny + std::fmt::Debug + Send + Sync + 'static { /// TODO fn isolated(&self, other: &dyn Isolation) -> bool; } impl<T: IsolationHelper + std::fmt::Debug + Send + Sync + 'static> Isolation for T { fn isolated(&self, other: &dyn Isolation) -> bool { if let Some(other) = AsAny::as_any(other).downcast_ref() { self.isolated_same_type(other) } else { false } } } /// TODO pub trait IsolationHelper { /// TODO fn isolated_same_type(&self, other: &Self) -> bool; } /// A token used to isolate unrelated streams on different circuits. /// /// When two streams are associated with different isolation tokens, they Loading Loading @@ -181,15 +215,21 @@ impl IsolationToken { } } impl IsolationHelper for IsolationToken { fn isolated_same_type(&self, other: &Self) -> bool { self == other } } /// A set of information about how a stream should be isolated. /// /// If two streams are isolated from one another, they may not share /// a circuit. #[derive(Copy, Clone, Eq, Debug, PartialEq, PartialOrd, Ord, derive_builder::Builder)] #[derive(Clone, Debug, derive_builder::Builder)] pub struct StreamIsolation { /// Any isolation token set on the stream. #[builder(default = "IsolationToken::no_isolation()")] stream_token: IsolationToken, #[builder(setter(strip_option), default)] stream_token: Option<Arc<dyn Isolation>>, /// Any additional isolation token set on an object that "owns" this /// stream. This is typically owned by a `TorClient`. #[builder(default = "IsolationToken::no_isolation()")] Loading @@ -213,7 +253,19 @@ impl StreamIsolation { /// Return true if this StreamIsolation can share a circuit with /// `other`. fn may_share_circuit(&self, other: &StreamIsolation) -> bool { self == other self.owner_token == other.owner_token && match (&self.stream_token, &other.stream_token) { (None, None) => true, (Some(this), Some(other)) => !this.isolated(other.as_ref()), _ => false, } } } impl Eq for StreamIsolation {} impl PartialEq for StreamIsolation { fn eq(&self, other: &Self) -> bool { self.may_share_circuit(other) } } Loading Loading @@ -249,7 +301,7 @@ impl ExitPolicy { /// /// This type should stay internal to the circmgr crate for now: we'll probably /// want to refactor it a lot. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) enum TargetCircUsage { /// Use for BEGINDIR-based non-anonymous directory connections Dir, Loading Loading @@ -352,7 +404,7 @@ impl TargetCircUsage { path, SupportedCircUsage::Exit { policy, isolation: Some(*isolation), isolation: Some(isolation.clone()), }, mon, usable, Loading Loading @@ -393,7 +445,9 @@ impl crate::mgr::AbstractSpec for SupportedCircUsage { isolation: i2, }, ) => { i1.map(|i1| i1.may_share_circuit(i2)).unwrap_or(true) i1.as_ref() .map(|i1| i1.may_share_circuit(i2)) .unwrap_or(true) && p2.iter().all(|port| p1.allows_port(*port)) } (Exit { policy, isolation }, TargetCircUsage::Preemptive { port, .. }) => { Loading Loading @@ -428,10 +482,14 @@ impl crate::mgr::AbstractSpec for SupportedCircUsage { .. }, TargetCircUsage::Exit { isolation: i2, .. }, ) if i1.map(|i1| i1.may_share_circuit(i2)).unwrap_or(true) => { ) if i1 .as_ref() .map(|i1| i1.may_share_circuit(i2)) .unwrap_or(true) => { // Once we have more complex isolation, this assignment // won't be correct. *i1 = Some(*i2); *i1 = Some(i2.clone()); Ok(()) } (Exit { .. }, TargetCircUsage::Exit { .. }) => { Loading