diff --git a/crates/tor-dirclient/src/err.rs b/crates/tor-dirclient/src/err.rs index 97785af86f6d7b3bb27315eb7a9f6a58a5debcd5..01e880c2af0126623b8cfcf20a8feae8df2acd9c 100644 --- a/crates/tor-dirclient/src/err.rs +++ b/crates/tor-dirclient/src/err.rs @@ -4,12 +4,35 @@ use std::sync::Arc; use thiserror::Error; use tor_error::{ErrorKind, HasKind}; +use tor_linkspec::OwnedChanTarget; use tor_rtcompat::TimeoutError; +use crate::SourceInfo; + /// An error originating from the tor-dirclient crate. #[derive(Error, Debug, Clone)] #[non_exhaustive] pub enum Error { + /// Error while getting a circuit + #[error("Error while getting a circuit {0}")] + CircMgr(#[from] tor_circmgr::Error), + + /// An error that has occurred after we have contacted a directory cache and made a circuit to it. + #[error("Error fetching directory information from {source:?}")] + RequestFailed { + /// The source that gave us this error. + source: Option<SourceInfo>, + + /// The underlying error that occurred. + #[source] + error: RequestError, + }, +} + +/// An error originating from the tor-dirclient crate. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum RequestError { /// The directory cache took too long to reply to us. #[error("directory timed out")] DirTimeout, @@ -34,10 +57,6 @@ pub enum Error { #[error("Protocol error while launching a stream: {0}")] Proto(#[from] tor_proto::Error), - /// Error while getting a circuit - #[error("Error while getting a circuit {0}")] - CircMgr(#[from] tor_circmgr::Error), - /// Error when parsing http #[error("Couldn't parse HTTP headers")] HttparseError(#[from] httparse::Error), @@ -55,25 +74,53 @@ pub enum Error { ContentEncoding(String), } -impl From<TimeoutError> for Error { +impl From<TimeoutError> for RequestError { fn from(_: TimeoutError) -> Self { - Error::DirTimeout + RequestError::DirTimeout } } -impl From<std::io::Error> for Error { +impl From<std::io::Error> for RequestError { fn from(err: std::io::Error) -> Self { Self::IoError(Arc::new(err)) } } -impl From<http::Error> for Error { +impl From<http::Error> for RequestError { fn from(err: http::Error) -> Self { Self::HttpError(Arc::new(err)) } } impl Error { + /// Return true if this error means that the circuit shouldn't be used + /// for any more directory requests. + pub fn should_retire_circ(&self) -> bool { + // TODO: probably this is too aggressive, and we should + // actually _not_ dump the circuit under all circumstances. + match self { + Error::CircMgr(_) => true, // should be unreachable. + Error::RequestFailed { error, .. } => error.should_retire_circ(), + } + } + + /// Return the peer or peers that are to be blamed for the error. + /// + /// (This can return multiple peers if the request failed because multiple + /// circuit attempts all failed.) + pub fn cache_ids(&self) -> Vec<&OwnedChanTarget> { + match &self { + Error::CircMgr(e) => e.peers(), + Error::RequestFailed { + source: Some(source), + .. + } => vec![source.cache_id()], + _ => Vec::new(), + } + } +} + +impl RequestError { /// Return true if this error means that the circuit shouldn't be used /// for any more directory requests. pub fn should_retire_circ(&self) -> bool { @@ -83,10 +130,10 @@ impl Error { } } -impl HasKind for Error { +impl HasKind for RequestError { fn kind(&self) -> ErrorKind { - use Error as E; use ErrorKind as EK; + use RequestError as E; match self { E::DirTimeout => EK::TorNetworkTimeout, E::TruncatedHeaders => EK::TorProtocolViolation, @@ -97,10 +144,19 @@ impl HasKind for Error { // downcasting. 
E::IoError(_) => EK::TorDirectoryError, E::Proto(e) => e.kind(), - E::CircMgr(e) => e.kind(), E::HttparseError(_) => EK::TorProtocolViolation, E::HttpError(_) => EK::Internal, E::ContentEncoding(_) => EK::TorProtocolViolation, } } } + +impl HasKind for Error { + fn kind(&self) -> ErrorKind { + use Error as E; + match self { + E::CircMgr(e) => e.kind(), + E::RequestFailed { error, .. } => error.kind(), + } + } +} diff --git a/crates/tor-dirclient/src/lib.rs b/crates/tor-dirclient/src/lib.rs index d9f2e3490d33ca9258b712698ae3885e5ec2c2f9..457c257bfd2023937038a5a7b134b11aecd8db6f 100644 --- a/crates/tor-dirclient/src/lib.rs +++ b/crates/tor-dirclient/src/lib.rs @@ -78,12 +78,15 @@ use std::sync::Arc; use std::time::Duration; use tracing::info; -pub use err::Error; +pub use err::{Error, RequestError}; pub use response::{DirResponse, SourceInfo}; /// Type for results returned in this crate. pub type Result<T> = std::result::Result<T, Error>; +/// Type for internal results containing a RequestError. +pub type RequestResult<T> = std::result::Result<T, RequestError>; + /// Fetch the resource described by `req` over the Tor network. /// /// Circuits are built or found using `circ_mgr`, using paths @@ -113,10 +116,19 @@ where let begin_timeout = Duration::from_secs(5); let source = SourceInfo::from_circuit(&circuit); + let wrap_err = |error| Error::RequestFailed { + source: Some(source.clone()), + error, + }; + // Launch the stream. let mut stream = runtime .timeout(begin_timeout, circuit.begin_dir_stream()) - .await??; // TODO(nickm) handle fatalities here too + .await + .map_err(RequestError::from) + .map_err(wrap_err)? + .map_err(RequestError::from) + .map_err(wrap_err)?; // TODO(nickm) handle fatalities here too // TODO: Perhaps we want separate timeouts for each phase of this. // For now, we just use higher-level timeouts in `dirmgr`. @@ -134,7 +146,7 @@ where fn should_retire_circ(result: &Result<DirResponse>) -> bool { match result { Err(e) => e.should_retire_circ(), - Ok(dr) => dr.error().map(Error::should_retire_circ) == Some(true), + Ok(dr) => dr.error().map(RequestError::should_retire_circ) == Some(true), } } @@ -163,20 +175,33 @@ where S: AsyncRead + AsyncWrite + Send + Unpin, SP: SleepProvider, { + let wrap_err = |error| Error::RequestFailed { + source: source.clone(), + error, + }; + let partial_ok = req.partial_docs_ok(); let maxlen = req.max_response_len(); - let req = req.make_request()?; + let req = req.make_request().map_err(wrap_err)?; let encoded = util::encode_request(&req); // Write the request. - stream.write_all(encoded.as_bytes()).await?; - stream.flush().await?; + stream + .write_all(encoded.as_bytes()) + .await + .map_err(RequestError::from) + .map_err(wrap_err)?; + stream + .flush() + .await + .map_err(RequestError::from) + .map_err(wrap_err)?; let mut buffered = BufReader::new(stream); // Handle the response // TODO: should there be a separate timeout here? 
- let header = read_headers(&mut buffered).await?; + let header = read_headers(&mut buffered).await.map_err(wrap_err)?; if header.status != Some(200) { return Ok(DirResponse::new( header.status.unwrap_or(0), @@ -186,7 +211,7 @@ where )); } - let mut decoder = get_decoder(buffered, header.encoding.as_deref())?; + let mut decoder = get_decoder(buffered, header.encoding.as_deref()).map_err(wrap_err)?; let mut result = Vec::new(); let ok = read_and_decompress(runtime, &mut decoder, maxlen, &mut result).await; @@ -197,7 +222,7 @@ where Err(e) } (_, Err(e), _) => { - return Err(e); + return Err(wrap_err(e)); } (_, Ok(()), _) => Ok(()), }; @@ -206,7 +231,7 @@ where } /// Read and parse HTTP/1 headers from `stream`. -async fn read_headers<S>(stream: &mut S) -> Result<HeaderStatus> +async fn read_headers<S>(stream: &mut S) -> RequestResult<HeaderStatus> where S: AsyncBufRead + Unpin, { @@ -228,7 +253,7 @@ where if n == 0 { // We hit an EOF; no more progress can be made. - return Err(Error::TruncatedHeaders); + return Err(RequestError::TruncatedHeaders); } // TODO(nickm): Pick a better maximum @@ -266,7 +291,7 @@ where } } if n == 0 { - return Err(Error::TruncatedHeaders); + return Err(RequestError::TruncatedHeaders); } } } @@ -293,7 +318,7 @@ async fn read_and_decompress<S, SP>( mut stream: S, maxlen: usize, result: &mut Vec<u8>, -) -> Result<()> +) -> RequestResult<()> where S: AsyncRead + Unpin, SP: SleepProvider, @@ -315,7 +340,7 @@ where status = stream.read(buf).fuse() => status, _ = timer => { result.resize(written_total, 0); // truncate as needed - return Err(Error::DirTimeout); + return Err(RequestError::DirTimeout); } }; let written_in_this_loop = match status { @@ -349,7 +374,7 @@ where // filling our RAM. if written_total > maxlen { result.resize(maxlen, 0); - return Err(Error::ResponseTooLong(written_total)); + return Err(RequestError::ResponseTooLong(written_total)); } } } @@ -421,7 +446,7 @@ macro_rules! decoder { fn get_decoder<'a, S: AsyncBufRead + Unpin + Send + 'a>( stream: S, encoding: Option<&str>, -) -> Result<Box<dyn AsyncRead + Unpin + Send + 'a>> { +) -> RequestResult<Box<dyn AsyncRead + Unpin + Send + 'a>> { match encoding { None | Some("identity") => Ok(Box::new(stream)), Some("deflate") => decoder!(ZlibDecoder, stream), @@ -429,7 +454,7 @@ fn get_decoder<'a, S: AsyncBufRead + Unpin + Send + 'a>( Some("x-tor-lzma") => decoder!(XzDecoder, stream), #[cfg(feature = "zstd")] Some("x-zstd") => decoder!(ZstdDecoder, stream), - Some(other) => Err(Error::ContentEncoding(other.into())), + Some(other) => Err(RequestError::ContentEncoding(other.into())), } } @@ -442,7 +467,7 @@ mod test { use futures_await_test::async_test; #[async_test] - async fn test_read_until_limited() -> Result<()> { + async fn test_read_until_limited() -> RequestResult<()> { let mut out = Vec::new(); let bytes = b"This line eventually ends\nthen comes another\n"; @@ -474,7 +499,7 @@ mod test { encoding: Option<&str>, data: &[u8], maxlen: usize, - ) -> (Result<()>, Vec<u8>) { + ) -> (RequestResult<()>, Vec<u8>) { // We don't need to do anything fancy here, since we aren't simulating // a timeout. 
let mock_time = MockSleepProvider::new(std::time::SystemTime::now()); @@ -491,7 +516,7 @@ mod test { } #[async_test] - async fn decompress_identity() -> Result<()> { + async fn decompress_identity() -> RequestResult<()> { let mut text = Vec::new(); for _ in 0..1000 { text.extend(b"This is a string with a nontrivial length that we'll use to make sure that the loop is executed more than once."); @@ -516,7 +541,7 @@ mod test { } #[async_test] - async fn decomp_zlib() -> Result<()> { + async fn decomp_zlib() -> RequestResult<()> { let compressed = hex::decode("789cf3cf4b5548cb2cce500829cf8730825253200ca79c52881c00e5970c88").unwrap(); @@ -530,7 +555,7 @@ mod test { #[cfg(feature = "zstd")] #[async_test] - async fn decomp_zstd() -> Result<()> { + async fn decomp_zstd() -> RequestResult<()> { let compressed = hex::decode("28b52ffd24250d0100c84f6e6520666973682054776f526564426c756520666973680a0200600c0e2509478352cb").unwrap(); let limit = 10 << 20; let (s, r) = decomp_basic(Some("x-zstd"), &compressed, limit).await; @@ -542,7 +567,7 @@ mod test { #[cfg(feature = "xz")] #[async_test] - async fn decomp_xz2() -> Result<()> { + async fn decomp_xz2() -> RequestResult<()> { // Not so good at tiny files... let compressed = hex::decode("fd377a585a000004e6d6b446020021011c00000010cf58cce00024001d5d00279b88a202ca8612cfb3c19c87c34248a570451e4851d3323d34ab8000000000000901af64854c91f600013925d6ec06651fb6f37d010000000004595a").unwrap(); let limit = 10 << 20; @@ -559,7 +584,7 @@ mod test { let limit = 10 << 20; let (s, _r) = decomp_basic(Some("x-proprietary-rle"), &compressed, limit).await; - assert!(matches!(s, Err(Error::ContentEncoding(_)))); + assert!(matches!(s, Err(RequestError::ContentEncoding(_)))); } #[async_test] @@ -569,11 +594,11 @@ mod test { let (s, _r) = decomp_basic(Some("deflate"), compressed, limit).await; // This should possibly be a different type in the future. - assert!(matches!(s, Err(Error::IoError(_)))); + assert!(matches!(s, Err(RequestError::IoError(_)))); } #[async_test] - async fn headers_ok() -> Result<()> { + async fn headers_ok() -> RequestResult<()> { let text = b"HTTP/1.0 200 OK\r\nDate: ignored\r\nContent-Encoding: Waffles\r\n\r\n"; let mut s = &text[..]; @@ -585,7 +610,7 @@ mod test { // now try truncated let mut s = &text[..15]; let h = read_headers(&mut s).await; - assert!(matches!(h, Err(Error::TruncatedHeaders))); + assert!(matches!(h, Err(RequestError::TruncatedHeaders))); // now try with no encoding. let text = b"HTTP/1.0 404 Not found\r\n\r\n"; @@ -605,7 +630,7 @@ mod test { let h = read_headers(&mut s).await; assert!(h.is_err()); - assert!(matches!(h, Err(Error::HttparseError(_)))); + assert!(matches!(h, Err(RequestError::HttparseError(_)))); Ok(()) } @@ -617,17 +642,24 @@ mod test { fn run_download_test<Req: request::Requestable>( req: Req, response: &[u8], - ) -> (Result<DirResponse>, Result<Vec<u8>>) { + ) -> (Result<DirResponse>, RequestResult<Vec<u8>>) { let (mut s1, s2) = stream_pair(); let (mut s2_r, mut s2_w) = s2.split(); tor_rtcompat::test_with_one_runtime!(|rt| async move { let rt2 = rt.clone(); - let (v1, v2, v3): (Result<DirResponse>, Result<Vec<u8>>, Result<()>) = futures::join!( + let (v1, v2, v3): ( + Result<DirResponse>, + RequestResult<Vec<u8>>, + RequestResult<()>, + ) = futures::join!( async { // Run the download function. 
let r = download(&rt, &req, &mut s1, None).await; - s1.close().await?; + s1.close().await.map_err(|error| Error::RequestFailed { + source: None, + error: error.into(), + })?; r }, async { @@ -664,7 +696,7 @@ mod test { } #[test] - fn test_download() -> Result<()> { + fn test_download() -> RequestResult<()> { let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect(); let (response, request) = run_download_test( @@ -677,8 +709,7 @@ mod test { b"GET /tor/micro/d/CQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQk.z HTTP/1.0\r\n" )); - assert!(!should_retire_circ(&response)); - let response = response?; + let response = response.unwrap(); assert_eq!(response.status_code(), 200); assert!(!response.is_partial()); assert!(response.error().is_none()); @@ -692,7 +723,7 @@ mod test { } #[test] - fn test_download_truncated() -> Result<()> { + fn test_download_truncated() { // Request only one md, so "partial ok" will not be set. let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect(); let mut response_text: Vec<u8> = @@ -714,14 +745,12 @@ mod test { let (response, request) = run_download_test(req, &response_text); assert!(request.is_ok()); - let response = response?; + let response = response.unwrap(); assert_eq!(response.status_code(), 200); assert!(response.error().is_some()); assert!(response.is_partial()); assert!(response.output().len() < 37 * 2); assert!(response.output().starts_with(b"One fish")); - - Ok(()) } #[test] @@ -739,14 +768,26 @@ mod test { let response_text = b"HTTP/1.0 404 truncation happens here\r\n"; let (response, _request) = run_download_test(req, response_text); - assert!(matches!(response, Err(Error::TruncatedHeaders))); + assert!(matches!( + response, + Err(Error::RequestFailed { + error: RequestError::TruncatedHeaders, + .. + }) + )); // Try a completely empty response. let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect(); let response_text = b""; let (response, _request) = run_download_test(req, response_text); - assert!(matches!(response, Err(Error::TruncatedHeaders))); + assert!(matches!( + response, + Err(Error::RequestFailed { + error: RequestError::TruncatedHeaders, + .. + }) + )); } #[test] @@ -756,8 +797,14 @@ mod test { response_text.resize(16384, b'A'); let (response, _request) = run_download_test(req, &response_text); - assert!(should_retire_circ(&response)); - assert!(matches!(response, Err(Error::HttparseError(_)))); + assert!(response.as_ref().unwrap_err().should_retire_circ()); + assert!(matches!( + response, + Err(Error::RequestFailed { + error: RequestError::HttparseError(_), + .. + }) + )); } // TODO: test with bad utf-8 diff --git a/crates/tor-dirclient/src/request.rs b/crates/tor-dirclient/src/request.rs index e8beed011f2e1e93e99fe17c933165b5e30f905d..5c5a72f136a79fe7abac021a6ebc7449de2655db 100644 --- a/crates/tor-dirclient/src/request.rs +++ b/crates/tor-dirclient/src/request.rs @@ -8,7 +8,8 @@ use tor_netdoc::doc::netstatus::ConsensusFlavor; #[cfg(feature = "routerdesc")] use tor_netdoc::doc::routerdesc::RdDigest; -use crate::Result; +/// Alias for a result with a `RequestError`. 
+type Result<T> = std::result::Result<T, crate::err::RequestError>; use std::iter::FromIterator; use std::time::SystemTime; diff --git a/crates/tor-dirclient/src/response.rs b/crates/tor-dirclient/src/response.rs index dff122164a806f5b2a778d69be2338dbfb327395..1f55f030f5d76d55aae977d4da52bf5085539e08 100644 --- a/crates/tor-dirclient/src/response.rs +++ b/crates/tor-dirclient/src/response.rs @@ -3,7 +3,7 @@ use tor_linkspec::OwnedChanTarget; use tor_proto::circuit::{ClientCirc, UniqId}; -use crate::Error; +use crate::RequestError; /// A successful (or at any rate, well-formed) response to a directory /// request. @@ -14,7 +14,7 @@ pub struct DirResponse { /// The decompressed output that we got from the directory cache. output: Vec<u8>, /// The error, if any, that caused us to stop getting this response early. - error: Option<Error>, + error: Option<RequestError>, /// Information about the directory cache we used. source: Option<SourceInfo>, } @@ -35,7 +35,7 @@ impl DirResponse { /// Construct a new DirResponse from its parts pub(crate) fn new( status: u16, - error: Option<Error>, + error: Option<RequestError>, output: Vec<u8>, source: Option<SourceInfo>, ) -> Self { @@ -63,7 +63,7 @@ impl DirResponse { } /// Return the error from this response, if any. - pub fn error(&self) -> Option<&Error> { + pub fn error(&self) -> Option<&RequestError> { self.error.as_ref() } diff --git a/doc/semver_status.md b/doc/semver_status.md index 3e75b1c5d3b52382744a51d9fc0b53f12655d37a..02031396cd2ff74093b0ad2bbcb9610d52be87fe 100644 --- a/doc/semver_status.md +++ b/doc/semver_status.md @@ -56,6 +56,9 @@ tor-circmgr: api-break: Some error types have changed to include peer info. +tor-dirclient: + api-break: refactored Error type. + tor-dirmgr: new-api: DirMgrConfig object now has accessors. DirMgrCfg: totally changed, builder abolished.
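
Downstream usage sketch (not part of this diff): with the split, request-level failures from `tor-dirclient` now arrive wrapped in `Error::RequestFailed`, while circuit-construction failures stay in `Error::CircMgr`. The helper below is a minimal, hypothetical example of how a caller such as `tor-dirmgr` might adapt, using only the public API visible above (`should_retire_circ()`, `cache_ids()`, and the two error enums); the function name and log messages are illustrative only.

    use tor_dirclient::{Error, RequestError};
    use tor_error::HasKind;

    /// Illustrative helper (hypothetical; not part of the crate): react to a
    /// failed directory request.
    fn handle_dir_error(err: &Error) {
        // `should_retire_circ()` on the outer Error delegates to the inner
        // RequestError when the failure happened after a circuit was built.
        if err.should_retire_circ() {
            // `cache_ids()` names the directory cache(s) to blame, if any.
            for cache in err.cache_ids() {
                eprintln!("retiring circuit; blaming cache {:?}", cache);
            }
        }
        match err {
            // Circuit construction failed before we ever reached a cache.
            Error::CircMgr(e) => eprintln!("couldn't get a circuit: {}", e),
            // We reached a cache, but the request itself failed.
            Error::RequestFailed { source, error } => match error {
                RequestError::DirTimeout => eprintln!("cache {:?} timed out", source),
                RequestError::ContentEncoding(enc) => {
                    eprintln!("cache {:?} sent unsupported encoding {:?}", source, enc);
                }
                other => eprintln!("request failed ({:?}): {}", err.kind(), other),
            },
            // Both enums are #[non_exhaustive], so keep a fallback arm.
            _ => eprintln!("directory request failed: {}", err),
        }
    }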