DataReader read_to_end() does not return when the associated DataWriter is shutdown()
So I'm attempting to replicate the proxy logic to forward arti DataStreams
to/from tokio::TcpStreams
. I've run into an issue (which to be clear, I'm not sure my code there is completely right so I'll probably be back later) whereby if you split a DataStream
into a reader and writer, shutting down the writer doesn't seem to make any pending reads fail.
As a result, one of my tests blocks forever as it:
- creates an onion service
- client connects to the onion service
- client sends a message to the onion service, and then
shutdown()
's the associatedDataStream
writer - the server now awaits
AsyncReaderEx::read_to_end()
forever on the associatedDataStream
reader since I guess the circuit hasn't actually been killed?
Min-Repro
// standard
use std::path::PathBuf;
// extern
use arti_client::{BootstrapBehavior, TorClient};
use arti_client::config::{CfgPath, TorClientConfigBuilder};
// use fs_mistrust::Mistrust;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
fn main() -> anyhow::Result<()> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
// root dir for test arti instance
let mut root_data_directory = std::env::temp_dir();
root_data_directory.push("test_arti_bootstrap");
// set custom config options
let mut config_builder: TorClientConfigBuilder = Default::default();
// manually set arti cache and data directories so we can have
// multiple concurrent instances and control where it writes
let mut cache_dir = PathBuf::from(root_data_directory.clone());
cache_dir.push("cache");
config_builder.storage().cache_dir(CfgPath::new_literal(cache_dir));
let mut state_dir = PathBuf::from(root_data_directory.clone());
state_dir.push("state");
config_builder.storage().state_dir(CfgPath::new_literal(state_dir.clone()));
config_builder.address_filter()
.allow_local_addrs(false)
.allow_onion_addrs(true);
let config = config_builder.build()?;
let arti_client = TorClient::builder()
.config(config)
.bootstrap_behavior(BootstrapBehavior::Manual)
.create_bootstrapped().await?;
println!("connnecting");
// seems to leave socket open indefinitely waiting for data
let data_stream = arti_client.connect(("echo.free.beeceptor.com", 80)).await?;
let (mut reader, mut writer) = data_stream.split();
let mut buffer = Vec::new();
let read_to_end_future = reader.read_to_end(&mut buffer);
println!("shutdown writer");
writer.shutdown().await?;
println!("read to end (will block as long as the remote server keeps the socket open)");
read_to_end_future.await?;
println!("Hello world");
Ok(())
})
}
Or maybe I'm using tokio wrong?