relaylist.py 16.8 KB
Newer Older
1
2
3
import copy
from datetime import datetime, timedelta

juga's avatar
juga committed
4
from stem.descriptor.router_status_entry import RouterStatusEntryV3
5
from stem.descriptor.server_descriptor import ServerDescriptor
6
from stem import Flag, DescriptorUnavailable, ControllerError
Matt Traudt's avatar
Matt Traudt committed
7
import random
8
import logging
9
from threading import Lock
Matt Traudt's avatar
Matt Traudt committed
10

11
12
from ..globals import (
    MAX_RECENT_CONSENSUS_COUNT,
13
    MAX_RECENT_PRIORITY_RELAY_COUNT,
14
    MAX_RECENT_PRIORITY_LIST_COUNT,
15
16
17
    MEASUREMENTS_PERIOD
)
from ..util import timestamp, timestamps
18

19
20
log = logging.getLogger(__name__)

Matt Traudt's avatar
Matt Traudt committed
21

22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def valid_after_from_network_statuses(network_statuses):
    """Return the Valid-After datetime of the consensus the statuses belong to.

    The value is read from the ``document`` attribute of the first
    ``stem.descriptor.RouterStatusEntryV3`` that has both a (truthy)
    ``document`` and a (truthy) ``valid_after``.

    :param network_statuses: iterable of router status entries.
    :returns: datetime, the consensus Valid-After or, when no entry provides
        it, the current UTC time truncated to whole seconds.
    """
    def _document_valid_after(status):
        document = getattr(status, 'document', None)
        if not document:
            return None
        return getattr(document, 'valid_after', None)

    # ``filter(None, ...)`` skips entries without a truthy Valid-After and
    # ``next`` stops lazily at the first entry that has one.
    valid_after = next(
        filter(None, map(_document_valid_after, network_statuses)), None
    )
    if valid_after is None:
        return datetime.utcnow().replace(microsecond=0)
    return valid_after


38
class Relay:
    def __init__(self, fp, cont, ns=None, desc=None, timestamp=None):
        '''
        Given a relay fingerprint, fetch all the information about a relay that
        sbws currently needs and store it in this class. Acts as an abstraction
        to hide the confusion that is Tor consensus/descriptor stuff.

        :param str fp: fingerprint of the relay (40 characters).
        :param cont: active and valid stem Tor controller connection
        :param ns: the relay's router status (``RouterStatusEntryV3``).
            When ``None``, it is requested from ``cont``.
        :param desc: the relay's ``ServerDescriptor``. When ``None``, it is
            requested from ``cont``.
        :param datetime timestamp: the timestamp of a consensus
            (RouterStatusEntryV3) from which this relay has been obtained.
            It is recorded as the first entry of
            ``relay_in_recent_consensus``; ``None`` is passed through to
            ``DateTimeSeq.update`` unchanged.
        '''
        assert isinstance(fp, str)
        assert len(fp) == 40
        if ns is not None:
            assert isinstance(ns, RouterStatusEntryV3)
            self._ns = ns
        else:
            try:
                self._ns = cont.get_network_status(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get ns %s", e)
                self._ns = None
        if desc is not None:
            assert isinstance(desc, ServerDescriptor)
            self._desc = desc
        else:
            try:
                self._desc = cont.get_server_descriptor(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get desc %s", e)
                # Always define ``_desc``: the descriptor-backed properties
                # read it and would otherwise raise AttributeError.
                self._desc = None
        self.relay_in_recent_consensus = timestamps.DateTimeSeq(
            [], MAX_RECENT_CONSENSUS_COUNT
        )
        # Record the timestamp of the consensus this relay was obtained from
        # (when given), so that this first entry is consistent with the
        # entries added later for the same relay.
        self.update_relay_in_recent_consensus(timestamp)

        # The number of times that a relay is "prioritized" to be measured.
        # It is incremented in ``RelayPrioritizer.best_priority``
        self.relay_recent_priority_list = timestamps.DateTimeSeq(
            [], MAX_RECENT_PRIORITY_LIST_COUNT
        )
        # The number of times that a relay has been queued to be measured.
        # It is incremented in ``scanner.main_loop``
        self.relay_recent_measurement_attempt = timestamps.DateTimeSeq(
            [], MAX_RECENT_PRIORITY_LIST_COUNT
        )

    def _from_desc(self, attr):
        """Return ``attr`` from the server descriptor, or None if either the
        descriptor or the attribute is missing."""
        if not self._desc:
            return None
        return getattr(self._desc, attr, None)

    def _from_ns(self, attr):
        """Return ``attr`` from the router status, or None if either the
        router status or the attribute is missing."""
        if not self._ns:
            return None
        return getattr(self._ns, attr, None)

    @property
    def nickname(self):
        return self._from_ns('nickname')

    @property
    def fingerprint(self):
        return self._from_ns('fingerprint')

    @property
    def flags(self):
        return self._from_ns('flags')

    @property
    def exit_policy(self):
        return self._from_desc('exit_policy')

    @property
    def average_bandwidth(self):
        return self._from_desc('average_bandwidth')

    @property
    def burst_bandwidth(self):
        return self._from_desc('burst_bandwidth')

    @property
    def observed_bandwidth(self):
        return self._from_desc('observed_bandwidth')

    @property
    def consensus_bandwidth(self):
        """Return the consensus bandwidth in Bytes, or None if unknown.

        Consensus bandwidth is the only bandwidth value that is in kilobytes.
        """
        bandwidth = self._from_ns('bandwidth')
        if bandwidth is None:
            return None
        return bandwidth * 1000

    @property
    def consensus_bandwidth_is_unmeasured(self):
        # measured appears only in votes, unmeasured appears in consensus
        # therefore is_unmeasured is needed to know whether the bandwidth
        # value in consensus is coming from bwauth measurements or not.
        return self._from_ns('is_unmeasured')

    @property
    def address(self):
        return self._from_ns('address')

    @property
    def master_key_ed25519(self):
        """Obtain ed25519 master key of the relay in server descriptors.

        :returns: str, the ed25519 master key base 64 encoded without
                  trailing '='s, or None if there is no key.

        """
        # Even if this key is called master-key-ed25519 in dir-spec.txt,
        # it seems that stem parses it as ed25519_master_key
        key = self._from_desc('ed25519_master_key')
        if key is None:
            return None
        return key.rstrip('=')

    @property
    def consensus_valid_after(self):
        """Obtain the consensus Valid-After from the document of this relay
        network status, or None if it is not available.
        """
        network_status_document = self._from_ns('document')
        if network_status_document:
            return getattr(network_status_document, 'valid_after', None)
        return None

    @property
    def last_consensus_timestamp(self):
        """Last timestamp recorded in ``relay_in_recent_consensus``."""
        return self.relay_in_recent_consensus.last()

    def update_relay_in_recent_consensus(self, timestamp=None):
        """Record ``timestamp`` in ``relay_in_recent_consensus``."""
        self.relay_in_recent_consensus.update(timestamp)

    @property
    def relay_in_recent_consensus_count(self):
        """Number of times the relay was in a consensus."""
        return len(self.relay_in_recent_consensus)

    def can_exit_to_port(self, port):
        """
        Returns True if the relay has an exit policy and the policy accepts
        exiting to the given port, or False otherwise.
        """
        assert isinstance(port, int)
        # When the attribute is gotten in getattr(self._desc, "exit_policy"),
        # is possible that stem's _input_rules is None and raises an exception
        # (#29899):
        #   File "/usr/lib/python3/dist-packages/sbws/lib/relaylist.py", line 117, in can_exit_to_port  # noqa
        #     if not self.exit_policy:
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 512, in __len__  # noqa
        #     return len(self._get_rules())
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 464, in _get_rules  # noqa
        #     for rule in decompressed_rules:
        # TypeError: 'NoneType' object is not iterable
        # Therefore, catch the exception here.
        try:
            exit_policy = self.exit_policy
            if exit_policy:
                return exit_policy.can_exit_to(port=port)
        except TypeError:
            return False
        # If the descriptor wasn't obtained, there isn't an exit policy.
        return False

    def is_exit_not_bad_allowing_port(self, port):
        """Whether the relay has the Exit flag, does not have the BadExit
        flag, and its exit policy allows exiting to ``port``.

        A relay without flags (e.g. no router status) is not an exit.
        """
        flags = self.flags
        if not flags:
            return False
        return (Flag.BADEXIT not in flags and
                Flag.EXIT in flags and
                self.can_exit_to_port(port))

    def increment_relay_recent_measurement_attempt(self):
        """
        Increment the number of times that a relay has been queued
        to be measured.

        It is called from :func:`~sbws.core.scanner.main_loop`.
        """
        self.relay_recent_measurement_attempt.update()

    @property
    def relay_recent_measurement_attempt_count(self):
        return len(self.relay_recent_measurement_attempt)

    def increment_relay_recent_priority_list(self):
        """
        Increment the number of times that a relay is "prioritized" to be
        measured.

        It is called from
        :meth:`~sbws.lib.relayprioritizer.RelayPrioritizer.best_priority`.
        """
        self.relay_recent_priority_list.update()

    @property
    def relay_recent_priority_list_count(self):
        return len(self.relay_recent_priority_list)

    def is_old(self):
        """Whether the last consensus seen for this relay is older than the
        measurement period.
        """
        return timestamp.is_old(self.last_consensus_timestamp)

    # XXX: tech-debt: replace `_desc` attr by a `deque` of the last
    # descriptors seen for this relay and the timestamp.
    def update_server_descriptor(self, server_descriptor):
        """Replace this relay's server descriptor."""
        self._desc = server_descriptor

    # XXX: tech-debt: replace `_ns` attr by a `deque` of the last
    # router statuses seen for this relay and the timestamp.
    def update_router_status(self, router_status):
        """Update this relay router status (from the consensus)."""
        self._ns = router_status
254

juga's avatar
juga committed
255

Matt Traudt's avatar
Matt Traudt committed
256
class RelayList:
Matt Traudt's avatar
Matt Traudt committed
257
258
259
260
    ''' Keeps a list of all relays in the current Tor network and updates it
    transparently in the background. Provides useful interfaces for getting
    only relays of a certain type.
    '''
Matt Traudt's avatar
Matt Traudt committed
261

262
    def __init__(self, args, conf, controller,
263
                 measurements_period=MEASUREMENTS_PERIOD, state=None):
Matt Traudt's avatar
Matt Traudt committed
264
        self._controller = controller
265
        self.rng = random.SystemRandom()
266
        self._refresh_lock = Lock()
267
        # To track all the consensus seen.
268
269
270
        self._recent_consensus = timestamps.DateTimeSeq(
            [], MAX_RECENT_CONSENSUS_COUNT, state, "recent_consensus"
        )
271
272
273
274
275
276
        # Initialize so that there's no error trying to access to it.
        # In future refactor, change to a dictionary, where the keys are
        # the relays' fingerprint.
        self._relays = []
        # The period of time for which the measurements are keep.
        self._measurements_period = measurements_period
277
278
279
280
        self._recent_measurement_attempt = timestamps.DateTimeSeq(
            [], MAX_RECENT_PRIORITY_RELAY_COUNT, state,
            "recent_measurement_attempt"
        )
Matt Traudt's avatar
Matt Traudt committed
281
282
        self._refresh()

283
    def _need_refresh(self):
284
285
286
287
288
289
290
        # New consensuses happen every hour.
        return datetime.utcnow() >= \
            self.last_consensus_timestamp + timedelta(seconds=60*60)

    @property
    def last_consensus_timestamp(self):
        """Returns the datetime when the last consensus was obtained."""
291
        return self._recent_consensus.last()
292

Matt Traudt's avatar
Matt Traudt committed
293
294
    @property
    def relays(self):
295
296
297
        # See if we can get the list of relays without having to do a refresh,
        # which is expensive and blocks other threads
        if self._need_refresh():
298
299
            log.debug('We need to refresh our list of relays. '
                      'Going to wait for lock.')
300
301
302
            # Whelp we couldn't just get the list of relays because the list is
            # stale. Wait for the lock so we can refresh it.
            with self._refresh_lock:
303
304
                log.debug('We got the lock. Now to see if we still '
                          'need to refresh.')
305
306
307
308
                # Now we have the lock ... but wait! Maybe someone else already
                # did the refreshing. So check if it still needs refreshing. If
                # not, we can do nothing.
                if self._need_refresh():
309
                    log.debug('Yup we need to refresh our relays. Doing so.')
310
                    self._refresh()
311
312
313
314
                else:
                    log.debug('No we don\'t need to refresh our relays. '
                              'It was done by someone else.')
            log.debug('Giving back the lock for refreshing relays.')
Matt Traudt's avatar
Matt Traudt committed
315
316
        return self._relays

Matt Traudt's avatar
Matt Traudt committed
317
318
319
320
    @property
    def fast(self):
        return self._relays_with_flag(Flag.FAST)

Matt Traudt's avatar
Matt Traudt committed
321
322
323
324
    @property
    def exits(self):
        return self._relays_with_flag(Flag.EXIT)

325
326
    @property
    def bad_exits(self):
327
        return self._relays_with_flag(Flag.BADEXIT)
328

329
330
331
332
    @property
    def non_exits(self):
        return self._relays_without_flag(Flag.EXIT)

Matt Traudt's avatar
Matt Traudt committed
333
334
335
336
    @property
    def guards(self):
        return self._relays_with_flag(Flag.GUARD)

337
338
339
340
    @property
    def authorities(self):
        return self._relays_with_flag(Flag.AUTHORITY)

juga's avatar
juga committed
341
342
343
344
345
346
347
    @property
    def relays_fingerprints(self):
        # Using relays instead of _relays, so that the list get updated if
        # needed, since this method is used to know which fingerprints are in
        # the consensus.
        return [r.fingerprint for r in self.relays]

Matt Traudt's avatar
Matt Traudt committed
348
    def random_relay(self):
349
        return self.rng.choice(self.relays)
Matt Traudt's avatar
Matt Traudt committed
350

Matt Traudt's avatar
Matt Traudt committed
351
    def _relays_with_flag(self, flag):
352
        return [r for r in self.relays if flag in r.flags]
Matt Traudt's avatar
Matt Traudt committed
353

Matt Traudt's avatar
Matt Traudt committed
354
    def _relays_without_flag(self, flag):
355
        return [r for r in self.relays if flag not in r.flags]
Matt Traudt's avatar
Matt Traudt committed
356

Matt Traudt's avatar
Matt Traudt committed
357
    def _init_relays(self):
358
359
360
361
        """Returns a new list of relays that are in the current consensus.
        And update the consensus timestamp list with the current one.

        """
Matt Traudt's avatar
Matt Traudt committed
362
        c = self._controller
363
364
365
366
367
        # This will get router statuses from this Tor cache, might not be
        # updated with the network.
        # Change to stem.descriptor.remote in future refactor.
        network_statuses = c.get_network_statuses()
        new_relays_dict = dict([(r.fingerprint, r) for r in network_statuses])
368
369
        log.debug("Number of relays in the current consensus: %d.",
                  len(new_relays_dict))
370
371
372

        # Find the timestamp of the last consensus.
        timestamp = valid_after_from_network_statuses(network_statuses)
373
        self._recent_consensus.update(timestamp)
374

375
        new_relays = []
376
377
378
379
380

        # Only or debugging, count the relays that are not in the current
        # consensus and have not been seen in the last consensuses either.
        num_old_relays = 0

381
382
383
        relays = copy.deepcopy(self._relays)
        for r in relays:
            if r.fingerprint in new_relays_dict.keys():
384
385
386
387
388
                # If a relay in the previous consensus and is in the current
                # one, update its timestamp, router status and descriptor.
                fp = r.fingerprint
                # new_relays_dict[fp] is the router status.
                r.update_router_status(new_relays_dict[fp])
389
                r.update_relay_in_recent_consensus(timestamp)
390
391
392
393
394
395
                try:
                    descriptor = c.get_server_descriptor(fp, default=None)
                except (DescriptorUnavailable, ControllerError) as e:
                    log.exception("Exception trying to get desc %s", e)
                r.update_server_descriptor(descriptor)
                # Add it to the new list of relays.
396
                new_relays.append(r)
397
398
399
                # And remove it from the new consensus dict, as it has
                # already added to the new list.
                new_relays_dict.pop(fp)
400

401
402
403
            # If the relay is not in the current consensus but is not "old"
            # yet, add it to the new list of relays too, though its timestamp,
            # router status and descriptor can't be updated.
404
            elif not r.is_old():
405
406
407
408
409
410
411
                new_relays.append(r)
            # Otherwise, don't add it to the new list of relays.
            # For debugging, count the old relays that will be discarded.
            else:
                num_old_relays += 1

        # Finally, add the relays that were not in the previous consensus
412
413
414
        for fp, ns in new_relays_dict.items():
            r = Relay(ns.fingerprint, c, ns=ns, timestamp=timestamp)
            new_relays.append(r)
415

416
        days = self._measurements_period / (60 * 60 * 24)
417
418
419
420
        log.debug("Previous number of relays being measured %d",
                  len(self._relays))
        log.debug("Number of relays not in the in the consensus in the last "
                  "%d days: %d.",
421
                  days, num_old_relays)
422
423
        log.debug("Number of relays to measure with the current consensus: "
                  "%d", len(new_relays))
424
        return new_relays
Matt Traudt's avatar
Matt Traudt committed
425
426

    def _refresh(self):
427
        # Set a new list of relays.
Matt Traudt's avatar
Matt Traudt committed
428
        self._relays = self._init_relays()
429

430
431
        log.info("Number of consensuses obtained in the last %s days: %s.",
                 int(self._measurements_period / 24 / 60 / 60),
432
                 self.recent_consensus_count)
433

434
    @property
435
    def recent_consensus_count(self):
436
        """Number of times a new consensus was obtained."""
437
        return len(self._recent_consensus)
438

439
440
441
    def exits_not_bad_allowing_port(self, port):
        return [r for r in self.exits
                if r.is_exit_not_bad_allowing_port(port)]
442

443
    def increment_recent_measurement_attempt(self):
444
445
446
447
448
449
450
451
452
        """
        Increment the number of times that any relay has been queued to be
        measured.

        It is call from :funf:`~sbws.core.scaner.main_loop`.

        It is read and stored in a ``state`` file.
        """
        # NOTE: blocking, writes to file!
453
454
455
456
457
        self._recent_measurement_attempt.update()

    @property
    def recent_measurement_attempt_count(self):
        return len(self._recent_measurement_attempt)