GitLab is used only for code review, issue tracking and project management. Canonical locations for source code are still https://gitweb.torproject.org/ https://git.torproject.org/ and git-rw.torproject.org.

relaylist.py 19.5 KB
Newer Older
1 2 3
import copy
from datetime import datetime, timedelta

juga  's avatar
juga committed
4
from stem.descriptor.router_status_entry import RouterStatusEntryV3
5
from stem.descriptor.server_descriptor import ServerDescriptor
6
from stem import Flag, DescriptorUnavailable, ControllerError
Matt Traudt's avatar
Matt Traudt committed
7
import random
8
import logging
9
from threading import Lock
Matt Traudt's avatar
Matt Traudt committed
10

11 12
from ..globals import (
    MAX_RECENT_CONSENSUS_COUNT,
13
    MAX_RECENT_PRIORITY_RELAY_COUNT,
14 15 16
    MEASUREMENTS_PERIOD
)
from ..util import timestamp, timestamps
17

18 19
# Module-level logger, named after this module, used by every function and
# class in this file.
log = logging.getLogger(__name__)

Matt Traudt's avatar
Matt Traudt committed
20

21 22 23 24 25 26 27 28 29 30
def remove_old_consensus_timestamps(
        consensus_timestamps, measurements_period=MEASUREMENTS_PERIOD):
    """
    Filter out the consensus timestamps that ``timestamp.is_old`` reports as
    older than ``measurements_period``.

    The input list is left untouched; the timestamps that are kept are
    returned in their original order.

    :param list consensus_timestamps: the timestamps to filter.
    :param int measurements_period: the period passed to
        ``timestamp.is_old`` for every timestamp.
    :returns list: a new list with the timestamps that are not old.
    """
    recent_timestamps = []
    for consensus_ts in consensus_timestamps:
        if timestamp.is_old(consensus_ts, measurements_period):
            # Too old to be kept.
            continue
        recent_timestamps.append(consensus_ts)
    return recent_timestamps


def valid_after_from_network_statuses(network_statuses):
    """Return the consensus Valid-After datetime found in the statuses.

    Each item is inspected for a truthy ``document`` attribute that in turn
    has a truthy ``valid_after`` attribute; the first one found is returned.
    When no item provides it, the current UTC time, truncated to the second,
    is returned instead.

    :param list network_statuses: router statuses (e.g.
        ``stem.descriptor.RouterStatusEntryV3``) to inspect.
    :returns datetime: the Valid-After datetime or the current UTC datetime.
    """
    for router_status in network_statuses:
        consensus_document = getattr(router_status, 'document', None)
        if not consensus_document:
            continue
        consensus_valid_after = getattr(
            consensus_document, 'valid_after', None)
        if consensus_valid_after:
            return consensus_valid_after
    # No status carried a usable Valid-After: fall back to "now".
    now = datetime.utcnow()
    return now.replace(microsecond=0)


55
class Relay:
    """What sbws knows about a single relay.

    Wraps the relay's router status entry (``_ns``) and server descriptor
    (``_desc``) and exposes selected attributes of them as read-only
    properties, which evaluate to ``None`` when the underlying object is
    missing. It also keeps the list of consensus timestamps in which the
    relay was seen and two measurement related counters.
    """

    def __init__(self, fp, cont, ns=None, desc=None, timestamp=None):
        '''
        Given a relay fingerprint, fetch all the information about a relay that
        sbws currently needs and store it in this class. Acts as an abstraction
        to hide the confusion that is Tor consensus/descriptor stuff.

        :param str fp: fingerprint of the relay (40 characters).
        :param cont: active and valid stem Tor controller connection
        :param RouterStatusEntryV3 ns: router status of the relay. When it is
            ``None``, it is requested to the controller.
        :param ServerDescriptor desc: server descriptor of the relay. When it
            is ``None``, it is requested to the controller.
        :param datetime timestamp: the timestamp of a consensus
            (RouterStatusEntryV3) from which this relay has been obtained.
        '''
        assert isinstance(fp, str)
        assert len(fp) == 40
        if ns is not None:
            assert isinstance(ns, RouterStatusEntryV3)
            self._ns = ns
        else:
            try:
                self._ns = cont.get_network_status(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get ns %s", e)
                self._ns = None
        if desc is not None:
            assert isinstance(desc, ServerDescriptor)
            self._desc = desc
        else:
            try:
                self._desc = cont.get_server_descriptor(fp, default=None)
            except (DescriptorUnavailable, ControllerError) as e:
                log.exception("Exception trying to get desc %s", e)
                # Always define the attribute, as it is done for ``_ns``,
                # otherwise every later ``_from_desc`` call would raise
                # AttributeError.
                self._desc = None
        self._consensus_timestamps = []
        self._add_consensus_timestamp(timestamp)
        # The number of times that a relay is "prioritized" to be measured.
        # It is incremented in ``RelayPrioritizer.best_priority``
        self.relay_recent_priority_list_count = 0
        # The number of times that a relay has been queued to be measured.
        # It is incremented in ``scanner.main_loop``
        self.relay_recent_measurement_attempt_count = 0

    def _from_desc(self, attr):
        """Return ``attr`` from the server descriptor, or None when there is
        no descriptor or it does not have that attribute.
        """
        if not self._desc:
            return None
        return getattr(self._desc, attr, None)

    def _from_ns(self, attr):
        """Return ``attr`` from the router status, or None when there is
        no router status or it does not have that attribute.
        """
        if not self._ns:
            return None
        return getattr(self._ns, attr, None)

    @property
    def nickname(self):
        return self._from_ns('nickname')

    @property
    def fingerprint(self):
        return self._from_ns('fingerprint')

    @property
    def flags(self):
        return self._from_ns('flags')

    @property
    def exit_policy(self):
        return self._from_desc('exit_policy')

    @property
    def average_bandwidth(self):
        return self._from_desc('average_bandwidth')

    @property
    def burst_bandwidth(self):
        return self._from_desc('burst_bandwidth')

    @property
    def observed_bandwidth(self):
        return self._from_desc('observed_bandwidth')

    @property
    def consensus_bandwidth(self):
        """Return the consensus bandwidth in Bytes.

        Consensus bandwidth is the only bandwidth value that is in kilobytes.

        :returns: the router status ``bandwidth`` multiplied by 1000, or
            None when it is not available.
        """
        bandwidth = self._from_ns('bandwidth')
        if bandwidth is None:
            return None
        return bandwidth * 1000

    @property
    def consensus_bandwidth_is_unmeasured(self):
        # measured appears only in votes, unmeasured appears in consensus
        # therefore is_unmeasured is needed to know whether the bandwidth
        # value in consensus is coming from bwauth measurements or not.
        return self._from_ns('is_unmeasured')

    @property
    def address(self):
        return self._from_ns('address')

    @property
    def master_key_ed25519(self):
        """Obtain ed25519 master key of the relay in server descriptors.

        :returns: str, the ed25519 master key base 64 encoded without
                  trailing '='s, or None when it is not available.

        """
        # Even if this key is called master-key-ed25519 in dir-spec.txt,
        # it seems that stem parses it as ed25519_master_key
        key = self._from_desc('ed25519_master_key')
        if key is None:
            return None
        return key.rstrip('=')

    @property
    def consensus_valid_after(self):
        """Obtain the consensus Valid-After from the document of this relay
        network status, or None when it is not available.
        """
        network_status_document = self._from_ns('document')
        if network_status_document:
            return getattr(network_status_document, 'valid_after', None)
        return None

    @property
    def last_consensus_timestamp(self):
        """Return the last stored consensus timestamp, or None when no
        timestamp has been stored.
        """
        if self._consensus_timestamps:
            return self._consensus_timestamps[-1]
        return None

    def _append_consensus_timestamp_if_later(self, timestamp):
        """Append timestamp to the list of consensus timestamps, if it is later
           than the most recent existing timestamp, or there are no timestamps.
           Should only be called by _add_consensus_timestamp().
           timestamp must not be None, and it must not be zero.
        """
        if not timestamp:
            log.info('Bad timestamp %s, skipping consensus timestamp '
                     'update for  relay %s', timestamp, self.fingerprint)
            return
        last = self.last_consensus_timestamp
        # Either there is not any timestamp stored yet or the new one is
        # more recent than the most recent stored one.
        if last is None or timestamp > last:
            self._consensus_timestamps.append(timestamp)

    def _add_consensus_timestamp(self, timestamp=None):
        """Add the consensus timestamp in which this relay is present.

        The Valid-After of this relay's router status document is preferred,
        then the ``timestamp`` argument and, as the last resort, the current
        UTC time.
        """
        # It is possible to access to the relay's consensus Valid-After
        # so believe it, rather than the supplied timestamp
        valid_after = self.consensus_valid_after
        if valid_after is not None:
            self._append_consensus_timestamp_if_later(valid_after)
        elif timestamp:
            # Add the arg timestamp.
            self._append_consensus_timestamp_if_later(timestamp)
        # In any other case
        else:
            log.warning('Bad timestamp %s, using current time for consensus '
                        'timestamp update for relay %s',
                        timestamp, self.fingerprint)
            # Add the current datetime
            self._append_consensus_timestamp_if_later(
                datetime.utcnow().replace(microsecond=0))

    def _remove_old_consensus_timestamps(
            self, measurements_period=MEASUREMENTS_PERIOD):
        """Discard the stored consensus timestamps that are older than
        ``measurements_period``.
        """
        # ``remove_old_consensus_timestamps`` returns a new list, so there is
        # no need to copy the current one before passing it.
        self._consensus_timestamps = remove_old_consensus_timestamps(
            self._consensus_timestamps, measurements_period
        )

    def update_consensus_timestamps(self, timestamp=None):
        """Add a new consensus timestamp and discard the old ones."""
        self._add_consensus_timestamp(timestamp)
        self._remove_old_consensus_timestamps()

    @property
    def relay_in_recent_consensus_count(self):
        """Number of times the relay was in a consensus."""
        return len(self._consensus_timestamps)

    def can_exit_to_port(self, port):
        """
        Returns True if the relay has an exit policy and the policy accepts
        exiting to the given port or False otherwise.

        :param int port: the port to check.
        """
        assert isinstance(port, int)
        # If the descriptor could not be obtained, there is no exit policy.
        # Evaluating the truthiness of the policy calls stem's ``__len__``,
        # which raises TypeError when stem's _input_rules is None (#29899):
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 512, in __len__  # noqa
        #     return len(self._get_rules())
        #   File "/usr/lib/python3/dist-packages/stem/exit_policy.py", line 464, in _get_rules  # noqa
        #     for rule in decompressed_rules:
        # TypeError: 'NoneType' object is not iterable
        # Therefore, catch the exception here.
        try:
            policy = self.exit_policy
            if policy:
                return policy.can_exit_to(port=port)
        except TypeError:
            return False
        return False

    def is_exit_not_bad_allowing_port(self, port):
        """Whether the relay has the Exit flag, has not the BadExit flag and
        its exit policy allows to exit to ``port``.

        A relay without flags (e.g. without router status) is not an exit.
        """
        flags = self.flags
        if not flags:
            # ``flags`` is None when there is no router status; membership
            # tests on None would raise TypeError.
            return False
        return (Flag.BADEXIT not in flags and
                Flag.EXIT in flags and
                self.can_exit_to_port(port))

    def increment_relay_recent_measurement_attempt_count(self):
        """
        Increment the number of times that a relay has been queued
        to be measured.

        It is called from :func:`~sbws.core.scanner.main_loop`.
        """
        # If it was not in the previous measurements version, start counting
        if self.relay_recent_measurement_attempt_count is None:
            self.relay_recent_measurement_attempt_count = 0
        self.relay_recent_measurement_attempt_count += 1

    def increment_relay_recent_priority_list_count(self):
        """
        Increment the number of times that a relay is "prioritized" to be
        measured.

        It is called from
        :meth:`~sbws.lib.relayprioritizer.RelayPrioritizer.best_priority`.
        """
        # If it was not in the previous measurements version, start counting
        if self.relay_recent_priority_list_count is None:
            self.relay_recent_priority_list_count = 0
        self.relay_recent_priority_list_count += 1

    def is_old(self):
        """Whether the last consensus seen for this relay is older than the
        measurement period.
        """
        return timestamp.is_old(self.last_consensus_timestamp)

    # XXX: tech-debt: replace `_desc` attr by a `deque` of the last
    # descriptors seen for this relay and the timestamp.
    def update_server_descriptor(self, server_descriptor):
        """Update this relay server descriptor (from the consensus)."""
        self._desc = server_descriptor

    # XXX: tech-debt: replace `_ns` attr by a `deque` of the last
    # router statuses seen for this relay and the timestamp.
    def update_router_status(self, router_status):
        """Update this relay router status (from the consensus)."""
        self._ns = router_status
315

juga  's avatar
juga committed
316

Matt Traudt's avatar
Matt Traudt committed
317
class RelayList:
    ''' Keeps a list of all relays in the current Tor network and updates it
    transparently when it is accessed and it has become stale. Provides
    useful interfaces for getting only relays of a certain type.
    '''

    def __init__(self, args, conf, controller,
                 measurements_period=MEASUREMENTS_PERIOD, state=None):
        """
        :param args: not used by this class.
        :param conf: not used by this class.
        :param controller: stem Tor controller used to obtain the router
            statuses and the server descriptors.
        :param int measurements_period: the period of time, in seconds, for
            which the measurements are kept.
        :param state: passed to the ``DateTimeSeq`` sequences that track the
            recent consensuses and the recent measurement attempts.
        """
        self._controller = controller
        self.rng = random.SystemRandom()
        self._refresh_lock = Lock()
        # To track all the consensus seen.
        self._recent_consensus = timestamps.DateTimeSeq(
            [], MAX_RECENT_CONSENSUS_COUNT, state, "recent_consensus"
        )
        # Initialize so that there's no error trying to access to it.
        # In future refactor, change to a dictionary, where the keys are
        # the relays' fingerprint.
        self._relays = []
        # The period of time for which the measurements are kept.
        self._measurements_period = measurements_period
        self._recent_measurement_attempt = timestamps.DateTimeSeq(
            [], MAX_RECENT_PRIORITY_RELAY_COUNT, state,
            "recent_measurement_attempt"
        )
        self._refresh()

    def _need_refresh(self):
        """Whether at least one hour has passed since the last consensus
        timestamp.
        """
        # New consensuses happen every hour.
        return datetime.utcnow() >= \
            self.last_consensus_timestamp + timedelta(seconds=60*60)

    @property
    def last_consensus_timestamp(self):
        """Returns the datetime when the last consensus was obtained."""
        return self._recent_consensus.last()

    @property
    def relays(self):
        """The list of relays, refreshed first when it is stale."""
        # See if we can get the list of relays without having to do a refresh,
        # which is expensive and blocks other threads
        if self._need_refresh():
            log.debug('We need to refresh our list of relays. '
                      'Going to wait for lock.')
            # Whelp we couldn't just get the list of relays because the list is
            # stale. Wait for the lock so we can refresh it.
            with self._refresh_lock:
                log.debug('We got the lock. Now to see if we still '
                          'need to refresh.')
                # Now we have the lock ... but wait! Maybe someone else already
                # did the refreshing. So check if it still needs refreshing. If
                # not, we can do nothing.
                if self._need_refresh():
                    log.debug('Yup we need to refresh our relays. Doing so.')
                    self._refresh()
                else:
                    log.debug("No we don't need to refresh our relays. "
                              "It was done by someone else.")
            log.debug('Giving back the lock for refreshing relays.')
        return self._relays

    @property
    def fast(self):
        return self._relays_with_flag(Flag.FAST)

    @property
    def exits(self):
        return self._relays_with_flag(Flag.EXIT)

    @property
    def bad_exits(self):
        return self._relays_with_flag(Flag.BADEXIT)

    @property
    def non_exits(self):
        return self._relays_without_flag(Flag.EXIT)

    @property
    def guards(self):
        return self._relays_with_flag(Flag.GUARD)

    @property
    def authorities(self):
        return self._relays_with_flag(Flag.AUTHORITY)

    @property
    def relays_fingerprints(self):
        # Using relays instead of _relays, so that the list get updated if
        # needed, since this method is used to know which fingerprints are in
        # the consensus.
        return [r.fingerprint for r in self.relays]

    def random_relay(self):
        """Return a relay chosen at random from the list of relays."""
        return self.rng.choice(self.relays)

    def _relays_with_flag(self, flag):
        """Return the relays that have ``flag``."""
        return [r for r in self.relays if flag in r.flags]

    def _relays_without_flag(self, flag):
        """Return the relays that do not have ``flag``."""
        return [r for r in self.relays if flag not in r.flags]

    def _init_relays(self):
        """Returns a new list of relays that are in the current consensus.
        And update the consensus timestamp list with the current one.

        Relays known from previous consensuses that are not in the current
        one are kept as long as they are not "old".
        """
        c = self._controller
        # This will get router statuses from this Tor cache, might not be
        # updated with the network.
        # Change to stem.descriptor.remote in future refactor.
        # The statuses are traversed twice below (to build the dictionary and
        # to find the Valid-After), so store them in a list: stem documents
        # this method as returning an iterator, which could only be consumed
        # once.
        network_statuses = list(c.get_network_statuses())
        new_relays_dict = {ns.fingerprint: ns for ns in network_statuses}
        log.debug("Number of relays in the current consensus: %d.",
                  len(new_relays_dict))

        # Find the timestamp of the last consensus.
        # (Not called ``timestamp`` to not shadow the module with that name.)
        consensus_timestamp = valid_after_from_network_statuses(
            network_statuses)
        self._recent_consensus.update(consensus_timestamp)

        new_relays = []

        # Only for debugging, count the relays that are not in the current
        # consensus and have not been seen in the last consensuses either.
        num_old_relays = 0

        # Work on copies of the relays, so that the ones in ``self._relays``
        # are not modified while the new list is being built.
        relays = copy.deepcopy(self._relays)
        for r in relays:
            fp = r.fingerprint
            if fp in new_relays_dict:
                # If a relay in the previous consensus and is in the current
                # one, update its timestamp, router status and descriptor.
                # new_relays_dict[fp] is the router status.
                r.update_router_status(new_relays_dict[fp])
                r.update_consensus_timestamps(consensus_timestamp)
                # Reset for every relay: when the request fails there is no
                # descriptor for this relay, and the one obtained for the
                # previous relay must not be used instead.
                descriptor = None
                try:
                    descriptor = c.get_server_descriptor(fp, default=None)
                except (DescriptorUnavailable, ControllerError) as e:
                    log.exception("Exception trying to get desc %s", e)
                r.update_server_descriptor(descriptor)
                # Add it to the new list of relays.
                new_relays.append(r)
                # And remove it from the new consensus dict, as it has
                # already added to the new list.
                new_relays_dict.pop(fp)

            # If the relay is not in the current consensus but is not "old"
            # yet, add it to the new list of relays too, though its timestamp,
            # router status and descriptor can't be updated.
            elif not r.is_old():
                new_relays.append(r)
            # Otherwise, don't add it to the new list of relays.
            # For debugging, count the old relays that will be discarded.
            else:
                num_old_relays += 1

        # Finally, add the relays that were not in the previous consensus
        for ns in new_relays_dict.values():
            new_relays.append(
                Relay(ns.fingerprint, c, ns=ns, timestamp=consensus_timestamp)
            )

        days = self._measurements_period / (60 * 60 * 24)
        log.debug("Previous number of relays being measured %d",
                  len(self._relays))
        log.debug("Number of relays not in the in the consensus in the last "
                  "%d days: %d.",
                  days, num_old_relays)
        log.debug("Number of relays to measure with the current consensus: "
                  "%d", len(new_relays))
        return new_relays

    def _refresh(self):
        """Replace the list of relays with a newly built one."""
        # Set a new list of relays.
        self._relays = self._init_relays()

        log.info("Number of consensuses obtained in the last %s days: %s.",
                 int(self._measurements_period / 24 / 60 / 60),
                 self.recent_consensus_count)

    @property
    def recent_consensus_count(self):
        """Number of times a new consensus was obtained."""
        return len(self._recent_consensus)

    def exits_not_bad_allowing_port(self, port):
        """Return the exits without the BadExit flag that allow to exit to
        ``port``.
        """
        return [r for r in self.exits
                if r.is_exit_not_bad_allowing_port(port)]

    def increment_recent_measurement_attempt(self):
        """
        Increment the number of times that any relay has been queued to be
        measured.

        It is called from :func:`~sbws.core.scanner.main_loop`.

        It is read and stored in a ``state`` file.
        """
        # NOTE: blocking, writes to file!
        self._recent_measurement_attempt.update()

    @property
    def recent_measurement_attempt_count(self):
        """Number of recent measurement attempts currently tracked."""
        return len(self._recent_measurement_attempt)