From ca6070b962d50ba114b6cfdcc0dc72b0e5b228e3 Mon Sep 17 00:00:00 2001
From: Nick Mathewson <nickm@torproject.org>
Date: Fri, 4 Mar 2022 17:43:17 -0500
Subject: [PATCH] arti-testing: Initial support for broken TCP.

This commit adds support for a BrokenTcp provider that can make
connection attempts fail or time out.  It doesn't yet have a way to
turn on the failure.
---
 Cargo.lock                           |  1 +
 crates/arti-testing/Cargo.toml       |  1 +
 crates/arti-testing/src/main.rs      | 37 +++++++++++++---
 crates/arti-testing/src/rt.rs        |  1 +
 crates/arti-testing/src/rt/badtcp.rs | 66 ++++++++++++++++++++++++++++
 5 files changed, 99 insertions(+), 7 deletions(-)
 create mode 100644 crates/arti-testing/src/rt/badtcp.rs

diff --git a/Cargo.lock b/Cargo.lock
index 2d6fc8ffac..6048d3f4e6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -200,6 +200,7 @@ dependencies = [
  "futures",
  "notify",
  "pin-project",
+ "rand 0.8.5",
  "rlimit",
  "serde",
  "tokio",
diff --git a/crates/arti-testing/Cargo.toml b/crates/arti-testing/Cargo.toml
index fde297ee97..3e9b8690a2 100644
--- a/crates/arti-testing/Cargo.toml
+++ b/crates/arti-testing/Cargo.toml
@@ -26,6 +26,7 @@ futures = "0.3.14"
 tracing = "0.1.18"
 notify = "4.0"
 pin-project = "1"
+rand = "0.8"
 rlimit = "0.7.0"
 serde = { version = "1.0.103", features = ["derive"] }
 tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
diff --git a/crates/arti-testing/src/main.rs b/crates/arti-testing/src/main.rs
index b5f341c25c..17098b596a 100644
--- a/crates/arti-testing/src/main.rs
+++ b/crates/arti-testing/src/main.rs
@@ -29,10 +29,28 @@
 //!
 //! # TODO
 //!
-//! - make TCP connections fail
-//! - do something on the connection
-//! - look at bootstrapping status and events
-//! - look at trace messages
+//! - More ways to break
+//!
+//!   - make TCP connections fail
+//!      o With various errors
+//!      o by timing out
+//!      - sporadically
+//!      - depending on address / port / family
+//!      - Install this after a delay
+//!   - make TLS fail
+//!      - With wrong cert
+//!      - Mysteriously
+//!      - With complete junk
+//!      - TLS succeeds, then sends nonsense
+//!      - Authenticating with wrong ID.
+//!   - Munge directory before using it
+//!      - May require some dirmgr plug-in. :p
+//!      - May require
+//!
+//! - More things to look at
+//!   - do something on the connection
+//!   - look at bootstrapping status and events
+//!   - Make streams repeatedly on different circuits with some delay.
 //! - Make sure we can replicate all/most test situations from arti#329
 //! - Actually implement those tests.
 
@@ -182,11 +200,16 @@ impl Job {
     /// XXXX Eventually this should come up with some kind of result that's meaningful.
     async fn run_job(&self) -> Result<()> {
         let runtime = PreferredRuntime::current()?;
-        let tcp = rt::count::Counting::new_zeroed(runtime.clone());
+        let broken_tcp =
+            rt::badtcp::BrokenTcpProvider::new(runtime.clone(), rt::badtcp::Action::Work);
+        // We put the counting TCP provider outside the one that breaks: we want
+        // to know how many attempts to connect there are, and BrokenTcpProvider
+        // eats the attempts that it fails without passing them down the stack.
+        let counting_tcp = rt::count::Counting::new_zeroed(broken_tcp.clone());
         let runtime = tor_rtcompat::CompoundRuntime::new(
             runtime.clone(),
             runtime.clone(),
-            tcp.clone(),
+            counting_tcp.clone(),
             runtime,
         );
         let client = self.make_client(runtime)?;
@@ -223,7 +246,7 @@ impl Job {
             }
         };
 
-        println!("TCP stats: {:?}", tcp.counts());
+        println!("TCP stats: {:?}", counting_tcp.counts());
 
         result
     }
diff --git a/crates/arti-testing/src/rt.rs b/crates/arti-testing/src/rt.rs
index b5b36c37be..3809829076 100644
--- a/crates/arti-testing/src/rt.rs
+++ b/crates/arti-testing/src/rt.rs
@@ -2,4 +2,5 @@
 //!
 //! Some simulate failure conditions; some monitor activity.
 
+pub(crate) mod badtcp;
 pub(crate) mod count;
diff --git a/crates/arti-testing/src/rt/badtcp.rs b/crates/arti-testing/src/rt/badtcp.rs
new file mode 100644
index 0000000000..105427db3f
--- /dev/null
+++ b/crates/arti-testing/src/rt/badtcp.rs
@@ -0,0 +1,66 @@
+//! Implement a TcpProvider that can break things.
+
+use tor_rtcompat::{Runtime, TcpProvider};
+
+use async_trait::async_trait;
+use rand::{thread_rng, Rng};
+use std::io::{Error as IoError, ErrorKind as IoErrorKind, Result as IoResult};
+use std::net::SocketAddr;
+use std::time::Duration;
+
+/// An action that we can take upon trying to make a TCP connection.
+#[derive(Debug, Clone)]
+pub(crate) enum Action {
+    /// Let the connection work as intended.
+    Work,
+    /// Wait a random interval up to the given duration, then fail with the given error kind.
+    Fail(Duration, IoErrorKind),
+    /// Never complete: the connection attempt stays pending forever.
+    Timeout,
+}
+
+/// A TcpProvider that can make its connections fail.
+#[derive(Debug, Clone)]
+pub(crate) struct BrokenTcpProvider<R> {
+    /// The underlying provider: it makes the real connections, listens, and sleeps for us.
+    inner: R,
+    /// The action to take when we try to make an outbound connection.
+    ///
+    /// TODO: make this conditional, mutable, etc.
+    action: Action,
+}
+
+impl<R> BrokenTcpProvider<R> {
+    /// Construct a new BrokenTcpProvider which responds to all outbound
+    /// connections by taking the specified action.
+    pub(crate) fn new(inner: R, action: Action) -> Self {
+        Self { inner, action }
+    }
+
+    /// Return the action for a connection to `addr`; for now every address gets the same one.
+    fn get_action(&self, _addr: &SocketAddr) -> Action {
+        self.action.clone()
+    }
+}
+
+#[async_trait]
+impl<R: Runtime> TcpProvider for BrokenTcpProvider<R> {
+    type TcpStream = R::TcpStream;
+    type TcpListener = R::TcpListener;
+
+    async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::TcpStream> {
+        match self.get_action(addr) {
+            Action::Work => self.inner.connect(addr).await,
+            Action::Fail(dur, kind) => {
+                let d = thread_rng().gen_range(Duration::from_secs(0)..=dur); // zero `dur` is fine
+                self.inner.sleep(d).await;
+                Err(IoError::new(kind, anyhow::anyhow!("intentional failure")))
+            }
+            Action::Timeout => futures::future::pending().await,
+        }
+    }
+
+    async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::TcpListener> {
+        self.inner.listen(addr).await
+    }
+}
-- 
GitLab