relaylist.py 20.2 KB
Newer Older
1
2
3
import copy
from datetime import datetime, timedelta

juga's avatar
juga committed
4
from stem.descriptor.router_status_entry import RouterStatusEntryV3
5
from stem.descriptor.server_descriptor import ServerDescriptor
6
from stem import Flag, DescriptorUnavailable, ControllerError
Matt Traudt's avatar
Matt Traudt committed
7
import random
8
import logging
9
from threading import Lock
Matt Traudt's avatar
Matt Traudt committed
10

11
from ..globals import MEASUREMENTS_PERIOD
12
from ..util import timestamp
13

14
15
log = logging.getLogger(__name__)

Matt Traudt's avatar
Matt Traudt committed
16

17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def remove_old_consensus_timestamps(
        consensus_timestamps, measurements_period=MEASUREMENTS_PERIOD):
    """
    Return the consensus timestamps that are still inside the period for
    which the measurements are kept.

    :param list consensus_timestamps: naive ``datetime`` objects; they are
        compared against ``datetime.utcnow()``.
    :param int measurements_period: length of the period, in seconds.
    :returns list: a new list of ``consensus_timestamps``; the argument is
        not modified.
    """
    # The period is expressed in seconds (this module divides it by
    # 24 * 60 * 60 when it reports days), so it has to be passed by keyword:
    # the first positional argument of ``timedelta`` is ``days``.
    oldest_date = datetime.utcnow() - timedelta(seconds=measurements_period)
    return [t for t in consensus_timestamps if t >= oldest_date]


def valid_after_from_network_statuses(network_statuses):
    """Return the consensus Valid-After datetime found in router statuses.

    The first entry whose ``document`` attribute is truthy and carries a
    truthy ``valid_after`` wins. When no entry qualifies, the current UTC
    time truncated to whole seconds is returned instead.

    :param list network_statuses: router status entries, e.g.
        ``stem.descriptor.RouterStatusEntryV3``.
    :returns datetime:
    """
    for status in network_statuses:
        consensus_document = getattr(status, 'document', None)
        if not consensus_document:
            continue
        found = getattr(consensus_document, 'valid_after', None)
        if found:
            return found
    # No status gave access to the consensus document: use "now".
    now = datetime.utcnow()
    return now.replace(microsecond=0)


49
class Relay:
50
    def __init__(self, fp, cont, ns=None, desc=None, timestamp=None):
        '''
        Given a relay fingerprint, fetch all the information about a relay that
        sbws currently needs and store it in this class. Acts as an abstraction
        to hide the confusion that is Tor consensus/descriptor stuff.

        :param str fp: fingerprint of the relay (40 characters).
        :param cont: active and valid stem Tor controller connection
        :param RouterStatusEntryV3 ns: router status of the relay; when it is
            None it is requested from ``cont``.
        :param ServerDescriptor desc: server descriptor of the relay; when it
            is None it is requested from ``cont``.
        :param datetime timestamp: the timestamp of a consensus
            (RouterStatusEntryV3) from which this relay has been obtained.
        '''
        assert isinstance(fp, str)
        assert len(fp) == 40
        if ns is not None:
            assert isinstance(ns, RouterStatusEntryV3)
            self._ns = ns
        else:
            try:
                self._ns = cont.get_network_status(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get ns %s", e)
                self._ns = None
        if desc is not None:
            assert isinstance(desc, ServerDescriptor)
            self._desc = desc
        else:
            try:
                self._desc = cont.get_server_descriptor(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get desc %s", e)
                # Define the attribute even on failure, as it is done for
                # ``_ns``; otherwise every descriptor based property would
                # raise AttributeError instead of returning None.
                self._desc = None
        self._consensus_timestamps = []
        self._add_consensus_timestamp(timestamp)
        # The number of times that a relay is "prioritized" to be measured.
        # It is incremented in ``RelayPrioritizer.best_priority``
        self.relay_recent_priority_list_count = 0
        # The number of times that a relay has been queued to be measured.
        # It is incremented in ``scanner.main_loop``
        self.relay_recent_measurement_attempt_count = 0
89
90
91
92

    def _from_desc(self, attr):
        if not self._desc:
            return None
93
        return getattr(self._desc, attr, None)
94
95
96
97

    def _from_ns(self, attr):
        if not self._ns:
            return None
98
        return getattr(self._ns, attr, None)
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119

    @property
    def nickname(self):
        """``nickname`` of the router status, or None if unavailable."""
        return self._from_ns('nickname')

    @property
    def fingerprint(self):
        """``fingerprint`` of the router status, or None if unavailable."""
        return self._from_ns('fingerprint')

    @property
    def flags(self):
        """``flags`` of the router status, or None if unavailable."""
        return self._from_ns('flags')

    @property
    def exit_policy(self):
        """``exit_policy`` of the server descriptor, or None if unavailable."""
        return self._from_desc('exit_policy')

    @property
    def average_bandwidth(self):
        """``average_bandwidth`` of the server descriptor, or None."""
        return self._from_desc('average_bandwidth')

juga's avatar
juga committed
120
121
122
123
    @property
    def burst_bandwidth(self):
        """``burst_bandwidth`` of the server descriptor, or None."""
        return self._from_desc('burst_bandwidth')

124
125
126
127
    @property
    def observed_bandwidth(self):
        """``observed_bandwidth`` of the server descriptor, or None."""
        return self._from_desc('observed_bandwidth')

128
    @property
juga's avatar
juga committed
129
    def consensus_bandwidth(self):
130
131
132
133
134
135
        """Return the consensus bandwidth in Bytes.

        Consensus bandwidth is the only bandwidth value that is in kilobytes.
        """
        if self._from_ns('bandwidth') is not None:
            return self._from_ns('bandwidth') * 1000
136

juga's avatar
juga committed
137
138
139
140
141
142
143
    @property
    def consensus_bandwidth_is_unmeasured(self):
        """``is_unmeasured`` of the router status, or None if unavailable."""
        # "measured" appears only in votes, "unmeasured" appears in the
        # consensus; therefore is_unmeasured is needed to know whether the
        # bandwidth value in the consensus comes from bwauth measurements.
        return self._from_ns('is_unmeasured')

144
145
146
    @property
    def address(self):
        """``address`` of the router status, or None if unavailable."""
        return self._from_ns('address')
juga's avatar
juga committed
147

148
    @property
juga's avatar
juga committed
149
    def master_key_ed25519(self):
150
151
152
        """Obtain ed25519 master key of the relay in server descriptors.

        :returns: str, the ed25519 master key base 64 encoded without
juga's avatar
juga committed
153
154
                  trailing '='s.

155
        """
juga's avatar
juga committed
156
157
        # Even if this key is called master-key-ed25519 in dir-spec.txt,
        # it seems that stem parses it as ed25519_master_key
158
159
160
161
        key = self._from_desc('ed25519_master_key')
        if key is None:
            return None
        return key.rstrip('=')
juga's avatar
juga committed
162

163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
    @property
    def consensus_valid_after(self):
        """Obtain the consensus Valid-After from the document of this relay
        network status.
        """
        network_status_document = self._from_ns('document')
        if network_status_document:
            return getattr(network_status_document, 'valid_after', None)
        return None

    @property
    def last_consensus_timestamp(self):
        if len(self._consensus_timestamps) >= 1:
            return self._consensus_timestamps[-1]
        return None

179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
    def _append_consensus_timestamp_if_later(self, timestamp):
        """Append timestamp to the list of consensus timestamps, if it is later
           than the most recent existing timestamp, or there are no timestamps.
           Should only be called by _add_consensus_timestamp().
           timestamp must not be None, and it must not be zero.
        """
        if not timestamp:
            log.info('Bad timestamp %s, skipping consensus timestamp '
                     'update for  relay %s', timestamp, self.fingerprint)
            return
        # The consensus timestamp list was initialized.
        if self.last_consensus_timestamp is not None:
            # timestamp is more recent than the most recent stored
            # consensus timestamp.
            if timestamp > self.last_consensus_timestamp:
                # Add timestamp
                self._consensus_timestamps.append(timestamp)
        # The consensus timestamp list was not initialized.
        else:
            # Add timestamp
            self._consensus_timestamps.append(timestamp)

201
202
203
204
    def _add_consensus_timestamp(self, timestamp=None):
        """Add the consensus timestamp in which this relay is present.
        """
        # It is possible to access to the relay's consensensus Valid-After
205
        # so believe it, rather than the supplied timestamp
206
        if self.consensus_valid_after is not None:
207
208
209
210
            self._append_consensus_timestamp_if_later(
                self.consensus_valid_after
                )
        elif timestamp:
211
            # Add the arg timestamp.
212
            self._append_consensus_timestamp_if_later(timestamp)
213
214
        # In any other case
        else:
215
216
217
            log.warning('Bad timestamp %s, using current time for consensus '
                        'timestamp update for relay %s',
                        timestamp, self.fingerprint)
218
            # Add the current datetime
219
            self._append_consensus_timestamp_if_later(
220
221
222
223
224
225
                datetime.utcnow().replace(microsecond=0))

    def _remove_old_consensus_timestamps(
            self, measurements_period=MEASUREMENTS_PERIOD):
        """Forget the stored consensus timestamps older than the period.

        :param int measurements_period: passed through to the module level
            ``remove_old_consensus_timestamps``.
        """
        # The module level helper builds and returns a new list and never
        # mutates its argument, so no defensive deep copy is needed here.
        self._consensus_timestamps = remove_old_consensus_timestamps(
            self._consensus_timestamps, measurements_period)

    def update_consensus_timestamps(self, timestamp=None):
        """Record a new consensus timestamp and drop the outdated ones.

        :param timestamp: fallback timestamp handed to
            ``_add_consensus_timestamp``.
        """
        self._add_consensus_timestamp(timestamp)
        self._remove_old_consensus_timestamps()

233
    @property
234
    def relay_in_recent_consensus_count(self):
235
236
237
        """Number of times the relay was in a conensus."""
        return len(self._consensus_timestamps)

238
239
240
241
242
243
244
    def can_exit_to_port(self, port):
        """
        Returns True if the relay has an exit policy and the policy accepts
        exiting to the given portself or False otherwise.
        """
        assert isinstance(port, int)
        # if dind't get the descriptor, there isn't exit policy
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
        # When the attribute is gotten in getattr(self._desc, "exit_policy"),
        # is possible that stem's _input_rules is None and raises an exception
        # (#29899):
        #   File "/usr/lib/python3/dist-packages/sbws/lib/relaylist.py", line 117, in can_exit_to_port  # noqa
        #     if not self.exit_policy:
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 512, in __len__  # noqa
        #     return len(self._get_rules())
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 464, in _get_rules  # noqa
        #     for rule in decompressed_rules:
        # TypeError: 'NoneType' object is not iterable
        # Therefore, catch the exception here.
        try:
            if self.exit_policy:
                return self.exit_policy.can_exit_to(port=port)
        except TypeError:
260
            return False
261
        return False
262

263
264
265
266
    def is_exit_not_bad_allowing_port(self, port):
        """Whether the relay has the Exit flag, lacks the BadExit flag and
        its exit policy allows exiting to ``port``."""
        flags = self.flags
        if Flag.BADEXIT in flags:
            return False
        if Flag.EXIT not in flags:
            return False
        return self.can_exit_to_port(port)
267

268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
    def increment_relay_recent_measurement_attempt_count(self):
        """
        Increment The number of times that a relay has been queued
        to be measured.

        It is call from :funf:`~sbws.core.scaner.main_loop`.
        """
        # If it was not in the previous measurements version, start counting
        if self.relay_recent_measurement_attempt_count is None:
            self.relay_recent_measurement_attempt_count = 0
        self.relay_recent_measurement_attempt_count += 1

    def increment_relay_recent_priority_list_count(self):
        """
        The number of times that a relay is "prioritized" to be measured.

        It is call from
        :meth:`~sbws.lib.relayprioritizer.RelayPrioritizer.best_priority`.
        """
        # If it was not in the previous measurements version, start counting
        if self.relay_recent_priority_list_count is None:
            self.relay_recent_priority_list_count = 0
        self.relay_recent_priority_list_count += 1

292
293
294
295
296
    def is_old(self):
        """Whether the last consensus seen for this relay is old.

        Delegates to ``timestamp.is_old`` with
        :attr:`last_consensus_timestamp`, which is None when no consensus
        timestamp is stored.

        NOTE(review): presumably "old" means older than the measurement
        period -- confirm against ``util.timestamp.is_old``.
        """
        return timestamp.is_old(self.last_consensus_timestamp)
297

298
299
300
301
302
303
304
305
306
307
308
    # XXX: tech-debt: replace the `_desc` attr by a `deque` of the last
    # descriptors seen for this relay and their timestamps.
    def update_server_descriptor(self, server_descriptor):
        """Update this relay's server descriptor (from the consensus).

        :param server_descriptor: new value, stored as is in ``_desc``.
        """
        self._desc = server_descriptor

    # XXX: tech-debt: replace the `_ns` attr by a `deque` of the last
    # router statuses seen for this relay and their timestamps.
    def update_router_status(self, router_status):
        """Update this relay's router status (from the consensus).

        :param router_status: new value, stored as is in ``_ns``.
        """
        self._ns = router_status
309

juga's avatar
juga committed
310

Matt Traudt's avatar
Matt Traudt committed
311
class RelayList:
Matt Traudt's avatar
Matt Traudt committed
312
313
314
315
    ''' Keeps a list of all relays in the current Tor network and updates it
    transparently in the background. Provides useful interfaces for getting
    only relays of a certain type.
    '''
Matt Traudt's avatar
Matt Traudt committed
316

317
    def __init__(self, args, conf, controller,
                 measurements_period=MEASUREMENTS_PERIOD, state=None):
        """Create the list and fill it with the relays Tor knows about.

        :param args: not used by this method.
        :param conf: not used by this method.
        :param controller: stem controller used to obtain router statuses
            and server descriptors.
        :param int measurements_period: period of time for which the
            measurements are kept.
        :param state: optional dict-like object in which the counters
            ``recent_measurement_attempt_count`` and
            ``recent_consensus_count`` are stored.
        """
        self._controller = controller
        self.rng = random.SystemRandom()
        self._refresh_lock = Lock()
        # To track all the consensus seen.
        self._consensus_timestamps = []
        # Initialize so that there's no error trying to access to it.
        # In future refactor, change to a dictionary, where the keys are
        # the relays' fingerprint.
        self._relays = []
        # The period of time for which the measurements are kept.
        self._measurements_period = measurements_period
        self._state = state
        # NOTE: blocking: writes to disk
        # Compare against None, as ``_refresh`` does: a state that is still
        # empty is falsy, and skipping it here would leave the counter
        # undefined once ``_refresh`` has stored its own key in the state.
        if self._state is not None:
            if self._state.get('recent_measurement_attempt_count', None) \
                    is None:
                self._state['recent_measurement_attempt_count'] = 0
        self._refresh()

338
    def _need_refresh(self):
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
        # New consensuses happen every hour.
        return datetime.utcnow() >= \
            self.last_consensus_timestamp + timedelta(seconds=60*60)

    @property
    def last_consensus_timestamp(self):
        """Returns the datetime when the last consensus was obtained."""
        if (getattr(self, "_consensus_timestamps")
                and self._consensus_timestamps):
            return self._consensus_timestamps[-1]
        # If the object was not created from __init__, it won't have
        # consensus_timestamps attribute or it might be empty.
        # In this case force new update.
        # Anytime more than 1h in the past will be old.
        self._consensus_timestamps = []
        return datetime.utcnow() - timedelta(seconds=60*61)
355

Matt Traudt's avatar
Matt Traudt committed
356
357
    @property
    def relays(self):
358
359
360
        # See if we can get the list of relays without having to do a refresh,
        # which is expensive and blocks other threads
        if self._need_refresh():
361
362
            log.debug('We need to refresh our list of relays. '
                      'Going to wait for lock.')
363
364
365
            # Whelp we couldn't just get the list of relays because the list is
            # stale. Wait for the lock so we can refresh it.
            with self._refresh_lock:
366
367
                log.debug('We got the lock. Now to see if we still '
                          'need to refresh.')
368
369
370
371
                # Now we have the lock ... but wait! Maybe someone else already
                # did the refreshing. So check if it still needs refreshing. If
                # not, we can do nothing.
                if self._need_refresh():
372
                    log.debug('Yup we need to refresh our relays. Doing so.')
373
                    self._refresh()
374
375
376
377
                else:
                    log.debug('No we don\'t need to refresh our relays. '
                              'It was done by someone else.')
            log.debug('Giving back the lock for refreshing relays.')
Matt Traudt's avatar
Matt Traudt committed
378
379
        return self._relays

Matt Traudt's avatar
Matt Traudt committed
380
381
382
383
    @property
    def fast(self):
        """Relays from :attr:`relays` whose flags include ``Flag.FAST``."""
        return self._relays_with_flag(Flag.FAST)

Matt Traudt's avatar
Matt Traudt committed
384
385
386
387
    @property
    def exits(self):
        """Relays from :attr:`relays` whose flags include ``Flag.EXIT``."""
        return self._relays_with_flag(Flag.EXIT)

388
389
    @property
    def bad_exits(self):
        """Relays from :attr:`relays` whose flags include ``Flag.BADEXIT``."""
        flagged = self._relays_with_flag(Flag.BADEXIT)
        return flagged
391

392
393
394
395
    @property
    def non_exits(self):
        """Relays from :attr:`relays` whose flags lack ``Flag.EXIT``."""
        return self._relays_without_flag(Flag.EXIT)

Matt Traudt's avatar
Matt Traudt committed
396
397
398
399
    @property
    def guards(self):
        """Relays from :attr:`relays` whose flags include ``Flag.GUARD``."""
        return self._relays_with_flag(Flag.GUARD)

400
401
402
403
    @property
    def authorities(self):
        """Relays from :attr:`relays` whose flags include
        ``Flag.AUTHORITY``."""
        return self._relays_with_flag(Flag.AUTHORITY)

juga's avatar
juga committed
404
405
406
407
408
409
410
    @property
    def relays_fingerprints(self):
        """Fingerprints of all the relays in :attr:`relays`."""
        # Using relays instead of _relays, so that the list gets updated if
        # needed, since this method is used to know which fingerprints are in
        # the consensus.
        return [r.fingerprint for r in self.relays]

Matt Traudt's avatar
Matt Traudt committed
411
    def random_relay(self):
412
        return self.rng.choice(self.relays)
Matt Traudt's avatar
Matt Traudt committed
413

Matt Traudt's avatar
Matt Traudt committed
414
    def _relays_with_flag(self, flag):
415
        return [r for r in self.relays if flag in r.flags]
Matt Traudt's avatar
Matt Traudt committed
416

Matt Traudt's avatar
Matt Traudt committed
417
    def _relays_without_flag(self, flag):
418
        return [r for r in self.relays if flag not in r.flags]
Matt Traudt's avatar
Matt Traudt committed
419

420
421
422
423
424
425
    def _remove_old_consensus_timestamps(self):
        """Forget the consensus timestamps older than the measurements
        period of this list."""
        # The module level helper builds and returns a new list and never
        # mutates its argument, so no defensive deep copy is needed here.
        self._consensus_timestamps = remove_old_consensus_timestamps(
            self._consensus_timestamps, self._measurements_period)

Matt Traudt's avatar
Matt Traudt committed
426
    def _init_relays(self):
        """Returns a new list of relays that are in the current consensus.
        And update the consensus timestamp list with the current one.

        Relays already known that are in the current consensus get their
        consensus timestamps, router status and descriptor updated. Known
        relays missing from the consensus are kept while they are not "old".
        Relays seen for the first time are created.
        """
        c = self._controller
        # This will get router statuses from this Tor cache, might not be
        # updated with the network.
        # Change to stem.descriptor.remote in future refactor.
        # Materialize the statuses: they are traversed twice below and stem
        # documents ``get_network_statuses`` as providing an iterator, which
        # would be exhausted after the first traversal.
        network_statuses = list(c.get_network_statuses())
        new_relays_dict = {ns.fingerprint: ns for ns in network_statuses}
        log.debug("Number of relays in the current consensus: %d.",
                  len(new_relays_dict))

        # Find the timestamp of the last consensus.
        timestamp = valid_after_from_network_statuses(network_statuses)
        self._consensus_timestamps.append(timestamp)
        self._remove_old_consensus_timestamps()

        new_relays = []

        # Only for debugging, count the relays that are not in the current
        # consensus and have not been seen in the last consensuses either.
        num_old_relays = 0

        relays = copy.deepcopy(self._relays)
        for r in relays:
            fp = r.fingerprint
            if fp in new_relays_dict:
                # If a relay was in the previous consensus and is in the
                # current one, update its timestamp, router status and
                # descriptor.
                r.update_consensus_timestamps(timestamp)
                # new_relays_dict[fp] is the router status.
                r.update_router_status(new_relays_dict[fp])
                try:
                    descriptor = c.get_server_descriptor(fp, default=None)
                except (DescriptorUnavailable, ControllerError) as e:
                    log.exception("Exception trying to get desc %s", e)
                    # Without this assignment ``descriptor`` would be unbound
                    # or still hold the descriptor of the previous relay.
                    descriptor = None
                r.update_server_descriptor(descriptor)
                # Add it to the new list of relays.
                new_relays.append(r)
                # And remove it from the new consensus dict, as it has
                # already been added to the new list.
                new_relays_dict.pop(fp)

            # If the relay is not in the current consensus but is not "old"
            # yet, add it to the new list of relays too, though its timestamp,
            # router status and descriptor can't be updated.
            elif not r.is_old():
                new_relays.append(r)
            # Otherwise, don't add it to the new list of relays.
            # For debugging, count the old relays that will be discarded.
            else:
                num_old_relays += 1

        # Finally, add the relays that were not in the previous consensus
        for ns in new_relays_dict.values():
            r = Relay(ns.fingerprint, c, ns=ns, timestamp=timestamp)
            new_relays.append(r)

        log.debug("Previous number of relays being measured %d",
                  len(self._relays))
        # The period is kept in seconds; convert it as ``_refresh`` does so
        # that the value matches the "days" wording of the message.
        log.debug("Number of relays not in the in the consensus in the last "
                  "%d days: %d.",
                  int(self._measurements_period / 24 / 60 / 60),
                  num_old_relays)
        log.debug("Number of relays to measure with the current consensus: "
                  "%d", len(new_relays))
        return new_relays
Matt Traudt's avatar
Matt Traudt committed
494
495

    def _refresh(self):
        """Rebuild the list of relays and store the consensus count.

        The number of consensuses seen is logged and, when a state object
        was given, written to it under ``recent_consensus_count``.
        """
        # Set a new list of relays.
        self._relays = self._init_relays()

        period_in_days = int(self._measurements_period / 24 / 60 / 60)
        log.info("Number of consensuses obtained in the last %s days: %s.",
                 period_in_days, self.recent_consensus_count)

        # NOTE: blocking, writes to file!
        if self._state is None:
            return
        self._state['recent_consensus_count'] = self.recent_consensus_count
505

506
    @property
507
    def recent_consensus_count(self):
508
509
510
        """Number of times a new consensus was obtained."""
        return len(self._consensus_timestamps)

511
512
513
    def exits_not_bad_allowing_port(self, port):
        """Return the exits that are not bad exits and allow ``port``."""
        allowed = []
        for relay in self.exits:
            if relay.is_exit_not_bad_allowing_port(port):
                allowed.append(relay)
        return allowed
514
515
516
517
518
519
520
521
522
523
524
525
526

    def increment_recent_measurement_attempt_count(self):
        """
        Increment the number of times that any relay has been queued to be
        measured.

        It is call from :funf:`~sbws.core.scaner.main_loop`.

        It is read and stored in a ``state`` file.
        """
        # NOTE: blocking, writes to file!
        if self._state:
            self._state['recent_measurement_attempt_count'] += 1