relaylist.py 16.2 KB
Newer Older
1
2
3
import copy
from datetime import datetime, timedelta

juga's avatar
juga committed
4
from stem.descriptor.router_status_entry import RouterStatusEntryV3
5
from stem.descriptor.server_descriptor import ServerDescriptor
6
from stem import Flag, DescriptorUnavailable, ControllerError
Matt Traudt's avatar
Matt Traudt committed
7
import random
8
import logging
9
from threading import Lock
Matt Traudt's avatar
Matt Traudt committed
10

11
12
from ..globals import MEASUREMENTS_PERIOD

13
14
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

Matt Traudt's avatar
Matt Traudt committed
15

16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def remove_old_consensus_timestamps(
        consensus_timestamps, measurements_period=MEASUREMENTS_PERIOD):
    """
    Remove the consensus timestamps that are older than the period for which
    the measurements are kept from a list of consensus timestamps.

    The input list is not modified.

    :param list consensus_timestamps: ``datetime`` objects to filter.
    :param int measurements_period: length of the period, in seconds.
    :returns list: a new list of ``consensus_timestamps`` containing only
        the timestamps that are not older than ``measurements_period``.
    """
    # ``timedelta``'s first positional argument is *days*; the period is
    # expressed in seconds, so it has to be passed by keyword.
    oldest_date = datetime.utcnow() - timedelta(seconds=measurements_period)
    return [t for t in consensus_timestamps if t >= oldest_date]


def valid_after_from_network_statuses(network_statuses):
    """Find the consensus Valid-After datetime in a list of router statuses.

    Each entry is inspected for a ``document`` attribute and, on it, a
    ``valid_after`` attribute; the first truthy value found is returned.

    :param list network_statuses: router status entries, e.g.
        ``stem.descriptor.RouterStatusEntryV3`` objects.
    :returns datetime: the first Valid-After found, or the current UTC time
        truncated to seconds when no entry provides one.
    """
    for status in network_statuses:
        consensus = getattr(status, 'document', None)
        if not consensus:
            continue
        found = getattr(consensus, 'valid_after', None)
        if found:
            return found
    # None of the entries carried a usable Valid-After.
    now = datetime.utcnow()
    return now.replace(microsecond=0)


48
class Relay:
    """Information about one Tor relay, as needed by sbws.

    Wraps the relay's router status entry (``_ns``) and server descriptor
    (``_desc``). Either may be ``None``, in which case the properties that
    read from it return ``None``.
    """

    def __init__(self, fp, cont, ns=None, desc=None, timestamp=None):
        '''
        Given a relay fingerprint, fetch all the information about a relay that
        sbws currently needs and store it in this class. Acts as an abstraction
        to hide the confusion that is Tor consensus/descriptor stuff.

        :param str fp: fingerprint of the relay.
        :param cont: active and valid stem Tor controller connection
        :param RouterStatusEntryV3 ns: router status entry of the relay;
            when ``None`` it is requested from ``cont``.
        :param ServerDescriptor desc: server descriptor of the relay;
            when ``None`` it is requested from ``cont``.
        :param datetime timestamp: the timestamp of a consensus
            (RouterStatusEntryV3) from which this relay has been obtained.
        '''
        assert isinstance(fp, str)
        assert len(fp) == 40
        if ns is not None:
            assert isinstance(ns, RouterStatusEntryV3)
            self._ns = ns
        else:
            try:
                self._ns = cont.get_network_status(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get ns %s", e)
                self._ns = None
        if desc is not None:
            assert isinstance(desc, ServerDescriptor)
            self._desc = desc
        else:
            try:
                self._desc = cont.get_server_descriptor(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get desc %s", e)
                # Keep the attribute defined, as it is done for ``_ns``,
                # so that ``_from_desc`` does not raise AttributeError.
                self._desc = None
        self._consensus_timestamps = []
        self._add_consensus_timestamp(timestamp)
        # The number of times that a relay is "prioritized" to be measured.
        # It is incremented in ``RelayPrioritizer.best_priority``
        self.relay_recent_priority_list_count = 0
        # The number of times that a relay has been queued to be measured.
        # It is incremented in ``scanner.main_loop``
        self.relay_recent_measurement_attempt_count = 0

    def _from_desc(self, attr):
        """Return ``attr`` from the server descriptor, or None if either the
        descriptor or the attribute is missing.
        """
        if not self._desc:
            return None
        return getattr(self._desc, attr, None)

    def _from_ns(self, attr):
        """Return ``attr`` from the router status entry, or None if either
        the entry or the attribute is missing.
        """
        if not self._ns:
            return None
        return getattr(self._ns, attr, None)

    @property
    def nickname(self):
        return self._from_ns('nickname')

    @property
    def fingerprint(self):
        return self._from_ns('fingerprint')

    @property
    def flags(self):
        return self._from_ns('flags')

    @property
    def exit_policy(self):
        return self._from_desc('exit_policy')

    @property
    def average_bandwidth(self):
        return self._from_desc('average_bandwidth')

    @property
    def burst_bandwidth(self):
        return self._from_desc('burst_bandwidth')

    @property
    def observed_bandwidth(self):
        return self._from_desc('observed_bandwidth')

    @property
    def consensus_bandwidth(self):
        """Return the consensus bandwidth in Bytes.

        Consensus bandwidth is the only bandwidth value that is in kilobytes.

        :returns: the bandwidth multiplied by 1000, or None when the router
            status entry does not provide it.
        """
        bandwidth = self._from_ns('bandwidth')
        if bandwidth is None:
            return None
        return bandwidth * 1000

    @property
    def consensus_bandwidth_is_unmeasured(self):
        # measured appears only in votes, unmeasured appears in consensus,
        # therefore is_unmeasured is needed to know whether the bandwidth
        # value in consensus is coming from bwauth measurements or not.
        return self._from_ns('is_unmeasured')

    @property
    def address(self):
        return self._from_ns('address')

    @property
    def master_key_ed25519(self):
        """Obtain ed25519 master key of the relay in server descriptors.

        :returns: str, the ed25519 master key base 64 encoded without
                  trailing '='s, or None when it is not available.

        """
        # Even if this key is called master-key-ed25519 in dir-spec.txt,
        # it seems that stem parses it as ed25519_master_key
        key = self._from_desc('ed25519_master_key')
        if key is None:
            return None
        return key.rstrip('=')

    @property
    def consensus_valid_after(self):
        """Obtain the consensus Valid-After from the document of this relay
        network status.

        :returns: the ``valid_after`` attribute of the document, or None
            when the document or the attribute is missing.
        """
        network_status_document = self._from_ns('document')
        if network_status_document:
            return getattr(network_status_document, 'valid_after', None)
        return None

    @property
    def last_consensus_timestamp(self):
        """Most recently stored consensus timestamp, or None if there is
        none.
        """
        if len(self._consensus_timestamps) >= 1:
            return self._consensus_timestamps[-1]
        return None

    def _add_consensus_timestamp(self, timestamp=None):
        """Add the consensus timestamp in which this relay is present.

        The Valid-After of the relay's own network status document takes
        precedence over ``timestamp``.

        :param datetime timestamp: consensus timestamp to use when the
            Valid-After is not available.
        """
        valid_after = self.consensus_valid_after
        last = self.last_consensus_timestamp
        # It is possible to access the relay's consensus Valid-After.
        if valid_after is not None:
            # Add it when nothing is stored yet or when it is more recent
            # than the most recent stored timestamp; otherwise add nothing.
            if last is None or valid_after > last:
                self._consensus_timestamps.append(valid_after)
        # There is already a stored timestamp and the timestamp arg is more
        # recent than it. ``timestamp`` may be None (it is the default), in
        # which case it must not be compared with a datetime.
        elif (timestamp is not None
              and last is not None
              and timestamp > last):
            self._consensus_timestamps.append(timestamp)
        # In any other case
        else:
            # Add the current datetime
            self._consensus_timestamps.append(
                datetime.utcnow().replace(microsecond=0))

    def _remove_old_consensus_timestamps(
            self, measurements_period=MEASUREMENTS_PERIOD):
        """Drop the stored consensus timestamps that are older than
        ``measurements_period``.
        """
        # ``measurements_period`` is an argument of
        # ``remove_old_consensus_timestamps``, not of ``copy.deepcopy``
        # (whose second argument is its ``memo`` dictionary).
        self._consensus_timestamps = remove_old_consensus_timestamps(
            copy.deepcopy(self._consensus_timestamps),
            measurements_period
            )

    def update_consensus_timestamps(self, timestamp=None):
        """Record a new consensus timestamp and discard the old ones."""
        self._add_consensus_timestamp(timestamp)
        self._remove_old_consensus_timestamps()

    @property
    def relay_in_recent_consensus_count(self):
        """Number of times the relay was in a consensus."""
        return len(self._consensus_timestamps)

    def can_exit_to_port(self, port):
        """
        Returns True if the relay has an exit policy and the policy accepts
        exiting to the given port or False otherwise.
        """
        assert isinstance(port, int)
        policy = self.exit_policy
        # if the descriptor was not obtained, there is no exit policy
        if not policy:
            return False
        return policy.can_exit_to(port=port)

    def is_exit_not_bad_allowing_port(self, port):
        """
        Returns True if the relay has the Exit flag, does not have the
        BadExit flag and its exit policy allows exiting to ``port``.
        """
        # ``flags`` is None when there is no router status entry; treat it
        # as "no flags" instead of failing on the membership test.
        flags = self.flags or ()
        return (Flag.BADEXIT not in flags and
                Flag.EXIT in flags and
                self.can_exit_to_port(port))

    def increment_relay_recent_measurement_attempt_count(self):
        """
        Increment the number of times that a relay has been queued
        to be measured.

        It is called from :func:`~sbws.core.scanner.main_loop`.
        """
        # If it was not in the previous measurements version, start counting
        if self.relay_recent_measurement_attempt_count is None:
            self.relay_recent_measurement_attempt_count = 0
        self.relay_recent_measurement_attempt_count += 1

    def increment_relay_recent_priority_list_count(self):
        """
        Increment the number of times that a relay is "prioritized" to be
        measured.

        It is called from
        :meth:`~sbws.lib.relayprioritizer.RelayPrioritizer.best_priority`.
        """
        # If it was not in the previous measurements version, start counting
        if self.relay_recent_priority_list_count is None:
            self.relay_recent_priority_list_count = 0
        self.relay_recent_priority_list_count += 1

juga's avatar
juga committed
264

Matt Traudt's avatar
Matt Traudt committed
265
class RelayList:
    ''' Keeps a list of all relays in the current Tor network and updates it
    transparently in the background. Provides useful interfaces for getting
    only relays of a certain type.
    '''

    def __init__(self, args, conf, controller,
                 measurements_period=MEASUREMENTS_PERIOD, state=None):
        """
        :param args: not used by this class.
        :param conf: not used by this class.
        :param controller: stem Tor controller used to obtain the router
            statuses.
        :param int measurements_period: period of time, in seconds, for
            which the consensus timestamps are kept.
        :param state: optional mapping-like object where the counters
            ``recent_consensus_count`` and
            ``recent_measurement_attempt_count`` are stored.
        """
        self._controller = controller
        self.rng = random.SystemRandom()
        self._refresh_lock = Lock()
        # To track all the consensus seen.
        self._consensus_timestamps = []
        # Initialize so that there's no error trying to access to it.
        # In future refactor, change to a dictionary, where the keys are
        # the relays' fingerprint.
        self._relays = []
        # The period of time for which the measurements are kept.
        self._measurements_period = measurements_period
        self._state = state
        # NOTE: blocking: writes to disk
        # Compare with None, as ``_refresh`` does, so that an empty state
        # also gets the counter initialized.
        if self._state is not None:
            if self._state.get('recent_measurement_attempt_count', None) \
                    is None:
                self._state['recent_measurement_attempt_count'] = 0
        self._refresh()

    def _need_refresh(self):
        """Return True when the last consensus is at least one hour old."""
        # New consensuses happen every hour.
        return datetime.utcnow() >= \
            self.last_consensus_timestamp + timedelta(seconds=60*60)

    @property
    def last_consensus_timestamp(self):
        """Returns the datetime when the last consensus was obtained."""
        # If the object was not created from __init__, it won't have
        # consensus_timestamps attribute or it might be empty; hence the
        # default, which avoids an AttributeError.
        timestamps = getattr(self, "_consensus_timestamps", None)
        if timestamps:
            return timestamps[-1]
        # In this case force new update.
        # Anytime more than 1h in the past will be old.
        self._consensus_timestamps = []
        return datetime.utcnow() - timedelta(seconds=60*61)

    @property
    def relays(self):
        """List of relays, refreshed first when the consensus is stale."""
        # See if we can get the list of relays without having to do a refresh,
        # which is expensive and blocks other threads
        if self._need_refresh():
            log.debug('We need to refresh our list of relays. '
                      'Going to wait for lock.')
            # Whelp we couldn't just get the list of relays because the list is
            # stale. Wait for the lock so we can refresh it.
            with self._refresh_lock:
                log.debug('We got the lock. Now to see if we still '
                          'need to refresh.')
                # Now we have the lock ... but wait! Maybe someone else already
                # did the refreshing. So check if it still needs refreshing. If
                # not, we can do nothing.
                if self._need_refresh():
                    log.debug('Yup we need to refresh our relays. Doing so.')
                    self._refresh()
                else:
                    log.debug('No we don\'t need to refresh our relays. '
                              'It was done by someone else.')
            log.debug('Giving back the lock for refreshing relays.')
        return self._relays

    @property
    def fast(self):
        return self._relays_with_flag(Flag.FAST)

    @property
    def exits(self):
        return self._relays_with_flag(Flag.EXIT)

    @property
    def bad_exits(self):
        return self._relays_with_flag(Flag.BADEXIT)

    @property
    def non_exits(self):
        return self._relays_without_flag(Flag.EXIT)

    @property
    def guards(self):
        return self._relays_with_flag(Flag.GUARD)

    @property
    def authorities(self):
        return self._relays_with_flag(Flag.AUTHORITY)

    def random_relay(self):
        """Return one relay chosen at random with ``self.rng``."""
        return self.rng.choice(self.relays)

    def _relays_with_flag(self, flag):
        return [r for r in self.relays if flag in r.flags]

    def _relays_without_flag(self, flag):
        return [r for r in self.relays if flag not in r.flags]

    def _remove_old_consensus_timestamps(self):
        """Drop the consensus timestamps older than the measurements period.
        """
        self._consensus_timestamps = remove_old_consensus_timestamps(
            copy.deepcopy(self._consensus_timestamps),
            self._measurements_period
            )

    def _init_relays(self):
        """Returns a new list of relays that are in the current consensus.
        And update the consensus timestamp list with the current one.

        """
        c = self._controller
        # This will get router statuses from this Tor cache, might not be
        # updated with the network.
        # Change to stem.descriptor.remote in future refactor.
        # The statuses are iterated twice below (to index them and to find
        # Valid-After); materialize them in case the controller returns a
        # one-shot iterator, which would be empty the second time.
        network_statuses = list(c.get_network_statuses())
        new_relays_dict = {r.fingerprint: r for r in network_statuses}

        # Find the timestamp of the last consensus.
        timestamp = valid_after_from_network_statuses(network_statuses)
        self._consensus_timestamps.append(timestamp)
        self._remove_old_consensus_timestamps()
        # Update the relays that were in the previous consensus with the
        # new timestamp
        new_relays = []
        relays = copy.deepcopy(self._relays)
        for r in relays:
            if r.fingerprint in new_relays_dict:
                r.update_consensus_timestamps(timestamp)
                new_relays_dict.pop(r.fingerprint)
                new_relays.append(r)

        # Add the relays that were not in the previous consensus
        # If there was a relay in some older previous consensus,
        # it won't get stored, so its previous consensuses are lost,
        # but probably this is fine for now to not make it more complicated.
        for ns in new_relays_dict.values():
            r = Relay(ns.fingerprint, c, ns=ns, timestamp=timestamp)
            new_relays.append(r)
        return new_relays

    def _refresh(self):
        """Rebuild the list of relays and store the consensus count."""
        # Set a new list of relays.
        self._relays = self._init_relays()

        log.info("Number of consensuses obtained in the last %s days: %s.",
                 int(self._measurements_period / 24 / 60 / 60),
                 self.recent_consensus_count)
        # NOTE: blocking, writes to file!
        if self._state is not None:
            self._state['recent_consensus_count'] = self.recent_consensus_count

    @property
    def recent_consensus_count(self):
        """Number of times a new consensus was obtained."""
        return len(self._consensus_timestamps)

    def exits_not_bad_allowing_port(self, port):
        """Return the exits without the BadExit flag whose exit policy
        allows exiting to ``port``.
        """
        return [r for r in self.exits
                if r.is_exit_not_bad_allowing_port(port)]

    def increment_recent_measurement_attempt_count(self):
        """
        Increment the number of times that any relay has been queued to be
        measured.

        It is called from :func:`~sbws.core.scanner.main_loop`.

        It is read and stored in a ``state`` file.
        """
        # NOTE: blocking, writes to file!
        if self._state is not None:
            # Start from 0 when the counter is not in the state yet.
            count = self._state.get('recent_measurement_attempt_count', None)
            self._state['recent_measurement_attempt_count'] = \
                (count or 0) + 1