//! Code to fetch, store, and update directory information.
//!
//! In its current design, Tor requires a set of up-to-date
//! authenticated directory documents in order to build multi-hop
//! anonymized circuits through the network.
//!
//! This directory manager crate is responsible for figuring out which
//! directory information we lack, downloading what we're missing, and
//! keeping a cache of it on disk.
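//!
//! # Example
//!
//! A minimal usage sketch. The configuration and circuit-manager setup
//! here (`NetDirConfigBuilder::new()`, `circmgr`) are placeholders for
//! whatever the surrounding application actually provides:
//!
//! ```ignore
//! // Assemble a configuration (details depend on the application).
//! let config: NetDirConfig = NetDirConfigBuilder::new().finalize()?;
//!
//! // Bootstrap from cache if possible, otherwise from the network,
//! // and launch a background updater task.
//! let dirmgr = DirMgr::bootstrap_from_config(config, Arc::clone(&circmgr)).await?;
//!
//! // Once bootstrapped, we can grab the current directory at any time.
//! let netdir: Arc<NetDir> = dirmgr.netdir().await;
//! ```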

#![deny(missing_docs)]
#![deny(clippy::missing_docs_in_private_items)]

pub mod authority;
mod config;
mod docmeta;
mod err;
mod retry;
mod storage;
mod updater;

use crate::docmeta::{AuthCertMeta, ConsensusMeta};
use crate::retry::RetryDelay;
use crate::storage::sqlite::SqliteStore;
use tor_checkable::{ExternallySigned, SelfSigned, Timebound};
use tor_circmgr::{CircMgr, DirInfo};
use tor_netdir::{MDReceiver, NetDir, PartialNetDir};
use tor_netdoc::doc::authcert::{AuthCert, AuthCertKeyIds};
use tor_netdoc::doc::microdesc::{MDDigest, Microdesc, MicrodescReader};
use tor_netdoc::doc::netstatus::{MDConsensus, UnvalidatedMDConsensus};
use tor_netdoc::AllowAnnotations;
use tor_retry::RetryError;

use anyhow::{anyhow, Context, Result};
use async_rwlock::RwLock;
use futures::lock::Mutex;
use futures::stream::StreamExt;
use log::{debug, info, warn};

use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::SystemTime;

pub use authority::Authority;
pub use config::{NetDirConfig, NetDirConfigBuilder};
pub use err::Error;
pub use updater::DirectoryUpdater;

/// A directory manager to download, fetch, and cache a Tor directory
pub struct DirMgr {
    /// Configuration information: where to find directories, how to
    /// validate them, and so on.
    config: NetDirConfig,
    /// Handle to our sqlite cache.
    // XXXX I'd like to use an rwlock, but that's not feasible, since
    // rusqlite::Connection isn't Sync.
    store: Mutex<SqliteStore>,
    /// Our latest sufficiently bootstrapped directory, if we have one.
    ///
    /// We use the RwLock so that we can give this out to a bunch of other
    /// users, and replace it once a new directory is bootstrapped.
    netdir: RwLock<Option<Arc<NetDir>>>,
}

impl DirMgr {
    /// Return a new directory manager from a given configuration,
    /// bootstrapping from the network as necessary.
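    ///
    /// On success this also launches a background task to keep the
    /// directory up-to-date.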
    pub async fn bootstrap_from_config(
        config: NetDirConfig,
        circmgr: Arc<CircMgr>,
    ) -> Result<Arc<Self>> {
        let dirmgr = Arc::new(DirMgr::from_config(config)?);

        // Try to load from the cache.
        if dirmgr
            .load_directory()
            .await
            .context("Error loading cached directory")?
        {
            info!("Loaded a good directory from cache.")
        } else {
            // Okay, we didn't get a directory from the cache.  We need to
            // try fetching it.
            info!("Didn't find a usable directory in the cache. Trying to bootstrap.");
            dirmgr
                .bootstrap_directory(Arc::clone(&circmgr))
                .await
                .context("Unable to bootstrap directory")?;
            info!("Bootstrapped successfully.");
        }

        // Launch a task to run in the background and keep this dirmgr
        // up-to-date.
        Arc::clone(&dirmgr).launch_updater(circmgr);

        Ok(dirmgr)
    }

    /// Construct a DirMgr from a NetDirConfig.
    fn from_config(config: NetDirConfig) -> Result<Self> {
        let store = Mutex::new(config.open_sqlite_store()?);
        let netdir = RwLock::new(None);
        Ok(DirMgr {
            config,
            store,
            netdir,
        })
    }

    /// Load the latest non-pending non-expired directory from the
    /// cache, if it is newer than the one we have.
    ///
    /// Return false if there is no such consensus.
    async fn load_directory(&self) -> Result<bool> {
        let store = &self.store;

        let noinfo = NoInformation::new();

        // Load the consensus.
        let mut unval = match noinfo.load(false, &self.config, store).await? {
            NextState::SameState(_) => return Ok(false),
            NextState::NewState(unval) => unval,
        };

        let cached_vu = unval.consensus.peek_lifetime().valid_until();
        {
            if self.current_netdir_lasts_past(cached_vu).await {
                return Ok(false);
            }
        }

        // Load certificates, see if it's well-signed.
        unval.load(&self.config, store).await?;
        let mut partial = match unval.advance(&self.config)? {
            NextState::SameState(_) => {
                return Err(Error::CacheCorruption(
                    "Couldn't get certs for supposedly complete consensus",
                )
                .into());
            }
            NextState::NewState(p) => p,
        };

        // Load microdescs, make sure there are enough.
        partial.load(store, self.opt_netdir().await).await?;
        let nd = match partial.advance() {
            NextState::NewState(nd) => nd,
            NextState::SameState(_) => {
                return Err(Error::CacheCorruption(
                    "Couldn't get microdescs for supposedly complete consensus",
                )
                .into());
            }
        };

        // This is now a good directory.  Put it in self.netdir.
        {
            let mut w = self.netdir.write().await;
            *w = Some(Arc::new(nd));
        }

        Ok(true)
    }

    /// Run a complete bootstrapping process, using information from our
    /// cache when it is up-to-date enough.
    async fn bootstrap_directory(&self, circmgr: Arc<CircMgr>) -> Result<()> {
        self.fetch_directory(circmgr, true).await
    }

    /// Get a new directory, starting with a fresh consensus download.
    async fn fetch_new_directory(&self, circmgr: Arc<CircMgr>) -> Result<()> {
        self.fetch_directory(circmgr, false).await
    }

    /// Try to fetch and add a new set of microdescriptors to the
    /// current NetDir.  On success, return the number of
    /// microdescriptors that are still missing.
    async fn fetch_additional_microdescs(&self, circmgr: Arc<CircMgr>) -> Result<usize> {
        let new_microdescs = {
            // We introduce a scope here so that we'll drop our reference
            // to the old netdir when we're done downloading.
            let netdir = match self.opt_netdir().await {
                Some(nd) => nd,
                None => return Ok(0),
            };

            let mark_listed = netdir.lifetime().valid_after();

            let missing: Vec<_> = netdir.missing_microdescs().map(Clone::clone).collect();
            let n_missing = missing.len();
            if n_missing == 0 {
                return Ok(0);
            }

            debug!(
                "{} missing microdescriptors. Attempting to download...",
                n_missing
            );
            let mds = download_mds(
                missing,
                mark_listed,
                &self.store,
                netdir.as_ref().into(),
                circmgr,
            )
            .await?;
            if mds.is_empty() {
                return Ok(n_missing);
            }
            mds
        };

        // Now we update the netdir.
        let new_netdir = {
            let mut w = self.netdir.write().await;
            if let Some(old_netdir) = w.take() {
                let new_netdir = Arc::new(old_netdir.extend(new_microdescs));
                *w = Some(Arc::clone(&new_netdir));
                new_netdir
            } else {
                // Getting here would be a programming error; warn?
                return Ok(0);
            }
        };

        Ok(new_netdir.missing_microdescs().count())
    }

    /// Launch an updater task that periodically re-fetches the
    /// directory to keep it up-to-date.
    fn launch_updater(self: Arc<Self>, circmgr: Arc<CircMgr>) -> Arc<DirectoryUpdater> {
        // TODO: XXXX: Need some way to keep two of these from running at
        // once.
        let updater = Arc::new(updater::DirectoryUpdater::new(self, circmgr));

        let updater_ref = Arc::clone(&updater);
        tor_rtcompat::task::spawn(async move {
            let _ = updater_ref.run().await;
        });

        updater
    }

    /// Run a complete bootstrapping process, using information from our
    /// cache when it is up-to-date enough.  When complete, update our
    /// NetDir with the one we've fetched.
    ///
    /// If use_cached_consensus is true, we start with a cached
    /// consensus if it is live; otherwise, we start with a consensus
    /// download.
    // TODO: We'll likely need to refactor this before too long.
    // TODO: This needs to exit with a failure if the consensus expires
    // partway through the process.
    pub async fn fetch_directory(
        &self,
        circmgr: Arc<CircMgr>,
        use_cached_consensus: bool,
    ) -> Result<()> {
        let store = &self.store;

        // Start out by using our previous netdir (if any): We'll need it
        // to decide where we are downloading things from.
        let current_netdir = self.opt_netdir().await;
        let dirinfo = match current_netdir {
            Some(ref nd) => nd.as_ref().into(),
            None => self.config.fallbacks().into(),
        };

        // Get a cached consensus, or start from scratch, depending on
        // whether `use_cached_consensus` is true.
        let noinfo = NoInformation::new();
        let nextstate = if use_cached_consensus {
            noinfo.load(true, &self.config, store).await?
        } else {
            NextState::SameState(noinfo)
        };

        // TODO: XXXX-A1: Also check the age of our current one.
        let mut unval = match nextstate {
            NextState::SameState(noinfo) => {
                // Couldn't load a pending consensus. Have to fetch one.
                info!("Fetching a consensus directory.");
                noinfo
                    .fetch_consensus(&self.config, store, dirinfo, Arc::clone(&circmgr))
                    .await?
            }
            NextState::NewState(unval) => unval,
        };

        // At this point we have a pending consensus.  We need to get certs
        // for it.  See what we can get from disk...
        unval.load(&self.config, store).await?;
        // Then fetch whatever we're missing.
        info!("Fetching certificate(s)."); // XXXX only log this when we have some to fetch.
        unval
            .fetch_certs(&self.config, store, dirinfo, Arc::clone(&circmgr))
            .await?;
        let mut partial = match unval.advance(&self.config)? {
            NextState::SameState(_) => return Err(anyhow!("Couldn't get certs")),
            NextState::NewState(p) => p,
        };

        // Finally, get microdescs from the cache...
        partial.load(store, self.opt_netdir().await).await?;
        // .. and fetch whatever we're missing.
        partial
            .fetch_mds(store, dirinfo, Arc::clone(&circmgr))
            .await?;

        let nd = match partial.advance() {
            NextState::NewState(nd) => nd,
            NextState::SameState(_) => return Err(anyhow!("Didn't get enough mds")),
        };

        {
            let mut w = self.netdir.write().await;
            *w = Some(Arc::new(nd));
        }

        Ok(())
    }

    /// Return true if we have a netdir, and it will be valid at least
    /// until `when`.
    async fn current_netdir_lasts_past(&self, when: SystemTime) -> bool {
        let r = self.netdir.read().await;
        if let Some(current_netdir) = r.as_ref() {
            let current_vu = current_netdir.lifetime().valid_until();
            current_vu >= when
        } else {
            false
        }
    }

    /// Return an Arc handle to our latest directory, if we have one.
    ///
    /// This is a private method, since by the time anybody else has a
    /// handle to a DirMgr, the NetDir should definitely be
    /// bootstrapped.
    async fn opt_netdir(&self) -> Option<Arc<NetDir>> {
        self.netdir.read().await.as_ref().map(Arc::clone)
    }

    /// Return an Arc handle to our latest directory, if we have one.
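    ///
    /// # Panics
    ///
    /// Panics if this DirMgr has not yet finished bootstrapping
    /// (see [`DirMgr::bootstrap_from_config`]).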
    // TODO: Add variants of this that make sure that it's up-to-date?
    pub async fn netdir(&self) -> Arc<NetDir> {
        self.opt_netdir()
            .await
            .expect("DirMgr was not bootstrapped!")
    }
}

/// Abstraction to handle the idea of a possible state transition
/// after fetching or loading directory information.
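///
/// For example, callers of the various `advance()` methods below
/// typically match on the result like this (illustrative only):
///
/// ```ignore
/// match unval.advance(&config)? {
///     // Got enough certificates; move on to fetching microdescriptors.
///     NextState::NewState(partial) => { /* ... */ }
///     // Not enough information yet; stay in this state and retry.
///     NextState::SameState(unval) => { /* ... */ }
/// }
/// ```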
#[derive(Clone, Debug)]
enum NextState<A, B>
where
    A: Clone + Debug,
    B: Clone + Debug,
{
    /// We either got no new info, or we didn't get enough info to update
    /// to a new state.
    SameState(A),
    /// We found enough information to transition to a new state.
    NewState(B),
}

/// Initial directory state when no information is known.
///
/// We can advance from this state by loading or fetching a consensus
/// document.
#[derive(Debug, Clone, Default)]
struct NoInformation {}

/// Second directory state: We know a consensus directory document,
/// but not the certs to validate it.
///
/// We can advance from this state by loading or fetching certificates.
#[derive(Debug, Clone)]
struct UnvalidatedDir {
    /// True if we loaded this consensus from our local cache.
    from_cache: bool,
    /// The consensus we've received.
    consensus: UnvalidatedMDConsensus,
    /// Information about digests and lifetimes of that consensus.
    consensus_meta: ConsensusMeta,
    /// The certificates that we've received for this consensus.
    ///
    /// We ensure that certificates are only included in this list if
    /// they are for authorities we believe in.
    certs: Vec<AuthCert>,
}

/// Third directory state: we've validated the consensus, but don't have
/// enough microdescs for it yet.
///
/// We advance from this state by loading or fetching microdescriptors.
#[derive(Debug, Clone)]
struct PartialDir {
    /// True if we loaded the consensus from our local cache.
    from_cache: bool,
    /// Information about digests and lifetimes of the consensus.
    consensus_meta: ConsensusMeta,
    /// The consensus directory, partially filled in with microdescriptors.
    dir: PartialNetDir,
}

impl NoInformation {
    /// Construct a new `NoInformation` into which directory information
    /// can be loaded or fetched.
    fn new() -> Self {
        NoInformation {}
    }

    /// Try to load a currently timely consensus directory document
    /// from the local cache in `store`.  If `pending`, then we'll
    /// happily return a pending document; otherwise, we'll only
    /// return a document that has been marked as having been completely
    /// bootstrapped.
    async fn load(
        self,
        pending: bool,
        config: &NetDirConfig,
        store: &Mutex<SqliteStore>,
    ) -> Result<NextState<Self, UnvalidatedDir>> {
        let consensus_text = {
            let store = store.lock().await;
            match store.latest_consensus(pending)? {
                Some(c) => c,
                None => return Ok(NextState::SameState(self)),
            }
        };

        let (consensus_meta, unvalidated) = {
            let string = consensus_text.as_str()?;
            let (signedval, remainder, parsed) = MDConsensus::parse(string)?;
            if let Ok(timely) = parsed.check_valid_now() {
                let meta = ConsensusMeta::from_unvalidated(signedval, remainder, &timely);

                (meta, timely)
            } else {
                return Ok(NextState::SameState(self));
            }
        };

        // Make sure that we could in principle validate this
        // consensus. If we couldn't, pretend it isn't cached at all.
        let authority_ids: Vec<_> = config
            .authorities()
            .iter()
            .map(|auth| auth.v3ident())
            .collect();
        if !unvalidated.authorities_are_correct(&authority_ids[..]) {
            // This is not for us.  Treat it as if we had no cached consensus.
            // XXXX-A1: We should mark the cached consensus as invalid somehow,
            // or just remove it.

            warn!("Found cached directory not signed by the right authorities. Are you using a mismatched cache?");
            store.lock().await.delete_consensus(&consensus_meta)?;
            return Ok(NextState::SameState(self));
        }

        let n_authorities = config.authorities().len() as u16;
        let unvalidated = unvalidated.set_n_authorities(n_authorities);
        Ok(NextState::NewState(UnvalidatedDir {
            from_cache: true,
            consensus_meta,
            consensus: unvalidated,
            certs: Vec::new(),
        }))
    }

    /// Try to fetch a currently timely consensus directory document
    /// from a randomly chosen directory cache server on the network.
    ///
    /// On failure, retry.
    async fn fetch_consensus(
        &self,
        config: &NetDirConfig,
        store: &Mutex<SqliteStore>,
        info: DirInfo<'_>,
        circmgr: Arc<CircMgr>,
    ) -> Result<UnvalidatedDir> {
        // XXXX make this configurable.
        // XXXX-A1 add a "keep trying forever" option for when we have no consensus.
        let n_retries = 3_u32;
        let mut retry_delay = RetryDelay::default();

        let mut errors = RetryError::while_doing("download a consensus");
        for _ in 0..n_retries {
            let cm = Arc::clone(&circmgr);
            match self.fetch_consensus_once(config, store, info, cm).await {
                Ok(v) => return Ok(v),
                Err(e) => {
                    errors.push(e);
                    let delay = retry_delay.next_delay(&mut rand::thread_rng());
                    tor_rtcompat::task::sleep(delay).await;
                }
            }
        }

        Err(errors.into())
    }

    /// Try to fetch a currently timely consensus directory document
    /// from a randomly chosen directory cache server on the network.
    async fn fetch_consensus_once(
        &self,
        config: &NetDirConfig,
        store: &Mutex<SqliteStore>,
        info: DirInfo<'_>,
        circmgr: Arc<CircMgr>,
    ) -> Result<UnvalidatedDir> {
        let mut resource = tor_dirclient::request::ConsensusRequest::new();
        // XXXX-A1 In some of the below error cases we should retire the circuit
        // to the cache that gave us this stuff.

        let meta = {
            let r = store.lock().await;
            match r.latest_consensus_meta() {
                Ok(Some(meta)) => {
                    resource.set_last_consensus_date(meta.lifetime().valid_after());
                    resource.push_old_consensus_digest(*meta.sha3_256_of_signed());
                    Some(meta)
                }
                Ok(None) => None,
                Err(e) => {
                    warn!("Error loading directory metadata: {}", e);
                    None
                }
            }
        };
        let response = tor_dirclient::get_resource(resource, info, circmgr).await?;
        let text = response.output();

        let expanded_diff = if meta.is_some() && tor_consdiff::looks_like_diff(&text) {
            let meta = meta.unwrap();
            // This is a diff, not the consensus.
            let r = store.lock().await;
            let cons = r.consensus_by_meta(&meta)?;
            let new_consensus =
                tor_consdiff::apply_diff(cons.as_str()?, &text, Some(*meta.sha3_256_of_signed()))?
                    .to_string();
            // XXXX-A1 have to check digest of new value.
            info!("Applying a consensus diff");
            Some(new_consensus)
        } else {
            None
        };
        let text = match expanded_diff {
            Some(ref s) => s,
            None => text,
        };

        let (signedval, remainder, parsed) = MDConsensus::parse(&text)?;
        debug!("Successfully parsed the consensus");
        let unvalidated = parsed.check_valid_now()?;
        let meta = ConsensusMeta::from_unvalidated(signedval, remainder, &unvalidated);

        // Make sure that we could in principle validate this, and it doesn't
        // have a totally different set of authorities than the ones
        // we asked for.
        let authority_ids: Vec<_> = config
            .authorities()
            .iter()
            .map(|auth| auth.v3ident())
            .collect();
        if !unvalidated.authorities_are_correct(&authority_ids[..]) {
            return Err(Error::Unwanted("Consensus not signed by correct authorities").into());
        }

        {
            let mut w = store.lock().await;
            w.store_consensus(&meta, true, &text)?;
        }
        let n_authorities = config.authorities().len() as u16;
        let unvalidated = unvalidated.set_n_authorities(n_authorities);

        Ok(UnvalidatedDir {
            from_cache: false,
            consensus: unvalidated,
            consensus_meta: meta,
            certs: Vec::new(),
        })
    }
}

impl UnvalidatedDir {
    /// Helper: Remove every member of self.certs that does not match
    /// some authority listed in `config`.
    fn prune_certs(&mut self, config: &NetDirConfig) {
        // Quadratic, but should be fine.
        let authorities = &config.authorities();
        self.certs
            .retain(|cert| authorities.iter().any(|a| a.matches_cert(cert)));
    }

    /// Helper: Return a list of certificate key identities for the
    /// certificates we should download in order to check this
    /// consensus.
    ///
    /// This function will return an empty list when we have enough
    /// certificates, whether or not it is a _complete_ list.
    fn missing_certs(&mut self, config: &NetDirConfig) -> Vec<AuthCertKeyIds> {
        self.prune_certs(config);
        let authorities = config.authorities();

        match self.consensus.key_is_correct(&self.certs[..]) {
            Ok(()) => Vec::new(),
            Err(mut missing) => {
                missing.retain(|m| authorities.iter().any(|a| a.matches_keyid(m)));
                missing
            }
        }
    }

    /// Load authority certificates from our local cache.
    async fn load(&mut self, config: &NetDirConfig, store: &Mutex<SqliteStore>) -> Result<()> {
        let missing = self.missing_certs(config);

        let newcerts = {
            let r = store.lock().await;
            r.authcerts(&missing[..])?
        };

        for c in newcerts.values() {
            let cert = AuthCert::parse(c)?.check_signature()?;
            if let Ok(cert) = cert.check_valid_now() {
                // XXXX-A1: Complain if we find a cert we didn't want. That's a bug.
                self.certs.push(cert);
            }
        }

        self.prune_certs(config);

        Ok(())
    }

    /// Try to fetch authority certificates from the network.
    ///
    /// Retry if we couldn't get enough certs to validate the consensus.
    async fn fetch_certs(
        &mut self,
        config: &NetDirConfig,
        store: &Mutex<SqliteStore>,
        info: DirInfo<'_>,
        circmgr: Arc<CircMgr>,
    ) -> Result<()> {
        // XXXX make this configurable
        // XXXX-A1 add a "keep trying forever" option for when we have no consensus.
        let n_retries = 3_u32;
        let mut retry_delay = RetryDelay::default();

        let mut errors = RetryError::while_doing("downloading authority certificates");
        for _ in 0..n_retries {
            let cm = Arc::clone(&circmgr);
            if let Err(e) = self.fetch_certs_once(config, store, info, cm).await {
                errors.push(e);
            }

            if self.missing_certs(config).is_empty() {
                // We have enough certificates to validate the consensus.
                return Ok(());
            }
            let delay = retry_delay.next_delay(&mut rand::thread_rng());
            tor_rtcompat::task::sleep(delay).await;
        }

        Err(errors.into())
    }

    /// Try to fetch authority certificates from the network.
    async fn fetch_certs_once(
        &mut self,
        config: &NetDirConfig,
        store: &Mutex<SqliteStore>,
        info: DirInfo<'_>,
        circmgr: Arc<CircMgr>,
    ) -> Result<()> {
        let missing = self.missing_certs(config);
        if missing.is_empty() {
            return Ok(());
        }

        let mut resource = tor_dirclient::request::AuthCertRequest::new();
        for m in missing.iter() {
            resource.push(m.clone());
        }

        let response = tor_dirclient::get_resource(resource, info, circmgr).await?;
        let text = response.output();
        // XXXX-A1 In some of the below error cases we should retire the circuit
        // to the cache that gave us this stuff.

        let mut newcerts = Vec::new();
        for cert in AuthCert::parse_multiple(&text) {
            if let Ok(parsed) = cert {
                let s = parsed.within(&text).unwrap();
                if let Ok(wellsigned) = parsed.check_signature() {
                    if let Ok(timely) = wellsigned.check_valid_now() {
                        newcerts.push((timely, s));
                    }
                }
            }
            // XXXX-A1 warn on error.
        }

        // Throw away any newly downloaded certs we didn't ask for.
        newcerts.retain(|(cert, _)| missing.iter().any(|m| m == cert.key_ids()));
        // XXXX-A1 warn on discard.

        {
            let v: Vec<_> = newcerts[..]
                .iter()
                .map(|(cert, s)| (AuthCertMeta::from_authcert(cert), *s))
                .collect();
            let mut w = store.lock().await;
            w.store_authcerts(&v[..])?;
        }

        for (cert, _) in newcerts {
            self.certs.push(cert);
        }

        // This should be redundant.
        self.prune_certs(config);

        Ok(())
    }

    /// If we have enough certificates, check this document and return
    /// a PartialDir.  Otherwise remain in the same state.
    fn advance(mut self, config: &NetDirConfig) -> Result<NextState<Self, PartialDir>> {
        let missing = self.missing_certs(config);

        if missing.is_empty() {
            // Either we can validate, or we never will.
            let validated = self.consensus.check_signature(&self.certs[..])?;
            Ok(NextState::NewState(PartialDir {
                from_cache: self.from_cache,
                consensus_meta: self.consensus_meta,
                dir: PartialNetDir::new(validated),
            }))
        } else {
            Ok(NextState::SameState(self))
        }
    }
}

impl PartialDir {
    /// Try to load microdescriptors from our local cache.
    async fn load(&mut self, store: &Mutex<SqliteStore>, prev: Option<Arc<NetDir>>) -> Result<()> {
        let mark_listed = Some(SystemTime::now()); // XXXX-A1 use validafter, conditionally.

        load_mds(&mut self.dir, prev, mark_listed, store).await
    }

    /// Try to fetch microdescriptors from the network.
    ///
    /// Retry if we didn't get enough to build circuits.
    async fn fetch_mds(
        &mut self,
        store: &Mutex<SqliteStore>,
        info: DirInfo<'_>,
        circmgr: Arc<CircMgr>,
    ) -> Result<()> {
        // XXXX Make this configurable
        // XXXX-A1 add a "keep trying forever" option for when we have no consensus.
        let n_retries = 3_u32;
        let mut retry_delay = RetryDelay::default();

        let mut errors = RetryError::while_doing("download microdescriptors");
        for _ in 0..n_retries {
            let cm = Arc::clone(&circmgr);
            if let Err(e) = self.fetch_mds_once(store, info, cm).await {
                errors.push(e);
            }

            if self.dir.have_enough_paths() {
                // We can build circuits; return!
                return Ok(());
            }
            let delay = retry_delay.next_delay(&mut rand::thread_rng());
            tor_rtcompat::task::sleep(delay).await;
        }

        Err(errors.into())
    }
    /// Try to fetch microdescriptors from the network.
    async fn fetch_mds_once(
        &mut self,
        store: &Mutex<SqliteStore>,
        info: DirInfo<'_>,
        circmgr: Arc<CircMgr>,
    ) -> Result<()> {
        let mark_listed = SystemTime::now(); // XXXX-A1 use validafter
        let missing: Vec<MDDigest> = self.dir.missing_microdescs().map(Clone::clone).collect();
        let mds = download_mds(missing, mark_listed, store, info, circmgr).await?;
        for md in mds {
            self.dir.add_microdesc(md);
        }
        if self.dir.have_enough_paths() {
            // XXXX no need to do this if it was already non-pending.
            // XXXX this calculation is redundant with the one in advance().
            let mut w = store.lock().await;
            w.mark_consensus_usable(&self.consensus_meta)?;
            // Expire on getting a valid directory.
            w.expire_all()?;
        }
        Ok(())
    }

    /// If we have enough microdescriptors to build circuits, return a NetDir.
    /// Otherwise, return this same document.
    fn advance(self) -> NextState<Self, NetDir> {
        match self.dir.unwrap_if_sufficient() {
            Ok(netdir) => NextState::NewState(netdir),
            Err(partial) => NextState::SameState(PartialDir {
                from_cache: self.from_cache,
                consensus_meta: self.consensus_meta,
                dir: partial,
            }),
        }
    }
}

/// Helper to load microdescriptors from the cache and store them into
/// a PartialNetDir.
async fn load_mds(
    doc: &mut PartialNetDir,
    prev: Option<Arc<NetDir>>,
    mark_listed: Option<SystemTime>,
    store: &Mutex<SqliteStore>,
) -> Result<()> {
    let mut loaded = if let Some(ref prev_netdir) = prev {
        doc.fill_from_previous_netdir(prev_netdir.as_ref())
    } else {
        Vec::new()
    };

    let microdescs = {
        let r = store.lock().await;
        r.microdescs(doc.missing_microdescs())?
    };

    for (digest, text) in microdescs.iter() {
        let md = Microdesc::parse(text)?; // XXXX-A1 recover from this
        if md.digest() != digest {
            // whoa! XXXX Log something about this.
            continue;
        }
        if doc.add_microdesc(md) {
            loaded.push(digest);
        }
    }

    if let Some(when) = mark_listed {
        let mut w = store.lock().await;
        w.update_microdescs_listed(loaded, when)?;
    }

    Ok(())
}

/// Helper to fetch microdescriptors from the network and store them either
/// into a PartialNetDir or a NetDir.
async fn download_mds(
    mut missing: Vec<MDDigest>,
    mark_listed: SystemTime,
    store: &Mutex<SqliteStore>,
    info: DirInfo<'_>,
    circmgr: Arc<CircMgr>,
) -> Result<Vec<Microdesc>> {
    missing.sort_unstable();
    if missing.is_empty() {
        return Ok(Vec::new());
    }
    let chunksize: usize = std::cmp::min(500, (missing.len() + 2) / 3);
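    // For example (illustrative arithmetic only): with 1200 missing
    // microdescriptors, (1200 + 2) / 3 == 400, so we use chunks of 400;
    // with 10 missing, we'd use chunks of 4 (that is, ceil(10 / 3)).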

    let n_parallel_requests = 4; // TODO make this configurable.

    // Now we're going to fetch the descriptors up to 500 at a time,
    // in up to n_parallel_requests requests.

    // TODO: we should maybe exit early if we wind up with a working
    // list.
    // TODO: we should maybe try to keep concurrent requests on
    // separate circuits?

    // Break 'missing' into the chunks we're going to fetch.
    // XXXX: I hate having to do all these copies, but otherwise I
    // wind up with lifetime issues.
    let missing: Vec<Vec<_>> = missing[..].chunks(chunksize).map(|s| s.to_vec()).collect();

    let new_mds: Vec<_> = futures::stream::iter(missing.into_iter())
        .map(|chunk| {
            let cm = Arc::clone(&circmgr);
            async move {
                info!("Fetching {} microdescriptors...", chunksize);
                let mut resource = tor_dirclient::request::MicrodescRequest::new();
                for md in chunk.iter() {
                    resource.push(*md);
                }
                let want: HashSet<_> = chunk.iter().collect();

                let res = tor_dirclient::get_resource(resource, info, cm).await;

                // Handle fetch errors here
                if let Err(err) = &res {
                    info!("Problem fetching mds: {:?}", err);
                }

                let mut my_new_mds = Vec::new();
                if let Ok(response) = res {
                    let text = response.output();
                    // XXXX-A1 In some of the below error cases we should
                    // retire the circuit to the cache that gave us
                    // this stuff.

                    for annot in
                        MicrodescReader::new(&text, AllowAnnotations::AnnotationsNotAllowed)
                    {
                        match annot {
                            Ok(anno) => {
                                let txt = anno.within(&text).unwrap().to_string(); //XXXX ugly copy
                                let md = anno.into_microdesc();
                                if want.contains(md.digest()) {
                                    my_new_mds.push((txt, md))
                                } else {
                                    warn!("Received md we did not ask for: {:?}", md.digest())
                                }
                            }
                            Err(err) => {
                                warn!("Problem with annotated md: {:?}", err)
                            }
                        }
                    }
                }

                info!("Received {} microdescriptors.", my_new_mds.len());
                my_new_mds
            }
        })
        .buffer_unordered(n_parallel_requests)
        .collect()
        .await;

    // Now save it to the database
    {
        let mut w = store.lock().await;
        w.store_microdescs(
            new_mds
                .iter()
                .flatten()
                .map(|(txt, md)| (&txt[..], md.digest())),
            mark_listed,
        )?;
    }

    Ok(new_mds.into_iter().flatten().map(|(_, md)| md).collect())
}