Skip to content
Snippets Groups Projects
Commit 53ed5f40 authored by Nick Mathewson's avatar Nick Mathewson :game_die:
Browse files

DirMgr: Unify error return paths

We no longer have separate return paths for recoverable and fatal
errors; instead, they are merged, and distinguished based on
recovery actions.

Since it is now possible for download() to give an error that should
_not_ destroy the previous state, it takes `&mut Box<dyn DirState>`.
This change unfortunately means that we can no longer call `state =
state.advance()`, but instead have to do some mem::swap junk with
poisoned values.  Any better solution would be a good thing.

Additionally, the reset() and advance() methods can no longer fail.

There is still a separate return path for reset-triggering errors;
I'm about to fix that.
parent d7a3fd2c
No related branches found
No related tags found
1 merge request!511DirMgr: Revise error handling to better tolerate reset-able failures
......@@ -8,7 +8,8 @@ use std::{
time::{Duration, SystemTime},
};
use crate::state::DirState;
use crate::err::BootstrapAction;
use crate::state::{DirState, PoisonedState};
use crate::DirMgrConfig;
use crate::DocSource;
use crate::{
......@@ -19,7 +20,6 @@ use crate::{
use futures::channel::oneshot;
use futures::FutureExt;
use futures::StreamExt;
use tor_checkable::TimeValidityError;
use tor_dirclient::DirResponse;
use tor_rtcompat::{Runtime, SleepProviderExt};
use tracing::{debug, info, trace, warn};
......@@ -33,6 +33,20 @@ use tor_circmgr::{CircMgr, DirInfo};
use tor_netdir::NetDir;
use tor_netdoc::doc::netstatus::ConsensusFlavor;
/// Given a Result<()>, exit the current function if it is anything other than
/// Ok(), or a nonfatal error.
macro_rules! propagate_fatal_errors {
( $e:expr ) => {
let v: Result<()> = $e;
if let Err(e) = v {
match e.bootstrap_action() {
BootstrapAction::Nonfatal => {}
_ => return Err(e),
}
}
};
}
/// If there were errors from a peer in `outcome`, record those errors by
/// marking the circuit (if any) as needing retirement, and noting the peer
/// (if any) as having failed.
......@@ -294,20 +308,15 @@ async fn fetch_multiple<R: Runtime>(
Ok(useful_responses)
}
/// Try tp update `state` by loading cached information from `dirmgr`.
/// Return true if anything changed.
///
/// Return an `Err` only on a fatal error that means we should stop
/// downloading entirely; return recovered-from errors inside the `Ok()`
/// variant.
/// Try to update `state` by loading cached information from `dirmgr`.
async fn load_once<R: Runtime>(
dirmgr: &Arc<DirMgr<R>>,
state: &mut Box<dyn DirState>,
) -> Result<Option<Error>> {
) -> Result<()> {
let missing = state.missing_docs();
let outcome: Result<Option<Error>> = if missing.is_empty() {
let outcome: Result<()> = if missing.is_empty() {
trace!("Found no missing documents; can't advance current state");
Ok(Some(Error::NoChange(DocSource::LocalCache)))
Err(Error::NoChange(DocSource::LocalCache))
} else {
trace!(
"Found {} missing documents; trying to load them",
......@@ -319,19 +328,10 @@ async fn load_once<R: Runtime>(
load_documents_from_store(&missing, store.deref())?
};
match state.add_from_cache(documents) {
Err(Error::UntimelyObject(TimeValidityError::Expired(_))) => {
// This is just an expired object from the cache; we don't need
// to call that an error. Treat it as if it were absent.
Ok(None)
}
other => other,
}
state.add_from_cache(documents)
};
if outcome.is_ok() {
dirmgr.update_status(state.bootstrap_status());
}
dirmgr.update_status(state.bootstrap_status());
outcome
}
......@@ -347,25 +347,38 @@ pub(crate) async fn load<R: Runtime>(
let mut safety_counter = 0_usize;
loop {
trace!(state=%state.describe(), "Loading from cache");
let nonfatal_err = load_once(&dirmgr, &mut state).await?;
let outcome = load_once(&dirmgr, &mut state).await;
{
let mut store = dirmgr.store.lock().expect("store lock poisoned");
dirmgr.apply_netdir_changes(&mut state, store.deref_mut())?;
}
if let Some(e) = &nonfatal_err {
debug!("Recoverable loading from cache: {}", e);
let mut no_change = false;
if let Err(e) = outcome {
if matches!(e, Error::NoChange(_)) {
no_change = true;
}
match e.bootstrap_action() {
BootstrapAction::Nonfatal => {
debug!("Recoverable error loading from cache: {}", e);
}
BootstrapAction::Fatal | BootstrapAction::Reset | BootstrapAction::Impossible => {
return Err(e);
}
}
}
if let Some(e) = state.blocking_error() {
return Err(e);
}
if state.can_advance() {
state = state.advance()?;
state = state.advance();
safety_counter = 0;
} else {
if matches!(nonfatal_err, Some(Error::NoChange(_))) {
// XXXX refactor more.
if no_change {
// TODO: Are there more nonfatal errors that mean we should
// break?
break;
}
safety_counter += 1;
......@@ -409,24 +422,28 @@ async fn download_attempt<R: Runtime>(
let doc_source = DocSource::DirServer {
source: source.clone(),
};
let opt_error =
state.add_from_download(&text, &client_req, doc_source, Some(&dirmgr.store))?;
if let Some(e) = &opt_error {
warn!("error while adding directory info: {}", e);
}
let outcome =
state.add_from_download(&text, &client_req, doc_source, Some(&dirmgr.store));
if let Some(source) = source {
if let Some(e) = opt_error {
note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
if let Err(e) = &outcome {
note_cache_error(dirmgr.circmgr()?.deref(), &source, e);
} else {
note_cache_success(dirmgr.circmgr()?.deref(), &source);
}
}
if let Err(e) = &outcome {
warn!("error while adding directory info: {}", e);
}
propagate_fatal_errors!(outcome);
}
Err(e) => {
warn!("Error when expanding directory text: {}", e);
if let Some(source) = source {
note_cache_error(dirmgr.circmgr()?.deref(), &source, &e);
}
propagate_fatal_errors!(Err(e));
}
}
}
......@@ -445,15 +462,11 @@ async fn download_attempt<R: Runtime>(
///
/// The first time that the state becomes ["usable"](Readiness::Usable), notify
/// the sender in `on_usable`.
///
/// Return Err only on a non-recoverable error. On an error that merits another
/// bootstrap attempt with the same state (after a reset), return the state and
/// an Error object in an option.
pub(crate) async fn download<R: Runtime>(
dirmgr: Weak<DirMgr<R>>,
mut state: Box<dyn DirState>,
state: &mut Box<dyn DirState>,
on_usable: &mut Option<oneshot::Sender<()>>,
) -> Result<(Box<dyn DirState>, Option<Error>)> {
) -> Result<()> {
let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();
'next_state: loop {
......@@ -465,24 +478,26 @@ pub(crate) async fn download<R: Runtime>(
// state must never grow, then we'll need to move it inside.
let mut now = {
let dirmgr = upgrade_weak_ref(&dirmgr)?;
load_once(&dirmgr, &mut state).await?;
let load_result = load_once(&dirmgr, state).await;
propagate_fatal_errors!(load_result);
dirmgr.runtime.wallclock()
};
// Have we gotten ourselves into a state that can't advance? If so,
// let the caller reset.
if let Some(e) = state.blocking_error() {
// XXXXX: remove.
// Report an error, if there's a source to report it about.
if let Some(source) = e.responsible_cache() {
let dirmgr = upgrade_weak_ref(&dirmgr)?;
note_cache_error(dirmgr.circmgr()?.deref(), source, &e);
}
return Ok((state, Some(e)));
return Err(e);
}
// Skip the downloads if we can...
if state.can_advance() {
state = state.advance()?;
advance(state);
continue 'next_state;
}
// Apply any netdir changes that the state gives us.
......@@ -490,10 +505,10 @@ pub(crate) async fn download<R: Runtime>(
{
let dirmgr = upgrade_weak_ref(&dirmgr)?;
let mut store = dirmgr.store.lock().expect("store lock poisoned");
dirmgr.apply_netdir_changes(&mut state, store.deref_mut())?;
dirmgr.apply_netdir_changes(state, store.deref_mut())?;
}
if state.is_ready(Readiness::Complete) {
return Ok((state, None));
return Ok(());
}
let reset_time = no_more_than_a_week_from(runtime.wallclock(), state.reset_time());
......@@ -515,7 +530,7 @@ pub(crate) async fn download<R: Runtime>(
futures::select_biased! {
_ = reset_timeout_future => {
info!("Download attempt timed out completely; resetting download state.");
state = state.reset()?;
reset(state);
continue 'next_state;
}
_ = FutureExt::fuse(runtime.sleep(delay)) => {}
......@@ -528,10 +543,11 @@ pub(crate) async fn download<R: Runtime>(
now = {
let dirmgr = upgrade_weak_ref(&dirmgr)?;
futures::select_biased! {
outcome = download_attempt(&dirmgr, &mut state, parallelism.into()).fuse() => {
outcome = download_attempt(&dirmgr, state, parallelism.into()).fuse() => {
if let Err(e) = outcome {
warn!("Error while downloading: {}", e);
continue 'next_attempt;
warn!("Error while downloading: {}", e);
propagate_fatal_errors!(Err(e));
continue 'next_attempt;
}
}
_ = runtime.sleep_until_wallclock(reset_time).fuse() => {
......@@ -539,7 +555,7 @@ pub(crate) async fn download<R: Runtime>(
// example) we're downloading the last few
// microdescriptors on a consensus that now
// we're ready to replace.
state = state.reset()?;
reset(state);
continue 'next_state;
},
};
......@@ -551,12 +567,13 @@ pub(crate) async fn download<R: Runtime>(
{
let dirmgr = upgrade_weak_ref(&dirmgr)?;
let mut store = dirmgr.store.lock().expect("store lock poisoned");
dirmgr.apply_netdir_changes(&mut state, store.deref_mut())?;
let outcome = dirmgr.apply_netdir_changes(state, store.deref_mut());
propagate_fatal_errors!(outcome);
}
// Exit if there is nothing more to download.
if state.is_ready(Readiness::Complete) {
return Ok((state, None));
return Ok(());
}
// Report usable-ness if appropriate.
......@@ -574,12 +591,12 @@ pub(crate) async fn download<R: Runtime>(
if let Some(source) = e.responsible_cache() {
note_cache_error(dirmgr.circmgr()?.deref(), source, &e);
}
return Ok((state, Some(e)));
return Err(e);
}
if state.can_advance() {
// We have enough info to advance to another state.
state = state.advance()?;
advance(state);
continue 'next_state;
}
}
......@@ -588,10 +605,26 @@ pub(crate) async fn download<R: Runtime>(
warn!(n_attempts=retry_config.n_attempts(),
state=%state.describe(),
"Unable to advance downloading state");
return Ok((state, Some(Error::CantAdvanceState)));
return Err(Error::CantAdvanceState);
}
}
/// Replace `state` with `state.reset()`.
fn reset(state: &mut Box<dyn DirState>) {
let mut this_state: Box<dyn DirState> = Box::new(PoisonedState);
std::mem::swap(&mut this_state, state);
this_state = this_state.reset();
std::mem::swap(&mut this_state, state);
}
/// Replace `state` with `state.advance()`.
fn advance(state: &mut Box<dyn DirState>) {
let mut this_state: Box<dyn DirState> = Box::new(PoisonedState);
std::mem::swap(&mut this_state, state);
this_state = this_state.advance();
std::mem::swap(&mut this_state, state);
}
/// Helper: Clamp `v` so that it is no more than one week from `now`.
///
/// If `v` is absent, return the time that's one week from now.
......@@ -706,7 +739,7 @@ mod test {
})
.collect()
}
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<Option<Error>> {
fn add_from_cache(&mut self, docs: HashMap<DocId, DocumentText>) -> Result<()> {
let mut changed = false;
for id in docs.keys() {
if let DocId::Microdesc(id) = id {
......@@ -717,9 +750,9 @@ mod test {
}
}
if changed {
Ok(None)
Ok(())
} else {
Ok(Some(Error::NoChange(DocSource::LocalCache)))
Err(Error::NoChange(DocSource::LocalCache))
}
}
fn add_from_download(
......@@ -728,7 +761,7 @@ mod test {
_request: &ClientRequest,
_source: DocSource,
_storage: Option<&Mutex<DynStore>>,
) -> Result<Option<Error>> {
) -> Result<()> {
let mut changed = false;
for token in text.split_ascii_whitespace() {
if let Ok(v) = hex::decode(token) {
......@@ -741,26 +774,26 @@ mod test {
}
}
if changed {
Ok(None)
Ok(())
} else {
Ok(Some(Error::NoChange(DocSource::LocalCache)))
Err(Error::NoChange(DocSource::LocalCache))
}
}
fn dl_config(&self) -> DownloadSchedule {
DownloadSchedule::default()
}
fn advance(self: Box<Self>) -> Result<Box<dyn DirState>> {
fn advance(self: Box<Self>) -> Box<dyn DirState> {
if self.can_advance() {
Ok(Box::new(Self::new2()))
Box::new(Self::new2())
} else {
Ok(self)
self
}
}
fn reset_time(&self) -> Option<SystemTime> {
None
}
fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
Ok(Box::new(Self::new1()))
fn reset(self: Box<Self>) -> Box<dyn DirState> {
Box::new(Self::new1())
}
}
......@@ -785,13 +818,13 @@ mod test {
assert!(result.is_ready(Readiness::Complete));
// Try a bootstrap that could (but won't!) download.
let state = Box::new(DemoState::new1());
let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
let mut on_usable = None;
let result = super::download(Arc::downgrade(&mgr), state, &mut on_usable)
super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable)
.await
.unwrap();
assert!(result.0.is_ready(Readiness::Complete));
assert!(state.is_ready(Readiness::Complete));
});
}
......@@ -821,11 +854,11 @@ mod test {
let mgr = Arc::new(mgr);
let mut on_usable = None;
let state = Box::new(DemoState::new1());
let result = super::download(Arc::downgrade(&mgr), state, &mut on_usable)
let mut state: Box<dyn DirState> = Box::new(DemoState::new1());
super::download(Arc::downgrade(&mgr), &mut state, &mut on_usable)
.await
.unwrap();
assert!(result.0.is_ready(Readiness::Complete));
assert!(state.is_ready(Readiness::Complete));
});
}
}
......@@ -70,6 +70,7 @@ mod storage;
pub mod filter;
use crate::docid::{CacheUsage, ClientRequest, DocQuery};
use crate::err::BootstrapAction;
#[cfg(not(feature = "experimental-api"))]
use crate::shared_ref::SharedMutArc;
#[cfg(feature = "experimental-api")]
......@@ -79,6 +80,7 @@ use postage::watch;
pub use retry::{DownloadSchedule, DownloadScheduleBuilder};
use tor_circmgr::CircMgr;
use tor_dirclient::SourceInfo;
use tor_error::into_internal;
use tor_netdir::{DirEvent, MdReceiver, NetDir, NetDirProvider};
use async_trait::async_trait;
......@@ -561,24 +563,34 @@ impl<R: Runtime> DirMgr<R> {
let mut retry_delay = retry_config.schedule();
'retry_attempt: for _ in retry_config.attempts() {
let (newstate, recoverable_err) =
bootstrap::download(Weak::clone(&weak), state, &mut on_complete).await?;
state = newstate;
let outcome =
bootstrap::download(Weak::clone(&weak), &mut state, &mut on_complete).await;
if let Some(err) = recoverable_err {
if let Err(err) = outcome {
if state.is_ready(Readiness::Usable) {
usable = true;
info!("Unable to completely download a directory: {}. Nevertheless, the directory is usable, so we'll pause for now.", err);
break 'retry_attempt;
}
match err.bootstrap_action() {
BootstrapAction::Nonfatal => {
return Err(into_internal!(
"Nonfatal error should not have propagated here"
)(err)
.into());
}
BootstrapAction::Reset => {}
BootstrapAction::Fatal | BootstrapAction::Impossible => return Err(err),
}
let delay = retry_delay.next_delay(&mut rand::thread_rng());
warn!(
"Unable to download a usable directory: {}. We will restart in {:?}.",
err, delay
);
runtime.sleep(delay).await;
state = state.reset()?;
state = state.reset();
} else {
info!("Directory is complete.");
usable = true;
......@@ -605,7 +617,7 @@ impl<R: Runtime> DirMgr<R> {
Some(t) => runtime.sleep_until_wallclock(t).await,
None => return Ok(()),
}
state = state.reset()?;
state = state.reset();
}
}
......@@ -864,13 +876,11 @@ impl<R: Runtime> DirMgr<R> {
}
/// If `state` has netdir changes to apply, apply them to our netdir.
///
/// Return `true` if `state` just replaced the netdir.
fn apply_netdir_changes(
self: &Arc<Self>,
state: &mut Box<dyn DirState>,
store: &mut dyn Store,
) -> Result<bool> {
) -> Result<()> {
if let Some(change) = state.get_netdir_change() {
match change {
NetDirChange::AttemptReplace {
......@@ -884,7 +894,7 @@ impl<R: Runtime> DirMgr<R> {
.netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None"))
{
debug!("Got a new NetDir, but it doesn't have enough guards yet.");
return Ok(false);
return Ok(());
}
}
let is_stale = {
......@@ -917,7 +927,7 @@ impl<R: Runtime> DirMgr<R> {
// Now that a consensus is usable, older consensuses may
// need to expire.
store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
Ok(true)
Ok(())
}
NetDirChange::AddMicrodescs(mds) => {
self.netdir.mutate(|netdir| {
......@@ -927,11 +937,11 @@ impl<R: Runtime> DirMgr<R> {
Ok(())
})?;
self.events.publish(DirEvent::NewDescriptors);
Ok(false)
Ok(())
}
}
} else {
Ok(false)
Ok(())
}
}
}
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment