Skip to content
Snippets Groups Projects
Commit c978a0d7 authored by Nick Mathewson's avatar Nick Mathewson :game_die:
Browse files

arti-bench: refactor stream construction to connect in parallel

Previously we tried to do each connection in a run, and only then did we
start transferring data over them.  Now we collect a bunch of the
futures that return an open stream, and run them all in parallel
with using them.  This change includes connect-time in our
benchmarks, and allows us to test contention in our connect code.

Instead of using a Stream, I've changed the connection-generation
code to call a future-returning function directly, so we have a way
to explicitly pass which run we're in.
parent 049d304e
No related branches found
No related tags found
No related merge requests found
......@@ -39,7 +39,6 @@ use anyhow::{anyhow, Result};
use arti_client::{TorAddr, TorClient, TorClientConfig};
use arti_config::ArtiConfig;
use clap::{App, Arg};
use futures::stream::Stream;
use futures::StreamExt;
use rand::distributions::Standard;
use rand::Rng;
......@@ -565,39 +564,29 @@ impl<R: Runtime> Benchmark<R> {
/// Run a type of benchmark (`ty`), performing `self.samples` benchmark runs, and using
/// `self.concurrent` concurrent connections.
///
/// Uses `stream_generator`, a stream that generates futures that themselves generate streams,
/// in order to obtain the required number of streams to run the test over.
fn run<F, G, S, E>(&mut self, ty: BenchmarkType, stream_generator: F) -> Result<()>
/// Uses `stream_generator`, function that returns futures that themselves generate streams,
/// in order to obtain the required number of streams to run the test over. The function takes
/// an index of the current run, and a second index of the current stream.
fn run<F, G, S, E>(&mut self, ty: BenchmarkType, mut stream_generator: F) -> Result<()>
where
F: Stream<Item = G> + Unpin,
F: FnMut(usize) -> G,
G: Future<Output = Result<S, E>>,
S: AsyncRead + AsyncWrite + Unpin,
E: std::error::Error + Send + Sync + 'static,
{
let mut results = vec![];
// NOTE(eta): This could make more streams than we need. We assume this is okay.
let mut stream_generator = stream_generator
.buffered(self.concurrent)
.take(self.samples * self.concurrent);
for n in 0..self.samples {
let mut streams = vec![];
let mut stream_futures = vec![];
for _ in 0..self.concurrent {
let stream =
self.runtime
.block_on(stream_generator.next())
.ok_or_else(|| {
anyhow!(
"internal error: stream generator couldn't supply enough streams"
)
})??; // one ? for the error above, next ? for G's output
streams.push(stream);
let stream_fut = stream_generator(n);
stream_futures.push(stream_fut);
}
let futures = streams
let futures = stream_futures
.into_iter()
.map(|stream| {
let up = Arc::clone(&self.upload_payload);
let dp = Arc::clone(&self.download_payload);
Box::pin(async move { client(stream, up, dp).await })
Box::pin(async move { client(stream.await?, up, dp).await })
})
.collect::<futures::stream::FuturesUnordered<_>>()
.collect::<Vec<_>>();
......@@ -624,19 +613,15 @@ impl<R: Runtime> Benchmark<R> {
/// Benchmark without Arti on loopback.
fn without_arti(&mut self) -> Result<()> {
let ca = self.connect_addr;
self.run(
BenchmarkType::RawLoopback,
futures::stream::repeat_with(|| tokio::net::TcpStream::connect(ca)),
)
self.run(BenchmarkType::RawLoopback, |_| {
tokio::net::TcpStream::connect(ca)
})
}
/// Benchmark through a SOCKS5 proxy at address `addr`.
fn with_proxy(&mut self, addr: &str) -> Result<()> {
let ca = self.connect_addr;
self.run(
BenchmarkType::Socks,
futures::stream::repeat_with(|| Socks5Stream::connect(addr, ca)),
)
self.run(BenchmarkType::Socks, |_| Socks5Stream::connect(addr, ca))
}
/// Benchmark through Arti, using the provided `TorClientConfig`.
......@@ -650,9 +635,6 @@ impl<R: Runtime> Benchmark<R> {
let addr = TorAddr::dangerously_from(self.connect_addr)?;
self.run(
BenchmarkType::Arti,
futures::stream::repeat_with(|| tor_client.connect(addr.clone())),
)
self.run(BenchmarkType::Arti, |_| tor_client.connect(addr.clone()))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment