Loading Cargo.lock +1 −0 Original line number Diff line number Diff line Loading @@ -4955,6 +4955,7 @@ dependencies = [ "derive_builder_fork_arti", "futures", "rangemap", "safelog", "serde", "serde_json", "serde_with", Loading crates/tor-hsrproxy/Cargo.toml +1 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ derive_builder = { version = "0.11.2", package = "derive_builder_fork_arti" } # postage = { version = "0.5.0", default-features = false, features = ["futures-traits"] } futures = "0.3.14" rangemap = "1.3" safelog = { version = "0.3.2", path = "../safelog" } serde = { version = "1.0.103", features = ["derive"] } serde_with = "3.0.0" thiserror = "1" Loading crates/tor-hsrproxy/src/proxy.rs +112 −34 Original line number Diff line number Diff line Loading @@ -6,13 +6,13 @@ use futures::{ channel::oneshot, task::SpawnExt as _, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Future, FutureExt as _, Stream, StreamExt as _, }; use std::io::Result as IoResult; use safelog::sensitive as sv; use std::io::{Error as IoError, Result as IoResult}; use tor_cell::relaycell::msg as relaymsg; use tor_error::debug_report; use tor_error::{debug_report, ErrorKind, HasKind}; use tor_hsservice::StreamRequest; use tor_proto::stream::{DataStream, IncomingStreamRequest}; use tor_rtcompat::Runtime; use tracing::debug; use crate::config::{Encapsulation, ProxyAction, ProxyConfig, TargetAddr}; Loading @@ -35,6 +35,23 @@ struct State { shutdown_rx: futures::future::Shared<oneshot::Receiver<void::Void>>, } /// An error that prevents further progress while processing requests. #[derive(Clone, Debug, thiserror::Error)] #[non_exhaustive] pub enum HandleRequestsError { /// The runtime says it was unable to spawn a task. #[error("Unable to spawn a task")] Spawn(#[source] Arc<futures::task::SpawnError>), } impl HasKind for HandleRequestsError { fn kind(&self) -> ErrorKind { match self { HandleRequestsError::Spawn(e) => e.kind(), } } } impl OnionServiceReverseProxy { /// Create a new proxy with a given configuration. pub fn new(config: ProxyConfig) -> Arc<Self> { Loading Loading @@ -68,7 +85,11 @@ impl OnionServiceReverseProxy { /// /// The future returned by this function blocks indefinitely, so you may /// want to spawn a separate task for it. pub async fn handle_requests<R, S>(&self, runtime: R, requests: S) pub async fn handle_requests<R, S>( &self, runtime: R, requests: S, ) -> Result<(), HandleRequestsError> where R: Runtime, S: Stream<Item = tor_hsservice::RendRequest> + Unpin, Loading @@ -77,10 +98,24 @@ impl OnionServiceReverseProxy { // TODO HSS: Actually look at shutdown_rx here! while let Some(stream_request) = stream_requests.next().await { let action = self.choose_action(stream_request.request()); let a_clone = action.clone(); let rt_clone = runtime.clone(); let req = stream_request.request().clone(); let _outcome = runtime.spawn(run_action(runtime.clone(), action, stream_request)); // TODO HSS: if we fail to spawn, report an error and exit runtime .spawn(async move { if let Err(e) = run_action(rt_clone, action, stream_request).await { debug_report!( e, "Unable to perform action {:?} for onion service request {:?}", sv(a_clone), sv(req) ); } }) .map_err(|e| HandleRequestsError::Spawn(Arc::new(e)))?; } Ok(()) } /// Choose the configured action that we should take in response to a Loading Loading @@ -112,21 +147,21 @@ impl OnionServiceReverseProxy { } /// Take the configured action from `action` on the incoming request `request`. pub(super) async fn run_action<R: Runtime>( async fn run_action<R: Runtime>( runtime: R, action: ProxyAction, request: StreamRequest, ) { ) -> Result<(), RequestFailed> { match action { ProxyAction::DestroyCircuit => { if let Err(e) = request.shutdown_circuit() { debug_report!(e, "Unable to destroy onion service circuit"); } request .shutdown_circuit() .map_err(RequestFailed::CantDestroy)?; } ProxyAction::Forward(encap, target) => match (encap, target) { (Encapsulation::Simple, TargetAddr::Inet(a)) => { let rt_clone = runtime.clone(); forward_connection(rt_clone, request, runtime.connect(&a)).await; forward_connection(rt_clone, request, runtime.connect(&a)).await?; } (Encapsulation::Simple, TargetAddr::Unix(_)) => { // TODO HSS: We need to implement unix connections. Loading @@ -136,61 +171,104 @@ pub(super) async fn run_action<R: Runtime>( // TODO HSS: Does this match the behavior from C tor? let end = relaymsg::End::new_misc(); if let Err(e) = request.reject(end).await { debug_report!(e, "Unable to reject onion service request from client"); } request .reject(end) .await .map_err(RequestFailed::CantReject)?; } ProxyAction::IgnoreStream => drop(request), }; Ok(()) } /// An error from a single attempt to handle an onion service request. #[derive(thiserror::Error, Debug, Clone)] enum RequestFailed { /// Encountered an error trying to destroy a circuit. #[error("Unable to destroy onion service circuit")] CantDestroy(#[source] tor_error::Bug), /// Encountered an error trying to reject a single stream request. #[error("Unable to reject onion service request")] CantReject(#[source] tor_hsservice::ClientError), /// Encountered an error trying to open a local connection. #[error("Unable to open connection to local target")] ConnectLocal(#[source] Arc<IoError>), /// Encountered an error trying to tell the remote onion service client that /// we have accepted their connection. #[error("Unable to accept onion service connection")] AcceptRemote(#[source] tor_hsservice::ClientError), /// The runtime refused to spawn a task for us. #[error("Unable to spawn task")] Spawn(#[source] Arc<futures::task::SpawnError>), } impl HasKind for RequestFailed { fn kind(&self) -> ErrorKind { match self { RequestFailed::CantDestroy(e) => e.kind(), RequestFailed::CantReject(e) => e.kind(), RequestFailed::ConnectLocal(_) => ErrorKind::LocalNetworkError, RequestFailed::AcceptRemote(e) => e.kind(), RequestFailed::Spawn(e) => e.kind(), } } } /// Try to open a connection to an appropriate local target using /// `target_stream_future`. If successful, try to report success on `request` /// and trandmit data between the two stream indefinitely. On failure, close /// `request`. async fn forward_connection<R, FUT, TS, E>( async fn forward_connection<R, FUT, TS>( runtime: R, request: StreamRequest, target_stream_future: FUT, ) where ) -> Result<(), RequestFailed> where R: Runtime, FUT: Future<Output = Result<TS, E>>, FUT: Future<Output = Result<TS, IoError>>, TS: AsyncRead + AsyncWrite + Send + 'static, E: std::fmt::Display, { let local_stream = match target_stream_future.await { Ok(s) => s, Err(e) => { Err(e_connecting) => { // TODO HSS: We should log more, since this is likely a missing // local service. // TODO HSS: (This is a major usability problem!) debug!("Unable to connect to onion service target: {}", e); let end = relaymsg::End::new_misc(); if let Err(e) = request.reject(end).await { debug_report!(e, "Unable to reject onion service request from client"); if let Err(e_rejecting) = request.reject(end).await { debug_report!( e_rejecting, "Unable to reject onion service request from client" ); } return; return Err(RequestFailed::ConnectLocal(Arc::new(e_connecting))); } }; let onion_service_stream: DataStream = { // TODO HSS: Does this match the behavior from C tor? let connected = relaymsg::Connected::new_empty(); match request.accept(connected).await { Ok(s) => s, Err(e) => { debug_report!(e, "Unable to accept connection from onion service client"); return; } } request .accept(connected) .await .map_err(RequestFailed::AcceptRemote)? }; let (svc_r, svc_w) = onion_service_stream.split(); let (local_r, local_w) = local_stream.split(); // TODO HSS: Actually detect errors. let _ignore_outcome = runtime.spawn(copy_interactive(local_r, svc_w).map(|_| ())); let _ignore_outcome = runtime.spawn(copy_interactive(svc_r, local_w).map(|_| ())); runtime .spawn(copy_interactive(local_r, svc_w).map(|_| ())) .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?; runtime .spawn(copy_interactive(svc_r, local_w).map(|_| ())) .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?; Ok(()) } /// Copy all the data from `reader` into `writer` until we encounter an EOF or Loading Loading
Cargo.lock +1 −0 Original line number Diff line number Diff line Loading @@ -4955,6 +4955,7 @@ dependencies = [ "derive_builder_fork_arti", "futures", "rangemap", "safelog", "serde", "serde_json", "serde_with", Loading
crates/tor-hsrproxy/Cargo.toml +1 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ derive_builder = { version = "0.11.2", package = "derive_builder_fork_arti" } # postage = { version = "0.5.0", default-features = false, features = ["futures-traits"] } futures = "0.3.14" rangemap = "1.3" safelog = { version = "0.3.2", path = "../safelog" } serde = { version = "1.0.103", features = ["derive"] } serde_with = "3.0.0" thiserror = "1" Loading
crates/tor-hsrproxy/src/proxy.rs +112 −34 Original line number Diff line number Diff line Loading @@ -6,13 +6,13 @@ use futures::{ channel::oneshot, task::SpawnExt as _, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Future, FutureExt as _, Stream, StreamExt as _, }; use std::io::Result as IoResult; use safelog::sensitive as sv; use std::io::{Error as IoError, Result as IoResult}; use tor_cell::relaycell::msg as relaymsg; use tor_error::debug_report; use tor_error::{debug_report, ErrorKind, HasKind}; use tor_hsservice::StreamRequest; use tor_proto::stream::{DataStream, IncomingStreamRequest}; use tor_rtcompat::Runtime; use tracing::debug; use crate::config::{Encapsulation, ProxyAction, ProxyConfig, TargetAddr}; Loading @@ -35,6 +35,23 @@ struct State { shutdown_rx: futures::future::Shared<oneshot::Receiver<void::Void>>, } /// An error that prevents further progress while processing requests. #[derive(Clone, Debug, thiserror::Error)] #[non_exhaustive] pub enum HandleRequestsError { /// The runtime says it was unable to spawn a task. #[error("Unable to spawn a task")] Spawn(#[source] Arc<futures::task::SpawnError>), } impl HasKind for HandleRequestsError { fn kind(&self) -> ErrorKind { match self { HandleRequestsError::Spawn(e) => e.kind(), } } } impl OnionServiceReverseProxy { /// Create a new proxy with a given configuration. pub fn new(config: ProxyConfig) -> Arc<Self> { Loading Loading @@ -68,7 +85,11 @@ impl OnionServiceReverseProxy { /// /// The future returned by this function blocks indefinitely, so you may /// want to spawn a separate task for it. pub async fn handle_requests<R, S>(&self, runtime: R, requests: S) pub async fn handle_requests<R, S>( &self, runtime: R, requests: S, ) -> Result<(), HandleRequestsError> where R: Runtime, S: Stream<Item = tor_hsservice::RendRequest> + Unpin, Loading @@ -77,10 +98,24 @@ impl OnionServiceReverseProxy { // TODO HSS: Actually look at shutdown_rx here! while let Some(stream_request) = stream_requests.next().await { let action = self.choose_action(stream_request.request()); let a_clone = action.clone(); let rt_clone = runtime.clone(); let req = stream_request.request().clone(); let _outcome = runtime.spawn(run_action(runtime.clone(), action, stream_request)); // TODO HSS: if we fail to spawn, report an error and exit runtime .spawn(async move { if let Err(e) = run_action(rt_clone, action, stream_request).await { debug_report!( e, "Unable to perform action {:?} for onion service request {:?}", sv(a_clone), sv(req) ); } }) .map_err(|e| HandleRequestsError::Spawn(Arc::new(e)))?; } Ok(()) } /// Choose the configured action that we should take in response to a Loading Loading @@ -112,21 +147,21 @@ impl OnionServiceReverseProxy { } /// Take the configured action from `action` on the incoming request `request`. pub(super) async fn run_action<R: Runtime>( async fn run_action<R: Runtime>( runtime: R, action: ProxyAction, request: StreamRequest, ) { ) -> Result<(), RequestFailed> { match action { ProxyAction::DestroyCircuit => { if let Err(e) = request.shutdown_circuit() { debug_report!(e, "Unable to destroy onion service circuit"); } request .shutdown_circuit() .map_err(RequestFailed::CantDestroy)?; } ProxyAction::Forward(encap, target) => match (encap, target) { (Encapsulation::Simple, TargetAddr::Inet(a)) => { let rt_clone = runtime.clone(); forward_connection(rt_clone, request, runtime.connect(&a)).await; forward_connection(rt_clone, request, runtime.connect(&a)).await?; } (Encapsulation::Simple, TargetAddr::Unix(_)) => { // TODO HSS: We need to implement unix connections. Loading @@ -136,61 +171,104 @@ pub(super) async fn run_action<R: Runtime>( // TODO HSS: Does this match the behavior from C tor? let end = relaymsg::End::new_misc(); if let Err(e) = request.reject(end).await { debug_report!(e, "Unable to reject onion service request from client"); } request .reject(end) .await .map_err(RequestFailed::CantReject)?; } ProxyAction::IgnoreStream => drop(request), }; Ok(()) } /// An error from a single attempt to handle an onion service request. #[derive(thiserror::Error, Debug, Clone)] enum RequestFailed { /// Encountered an error trying to destroy a circuit. #[error("Unable to destroy onion service circuit")] CantDestroy(#[source] tor_error::Bug), /// Encountered an error trying to reject a single stream request. #[error("Unable to reject onion service request")] CantReject(#[source] tor_hsservice::ClientError), /// Encountered an error trying to open a local connection. #[error("Unable to open connection to local target")] ConnectLocal(#[source] Arc<IoError>), /// Encountered an error trying to tell the remote onion service client that /// we have accepted their connection. #[error("Unable to accept onion service connection")] AcceptRemote(#[source] tor_hsservice::ClientError), /// The runtime refused to spawn a task for us. #[error("Unable to spawn task")] Spawn(#[source] Arc<futures::task::SpawnError>), } impl HasKind for RequestFailed { fn kind(&self) -> ErrorKind { match self { RequestFailed::CantDestroy(e) => e.kind(), RequestFailed::CantReject(e) => e.kind(), RequestFailed::ConnectLocal(_) => ErrorKind::LocalNetworkError, RequestFailed::AcceptRemote(e) => e.kind(), RequestFailed::Spawn(e) => e.kind(), } } } /// Try to open a connection to an appropriate local target using /// `target_stream_future`. If successful, try to report success on `request` /// and trandmit data between the two stream indefinitely. On failure, close /// `request`. async fn forward_connection<R, FUT, TS, E>( async fn forward_connection<R, FUT, TS>( runtime: R, request: StreamRequest, target_stream_future: FUT, ) where ) -> Result<(), RequestFailed> where R: Runtime, FUT: Future<Output = Result<TS, E>>, FUT: Future<Output = Result<TS, IoError>>, TS: AsyncRead + AsyncWrite + Send + 'static, E: std::fmt::Display, { let local_stream = match target_stream_future.await { Ok(s) => s, Err(e) => { Err(e_connecting) => { // TODO HSS: We should log more, since this is likely a missing // local service. // TODO HSS: (This is a major usability problem!) debug!("Unable to connect to onion service target: {}", e); let end = relaymsg::End::new_misc(); if let Err(e) = request.reject(end).await { debug_report!(e, "Unable to reject onion service request from client"); if let Err(e_rejecting) = request.reject(end).await { debug_report!( e_rejecting, "Unable to reject onion service request from client" ); } return; return Err(RequestFailed::ConnectLocal(Arc::new(e_connecting))); } }; let onion_service_stream: DataStream = { // TODO HSS: Does this match the behavior from C tor? let connected = relaymsg::Connected::new_empty(); match request.accept(connected).await { Ok(s) => s, Err(e) => { debug_report!(e, "Unable to accept connection from onion service client"); return; } } request .accept(connected) .await .map_err(RequestFailed::AcceptRemote)? }; let (svc_r, svc_w) = onion_service_stream.split(); let (local_r, local_w) = local_stream.split(); // TODO HSS: Actually detect errors. let _ignore_outcome = runtime.spawn(copy_interactive(local_r, svc_w).map(|_| ())); let _ignore_outcome = runtime.spawn(copy_interactive(svc_r, local_w).map(|_| ())); runtime .spawn(copy_interactive(local_r, svc_w).map(|_| ())) .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?; runtime .spawn(copy_interactive(svc_r, local_w).map(|_| ())) .map_err(|e| RequestFailed::Spawn(Arc::new(e)))?; Ok(()) } /// Copy all the data from `reader` into `writer` until we encounter an EOF or Loading