Verified Commit bd631e1b authored by starius's avatar starius
Browse files

tor-chanmgr: Notify waiters when a launch is cancelled

If a future that owns a pending channel launch is dropped after
publishing the pending entry, other waiters can see the oneshot sender
disappear and report "channel build task disappeared" as an
internal bug.

Fix this by tying pending-entry cleanup and waiter notification
together. Once we take responsibility for a pending launch, every exit
path now removes or upgrades the pending entry and notifies waiters with
the observed result. Cancellation reports RequestCancelled, while
post-build failures keep their original error instead of turning into
the internal bug or retrying the launch owner.

Add regression tests that cover both successive dropped launch-owner
futures and a failure while installing a newly built channel, so
waiters see the expected error in each case.
parent 94ffb820
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -55,7 +55,6 @@ mod mgr;
#[cfg(test)]
mod testing;
pub mod transport;
pub(crate) mod util;

use futures::StreamExt;
use futures::select_biased;
+188 −40
Original line number Diff line number Diff line
@@ -2,7 +2,6 @@

use crate::factory::BootstrapReporter;
use crate::mgr::state::{ChannelForTarget, PendingChannelHandle};
use crate::util::defer::Defer;
use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result};

use async_trait::async_trait;
@@ -175,6 +174,72 @@ type Pending = Shared<oneshot::Receiver<Result<()>>>;
/// complete it).
type Sending = oneshot::Sender<Result<()>>;

/// Keeps a pending launch entry and its waiters in sync.
///
/// Every exit path from a launch attempt must either remove the pending entry
/// or upgrade it to an open channel, and must notify all waiters with the
/// outcome. This guard makes cancellation and early returns follow the same
/// cleanup path as ordinary failures.
struct PendingLaunchGuard<'a, CF: AbstractChannelFactory> {
    /// Channel state used to remove or upgrade the pending entry.
    channels: &'a state::MgrState<CF>,
    /// Handle to the pending entry, if it has not yet been removed.
    handle: Option<PendingChannelHandle>,
    /// Sender used to notify tasks waiting on this launch.
    send: Option<Sending>,
    /// Result to report to the waiters if the launch ends here.
    result: Result<()>,
}

impl<'a, CF: AbstractChannelFactory> PendingLaunchGuard<'a, CF> {
    /// Create a new guard for a pending launch.
    fn new(channels: &'a state::MgrState<CF>, handle: PendingChannelHandle, send: Sending) -> Self {
        Self {
            channels,
            handle: Some(handle),
            send: Some(send),
            result: Err(Error::RequestCancelled),
        }
    }

    /// Record the result that should be reported to any waiters.
    fn note_result(&mut self, result: Result<()>) {
        self.result = result;
    }

    /// Replace the pending channel with an open one.
    fn upgrade_pending_channel_to_open(&mut self, channel: Arc<CF::Channel>) -> Result<()> {
        let handle = self
            .handle
            .take()
            .expect("pending launch guard lost its handle before upgrade");
        self.channels
            .upgrade_pending_channel_to_open(handle, channel)
    }
}

impl<'a, CF: AbstractChannelFactory> Drop for PendingLaunchGuard<'a, CF> {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            if let Err(e) = self.channels.remove_pending_channel(handle) {
                // Just log an error if we're unable to remove it, since there's
                // nothing else we can do here, and returning the error would
                // hide the actual error that we care about (the channel build
                // failure).
                #[allow(clippy::missing_docs_in_private_items)]
                const MSG: &str = "Unable to remove the pending channel";
                error_report!(internal!("{e}"), "{}", MSG);
            }
        }

        if let Some(send) = self.send.take() {
            // It's okay if all the receivers went away:
            // that means that nobody was waiting for this channel.
            let _ignore_err = send.send(self.result.clone());
        }
    }
}

impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
    /// Make a new empty channel manager.
    pub(crate) fn new(
@@ -321,29 +386,16 @@ impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
                // We need to launch a channel.
                Some(Action::Launch((handle, send))) => {
                    trace!("Launching channel");
                    // If the remainder of this code returns early or is cancelled, we still want to
                    // clean up our pending entry in the channel map. The following closure will be
                    // run when dropped to ensure that it's cleaned up properly.
                    //
                    // The `remove_pending_channel` will acquire the lock within `MgrState`, but
                    // this won't lead to deadlocks since the lock is only ever acquired within
                    // methods of `MgrState`. When this `Defer` is being dropped, no other
                    // `MgrState` methods will be running on this thread, so the lock will not have
                    // already been acquired.
                    let defer_remove_pending = Defer::new(handle, |handle| {
                        if let Err(e) = self.channels.remove_pending_channel(handle) {
                            // Just log an error if we're unable to remove it, since there's
                            // nothing else we can do here, and returning the error would
                            // hide the actual error that we care about (the channel build
                            // failure).
                            #[allow(clippy::missing_docs_in_private_items)]
                            const MSG: &str = "Unable to remove the pending channel";
                            error_report!(internal!("{e}"), "{}", MSG);
                        }
                    });

                    let connector = self.channels.builder();
                    let memquota = ChannelAccount::new(&self.memquota)?;
                    let mut launch = PendingLaunchGuard::new(&self.channels, handle, send);
                    let memquota = match ChannelAccount::new(&self.memquota) {
                        Ok(memquota) => memquota,
                        Err(e) => {
                            let e: Error = e.into();
                            launch.note_result(Err(e.clone()));
                            return Err(e);
                        }
                    };

                    let outcome = connector
                        .build_channel(&target, self.reporter.clone(), memquota)
@@ -352,20 +404,19 @@ impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
                    match outcome {
                        Ok(ref chan) => {
                            // Replace the pending channel with the newly built channel.
                            let handle = defer_remove_pending.cancel();
                            self.channels
                                .upgrade_pending_channel_to_open(handle, Arc::clone(chan))?;
                            match launch.upgrade_pending_channel_to_open(Arc::clone(chan)) {
                                Ok(()) => launch.note_result(Ok(())),
                                Err(e) => {
                                    launch.note_result(Err(e.clone()));
                                    return Err(e);
                                }
                            }
                        }
                        Err(_) => {
                            // Remove the pending channel.
                            drop(defer_remove_pending);
                            launch.note_result(outcome.clone().map(|_| ()));
                        }
                    }

                    // It's okay if all the receivers went away:
                    // that means that nobody was waiting for this channel.
                    let _ignore_err = send.send(outcome.clone().map(|_| ()));

                    match outcome {
                        Ok(chan) => {
                            return Ok((chan, ChanProvenance::NewlyCreated));
@@ -513,10 +564,11 @@ mod test {
    use super::*;
    use crate::Error;

    use futures::join;
    use futures::{join, poll};
    use std::error::Error as StdError;
    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::time::Duration;
    use tor_error::bad_api_usage;
    use tor_linkspec::ChannelMethod;
@@ -533,6 +585,7 @@ mod test {
    #[derive(Clone)]
    struct FakeChannelFactory<RT> {
        runtime: RT,
        build_attempts: Arc<AtomicUsize>,
    }

    #[derive(Clone, Debug)]
@@ -568,7 +621,13 @@ mod test {
            _updates: Arc<ChannelPaddingInstructionsUpdates>,
        ) -> tor_proto::Result<()> {
            // *self.last_params.lock().unwrap() = Some((*updates).clone());
            Ok(())
            match self.mood {
                // Build succeeds, but installing the channel into the manager fails.
                'r' => Err(tor_proto::Error::ChanProto(
                    "synthetic reparameterize failure".into(),
                )),
                _ => Ok(()),
            }
        }
        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
            Ok(())
@@ -595,21 +654,32 @@ mod test {
    }

    impl<RT: Runtime> FakeChannelFactory<RT> {
        fn new(runtime: RT) -> Self {
            FakeChannelFactory { runtime }
        fn new(runtime: RT, build_attempts: Arc<AtomicUsize>) -> Self {
            FakeChannelFactory {
                runtime,
                build_attempts,
            }
        }
    }

    fn new_test_abstract_chanmgr<R: Runtime>(runtime: R) -> AbstractChanMgr<FakeChannelFactory<R>> {
        let cf = FakeChannelFactory::new(runtime);
        AbstractChanMgr::new(
        new_test_abstract_chanmgr_and_build_attempts(runtime).0
    }

    fn new_test_abstract_chanmgr_and_build_attempts<R: Runtime>(
        runtime: R,
    ) -> (AbstractChanMgr<FakeChannelFactory<R>>, Arc<AtomicUsize>) {
        let build_attempts = Arc::new(AtomicUsize::new(0));
        let cf = FakeChannelFactory::new(runtime, Arc::clone(&build_attempts));
        let mgr = AbstractChanMgr::new(
            cf,
            Default::default(),
            Default::default(),
            &Default::default(),
            BootstrapReporter::fake(),
            ToplevelAccount::new_noop(),
        )
        );
        (mgr, build_attempts)
    }

    #[derive(Clone, Debug)]
@@ -640,6 +710,18 @@ mod test {
        bytes.into()
    }

    /// Return true if `needle` appears anywhere in `err`'s error chain.
    fn error_contains(err: &Error, needle: &str) -> bool {
        let mut source: Option<&(dyn StdError + 'static)> = Some(err);
        while let Some(err) = source {
            if err.to_string().contains(needle) || format!("{err:?}").contains(needle) {
                return true;
            }
            source = err.source();
        }
        false
    }

    #[async_trait]
    impl<RT: Runtime> AbstractChannelFactory for FakeChannelFactory<RT> {
        type Channel = FakeChannel;
@@ -652,6 +734,7 @@ mod test {
            _reporter: BootstrapReporter,
            _memquota: ChannelAccount,
        ) -> Result<Arc<FakeChannel>> {
            self.build_attempts.fetch_add(1, Ordering::SeqCst);
            yield_now().await;
            let FakeBuildSpec(ident, mood, id, _addr) = *target;
            let ed_ident = u32_to_ed(ident);
@@ -774,6 +857,71 @@ mod test {
        });
    }

    #[test]
    fn dropped_launch_reports_request_cancelled_to_waiters() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);
            let target = FakeBuildSpec(777, '💤', u32_to_ed(777), ADDR_A);
            let usage = CU::UserTraffic;

            let mut owner1 = Box::pin(mgr.get_or_launch(target.clone(), usage));
            assert!(poll!(&mut owner1).is_pending());

            let mut waiter = Box::pin(mgr.get_or_launch(target.clone(), usage));
            assert!(poll!(&mut waiter).is_pending());

            drop(owner1);

            let mut owner2 = Box::pin(mgr.get_or_launch(target, usage));
            assert!(poll!(&mut owner2).is_pending());

            assert!(poll!(&mut waiter).is_pending());

            drop(owner2);

            let waiter = waiter.await;
            assert!(
                matches!(&waiter, Err(Error::RequestCancelled)),
                "{waiter:?}"
            );
            if let Err(ref err) = waiter {
                assert!(!error_contains(err, "channel build task disappeared"));
            }
        });
    }

    #[test]
    fn failed_upgrade_reports_original_error_without_owner_retry() {
        test_with_one_runtime!(|runtime| async {
            let (mgr, build_attempts) = new_test_abstract_chanmgr_and_build_attempts(runtime);
            let target = FakeBuildSpec(778, 'r', u32_to_ed(778), ADDR_A);
            let usage = CU::UserTraffic;

            let mut owner = Box::pin(mgr.get_or_launch(target.clone(), usage));
            assert!(poll!(&mut owner).is_pending());

            let mut waiter = Box::pin(mgr.get_or_launch(target.clone(), usage));
            assert!(poll!(&mut waiter).is_pending());

            let owner = owner.await;
            assert!(matches!(&owner, Err(Error::Internal(_))), "{owner:?}");
            if let Err(ref err) = owner {
                assert!(error_contains(err, "failure on new channel"));
                assert!(!error_contains(err, "channel build task disappeared"));
            }

            assert_eq!(build_attempts.load(Ordering::SeqCst), 1);
            assert!(mgr.get_nowait(&u32_to_ed(778)).is_empty());

            let waiter = waiter.await;
            assert!(matches!(&waiter, Err(Error::Internal(_))), "{waiter:?}");
            if let Err(ref err) = waiter {
                assert!(error_contains(err, "failure on new channel"));
                assert!(!error_contains(err, "channel build task disappeared"));
            }
        });
    }

    #[test]
    fn unusable_entries() {
        test_with_one_runtime!(|runtime| async {

crates/tor-chanmgr/src/util.rs

deleted100644 → 0
+0 −3
Original line number Diff line number Diff line
//! Utilities used for the channel manager.

pub(crate) mod defer;
+0 −75
Original line number Diff line number Diff line
//! Defer a closure until later.

/// Runs a closure when dropped.
pub(crate) struct Defer<T, F: FnOnce(T)>(Option<DeferInner<T, F>>);

/// Everything contained by a [`Defer`].
struct DeferInner<T, F: FnOnce(T)> {
    /// The argument `f` should be called with when [`Defer`] is dropped.
    arg: T,
    /// The function to call.
    f: F,
}

impl<T, F: FnOnce(T)> Defer<T, F> {
    /// Defer running the provided closure `f` with `arg` until the returned [`Defer`] is dropped.
    #[must_use]
    pub(crate) fn new(arg: T, f: F) -> Self {
        Self(Some(DeferInner { arg, f }))
    }

    /// Return the provided `T` and drop the provided closure without running it.
    pub(crate) fn cancel(mut self) -> T {
        // other than the drop handler, there are no other places that mutate the `Option`, so it
        // should always be `Some` here
        self.0.take().expect("`Defer` is missing a value").arg
    }
}

impl<T, F: FnOnce(T)> std::ops::Drop for Defer<T, F> {
    fn drop(&mut self) {
        if let Some(DeferInner { arg, f }) = self.0.take() {
            f(arg);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicU32, Ordering};

    #[test]
    fn test_drop() {
        let x = AtomicU32::new(0);
        {
            let _defer = Defer::new(5, |inc| {
                x.fetch_add(inc, Ordering::Relaxed);
            });
            assert_eq!(x.load(Ordering::Relaxed), 0);
        }
        assert_eq!(x.load(Ordering::Relaxed), 5);
    }

    #[test]
    fn test_cancel() {
        let x = AtomicU32::new(0);
        {
            let defer = Defer::new(5, |inc| {
                x.fetch_add(inc, Ordering::Relaxed);
            });
            assert_eq!(defer.cancel(), 5);
            assert_eq!(x.load(Ordering::Relaxed), 0);
        }
        assert_eq!(x.load(Ordering::Relaxed), 0);
    }

    #[test]
    #[should_panic]
    fn test_panic() {
        let _ = Defer::new((), |()| {
            panic!("intentional panic");
        });
    }
}