DataReader read_to_end() does not return when the associated DataWriter is shutdown()

I'm attempting to replicate the proxy logic to forward arti DataStreams to/from tokio::net::TcpStreams. I've run into an issue whereby, if you split a DataStream into a reader and a writer, shutting down the writer doesn't seem to make any pending reads fail. (To be clear, I'm not sure my code there is completely right, so I'll probably be back later.)

As a result, one of my tests blocks forever as it:

  1. creates an onion service
  2. client connects to the onion service
  3. client sends a message to the onion service, and then calls shutdown() on the associated DataStream writer
  4. the server now awaits AsyncReadExt::read_to_end() forever on the associated DataStream reader, since I guess the circuit hasn't actually been killed?
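
For comparison, here is a minimal sketch of the behaviour I'd expect in step 4, using plain std::net sockets on loopback rather than arti or tokio. It says nothing about how arti implements shutdown(); it only shows ordinary TCP half-close semantics, where shutting down the client's write half makes the server's read_to_end() return, while the client's own read_to_end() returns only once the server closes its side. The function name half_close_roundtrip and the "hello" payload are made up for illustration.

```rust
use std::io::{Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

/// Runs one client/server exchange over loopback TCP and returns
/// (bytes the server read, bytes the client read).
/// Hypothetical helper for illustration only; not part of arti or tokio.
fn half_close_roundtrip() -> std::io::Result<(Vec<u8>, Vec<u8>)> {
    // Bind to an ephemeral port so the example does not clash with anything.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let server = thread::spawn(move || -> std::io::Result<Vec<u8>> {
        let (mut conn, _) = listener.accept()?;
        let mut received = Vec::new();
        // Returns only once the client has shut down its write half (EOF).
        conn.read_to_end(&mut received)?;
        // The server-to-client direction is still open, so a reply is possible.
        conn.write_all(&received.to_ascii_uppercase())?;
        // `conn` is dropped when the closure returns, which closes the
        // socket and gives the client its EOF.
        Ok(received)
    });

    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"hello")?;
    // Half-close: no more writes from the client, reads remain possible.
    client.shutdown(Shutdown::Write)?;

    let mut reply = Vec::new();
    // Returns when the server closes its side, not when we shut down ours.
    client.read_to_end(&mut reply)?;

    let received = server.join().expect("server thread panicked")?;
    Ok((received, reply))
}

fn main() -> std::io::Result<()> {
    let (server_got, client_got) = half_close_roundtrip()?;
    println!("server read {:?}", String::from_utf8_lossy(&server_got));
    println!("client read {:?}", String::from_utf8_lossy(&client_got));
    Ok(())
}
```

If the server never dropped its connection, the client's read_to_end() here would block as well, which is the same thing the repro below does against a remote host that keeps the socket open.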

Min-Repro

// standard
use std::path::PathBuf;

// extern

use arti_client::{BootstrapBehavior, TorClient};
use arti_client::config::{CfgPath, TorClientConfigBuilder};
// use fs_mistrust::Mistrust;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

fn main() -> anyhow::Result<()> {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {

            // root dir for test arti instance
            let mut root_data_directory = std::env::temp_dir();
            root_data_directory.push("test_arti_bootstrap");

            // set custom config options
            let mut config_builder: TorClientConfigBuilder = Default::default();

            // manually set arti cache and data directories so we can have
            // multiple concurrent instances and control where it writes
            let mut cache_dir = PathBuf::from(root_data_directory.clone());
            cache_dir.push("cache");
            config_builder.storage().cache_dir(CfgPath::new_literal(cache_dir));

            let mut state_dir = PathBuf::from(root_data_directory.clone());
            state_dir.push("state");
            config_builder.storage().state_dir(CfgPath::new_literal(state_dir.clone()));

            config_builder.address_filter()
                .allow_local_addrs(false)
                .allow_onion_addrs(true);

            let config = config_builder.build()?;

            let arti_client = TorClient::builder()
                .config(config)
                .bootstrap_behavior(BootstrapBehavior::Manual)
                .create_bootstrapped().await?;

            println!("connecting");
            // seems to leave socket open indefinitely waiting for data
            let data_stream = arti_client.connect(("echo.free.beeceptor.com", 80)).await?;
            let (mut reader, mut writer) = data_stream.split();

            let mut buffer = Vec::new();
            let read_to_end_future = reader.read_to_end(&mut buffer);

            println!("shutdown writer");
            writer.shutdown().await?;

            println!("read to end (will block as long as the remote server keeps the socket open)");
            read_to_end_future.await?;

            println!("Hello world");

            Ok(())
        })
}

Or maybe I'm using tokio wrong?