relaylist.py 19.5 KB
Newer Older
1
2
3
import copy
from datetime import datetime, timedelta

juga's avatar
juga committed
4
from stem.descriptor.router_status_entry import RouterStatusEntryV3
5
from stem.descriptor.server_descriptor import ServerDescriptor
6
from stem import Flag, DescriptorUnavailable, ControllerError
Matt Traudt's avatar
Matt Traudt committed
7
import random
8
import logging
9
from threading import Lock
Matt Traudt's avatar
Matt Traudt committed
10

11
12
13
14
15
from ..globals import (
    MAX_RECENT_CONSENSUS_COUNT,
    MEASUREMENTS_PERIOD
)
from ..util import timestamp, timestamps
16

17
18
log = logging.getLogger(__name__)

Matt Traudt's avatar
Matt Traudt committed
19

20
21
22
23
24
25
26
27
28
29
def remove_old_consensus_timestamps(
        consensus_timestamps, measurements_period=MEASUREMENTS_PERIOD):
    """
    Filter out the consensus timestamps that fall outside the period for
    which measurements are kept.

    The input list is not modified; a fresh list is built and returned.

    :param list consensus_timestamps: the timestamps to filter.
    :param int measurements_period: passed through to ``timestamp.is_old``
        as the age limit.
    :returns list: a new list holding only the timestamps for which
        ``timestamp.is_old`` is false, in their original order.
    """
    kept_timestamps = []
    for seen_at in consensus_timestamps:
        # Skip anything the helper considers too old to keep.
        if timestamp.is_old(seen_at, measurements_period):
            continue
        kept_timestamps.append(seen_at)
    return kept_timestamps


def valid_after_from_network_statuses(network_statuses):
    """Find the consensus Valid-After datetime among router statuses.

    Each entry is inspected for a truthy ``document`` attribute that in turn
    has a truthy ``valid_after`` attribute; the first such value wins.

    :param list network_statuses: router status entries (e.g.
        ``stem.descriptor.RouterStatusEntryV3``) to inspect.
    :returns datetime: the first ``valid_after`` found, or the current UTC
        time truncated to whole seconds when no entry provides one.
    """
    for status in network_statuses:
        consensus_doc = getattr(status, 'document', None)
        if not consensus_doc:
            # This entry carries no consensus document; try the next one.
            continue
        found = getattr(consensus_doc, 'valid_after', None)
        if found:
            return found
    # No entry knew its consensus: fall back to "now", without microseconds.
    now = datetime.utcnow()
    return now.replace(microsecond=0)


54
class Relay:
    """A Tor relay as described by its router status and server descriptor.

    Besides exposing selected attributes of those two documents, it keeps the
    list of consensus timestamps in which the relay was seen and two counters
    used by the scanner and the prioritizer.
    """

    def __init__(self, fp, cont, ns=None, desc=None, timestamp=None):
        '''
        Given a relay fingerprint, fetch all the information about a relay that
        sbws currently needs and store it in this class. Acts as an abstraction
        to hide the confusion that is Tor consensus/descriptor stuff.

        :param str fp: fingerprint of the relay (40 characters).
        :param cont: active and valid stem Tor controller connection. It is
            only queried for the documents that were not passed in.
        :param RouterStatusEntryV3 ns: router status of the relay; fetched
            through ``cont`` when ``None``.
        :param ServerDescriptor desc: server descriptor of the relay; fetched
            through ``cont`` when ``None``.
        :param datetime timestamp: the timestamp of a consensus
            (RouterStatusEntryV3) from which this relay has been obtained.
        '''
        assert isinstance(fp, str)
        assert len(fp) == 40
        if ns is not None:
            assert isinstance(ns, RouterStatusEntryV3)
            self._ns = ns
        else:
            try:
                self._ns = cont.get_network_status(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get ns %s", e)
                self._ns = None
        if desc is not None:
            assert isinstance(desc, ServerDescriptor)
            self._desc = desc
        else:
            try:
                self._desc = cont.get_server_descriptor(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get desc %s", e)
                # Always define the attribute, as it is done for ``_ns``,
                # otherwise ``_from_desc`` raises AttributeError later on.
                self._desc = None
        self._consensus_timestamps = []
        self._add_consensus_timestamp(timestamp)
        # The number of times that a relay is "prioritized" to be measured.
        # It is incremented in ``RelayPrioritizer.best_priority``
        self.relay_recent_priority_list_count = 0
        # The number of times that a relay has been queued to be measured.
        # It is incremented in ``scanner.main_loop``
        self.relay_recent_measurement_attempt_count = 0

    def _from_desc(self, attr):
        """Return ``attr`` from the server descriptor, or None if there is
        no descriptor or it lacks that attribute.
        """
        if not self._desc:
            return None
        return getattr(self._desc, attr, None)

    def _from_ns(self, attr):
        """Return ``attr`` from the router status, or None if there is no
        router status or it lacks that attribute.
        """
        if not self._ns:
            return None
        return getattr(self._ns, attr, None)

    @property
    def nickname(self):
        return self._from_ns('nickname')

    @property
    def fingerprint(self):
        return self._from_ns('fingerprint')

    @property
    def flags(self):
        return self._from_ns('flags')

    @property
    def exit_policy(self):
        return self._from_desc('exit_policy')

    @property
    def average_bandwidth(self):
        return self._from_desc('average_bandwidth')

    @property
    def burst_bandwidth(self):
        return self._from_desc('burst_bandwidth')

    @property
    def observed_bandwidth(self):
        return self._from_desc('observed_bandwidth')

    @property
    def consensus_bandwidth(self):
        """Return the consensus bandwidth in Bytes.

        Consensus bandwidth is the only bandwidth value that is in kilobytes.

        :returns: the router status ``bandwidth`` multiplied by 1000, or None
            when it is not available.
        """
        bandwidth = self._from_ns('bandwidth')
        if bandwidth is None:
            return None
        return bandwidth * 1000

    @property
    def consensus_bandwidth_is_unmeasured(self):
        # measured appears only in votes, unmeasured appears in consensus,
        # therefore is_unmeasured is needed to know whether the bandwidth
        # value in consensus is coming from bwauth measurements or not.
        return self._from_ns('is_unmeasured')

    @property
    def address(self):
        return self._from_ns('address')

    @property
    def master_key_ed25519(self):
        """Obtain ed25519 master key of the relay in server descriptors.

        :returns: str, the ed25519 master key base 64 encoded without
                  trailing '='s, or None when it is not available.

        """
        # Even if this key is called master-key-ed25519 in dir-spec.txt,
        # it seems that stem parses it as ed25519_master_key
        key = self._from_desc('ed25519_master_key')
        if key is None:
            return None
        return key.rstrip('=')

    @property
    def consensus_valid_after(self):
        """Obtain the consensus Valid-After from the document of this relay
        network status, or None when it can not be obtained.
        """
        network_status_document = self._from_ns('document')
        if network_status_document:
            return getattr(network_status_document, 'valid_after', None)
        return None

    @property
    def last_consensus_timestamp(self):
        """Most recently stored consensus timestamp, or None if none is
        stored.
        """
        if self._consensus_timestamps:
            return self._consensus_timestamps[-1]
        return None

    def _append_consensus_timestamp_if_later(self, timestamp):
        """Append timestamp to the list of consensus timestamps, if it is later
           than the most recent existing timestamp, or there are no timestamps.
           Should only be called by _add_consensus_timestamp().
           timestamp must not be None, and it must not be zero.
        """
        if not timestamp:
            log.info('Bad timestamp %s, skipping consensus timestamp '
                     'update for  relay %s', timestamp, self.fingerprint)
            return
        latest = self.last_consensus_timestamp
        # Append when the list is still empty or when timestamp is more
        # recent than the most recent stored consensus timestamp.
        if latest is None or timestamp > latest:
            self._consensus_timestamps.append(timestamp)

    def _add_consensus_timestamp(self, timestamp=None):
        """Add the consensus timestamp in which this relay is present.

        The Valid-After of the relay's own consensus document is preferred,
        then the ``timestamp`` argument, and finally the current UTC time.
        """
        valid_after = self.consensus_valid_after
        if valid_after is not None:
            # It is possible to access the relay's consensus Valid-After,
            # so believe it, rather than the supplied timestamp.
            self._append_consensus_timestamp_if_later(valid_after)
        elif timestamp:
            # Add the arg timestamp.
            self._append_consensus_timestamp_if_later(timestamp)
        # In any other case
        else:
            log.warning('Bad timestamp %s, using current time for consensus '
                        'timestamp update for relay %s',
                        timestamp, self.fingerprint)
            # Add the current datetime
            self._append_consensus_timestamp_if_later(
                datetime.utcnow().replace(microsecond=0))

    def _remove_old_consensus_timestamps(
            self, measurements_period=MEASUREMENTS_PERIOD):
        """Drop the stored consensus timestamps older than
        ``measurements_period``.
        """
        # ``remove_old_consensus_timestamps`` builds and returns a new list,
        # so there is no need to copy the stored one before passing it.
        self._consensus_timestamps = remove_old_consensus_timestamps(
            self._consensus_timestamps, measurements_period
        )

    def update_consensus_timestamps(self, timestamp=None):
        """Record a new consensus timestamp and discard the old ones."""
        self._add_consensus_timestamp(timestamp)
        self._remove_old_consensus_timestamps()

    @property
    def relay_in_recent_consensus_count(self):
        """Number of times the relay was in a consensus."""
        return len(self._consensus_timestamps)

    def can_exit_to_port(self, port):
        """
        Returns True if the relay has an exit policy and the policy accepts
        exiting to the given port or False otherwise.
        """
        assert isinstance(port, int)
        # If we didn't get the descriptor, there isn't an exit policy.
        # When the policy is evaluated (its truthiness calls ``__len__``),
        # it is possible that stem's _input_rules is None and raises an
        # exception (#29899):
        #   File "/usr/lib/python3/dist-packages/sbws/lib/relaylist.py", line 117, in can_exit_to_port  # noqa
        #     if not self.exit_policy:
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 512, in __len__  # noqa
        #     return len(self._get_rules())
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 464, in _get_rules  # noqa
        #     for rule in decompressed_rules:
        # TypeError: 'NoneType' object is not iterable
        # Therefore, catch the exception here.
        try:
            policy = self.exit_policy
            if policy:
                return policy.can_exit_to(port=port)
        except TypeError:
            return False
        return False

    def is_exit_not_bad_allowing_port(self, port):
        """Whether the relay has the Exit flag, lacks the BadExit flag and
        its exit policy allows exiting to ``port``.

        A relay without flags (e.g. without router status) is not an exit.
        """
        flags = self.flags
        if not flags:
            # ``flags`` is None when there is no router status; without this
            # guard the membership tests below would raise TypeError.
            return False
        return (Flag.BADEXIT not in flags and
                Flag.EXIT in flags and
                self.can_exit_to_port(port))

    def increment_relay_recent_measurement_attempt_count(self):
        """
        Increment the number of times that a relay has been queued
        to be measured.

        It is called from :func:`~sbws.core.scanner.main_loop`.
        """
        # If it was not in the previous measurements version, start counting
        if self.relay_recent_measurement_attempt_count is None:
            self.relay_recent_measurement_attempt_count = 0
        self.relay_recent_measurement_attempt_count += 1

    def increment_relay_recent_priority_list_count(self):
        """
        Increment the number of times that a relay is "prioritized" to be
        measured.

        It is called from
        :meth:`~sbws.lib.relayprioritizer.RelayPrioritizer.best_priority`.
        """
        # If it was not in the previous measurements version, start counting
        if self.relay_recent_priority_list_count is None:
            self.relay_recent_priority_list_count = 0
        self.relay_recent_priority_list_count += 1

    def is_old(self):
        """Whether the last consensus seen for this relay is older than the
        measurement period.
        """
        return timestamp.is_old(self.last_consensus_timestamp)

    # XXX: tech-debt: replace `_desc` attr by a `deque` of the last
    # descriptors seen for this relay and the timestamp.
    def update_server_descriptor(self, server_descriptor):
        """Update this relay server descriptor (from the consensus)."""
        self._desc = server_descriptor

    # XXX: tech-debt: replace `_ns` attr by a `deque` of the last
    # router statuses seen for this relay and the timestamp.
    def update_router_status(self, router_status):
        """Update this relay router status (from the consensus)."""
        self._ns = router_status
314

juga's avatar
juga committed
315

Matt Traudt's avatar
Matt Traudt committed
316
class RelayList:
    ''' Keeps a list of all relays in the current Tor network and updates it
    when it is accessed and found stale. Provides useful interfaces for
    getting only relays of a certain type.
    '''

    def __init__(self, args, conf, controller,
                 measurements_period=MEASUREMENTS_PERIOD, state=None):
        """
        :param args: not used by this class.
        :param conf: not used by this class.
        :param controller: stem Tor controller used to fetch router statuses
            and server descriptors.
        :param int measurements_period: period of time for which relays not
            seen in a consensus are still kept.
        :param state: optional dict-like state (written to disk) where the
            recent consensuses and the measurement attempts are stored.
        """
        self._controller = controller
        self.rng = random.SystemRandom()
        self._refresh_lock = Lock()
        # To track all the consensus seen.
        self._recent_consensus = timestamps.DateTimeSeq(
            [], MAX_RECENT_CONSENSUS_COUNT, state, "recent_consensus"
        )
        # Initialize so that there's no error trying to access to it.
        # In future refactor, change to a dictionary, where the keys are
        # the relays' fingerprint.
        self._relays = []
        # The period of time for which the measurements are kept.
        self._measurements_period = measurements_period
        self._state = state
        # NOTE: blocking: writes to disk
        # NOTE(review): a falsy (e.g. empty) state skips this initialization;
        # confirm that is intended.
        if self._state:
            if self._state.get('recent_measurement_attempt_count', None) \
                    is None:
                self._state['recent_measurement_attempt_count'] = 0
        self._refresh()

    def _need_refresh(self):
        """Whether the last known consensus is at least one hour old."""
        last = self.last_consensus_timestamp
        if last is None:
            # No consensus recorded yet: there is nothing to compare
            # against, so a refresh is needed.
            return True
        # New consensuses happen every hour.
        return datetime.utcnow() >= last + timedelta(hours=1)

    @property
    def last_consensus_timestamp(self):
        """Returns the datetime when the last consensus was obtained."""
        return self._recent_consensus.last()

    @property
    def relays(self):
        """The list of relays, refreshed first if it has become stale."""
        # See if we can get the list of relays without having to do a refresh,
        # which is expensive and blocks other threads
        if self._need_refresh():
            log.debug('We need to refresh our list of relays. '
                      'Going to wait for lock.')
            # Whelp we couldn't just get the list of relays because the list is
            # stale. Wait for the lock so we can refresh it.
            with self._refresh_lock:
                log.debug('We got the lock. Now to see if we still '
                          'need to refresh.')
                # Now we have the lock ... but wait! Maybe someone else already
                # did the refreshing. So check if it still needs refreshing. If
                # not, we can do nothing.
                if self._need_refresh():
                    log.debug('Yup we need to refresh our relays. Doing so.')
                    self._refresh()
                else:
                    log.debug('No we don\'t need to refresh our relays. '
                              'It was done by someone else.')
            log.debug('Giving back the lock for refreshing relays.')
        return self._relays

    @property
    def fast(self):
        return self._relays_with_flag(Flag.FAST)

    @property
    def exits(self):
        return self._relays_with_flag(Flag.EXIT)

    @property
    def bad_exits(self):
        return self._relays_with_flag(Flag.BADEXIT)

    @property
    def non_exits(self):
        return self._relays_without_flag(Flag.EXIT)

    @property
    def guards(self):
        return self._relays_with_flag(Flag.GUARD)

    @property
    def authorities(self):
        return self._relays_with_flag(Flag.AUTHORITY)

    @property
    def relays_fingerprints(self):
        # Using relays instead of _relays, so that the list get updated if
        # needed, since this method is used to know which fingerprints are in
        # the consensus.
        return [r.fingerprint for r in self.relays]

    def random_relay(self):
        """Return one relay chosen at random from the current list."""
        return self.rng.choice(self.relays)

    def _relays_with_flag(self, flag):
        """Relays whose flags include ``flag``."""
        # ``r.flags`` is None for a relay without router status; treat it as
        # having no flags instead of raising TypeError.
        return [r for r in self.relays if flag in (r.flags or ())]

    def _relays_without_flag(self, flag):
        """Relays whose flags do not include ``flag``."""
        # A relay without flags (``r.flags`` is None) does not have ``flag``.
        return [r for r in self.relays if flag not in (r.flags or ())]

    def _init_relays(self):
        """Returns a new list of relays that are in the current consensus.
        And update the consensus timestamp list with the current one.

        Relays known from previous consensuses are kept too, as long as they
        are not "old" (see ``Relay.is_old``).
        """
        c = self._controller
        # This will get router statuses from this Tor cache, might not be
        # updated with the network.
        # Change to stem.descriptor.remote in future refactor.
        network_statuses = c.get_network_statuses()
        new_relays_dict = {ns.fingerprint: ns for ns in network_statuses}
        log.debug("Number of relays in the current consensus: %d.",
                  len(new_relays_dict))

        # Find the timestamp of the last consensus.
        timestamp = valid_after_from_network_statuses(network_statuses)
        self._recent_consensus.update(timestamp)

        new_relays = []

        # Only for debugging, count the relays that are not in the current
        # consensus and have not been seen in the last consensuses either.
        num_old_relays = 0

        # Work on copies, so that the relays in the list currently in use are
        # not modified while the new list is being built.
        relays = copy.deepcopy(self._relays)
        for r in relays:
            fp = r.fingerprint
            if fp in new_relays_dict:
                # If a relay in the previous consensus and is in the current
                # one, update its timestamp, router status and descriptor.
                # The popped value is the router status; removing it from
                # the dict marks the relay as already handled.
                r.update_router_status(new_relays_dict.pop(fp))
                r.update_consensus_timestamps(timestamp)
                try:
                    descriptor = c.get_server_descriptor(fp, default=None)
                except (DescriptorUnavailable, ControllerError) as e:
                    log.exception("Exception trying to get desc %s", e)
                    # Without this, ``descriptor`` would be unbound or,
                    # worse, still be the descriptor of the previous relay.
                    descriptor = None
                r.update_server_descriptor(descriptor)
                # Add it to the new list of relays.
                new_relays.append(r)

            # If the relay is not in the current consensus but is not "old"
            # yet, add it to the new list of relays too, though its timestamp,
            # router status and descriptor can't be updated.
            elif not r.is_old():
                new_relays.append(r)
            # Otherwise, don't add it to the new list of relays.
            # For debugging, count the old relays that will be discarded.
            else:
                num_old_relays += 1

        # Finally, add the relays that were not in the previous consensus
        for ns in new_relays_dict.values():
            r = Relay(ns.fingerprint, c, ns=ns, timestamp=timestamp)
            new_relays.append(r)

        days = self._measurements_period / (60 * 60 * 24)
        log.debug("Previous number of relays being measured %d",
                  len(self._relays))
        log.debug("Number of relays not in the in the consensus in the last "
                  "%d days: %d.",
                  days, num_old_relays)
        log.debug("Number of relays to measure with the current consensus: "
                  "%d", len(new_relays))
        return new_relays

    def _refresh(self):
        """Rebuild the list of relays from the current consensus."""
        # Set a new list of relays.
        self._relays = self._init_relays()

        log.info("Number of consensuses obtained in the last %s days: %s.",
                 int(self._measurements_period / 24 / 60 / 60),
                 self.recent_consensus_count)

    @property
    def recent_consensus_count(self):
        """Number of times a new consensus was obtained."""
        return len(self._recent_consensus)

    def exits_not_bad_allowing_port(self, port):
        """Exits without the BadExit flag whose policy allows ``port``."""
        return [r for r in self.exits
                if r.is_exit_not_bad_allowing_port(port)]

    def increment_recent_measurement_attempt_count(self):
        """
        Increment the number of times that any relay has been queued to be
        measured.

        It is called from :func:`~sbws.core.scanner.main_loop`.

        It is read and stored in a ``state`` file.
        """
        # NOTE: blocking, writes to file!
        if self._state:
            self._state['recent_measurement_attempt_count'] += 1