Loading Cargo.lock +5 −0 Original line number Diff line number Diff line Loading @@ -4959,8 +4959,13 @@ dependencies = [ "serde_json", "serde_with", "thiserror", "tor-cell", "tor-config", "tor-error", "tor-hsservice", "tor-proto", "tor-rtcompat", "tracing", "void", ] Loading crates/tor-hsrproxy/Cargo.toml +5 −0 Original line number Diff line number Diff line Loading @@ -25,8 +25,13 @@ rangemap = "1.3" serde = { version = "1.0.103", features = ["derive"] } serde_with = "3.0.0" thiserror = "1" tor-cell = { version = "0.12.2", path = "../tor-cell" } tor-config = { version = "0.9.3", path = "../tor-config" } tor-error = { version = "0.5.4", path = "../tor-error" } tor-hsservice = { path = "../tor-hsservice", version = "0.2.4" } tor-proto = { version = "0.12.1", path = "../tor-proto", features = ["experimental-api", "hs-service"] } tor-rtcompat = { path = "../tor-rtcompat", version = "0.9.4" } tracing = "0.1.36" void = "1" [dev-dependencies] Loading crates/tor-hsrproxy/src/proxy.rs +133 −4 Original line number Diff line number Diff line Loading @@ -2,10 +2,18 @@ use std::sync::{Arc, Mutex}; use futures::{channel::oneshot, FutureExt as _}; use futures::{ channel::oneshot, task::SpawnExt as _, AsyncRead, AsyncWrite, Future, FutureExt as _, Stream, StreamExt as _, }; use tor_cell::relaycell::msg as relaymsg; use tor_error::debug_report; use tor_hsservice::{OnionServiceDataStream, StreamRequest}; use tor_proto::stream::IncomingStreamRequest; use tor_rtcompat::Runtime; use tracing::debug; use crate::config::ProxyConfig; use crate::config::{Encapsulation, ProxyAction, ProxyConfig, TargetAddr}; /// A reverse proxy that handles connections from an `OnionService` by routing /// them to local addresses. Loading Loading @@ -55,6 +63,127 @@ impl OnionServiceReverseProxy { let _ = state.shutdown_tx.take(); } /// XXXX async fn handle_requests<R: Runtime>(&self, _runtime: R, _requests: ()) {} /// Use this proxy to handle a stream of [`RendRequest`] requests. /// /// The future returned by this function blocks indefinitely, so you may /// want to spawn a separate task for it. pub async fn handle_requests<R, S>(&self, runtime: R, requests: S) where R: Runtime, S: Stream<Item = tor_hsservice::RendRequest> + Unpin, { let mut stream_requests = tor_hsservice::handle_rend_requests(requests); // TODO HSS: Actually look at shutdown_rx here! while let Some(stream_request) = stream_requests.next().await { let action = self.choose_action(stream_request.request()); let _outcome = runtime.spawn(run_action(runtime.clone(), action, stream_request)); // TODO HSS: if we fail to spawn, report an error and exit } } /// Choose the configured action that we should take in response to a /// [`StreamRequest`], based on our current configuration. fn choose_action(&self, stream_request: &IncomingStreamRequest) -> ProxyAction { let port: u16 = match stream_request { IncomingStreamRequest::Begin(begin) => { // TODO HSS: Should we look at the address and flags at all? begin.port() } other => { tracing::warn!( "Rejecting onion service request for invalid command {:?}. Internal error.", other ); return ProxyAction::DestroyCircuit; } }; self.state .lock() .expect("poisoned lock") .config .resolve_port_for_begin(port) .cloned() // The default action is "destroy the circuit." .unwrap_or(ProxyAction::DestroyCircuit) } } /// Take the configured action from `action` on the incoming request `request`. pub(super) async fn run_action<R: Runtime>( runtime: R, action: ProxyAction, request: StreamRequest, ) { match action { ProxyAction::DestroyCircuit => { if let Err(e) = request.shutdown_circuit() { debug_report!(e, "Unable to destroy onion service circuit"); } } ProxyAction::Forward(encap, target) => match (encap, target) { (Encapsulation::Simple, TargetAddr::Inet(a)) => { let rt_clone = runtime.clone(); forward_connection(rt_clone, request, runtime.connect(&a)).await; } (Encapsulation::Simple, TargetAddr::Unix(_)) => { // TODO HSS: We need to implement unix connections. } }, ProxyAction::RejectStream => { // TODO HSS: Does this match the behavior from C tor? let end = relaymsg::End::new_misc(); if let Err(e) = request.reject(end).await { debug_report!(e, "Unable to reject onion service request from client"); } } ProxyAction::IgnoreStream => drop(request), }; } /// Try to open a connection to an appropriate local target using /// `target_stream_future`. If successful, try to report success on `request` /// and trandmit data between the two stream indefinitely. On failure, close /// `request`. async fn forward_connection<R, FUT, TS, E>( _runtime: R, request: StreamRequest, target_stream_future: FUT, ) where R: Runtime, FUT: Future<Output = Result<TS, E>>, TS: AsyncRead + AsyncWrite, E: std::fmt::Display, { let _local_stream = match target_stream_future.await { Ok(s) => s, Err(e) => { // TODO HSS: We should log more, since this is likely a missing // local service. // TODO HSS: (This is a major usability problem!) debug!("Unable to connect to onion service target: {}", e); let end = relaymsg::End::new_misc(); if let Err(e) = request.reject(end).await { debug_report!(e, "Unable to reject onion service request from client"); } return; } }; let _onion_service_stream: OnionServiceDataStream = { // TODO HSS: Does this match the behavior from C tor? let connected = relaymsg::Connected::new_empty(); match request.accept(connected).await { Ok(s) => s, Err(e) => { debug_report!(e, "Unable to accept connection from onion service client"); return; } } }; // TODO HSS: Forward data indefinitely. // TODO HSS: Why is OnionServiceDataStream not the same type as DataStream? } Loading
Cargo.lock +5 −0 Original line number Diff line number Diff line Loading @@ -4959,8 +4959,13 @@ dependencies = [ "serde_json", "serde_with", "thiserror", "tor-cell", "tor-config", "tor-error", "tor-hsservice", "tor-proto", "tor-rtcompat", "tracing", "void", ] Loading
crates/tor-hsrproxy/Cargo.toml +5 −0 Original line number Diff line number Diff line Loading @@ -25,8 +25,13 @@ rangemap = "1.3" serde = { version = "1.0.103", features = ["derive"] } serde_with = "3.0.0" thiserror = "1" tor-cell = { version = "0.12.2", path = "../tor-cell" } tor-config = { version = "0.9.3", path = "../tor-config" } tor-error = { version = "0.5.4", path = "../tor-error" } tor-hsservice = { path = "../tor-hsservice", version = "0.2.4" } tor-proto = { version = "0.12.1", path = "../tor-proto", features = ["experimental-api", "hs-service"] } tor-rtcompat = { path = "../tor-rtcompat", version = "0.9.4" } tracing = "0.1.36" void = "1" [dev-dependencies] Loading
crates/tor-hsrproxy/src/proxy.rs +133 −4 Original line number Diff line number Diff line Loading @@ -2,10 +2,18 @@ use std::sync::{Arc, Mutex}; use futures::{channel::oneshot, FutureExt as _}; use futures::{ channel::oneshot, task::SpawnExt as _, AsyncRead, AsyncWrite, Future, FutureExt as _, Stream, StreamExt as _, }; use tor_cell::relaycell::msg as relaymsg; use tor_error::debug_report; use tor_hsservice::{OnionServiceDataStream, StreamRequest}; use tor_proto::stream::IncomingStreamRequest; use tor_rtcompat::Runtime; use tracing::debug; use crate::config::ProxyConfig; use crate::config::{Encapsulation, ProxyAction, ProxyConfig, TargetAddr}; /// A reverse proxy that handles connections from an `OnionService` by routing /// them to local addresses. Loading Loading @@ -55,6 +63,127 @@ impl OnionServiceReverseProxy { let _ = state.shutdown_tx.take(); } /// XXXX async fn handle_requests<R: Runtime>(&self, _runtime: R, _requests: ()) {} /// Use this proxy to handle a stream of [`RendRequest`] requests. /// /// The future returned by this function blocks indefinitely, so you may /// want to spawn a separate task for it. pub async fn handle_requests<R, S>(&self, runtime: R, requests: S) where R: Runtime, S: Stream<Item = tor_hsservice::RendRequest> + Unpin, { let mut stream_requests = tor_hsservice::handle_rend_requests(requests); // TODO HSS: Actually look at shutdown_rx here! while let Some(stream_request) = stream_requests.next().await { let action = self.choose_action(stream_request.request()); let _outcome = runtime.spawn(run_action(runtime.clone(), action, stream_request)); // TODO HSS: if we fail to spawn, report an error and exit } } /// Choose the configured action that we should take in response to a /// [`StreamRequest`], based on our current configuration. fn choose_action(&self, stream_request: &IncomingStreamRequest) -> ProxyAction { let port: u16 = match stream_request { IncomingStreamRequest::Begin(begin) => { // TODO HSS: Should we look at the address and flags at all? begin.port() } other => { tracing::warn!( "Rejecting onion service request for invalid command {:?}. Internal error.", other ); return ProxyAction::DestroyCircuit; } }; self.state .lock() .expect("poisoned lock") .config .resolve_port_for_begin(port) .cloned() // The default action is "destroy the circuit." .unwrap_or(ProxyAction::DestroyCircuit) } } /// Take the configured action from `action` on the incoming request `request`. pub(super) async fn run_action<R: Runtime>( runtime: R, action: ProxyAction, request: StreamRequest, ) { match action { ProxyAction::DestroyCircuit => { if let Err(e) = request.shutdown_circuit() { debug_report!(e, "Unable to destroy onion service circuit"); } } ProxyAction::Forward(encap, target) => match (encap, target) { (Encapsulation::Simple, TargetAddr::Inet(a)) => { let rt_clone = runtime.clone(); forward_connection(rt_clone, request, runtime.connect(&a)).await; } (Encapsulation::Simple, TargetAddr::Unix(_)) => { // TODO HSS: We need to implement unix connections. } }, ProxyAction::RejectStream => { // TODO HSS: Does this match the behavior from C tor? let end = relaymsg::End::new_misc(); if let Err(e) = request.reject(end).await { debug_report!(e, "Unable to reject onion service request from client"); } } ProxyAction::IgnoreStream => drop(request), }; } /// Try to open a connection to an appropriate local target using /// `target_stream_future`. If successful, try to report success on `request` /// and trandmit data between the two stream indefinitely. On failure, close /// `request`. async fn forward_connection<R, FUT, TS, E>( _runtime: R, request: StreamRequest, target_stream_future: FUT, ) where R: Runtime, FUT: Future<Output = Result<TS, E>>, TS: AsyncRead + AsyncWrite, E: std::fmt::Display, { let _local_stream = match target_stream_future.await { Ok(s) => s, Err(e) => { // TODO HSS: We should log more, since this is likely a missing // local service. // TODO HSS: (This is a major usability problem!) debug!("Unable to connect to onion service target: {}", e); let end = relaymsg::End::new_misc(); if let Err(e) = request.reject(end).await { debug_report!(e, "Unable to reject onion service request from client"); } return; } }; let _onion_service_stream: OnionServiceDataStream = { // TODO HSS: Does this match the behavior from C tor? let connected = relaymsg::Connected::new_empty(); match request.accept(connected).await { Ok(s) => s, Err(e) => { debug_report!(e, "Unable to accept connection from onion service client"); return; } } }; // TODO HSS: Forward data indefinitely. // TODO HSS: Why is OnionServiceDataStream not the same type as DataStream? }