Commit 62c9a80d authored by Nick Mathewson's avatar Nick Mathewson 🥔
Browse files

rproxy: Implement the actual "proxy" part of the code.

Now we finally have the code to copy the data back and forth on the
two AsyncRead+AsyncWrite objects that we create.
parent d208c062
Loading
Loading
Loading
Loading
+78 −10
Original line number Diff line number Diff line
@@ -3,13 +3,14 @@
use std::sync::{Arc, Mutex};

use futures::{
    channel::oneshot, task::SpawnExt as _, AsyncRead, AsyncWrite, Future, FutureExt as _, Stream,
    StreamExt as _,
    channel::oneshot, task::SpawnExt as _, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
    Future, FutureExt as _, Stream, StreamExt as _,
};
use std::io::Result as IoResult;
use tor_cell::relaycell::msg as relaymsg;
use tor_error::debug_report;
use tor_hsservice::{OnionServiceDataStream, StreamRequest};
use tor_proto::stream::IncomingStreamRequest;
use tor_hsservice::StreamRequest;
use tor_proto::stream::{DataStream, IncomingStreamRequest};
use tor_rtcompat::Runtime;
use tracing::debug;

@@ -148,16 +149,16 @@ pub(super) async fn run_action<R: Runtime>(
/// and trandmit data between the two stream indefinitely.  On failure, close
/// `request`.
async fn forward_connection<R, FUT, TS, E>(
    _runtime: R,
    runtime: R,
    request: StreamRequest,
    target_stream_future: FUT,
) where
    R: Runtime,
    FUT: Future<Output = Result<TS, E>>,
    TS: AsyncRead + AsyncWrite,
    TS: AsyncRead + AsyncWrite + Send + 'static,
    E: std::fmt::Display,
{
    let _local_stream = match target_stream_future.await {
    let local_stream = match target_stream_future.await {
        Ok(s) => s,
        Err(e) => {
            // TODO HSS: We should log more, since this is likely a missing
@@ -172,7 +173,7 @@ async fn forward_connection<R, FUT, TS, E>(
        }
    };

    let _onion_service_stream: OnionServiceDataStream = {
    let onion_service_stream: DataStream = {
        // TODO HSS: Does this match the behavior from C tor?
        let connected = relaymsg::Connected::new_empty();
        match request.accept(connected).await {
@@ -184,6 +185,73 @@ async fn forward_connection<R, FUT, TS, E>(
        }
    };

    // TODO HSS: Forward data indefinitely.
    // TODO HSS: Why is OnionServiceDataStream not the same type as DataStream?
    let (svc_r, svc_w) = onion_service_stream.split();
    let (local_r, local_w) = local_stream.split();

    // TODO HSS: Actually detect errors.
    let _ignore_outcome = runtime.spawn(copy_interactive(local_r, svc_w).map(|_| ()));
    let _ignore_outcome = runtime.spawn(copy_interactive(svc_r, local_w).map(|_| ()));
}

/// Copy all the data from `reader` into `writer` until we encounter an EOF or
/// an error.
///
/// Unlike as futures::io::copy(), this function is meant for use with
/// interactive readers and writers, where the reader might pause for
/// a while, but where we want to send data on the writer as soon as
/// it is available.
///
/// This function assumes that the writer might need to be flushed for
/// any buffered data to be sent.  It tries to minimize the number of
/// flushes, however, by only flushing the writer when the reader has no data.
///
/// NOTE: This is duplicate code from `arti::socks`.  But instead of
/// deduplicating it, we should change the behavior in `DataStream` that makes
/// it necessary. See arti#786 for a fuller discussion.
async fn copy_interactive<R, W>(mut reader: R, mut writer: W) -> IoResult<()>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    use futures::{poll, task::Poll};

    let mut buf = [0_u8; 1024];

    // At this point we could just loop, calling read().await,
    // write_all().await, and flush().await.  But we want to be more
    // clever than that: we only want to flush when the reader is
    // stalled.  That way we can pack our data into as few cells as
    // possible, but flush it immediately whenever there's no more
    // data coming.
    let loop_result: IoResult<()> = loop {
        let mut read_future = reader.read(&mut buf[..]);
        match poll!(&mut read_future) {
            Poll::Ready(Err(e)) => break Err(e),
            Poll::Ready(Ok(0)) => break Ok(()), // EOF
            Poll::Ready(Ok(n)) => {
                writer.write_all(&buf[..n]).await?;
                continue;
            }
            Poll::Pending => writer.flush().await?,
        }

        // The read future is pending, so we should wait on it.
        match read_future.await {
            Err(e) => break Err(e),
            Ok(0) => break Ok(()),
            Ok(n) => writer.write_all(&buf[..n]).await?,
        }
    };

    // Make sure that we flush any lingering data if we can.
    //
    // If there is a difference between closing and dropping, then we
    // only want to do a "proper" close if the reader closed cleanly.
    let flush_result = if loop_result.is_ok() {
        writer.close().await
    } else {
        writer.flush().await
    };

    loop_result.or(flush_result)
}