diff --git a/crates/tor-chanmgr/src/builder.rs b/crates/tor-chanmgr/src/builder.rs index 1d2d7820564daa87bcd382b725691f616bebfff9..cabbb4b56f665d1efb8a4fc098e2f63a232fe8d7 100644 --- a/crates/tor-chanmgr/src/builder.rs +++ b/crates/tor-chanmgr/src/builder.rs @@ -1,7 +1,8 @@ //! Implement a concrete type to build channels. use std::io; -use std::sync::Mutex; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; use crate::{event::ChanMgrEventSender, Error}; @@ -9,10 +10,16 @@ use std::time::Duration; use tor_error::{bad_api_usage, internal}; use tor_linkspec::{ChanTarget, OwnedChanTarget}; use tor_llcrypto::pk; -use tor_rtcompat::{tls::TlsConnector, Runtime, TlsProvider}; +use tor_rtcompat::{tls::TlsConnector, Runtime, TcpProvider, TlsProvider}; use async_trait::async_trait; +use futures::stream::FuturesUnordered; use futures::task::SpawnExt; +use futures::StreamExt; +use futures::{FutureExt, TryFutureExt}; + +/// Time to wait between starting parallel connections to the same relay. +static CONNECTION_DELAY: Duration = Duration::from_millis(150); /// TLS-based channel builder. /// @@ -56,6 +63,68 @@ impl<R: Runtime> crate::mgr::ChannelFactory for ChanBuilder<R> { } } +/// Connect to one of the addresses in `addrs` by running connections in parallel until one works. +/// +/// This implements a basic version of RFC 8305 "happy eyeballs". +async fn connect_to_one<R: Runtime>( + rt: &R, + addrs: &[SocketAddr], +) -> crate::Result<(<R as TcpProvider>::TcpStream, SocketAddr)> { + // We need *some* addresses to connect to. + if addrs.is_empty() { + return Err(Error::UnusableTarget(bad_api_usage!( + "No addresses for chosen relay" + ))); + } + + // Turn each address into a future that waits (i * CONNECTION_DELAY), then + // attempts to connect to the address using the runtime (where i is the + // array index). Shove all of these into a `FuturesUnordered`, polling them + // simultaneously and returning the results in completion order. + // + // This is basically the concurrent-connection stuff from RFC 8305, ish. + // TODO(eta): sort the addresses first? + let mut connections = addrs + .iter() + .enumerate() + .map(|(i, a)| { + let delay = rt.sleep(CONNECTION_DELAY * i as u32); + delay.then(move |_| { + tracing::info!("Connecting to {}", a); + rt.connect(a) + .map_ok(move |stream| (stream, *a)) + .map_err(move |e| (e, *a)) + }) + }) + .collect::<FuturesUnordered<_>>(); + + let mut ret = None; + let mut errors = vec![]; + + while let Some(result) = connections.next().await { + match result { + Ok(s) => { + // We got a stream (and address). + ret = Some(s); + break; + } + Err((e, a)) => { + // We got a failure on one of the streams. Store the error. + // TODO(eta): ideally we'd start the next connection attempt immediately. + tracing::warn!("Connection to {} failed: {}", a, e); + errors.push((e, a)); + } + } + } + + // Ensure we don't continue trying to make connections. + drop(connections); + + ret.ok_or_else(|| Error::ChannelBuild { + addresses: errors.into_iter().map(|(e, a)| (a, Arc::new(e))).collect(), + }) +} + impl<R: Runtime> ChanBuilder<R> { /// As build_channel, but don't include a timeout. async fn build_channel_notimeout( @@ -66,16 +135,6 @@ impl<R: Runtime> ChanBuilder<R> { use tor_rtcompat::tls::CertifiedConn; // 1. Negotiate the TLS connection. - - // TODO: This just uses the first address. Instead we could be - // smarter, or use "happy eyeballs", or whatever. Maybe we will - // want to refactor as we do so? - let addr = target.addrs().get(0).ok_or_else(|| { - Error::UnusableTarget(bad_api_usage!("No addresses for chosen relay")) - })?; - - tracing::info!("Negotiating TLS with {}", addr); - { self.event_sender .lock() @@ -83,21 +142,16 @@ impl<R: Runtime> ChanBuilder<R> { .record_attempt(); } + let (stream, addr) = connect_to_one(&self.runtime, target.addrs()).await?; + let map_ioe = |action: &'static str| { move |ioe: io::Error| Error::Io { action, - peer: *addr, + peer: addr, source: ioe.into(), } }; - // Establish a TCP connection. - let stream = self - .runtime - .connect(addr) - .await - .map_err(map_ioe("connect"))?; - { self.event_sender .lock() @@ -126,7 +180,7 @@ impl<R: Runtime> ChanBuilder<R> { // 2. Set up the channel. let mut builder = ChannelBuilder::new(); - builder.set_declared_addr(*addr); + builder.set_declared_addr(addr); let chan = builder.launch(tls).connect().await?; let now = self.runtime.wallclock(); let chan = chan.check(target, &peer_cert, Some(now))?; diff --git a/crates/tor-chanmgr/src/err.rs b/crates/tor-chanmgr/src/err.rs index 9c87a83c94aac3b7fc63299f7f4dcf94f6ba8ed3..c7ea6789e422f0834d1d142f326818b88b0e79e2 100644 --- a/crates/tor-chanmgr/src/err.rs +++ b/crates/tor-chanmgr/src/err.rs @@ -42,6 +42,14 @@ pub enum Error { source: Arc<std::io::Error>, }, + /// Failed to build a channel, after trying multiple addresses. + #[error("Channel build failed: [(address, error)] = {addresses:?}")] + ChannelBuild { + /// The list of addresses we tried to connect to, coupled with + /// the error we encountered connecting to each one. + addresses: Vec<(SocketAddr, Arc<std::io::Error>)>, + }, + /// Unable to spawn task #[error("unable to spawn {spawning}")] Spawn { @@ -79,6 +87,7 @@ impl tor_error::HasKind for Error { E::Proto(e) => e.kind(), E::PendingFailed => EK::TorAccessFailed, E::UnusableTarget(_) | E::Internal(_) => EK::Internal, + Error::ChannelBuild { .. } => EK::TorAccessFailed, } } }