Skip to content
Snippets Groups Projects
Commit cc6ba72d authored by Nick Mathewson's avatar Nick Mathewson :game_die:
Browse files

Merge branch 'bench_multicirc' into 'main'

arti-bench: support multiple streams per circuit, multiple circuits per sample.

Closes #380

See merge request tpo/core/arti!384
parents 8231e702 49216a52
No related branches found
No related tags found
No related merge requests found
......@@ -36,10 +36,9 @@
#![allow(clippy::unwrap_used)]
use anyhow::{anyhow, Result};
use arti_client::{TorAddr, TorClient, TorClientConfig};
use arti_client::{IsolationToken, TorAddr, TorClient, TorClientConfig};
use arti_config::ArtiConfig;
use clap::{App, Arg};
use futures::stream::Stream;
use futures::StreamExt;
use rand::distributions::Standard;
use rand::Rng;
......@@ -283,14 +282,25 @@ fn main() -> Result<()> {
.help("How many samples to take per benchmark run.")
)
.arg(
Arg::with_name("num-parallel")
Arg::with_name("num-streams")
.short("p")
.long("num-parallel")
.long("streams")
.aliases(&["num-parallel"])
.takes_value(true)
.required(true)
.value_name("COUNT")
.default_value("3")
.help("How many simultaneous streams per benchmark run.")
.help("How many simultaneous streams per circuit.")
)
.arg(
Arg::with_name("num-circuits")
.short("C")
.long("num-circuits")
.takes_value(true)
.required(false)
.value_name("COUNT")
.default_value("1")
.help("How many simultaneous circuits per run.")
)
.arg(
Arg::with_name("output")
......@@ -347,7 +357,8 @@ fn main() -> Result<()> {
.unwrap()
.parse::<usize>()?;
let samples = matches.value_of("num-samples").unwrap().parse::<usize>()?;
let parallel = matches.value_of("num-parallel").unwrap().parse::<usize>()?;
let streams_per_circ = matches.value_of("num-streams").unwrap().parse::<usize>()?;
let circs_per_sample = matches.value_of("num-circuits").unwrap().parse::<usize>()?;
info!("Generating test payloads, please wait...");
let upload_payload = random_payload(upload_bytes).into();
let download_payload = random_payload(download_bytes).into();
......@@ -366,7 +377,8 @@ fn main() -> Result<()> {
let mut benchmark = Benchmark {
connect_addr,
samples,
concurrent: parallel,
streams_per_circ,
circs_per_sample,
upload_payload,
download_payload,
runtime: tor_rtcompat::tokio::TokioNativeTlsRuntime::create()?,
......@@ -416,7 +428,8 @@ where
runtime: R,
connect_addr: SocketAddr,
samples: usize,
concurrent: usize,
streams_per_circ: usize,
circs_per_sample: usize,
upload_payload: Arc<[u8]>,
download_payload: Arc<[u8]>,
/// All benchmark results conducted, indexed by benchmark type.
......@@ -502,8 +515,10 @@ struct BenchmarkResults {
ty: BenchmarkType,
/// The number of times the benchmark was run.
samples: usize,
/// The number of concurrent connections used during the run.
connections: usize,
/// The number of concurrent streams per circuit used during the run.
streams_per_circ: usize,
/// The number of circuits used during the run.
circuits: usize,
/// The time to first byte (TTFB) for the download benchmark, in milliseconds.
download_ttfb_msec: Statistic,
/// The average download speed, in megabits per second.
......@@ -519,7 +534,12 @@ struct BenchmarkResults {
impl BenchmarkResults {
/// Generate summarized benchmark results from raw run data.
fn generate(ty: BenchmarkType, connections: usize, raw: Vec<TimingSummary>) -> Self {
fn generate(
ty: BenchmarkType,
streams_per_circ: usize,
circuits: usize,
raw: Vec<TimingSummary>,
) -> Self {
let download_ttfb_msecs = raw
.iter()
.map(|s| s.download_ttfb_sec * 1000.0)
......@@ -540,7 +560,8 @@ impl BenchmarkResults {
BenchmarkResults {
ty,
samples,
connections,
streams_per_circ,
circuits,
download_ttfb_msec: Statistic::from_samples(download_ttfb_msecs),
download_rate_megabit: Statistic::from_samples(download_rate_megabits),
upload_ttfb_msec: Statistic::from_samples(upload_ttfb_msecs),
......@@ -562,49 +583,36 @@ struct BenchmarkSummary {
}
impl<R: Runtime> Benchmark<R> {
/// Run a type of benchmark (`ty`), performing `self.samples` benchmark runs, and using
/// `self.concurrent` concurrent connections.
/// Run a type of benchmark (`ty`), performing `self.samples` benchmark
/// runs, using `self.circs_per_sample` concurrent circuits, and
/// `self.streams_per_circ` concurrent streams on each circuit.
///
/// Uses `stream_generator`, a stream that generates futures that themselves generate streams,
/// in order to obtain the required number of streams to run the test over.
fn run<F, G, S, E>(&mut self, ty: BenchmarkType, stream_generator: F) -> Result<()>
/// Uses `stream_generator`, a function that returns futures that themselves
/// generate streams, in order to obtain the required number of streams to
/// run the test over. The function takes the index of the current run.
fn run<F, G, S, E>(&mut self, ty: BenchmarkType, mut stream_generator: F) -> Result<()>
where
F: Stream<Item = G> + Unpin,
F: FnMut(usize) -> G,
G: Future<Output = Result<S, E>>,
S: AsyncRead + AsyncWrite + Unpin,
E: std::error::Error + Send + Sync + 'static,
{
let mut results = vec![];
// NOTE(eta): This could make more streams than we need. We assume this is okay.
let mut stream_generator = stream_generator
.buffered(self.concurrent)
.take(self.samples * self.concurrent);
for n in 0..self.samples {
let mut streams = vec![];
for _ in 0..self.concurrent {
let stream =
self.runtime
.block_on(stream_generator.next())
.ok_or_else(|| {
anyhow!(
"internal error: stream generator couldn't supply enough streams"
)
})??; // one ? for the error above, next ? for G's output
streams.push(stream);
}
let futures = streams
.into_iter()
.map(|stream| {
let total_streams = self.streams_per_circ * self.circs_per_sample;
let futures = (0..total_streams)
.map(|_| {
let up = Arc::clone(&self.upload_payload);
let dp = Arc::clone(&self.download_payload);
Box::pin(async move { client(stream, up, dp).await })
let stream = stream_generator(n);
Box::pin(async move { client(stream.await?, up, dp).await })
})
.collect::<futures::stream::FuturesUnordered<_>>()
.collect::<Vec<_>>();
info!(
"Benchmarking {:?} with {} connections, run {}/{}...",
ty,
self.concurrent,
self.streams_per_circ,
n + 1,
self.samples
);
......@@ -616,7 +624,8 @@ impl<R: Runtime> Benchmark<R> {
.collect::<Result<Vec<_>>>()?;
results.extend(stats);
}
let results = BenchmarkResults::generate(ty, self.concurrent, results);
let results =
BenchmarkResults::generate(ty, self.streams_per_circ, self.circs_per_sample, results);
self.results.insert(ty, results);
Ok(())
}
......@@ -624,19 +633,23 @@ impl<R: Runtime> Benchmark<R> {
/// Benchmark without Arti on loopback.
fn without_arti(&mut self) -> Result<()> {
let ca = self.connect_addr;
self.run(
BenchmarkType::RawLoopback,
futures::stream::repeat_with(|| tokio::net::TcpStream::connect(ca)),
)
self.run(BenchmarkType::RawLoopback, |_| {
tokio::net::TcpStream::connect(ca)
})
}
/// Benchmark through a SOCKS5 proxy at address `addr`.
fn with_proxy(&mut self, addr: &str) -> Result<()> {
let ca = self.connect_addr;
self.run(
BenchmarkType::Socks,
futures::stream::repeat_with(|| Socks5Stream::connect(addr, ca)),
)
let mut iso = StreamIsolationTracker::new(self.streams_per_circ);
self.run(BenchmarkType::Socks, |run| {
// Tor uses the (username, password) tuple of SOCKS authentication to decide how to isolate streams.
let iso_string = format!("{:?}", iso.next_in(run));
async move {
Socks5Stream::connect_with_password(addr, ca, &iso_string, &iso_string).await
}
})
}
/// Benchmark through Arti, using the provided `TorClientConfig`.
......@@ -650,9 +663,73 @@ impl<R: Runtime> Benchmark<R> {
let addr = TorAddr::dangerously_from(self.connect_addr)?;
self.run(
BenchmarkType::Arti,
futures::stream::repeat_with(|| tor_client.connect(addr.clone())),
)
let mut iso = StreamIsolationTracker::new(self.streams_per_circ);
self.run(BenchmarkType::Arti, |run| {
let mut prefs = arti_client::StreamPrefs::new();
prefs.set_isolation_group(iso.next_in(run));
tor_client.connect(addr.clone())
})
}
}
/// Helper type: track a StreamIsolation token over a set of runs.
///
/// We want to return a new token every `streams_per_circ` calls for each run,
/// but always give a new token when a new run begins.
///
/// Tokens are obtained through `next_in`, which is told which run the
/// requested stream belongs to. Runs must be requested one after another
/// (not interleaved), because only the most recent run index is remembered.
#[derive(Debug, Clone)]
struct StreamIsolationTracker {
/// The number of streams to assign to each circuit, i.e. how many
/// consecutive `next_in` calls within one run share the same token.
streams_per_circ: usize,
/// The current run index: the `run` value seen by the most recent call to
/// `next_in` (0 for a freshly constructed tracker).
cur_run: usize,
/// The stream index within the run that we expect on the _next_ call to `next_in`.
/// Reset to 0 whenever a new run begins.
next_stream: usize,
/// The isolation token we're currently handing out.
cur_token: IsolationToken,
}
impl StreamIsolationTracker {
    /// Construct a new StreamIsolationTracker that hands out the same token
    /// for `streams_per_circ` consecutive streams within a run.
    ///
    /// The tracker starts at run index 0 with no streams handed out yet.
    fn new(streams_per_circ: usize) -> Self {
        Self {
            streams_per_circ,
            cur_run: 0,
            next_stream: 0,
            cur_token: IsolationToken::new(),
        }
    }

    /// Return the isolation token to use for the next stream in the given
    /// `run`. Requires that runs are not interleaved.
    ///
    /// A fresh token is generated when `run` differs from the run of the
    /// previous call, and again after every `streams_per_circ` streams within
    /// a run. If `streams_per_circ` is zero, every stream gets its own token
    /// instead of panicking on a remainder-by-zero.
    fn next_in(&mut self, run: usize) -> IsolationToken {
        if run != self.cur_run {
            // A new run restarts the per-run stream count; the check below
            // then sees stream index 0 and rotates the token.
            self.cur_run = run;
            self.next_stream = 0;
        }

        // The zero check must come first: `%` with a zero divisor panics.
        let starts_new_circuit =
            self.streams_per_circ == 0 || self.next_stream % self.streams_per_circ == 0;
        if starts_new_circuit {
            self.cur_token = IsolationToken::new();
        }

        self.next_stream += 1;
        self.cur_token
    }
}
#[cfg(test)]
mod test {
    use super::StreamIsolationTracker;

    /// With two streams per circuit, tokens must come in equal pairs within a
    /// run, and a new run must never reuse a token from the previous run.
    #[test]
    fn test_iso_tracker() {
        let mut tracker = StreamIsolationTracker::new(2);

        // An odd number of calls in the first run leaves a circuit half
        // filled, so the second run begins in the middle of a pair.
        let first_run = (0..9).map(|_| tracker.next_in(0)).collect::<Vec<_>>();
        let second_run = (0..6).map(|_| tracker.next_in(1)).collect::<Vec<_>>();

        // Streams 0 and 1 share a circuit; stream 2 starts the next one.
        assert_eq!(first_run[0], first_run[1]);
        assert_ne!(first_run[1], first_run[2]);
        assert_eq!(first_run[2], first_run[3]);

        // The same pairing holds in the second run.
        assert_eq!(second_run[0], second_run[1]);
        assert_ne!(second_run[1], second_run[2]);

        // The second run's first token is new, not carried over from run 0.
        assert!(first_run.iter().all(|token| *token != second_run[0]));
    }
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment