Commit db2ca6eb authored by Nick Mathewson

Merge branch 'bootstrap_reporting'

parents ca358736 d1c362f3
Merge request !245: Expose and collect bootstrap status information from tor-chanmgr and tor-dirmgr
......@@ -965,6 +965,12 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce81f49ae8a0482e4c55ea62ebbd7e5a686af544c00b9d090bba3ff9be97b3d"
[[package]]
name = "float_eq"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1d53499e94f9a7828e63c574adf62bcade7f358c3738f9ea70d7c2edb61023d"
[[package]]
name = "fnv"
version = "1.0.7"
......@@ -2644,9 +2650,12 @@ name = "tor-chanmgr"
version = "0.0.3"
dependencies = [
"async-trait",
"derive_more",
"float_eq",
"futures",
"futures-await-test",
"hex-literal",
"postage",
"thiserror",
"tor-linkspec",
"tor-llcrypto",
......@@ -2755,6 +2764,7 @@ dependencies = [
"derive_builder",
"digest 0.10.1",
"event-listener",
"float_eq",
"fslock",
"futures",
"futures-await-test",
......
......@@ -7,7 +7,6 @@
use crate::address::IntoTorAddr;
use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig};
use futures::channel::oneshot;
use tor_circmgr::{DirInfo, IsolationToken, StreamIsolationBuilder, TargetPort};
use tor_config::MutCfg;
use tor_dirmgr::DirEvent;
......@@ -240,11 +239,6 @@ impl<R: Runtime> TorClient<R> {
let status_receiver = status::BootstrapEvents {
inner: status_receiver,
};
// TODO(nickm): we should use a real set of information sources here.
// This is just temporary.
let (tor_ready_sender, tor_ready_receiver) = oneshot::channel();
runtime.spawn(status::report_status(status_sender, tor_ready_receiver))?;
let chanmgr = Arc::new(tor_chanmgr::ChanMgr::new(runtime.clone()));
let circmgr =
tor_circmgr::CircMgr::new(circ_cfg, statemgr.clone(), &runtime, Arc::clone(&chanmgr))?;
......@@ -255,6 +249,17 @@ impl<R: Runtime> TorClient<R> {
)
.await?;
// TODO: This happens too late. We need to create dirmgr and get its
// event stream, and only THEN get its status.
let conn_status = chanmgr.bootstrap_events();
let dir_status = dirmgr.bootstrap_events();
runtime.spawn(status::report_status(
status_sender,
conn_status,
dir_status,
))?;
circmgr.update_network_parameters(dirmgr.netdir().params());
// Launch a daemon task to inform the circmgr about new
......@@ -285,9 +290,6 @@ impl<R: Runtime> TorClient<R> {
let client_isolation = IsolationToken::new();
// At this point, we're bootstrapped.
let _ = tor_ready_sender.send(());
Ok(TorClient {
runtime,
client_isolation,
......
//! Code to collect and publish information about a client's bootstrapping
//! status.
use std::{borrow::Cow, fmt};
use std::{borrow::Cow, fmt, time::SystemTime};
use derive_more::Display;
use futures::{channel::oneshot, future, Stream, StreamExt};
use futures::{Stream, StreamExt};
use tor_chanmgr::{ConnBlockage, ConnStatus, ConnStatusEvents};
use tor_dirmgr::{DirBootstrapEvents, DirBootstrapStatus};
use tracing::debug;
/// Information about how ready a [`crate::TorClient`] is to handle requests.
///
......@@ -20,9 +23,10 @@ use futures::{channel::oneshot, future, Stream, StreamExt};
// its data.
#[derive(Debug, Clone, Default)]
pub struct BootstrapStatus {
/// A placeholder field: we'll be replacing this as the branch gets support
/// for more information sources.
ready: bool,
/// Status for our connection to the Tor network.
conn_status: ConnStatus,
/// Status for our directory information.
dir_status: DirBootstrapStatus,
}
impl BootstrapStatus {
......@@ -31,11 +35,8 @@ impl BootstrapStatus {
///
/// 0 is defined as "just started"; 1 is defined as "ready to use."
pub fn as_frac(&self) -> f32 {
if self.ready {
1.0
} else {
0.0
}
// Coefficients chosen arbitrarily.
self.conn_status.frac() * 0.15 + self.dir_status.frac_at(SystemTime::now()) * 0.85
}
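// Illustrative arithmetic (editorial note, not part of the original change):
// with the connection side fully bootstrapped (frac() = 1.0) and the directory
// half fetched (frac_at() = 0.5), the combined value is
// 1.0 * 0.15 + 0.5 * 0.85 = 0.575.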
/// Return true if the status indicates that the client is ready for
......@@ -44,7 +45,8 @@ impl BootstrapStatus {
/// For the purposes of this function, the client is "ready for traffic" if,
/// as far as we know, we can start acting on a new client request immediately.
pub fn ready_for_traffic(&self) -> bool {
self.ready
let now = SystemTime::now();
self.conn_status.usable() && self.dir_status.usable_at(now)
}
/// If the client is unable to make forward progress for some reason, return
......@@ -66,8 +68,23 @@ impl BootstrapStatus {
/// can't make connections to the internet" rather than "You are
/// not on the internet."
pub fn blocked(&self) -> Option<Blockage> {
// TODO(nickm): implement this or remove it.
None
if let Some(b) = self.conn_status.blockage() {
let message = b.to_string().into();
let kind = b.into();
Some(Blockage { kind, message })
} else {
None
}
}
/// Adjust this status based on new connection-status information.
fn apply_conn_status(&mut self, status: ConnStatus) {
self.conn_status = status;
}
/// Adjust this status based on new directory-status information.
fn apply_dir_status(&mut self, status: DirBootstrapStatus) {
self.dir_status = status;
}
}
......@@ -87,15 +104,25 @@ pub struct Blockage {
#[derive(Clone, Debug, Display)]
#[non_exhaustive]
pub enum BlockageKind {
/// It looks like we can't make connections to the internet.
#[display(fmt = "Unable to connect to the internet")]
NoInternet,
/// It looks like we can't reach any Tor relays.
#[display(fmt = "Unable to reach Tor")]
/// There is some kind of problem with connecting to the network.
#[display(fmt = "We seem to be offline")]
Offline,
/// We can connect, but our connections seem to be filtered.
#[display(fmt = "Our internet connection seems filtered")]
Filtering,
/// We have some other kind of problem connecting to Tor.
#[display(fmt = "Can't reach the Tor network")]
CantReachTor,
/// We've been unable to download our directory information for some reason.
#[display(fmt = "Stalled fetching a Tor directory")]
DirectoryStalled,
}
impl From<ConnBlockage> for BlockageKind {
fn from(b: ConnBlockage) -> BlockageKind {
match b {
ConnBlockage::NoTcp => BlockageKind::Offline,
ConnBlockage::NoHandshake => BlockageKind::Filtering,
_ => BlockageKind::CantReachTor,
}
}
}
impl fmt::Display for BootstrapStatus {
......@@ -109,8 +136,11 @@ impl fmt::Display for BootstrapStatus {
if let Some(problem) = self.blocked() {
write!(f, "Stuck at {}%: {}", percent, problem)
} else {
// TODO(nickm): describe what we're doing.
write!(f, "{}%", percent)
write!(
f,
"{}%: {}; {}",
percent, &self.conn_status, &self.dir_status
)
}
}
}
......@@ -126,17 +156,27 @@ impl fmt::Display for BootstrapStatus {
/// dropped.
pub(crate) async fn report_status(
mut sender: postage::watch::Sender<BootstrapStatus>,
ready: oneshot::Receiver<()>,
conn_status: ConnStatusEvents,
dir_status: DirBootstrapEvents,
) {
{
sender.borrow_mut().ready = false;
}
if ready.await.is_ok() {
sender.borrow_mut().ready = true;
/// Internal enumeration to combine incoming status changes.
enum Event {
/// A connection status change
Conn(ConnStatus),
/// A directory status change
Dir(DirBootstrapStatus),
}
let mut stream =
futures::stream::select(conn_status.map(Event::Conn), dir_status.map(Event::Dir));
// wait forever.
future::pending::<()>().await;
while let Some(event) = stream.next().await {
let mut b = sender.borrow_mut();
match event {
Event::Conn(e) => b.apply_conn_status(e),
Event::Dir(e) => b.apply_dir_status(e),
}
debug!("{}", *b);
}
}
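A minimal consumer sketch (editorial illustration, not part of this commit): given a `BootstrapEvents` stream obtained from the client, an application could log progress and wait until the status reports readiness. The helper name `wait_until_ready` is hypothetical.
async fn wait_until_ready(mut events: BootstrapEvents) {
    use futures::StreamExt;
    while let Some(status) = events.next().await {
        // Each item is a snapshot; `as_frac()` gives a rough progress fraction.
        tracing::debug!("bootstrap at {:.0}%: {}", status.as_frac() * 100.0, status);
        if status.ready_for_traffic() {
            break;
        }
    }
}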
/// A [`Stream`] of [`BootstrapStatus`] events.
......
......@@ -19,11 +19,14 @@ tor-linkspec = { path="../tor-linkspec", version = "0.0.3"}
tor-llcrypto = { path="../tor-llcrypto", version = "0.0.3"}
async-trait = "0.1.2"
derive_more = "0.99"
futures = "0.3"
postage = { version = "0.4", default-features = false, features = ["futures-traits"] }
tracing = "0.1.18"
thiserror = "1"
[dev-dependencies]
float_eq = "0.7"
futures-await-test = "0.3.0"
hex-literal = "0.3"
tor-rtmock = { path="../tor-rtmock", version = "0.0.3"}
......
//! Implement a concrete type to build channels.
use crate::Error;
use std::sync::Mutex;
use crate::{event::ChanMgrEventSender, Error};
use tor_linkspec::{ChanTarget, OwnedChanTarget};
use tor_llcrypto::pk;
......@@ -16,16 +18,19 @@ use futures::task::SpawnExt;
pub(crate) struct ChanBuilder<R: Runtime> {
/// Asynchronous runtime for TLS, TCP, spawning, and timeouts.
runtime: R,
/// Used to update our bootstrap reporting status.
event_sender: Mutex<ChanMgrEventSender>,
/// Object to build TLS connections.
tls_connector: <R as TlsProvider<R::TcpStream>>::Connector,
}
impl<R: Runtime> ChanBuilder<R> {
/// Construct a new ChanBuilder.
pub(crate) fn new(runtime: R) -> Self {
pub(crate) fn new(runtime: R, event_sender: ChanMgrEventSender) -> Self {
let tls_connector = runtime.tls_connector();
ChanBuilder {
runtime,
event_sender: Mutex::new(event_sender),
tls_connector,
}
}
......@@ -69,9 +74,23 @@ impl<R: Runtime> ChanBuilder<R> {
tracing::info!("Negotiating TLS with {}", addr);
{
self.event_sender
.lock()
.expect("Lock poisoned")
.record_attempt();
}
// Establish a TCP connection.
let stream = self.runtime.connect(addr).await?;
{
self.event_sender
.lock()
.expect("Lock poisoned")
.record_tcp_success();
}
// TODO: add a random hostname here if it will be used for SNI?
let tls = self
.tls_connector
......@@ -82,6 +101,13 @@ impl<R: Runtime> ChanBuilder<R> {
.peer_certificate()?
.ok_or(Error::Internal("TLS connection with no peer certificate"))?;
{
self.event_sender
.lock()
.expect("Lock poisoned")
.record_tls_finished();
}
// 2. Set up the channel.
let mut builder = ChannelBuilder::new();
builder.set_declared_addr(*addr);
......@@ -90,6 +116,13 @@ impl<R: Runtime> ChanBuilder<R> {
let chan = chan.check(target, &peer_cert, Some(now))?;
let (chan, reactor) = chan.finish().await?;
{
self.event_sender
.lock()
.expect("Lock poisoned")
.record_handshake_done();
}
// 3. Launch a task to run the channel reactor.
self.runtime.spawn(async {
let _ = reactor.run().await;
......@@ -164,7 +197,8 @@ mod test {
client_rt.jump_to(now);
// Create the channelbuilder that we want to test.
let builder = ChanBuilder::new(client_rt);
let (snd, _rcv) = crate::event::channel();
let builder = ChanBuilder::new(client_rt, snd);
let (r1, r2): (Result<Channel>, Result<LocalStream>) = futures::join!(
async {
......
//! Code for exporting events from the channel manager.
#![allow(dead_code, unreachable_pub)]
use futures::{Stream, StreamExt};
use postage::watch;
use std::{
fmt,
time::{Duration, Instant},
};
/// The status of our connection to the internet.
#[derive(Default, Debug, Clone)]
pub struct ConnStatus {
/// Have we been able to make TCP connections?
///
/// True if we've been able to make outgoing connections recently.
/// False if we've definitely been failing.
/// None if we haven't succeeded yet, but it's too early to say if
/// that's a problem.
online: Option<bool>,
/// Have we been able to successfully negotiate full Tor handshakes?
///
/// True if we've been able to make TLS sessions recently.
/// False if we've definitely been failing.
/// None if we haven't succeeded yet, but it's too early to say if
/// that's a problem.
tls_works: Option<bool>,
}
/// A problem detected while connecting to the Tor network.
#[derive(Debug, Clone, Eq, PartialEq, derive_more::Display)]
#[non_exhaustive]
pub enum ConnBlockage {
#[display(fmt = "unable to connect to the internet")]
/// We haven't been able to make successful TCP connections.
NoTcp,
/// We've made TCP connections, but our TLS connections either failed, or
/// got hit by an attempted man-in-the-middle attack.
#[display(fmt = "our internet connection seems to be filtered")]
NoHandshake,
}
impl ConnStatus {
/// Return true if this status is equal to `other`.
///
/// Note: (This would just be a PartialEq implementation, but I'm not sure I
/// want to expose PartialEq for this struct.)
fn eq(&self, other: &ConnStatus) -> bool {
self.online == other.online && self.tls_works == other.tls_works
}
/// Return true if this status indicates that we can successfully open Tor channels.
pub fn usable(&self) -> bool {
self.online == Some(true) && self.tls_works == Some(true)
}
/// Return a float representing "how bootstrapped" we are with respect to
/// connecting to the Tor network, where 0 is "not at all" and 1 is
/// "successful".
///
/// Callers _should not_ depend on the specific meaning of any particular
/// fraction; we may change these fractions in the future.
pub fn frac(&self) -> f32 {
match self {
Self {
online: Some(true),
tls_works: Some(true),
} => 1.0,
Self {
online: Some(true), ..
} => 0.5,
_ => 0.0,
}
}
/// Return the cause of why we aren't able to connect to the Tor network,
/// if we think we're stuck.
pub fn blockage(&self) -> Option<ConnBlockage> {
match self {
Self {
online: Some(false),
..
} => Some(ConnBlockage::NoTcp),
Self {
tls_works: Some(false),
..
} => Some(ConnBlockage::NoHandshake),
_ => None,
}
}
}
impl fmt::Display for ConnStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConnStatus { online: None, .. } => write!(f, "connecting to the internet"),
ConnStatus {
online: Some(false),
..
} => write!(f, "unable to connect to the internet"),
ConnStatus {
tls_works: None, ..
} => write!(f, "handshaking with Tor relays"),
ConnStatus {
tls_works: Some(false),
..
} => write!(f, "unable to handshake with Tor relays"),
ConnStatus {
online: Some(true),
tls_works: Some(true),
} => write!(f, "connecting successfully"),
}
}
}
/// A stream of [`ConnStatus`] events describing changes in our connected-ness.
///
/// This stream is lossy; a reader might not see some events on the stream, if
/// they are produced faster than the reader can consume. In that case, the
/// reader will see more recent updates, and miss older ones.
///
/// Note that the bootstrap status is not monotonic: we might become less
/// bootstrapped than we were before. (For example, the internet could go
/// down.)
#[derive(Clone)]
pub struct ConnStatusEvents {
/// The receiver that implements this stream.
///
/// (We wrap it in a new type here so that we can replace the implementation
/// later on if we need to.)
inner: watch::Receiver<ConnStatus>,
}
impl fmt::Debug for ConnStatusEvents {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConnStatusEvents").finish_non_exhaustive()
}
}
impl Stream for ConnStatusEvents {
type Item = ConnStatus;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
/// Crate-internal view of "how connected are we to the internet?"
///
/// This is a more complex and costly structure than ConnStatus, so we track
/// this here, and only expose the minimum via ConnStatus over a
/// `postage::watch`. Later, we might want to expose more of this information.
//
// TODO: Eventually we should add some ability to reset our bootstrap status, if
// our connections start failing.
#[derive(Debug, Clone)]
struct ChanMgrStatus {
/// When did we first get initialized?
startup: Instant,
/// Since we started, how many channels have we tried to build?
n_attempts: usize,
/// When (if ever) have we made a TCP connection to (what we hoped was) a
/// Tor relay?
///
/// If we don't reach this point, we're probably not on the internet.
///
/// If we get no further than this, we're probably having our TCP
/// connections captured or replaced.
last_tcp_success: Option<Instant>,
/// When (if ever) have we successfully finished a TLS handshake to (what we
/// hoped was) a Tor relay?
///
/// If we get no further than this, we might be facing a TLS MITM attack.
//
// TODO: We don't actually use this information yet: our output doesn't
// distinguish filtering where TLS succeeds but gets MITM'd from filtering
// where TLS fails.
last_tls_success: Option<Instant>,
/// When (if ever) have we successfully finished the inner Tor handshake
/// with a relay?
///
/// If we get to this point, we can successfully talk to something that
/// holds the private key that it's supposed to.
last_chan_success: Option<Instant>,
}
impl ChanMgrStatus {
/// Construct a new ChanMgr status.
///
/// It will be built as having been initialized at the time `now`.
fn new_at(now: Instant) -> ChanMgrStatus {
ChanMgrStatus {
startup: now,
n_attempts: 0,
last_tcp_success: None,
last_tls_success: None,
last_chan_success: None,
}
}
/// Return a [`ConnStatus`] for the current state, at time `now`.
///
/// (The time is necessary because a lack of success doesn't indicate a
/// problem until enough time has passed.)
fn conn_status_at(&self, now: Instant) -> ConnStatus {
/// How long do we need to have been running before we'll acknowledge failure?
const MIN_DURATION: Duration = Duration::from_secs(60);
/// How many attempts do we need to launch before we'll acknowledge failure?
const MIN_ATTEMPTS: usize = 6;
// If set, it's too early to determine failure.
let early = now < self.startup + MIN_DURATION || self.n_attempts < MIN_ATTEMPTS;
let online = match (self.last_tcp_success.is_some(), early) {
(true, _) => Some(true),
(_, true) => None,
(false, false) => Some(false),
};
let tls_works = match (self.last_chan_success.is_some(), early) {
(true, _) => Some(true),
(_, true) => None,
(false, false) => Some(false),
};
ConnStatus { online, tls_works }
}
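// Illustrative reading of the thresholds above (editorial note): during the
// first 60 seconds, or before 6 attempts have been recorded, a lack of success
// yields `None` ("too early to tell"); once both thresholds have passed,
// continued failure becomes `Some(false)`, while any recorded success makes the
// corresponding field `Some(true)` regardless of elapsed time.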
/// Note that an attempt to connect has been started.
fn record_attempt(&mut self) {
self.n_attempts += 1;
}
/// Note that we've successfully done a TCP handshake with an alleged relay.
fn record_tcp_success(&mut self, now: Instant) {
self.last_tcp_success = Some(now);
}
/// Note that we've completed a TLS handshake with an alleged relay.
///
/// (Its identity won't be verified till the next step.)
fn record_tls_finished(&mut self, now: Instant) {
self.last_tls_success = Some(now);
}
/// Note that we've completed a Tor handshake with a relay.
///
/// (This includes performing the TLS handshake, and verifying that the
/// relay was indeed the one that we wanted to reach.)
fn record_handshake_done(&mut self, now: Instant) {
self.last_chan_success = Some(now);
}
}
/// Object that manages information about a `ChanMgr`'s status, and sends
/// information about connectivity changes over an asynchronous channel.
pub(crate) struct ChanMgrEventSender {
/// The last ConnStatus that we sent over the channel.
last_conn_status: ConnStatus,
/// The unsummarized status information from the ChanMgr.
mgr_status: ChanMgrStatus,
/// The channel that we use for sending ConnStatus information.
sender: watch::Sender<ConnStatus>,
}
impl ChanMgrEventSender {
/// If the status has changed as of `now`, tell any listeners.
///
/// (This takes a time argument because we need to know how much time has elapsed
/// without successful attempts.)
///
/// # Limitations
///
/// We are dependent on calls to `record_attempt()` and similar methods to
/// actually invoke this function; if they were never called, we'd never
/// notice that we had gone too long without building connections. That's
/// okay for now, though, since any Tor client will immediately start
/// building circuits, which will launch connection attempts until one
/// succeeds or the client gives up entirely.
fn push_at(&mut self, now: Instant) {
let status = self.mgr_status.conn_status_at(now);
if !status.eq(&self.last_conn_status) {
self.last_conn_status = status.clone();
let mut b = self.sender.borrow_mut();
*b = status;
}
}
/// Note that an attempt to connect has been started.
pub(crate) fn record_attempt(&mut self) {
self.mgr_status.record_attempt();
self.push_at(Instant::now());
}
/// Note that we've successfully done a TCP handshake with an alleged relay.
pub(crate) fn record_tcp_success(&mut self) {
let now = Instant::now();
self.mgr_status.record_tcp_success(now);
self.push_at(now);
}
/// Note that we've completed a TLS handshake with an alleged relay.
///
/// (Its identity won't be verified till the next step.)
pub(crate) fn record_tls_finished(&mut self) {
let now = Instant::now();
self.mgr_status.record_tls_finished(now);
self.push_at(now);
}
/// Note that we've completed a Tor handshake with a relay.
///
/// (This includes performing the TLS handshake, and verifying that the
/// relay was indeed the one that we wanted to reach.)
pub(crate) fn record_handshake_done(&mut self) {
let now = Instant::now();
self.mgr_status.record_handshake_done(now);
self.push_at(now);
}
}
/// Create a new channel for sending connectivity status events to other crates.
pub(crate) fn channel() -> (ChanMgrEventSender, ConnStatusEvents) {
let (sender, receiver) = watch::channel();
let receiver = ConnStatusEvents { inner: receiver };
let sender = ChanMgrEventSender {
last_conn_status: ConnStatus::default(),
mgr_status: ChanMgrStatus::new_at(Instant::now()),
sender,
};
(sender, receiver)
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::cognitive_complexity)]
mod test {
use super::*;
use float_eq::assert_float_eq;
/// Tolerance for float comparison.
const TOL: f32 = 0.00001;
#[test]
fn status_basics() {
let s1 = ConnStatus::default();
assert_eq!(s1.to_string(), "connecting to the internet");
assert_float_eq!(s1.frac(), 0.0, abs <= TOL);
assert!(s1.eq(&s1));
assert!(s1.blockage().is_none());
let s2 = ConnStatus {
online: Some(false),
tls_works: None,
};
assert_eq!(s2.to_string(), "unable to connect to the internet");
assert_float_eq!(s2.frac(), 0.0, abs <= TOL);
assert!(s2.eq(&s2));
assert!(!s2.eq(&s1));
assert_eq!(s2.blockage(), Some(ConnBlockage::NoTcp));
assert_eq!(
s2.blockage().unwrap().to_string(),
"unable to connect to the internet"
);
let s3 = ConnStatus {
online: Some(true),
tls_works: None,
};
assert_eq!(s3.to_string(), "handshaking with Tor relays");
assert_float_eq!(s3.frac(), 0.5, abs <= TOL);
assert_eq!(s3.blockage(), None);
assert!(!s3.eq(&s1));
let s4 = ConnStatus {
online: Some(true),
tls_works: Some(false),
};
assert_eq!(s4.to_string(), "unable to handshake with Tor relays");
assert_float_eq!(s4.frac(), 0.5, abs <= TOL);
assert_eq!(s4.blockage(), Some(ConnBlockage::NoHandshake));
assert_eq!(
s4.blockage().unwrap().to_string(),
"our internet connection seems to be filtered"
);
assert!(!s4.eq(&s1));
assert!(!s4.eq(&s2));
assert!(!s4.eq(&s3));
assert!(s4.eq(&s4));
let s5 = ConnStatus {
online: Some(true),
tls_works: Some(true),
};
assert_eq!(s5.to_string(), "connecting successfully");
assert_float_eq!(s5.frac(), 1.0, abs <= TOL);
assert!(s5.blockage().is_none());
assert!(s5.eq(&s5));
assert!(!s5.eq(&s4));
}
#[test]
fn derive_status() {
let start = Instant::now();
let sec = Duration::from_secs(1);
let hour = Duration::from_secs(3600);
let mut ms = ChanMgrStatus::new_at(start);
// when we start, we're unable to reach any conclusions.
let s0 = ms.conn_status_at(start);
assert!(s0.online.is_none());
assert!(s0.tls_works.is_none());
// Time won't let us make conclusions either, unless there have been
// attempts.
let s = ms.conn_status_at(start + hour);
assert!(s.eq(&s0));
// But if there have been attempts, _and_ time has passed, we notice
// failure.
for _ in 0..10 {
ms.record_attempt();
}
// (Not immediately...)
let s = ms.conn_status_at(start);
assert!(s.eq(&s0));
// (... but after a while.)
let s = ms.conn_status_at(start + hour);
assert_eq!(s.online, Some(false));
assert_eq!(s.tls_works, Some(false));
// If TCP has succeeded, we should notice that.
ms.record_tcp_success(start + sec);
let s = ms.conn_status_at(start + sec * 2);
assert_eq!(s.online, Some(true));
assert!(s.tls_works.is_none());
let s = ms.conn_status_at(start + hour);
assert_eq!(s.online, Some(true));
assert_eq!(s.tls_works, Some(false));
// If the handshake succeeded, we can notice that too.
ms.record_handshake_done(start + sec * 2);
let s = ms.conn_status_at(start + sec * 3);
assert_eq!(s.online, Some(true));
assert_eq!(s.tls_works, Some(true));
}
#[test]
fn sender() {
let (mut snd, rcv) = channel();
{
let s = rcv.inner.borrow().clone();
assert_float_eq!(s.frac(), 0.0, abs <= TOL);
}
snd.record_attempt();
snd.record_tcp_success();
snd.record_tls_finished();
snd.record_handshake_done();
{
let s = rcv.inner.borrow().clone();
assert_float_eq!(s.frac(), 1.0, abs <= TOL);
}
}
}
......@@ -48,6 +48,7 @@
mod builder;
mod err;
mod event;
mod mgr;
#[cfg(test)]
mod testing;
......@@ -62,6 +63,8 @@ use tor_rtcompat::Runtime;
/// A Result as returned by this crate.
pub type Result<T> = std::result::Result<T, Error>;
pub use event::{ConnBlockage, ConnStatus, ConnStatusEvents};
/// A type that remembers a set of live channels, and launches new
/// ones on request.
///
......@@ -70,14 +73,21 @@ pub type Result<T> = std::result::Result<T, Error>;
pub struct ChanMgr<R: Runtime> {
/// Internal channel manager object that does the actual work.
mgr: mgr::AbstractChanMgr<builder::ChanBuilder<R>>,
/// Stream of [`ConnStatus`] events.
bootstrap_status: event::ConnStatusEvents,
}
impl<R: Runtime> ChanMgr<R> {
/// Construct a new channel manager.
pub fn new(runtime: R) -> Self {
let builder = builder::ChanBuilder::new(runtime);
let (sender, receiver) = event::channel();
let builder = builder::ChanBuilder::new(runtime, sender);
let mgr = mgr::AbstractChanMgr::new(builder);
ChanMgr { mgr }
ChanMgr {
mgr,
bootstrap_status: receiver,
}
}
/// Try to get a suitable channel to the provided `target`,
......@@ -96,4 +106,13 @@ impl<R: Runtime> ChanMgr<R> {
chan.check_match(target)?;
Ok(chan)
}
/// Return a stream of [`ConnStatus`] events to tell us about changes
/// in our ability to connect to the internet.
///
/// Note that this stream can be lossy: the caller will not necessarily
/// observe every event on the stream.
pub fn bootstrap_events(&self) -> ConnStatusEvents {
self.bootstrap_status.clone()
}
}
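A minimal usage sketch for the new API above (editorial illustration, not part of this commit); the function name and logging choices are hypothetical:
use futures::StreamExt;

async fn watch_chanmgr_status<R: tor_rtcompat::Runtime>(chanmgr: &tor_chanmgr::ChanMgr<R>) {
    // Subscribe to the lossy watch stream; each item is the latest ConnStatus.
    let mut events = chanmgr.bootstrap_events();
    while let Some(status) = events.next().await {
        if let Some(blockage) = status.blockage() {
            tracing::warn!("connection trouble: {}", blockage);
        } else {
            tracing::info!("connection status: {}", status);
        }
    }
}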
......@@ -40,6 +40,7 @@ hex = "0.4"
itertools = "0.10.1"
tracing = "0.1.18"
memmap2 = { version = "0.5.0", optional = true }
once_cell = "1"
postage = { version = "0.4", default-features = false, features = [
"futures-traits",
] }
......@@ -58,4 +59,4 @@ tempfile = "3"
tor-rtcompat = { path = "../tor-rtcompat", version = "0.0.3", features = [
"tokio",
] }
once_cell = "1"
float_eq = "0.7"
......@@ -122,6 +122,11 @@ async fn load_once<R: Runtime>(
let documents = load_all(dirmgr, missing)?;
state.add_from_cache(documents, dirmgr.store_if_rw())
};
if matches!(outcome, Ok(true)) {
dirmgr.update_status(state.bootstrap_status());
}
outcome
}
......@@ -189,6 +194,10 @@ async fn download_attempt<R: Runtime>(
}
}
if changed {
dirmgr.update_status(state.bootstrap_status());
}
Ok(changed)
}
......@@ -391,6 +400,9 @@ mod test {
fn describe(&self) -> String {
format!("{:?}", &self)
}
fn bootstrap_status(&self) -> crate::event::DirStatus {
crate::event::DirStatus::default()
}
fn is_ready(&self, ready: Readiness) -> bool {
match (ready, self.second_time_around) {
(_, false) => false,
......
//! Code for notifying other modules about changes in the directory.
// TODO(nickm): After we have enough experience with this code, we might want to
// make it a public interface. If we do it should probably move into another
// crate.
// TODO(nickm): After we have enough experience with this FlagPublisher, we
// might want to make it a public interface. If we do it should probably move
// into another crate.
use std::{
fmt,
marker::PhantomData,
pin::Pin,
sync::{
......@@ -11,9 +13,12 @@ use std::{
Arc,
},
task::Poll,
time::SystemTime,
};
use futures::{stream::Stream, Future};
use futures::{stream::Stream, Future, StreamExt};
use time::OffsetDateTime;
use tor_netdoc::doc::netstatus;
/// An event that a DirMgr can broadcast to indicate a change in
/// the status of its directory.
......@@ -231,9 +236,299 @@ impl<F: FlagEvent> Stream for FlagListener<F> {
}
}
/// Description of the directory manager's current bootstrapping status.
///
/// This status does not necessarily increase monotonically: it can go backwards
/// if (for example) our directory information expires before we're able to get
/// new information.
#[derive(Clone, Debug, Default)]
pub struct DirBootstrapStatus {
/// The status for the current directory that we're using right now.
pub(crate) current: DirStatus,
/// The status for a directory that we're downloading to replace the current
/// directory.
///
/// This is "None" if we haven't started fetching the next consensus yet.
pub(crate) next: Option<DirStatus>,
}
/// The status for a single directory.
#[derive(Clone, Debug)]
pub struct DirStatus(DirStatusInner);
/// The contents of a single DirStatus.
///
/// This is a separate type so that we don't make the variants public.
#[derive(Clone, Debug)]
pub(crate) enum DirStatusInner {
/// We don't have any information yet.
NoConsensus {
/// If present, we are fetching a consensus whose valid-after time
/// postdates this time.
after: Option<SystemTime>,
},
/// We've downloaded a consensus, but we haven't validated it yet.
FetchingCerts {
/// The lifetime of the consensus.
lifetime: netstatus::Lifetime,
/// A fraction (in (numerator,denominator) format) of the certificates
/// we have for this consensus.
n_certs: (u16, u16),
},
/// We've validated a consensus and we're fetching (or have fetched) its
/// microdescriptors.
Validated {
/// The lifetime of the consensus.
lifetime: netstatus::Lifetime,
/// A fraction (in (numerator,denominator) form) of the microdescriptors
/// that we have for this consensus.
n_mds: (u32, u32),
/// True iff we've decided that the consensus is usable.
usable: bool,
// TODO(nickm) Someday we could add a field about whether any primary
// guards are missing microdescriptors, to give a better explanation for
// the case where we won't switch our consensus because of that.
},
}
impl Default for DirStatus {
fn default() -> Self {
DirStatus(DirStatusInner::NoConsensus { after: None })
}
}
impl From<DirStatusInner> for DirStatus {
fn from(inner: DirStatusInner) -> DirStatus {
DirStatus(inner)
}
}
impl fmt::Display for DirStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
/// Format this time in a form useful for displaying
/// lifetime boundaries.
fn fmt_time(t: SystemTime) -> String {
use once_cell::sync::Lazy;
/// Formatter object for lifetime boundaries.
///
/// We use "YYYY-MM-DD HH:MM:SS UTC" here, since we never have
/// sub-second times here, and using non-UTC offsets is confusing
/// in this context.
static FORMAT: Lazy<Vec<time::format_description::FormatItem>> = Lazy::new(|| {
time::format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second] UTC")
.expect("Invalid time format")
});
OffsetDateTime::from(t)
.format(&FORMAT)
.unwrap_or_else(|_| "(could not format)".into())
}
match &self.0 {
DirStatusInner::NoConsensus { .. } => write!(f, "fetching a consensus"),
DirStatusInner::FetchingCerts { n_certs, .. } => write!(
f,
"fetching authority certificates ({}/{})",
n_certs.0, n_certs.1
),
DirStatusInner::Validated {
usable: false,
n_mds,
..
} => write!(f, "fetching microdescriptors ({}/{})", n_mds.0, n_mds.1),
DirStatusInner::Validated {
usable: true,
lifetime,
..
} => write!(
f,
"usable, fresh until {}, and valid until {}",
fmt_time(lifetime.fresh_until()),
fmt_time(lifetime.valid_until())
),
}
}
}
impl fmt::Display for DirBootstrapStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "directory is {}", self.current)?;
if let Some(ref next) = self.next {
write!(f, "; next directory is {}", next)?;
}
Ok(())
}
}
impl DirBootstrapStatus {
/// Return the fraction of completion for directory download, in a form
/// suitable for a progress bar at some particular time.
///
/// This value is not monotonic, and can go down as one directory is
/// replaced with another.
///
/// Callers _should not_ depend on the specific meaning of any particular
/// fraction; we may change these fractions in the future.
pub fn frac_at(&self, when: SystemTime) -> f32 {
self.current
.frac_at(when)
.or_else(|| self.next.as_ref().and_then(|next| next.frac_at(when)))
.unwrap_or(0.0)
}
/// Return true if this status indicates that we have a current usable
/// directory.
pub fn usable_at(&self, now: SystemTime) -> bool {
self.current.usable() && self.current.valid_at(now)
}
/// Update this status by replacing its current status (or its next status)
/// with `new_status`, as appropriate.
pub(crate) fn update(&mut self, new_status: DirStatus) {
if new_status.usable() {
// This is a usable directory, but it might be a stale one still
// getting updated. Make sure that it is at least as new as the one
// in `current` before we set `current`.
if new_status.at_least_as_new_as(&self.current) {
// This one will be `current`. Should we clear `next`? Only if
// this one is at least as recent as `next` too.
if let Some(ref next) = self.next {
if new_status.at_least_as_new_as(next) {
self.next = None;
}
}
self.current = new_status;
}
} else if !self.current.usable() {
// Not a usable directory, but we don't _have_ a usable directory. This is therefore current.
self.current = new_status;
} else {
// This is _not_ a usable directory, so it can only be `next`.
self.next = Some(new_status);
}
}
}
impl DirStatus {
/// Return the consensus lifetime for this directory, if we have one.
fn lifetime(&self) -> Option<&netstatus::Lifetime> {
match &self.0 {
DirStatusInner::NoConsensus { .. } => None,
DirStatusInner::FetchingCerts { lifetime, .. } => Some(lifetime),
DirStatusInner::Validated { lifetime, .. } => Some(lifetime),
}
}
/// Return true if the directory is valid at the given time.
fn valid_at(&self, when: SystemTime) -> bool {
if let Some(lifetime) = self.lifetime() {
lifetime.valid_after() <= when && when < lifetime.valid_until()
} else {
false
}
}
/// As frac_at, but return None if this consensus is not valid at the given time.
fn frac_at(&self, when: SystemTime) -> Option<f32> {
if self.valid_at(when) {
Some(self.frac())
} else {
None
}
}
/// Return true if this status indicates a usable directory.
fn usable(&self) -> bool {
matches!(self.0, DirStatusInner::Validated { usable: true, .. })
}
/// Return the fraction of completion for directory download, in a form
/// suitable for a progress bar.
///
/// This is monotonically increasing for a single directory, but can go down
/// as one directory is replaced with another.
///
/// Callers _should not_ depend on the specific meaning of any particular
/// fraction; we may change these fractions in the future.
fn frac(&self) -> f32 {
// We arbitrarily decide that 25% is downloading the consensus, 10% is
// downloading the certificates, and the remaining 65% is downloading
// the microdescriptors until we become usable. We may want to re-tune that in the future, but
// the documentation of this function should allow us to do so.
match &self.0 {
DirStatusInner::NoConsensus { .. } => 0.0,
DirStatusInner::FetchingCerts { n_certs, .. } => {
0.25 + f32::from(n_certs.0) / f32::from(n_certs.1) * 0.10
}
DirStatusInner::Validated {
usable: false,
n_mds,
..
} => 0.35 + (n_mds.0 as f32) / (n_mds.1 as f32) * 0.65,
DirStatusInner::Validated { usable: true, .. } => 1.0,
}
}
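// Worked examples of the split above (editorial note, not part of the original
// change): while fetching certificates with n_certs = (3, 5), frac() is
// 0.25 + (3.0 / 5.0) * 0.10 = 0.31; with 30 of 40 microdescriptors and not yet
// usable, it is 0.35 + (30.0 / 40.0) * 0.65 = 0.8375.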
/// Return true if the consensus in this DirStatus (if any) is at least as
/// new as the one in `other`.
fn at_least_as_new_as(&self, other: &DirStatus) -> bool {
/// Return a candidate "valid after" time for a DirStatus, for comparison purposes.
fn start_time(st: &DirStatus) -> Option<SystemTime> {
match &st.0 {
DirStatusInner::NoConsensus { after: Some(t) } => {
Some(*t + std::time::Duration::new(1, 0)) // Make sure this sorts _after_ t.
}
DirStatusInner::FetchingCerts { lifetime, .. } => Some(lifetime.valid_after()),
DirStatusInner::Validated { lifetime, .. } => Some(lifetime.valid_after()),
_ => None,
}
}
match (start_time(self), start_time(other)) {
// If both have a lifetime, compare their valid_after times.
(Some(l1), Some(l2)) => l1 >= l2,
// Any consensus is newer than none.
(Some(_), None) => true,
// No consensus is never newer than anything.
(None, _) => false,
}
}
}
/// A stream of [`DirBootstrapStatus`] events.
#[derive(Clone)]
pub struct DirBootstrapEvents {
/// The `postage::watch::Receiver` that we're wrapping.
///
/// We wrap this type so that we don't expose its entire API, and so that we
/// can migrate to some other implementation in the future if we want.
pub(crate) inner: postage::watch::Receiver<DirBootstrapStatus>,
}
impl Stream for DirBootstrapEvents {
type Item = DirBootstrapStatus;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
impl std::fmt::Debug for DirBootstrapEvents {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DirBootstrapStatusEvents")
.finish_non_exhaustive()
}
}
#[allow(clippy::unwrap_used)]
#[cfg(test)]
mod test {
use std::time::Duration;
use super::*;
use float_eq::assert_float_eq;
use futures::stream::StreamExt;
use tor_rtcompat::test_with_all_runtimes;
......@@ -359,4 +654,170 @@ mod test {
fn failed_conversion() {
assert_eq!(DirEvent::from_index(999), None);
}
#[test]
fn dir_status_basics() {
let now = SystemTime::now();
let hour = Duration::new(3600, 0);
let nothing = DirStatus(DirStatusInner::NoConsensus { after: None });
let unval = DirStatus(DirStatusInner::FetchingCerts {
lifetime: netstatus::Lifetime::new(now, now + hour, now + hour * 2).unwrap(),
n_certs: (3, 5),
});
let with_c = DirStatus(DirStatusInner::Validated {
lifetime: netstatus::Lifetime::new(now + hour, now + hour * 2, now + hour * 3).unwrap(),
n_mds: (30, 40),
usable: false,
});
// lifetime()
assert!(nothing.lifetime().is_none());
assert_eq!(unval.lifetime().unwrap().valid_after(), now);
assert_eq!(with_c.lifetime().unwrap().valid_until(), now + hour * 3);
// at_least_as_new_as()
assert!(!nothing.at_least_as_new_as(&nothing));
assert!(unval.at_least_as_new_as(&nothing));
assert!(unval.at_least_as_new_as(&unval));
assert!(!unval.at_least_as_new_as(&with_c));
assert!(with_c.at_least_as_new_as(&unval));
assert!(with_c.at_least_as_new_as(&with_c));
// frac() (It's okay if we change the actual numbers here later; the
// current ones are more or less arbitrary.)
const TOL: f32 = 0.00001;
assert_float_eq!(nothing.frac(), 0.0, abs <= TOL);
assert_float_eq!(unval.frac(), 0.25 + 0.06, abs <= TOL);
assert_float_eq!(with_c.frac(), 0.35 + 0.65 * 0.75, abs <= TOL);
// frac_at()
let t1 = now + hour / 2;
let t2 = t1 + hour * 2;
assert!(nothing.frac_at(t1).is_none());
assert_float_eq!(unval.frac_at(t1).unwrap(), 0.25 + 0.06, abs <= TOL);
assert!(with_c.frac_at(t1).is_none());
assert!(nothing.frac_at(t2).is_none());
assert!(unval.frac_at(t2).is_none());
assert_float_eq!(with_c.frac_at(t2).unwrap(), 0.35 + 0.65 * 0.75, abs <= TOL);
}
#[test]
fn dir_status_display() {
use time::macros::datetime;
let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
let hour = Duration::new(3600, 0);
let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
let ds = DirStatus(DirStatusInner::NoConsensus { after: None });
assert_eq!(ds.to_string(), "fetching a consensus");
let ds = DirStatus(DirStatusInner::FetchingCerts {
lifetime: lifetime.clone(),
n_certs: (3, 5),
});
assert_eq!(ds.to_string(), "fetching authority certificates (3/5)");
let ds = DirStatus(DirStatusInner::Validated {
lifetime: lifetime.clone(),
n_mds: (30, 40),
usable: false,
});
assert_eq!(ds.to_string(), "fetching microdescriptors (30/40)");
let ds = DirStatus(DirStatusInner::Validated {
lifetime,
n_mds: (30, 40),
usable: true,
});
assert_eq!(
ds.to_string(),
"usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC"
);
}
#[test]
fn bootstrap_status() {
use time::macros::datetime;
let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
let hour = Duration::new(3600, 0);
let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
let lifetime2 = netstatus::Lifetime::new(t1 + hour, t1 + hour * 2, t1 + hour * 4).unwrap();
let ds1: DirStatus = DirStatusInner::Validated {
lifetime: lifetime.clone(),
n_mds: (3, 40),
usable: true,
}
.into();
let ds2: DirStatus = DirStatusInner::Validated {
lifetime: lifetime2.clone(),
n_mds: (5, 40),
usable: false,
}
.into();
let bs = DirBootstrapStatus {
current: ds1.clone(),
next: Some(ds2.clone()),
};
assert_eq!(bs.to_string(),
"directory is usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC; next directory is fetching microdescriptors (5/40)"
);
const TOL: f32 = 0.00001;
assert_float_eq!(bs.frac_at(t1 + hour / 2), 1.0, abs <= TOL);
assert_float_eq!(
bs.frac_at(t1 + hour * 3 + hour / 2),
0.35 + 0.65 * 0.125,
abs <= TOL
);
// Now try updating.
// Case 1: we have a usable directory and the updated status isn't usable.
let mut bs = bs;
let ds3 = DirStatus(DirStatusInner::Validated {
lifetime: lifetime2.clone(),
n_mds: (10, 40),
usable: false,
});
bs.update(ds3);
assert!(matches!(
bs.next.as_ref().unwrap().0,
DirStatusInner::Validated {
n_mds: (10, 40),
..
}
));
// Case 2: The new directory _is_ usable and newer. It will replace the old one.
let ds4 = DirStatus(DirStatusInner::Validated {
lifetime: lifetime2.clone(),
n_mds: (20, 40),
usable: true,
});
bs.update(ds4);
assert!(bs.next.as_ref().is_none());
assert_eq!(
bs.current.lifetime().unwrap().valid_after(),
lifetime2.valid_after()
);
// Case 3: The new directory is usable but older. Nothing will happen.
bs.update(ds1);
assert!(bs.next.as_ref().is_none());
assert_ne!(
bs.current.lifetime().unwrap().valid_after(),
lifetime.valid_after()
);
// Case 4: starting with an unusable directory, we always replace.
let mut bs = DirBootstrapStatus::default();
assert!(!ds2.usable());
assert!(bs.current.lifetime().is_none());
bs.update(ds2);
assert!(bs.current.lifetime().is_some());
}
}
......@@ -70,6 +70,7 @@ mod storage;
use crate::docid::{CacheUsage, ClientRequest, DocQuery};
use crate::shared_ref::SharedMutArc;
use crate::storage::sqlite::SqliteStore;
use postage::watch;
pub use retry::DownloadSchedule;
use tor_circmgr::CircMgr;
use tor_netdir::NetDir;
......@@ -90,7 +91,7 @@ pub use config::{
};
pub use docid::DocId;
pub use err::Error;
pub use event::DirEvent;
pub use event::{DirBootstrapEvents, DirBootstrapStatus, DirEvent, DirStatus};
pub use storage::DocumentText;
pub use tor_netdir::fallback::{FallbackDir, FallbackDirBuilder};
......@@ -127,6 +128,17 @@ pub struct DirMgr<R: Runtime> {
/// A publisher handle that we notify whenever the consensus changes.
events: event::FlagPublisher<DirEvent>,
/// A publisher handle that we notify whenever our bootstrapping status
/// changes.
send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,
/// A receiver handle that gets notified whenever our bootstrapping status
/// changes.
///
/// We don't need to keep this drained, since `postage::watch` already knows
/// to discard unread events.
receive_status: DirBootstrapEvents,
/// A circuit manager, if this DirMgr supports downloading.
circmgr: Option<Arc<CircMgr<R>>>,
......@@ -438,6 +450,25 @@ impl<R: Runtime> DirMgr<R> {
Ok(())
}
/// Return a stream of [`DirBootstrapStatus`] events to tell us about changes
/// in the latest directory's bootstrap status.
///
/// Note that this stream can be lossy: the caller will not necessarily
/// observe every event on the stream.
pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
self.receive_status.clone()
}
/// Replace the latest status with `new_status` and broadcast to anybody
/// watching via a [`DirBootstrapEvents`] stream.
fn update_status(&self, new_status: DirStatus) {
// TODO(nickm): can I kill off this lock by having something else own the sender?
let mut sender = self.send_status.lock().expect("poisoned lock");
let mut status = sender.borrow_mut();
status.update(new_status);
}
/// Try to make this a directory manager with read-write access to its
/// storage.
///
......@@ -477,11 +508,19 @@ impl<R: Runtime> DirMgr<R> {
let netdir = SharedMutArc::new();
let events = event::FlagPublisher::new();
let (send_status, receive_status) = postage::watch::channel();
let send_status = Mutex::new(send_status);
let receive_status = DirBootstrapEvents {
inner: receive_status,
};
Ok(DirMgr {
config: config.into(),
store,
netdir,
events,
send_status,
receive_status,
circmgr,
runtime,
})
......@@ -766,6 +805,9 @@ trait DirState: Send {
request: &ClientRequest,
storage: Option<&Mutex<SqliteStore>>,
) -> Result<bool>;
/// Return a summary of this state as a [`DirStatus`].
fn bootstrap_status(&self) -> event::DirStatus;
/// Return a configuration for attempting downloads.
fn dl_config(&self) -> Result<DownloadSchedule>;
/// If possible, advance to the next state.
......
......@@ -20,6 +20,7 @@ use tor_netdir::{MdReceiver, NetDir, PartialNetDir};
use tor_netdoc::doc::netstatus::Lifetime;
use tracing::{info, warn};
use crate::event::{DirStatus, DirStatusInner};
use crate::DirEvent;
use crate::{
docmeta::{AuthCertMeta, ConsensusMeta},
......@@ -113,6 +114,14 @@ pub(crate) struct GetConsensusState<DM: WriteNetDir> {
/// How should we get the consensus from the cache, if at all?
cache_usage: CacheUsage,
/// If present, a time after which we want our consensus to have
/// been published.
//
// TODO: This is not yet used everywhere it could be. In the future maybe
// it should be inserted into the DocId::LatestConsensus alternative rather
// than being recalculated in make_consensus_request.
after: Option<SystemTime>,
/// If present, our next state.
///
/// (This is present once we have a consensus.)
......@@ -133,18 +142,25 @@ impl<DM: WriteNetDir> GetConsensusState<DM> {
/// Create a new GetConsensusState from a weak reference to a
/// directory manager and a `cache_usage` flag.
pub(crate) fn new(writedir: Weak<DM>, cache_usage: CacheUsage) -> Result<Self> {
let authority_ids: Vec<_> = if let Some(writedir) = Weak::upgrade(&writedir) {
writedir
let (authority_ids, after) = if let Some(writedir) = Weak::upgrade(&writedir) {
let ids: Vec<_> = writedir
.config()
.authorities()
.iter()
.map(|auth| *auth.v3ident())
.collect()
.collect();
let after = writedir
.netdir()
.get()
.map(|nd| nd.lifetime().valid_after());
(ids, after)
} else {
return Err(Error::ManagerDropped);
};
Ok(GetConsensusState {
cache_usage,
after,
next: None,
authority_ids,
writedir,
......@@ -181,6 +197,13 @@ impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
fn can_advance(&self) -> bool {
self.next.is_some()
}
fn bootstrap_status(&self) -> DirStatus {
if let Some(next) = &self.next {
next.bootstrap_status()
} else {
DirStatusInner::NoConsensus { after: self.after }.into()
}
}
fn dl_config(&self) -> Result<DownloadSchedule> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok(*wd.config().schedule().retry_consensus())
......@@ -347,6 +370,16 @@ impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
fn can_advance(&self) -> bool {
self.unvalidated.key_is_correct(&self.certs[..]).is_ok()
}
fn bootstrap_status(&self) -> DirStatus {
let n_certs = self.certs.len();
let n_missing_certs = self.missing_certs.len();
let total_certs = n_missing_certs + n_certs;
DirStatusInner::FetchingCerts {
lifetime: self.consensus_meta.lifetime().clone(),
n_certs: (n_certs as u16, total_certs as u16),
}
.into()
}
fn dl_config(&self) -> Result<DownloadSchedule> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok(*wd.config().schedule().retry_certs())
......@@ -476,6 +509,8 @@ struct GetMicrodescsState<DM: WriteNetDir> {
cache_usage: CacheUsage,
/// The digests of the microdescriptors we are missing.
missing: HashSet<MdDigest>,
/// Total number of microdescriptors listed in the consensus.
n_microdescs: usize,
/// The dirmgr to inform about a usable directory.
writedir: Weak<DM>,
/// The current status of our netdir, if it is not yet ready to become the
......@@ -552,6 +587,7 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
writedir: Weak<DM>,
) -> Result<Self> {
let reset_time = consensus.lifetime().valid_until();
let n_microdescs = consensus.relays().len();
let partial_dir = match Weak::upgrade(&writedir) {
Some(wd) => {
......@@ -569,6 +605,7 @@ impl<DM: WriteNetDir> GetMicrodescsState<DM> {
let missing = partial_dir.missing_microdescs().map(Clone::clone).collect();
let mut result = GetMicrodescsState {
cache_usage,
n_microdescs,
missing,
writedir,
partial: Some(PendingNetDir::Partial(partial_dir)),
......@@ -666,6 +703,15 @@ impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
fn can_advance(&self) -> bool {
false
}
fn bootstrap_status(&self) -> DirStatus {
let n_present = self.n_microdescs - self.missing.len();
DirStatusInner::Validated {
lifetime: self.meta.lifetime().clone(),
n_mds: (n_present as u32, self.n_microdescs as u32),
usable: self.is_ready(Readiness::Usable),
}
.into()
}
fn dl_config(&self) -> Result<DownloadSchedule> {
if let Some(wd) = Weak::upgrade(&self.writedir) {
Ok(*wd.config().schedule().retry_microdescs())
......@@ -1005,6 +1051,9 @@ mod test {
// Basic properties: it doesn't want to reset.
assert!(state.reset_time().is_none());
// Its starting DirStatus is "fetching a consensus".
assert_eq!(state.bootstrap_status().to_string(), "fetching a consensus");
// Download configuration is simple: only 1 request can be done in
// parallel. It uses a consensus retry schedule.
let retry = state.dl_config().unwrap();
......@@ -1111,6 +1160,12 @@ mod test {
let retry = state.dl_config().unwrap();
assert_eq!(&retry, DownloadScheduleConfig::default().retry_certs());
// Bootstrap status okay?
assert_eq!(
state.bootstrap_status().to_string(),
"fetching authority certificates (0/2)"
);
// Check that we get the right list of missing docs.
let missing = state.missing_docs();
assert_eq!(missing.len(), 2); // We are missing two certificates.
......@@ -1131,6 +1186,10 @@ mod test {
let missing = state.missing_docs();
assert_eq!(missing.len(), 1); // Now we're only missing one!
assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
assert_eq!(
state.bootstrap_status().to_string(),
"fetching authority certificates (1/2)"
);
// Now try to add the other from a download ... but fail
// because we didn't ask for it.
......@@ -1226,6 +1285,10 @@ mod test {
}
let retry = state.dl_config().unwrap();
assert_eq!(&retry, DownloadScheduleConfig::default().retry_microdescs());
assert_eq!(
state.bootstrap_status().to_string(),
"fetching microdescriptors (0/4)"
);
// Now check whether we're missing all the right microdescs.
let missing = state.missing_docs();
......@@ -1257,6 +1320,10 @@ mod test {
let missing = state.missing_docs();
assert_eq!(missing.len(), 3);
assert!(!missing.contains(&DocId::Microdesc(md1)));
assert_eq!(
state.bootstrap_status().to_string(),
"fetching microdescriptors (1/4)"
);
// Try adding the rest as if from a download.
let mut req = tor_dirclient::request::MicrodescRequest::new();
......