Loading crates/tor-chanmgr/src/lib.rs +17 −3 Original line number Original line Diff line number Diff line Loading @@ -82,6 +82,17 @@ pub struct ChanMgr<R: Runtime> { bootstrap_status: event::ConnStatusEvents, bootstrap_status: event::ConnStatusEvents, } } /// Description of how we got a channel. #[non_exhaustive] #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum ChanProvenance { /// This channel was newly launched, or was in progress and finished while /// we were waiting. NewlyCreated, /// This channel already existed when we asked for it. Preexisting, } impl<R: Runtime> ChanMgr<R> { impl<R: Runtime> ChanMgr<R> { /// Construct a new channel manager. /// Construct a new channel manager. /// /// Loading Loading @@ -118,16 +129,19 @@ impl<R: Runtime> ChanMgr<R> { /// If there is already a channel launch attempt in progress, this /// If there is already a channel launch attempt in progress, this /// function will wait until that launch is complete, and succeed /// function will wait until that launch is complete, and succeed /// or fail depending on its outcome. /// or fail depending on its outcome. pub async fn get_or_launch<T: ChanTarget + ?Sized>(&self, target: &T) -> Result<Channel> { pub async fn get_or_launch<T: ChanTarget + ?Sized>( &self, target: &T, ) -> Result<(Channel, ChanProvenance)> { let ed_identity = target.ed_identity(); let ed_identity = target.ed_identity(); let targetinfo = OwnedChanTarget::from_chan_target(target); let targetinfo = OwnedChanTarget::from_chan_target(target); let chan = self.mgr.get_or_launch(*ed_identity, targetinfo).await?; let (chan, provenance) = self.mgr.get_or_launch(*ed_identity, targetinfo).await?; // Double-check the match to make sure that the RSA identity is // Double-check the match to make sure that the RSA identity is // what we wanted too. // what we wanted too. chan.check_match(target) chan.check_match(target) .map_err(Error::from_proto_no_skew)?; .map_err(Error::from_proto_no_skew)?; Ok(chan) Ok((chan, provenance)) } } /// Return a stream of [`ConnStatus`] events to tell us about changes /// Return a stream of [`ConnStatus`] events to tell us about changes Loading crates/tor-chanmgr/src/mgr.rs +14 −11 Original line number Original line Diff line number Diff line //! Abstract implementation of a channel manager //! Abstract implementation of a channel manager use crate::mgr::map::OpenEntry; use crate::mgr::map::OpenEntry; use crate::{Error, Result}; use crate::{ChanProvenance, Error, Result}; use async_trait::async_trait; use async_trait::async_trait; use futures::channel::oneshot; use futures::channel::oneshot; Loading Loading @@ -110,7 +110,7 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> { &self, &self, ident: <<CF as ChannelFactory>::Channel as AbstractChannel>::Ident, ident: <<CF as ChannelFactory>::Channel as AbstractChannel>::Ident, target: CF::BuildSpec, target: CF::BuildSpec, ) -> Result<CF::Channel> { ) -> Result<(CF::Channel, ChanProvenance)> { use map::ChannelState::*; use map::ChannelState::*; /// Possible actions that we'll decide to take based on the /// Possible actions that we'll decide to take based on the Loading @@ -123,7 +123,7 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> { /// We're going to wait for it to finish. /// We're going to wait for it to finish. Wait(Pending<C>), Wait(Pending<C>), /// We found a usable channel. We're going to return it. /// We found a usable channel. We're going to return it. Return(Result<C>), Return(Result<(C, ChanProvenance)>), } } /// How many times do we try? /// How many times do we try? const N_ATTEMPTS: usize = 2; const N_ATTEMPTS: usize = 2; Loading @@ -140,7 +140,10 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> { Some(Open(ref ent)) => { Some(Open(ref ent)) => { if ent.channel.is_usable() { if ent.channel.is_usable() { // Good channel. Return it. // Good channel. Return it. let action = Action::Return(Ok(ent.channel.clone())); let action = Action::Return(Ok(( ent.channel.clone(), ChanProvenance::Preexisting, ))); (oldstate, action) (oldstate, action) } else { } else { // Unusable channel. Move to the Building // Unusable channel. Move to the Building Loading Loading @@ -181,7 +184,7 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> { } } // There's an in-progress channel. Wait for it. // There's an in-progress channel. Wait for it. Action::Wait(pend) => match pend.await { Action::Wait(pend) => match pend.await { Ok(Ok(chan)) => return Ok(chan), Ok(Ok(chan)) => return Ok((chan, ChanProvenance::NewlyCreated)), Ok(Err(e)) => { Ok(Err(e)) => { last_err = Some(e); last_err = Some(e); } } Loading @@ -207,7 +210,7 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> { // It's okay if all the receivers went away: // It's okay if all the receivers went away: // that means that nobody was waiting for this channel. // that means that nobody was waiting for this channel. let _ignore_err = send.send(Ok(chan.clone())); let _ignore_err = send.send(Ok(chan.clone())); return Ok(chan); return Ok((chan, ChanProvenance::NewlyCreated)); } } Err(e) => { Err(e) => { // The channel failed. Make it non-pending, tell the // The channel failed. Make it non-pending, tell the Loading Loading @@ -340,8 +343,8 @@ mod test { let cf = FakeChannelFactory::new(runtime); let cf = FakeChannelFactory::new(runtime); let mgr = AbstractChanMgr::new(cf); let mgr = AbstractChanMgr::new(cf); let target = (413, '!'); let target = (413, '!'); let chan1 = mgr.get_or_launch(413, target).await.unwrap(); let chan1 = mgr.get_or_launch(413, target).await.unwrap().0; let chan2 = mgr.get_or_launch(413, target).await.unwrap(); let chan2 = mgr.get_or_launch(413, target).await.unwrap().0; assert_eq!(chan1, chan2); assert_eq!(chan1, chan2); Loading Loading @@ -411,14 +414,14 @@ mod test { mgr.get_or_launch(5, (5, 'a')), mgr.get_or_launch(5, (5, 'a')), ); ); let ch3 = ch3.unwrap(); let ch3 = ch3.unwrap().0; let _ch4 = ch4.unwrap(); let _ch4 = ch4.unwrap(); let ch5 = ch5.unwrap(); let ch5 = ch5.unwrap().0; ch3.start_closing(); ch3.start_closing(); ch5.start_closing(); ch5.start_closing(); let ch3_new = mgr.get_or_launch(3, (3, 'b')).await.unwrap(); let ch3_new = mgr.get_or_launch(3, (3, 'b')).await.unwrap().0; assert_ne!(ch3, ch3_new); assert_ne!(ch3, ch3_new); assert_eq!(ch3_new.mood, 'b'); assert_eq!(ch3_new.mood, 'b'); Loading crates/tor-circmgr/src/build.rs +8 −7 Original line number Original line Diff line number Diff line Loading @@ -73,7 +73,8 @@ async fn create_common<RT: Runtime, CT: ChanTarget>( rt: &RT, rt: &RT, target: &CT, target: &CT, ) -> Result<PendingClientCirc> { ) -> Result<PendingClientCirc> { let chan = chanmgr let (chan, _provenance) = chanmgr .get_or_launch(target) .get_or_launch(target) .await .await .map_err(|cause| Error::Channel { .map_err(|cause| Error::Channel { Loading doc/semver_status.md +1 −0 Original line number Original line Diff line number Diff line Loading @@ -25,6 +25,7 @@ MODIFIED: Added `reset()` method to RetrySchedule. ### tor-chanmgr ### tor-chanmgr BREAKING: Added members to `Error::Proto` BREAKING: Added members to `Error::Proto` BREAKING: Added `ChanProvenance` to `ChanMgr::get_or_launch`. ### tor-circmgr ### tor-circmgr Loading Loading
crates/tor-chanmgr/src/lib.rs +17 −3 Original line number Original line Diff line number Diff line Loading @@ -82,6 +82,17 @@ pub struct ChanMgr<R: Runtime> { bootstrap_status: event::ConnStatusEvents, bootstrap_status: event::ConnStatusEvents, } } /// Description of how we got a channel. #[non_exhaustive] #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum ChanProvenance { /// This channel was newly launched, or was in progress and finished while /// we were waiting. NewlyCreated, /// This channel already existed when we asked for it. Preexisting, } impl<R: Runtime> ChanMgr<R> { impl<R: Runtime> ChanMgr<R> { /// Construct a new channel manager. /// Construct a new channel manager. /// /// Loading Loading @@ -118,16 +129,19 @@ impl<R: Runtime> ChanMgr<R> { /// If there is already a channel launch attempt in progress, this /// If there is already a channel launch attempt in progress, this /// function will wait until that launch is complete, and succeed /// function will wait until that launch is complete, and succeed /// or fail depending on its outcome. /// or fail depending on its outcome. pub async fn get_or_launch<T: ChanTarget + ?Sized>(&self, target: &T) -> Result<Channel> { pub async fn get_or_launch<T: ChanTarget + ?Sized>( &self, target: &T, ) -> Result<(Channel, ChanProvenance)> { let ed_identity = target.ed_identity(); let ed_identity = target.ed_identity(); let targetinfo = OwnedChanTarget::from_chan_target(target); let targetinfo = OwnedChanTarget::from_chan_target(target); let chan = self.mgr.get_or_launch(*ed_identity, targetinfo).await?; let (chan, provenance) = self.mgr.get_or_launch(*ed_identity, targetinfo).await?; // Double-check the match to make sure that the RSA identity is // Double-check the match to make sure that the RSA identity is // what we wanted too. // what we wanted too. chan.check_match(target) chan.check_match(target) .map_err(Error::from_proto_no_skew)?; .map_err(Error::from_proto_no_skew)?; Ok(chan) Ok((chan, provenance)) } } /// Return a stream of [`ConnStatus`] events to tell us about changes /// Return a stream of [`ConnStatus`] events to tell us about changes Loading
crates/tor-chanmgr/src/mgr.rs +14 −11 Original line number Original line Diff line number Diff line //! Abstract implementation of a channel manager //! Abstract implementation of a channel manager use crate::mgr::map::OpenEntry; use crate::mgr::map::OpenEntry; use crate::{Error, Result}; use crate::{ChanProvenance, Error, Result}; use async_trait::async_trait; use async_trait::async_trait; use futures::channel::oneshot; use futures::channel::oneshot; Loading Loading @@ -110,7 +110,7 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> { &self, &self, ident: <<CF as ChannelFactory>::Channel as AbstractChannel>::Ident, ident: <<CF as ChannelFactory>::Channel as AbstractChannel>::Ident, target: CF::BuildSpec, target: CF::BuildSpec, ) -> Result<CF::Channel> { ) -> Result<(CF::Channel, ChanProvenance)> { use map::ChannelState::*; use map::ChannelState::*; /// Possible actions that we'll decide to take based on the /// Possible actions that we'll decide to take based on the Loading @@ -123,7 +123,7 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> { /// We're going to wait for it to finish. /// We're going to wait for it to finish. Wait(Pending<C>), Wait(Pending<C>), /// We found a usable channel. We're going to return it. /// We found a usable channel. We're going to return it. Return(Result<C>), Return(Result<(C, ChanProvenance)>), } } /// How many times do we try? /// How many times do we try? const N_ATTEMPTS: usize = 2; const N_ATTEMPTS: usize = 2; Loading @@ -140,7 +140,10 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> { Some(Open(ref ent)) => { Some(Open(ref ent)) => { if ent.channel.is_usable() { if ent.channel.is_usable() { // Good channel. Return it. // Good channel. Return it. let action = Action::Return(Ok(ent.channel.clone())); let action = Action::Return(Ok(( ent.channel.clone(), ChanProvenance::Preexisting, ))); (oldstate, action) (oldstate, action) } else { } else { // Unusable channel. Move to the Building // Unusable channel. Move to the Building Loading Loading @@ -181,7 +184,7 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> { } } // There's an in-progress channel. Wait for it. // There's an in-progress channel. Wait for it. Action::Wait(pend) => match pend.await { Action::Wait(pend) => match pend.await { Ok(Ok(chan)) => return Ok(chan), Ok(Ok(chan)) => return Ok((chan, ChanProvenance::NewlyCreated)), Ok(Err(e)) => { Ok(Err(e)) => { last_err = Some(e); last_err = Some(e); } } Loading @@ -207,7 +210,7 @@ impl<CF: ChannelFactory> AbstractChanMgr<CF> { // It's okay if all the receivers went away: // It's okay if all the receivers went away: // that means that nobody was waiting for this channel. // that means that nobody was waiting for this channel. let _ignore_err = send.send(Ok(chan.clone())); let _ignore_err = send.send(Ok(chan.clone())); return Ok(chan); return Ok((chan, ChanProvenance::NewlyCreated)); } } Err(e) => { Err(e) => { // The channel failed. Make it non-pending, tell the // The channel failed. Make it non-pending, tell the Loading Loading @@ -340,8 +343,8 @@ mod test { let cf = FakeChannelFactory::new(runtime); let cf = FakeChannelFactory::new(runtime); let mgr = AbstractChanMgr::new(cf); let mgr = AbstractChanMgr::new(cf); let target = (413, '!'); let target = (413, '!'); let chan1 = mgr.get_or_launch(413, target).await.unwrap(); let chan1 = mgr.get_or_launch(413, target).await.unwrap().0; let chan2 = mgr.get_or_launch(413, target).await.unwrap(); let chan2 = mgr.get_or_launch(413, target).await.unwrap().0; assert_eq!(chan1, chan2); assert_eq!(chan1, chan2); Loading Loading @@ -411,14 +414,14 @@ mod test { mgr.get_or_launch(5, (5, 'a')), mgr.get_or_launch(5, (5, 'a')), ); ); let ch3 = ch3.unwrap(); let ch3 = ch3.unwrap().0; let _ch4 = ch4.unwrap(); let _ch4 = ch4.unwrap(); let ch5 = ch5.unwrap(); let ch5 = ch5.unwrap().0; ch3.start_closing(); ch3.start_closing(); ch5.start_closing(); ch5.start_closing(); let ch3_new = mgr.get_or_launch(3, (3, 'b')).await.unwrap(); let ch3_new = mgr.get_or_launch(3, (3, 'b')).await.unwrap().0; assert_ne!(ch3, ch3_new); assert_ne!(ch3, ch3_new); assert_eq!(ch3_new.mood, 'b'); assert_eq!(ch3_new.mood, 'b'); Loading
crates/tor-circmgr/src/build.rs +8 −7 Original line number Original line Diff line number Diff line Loading @@ -73,7 +73,8 @@ async fn create_common<RT: Runtime, CT: ChanTarget>( rt: &RT, rt: &RT, target: &CT, target: &CT, ) -> Result<PendingClientCirc> { ) -> Result<PendingClientCirc> { let chan = chanmgr let (chan, _provenance) = chanmgr .get_or_launch(target) .get_or_launch(target) .await .await .map_err(|cause| Error::Channel { .map_err(|cause| Error::Channel { Loading
doc/semver_status.md +1 −0 Original line number Original line Diff line number Diff line Loading @@ -25,6 +25,7 @@ MODIFIED: Added `reset()` method to RetrySchedule. ### tor-chanmgr ### tor-chanmgr BREAKING: Added members to `Error::Proto` BREAKING: Added members to `Error::Proto` BREAKING: Added `ChanProvenance` to `ChanMgr::get_or_launch`. ### tor-circmgr ### tor-circmgr Loading