Commit acfa0f77 authored by Nick Mathewson's avatar Nick Mathewson 🤹
Browse files

dirclient: Collect and expose peer information from errors.

This commit refactors the dirclient error type into two cases:
errors when constructing a circuit, and errors that occur once we
already have a one-hop circuit.  The latter can usually be
attributed to the specific cache we're talking to.

This commit also adds a function to expose the information about
which directory gave us the info.
parent d38aafa0
Loading
Loading
Loading
Loading
+67 −11
Original line number Diff line number Diff line
@@ -4,12 +4,35 @@ use std::sync::Arc;

use thiserror::Error;
use tor_error::{ErrorKind, HasKind};
use tor_linkspec::OwnedChanTarget;
use tor_rtcompat::TimeoutError;

use crate::SourceInfo;

/// An error originating from the tor-dirclient crate.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum Error {
    /// Error while getting a circuit
    #[error("Error while getting a circuit {0}")]
    CircMgr(#[from] tor_circmgr::Error),

    /// An error that has occurred after we have contacted a directory cache and made a circuit to it.
    #[error("Error fetching directory information from {source:?}")]
    RequestFailed {
        /// The source that gave us this error.
        source: Option<SourceInfo>,

        /// The underlying error that occurred.
        #[source]
        error: RequestError,
    },
}

/// An error originating from the tor-dirclient crate.
#[derive(Error, Debug, Clone)]
#[non_exhaustive]
pub enum RequestError {
    /// The directory cache took too long to reply to us.
    #[error("directory timed out")]
    DirTimeout,
@@ -34,10 +57,6 @@ pub enum Error {
    #[error("Protocol error while launching a stream: {0}")]
    Proto(#[from] tor_proto::Error),

    /// Error while getting a circuit
    #[error("Error while getting a circuit {0}")]
    CircMgr(#[from] tor_circmgr::Error),

    /// Error when parsing http
    #[error("Couldn't parse HTTP headers")]
    HttparseError(#[from] httparse::Error),
@@ -55,25 +74,53 @@ pub enum Error {
    ContentEncoding(String),
}

impl From<TimeoutError> for Error {
impl From<TimeoutError> for RequestError {
    fn from(_: TimeoutError) -> Self {
        Error::DirTimeout
        RequestError::DirTimeout
    }
}

impl From<std::io::Error> for Error {
impl From<std::io::Error> for RequestError {
    fn from(err: std::io::Error) -> Self {
        Self::IoError(Arc::new(err))
    }
}

impl From<http::Error> for Error {
impl From<http::Error> for RequestError {
    fn from(err: http::Error) -> Self {
        Self::HttpError(Arc::new(err))
    }
}

impl Error {
    /// Return true if this error means that the circuit shouldn't be used
    /// for any more directory requests.
    pub fn should_retire_circ(&self) -> bool {
        // TODO: probably this is too aggressive, and we should
        // actually _not_ dump the circuit under all circumstances.
        match self {
            Error::CircMgr(_) => true, // should be unreachable.
            Error::RequestFailed { error, .. } => error.should_retire_circ(),
        }
    }

    /// Return the peer or peers that are to be blamed for the error.
    ///
    /// (This can return multiple peers if the request failed because multiple
    /// circuit attempts all failed.)
    pub fn cache_ids(&self) -> Vec<&OwnedChanTarget> {
        match &self {
            Error::CircMgr(e) => e.peers(),
            Error::RequestFailed {
                source: Some(source),
                ..
            } => vec![source.cache_id()],
            _ => Vec::new(),
        }
    }
}

impl RequestError {
    /// Return true if this error means that the circuit shouldn't be used
    /// for any more directory requests.
    pub fn should_retire_circ(&self) -> bool {
@@ -83,10 +130,10 @@ impl Error {
    }
}

impl HasKind for Error {
impl HasKind for RequestError {
    fn kind(&self) -> ErrorKind {
        use Error as E;
        use ErrorKind as EK;
        use RequestError as E;
        match self {
            E::DirTimeout => EK::TorNetworkTimeout,
            E::TruncatedHeaders => EK::TorProtocolViolation,
@@ -97,10 +144,19 @@ impl HasKind for Error {
            // downcasting.
            E::IoError(_) => EK::TorDirectoryError,
            E::Proto(e) => e.kind(),
            E::CircMgr(e) => e.kind(),
            E::HttparseError(_) => EK::TorProtocolViolation,
            E::HttpError(_) => EK::Internal,
            E::ContentEncoding(_) => EK::TorProtocolViolation,
        }
    }
}

impl HasKind for Error {
    fn kind(&self) -> ErrorKind {
        use Error as E;
        match self {
            E::CircMgr(e) => e.kind(),
            E::RequestFailed { error, .. } => error.kind(),
        }
    }
}
+89 −42
Original line number Diff line number Diff line
@@ -78,12 +78,15 @@ use std::sync::Arc;
use std::time::Duration;
use tracing::info;

pub use err::Error;
pub use err::{Error, RequestError};
pub use response::{DirResponse, SourceInfo};

/// Type for results returned in this crate.
pub type Result<T> = std::result::Result<T, Error>;

/// Type for internal results  containing a RequestError.
pub type RequestResult<T> = std::result::Result<T, RequestError>;

/// Fetch the resource described by `req` over the Tor network.
///
/// Circuits are built or found using `circ_mgr`, using paths
@@ -113,10 +116,19 @@ where
    let begin_timeout = Duration::from_secs(5);
    let source = SourceInfo::from_circuit(&circuit);

    let wrap_err = |error| Error::RequestFailed {
        source: Some(source.clone()),
        error,
    };

    // Launch the stream.
    let mut stream = runtime
        .timeout(begin_timeout, circuit.begin_dir_stream())
        .await??; // TODO(nickm) handle fatalities here too
        .await
        .map_err(RequestError::from)
        .map_err(wrap_err)?
        .map_err(RequestError::from)
        .map_err(wrap_err)?; // TODO(nickm) handle fatalities here too

    // TODO: Perhaps we want separate timeouts for each phase of this.
    // For now, we just use higher-level timeouts in `dirmgr`.
@@ -134,7 +146,7 @@ where
fn should_retire_circ(result: &Result<DirResponse>) -> bool {
    match result {
        Err(e) => e.should_retire_circ(),
        Ok(dr) => dr.error().map(Error::should_retire_circ) == Some(true),
        Ok(dr) => dr.error().map(RequestError::should_retire_circ) == Some(true),
    }
}

@@ -163,20 +175,33 @@ where
    S: AsyncRead + AsyncWrite + Send + Unpin,
    SP: SleepProvider,
{
    let wrap_err = |error| Error::RequestFailed {
        source: source.clone(),
        error,
    };

    let partial_ok = req.partial_docs_ok();
    let maxlen = req.max_response_len();
    let req = req.make_request()?;
    let req = req.make_request().map_err(wrap_err)?;
    let encoded = util::encode_request(&req);

    // Write the request.
    stream.write_all(encoded.as_bytes()).await?;
    stream.flush().await?;
    stream
        .write_all(encoded.as_bytes())
        .await
        .map_err(RequestError::from)
        .map_err(wrap_err)?;
    stream
        .flush()
        .await
        .map_err(RequestError::from)
        .map_err(wrap_err)?;

    let mut buffered = BufReader::new(stream);

    // Handle the response
    // TODO: should there be a separate timeout here?
    let header = read_headers(&mut buffered).await?;
    let header = read_headers(&mut buffered).await.map_err(wrap_err)?;
    if header.status != Some(200) {
        return Ok(DirResponse::new(
            header.status.unwrap_or(0),
@@ -186,7 +211,7 @@ where
        ));
    }

    let mut decoder = get_decoder(buffered, header.encoding.as_deref())?;
    let mut decoder = get_decoder(buffered, header.encoding.as_deref()).map_err(wrap_err)?;

    let mut result = Vec::new();
    let ok = read_and_decompress(runtime, &mut decoder, maxlen, &mut result).await;
@@ -197,7 +222,7 @@ where
            Err(e)
        }
        (_, Err(e), _) => {
            return Err(e);
            return Err(wrap_err(e));
        }
        (_, Ok(()), _) => Ok(()),
    };
@@ -206,7 +231,7 @@ where
}

/// Read and parse HTTP/1 headers from `stream`.
async fn read_headers<S>(stream: &mut S) -> Result<HeaderStatus>
async fn read_headers<S>(stream: &mut S) -> RequestResult<HeaderStatus>
where
    S: AsyncBufRead + Unpin,
{
@@ -228,7 +253,7 @@ where

                if n == 0 {
                    // We hit an EOF; no more progress can be made.
                    return Err(Error::TruncatedHeaders);
                    return Err(RequestError::TruncatedHeaders);
                }

                // TODO(nickm): Pick a better maximum
@@ -266,7 +291,7 @@ where
            }
        }
        if n == 0 {
            return Err(Error::TruncatedHeaders);
            return Err(RequestError::TruncatedHeaders);
        }
    }
}
@@ -293,7 +318,7 @@ async fn read_and_decompress<S, SP>(
    mut stream: S,
    maxlen: usize,
    result: &mut Vec<u8>,
) -> Result<()>
) -> RequestResult<()>
where
    S: AsyncRead + Unpin,
    SP: SleepProvider,
@@ -315,7 +340,7 @@ where
            status = stream.read(buf).fuse() => status,
            _ = timer => {
                result.resize(written_total, 0); // truncate as needed
                return Err(Error::DirTimeout);
                return Err(RequestError::DirTimeout);
            }
        };
        let written_in_this_loop = match status {
@@ -349,7 +374,7 @@ where
        // filling our RAM.
        if written_total > maxlen {
            result.resize(maxlen, 0);
            return Err(Error::ResponseTooLong(written_total));
            return Err(RequestError::ResponseTooLong(written_total));
        }
    }
}
@@ -421,7 +446,7 @@ macro_rules! decoder {
fn get_decoder<'a, S: AsyncBufRead + Unpin + Send + 'a>(
    stream: S,
    encoding: Option<&str>,
) -> Result<Box<dyn AsyncRead + Unpin + Send + 'a>> {
) -> RequestResult<Box<dyn AsyncRead + Unpin + Send + 'a>> {
    match encoding {
        None | Some("identity") => Ok(Box::new(stream)),
        Some("deflate") => decoder!(ZlibDecoder, stream),
@@ -429,7 +454,7 @@ fn get_decoder<'a, S: AsyncBufRead + Unpin + Send + 'a>(
        Some("x-tor-lzma") => decoder!(XzDecoder, stream),
        #[cfg(feature = "zstd")]
        Some("x-zstd") => decoder!(ZstdDecoder, stream),
        Some(other) => Err(Error::ContentEncoding(other.into())),
        Some(other) => Err(RequestError::ContentEncoding(other.into())),
    }
}

@@ -442,7 +467,7 @@ mod test {
    use futures_await_test::async_test;

    #[async_test]
    async fn test_read_until_limited() -> Result<()> {
    async fn test_read_until_limited() -> RequestResult<()> {
        let mut out = Vec::new();
        let bytes = b"This line eventually ends\nthen comes another\n";

@@ -474,7 +499,7 @@ mod test {
        encoding: Option<&str>,
        data: &[u8],
        maxlen: usize,
    ) -> (Result<()>, Vec<u8>) {
    ) -> (RequestResult<()>, Vec<u8>) {
        // We don't need to do anything fancy here, since we aren't simulating
        // a timeout.
        let mock_time = MockSleepProvider::new(std::time::SystemTime::now());
@@ -491,7 +516,7 @@ mod test {
    }

    #[async_test]
    async fn decompress_identity() -> Result<()> {
    async fn decompress_identity() -> RequestResult<()> {
        let mut text = Vec::new();
        for _ in 0..1000 {
            text.extend(b"This is a string with a nontrivial length that we'll use to make sure that the loop is executed more than once.");
@@ -516,7 +541,7 @@ mod test {
    }

    #[async_test]
    async fn decomp_zlib() -> Result<()> {
    async fn decomp_zlib() -> RequestResult<()> {
        let compressed =
            hex::decode("789cf3cf4b5548cb2cce500829cf8730825253200ca79c52881c00e5970c88").unwrap();

@@ -530,7 +555,7 @@ mod test {

    #[cfg(feature = "zstd")]
    #[async_test]
    async fn decomp_zstd() -> Result<()> {
    async fn decomp_zstd() -> RequestResult<()> {
        let compressed = hex::decode("28b52ffd24250d0100c84f6e6520666973682054776f526564426c756520666973680a0200600c0e2509478352cb").unwrap();
        let limit = 10 << 20;
        let (s, r) = decomp_basic(Some("x-zstd"), &compressed, limit).await;
@@ -542,7 +567,7 @@ mod test {

    #[cfg(feature = "xz")]
    #[async_test]
    async fn decomp_xz2() -> Result<()> {
    async fn decomp_xz2() -> RequestResult<()> {
        // Not so good at tiny files...
        let compressed = hex::decode("fd377a585a000004e6d6b446020021011c00000010cf58cce00024001d5d00279b88a202ca8612cfb3c19c87c34248a570451e4851d3323d34ab8000000000000901af64854c91f600013925d6ec06651fb6f37d010000000004595a").unwrap();
        let limit = 10 << 20;
@@ -559,7 +584,7 @@ mod test {
        let limit = 10 << 20;
        let (s, _r) = decomp_basic(Some("x-proprietary-rle"), &compressed, limit).await;

        assert!(matches!(s, Err(Error::ContentEncoding(_))));
        assert!(matches!(s, Err(RequestError::ContentEncoding(_))));
    }

    #[async_test]
@@ -569,11 +594,11 @@ mod test {
        let (s, _r) = decomp_basic(Some("deflate"), compressed, limit).await;

        // This should possibly be a different type in the future.
        assert!(matches!(s, Err(Error::IoError(_))));
        assert!(matches!(s, Err(RequestError::IoError(_))));
    }

    #[async_test]
    async fn headers_ok() -> Result<()> {
    async fn headers_ok() -> RequestResult<()> {
        let text = b"HTTP/1.0 200 OK\r\nDate: ignored\r\nContent-Encoding: Waffles\r\n\r\n";

        let mut s = &text[..];
@@ -585,7 +610,7 @@ mod test {
        // now try truncated
        let mut s = &text[..15];
        let h = read_headers(&mut s).await;
        assert!(matches!(h, Err(Error::TruncatedHeaders)));
        assert!(matches!(h, Err(RequestError::TruncatedHeaders)));

        // now try with no encoding.
        let text = b"HTTP/1.0 404 Not found\r\n\r\n";
@@ -605,7 +630,7 @@ mod test {
        let h = read_headers(&mut s).await;

        assert!(h.is_err());
        assert!(matches!(h, Err(Error::HttparseError(_))));
        assert!(matches!(h, Err(RequestError::HttparseError(_))));
        Ok(())
    }

@@ -617,17 +642,24 @@ mod test {
    fn run_download_test<Req: request::Requestable>(
        req: Req,
        response: &[u8],
    ) -> (Result<DirResponse>, Result<Vec<u8>>) {
    ) -> (Result<DirResponse>, RequestResult<Vec<u8>>) {
        let (mut s1, s2) = stream_pair();
        let (mut s2_r, mut s2_w) = s2.split();

        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let rt2 = rt.clone();
            let (v1, v2, v3): (Result<DirResponse>, Result<Vec<u8>>, Result<()>) = futures::join!(
            let (v1, v2, v3): (
                Result<DirResponse>,
                RequestResult<Vec<u8>>,
                RequestResult<()>,
            ) = futures::join!(
                async {
                    // Run the download function.
                    let r = download(&rt, &req, &mut s1, None).await;
                    s1.close().await?;
                    s1.close().await.map_err(|error| Error::RequestFailed {
                        source: None,
                        error: error.into(),
                    })?;
                    r
                },
                async {
@@ -664,7 +696,7 @@ mod test {
    }

    #[test]
    fn test_download() -> Result<()> {
    fn test_download() -> RequestResult<()> {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();

        let (response, request) = run_download_test(
@@ -677,8 +709,7 @@ mod test {
            b"GET /tor/micro/d/CQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQk.z HTTP/1.0\r\n"
        ));

        assert!(!should_retire_circ(&response));
        let response = response?;
        let response = response.unwrap();
        assert_eq!(response.status_code(), 200);
        assert!(!response.is_partial());
        assert!(response.error().is_none());
@@ -692,7 +723,7 @@ mod test {
    }

    #[test]
    fn test_download_truncated() -> Result<()> {
    fn test_download_truncated() {
        // Request only one md, so "partial ok" will not be set.
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let mut response_text: Vec<u8> =
@@ -714,14 +745,12 @@ mod test {
        let (response, request) = run_download_test(req, &response_text);
        assert!(request.is_ok());

        let response = response?;
        let response = response.unwrap();
        assert_eq!(response.status_code(), 200);
        assert!(response.error().is_some());
        assert!(response.is_partial());
        assert!(response.output().len() < 37 * 2);
        assert!(response.output().starts_with(b"One fish"));

        Ok(())
    }

    #[test]
@@ -739,14 +768,26 @@ mod test {
        let response_text = b"HTTP/1.0 404 truncation happens here\r\n";
        let (response, _request) = run_download_test(req, response_text);

        assert!(matches!(response, Err(Error::TruncatedHeaders)));
        assert!(matches!(
            response,
            Err(Error::RequestFailed {
                error: RequestError::TruncatedHeaders,
                ..
            })
        ));

        // Try a completely empty response.
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let response_text = b"";
        let (response, _request) = run_download_test(req, response_text);

        assert!(matches!(response, Err(Error::TruncatedHeaders)));
        assert!(matches!(
            response,
            Err(Error::RequestFailed {
                error: RequestError::TruncatedHeaders,
                ..
            })
        ));
    }

    #[test]
@@ -756,8 +797,14 @@ mod test {
        response_text.resize(16384, b'A');
        let (response, _request) = run_download_test(req, &response_text);

        assert!(should_retire_circ(&response));
        assert!(matches!(response, Err(Error::HttparseError(_))));
        assert!(response.as_ref().unwrap_err().should_retire_circ());
        assert!(matches!(
            response,
            Err(Error::RequestFailed {
                error: RequestError::HttparseError(_),
                ..
            })
        ));
    }

    // TODO: test with bad utf-8
+2 −1
Original line number Diff line number Diff line
@@ -8,7 +8,8 @@ use tor_netdoc::doc::netstatus::ConsensusFlavor;
#[cfg(feature = "routerdesc")]
use tor_netdoc::doc::routerdesc::RdDigest;

use crate::Result;
/// Alias for a result with a `RequestError`.
type Result<T> = std::result::Result<T, crate::err::RequestError>;

use std::iter::FromIterator;
use std::time::SystemTime;
+4 −4
Original line number Diff line number Diff line
@@ -3,7 +3,7 @@
use tor_linkspec::OwnedChanTarget;
use tor_proto::circuit::{ClientCirc, UniqId};

use crate::Error;
use crate::RequestError;

/// A successful (or at any rate, well-formed) response to a directory
/// request.
@@ -14,7 +14,7 @@ pub struct DirResponse {
    /// The decompressed output that we got from the directory cache.
    output: Vec<u8>,
    /// The error, if any, that caused us to stop getting this response early.
    error: Option<Error>,
    error: Option<RequestError>,
    /// Information about the directory cache we used.
    source: Option<SourceInfo>,
}
@@ -35,7 +35,7 @@ impl DirResponse {
    /// Construct a new DirResponse from its parts
    pub(crate) fn new(
        status: u16,
        error: Option<Error>,
        error: Option<RequestError>,
        output: Vec<u8>,
        source: Option<SourceInfo>,
    ) -> Self {
@@ -63,7 +63,7 @@ impl DirResponse {
    }

    /// Return the error from this response, if any.
    pub fn error(&self) -> Option<&Error> {
    pub fn error(&self) -> Option<&RequestError> {
        self.error.as_ref()
    }

+3 −0
Original line number Diff line number Diff line
@@ -56,6 +56,9 @@ tor-circmgr:

  api-break: Some error types have changed to include peer info.

tor-dirclient:
  api-break: refactored Error type.

tor-dirmgr:
  new-api: DirMgrConfig object now has accessors.
  DirMgrCfg: totally changed, builder abolished.