relaylist.py 18.8 KB
Newer Older
1
2
3
import copy
from datetime import datetime, timedelta

juga  's avatar
juga committed
4
from stem.descriptor.router_status_entry import RouterStatusEntryV3
5
from stem.descriptor.server_descriptor import ServerDescriptor
6
from stem import Flag, DescriptorUnavailable, ControllerError
Matt Traudt's avatar
Matt Traudt committed
7
import random
8
import logging
9
from threading import Lock
Matt Traudt's avatar
Matt Traudt committed
10

11
12
from ..globals import (
    MAX_RECENT_CONSENSUS_COUNT,
13
    MAX_RECENT_PRIORITY_RELAY_COUNT,
14
    MAX_RECENT_PRIORITY_LIST_COUNT,
15
16
17
    MEASUREMENTS_PERIOD
)
from ..util import timestamp, timestamps
18

19
20
# Logger for this module, registered under the module's dotted name.
log = logging.getLogger(name=__name__)

Matt Traudt's avatar
Matt Traudt committed
21

22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def valid_after_from_network_statuses(network_statuses):
    """Return the consensus Valid-After of the first router status carrying one.

    Each item is expected to look like a
    ``stem.descriptor.RouterStatusEntryV3``: its ``document`` attribute is the
    network status document it was parsed from. Items without a (truthy)
    ``document``, or whose document has no truthy ``valid_after``, are skipped.

    :param list network_statuses: router statuses to inspect, in order.
    :returns: datetime -- the first Valid-After found, or the current UTC
        time truncated to whole seconds when none is found.
    """
    documents = (getattr(status, 'document', None)
                 for status in network_statuses)
    valid_afters = (getattr(doc, 'valid_after', None)
                    for doc in documents if doc)
    first = next((va for va in valid_afters if va), None)
    if first is not None:
        return first
    # No status carried a Valid-After: fall back to "now".
    return datetime.utcnow().replace(microsecond=0)


38
class Relay:
    """A Tor relay: its router status (from a consensus), its server
    descriptor, and the timestamps sbws keeps about it.
    """

    def __init__(self, fp, cont, ns=None, desc=None, timestamp=None):
        '''
        Given a relay fingerprint, fetch all the information about a relay that
        sbws currently needs and store it in this class. Acts as an abstraction
        to hide the confusion that is Tor consensus/descriptor stuff.

        :param str fp: fingerprint of the relay (40 characters).
        :param cont: active and valid stem Tor controller connection, used to
            fetch ``ns`` and/or ``desc`` when they are not given.
        :param ns: the relay router status, or ``None`` to fetch it with
            ``cont.get_network_status``.
        :type ns: RouterStatusEntryV3 or None
        :param desc: the relay server descriptor, or ``None`` to fetch it with
            ``cont.get_server_descriptor``.
        :type desc: ServerDescriptor or None
        :param datetime timestamp: the timestamp of a consensus
            (RouterStatusEntryV3) from which this relay has been obtained.
            It is recorded as the first consensus this relay was seen in.
        '''
        assert isinstance(fp, str)
        assert len(fp) == 40
        if ns is not None:
            assert isinstance(ns, RouterStatusEntryV3)
            self._ns = ns
        else:
            try:
                self._ns = cont.get_network_status(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get ns %s", e)
                self._ns = None
        if desc is not None:
            assert isinstance(desc, ServerDescriptor)
            self._desc = desc
        else:
            try:
                self._desc = cont.get_server_descriptor(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get desc %s", e)
                # Without this, ``_desc`` would be unset and every
                # descriptor-backed property would raise AttributeError.
                self._desc = None
        self.relay_in_recent_consensus = timestamps.DateTimeSeq(
            [], MAX_RECENT_CONSENSUS_COUNT
        )
        # Record the consensus this relay was obtained from. This matches
        # ``RelayList._init_relays``, which passes the consensus Valid-After
        # both here and to ``update_relay_in_recent_consensus``.
        self.update_relay_in_recent_consensus(timestamp)

        # The number of times that a relay is "prioritized" to be measured.
        # It is incremented in ``RelayPrioritizer.best_priority``
        self.relay_recent_priority_list = timestamps.DateTimeSeq(
            [], MAX_RECENT_PRIORITY_LIST_COUNT
        )
        # The number of times that a relay has been queued to be measured.
        # It is incremented in ``scanner.main_loop``
        self.relay_recent_measurement_attempt = timestamps.DateTimeSeq(
            [], MAX_RECENT_PRIORITY_LIST_COUNT
        )

    def _from_desc(self, attr):
        """Return ``attr`` from the server descriptor, or None when there is
        no descriptor or it lacks that attribute."""
        if not self._desc:
            return None
        return getattr(self._desc, attr, None)

    def _from_ns(self, attr):
        """Return ``attr`` from the router status, or None when there is no
        router status or it lacks that attribute."""
        if not self._ns:
            return None
        return getattr(self._ns, attr, None)

    @property
    def nickname(self):
        return self._from_ns('nickname')

    @property
    def fingerprint(self):
        return self._from_ns('fingerprint')

    @property
    def flags(self):
        return self._from_ns('flags')

    @property
    def exit_policy(self):
        return self._from_desc('exit_policy')

    @property
    def average_bandwidth(self):
        return self._from_desc('average_bandwidth')

    @property
    def burst_bandwidth(self):
        return self._from_desc('burst_bandwidth')

    @property
    def observed_bandwidth(self):
        return self._from_desc('observed_bandwidth')

    @property
    def consensus_bandwidth(self):
        """Return the consensus bandwidth in Bytes, or None if unknown.

        Consensus bandwidth is the only bandwidth value that is in kilobytes.
        """
        bandwidth = self._from_ns('bandwidth')
        if bandwidth is None:
            return None
        return bandwidth * 1000

    @property
    def consensus_bandwidth_is_unmeasured(self):
        # measured appears only votes, unmeasured appears in consensus
        # therefore is_unmeasured is needed to know whether the bandwidth
        # value in consensus is comming from bwauth measurements or not.
        return self._from_ns('is_unmeasured')

    @property
    def address(self):
        return self._from_ns('address')

    @property
    def master_key_ed25519(self):
        """Obtain ed25519 master key of the relay in server descriptors.

        :returns: str, the ed25519 master key base 64 encoded without
                  trailing '='s, or None if there is no key.

        """
        # Even if this key is called master-key-ed25519 in dir-spec.txt,
        # it seems that stem parses it as ed25519_master_key
        key = self._from_desc('ed25519_master_key')
        if key is None:
            return None
        return key.rstrip('=')

    @property
    def consensus_valid_after(self):
        """Obtain the consensus Valid-After from the document of this relay
        network status, or None if it is not available.
        """
        network_status_document = self._from_ns('document')
        if network_status_document:
            return getattr(network_status_document, 'valid_after', None)
        return None

    @property
    def last_consensus_timestamp(self):
        return self.relay_in_recent_consensus.last()

    def update_relay_in_recent_consensus(self, timestamp=None):
        self.relay_in_recent_consensus.update(timestamp)

    @property
    def relay_in_recent_consensus_count(self):
        """Number of times the relay was in a consensus."""
        return len(self.relay_in_recent_consensus)

    def can_exit_to_port(self, port):
        """
        Returns True if the relay has an exit policy and the policy accepts
        exiting to the given port, or False otherwise.

        The exits that are IPv6 only or IPv4 but rejecting some public networks
        will return false.
        On July 2020, there were 67 out of 1095 exits like this.
        """
        assert isinstance(port, int)
        # If the descriptor wasn't obtained, there isn't an exit policy.
        # When the attribute is gotten in getattr(self._desc, "exit_policy"),
        # is possible that stem's _input_rules is None and raises an exception
        # (#29899):
        #   File "/usr/lib/python3/dist-packages/sbws/lib/relaylist.py", line 117, in can_exit_to_port  # noqa
        #     if not self.exit_policy:
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 512, in __len__  # noqa
        #     return len(self._get_rules())
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 464, in _get_rules  # noqa
        #     for rule in decompressed_rules:
        # TypeError: 'NoneType' object is not iterable
        # Therefore, catch the exception here.
        try:
            if self.exit_policy:
                # Using `strict` to ensure it can exit to ALL domains
                # and ips and that port. See #40006.
                # Using `strip_private` to ignore reject rules to private
                # networks.
                return (
                    self.exit_policy.strip_private()
                    .can_exit_to(port=port, strict=True)
                )
        except TypeError:
            return False
        return False

    def is_exit_not_bad_allowing_port(self, port):
        """Whether the relay has the Exit flag, lacks the BadExit flag and its
        exit policy allows exiting to ``port``.

        Returns False when the relay has no router status (hence no flags)
        instead of failing on a membership test against None.
        """
        flags = self.flags
        if not flags:
            return False
        return (Flag.BADEXIT not in flags and
                Flag.EXIT in flags and
                self.can_exit_to_port(port))

    def increment_relay_recent_measurement_attempt(self):
        """
        Increment the number of times that a relay has been queued
        to be measured.

        It is called from :func:`~sbws.core.scanner.main_loop`.
        """
        self.relay_recent_measurement_attempt.update()

    @property
    def relay_recent_measurement_attempt_count(self):
        return len(self.relay_recent_measurement_attempt)

    def increment_relay_recent_priority_list(self):
        """
        Increment the number of times that a relay is "prioritized" to be
        measured.

        It is called from
        :meth:`~sbws.lib.relayprioritizer.RelayPrioritizer.best_priority`.
        """
        self.relay_recent_priority_list.update()

    @property
    def relay_recent_priority_list_count(self):
        return len(self.relay_recent_priority_list)

    def is_old(self):
        """Whether the last consensus seen for this relay is older than the
        measurement period.
        """
        return timestamp.is_old(self.last_consensus_timestamp)

    # XXX: tech-debt: replace `_desc` attr by a a `dequee` of the last
    # descriptors seen for this relay and the timestamp.
    def update_server_descriptor(self, server_descriptor):
        """Replace this relay's server descriptor."""
        self._desc = server_descriptor

    # XXX: tech-debt: replace `_ns` attr by a a `dequee` of the last
    # router statuses seen for this relay and the timestampt.
    def update_router_status(self, router_status):
        """Update this relay router status (from the consensus)."""
        self._ns = router_status
265

juga  's avatar
juga committed
266

Matt Traudt's avatar
Matt Traudt committed
267
class RelayList:
    ''' Keeps a list of all relays in the current Tor network and updates it
    transparently in the background. Provides useful interfaces for getting
    only relays of a certain type.
    '''

    def __init__(self, args, conf, controller,
                 measurements_period=MEASUREMENTS_PERIOD, state=None):
        """
        :param args: not used by this class.
        :param conf: not used by this class.
        :param controller: stem Tor controller used to fetch router statuses
            and server descriptors.
        :param int measurements_period: period of time, in seconds, for which
            the measurements are kept.
        :param state: passed to the ``DateTimeSeq`` counters of consensuses
            and measurement attempts.
        """
        self._controller = controller
        self.rng = random.SystemRandom()
        # Non-reentrant: never take it again from code running under it.
        self._refresh_lock = Lock()
        # To track all the consensus seen.
        self._recent_consensus = timestamps.DateTimeSeq(
            [], MAX_RECENT_CONSENSUS_COUNT, state, "recent_consensus"
        )
        # Initialize so that there's no error trying to access to it.
        # In future refactor, change to a dictionary, where the keys are
        # the relays' fingerprint.
        self._relays = []
        # The period of time for which the measurements are keep.
        self._measurements_period = measurements_period
        self._recent_measurement_attempt = timestamps.DateTimeSeq(
            [], MAX_RECENT_PRIORITY_RELAY_COUNT, state,
            "recent_measurement_attempt"
        )
        # Start with 0 for the min bw for our second hops
        self._exit_min_bw = 0
        self._non_exit_min_bw = 0

        self._refresh()

    def _need_refresh(self):
        """Whether the list of relays should be refreshed.

        New consensuses happen every hour, so a refresh is needed once the
        last consensus timestamp is at least one hour old, or when there is
        no last consensus timestamp.
        """
        # NOTE(review): the recorded timestamp is the consensus Valid-After
        # from Tor's cache; if that is already more than an hour old, this
        # stays True right after a refresh and every ``relays`` access
        # refreshes again. Confirm this is intended.
        last = self.last_consensus_timestamp
        if last is None:
            return True
        return datetime.utcnow() >= last + timedelta(seconds=60 * 60)

    @property
    def last_consensus_timestamp(self):
        """Returns the datetime when the last consensus was obtained."""
        return self._recent_consensus.last()

    @property
    def relays(self):
        """The current list of relays, refreshed first if it is stale."""
        # See if we can get the list of relays without having to do a refresh,
        # which is expensive and blocks other threads
        if self._need_refresh():
            log.debug('We need to refresh our list of relays. '
                      'Going to wait for lock.')
            # Whelp we couldn't just get the list of relays because the list is
            # stale. Wait for the lock so we can refresh it.
            with self._refresh_lock:
                log.debug('We got the lock. Now to see if we still '
                          'need to refresh.')
                # Now we have the lock ... but wait! Maybe someone else already
                # did the refreshing. So check if it still needs refreshing. If
                # not, we can do nothing.
                if self._need_refresh():
                    log.debug('Yup we need to refresh our relays. Doing so.')
                    self._refresh()
                else:
                    log.debug('No we don\'t need to refresh our relays. '
                              'It was done by someone else.')
            log.debug('Giving back the lock for refreshing relays.')
        return self._relays

    @property
    def fast(self):
        return self._relays_with_flag(Flag.FAST)

    @property
    def exits(self):
        return self._relays_with_flag(Flag.EXIT)

    @property
    def bad_exits(self):
        return self._relays_with_flag(Flag.BADEXIT)

    @property
    def non_exits(self):
        return self._relays_without_flag(Flag.EXIT)

    @property
    def guards(self):
        return self._relays_with_flag(Flag.GUARD)

    @property
    def authorities(self):
        return self._relays_with_flag(Flag.AUTHORITY)

    @property
    def relays_fingerprints(self):
        # Using relays instead of _relays, so that the list get updated if
        # needed, since this method is used to know which fingerprints are in
        # the consensus.
        return [r.fingerprint for r in self.relays]

    def random_relay(self):
        return self.rng.choice(self.relays)

    def _relays_with_flag(self, flag):
        return [r for r in self.relays if flag in r.flags]

    def _relays_without_flag(self, flag):
        return [r for r in self.relays if flag not in r.flags]

    def _init_relays(self):
        """Returns a new list of relays that are in the current consensus.
        And update the consensus timestamp list with the current one.

        Relays of the previous list that are in the current consensus get
        their router status, consensus timestamp and descriptor updated.
        Relays no longer in the consensus are kept until they are old (see
        ``Relay.is_old``). Relays new in this consensus are created.
        """
        c = self._controller
        # This will get router statuses from this Tor cache, might not be
        # updated with the network.
        # Change to stem.descriptor.remote in future refactor.
        # Materialized because it is iterated twice below.
        network_statuses = list(c.get_network_statuses())
        new_relays_dict = {r.fingerprint: r for r in network_statuses}
        log.debug("Number of relays in the current consensus: %d.",
                  len(new_relays_dict))

        # Find the timestamp of the last consensus.
        timestamp = valid_after_from_network_statuses(network_statuses)
        self._recent_consensus.update(timestamp)

        new_relays = []

        # Only for debugging, count the relays that are not in the current
        # consensus and have not been seen in the last consensuses either.
        num_old_relays = 0

        relays = copy.deepcopy(self._relays)
        for r in relays:
            fp = r.fingerprint
            if fp in new_relays_dict:
                # If a relay in the previous consensus and is in the current
                # one, update its timestamp, router status and descriptor.
                # new_relays_dict[fp] is the router status.
                r.update_router_status(new_relays_dict[fp])
                r.update_relay_in_recent_consensus(timestamp)
                try:
                    descriptor = c.get_server_descriptor(fp, default=None)
                except (DescriptorUnavailable, ControllerError) as e:
                    # Keep the relay's previous descriptor: ``descriptor``
                    # is unbound here, or still holds another relay's one.
                    log.exception("Exception trying to get desc %s", e)
                else:
                    r.update_server_descriptor(descriptor)
                # Add it to the new list of relays.
                new_relays.append(r)
                # And remove it from the new consensus dict, as it has
                # already added to the new list.
                new_relays_dict.pop(fp)

            # If the relay is not in the current consensus but is not "old"
            # yet, add it to the new list of relays too, though its timestamp,
            # router status and descriptor can't be updated.
            elif not r.is_old():
                new_relays.append(r)
            # Otherwise, don't add it to the new list of relays.
            # For debugging, count the old relays that will be discarded.
            else:
                num_old_relays += 1

        # Finally, add the relays that were not in the previous consensus
        for fp, ns in new_relays_dict.items():
            new_relays.append(Relay(fp, c, ns=ns, timestamp=timestamp))

        days = self._measurements_period / (60 * 60 * 24)
        log.debug("Previous number of relays being measured %d",
                  len(self._relays))
        log.debug("Number of relays not in the in the consensus in the last "
                  "%d days: %d.",
                  days, num_old_relays)
        log.debug("Number of relays to measure with the current consensus: "
                  "%d", len(new_relays))
        return new_relays

    def _refresh(self):
        """Rebuild the list of relays and the second hop minimum bandwidths.

        Called from ``__init__`` and from ``relays`` while holding
        ``_refresh_lock``, so nothing it calls may go through ``relays``.
        """
        # Set a new list of relays.
        self._relays = self._init_relays()

        log.info("Number of consensuses obtained in the last %s days: %s.",
                 int(self._measurements_period / 24 / 60 / 60),
                 self.recent_consensus_count)

        # Calculate minimum bandwidth value for 2nd hop after we refreshed
        # our available relays.
        self._calculate_min_bw_second_hop()

    @property
    def recent_consensus_count(self):
        """Number of times a new consensus was obtained."""
        return len(self._recent_consensus)

    def exits_not_bad_allowing_port(self, port):
        return [r for r in self.exits
                if r.is_exit_not_bad_allowing_port(port)]

    def increment_recent_measurement_attempt(self):
        """
        Increment the number of times that any relay has been queued to be
        measured.

        It is called from :func:`~sbws.core.scanner.main_loop`.

        It is read and stored in a ``state`` file.
        """
        # NOTE: blocking, writes to file!
        self._recent_measurement_attempt.update()

    @property
    def recent_measurement_attempt_count(self):
        return len(self._recent_measurement_attempt)

    @staticmethod
    def _lowest_bw_in_top_75_percent(candidates):
        """Return the lowest consensus bandwidth among the top 75% of the
        non-empty list ``candidates``."""
        ordered = sorted(candidates, key=lambda r: r.consensus_bandwidth)
        # Sorted from least to most bandwidth, so the element at len/4 is the
        # relay with the lowest bandwidth of the top 75%.
        return ordered[len(ordered) // 4].consensus_bandwidth

    def _calculate_min_bw_second_hop(self):
        """
        Calculates the minimum bandwidth for both exit and non-exit relays
        chosen as a second hop by picking the lowest bandwidth value available
        from the top 75% of the respective category.

        ``self._relays`` is read directly instead of the ``exits`` and
        ``non_exits`` properties: those go through ``relays``, which may take
        the non-reentrant ``_refresh_lock``. This method runs from
        ``_refresh``, possibly while that lock is held, so going through the
        properties could deadlock.

        Relays without a consensus bandwidth are ignored. If a category has
        no candidates, its previous minimum is kept.
        """
        # It's okay to keep things simple for the calculation and go over all
        # exits, including badexits.
        exit_candidates = []
        non_exit_candidates = []
        for relay in self._relays:
            if relay.consensus_bandwidth is None:
                continue
            if Flag.EXIT in (relay.flags or ()):
                exit_candidates.append(relay)
            else:
                non_exit_candidates.append(relay)
        if exit_candidates:
            self._exit_min_bw = self._lowest_bw_in_top_75_percent(
                exit_candidates)
        if non_exit_candidates:
            self._non_exit_min_bw = self._lowest_bw_in_top_75_percent(
                non_exit_candidates)

    def exit_min_bw(self):
        """Minimum consensus bandwidth for an exit to be a second hop."""
        return self._exit_min_bw

    def non_exit_min_bw(self):
        """Minimum consensus bandwidth for a non-exit to be a second hop."""
        return self._non_exit_min_bw