Loading crates/tor-basic-utils/src/futures.rs +115 −13 Original line number Diff line number Diff line Loading @@ -21,12 +21,32 @@ where { /// For processing an item obtained from a future, avoiding async cancel lossage /// /// Prepares to send a output message `OM` to an input stream `OS` (`self`), /// ``` /// # use futures::channel::mpsc; /// # use tor_basic_utils::futures::SinkExt as _; /// # /// # #[tokio::main] /// # async fn main() -> Result<(),mpsc::SendError> { /// # let (mut sink, sink_r) = mpsc::unbounded::<usize>(); /// # let message_generator_future = futures::future::ready(42); /// # let process_message = |m| Ok::<_,mpsc::SendError>(m); /// let (message, sendable) = sink.prepare_send_from( /// message_generator_future /// ).await?; /// let message = process_message(message)?; /// sendable.send(message); /// # Ok(()) /// # } /// ``` /// /// Prepares to send a output message[^terminology] `OM` to an output sink `OS` (`self`), /// where the `OM` is made from an input message `IM`, /// and the `IM` is obtained from a future, `generator: IF`. // This slightly inconsistent terminology, "item" vs "message", // avoids having to have the generic parameters named `OI` and `II` // where `I` is sometimes "item" and sometimes "input". /// /// [^terminology]: We sometimes use slightly inconsistent terminology, /// "item" vs "message". /// This avoids having to have the generic parameters by named `OI` and `II` /// where `I` is sometimes "item" and sometimes "input". /// /// When successfully run, `prepare_send_from` gives `(IM, SinkSendable)`. /// Loading @@ -35,14 +55,73 @@ where /// /// # Why use this /// /// This avoids the following async cancellation hazaard /// This avoids the an async cancellation hazard /// which exists with naive use of `select!` /// followed by `OS.send().await`: /// followed by `OS.send().await`. You might write this: /// /// If the input is ready, the corresponding `select!` branch /// will trigger, yielding the next input item. Then, if the output is *not* ready, awaiting /// will have that arm return `Pending`, disscarding the item. /// ```rust,ignore /// select!{ /// message = input_stream.next() => { /// if let Some(message) = message { /// let message = do_our_processing(message); /// output_sink(message).await; // <---**BUG** /// } /// } /// control = something_else() => { .. } /// } /// ``` /// /// If, when we reach `BUG`, the output sink is not ready to receive the message, /// the future for that particular `select!` branch will be suspended. /// But when `select!` finds that *any one* of the branches is ready, /// it *drops* the futures for the other branches. /// That drops all the local variables, including possibly `message`, losing it. /// /// For more about cancellation safety, see /// [Rust for the Polyglot Programmer](https://www.chiark.greenend.org.uk/~ianmdlvl/rust-polyglot/async.html#cancellation-safety) /// which has a general summary, and /// Matthias Einwag's /// [extensive discussion in his gist](https://gist.github.com/Matthias247/ffc0f189742abf6aa41a226fe07398a8#cancellation-in-async-rust) /// with comparisons to other languages. /// /// ## Alternatives /// /// Unbounded mpsc channels, and certain other primitives, /// do not suffer from this problem because they do not block. /// `UnboundedSender` offers /// [`unbounded_send`](futures::channel::mpsc::UnboundedSender::unbounded_send) /// but only as an inherent method, so this does not compose with `Sink` combinators. /// And of course unbounded channels do not implement any backpressure. /// /// The problem can otherwise be avoided by completely eschewing use of `select!` /// and writing manual implementations of `Future`, `Sink`, and so on, /// However, such code is typically considerably more complex and involves /// entangling the primary logic with future machinery. /// It is normally better to write primary functionality in `async { }` /// using utilities (often "futures combinators") such as this one. /// // Personal note from @Diziet: // IMO it is generally accepted in the Rust communit that // it is not good practice to write principal code at the manual futues level. // However, I have not been able to find very clear support for this proposition. // There are endless articles explaining how futures work internally, // often by describing how to reimplement standard combinators such as `map`. // ISTM that these exist to help understanding, // but it seems to be only rarely stated that doing this is not generally a good idea. // // I did find the following: // // https://dev.to/mindflavor/rust-futures-an-uneducated-short-and-hopefully-not-boring-tutorial---part-4---a-real-future-from-scratch-734#conclusion // // Of course you generally do not write a future manually. You use the ones provided by // libraries and compose them as needed. It's important to understand how they work // nevertheless. // // And of curse the existence of the `futures` crate is indicative: // it consists almost entirely of combinators and utilities // whose purpose is to allow you to write many structures in async code // without needing to resort to manual future impls. // /// # Example /// /// This comprehensive example demonstrates how to read from possibly multiple sources Loading Loading @@ -79,6 +158,7 @@ where /// # Formally /// /// [`prepare_send_from`](SinkExt::prepare_send_from) /// returns a [`SinkPrepareSendFuture`] which, when awaited: /// /// * Waits for `OS` to be ready to receive an item. /// * Runs `message_generator` to obtain a `IM`. Loading @@ -87,13 +167,14 @@ where /// The caller should then: /// /// * Check the error from `prepare_send_from` /// (which came from the *output* stream). /// (which came from the *output* sink). /// * Process the `IM`, making an `OM` out of it. /// * Call [`sendable.send()`](SinkSendable::send) (and check its error). /// /// # Flushing /// /// `prepare_send_from` will [`flush`](futures::SinkExt::flush) the output sink /// `prepare_send_from` will (when awaited) /// [`flush`](futures::SinkExt::flush) the output sink /// when it finds the input is not ready yet. /// Until then items may be buffered /// (as if they had been written with [`feed`](futures::SinkExt::feed)). Loading Loading @@ -184,6 +265,14 @@ pub struct SinkPrepareSendFuture<'w, IF, OS, OM> { // can't move `output` out of this struct to put it into the `SinkSendable`. // (The poll() impl cannot borrow from SinkPrepareSendFuture.) output: Option<Pin<&'w mut OS>>, // `fn(OM)` gives contravariance in OM. Variance is confusing. // Loosely, a SinkPrepareSendFuture<..OM> consumes an OM. // Actually, we don't really need to add any variance restricions wrt OM, // because the &mut OS already implies the correct variance, // so we could have used the PhantomData<fn(*const OM)> trick. // Happily there is no unsafe anywhere nearby, so it is not possible for us to write // a bug due to getting the variance wrong - only to erroneously prevent some use // case. tw: PhantomData<fn(OM)>, } Loading @@ -196,6 +285,10 @@ pub struct SinkPrepareSendFuture<'w, IF, OS, OM> { /// (and constitutes a proof token that the sink has declared itself ready for that). /// /// The only useful method is [`send`](SinkSendable::send). /// /// `SinkSendable` has no drop glue and can be freely dropped, /// for example if you prepare to send a message and then /// encounter an error when producing the output message. #[must_use] pub struct SinkSendable<'w, OS, OM> { output: Pin<&'w mut OS>, Loading @@ -215,7 +308,12 @@ where let () = match ready!(self_.output.as_mut().unwrap().as_mut().poll_ready(cx)) { Err(e) => { dprintln!("poll: output poll = IF.Err SO IF.Err"); // Deliberately don't fuse by taking output // Deliberately don't fuse by `take`ing output. If we did that, we would expose // our caller to an additional panic risk. There is no harm in polling the output // sink again: although `Sink` documents that a sink that returns errors will // probalby continue to do so, it is not forbidden to try it and see. This is in // any case better than definitely crashing if the `SinkPrepareSendFuture` is // polled after it gave Ready. return Poll::Ready(Err(e)); } Ok(x) => { Loading @@ -227,7 +325,11 @@ where let value = match self_.generator.as_mut().poll(cx) { Poll::Pending => { // We defer flushing the output until the input stops yielding. // Or to put it another way, we do not return `Pending` without flushing. // This allows our caller (which is typically a loop) to transfer multiple // items from their input to their output between flushes. // // But we must not return `Pending` without flushing, or the caller could block // without flushing output, leading to untimely delivery of buffered data. dprintln!("poll: generator = Pending calling output flush"); let flushed = self_.output.as_mut().unwrap().as_mut().poll_flush(cx); return match flushed { Loading Loading
crates/tor-basic-utils/src/futures.rs +115 −13 Original line number Diff line number Diff line Loading @@ -21,12 +21,32 @@ where { /// For processing an item obtained from a future, avoiding async cancel lossage /// /// Prepares to send a output message `OM` to an input stream `OS` (`self`), /// ``` /// # use futures::channel::mpsc; /// # use tor_basic_utils::futures::SinkExt as _; /// # /// # #[tokio::main] /// # async fn main() -> Result<(),mpsc::SendError> { /// # let (mut sink, sink_r) = mpsc::unbounded::<usize>(); /// # let message_generator_future = futures::future::ready(42); /// # let process_message = |m| Ok::<_,mpsc::SendError>(m); /// let (message, sendable) = sink.prepare_send_from( /// message_generator_future /// ).await?; /// let message = process_message(message)?; /// sendable.send(message); /// # Ok(()) /// # } /// ``` /// /// Prepares to send a output message[^terminology] `OM` to an output sink `OS` (`self`), /// where the `OM` is made from an input message `IM`, /// and the `IM` is obtained from a future, `generator: IF`. // This slightly inconsistent terminology, "item" vs "message", // avoids having to have the generic parameters named `OI` and `II` // where `I` is sometimes "item" and sometimes "input". /// /// [^terminology]: We sometimes use slightly inconsistent terminology, /// "item" vs "message". /// This avoids having to have the generic parameters by named `OI` and `II` /// where `I` is sometimes "item" and sometimes "input". /// /// When successfully run, `prepare_send_from` gives `(IM, SinkSendable)`. /// Loading @@ -35,14 +55,73 @@ where /// /// # Why use this /// /// This avoids the following async cancellation hazaard /// This avoids the an async cancellation hazard /// which exists with naive use of `select!` /// followed by `OS.send().await`: /// followed by `OS.send().await`. You might write this: /// /// If the input is ready, the corresponding `select!` branch /// will trigger, yielding the next input item. Then, if the output is *not* ready, awaiting /// will have that arm return `Pending`, disscarding the item. /// ```rust,ignore /// select!{ /// message = input_stream.next() => { /// if let Some(message) = message { /// let message = do_our_processing(message); /// output_sink(message).await; // <---**BUG** /// } /// } /// control = something_else() => { .. } /// } /// ``` /// /// If, when we reach `BUG`, the output sink is not ready to receive the message, /// the future for that particular `select!` branch will be suspended. /// But when `select!` finds that *any one* of the branches is ready, /// it *drops* the futures for the other branches. /// That drops all the local variables, including possibly `message`, losing it. /// /// For more about cancellation safety, see /// [Rust for the Polyglot Programmer](https://www.chiark.greenend.org.uk/~ianmdlvl/rust-polyglot/async.html#cancellation-safety) /// which has a general summary, and /// Matthias Einwag's /// [extensive discussion in his gist](https://gist.github.com/Matthias247/ffc0f189742abf6aa41a226fe07398a8#cancellation-in-async-rust) /// with comparisons to other languages. /// /// ## Alternatives /// /// Unbounded mpsc channels, and certain other primitives, /// do not suffer from this problem because they do not block. /// `UnboundedSender` offers /// [`unbounded_send`](futures::channel::mpsc::UnboundedSender::unbounded_send) /// but only as an inherent method, so this does not compose with `Sink` combinators. /// And of course unbounded channels do not implement any backpressure. /// /// The problem can otherwise be avoided by completely eschewing use of `select!` /// and writing manual implementations of `Future`, `Sink`, and so on, /// However, such code is typically considerably more complex and involves /// entangling the primary logic with future machinery. /// It is normally better to write primary functionality in `async { }` /// using utilities (often "futures combinators") such as this one. /// // Personal note from @Diziet: // IMO it is generally accepted in the Rust communit that // it is not good practice to write principal code at the manual futues level. // However, I have not been able to find very clear support for this proposition. // There are endless articles explaining how futures work internally, // often by describing how to reimplement standard combinators such as `map`. // ISTM that these exist to help understanding, // but it seems to be only rarely stated that doing this is not generally a good idea. // // I did find the following: // // https://dev.to/mindflavor/rust-futures-an-uneducated-short-and-hopefully-not-boring-tutorial---part-4---a-real-future-from-scratch-734#conclusion // // Of course you generally do not write a future manually. You use the ones provided by // libraries and compose them as needed. It's important to understand how they work // nevertheless. // // And of curse the existence of the `futures` crate is indicative: // it consists almost entirely of combinators and utilities // whose purpose is to allow you to write many structures in async code // without needing to resort to manual future impls. // /// # Example /// /// This comprehensive example demonstrates how to read from possibly multiple sources Loading Loading @@ -79,6 +158,7 @@ where /// # Formally /// /// [`prepare_send_from`](SinkExt::prepare_send_from) /// returns a [`SinkPrepareSendFuture`] which, when awaited: /// /// * Waits for `OS` to be ready to receive an item. /// * Runs `message_generator` to obtain a `IM`. Loading @@ -87,13 +167,14 @@ where /// The caller should then: /// /// * Check the error from `prepare_send_from` /// (which came from the *output* stream). /// (which came from the *output* sink). /// * Process the `IM`, making an `OM` out of it. /// * Call [`sendable.send()`](SinkSendable::send) (and check its error). /// /// # Flushing /// /// `prepare_send_from` will [`flush`](futures::SinkExt::flush) the output sink /// `prepare_send_from` will (when awaited) /// [`flush`](futures::SinkExt::flush) the output sink /// when it finds the input is not ready yet. /// Until then items may be buffered /// (as if they had been written with [`feed`](futures::SinkExt::feed)). Loading Loading @@ -184,6 +265,14 @@ pub struct SinkPrepareSendFuture<'w, IF, OS, OM> { // can't move `output` out of this struct to put it into the `SinkSendable`. // (The poll() impl cannot borrow from SinkPrepareSendFuture.) output: Option<Pin<&'w mut OS>>, // `fn(OM)` gives contravariance in OM. Variance is confusing. // Loosely, a SinkPrepareSendFuture<..OM> consumes an OM. // Actually, we don't really need to add any variance restricions wrt OM, // because the &mut OS already implies the correct variance, // so we could have used the PhantomData<fn(*const OM)> trick. // Happily there is no unsafe anywhere nearby, so it is not possible for us to write // a bug due to getting the variance wrong - only to erroneously prevent some use // case. tw: PhantomData<fn(OM)>, } Loading @@ -196,6 +285,10 @@ pub struct SinkPrepareSendFuture<'w, IF, OS, OM> { /// (and constitutes a proof token that the sink has declared itself ready for that). /// /// The only useful method is [`send`](SinkSendable::send). /// /// `SinkSendable` has no drop glue and can be freely dropped, /// for example if you prepare to send a message and then /// encounter an error when producing the output message. #[must_use] pub struct SinkSendable<'w, OS, OM> { output: Pin<&'w mut OS>, Loading @@ -215,7 +308,12 @@ where let () = match ready!(self_.output.as_mut().unwrap().as_mut().poll_ready(cx)) { Err(e) => { dprintln!("poll: output poll = IF.Err SO IF.Err"); // Deliberately don't fuse by taking output // Deliberately don't fuse by `take`ing output. If we did that, we would expose // our caller to an additional panic risk. There is no harm in polling the output // sink again: although `Sink` documents that a sink that returns errors will // probalby continue to do so, it is not forbidden to try it and see. This is in // any case better than definitely crashing if the `SinkPrepareSendFuture` is // polled after it gave Ready. return Poll::Ready(Err(e)); } Ok(x) => { Loading @@ -227,7 +325,11 @@ where let value = match self_.generator.as_mut().poll(cx) { Poll::Pending => { // We defer flushing the output until the input stops yielding. // Or to put it another way, we do not return `Pending` without flushing. // This allows our caller (which is typically a loop) to transfer multiple // items from their input to their output between flushes. // // But we must not return `Pending` without flushing, or the caller could block // without flushing output, leading to untimely delivery of buffered data. dprintln!("poll: generator = Pending calling output flush"); let flushed = self_.output.as_mut().unwrap().as_mut().poll_flush(cx); return match flushed { Loading