Loading crates/tor-hsrproxy/src/proxy.rs +23 −6 Original line number Diff line number Diff line Loading @@ -3,8 +3,8 @@ use std::sync::{Arc, Mutex}; use futures::{ channel::oneshot, task::SpawnExt as _, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Future, FutureExt as _, Stream, StreamExt as _, channel::oneshot, select_biased, task::SpawnExt as _, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Future, FutureExt as _, Stream, StreamExt as _, }; use safelog::sensitive as sv; use std::io::{Error as IoError, Result as IoResult}; Loading Loading @@ -94,9 +94,27 @@ impl OnionServiceReverseProxy { R: Runtime, S: Stream<Item = tor_hsservice::RendRequest> + Unpin, { let mut stream_requests = tor_hsservice::handle_rend_requests(requests); // TODO HSS: Actually look at shutdown_rx here! while let Some(stream_request) = stream_requests.next().await { let mut stream_requests = tor_hsservice::handle_rend_requests(requests).fuse(); let mut shutdown_rx = self .state .lock() .expect("poisoned lock") .shutdown_rx .clone() .fuse(); loop { let stream_request = select_biased! { _ = shutdown_rx => return Ok(()), stream_request = stream_requests.next() => match stream_request { None => return Ok(()), Some(s) => s, } // TODO HSS: we might want to have some mechanism here to report // any fatal errors that occur from run_action, or from one of // the tasks it spawns. }; let action = self.choose_action(stream_request.request()); let a_clone = action.clone(); let rt_clone = runtime.clone(); Loading @@ -115,7 +133,6 @@ impl OnionServiceReverseProxy { }) .map_err(|e| HandleRequestsError::Spawn(Arc::new(e)))?; } Ok(()) } /// Choose the configured action that we should take in response to a Loading Loading
crates/tor-hsrproxy/src/proxy.rs +23 −6 Original line number Diff line number Diff line Loading @@ -3,8 +3,8 @@ use std::sync::{Arc, Mutex}; use futures::{ channel::oneshot, task::SpawnExt as _, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Future, FutureExt as _, Stream, StreamExt as _, channel::oneshot, select_biased, task::SpawnExt as _, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Future, FutureExt as _, Stream, StreamExt as _, }; use safelog::sensitive as sv; use std::io::{Error as IoError, Result as IoResult}; Loading Loading @@ -94,9 +94,27 @@ impl OnionServiceReverseProxy { R: Runtime, S: Stream<Item = tor_hsservice::RendRequest> + Unpin, { let mut stream_requests = tor_hsservice::handle_rend_requests(requests); // TODO HSS: Actually look at shutdown_rx here! while let Some(stream_request) = stream_requests.next().await { let mut stream_requests = tor_hsservice::handle_rend_requests(requests).fuse(); let mut shutdown_rx = self .state .lock() .expect("poisoned lock") .shutdown_rx .clone() .fuse(); loop { let stream_request = select_biased! { _ = shutdown_rx => return Ok(()), stream_request = stream_requests.next() => match stream_request { None => return Ok(()), Some(s) => s, } // TODO HSS: we might want to have some mechanism here to report // any fatal errors that occur from run_action, or from one of // the tasks it spawns. }; let action = self.choose_action(stream_request.request()); let a_clone = action.clone(); let rt_clone = runtime.clone(); Loading @@ -115,7 +133,6 @@ impl OnionServiceReverseProxy { }) .map_err(|e| HandleRequestsError::Spawn(Arc::new(e)))?; } Ok(()) } /// Choose the configured action that we should take in response to a Loading