Loading Cargo.lock +1 −0 Original line number Diff line number Diff line Loading @@ -3447,6 +3447,7 @@ dependencies = [ "cipher", "coarsetime", "digest 0.10.3", "educe", "futures", "generic-array", "hex", Loading crates/tor-proto/Cargo.toml +1 −0 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ bytes = "1" cipher = "0.3.0" coarsetime = "0.1.20" digest = "0.10.0" educe = "0.4.6" futures = "0.3.14" asynchronous-codec = "0.6.0" generic-array = "0.14.3" Loading crates/tor-proto/src/stream/data.rs +23 −32 Original line number Diff line number Diff line Loading @@ -15,12 +15,15 @@ use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite #[cfg(feature = "tokio")] use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; use std::fmt::{self, Debug}; use std::fmt::Debug; use std::io::Result as IoResult; use std::pin::Pin; use educe::Educe; use crate::circuit::StreamTarget; use crate::stream::StreamReader; use tor_bytes::skip_fmt; use tor_cell::relaycell::msg::{Data, RelayMsg}; use tor_error::internal; Loading Loading @@ -258,6 +261,8 @@ impl TokioAsyncWrite for DataStream { /// We have to use an enum here because, for as long as we're waiting /// for a flush operation to complete, the future returned by /// `flush_cell()` owns the DataWriterImpl. #[derive(Educe)] #[educe(Debug)] enum DataWriterState { /// The writer has closed or gotten an error: nothing more to do. Closed, Loading @@ -265,20 +270,15 @@ enum DataWriterState { /// immediately. Ready(DataWriterImpl), /// The writer is flushing a cell. Flushing(Pin<Box<dyn Future<Output = (DataWriterImpl, Result<()>)> + Send>>), } impl Debug for DataWriterState { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { DataWriterState::Closed => write!(f, "DataWriterState::Closed"), DataWriterState::Ready(i) => write!(f, "DataWriterState::Ready({:?})", i), DataWriterState::Flushing(_) => write!(f, "DataWriterState::Flushing(..)"), } } Flushing( #[educe(Debug(method = "skip_fmt"))] Pin<Box<dyn Future<Output = (DataWriterImpl, Result<()>)> + Send>>, ), } /// Internal: the write part of a DataStream #[derive(Educe)] #[educe(Debug)] struct DataWriterImpl { /// The underlying StreamTarget object. s: StreamTarget, Loading @@ -288,21 +288,13 @@ struct DataWriterImpl { // enough for now. If we _do_ make it bigger, we'll have to change // our use of Data::split_from to handle the case where we can't fit // all the data. #[educe(Debug(method = "skip_fmt"))] buf: Box<[u8; Data::MAXLEN]>, /// Number of unflushed bytes in buf. n_pending: usize, } impl Debug for DataWriterImpl { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("DataWriterImpl") .field("s", &self.s) .field("n_pending", &self.n_pending) .finish_non_exhaustive() } } impl DataWriter { /// Helper for poll_flush() and poll_close(): Performs a flush, then /// closes the stream if should_close is true. Loading Loading @@ -465,6 +457,8 @@ impl DataWriterImpl { /// DataCellImpl. If we wanted to store the future and the cell at the /// same time, we'd need to make a self-referential structure, which isn't /// possible in safe Rust AIUI. #[derive(Educe)] #[educe(Debug)] enum DataReaderState { /// In this state we have received an end cell or an error. Closed, Loading @@ -473,28 +467,25 @@ enum DataReaderState { Ready(DataReaderImpl), /// The reader is currently fetching a cell: this future is the /// progress it is making. ReadingCell(Pin<Box<dyn Future<Output = (DataReaderImpl, Result<()>)> + Send>>), } impl std::fmt::Debug for DataReaderState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Closed => write!(f, "Closed"), Self::Ready(_) => write!(f, "Ready"), Self::ReadingCell(_) => write!(f, "ReadingCell"), } } ReadingCell( #[educe(Debug(method = "skip_fmt"))] Pin<Box<dyn Future<Output = (DataReaderImpl, Result<()>)> + Send>>, ), } /// Wrapper for the read part of a DataStream #[derive(Educe)] #[educe(Debug)] struct DataReaderImpl { /// The underlying StreamReader object. #[educe(Debug(method = "skip_fmt"))] s: StreamReader, /// If present, data that we received on this stream but have not /// been able to send to the caller yet. // TODO: This data structure is probably not what we want, but // it's good enough for now. #[educe(Debug(method = "skip_fmt"))] pending: Vec<u8>, /// Index into pending to show what we've already read. Loading Loading
Cargo.lock +1 −0 Original line number Diff line number Diff line Loading @@ -3447,6 +3447,7 @@ dependencies = [ "cipher", "coarsetime", "digest 0.10.3", "educe", "futures", "generic-array", "hex", Loading
crates/tor-proto/Cargo.toml +1 −0 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ bytes = "1" cipher = "0.3.0" coarsetime = "0.1.20" digest = "0.10.0" educe = "0.4.6" futures = "0.3.14" asynchronous-codec = "0.6.0" generic-array = "0.14.3" Loading
crates/tor-proto/src/stream/data.rs +23 −32 Original line number Diff line number Diff line Loading @@ -15,12 +15,15 @@ use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite #[cfg(feature = "tokio")] use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; use std::fmt::{self, Debug}; use std::fmt::Debug; use std::io::Result as IoResult; use std::pin::Pin; use educe::Educe; use crate::circuit::StreamTarget; use crate::stream::StreamReader; use tor_bytes::skip_fmt; use tor_cell::relaycell::msg::{Data, RelayMsg}; use tor_error::internal; Loading Loading @@ -258,6 +261,8 @@ impl TokioAsyncWrite for DataStream { /// We have to use an enum here because, for as long as we're waiting /// for a flush operation to complete, the future returned by /// `flush_cell()` owns the DataWriterImpl. #[derive(Educe)] #[educe(Debug)] enum DataWriterState { /// The writer has closed or gotten an error: nothing more to do. Closed, Loading @@ -265,20 +270,15 @@ enum DataWriterState { /// immediately. Ready(DataWriterImpl), /// The writer is flushing a cell. Flushing(Pin<Box<dyn Future<Output = (DataWriterImpl, Result<()>)> + Send>>), } impl Debug for DataWriterState { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { DataWriterState::Closed => write!(f, "DataWriterState::Closed"), DataWriterState::Ready(i) => write!(f, "DataWriterState::Ready({:?})", i), DataWriterState::Flushing(_) => write!(f, "DataWriterState::Flushing(..)"), } } Flushing( #[educe(Debug(method = "skip_fmt"))] Pin<Box<dyn Future<Output = (DataWriterImpl, Result<()>)> + Send>>, ), } /// Internal: the write part of a DataStream #[derive(Educe)] #[educe(Debug)] struct DataWriterImpl { /// The underlying StreamTarget object. s: StreamTarget, Loading @@ -288,21 +288,13 @@ struct DataWriterImpl { // enough for now. If we _do_ make it bigger, we'll have to change // our use of Data::split_from to handle the case where we can't fit // all the data. #[educe(Debug(method = "skip_fmt"))] buf: Box<[u8; Data::MAXLEN]>, /// Number of unflushed bytes in buf. n_pending: usize, } impl Debug for DataWriterImpl { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("DataWriterImpl") .field("s", &self.s) .field("n_pending", &self.n_pending) .finish_non_exhaustive() } } impl DataWriter { /// Helper for poll_flush() and poll_close(): Performs a flush, then /// closes the stream if should_close is true. Loading Loading @@ -465,6 +457,8 @@ impl DataWriterImpl { /// DataCellImpl. If we wanted to store the future and the cell at the /// same time, we'd need to make a self-referential structure, which isn't /// possible in safe Rust AIUI. #[derive(Educe)] #[educe(Debug)] enum DataReaderState { /// In this state we have received an end cell or an error. Closed, Loading @@ -473,28 +467,25 @@ enum DataReaderState { Ready(DataReaderImpl), /// The reader is currently fetching a cell: this future is the /// progress it is making. ReadingCell(Pin<Box<dyn Future<Output = (DataReaderImpl, Result<()>)> + Send>>), } impl std::fmt::Debug for DataReaderState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Closed => write!(f, "Closed"), Self::Ready(_) => write!(f, "Ready"), Self::ReadingCell(_) => write!(f, "ReadingCell"), } } ReadingCell( #[educe(Debug(method = "skip_fmt"))] Pin<Box<dyn Future<Output = (DataReaderImpl, Result<()>)> + Send>>, ), } /// Wrapper for the read part of a DataStream #[derive(Educe)] #[educe(Debug)] struct DataReaderImpl { /// The underlying StreamReader object. #[educe(Debug(method = "skip_fmt"))] s: StreamReader, /// If present, data that we received on this stream but have not /// been able to send to the caller yet. // TODO: This data structure is probably not what we want, but // it's good enough for now. #[educe(Debug(method = "skip_fmt"))] pending: Vec<u8>, /// Index into pending to show what we've already read. Loading