Commit 7a52e77e authored by eta's avatar eta
Browse files

tor-dirmgr/state.rs: add new NetDirChange API, consume it

- The new DirState::get_netdir_change() API lets the state machine
  export a NetDirChange: a request to either replace the current netdir,
  or add microdescs to it.
- bootstrap.rs now consumes this new API, even though nothing implements
  it yet.
- This will let us implement GetMicrodescsState without having to
  directly mutate the netdir. The calling code also handles checking the
  netdir against the circmgr for sufficiency, and updating the consensus
  metadata in the store, meaning the revised GetMicrodescsState will not
  have to perform these tasks.
parent cad815e3
Loading
Loading
Loading
Loading
+72 −3
Original line number Diff line number Diff line
//! Functions to download or load directory objects, using the
//! state machines in the `states` module.

use std::ops::Deref;
use std::ops::{Deref, DerefMut};
use std::{
    collections::HashMap,
    sync::{Arc, Weak},
    time::{Duration, SystemTime},
};

use crate::state::DirState;
use crate::state::{DirState, NetDirChange, WriteNetDir};
use crate::{
    docid::{self, ClientRequest},
    upgrade_weak_ref, DirMgr, DocId, DocQuery, DocumentText, Error, Readiness, Result,
@@ -28,7 +28,7 @@ use once_cell::sync::Lazy;
#[cfg(test)]
use std::sync::Mutex;
use tor_circmgr::{CircMgr, DirInfo};
use tor_netdir::NetDir;
use tor_netdir::{MdReceiver, NetDir};
use tor_netdoc::doc::netstatus::ConsensusFlavor;

/// If there were errors from a peer in `outcome`, record those errors by
@@ -328,6 +328,60 @@ pub(crate) async fn load<R: Runtime>(
    Ok(state)
}

/// If `state` has netdir changes to apply, apply them to the netdir in `dirmgr`.
///
/// Return `true` if `state` just replaced the netdir.
fn apply_netdir_changes<R: Runtime>(
    dirmgr: &Arc<DirMgr<R>>,
    state: &mut Box<dyn DirState>,
    store: &mut dyn Store,
) -> Result<bool> {
    if let Some(change) = state.get_netdir_change() {
        match change {
            NetDirChange::AttemptReplace {
                netdir,
                consensus_meta,
            } => {
                // Check the new netdir is sufficient, if we have a circmgr.
                // (Unwraps are fine because the `Option` is `Some` until we take it.)
                if let Some(ref cm) = dirmgr.circmgr {
                    if !cm.netdir_is_sufficient(netdir.as_ref().expect("AttemptReplace had None")) {
                        return Ok(false);
                    }
                }
                let cfg = dirmgr.config.get();
                dirmgr.netdir.mutate(|dirmgr_netdir| {
                    // Seems like this netdir is usable, so let's use it.
                    *dirmgr_netdir = netdir.take().expect("AttemptReplace had None");
                    dirmgr_netdir.replace_overridden_parameters(&cfg.override_net_params);
                    Ok(())
                })?;
                dirmgr.netdir_consensus_changed();
                dirmgr.netdir_descriptors_changed();

                info!("Marked consensus usable.");
                store.mark_consensus_usable(consensus_meta)?;
                // Now that a consensus is usable, older consensuses may
                // need to expire.
                store.expire_all(&crate::storage::EXPIRATION_DEFAULTS)?;
                Ok(true)
            }
            NetDirChange::AddMicrodescs(mds) => {
                dirmgr.netdir.mutate(|netdir| {
                    for md in mds {
                        netdir.add_microdesc(md);
                    }
                    Ok(())
                })?;
                dirmgr.netdir_descriptors_changed();
                Ok(false)
            }
        }
    } else {
        Ok(false)
    }
}

/// Helper: Make a set of download attempts for the current directory state,
/// and on success feed their results into the state object.
///
@@ -429,6 +483,13 @@ pub(crate) async fn download<R: Runtime>(
            state = state.advance()?;
            continue 'next_state;
        }
        // Apply any netdir changes that the state gives us.
        // FIXME(eta): Don't throw away the return value (once we actually use this API).
        {
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
            let mut store = dirmgr.store.lock().expect("store lock poisoned");
            apply_netdir_changes(&dirmgr, &mut state, store.deref_mut())?;
        }
        if state.is_ready(Readiness::Complete) {
            return Ok((state, None));
        }
@@ -488,6 +549,14 @@ pub(crate) async fn download<R: Runtime>(
                dirmgr.runtime.wallclock()
            };

            // Apply any netdir changes that the state gives us.
            // FIXME(eta): Don't throw away the return value (once we actually use this API).
            {
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
                let mut store = dirmgr.store.lock().expect("store lock poisoned");
                apply_netdir_changes(&dirmgr, &mut state, store.deref_mut())?;
            }

            // Exit if there is nothing more to download.
            if state.is_ready(Readiness::Complete) {
                return Ok((state, None));
+25 −0
Original line number Diff line number Diff line
@@ -48,6 +48,26 @@ use tor_netdoc::{
};
use tor_rtcompat::Runtime;

/// A change to the currently running `NetDir`, returned by the state machines in this module.
#[allow(dead_code)] // FIXME
pub(crate) enum NetDirChange<'a> {
    /// If the provided `NetDir` is suitable for use (i.e. the caller determines it can build
    /// circuits with it), replace the current `NetDir` with it.
    ///
    /// The caller must call `DirState::on_netdir_replaced` if the replace was successful.
    AttemptReplace {
        /// The netdir to replace the current one with, if it's usable.
        ///
        /// The `Option` is always `Some` when returned from the state machine; it's there
        /// so that the caller can call `.take()` to avoid cloning the netdir.
        netdir: &'a mut Option<NetDir>,
        /// The consensus metadata for this netdir.
        consensus_meta: &'a ConsensusMeta,
    },
    /// Add the provided microdescriptors to the current `NetDir`.
    AddMicrodescs(Vec<Microdesc>),
}

/// A "state" object used to represent our progress in downloading a
/// directory.
///
@@ -76,6 +96,11 @@ pub(crate) trait DirState: Send {
    fn missing_docs(&self) -> Vec<DocId>;
    /// Describe whether this state has reached `ready` status.
    fn is_ready(&self, ready: Readiness) -> bool;
    /// If the state object wants to make changes to the currently running `NetDir`,
    /// return the proposed changes.
    fn get_netdir_change(&mut self) -> Option<NetDirChange<'_>> {
        None
    }
    /// Return true if this state can advance to another state via its
    /// `advance` method.
    fn can_advance(&self) -> bool;