Loading crates/tor-circmgr/src/lib.rs +9 −1 Original line number Diff line number Diff line Loading @@ -210,9 +210,17 @@ impl<R: Runtime> CircMgr<R> { Ok(()) } /// Switch from having an unowned persistent state to having an owned one. /// /// Requires that we hold the lock on the state files. pub fn upgrade_to_owned_persistent_state(&self) -> Result<()> { warn!("upgrade_to_owned_persistent_state isn't implemented."); Ok(()) } /// Flush state to the state manager, if there is any unsaved state and /// we have the lock. pub fn update_persistent_state(&self) -> Result<()> { pub fn store_persistent_state(&self) -> Result<()> { self.mgr.peek_builder().save_state()?; self.mgr .peek_builder() Loading crates/tor-client/src/client.rs +29 −24 Original line number Diff line number Diff line Loading @@ -151,7 +151,7 @@ impl<R: Runtime> TorClient<R> { client_cfg: ClientConfig, ) -> Result<TorClient<R>> { let statemgr = FsStateMgr::from_path(state_cfg)?; if statemgr.try_lock()? { if statemgr.try_lock()?.held() { debug!("It appears we have the lock on our state files."); } else { info!( Loading Loading @@ -365,38 +365,41 @@ async fn update_persistent_state<R: Runtime>( circmgr: Weak<tor_circmgr::CircMgr<R>>, statemgr: FsStateMgr, ) { #![allow(clippy::collapsible_else_if)] // TODO: Consider moving this into tor-circmgr after we have more // TODO: Consider moving this function into tor-circmgr after we have more // experience with the state system. loop { let had_lock = statemgr.can_store(); let lock_acquired = match statemgr.try_lock() { Ok(v) => v, if let Some(circmgr) = Weak::upgrade(&circmgr) { use tor_persist::LockStatus::*; match statemgr.try_lock() { Err(e) => { error!("Unable to check lock status: {}", e); error!("Problem with state lock file: {}", e); break; } }; if !had_lock && lock_acquired { Ok(NewlyAcquired) => { info!("We now own the lock on our state files."); if let Err(e) = circmgr.upgrade_to_owned_persistent_state() { error!("Unable to upgrade to owneed state files: {}", e); break; } if let Some(circmgr) = Weak::upgrade(&circmgr) { if !had_lock { } Ok(AlreadyHeld) => { if let Err(e) = circmgr.store_persistent_state() { error!("Unable to flush circmgr state: {}", e); break; } } Ok(NoLock) => { if let Err(e) = circmgr.reload_persistent_state() { error!("Unable to reload circmgr state: {}", e); break; } } else { if let Err(e) = circmgr.update_persistent_state() { error!("Unable to flush circmgr state: {}", e); break; } } } else { debug!("Circmgr has disappeared; task exiting."); break; return; } // XXXX This delay is probably too small. // Loading @@ -406,6 +409,8 @@ async fn update_persistent_state<R: Runtime>( // changes. runtime.sleep(Duration::from_secs(60)).await; } error!("State update task is exiting prematurely."); } /// Run indefinitely, launching circuits as needed to get a good Loading Loading @@ -449,7 +454,7 @@ impl<R: Runtime> Drop for TorClient<R> { // TODO: Consider moving this into tor-circmgr after we have more // experience with the state system. fn drop(&mut self) { match self.circmgr.update_persistent_state() { match self.circmgr.store_persistent_state() { Ok(()) => info!("Flushed persistent state at exit."), Err(tor_circmgr::Error::State(tor_persist::Error::NoLock)) => { debug!("Lock not held; no state to flush.") Loading crates/tor-client/src/err.rs +2 −2 Original line number Diff line number Diff line Loading @@ -19,8 +19,8 @@ pub enum Error { #[error("Protocol error while launching a stream: {0}")] Proto(#[from] tor_proto::Error), /// A protocol error while launching a stream #[error("Persist error while launching a stream: {0}")] /// An error while interfacing with the persistent data layer. #[error("Error from state manager: {0}")] Persist(#[from] tor_persist::Error), /// The directory cache took too long to reply to us. Loading crates/tor-guardmgr/src/lib.rs +1 −1 Original line number Diff line number Diff line Loading @@ -903,7 +903,7 @@ mod test { use tor_netdir::{testnet, MdReceiver, PartialNetDir}; let statemgr = TestingStateMgr::new(); let have_lock = statemgr.try_lock().unwrap(); assert!(have_lock); assert!(have_lock.held()); let guardmgr = GuardMgr::new(rt, statemgr.clone()).unwrap(); let (con, mds) = testnet::construct_network().unwrap(); let override_p = "guard-min-filtered-sample-size=5 guard-n-primary-guards=2" Loading crates/tor-persist/src/fs.rs +8 −6 Original line number Diff line number Diff line //! Filesystem + JSON implementation of StateMgr. use crate::{Error, Result, StateMgr}; use crate::{Error, LockStatus, Result, StateMgr}; use serde::{de::DeserializeOwned, Serialize}; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; Loading Loading @@ -93,16 +93,18 @@ impl StateMgr for FsStateMgr { .expect("Poisoned lock on state lockfile"); lockfile.owns_lock() } fn try_lock(&self) -> Result<bool> { fn try_lock(&self) -> Result<LockStatus> { let mut lockfile = self .inner .lockfile .lock() .expect("Poisoned lock on state lockfile"); if lockfile.owns_lock() { Ok(true) Ok(LockStatus::AlreadyHeld) } else if lockfile.try_lock()? { Ok(LockStatus::NewlyAcquired) } else { Ok(lockfile.try_lock()?) Ok(LockStatus::NoLock) } } fn load<D>(&self, key: &str) -> Result<Option<D>> Loading Loading @@ -156,7 +158,7 @@ mod test { let dir = tempfile::TempDir::new().unwrap(); let store = FsStateMgr::from_path(dir.path())?; assert!(store.try_lock()?); assert_eq!(store.try_lock()?, LockStatus::NewlyAcquired); let stuff: HashMap<_, _> = vec![("hello".to_string(), "world".to_string())] .into_iter() .collect(); Loading @@ -179,7 +181,7 @@ mod test { assert!(matches!(store.store("xyz", &stuff4), Err(Error::NoLock))); assert!(store.try_lock()?); assert_eq!(store.try_lock()?, LockStatus::NewlyAcquired); store.store("xyz", &stuff4)?; let stuff5: Option<HashMap<String, String>> = store.load("xyz")?; Loading Loading
crates/tor-circmgr/src/lib.rs +9 −1 Original line number Diff line number Diff line Loading @@ -210,9 +210,17 @@ impl<R: Runtime> CircMgr<R> { Ok(()) } /// Switch from having an unowned persistent state to having an owned one. /// /// Requires that we hold the lock on the state files. pub fn upgrade_to_owned_persistent_state(&self) -> Result<()> { warn!("upgrade_to_owned_persistent_state isn't implemented."); Ok(()) } /// Flush state to the state manager, if there is any unsaved state and /// we have the lock. pub fn update_persistent_state(&self) -> Result<()> { pub fn store_persistent_state(&self) -> Result<()> { self.mgr.peek_builder().save_state()?; self.mgr .peek_builder() Loading
crates/tor-client/src/client.rs +29 −24 Original line number Diff line number Diff line Loading @@ -151,7 +151,7 @@ impl<R: Runtime> TorClient<R> { client_cfg: ClientConfig, ) -> Result<TorClient<R>> { let statemgr = FsStateMgr::from_path(state_cfg)?; if statemgr.try_lock()? { if statemgr.try_lock()?.held() { debug!("It appears we have the lock on our state files."); } else { info!( Loading Loading @@ -365,38 +365,41 @@ async fn update_persistent_state<R: Runtime>( circmgr: Weak<tor_circmgr::CircMgr<R>>, statemgr: FsStateMgr, ) { #![allow(clippy::collapsible_else_if)] // TODO: Consider moving this into tor-circmgr after we have more // TODO: Consider moving this function into tor-circmgr after we have more // experience with the state system. loop { let had_lock = statemgr.can_store(); let lock_acquired = match statemgr.try_lock() { Ok(v) => v, if let Some(circmgr) = Weak::upgrade(&circmgr) { use tor_persist::LockStatus::*; match statemgr.try_lock() { Err(e) => { error!("Unable to check lock status: {}", e); error!("Problem with state lock file: {}", e); break; } }; if !had_lock && lock_acquired { Ok(NewlyAcquired) => { info!("We now own the lock on our state files."); if let Err(e) = circmgr.upgrade_to_owned_persistent_state() { error!("Unable to upgrade to owneed state files: {}", e); break; } if let Some(circmgr) = Weak::upgrade(&circmgr) { if !had_lock { } Ok(AlreadyHeld) => { if let Err(e) = circmgr.store_persistent_state() { error!("Unable to flush circmgr state: {}", e); break; } } Ok(NoLock) => { if let Err(e) = circmgr.reload_persistent_state() { error!("Unable to reload circmgr state: {}", e); break; } } else { if let Err(e) = circmgr.update_persistent_state() { error!("Unable to flush circmgr state: {}", e); break; } } } else { debug!("Circmgr has disappeared; task exiting."); break; return; } // XXXX This delay is probably too small. // Loading @@ -406,6 +409,8 @@ async fn update_persistent_state<R: Runtime>( // changes. runtime.sleep(Duration::from_secs(60)).await; } error!("State update task is exiting prematurely."); } /// Run indefinitely, launching circuits as needed to get a good Loading Loading @@ -449,7 +454,7 @@ impl<R: Runtime> Drop for TorClient<R> { // TODO: Consider moving this into tor-circmgr after we have more // experience with the state system. fn drop(&mut self) { match self.circmgr.update_persistent_state() { match self.circmgr.store_persistent_state() { Ok(()) => info!("Flushed persistent state at exit."), Err(tor_circmgr::Error::State(tor_persist::Error::NoLock)) => { debug!("Lock not held; no state to flush.") Loading
crates/tor-client/src/err.rs +2 −2 Original line number Diff line number Diff line Loading @@ -19,8 +19,8 @@ pub enum Error { #[error("Protocol error while launching a stream: {0}")] Proto(#[from] tor_proto::Error), /// A protocol error while launching a stream #[error("Persist error while launching a stream: {0}")] /// An error while interfacing with the persistent data layer. #[error("Error from state manager: {0}")] Persist(#[from] tor_persist::Error), /// The directory cache took too long to reply to us. Loading
crates/tor-guardmgr/src/lib.rs +1 −1 Original line number Diff line number Diff line Loading @@ -903,7 +903,7 @@ mod test { use tor_netdir::{testnet, MdReceiver, PartialNetDir}; let statemgr = TestingStateMgr::new(); let have_lock = statemgr.try_lock().unwrap(); assert!(have_lock); assert!(have_lock.held()); let guardmgr = GuardMgr::new(rt, statemgr.clone()).unwrap(); let (con, mds) = testnet::construct_network().unwrap(); let override_p = "guard-min-filtered-sample-size=5 guard-n-primary-guards=2" Loading
crates/tor-persist/src/fs.rs +8 −6 Original line number Diff line number Diff line //! Filesystem + JSON implementation of StateMgr. use crate::{Error, Result, StateMgr}; use crate::{Error, LockStatus, Result, StateMgr}; use serde::{de::DeserializeOwned, Serialize}; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; Loading Loading @@ -93,16 +93,18 @@ impl StateMgr for FsStateMgr { .expect("Poisoned lock on state lockfile"); lockfile.owns_lock() } fn try_lock(&self) -> Result<bool> { fn try_lock(&self) -> Result<LockStatus> { let mut lockfile = self .inner .lockfile .lock() .expect("Poisoned lock on state lockfile"); if lockfile.owns_lock() { Ok(true) Ok(LockStatus::AlreadyHeld) } else if lockfile.try_lock()? { Ok(LockStatus::NewlyAcquired) } else { Ok(lockfile.try_lock()?) Ok(LockStatus::NoLock) } } fn load<D>(&self, key: &str) -> Result<Option<D>> Loading Loading @@ -156,7 +158,7 @@ mod test { let dir = tempfile::TempDir::new().unwrap(); let store = FsStateMgr::from_path(dir.path())?; assert!(store.try_lock()?); assert_eq!(store.try_lock()?, LockStatus::NewlyAcquired); let stuff: HashMap<_, _> = vec![("hello".to_string(), "world".to_string())] .into_iter() .collect(); Loading @@ -179,7 +181,7 @@ mod test { assert!(matches!(store.store("xyz", &stuff4), Err(Error::NoLock))); assert!(store.try_lock()?); assert_eq!(store.try_lock()?, LockStatus::NewlyAcquired); store.store("xyz", &stuff4)?; let stuff5: Option<HashMap<String, String>> = store.load("xyz")?; Loading