Skip to content
Snippets Groups Projects
Commit 5d91fa7f authored by Nick Mathewson's avatar Nick Mathewson :game_die:
Browse files

arti-testing: CLI for making TCP connections break.

parent ca6070b9
No related branches found
No related tags found
1 merge request!392arti-testing: support for simulated TCP breakage
//! Reading configuration and command line issues in arti-testing.
use crate::{Action, Job};
use crate::{Action, Job, TcpBreakage};
use anyhow::{anyhow, Result};
use clap::{App, AppSettings, Arg, SubCommand};
......@@ -62,6 +62,27 @@ pub(crate) fn parse_cmdline() -> Result<Job> {
.value_name("success|failure|timeout")
.global(true),
)
.arg(
Arg::with_name("tcp-failure")
.long("tcp-failure")
.takes_value(true)
.value_name("none|timeout|error")
.global(true),
)
.arg(
Arg::with_name("tcp-failure-stage")
.long("tcp-failure-stage")
.takes_value(true)
.value_name("bootstrap|connect")
.global(true),
)
.arg(
Arg::with_name("tcp-failure-delay")
.long("tcp-failure-delay")
.takes_value(true)
.value_name("SECS")
.global(true),
)
.subcommand(
SubCommand::with_name("connect")
.about("Try to bootstrap and connect to an address")
......@@ -117,6 +138,24 @@ pub(crate) fn parse_cmdline() -> Result<Job> {
.map(crate::Expectation::from_str)
.transpose()?;
let tcp_breakage = {
let action = matches.value_of("tcp-failure").unwrap_or("none").parse()?;
let stage = matches
.value_of("tcp-failure-stage")
.unwrap_or("bootstrap")
.parse()?;
let delay = matches
.value_of("tcp-failure-delay")
.map(|d| d.parse().map(Duration::from_secs))
.transpose()?;
TcpBreakage {
action,
stage,
delay,
}
};
let action = if let Some(_m) = matches.subcommand_matches("bootstrap") {
Action::Bootstrap
} else if let Some(matches) = matches.subcommand_matches("connect") {
......@@ -138,6 +177,7 @@ pub(crate) fn parse_cmdline() -> Result<Job> {
action,
config,
timeout,
tcp_breakage,
console_log,
expectation,
})
......
......@@ -35,8 +35,9 @@
//! o With various errors
//! o by timing out
//! - sporadically
//! - by succeeding and black-holing data.
//! - depending on address / port / family
//! - Install this after a delay
//! o Install this after a delay
//! - make TLS fail
//! - With wrong cert
//! - Mysteriously
......@@ -91,6 +92,8 @@ mod traces;
use arti_client::TorClient;
use arti_config::ArtiConfig;
use futures::task::SpawnExt;
use rt::badtcp::BrokenTcpProvider;
use tor_rtcompat::{PreferredRuntime, Runtime, SleepProviderExt};
use anyhow::{anyhow, Result};
......@@ -142,12 +145,69 @@ impl FromStr for Expectation {
}
}
/// At what stage to install a kind of breakage
#[derive(Debug, Clone, PartialEq, Eq)]
enum BreakageStage {
/// Create breakage while bootstrapping
Bootstrap,
/// Create breakage while connecting
Connect,
}
impl FromStr for BreakageStage {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"bootstrap" => BreakageStage::Bootstrap,
"connect" => BreakageStage::Connect,
_ => return Err(anyhow!("unrecognized breakage stage {:?}", s)),
})
}
}
/// Describes how (if at all) to break TCP connection attempts
#[derive(Debug, Clone)]
struct TcpBreakage {
/// What kind of breakage to install (if any)
action: rt::badtcp::Action,
/// What stage to apply the breakage at.
stage: BreakageStage,
/// Delay (if any) after the start of the stage to apply breakage
delay: Option<Duration>,
}
impl TcpBreakage {
/// Apply the configured breakage to breakage_provider. Use `main_runtime` to sleep if necessary.
fn apply<R: Runtime, R2: Send + Sync + 'static>(
&self,
main_runtime: &R,
breakage_provider: BrokenTcpProvider<R2>,
) {
if let Some(delay) = self.delay {
let rt_clone = main_runtime.clone();
let action = self.action.clone();
main_runtime
.spawn(async move {
rt_clone.sleep(delay).await;
breakage_provider.set_action(action);
})
.expect("can't spawn.");
} else {
breakage_provider.set_action(self.action.clone());
}
}
}
/// Descriptions of an action to take, and what to expect as an outcome.
#[derive(Debug, Clone)]
struct Job {
/// The action that the client should try to take
action: Action,
/// Describes how (if at all) to break the TCP connections.
tcp_breakage: TcpBreakage,
/// The tracing configuration for our console log.
console_log: String,
......@@ -172,7 +232,16 @@ impl Job {
}
/// Run the body of a job.
async fn run_job_inner<R: Runtime>(&self, client: TorClient<R>) -> Result<()> {
async fn run_job_inner<R: Runtime, R2: Send + Sync + Clone + 'static>(
&self,
broken_tcp: rt::badtcp::BrokenTcpProvider<R2>,
client: TorClient<R>,
) -> Result<()> {
if self.tcp_breakage.stage == BreakageStage::Bootstrap {
self.tcp_breakage
.apply(client.runtime(), broken_tcp.clone());
}
client.bootstrap().await?; // all jobs currently start with a bootstrap.
match &self.action {
......@@ -181,6 +250,11 @@ impl Job {
target,
retry_delay,
} => {
if self.tcp_breakage.stage == BreakageStage::Connect {
self.tcp_breakage
.apply(client.runtime(), broken_tcp.clone());
}
loop {
let outcome = client.connect(target).await;
match (outcome, retry_delay) {
......@@ -217,7 +291,7 @@ impl Job {
let outcome = client
.clone()
.runtime()
.timeout(self.timeout, self.run_job_inner(client))
.timeout(self.timeout, self.run_job_inner(broken_tcp.clone(), client))
.await;
let result = match (&self.expectation, outcome) {
......
......@@ -2,10 +2,13 @@
use tor_rtcompat::{Runtime, TcpProvider};
use anyhow::anyhow;
use async_trait::async_trait;
use rand::{thread_rng, Rng};
use std::io::{Error as IoError, ErrorKind as IoErrorKind, Result as IoResult};
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// An action that we can take upon trying to make a TCP connection.
......@@ -19,27 +22,47 @@ pub(crate) enum Action {
Timeout,
}
impl FromStr for Action {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"none" | "work" => Action::Work,
"error" => Action::Fail(Duration::from_millis(10), IoErrorKind::Other),
"timeout" => Action::Timeout,
_ => return Err(anyhow!("unrecognized tcp breakage action {:?}", s)),
})
}
}
/// A TcpProvider that can make its connections fail.
#[derive(Debug, Clone)]
pub(crate) struct BrokenTcpProvider<R> {
/// An underlying TcpProvider to use when we actually want our connections to succeed
inner: R,
/// The action to take when we try to make an outbound connection.
///
/// TODO: make this conditional, mutable, etc.
action: Action,
action: Arc<Mutex<Action>>,
}
impl<R> BrokenTcpProvider<R> {
/// Construct a new BrokenTcpProvider which responds to all outbound
/// connections by taking the specified action.
pub(crate) fn new(inner: R, action: Action) -> Self {
Self { inner, action }
Self {
inner,
action: Arc::new(Mutex::new(action)),
}
}
/// Cause the provider to respond to all outbound connection attempts
/// with the specified action.
pub(crate) fn set_action(&self, action: Action) {
*self.action.lock().expect("Lock poisoned") = action;
}
/// Return the action to take for a connection to `addr`.
fn get_action(&self, _addr: &SocketAddr) -> Action {
self.action.clone()
self.action.lock().expect("Lock poisoned").clone()
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment