v3bwfile.py 60.2 KB
Newer Older
juga's avatar
juga committed
1
2
3
# -*- coding: utf-8 -*-
"""Classes and functions that create the bandwidth measurements document
(v3bw) used by bandwidth authorities."""
4
5
# flake8: noqa: E741
# (E741 ambiguous variable name), when using l.
juga's avatar
juga committed
6

juga's avatar
juga committed
7
import copy
juga's avatar
juga committed
8
import logging
9
import math
juga's avatar
juga committed
10
import os
juga's avatar
juga committed
11
from itertools import combinations
juga's avatar
juga committed
12
from statistics import median, mean
13
from stem.descriptor import parse_file
juga's avatar
juga committed
14

15
from sbws import __version__
16
from sbws.globals import (SPEC_VERSION, BW_LINE_SIZE, SBWS_SCALE_CONSTANT,
17
18
                          TORFLOW_SCALING, SBWS_SCALING, TORFLOW_BW_MARGIN,
                          TORFLOW_OBS_LAST, TORFLOW_OBS_MEAN,
19
                          PROP276_ROUND_DIG, MIN_REPORT, MAX_BW_DIFF_PERC)
20
from sbws.lib import scaling
juga's avatar
juga committed
21
from sbws.lib.resultdump import ResultSuccess, _ResultType
22
from sbws.util.filelock import DirectoryLock
juga's avatar
juga committed
23
from sbws.util.timestamp import (now_isodt_str, unixts_to_isodt_str,
juga's avatar
juga committed
24
25
                                 now_unixts, isostr_to_dt_obj,
                                 dt_obj_to_isodt_str)
26
from sbws.util.state import State
juga's avatar
juga committed
27
28
29

log = logging.getLogger(__name__)

LINE_SEP = "\n"
KEYVALUE_SEP_V1 = "="
KEYVALUE_SEP_V2 = " "

# NOTE: in a future refactor, make all the KeyValues a dictionary that also
# holds their type, so that it is more similar to the stem parser.

# Header KeyValues
# =================
# KeyValues that need to be in a specific order in the Bandwidth File.
HEADER_KEYS_V1_1_ORDERED = ["version"]
# KeyValues that are initialized neither from the state file nor from the
# measurements.
# They can also be passed as arguments to `Header` to overwrite the default
# values, which is what the unit tests do.
# `latest_bandwidth` is special because it gets its value from the timestamp,
# which is not a KeyValue, but is always passed as an argument.
# It could be separated into another list, but so far there is no need,
# because:
# 1. when it is passed to the Header to initialize it, it is just ignored.
# 2. when the file is created, it is taken into account.
HEADER_KEYS_V1_1_SELF_INITIALIZED = [
    "software",
    "software_version",
    "file_created",
    "latest_bandwidth",
]
# KeyValues that are initialized from arguments.
HEADER_KEYS_V1_1_TO_INIT = [
    "earliest_bandwidth",
    "generator_started",
]

61
62
63
64
# `number_eligible_relays` is the number of relays that end up in the
# bandwidth file, i.e. the ones that have not been excluded by one of the
# filters in 4. below.
# It should be called `recent_measurement_included_count` to be congruent
# with the other KeyValues.
HEADER_KEYS_V1_2 = [
    "number_eligible_relays",
    "minimum_number_eligible_relays",
    "number_consensus_relays",
    "percent_eligible_relays",
    "minimum_percent_eligible_relays",
]

# KeyValues added in the Bandwidth File v1.3.0
HEADER_KEYS_V1_3 = [
    "scanner_country",
    "destinations_countries",
]

79
80
81
# KeyValues that count the number of relays that are in the bandwidth file
# but are ignored by Tor when voting, because they do not have a measured
# bandwidth.
HEADER_RECENT_MEASUREMENTS_EXCLUDED_KEYS = [
    # Number of relays that were measured, but for which all the measurements
    # failed, either because of network failures or because no suitable
    # helper relay was found.
    "recent_measurements_excluded_error_count",
    # Number of relays that have successful measurements, but the
    # measurements were not X time away from each other (by default 1 day).
    "recent_measurements_excluded_near_count",
    # Number of relays that have successful measurements that are away from
    # each other, but are not X time recent.
    # By default this is 5 days, which is also how old the measurements can
    # be by default.
    "recent_measurements_excluded_old_count",
    # Number of relays that have successful measurements that are away from
    # each other and recent, but the number of measurements is less than X
    # (by default 2).
    "recent_measurements_excluded_few_count",
]
# Added in #29591
# NOTE: recent_consensus_count, recent_priority_list_count,
# recent_measurement_attempt_count and recent_priority_relay_count
# are not reset when the scanner is stopped.
# They accumulate the values since the scanner was first started.
HEADER_KEYS_V1_4 = [
    # 1.1 header: the number of different consensuses that sbws has seen
    # in the last 5 days.
    "recent_consensus_count",
    # 2.4 Number of times a priority list has been created.
    "recent_priority_list_count",
    # 2.5 Number of relays that were in a priority list
    # [50, number of relays in the network * 0.05]
    "recent_priority_relay_count",
    # 3.6 header: the number of times that sbws has tried to measure any
    # relay in the last 5 days.
    # This would be the number of times relays were in a priority list.
    "recent_measurement_attempt_count",
    # 3.7 header: the number of times that sbws has tried to measure any
    # relay in the last 5 days, but it didn't work.
    # This should be the number of attempts - number of ResultSuccess -
    # something else we don't know yet.
    # So far it is the number of ResultError.
    "recent_measurement_failure_count",
    # The time it took to report about half of the network.
    "time_to_report_half_network",
] + HEADER_RECENT_MEASUREMENTS_EXCLUDED_KEYS
127

juga's avatar
juga committed
128
129
130
131
132
133
134
135
# KeyValues added in the Bandwidth File v1.5.0
# XXX: Change SPEC_VERSION when all the v1.5.0 keys are added, before a new
# sbws release.
# The Tor version is obtained from the state file, so it is not passed as an
# argument by the caller, but is self-initialized.
HEADER_KEYS_V1_5_TO_INIT = ["tor_version"]
# Alias: both names refer to the same list object.
HEADER_KEYS_V1_5 = HEADER_KEYS_V1_5_TO_INIT

136
137
138
139
140
# KeyValues that are initialized from arguments, not self-initialized.
HEADER_INIT_KEYS = (
    HEADER_KEYS_V1_1_TO_INIT
    + HEADER_KEYS_V1_3
    + HEADER_KEYS_V1_2
    + HEADER_KEYS_V1_4
    + HEADER_KEYS_V1_5_TO_INIT
)

# Concatenation of the v1.2 and v1.4 header key lists.
HEADER_INT_KEYS = HEADER_KEYS_V1_2 + HEADER_KEYS_V1_4
# List of all unordered KeyValues currently being used to generate the file
HEADER_UNORDERED_KEYS = (
    HEADER_KEYS_V1_1_SELF_INITIALIZED
    + HEADER_KEYS_V1_1_TO_INIT
    + HEADER_KEYS_V1_3
    + HEADER_KEYS_V1_2
    + HEADER_KEYS_V1_4
    + HEADER_KEYS_V1_5
)
# List of all the KeyValues currently being used to generate the file
HEADER_ALL_KEYS = HEADER_KEYS_V1_1_ORDERED + HEADER_UNORDERED_KEYS

TERMINATOR = "====="

# Bandwidth Lines KeyValues
# =========================
# Number of header lines in v1.X.X when all the KeyValues are used
NUM_LINES_HEADER_V1 = len(HEADER_ALL_KEYS) + 2
LINE_TERMINATOR = TERMINATOR + LINE_SEP

juga's avatar
juga committed
166
# KeyValue separator in Bandwidth Lines
BWLINE_KEYVALUES_SEP_V1 = " "
# The extra bws are not included in the files for now
BWLINE_KEYS_V0 = ["node_id", "bw"]
BWLINE_KEYS_V1_1 = [
    "master_key_ed25519",
    "nick",
    "rtt",
    "time",
    "success",
    "error_stream",
    "error_circ",
    "error_misc",
    # Added in #292951
    "error_second_relay",
    "error_destination",
]
BWLINE_KEYS_V1_2 = [
    "bw_median",
    "bw_mean",
    "desc_bw_avg",
    "desc_bw_bur",
    "desc_bw_obs_last",
    "desc_bw_obs_mean",
    "consensus_bandwidth",
    "consensus_bandwidth_is_unmeasured",
]

# No bandwidth line keys were added in the specification version 1.3

196
# Added in #292951
BWLINE_KEYS_V1_4 = [
    # 1.2 relay: the number of different consensuses that sbws has seen
    # in the last 5 days that include this relay.
    "relay_in_recent_consensus_count",
    # 2.6 relay: the number of times a relay was "prioritized" to be
    # measured in the recent days (by default 5).
    "relay_recent_priority_list_count",
    # 3.8 relay: the number of times that sbws has tried to measure this
    # relay in the last 5 days.
    # This would be the number of times a relay was in a priority list (2.6),
    # since once it gets measured it either returns ResultError,
    # ResultSuccess, or something else happened that we don't know yet.
    "relay_recent_measurement_attempt_count",
    # 3.9 relay: the number of times that sbws has tried to measure this
    # relay in the last 5 days, but it didn't work.
    # This should be the number of attempts - number of ResultSuccess -
    # something else we don't know yet.
    # So far it is the number of ResultError.
    "relay_recent_measurement_failure_count",
    # Number of error results created in the last 5 days that are excluded.
    # This is the sum of all the errors.
    "relay_recent_measurements_excluded_error_count",
    # The number of successful results, created in the last 5 days,
    # that were excluded by a rule, for this relay.
    # 'relay_recent_measurements_excluded_error_count' would be the
    # sum of the following 3 + the number of error results.

    # The number of successful measurements that are not X time away
    # from each other (by default 1 day).
    "relay_recent_measurements_excluded_near_count",
    # The number of successful measurements that are away from each other
    # but not X time recent (by default 5 days).
    "relay_recent_measurements_excluded_old_count",
    # The number of measurements excluded because there are not at least X
    # of them (by default 2).
    "relay_recent_measurements_excluded_few_count",
    # `vote=0` is used for the relays that used to be excluded from the
    # bandwidth file and are now reported.
    # It tells Tor not to vote on the relay.
    # `unmeasured=1` is used for the same relays and it is added in case
    # Tor would vote on them in future versions.
    # Maybe these keys should not be included for the relays for which
    # vote=1 and unmeasured=0.
    "vote",
    "unmeasured",
    # When there are not enough eligible (not excluded) relays,
    # under_min_report is 1 and `vote` is 0.
    # Added in #29853.
    "under_min_report",
]
247
# All the Bandwidth Line keys, in the order the spec versions added them.
BWLINE_KEYS_V1 = (
    BWLINE_KEYS_V0
    + BWLINE_KEYS_V1_1
    + BWLINE_KEYS_V1_2
    + BWLINE_KEYS_V1_4
)
# NOTE: tech-debt: assign a boolean type to vote and unmeasured,
# when the attributes are defined with a type, as stem does.
# NOTE(review): "error_second_relay" and "error_destination" from
# BWLINE_KEYS_V1_1 are not listed here - confirm whether that is intended.
BWLINE_INT_KEYS = (
    [
        "bw",
        "rtt",
        "success",
        "error_stream",
        "error_circ",
        "error_misc",
    ]
    + BWLINE_KEYS_V1_2
    + BWLINE_KEYS_V1_4
)
juga's avatar
juga committed
263
264


265
def round_sig_dig(n, digits=PROP276_ROUND_DIG):
    """Round ``n`` to ``digits`` significant digits before the decimal point.

    Any input less than or equal to 1 yields 1.
    The result is always returned as an integer.

    ``digits`` must be greater than 0.
    ``n`` must be less than or equal to 2**73, to avoid floating point
    errors.
    """
    digits = int(digits)
    assert digits >= 1
    if n <= 1:
        return 1
    # Number of digits of the integer part of n.
    integer_digits = 1 + int(math.log10(n))
    # How many trailing integer digits have to be zeroed out (never negative).
    surplus_digits = integer_digits - digits
    if surplus_digits < 0:
        surplus_digits = 0
    return int(round(n, -surplus_digits))


283
def kb_round_x_sig_dig(bw_bs, digits=PROP276_ROUND_DIG):
    """Express ``bw_bs`` (bytes) in kilobytes, rounded to ``digits``
    significant digits.

    Results less than or equal to 1 are rounded up to 1.
    The result is always returned as an integer.

    ``digits`` must be greater than 0.
    The input must be less than or equal to 2**82, to avoid floating point
    errors.
    """
    # Divide by a float so that the only rounding step is the one performed
    # by round_sig_dig (avoids double-rounding).
    return round_sig_dig(bw_bs / 1000.0, digits=digits)
295
296


juga's avatar
juga committed
297
298
299
300
301
302
303
304
305
def num_results_of_type(results, type_str):
    """Return how many items of ``results`` have ``type`` equal to
    ``type_str``."""
    return sum(1 for result in results if result.type == type_str)


# Better way to use enums?
def result_type_to_key(type_str):
    """Return ``type_str`` with every ``-`` turned into ``_``."""
    return '_'.join(type_str.split('-'))


306
class V3BWHeader(object):
    """
    Create a bandwidth measurements (V3bw) header
    following bandwidth measurements document spec version 1.X.X.

    :param str timestamp: timestamp in Unix Epoch seconds of the most recent
        generator result.
    :param dict kwargs: header KeyValues; every value must be a ``str``.
        The following keys replace a default value when given:

        - version: the spec version
        - software: the name of the software that generates this
        - software_version: the version of the software
        - file_created: defaults to the current time

        Any other key is stored only when it is in ``HEADER_INIT_KEYS``,
        for instance:

        - earliest_bandwidth: str, ISO 8601 timestamp in UTC time zone
          when the first bandwidth was obtained
        - generator_started: str, ISO 8601 timestamp in UTC time zone
          when the generator started
    """
    def __init__(self, timestamp, **kwargs):
        assert isinstance(timestamp, str)
        for v in kwargs.values():
            assert isinstance(v, str)
        self.timestamp = timestamp
        # KeyValues with default value when not given by kwargs
        self.version = kwargs.get('version', SPEC_VERSION)
        self.software = kwargs.get('software', 'sbws')
        self.software_version = kwargs.get('software_version', __version__)
        self.file_created = kwargs.get('file_created', now_isodt_str())
        # latest_bandwidth should not be in kwargs, since it MUST be the
        # same as timestamp
        self.latest_bandwidth = unixts_to_isodt_str(timestamp)
        # Only the KeyValues that are meant to be initialized from arguments
        # become attributes; anything else in kwargs is ignored.
        for k, v in kwargs.items():
            if k in HEADER_INIT_KEYS:
                setattr(self, k, v)

    def __str__(self):
        """Return the v1 header string when the version starts with ``1.``,
        otherwise the v2 one."""
        if self.version.startswith('1.'):
            return self.strv1
        return self.strv2

    @classmethod
    def from_results(cls, results, scanner_country=None,
                     destinations_countries=None, state_fpath=''):
        """Create a header from the results and the state file.

        :param dict results: maps a key (iterated as ``fp``) to a list of
            results, each one having a ``time`` attribute.
        :param str scanner_country: optional, stored as ``scanner_country``.
        :param str destinations_countries: optional, stored as
            ``destinations_countries``.
        :param str state_fpath: path passed to ``State`` to read the
            counters, ``scanner_started`` and ``tor_version``.
        :returns: V3BWHeader object
        """
        kwargs = {}
        latest_bandwidth = cls.latest_bandwidth_from_results(results)
        earliest_bandwidth = cls.earliest_bandwidth_from_results(results)
        # NOTE: Blocking, reads file
        generator_started = cls.generator_started_from_file(state_fpath)
        recent_consensus_count = cls.consensus_count_from_file(state_fpath)
        timestamp = str(latest_bandwidth)

        # XXX: tech-debt: obtain the other values from the state file using
        # this state variable.
        # Store the state as an attribute of the object?
        state = State(state_fpath)
        tor_version = state.get('tor_version', None)
        if tor_version:
            kwargs['tor_version'] = tor_version

        # `latest_bandwidth` is not in HEADER_INIT_KEYS, so __init__ ignores
        # this entry and derives the attribute from `timestamp` instead.
        kwargs['latest_bandwidth'] = unixts_to_isodt_str(latest_bandwidth)
        kwargs['earliest_bandwidth'] = unixts_to_isodt_str(earliest_bandwidth)
        if generator_started is not None:
            kwargs['generator_started'] = generator_started
        # To be compatible with older bandwidth files, do not require it.
        if scanner_country is not None:
            kwargs['scanner_country'] = scanner_country
        if destinations_countries is not None:
            kwargs['destinations_countries'] = destinations_countries
        if recent_consensus_count is not None:
            kwargs['recent_consensus_count'] = recent_consensus_count

        recent_measurement_attempt_count = \
            cls.recent_measurement_attempt_count_from_file(state_fpath)
        if recent_measurement_attempt_count is not None:
            kwargs['recent_measurement_attempt_count'] = \
                str(recent_measurement_attempt_count)

        # If it is a failure that is not a ResultError, then
        # failures = attempts - all measurements
        # Works only in the case that old measurements files already had
        # measurements count
        # If this is None or 0, the failures can't be calculated
        if recent_measurement_attempt_count:
            all_measurements = sum(
                len(result_list) for result_list in results.values()
            )
            measurement_failures = (recent_measurement_attempt_count
                                    - all_measurements)
            kwargs['recent_measurement_failure_count'] = \
                str(measurement_failures)

        priority_lists = cls.recent_priority_list_count_from_file(state_fpath)
        if priority_lists is not None:
            kwargs['recent_priority_list_count'] = str(priority_lists)

        priority_relays = \
            cls.recent_priority_relay_count_from_file(state_fpath)
        if priority_relays is not None:
            kwargs['recent_priority_relay_count'] = str(priority_relays)

        h = cls(timestamp, **kwargs)
        return h

    @classmethod
    def from_lines_v1(cls, lines):
        """
        :param list lines: list of lines to parse
        :returns: tuple of V3BWHeader object and non-header lines, or
            ``None`` when ``TERMINATOR`` is not one of the lines.
        """
        assert isinstance(lines, list)
        try:
            index_terminator = lines.index(TERMINATOR)
        except ValueError:
            # is not a bw file or is v100
            log.warning('Terminator is not in lines')
            return None
        ts = lines[0]
        kwargs = {}
        for line in lines[:index_terminator]:
            # Split only on the first separator, so that a value that
            # contains the separator is kept whole, and skip lines without
            # a separator (such as the timestamp line).
            key, sep, value = line.partition(KEYVALUE_SEP_V1)
            if sep and key in HEADER_ALL_KEYS:
                kwargs[key] = value
        h = cls(ts, **kwargs)
        # last line is new line
        return h, lines[index_terminator + 1:-1]

    @classmethod
    def from_text_v1(cls, text):
        """
        :param str text: text to parse
        :returns: tuple of V3BWHeader object and non-header lines, or
            ``None`` when ``TERMINATOR`` is not one of the lines.
        """
        assert isinstance(text, str)
        return cls.from_lines_v1(text.split(LINE_SEP))

    @classmethod
    def from_lines_v100(cls, lines):
        """
        :param list lines: list of lines to parse
        :returns: tuple of V3BWHeader object and non-header lines
        """
        assert isinstance(lines, list)
        h = cls(lines[0])
        # last line is new line
        return h, lines[1:-1]

    @staticmethod
    def generator_started_from_file(state_fpath):
        '''
        ISO formatted timestamp for the time when the scanner process most
        recently started, or ``None`` when ``scanner_started`` is not in the
        state file.
        '''
        state = State(state_fpath)
        if 'scanner_started' in state:
            # From v1.1.0-dev `state` is capable of converting strs to datetime
            return dt_obj_to_isodt_str(state['scanner_started'])
        else:
            return None

    @staticmethod
    def consensus_count_from_file(state_fpath):
        """
        Returns ``state.count("recent_consensus")`` as a string, or ``None``
        when that count is falsy.
        """
        state = State(state_fpath)
        count = state.count("recent_consensus")
        if count:
            return str(count)
        return None

    # NOTE: in future refactor store state in the class
    @staticmethod
    def recent_measurement_attempt_count_from_file(state_fpath):
        """
        Returns the number of times any relay was queued to be measured
        in the recent (by default 5) days from the state file.
        """
        state = State(state_fpath)
        return state.count('recent_measurement_attempt')

    @staticmethod
    def recent_priority_list_count_from_file(state_fpath):
        """
        Returns the number of times
        :meth:`~sbws.lib.relayprioritizer.RelayPrioritizer.best_priority`
        was run
        in the recent (by default 5) days from the state file.
        """
        state = State(state_fpath)
        return state.count('recent_priority_list')

    @staticmethod
    def recent_priority_relay_count_from_file(state_fpath):
        """
        Returns the number of times any relay was "prioritized" to be measured
        in the recent (by default 5) days from the state file.
        """
        state = State(state_fpath)
        return state.count('recent_priority_relay')

    @staticmethod
    def latest_bandwidth_from_results(results):
        """Return the rounded maximum ``time`` of all the results.

        Raises ``ValueError`` when there is no result at all.
        """
        return round(max(r.time for fp in results for r in results[fp]))

    @staticmethod
    def earliest_bandwidth_from_results(results):
        """Return the rounded minimum ``time`` of all the results.

        Raises ``ValueError`` when there is no result at all.
        """
        return round(min(r.time for fp in results for r in results[fp]))

    @property
    def keyvalue_unordered_tuple_ls(self):
        """Return list of KeyValue tuples that do not have specific order."""
        # sort the list to generate deterministic headers
        keyvalue_tuple_ls = sorted([(k, v) for k, v in self.__dict__.items()
                                    if k in HEADER_UNORDERED_KEYS])
        return keyvalue_tuple_ls

    @property
    def keyvalue_tuple_ls(self):
        """Return list of all KeyValue tuples"""
        return [('version', self.version)] + self.keyvalue_unordered_tuple_ls

    @property
    def keyvalue_v1str_ls(self):
        """Return KeyValue list of strings following spec v1.X.X."""
        keyvalues = [self.timestamp] + [KEYVALUE_SEP_V1.join([k, v])
                                        for k, v in self.keyvalue_tuple_ls]
        return keyvalues

    @property
    def strv1(self):
        """Return header string following spec v1.X.X."""
        header_str = LINE_SEP.join(self.keyvalue_v1str_ls) + LINE_SEP + \
            LINE_TERMINATOR
        return header_str

    @property
    def keyvalue_v2_ls(self):
        """Return KeyValue list of strings following spec v2.X.X."""
        keyvalue = [self.timestamp] + [KEYVALUE_SEP_V2.join([k, v])
                                       for k, v in self.keyvalue_tuple_ls]
        return keyvalue

    @property
    def strv2(self):
        """Return header string following spec v2.X.X."""
        header_str = LINE_SEP.join(self.keyvalue_v2_ls) + LINE_SEP + \
            LINE_TERMINATOR
        return header_str

    @property
    def num_lines(self):
        """Return the number of pieces obtained by splitting the header
        string on ``LINE_SEP``.

        The header string ends with ``LINE_SEP``, so the empty piece after
        it is counted too.
        """
        return len(str(self).split(LINE_SEP))

    def add_stats(self, **kwargs):
        """Store as strings the given KeyValues that are in
        ``HEADER_KEYS_V1_2``; any other key is ignored."""
        # Using kwargs because attributes might change.
        for k, v in kwargs.items():
            if k in HEADER_KEYS_V1_2:
                setattr(self, k, str(v))

    def add_time_report_half_network(self):
        """Add to the header the time it took to measure half of the network.

        It is not the time the scanner actually takes on measuring all the
        network, but the ``number_eligible_relays`` that are reported in the
        bandwidth file and directory authorities will vote on.

        This is calculated for half of the network, so that failed or not
        reported relays do not affect too much.

        For instance, if there are 6500 relays in the network, half of the
        network would be 3250. And if there were 4000 eligible relays
        measured in an interval of 3 days, the time to measure half of the
        network would be 3 days * 3250 / 4000.

        Since the elapsed time is calculated from the earliest and the
        latest measurement and a relay might have more than 2 measurements,
        this would give an estimate on how long it would take to measure
        the network including all the valid measurements.

        Log also an estimate on how long it would take with the current
        number of relays included in the bandwidth file.

        Nothing is added nor logged when ``number_eligible_relays`` or
        ``number_consensus_relays`` is missing or 0.
        """
        # These attributes were added later and some tests that
        # do not initialize them would fail.
        eligible_relays = int(getattr(self, 'number_eligible_relays', 0))
        consensus_relays = int(getattr(self, 'number_consensus_relays', 0))
        if not (eligible_relays and consensus_relays):
            return

        # NOTE: in future refactor do not convert attributes to str until
        # writing to the file, so that they do not need to be converted back
        # to do some calculations.
        # Computed after the guard above, so that a header without
        # `earliest_bandwidth` is not touched when there is nothing to do.
        elapsed_time = (
            isostr_to_dt_obj(self.latest_bandwidth)
            - isostr_to_dt_obj(self.earliest_bandwidth)
        ).total_seconds()

        half_network = consensus_relays / 2
        # Calculate the time it would take to measure half of the network
        if eligible_relays >= half_network:
            time_half_network = round(
                elapsed_time * half_network / eligible_relays
            )
            self.time_to_report_half_network = str(time_half_network)

        # In any case log an estimate on the time to measure all the network.
        estimated_time = round(
            elapsed_time * consensus_relays / eligible_relays
        )
        log.info("Estimated time to measure the network: %s hours.",
                 round(estimated_time / 60 / 60))

    def add_relays_excluded_counters(self, exclusion_dict):
        """
        Add the monitoring KeyValues to the header about the number of
        relays not included because they were not ``eligible``.

        :param dict exclusion_dict: every key becomes an attribute whose
            value is the string form of the corresponding value.
        """
        log.debug("Adding relays excluded counters.")
        for k, v in exclusion_dict.items():
            setattr(self, k, str(v))

juga's avatar
juga committed
619

juga's avatar
juga committed
620
class V3BWLine(object):
juga's avatar
juga committed
621
    """
    Create a Bandwidth List line following the spec version 1.X.X.

    :param str node_id: the relay fingerprint, prefixed with ``$``.
    :param int bw: the bandwidth value that directory authorities will include
        in their votes.
    :param dict kwargs: extra KeyValues for the line.

    .. note:: tech-debt: move node_id and bw to kwargs and just ensure that
       the required values are in **kwargs
    """
juga's avatar
juga committed
632
633
    def __init__(self, node_id, bw, **kwargs):
        assert isinstance(node_id, str)
634
        assert node_id.startswith('$')
juga's avatar
juga committed
635
636
        self.node_id = node_id
        self.bw = bw
637
638
        # For now, we do not want to add ``bw_filt`` to the bandwidth file,
        # therefore it is set here but not added to ``BWLINE_KEYS_V1``.
juga's avatar
juga committed
639
        [setattr(self, k, v) for k, v in kwargs.items()
640
         if k in BWLINE_KEYS_V1 + ["bw_filt"]]
juga's avatar
juga committed
641

juga's avatar
juga committed
642
    def __str__(self):
        """Return the line as a string, as given by ``bw_strv1``."""
        return self.bw_strv1
juga's avatar
juga committed
644

juga's avatar
juga committed
645
    @classmethod
    def from_results(cls, results, secs_recent=None, secs_away=None,
                     min_num=0, router_statuses_d=None):
        """Convert sbws results to relays' Bandwidth Lines

        ``bs`` stands for Bytes/seconds
        ``bw_mean`` means the bw is obtained from the mean of all the
        downloads' bandwidth.
        Downloads' bandwidth are calculated as the amount of data received
        divided by the time it took to receive it.
        bw = data (Bytes) / time (seconds)

        :param list results: the results of a single relay. It must not be
            empty, the fingerprint and nickname are taken from the first one.
        :param int secs_recent: passed to ``results_recent_than``; ``None``
            disables that filter.
        :param int secs_away: passed to ``results_away_each_other``; ``None``
            disables that filter.
        :param int min_num: minimum number of results that must remain after
            the previous filters for the relay to be included.
        :param dict router_statuses_d: optional mapping from node id
            (``$`` + fingerprint) to an object with the attributes
            ``bandwidth`` and ``is_unmeasured``. When the relay is in it,
            the consensus values are taken from there instead of from the
            results.
        :returns: a tuple ``(V3BWLine, exclusion_reason)``, where
            ``exclusion_reason`` is ``None`` when the relay is not excluded
            and a string naming the reason otherwise.
        """
        node_id = '$' + results[0].fingerprint
        kwargs = dict()
        kwargs['nick'] = results[0].nickname
        # Results might not have this attribute, do not fail in that case.
        if getattr(results[0], 'master_key_ed25519', None):
            kwargs['master_key_ed25519'] = results[0].master_key_ed25519
        kwargs['time'] = cls.last_time_from_results(results)
        kwargs.update(cls.result_types_from_results(results))

        # If it has not the attribute, return list to be able to call len
        # If it has the attribute, but it is None, return also list
        kwargs['relay_in_recent_consensus_count'] = str(
            max(
                len(getattr(r, 'relay_in_recent_consensus', []) or [])
                for r in results
            )
        )

        # Workaround for #34309.
        # Because of a bug, probably in relaylist, resultdump, relayprioritizer
        # or scanner, only the last timestamp is being stored in each result.
        # Temporally count the number of timestamps for all results.
        # If there is an unexpected failure and the result is not stored, this
        # number would be lower than what would be the correct one.
        # This should happen rarely or never.
        ts = set()
        for r in results:
            if getattr(r, "relay_recent_priority_list", None):
                ts.update(r.relay_recent_priority_list)
        kwargs["relay_recent_priority_list_count"] = str(len(ts))

        # Same comment as the previous paragraph.
        ts = set()
        for r in results:
            if getattr(r, "relay_recent_measurement_attempt", None):
                ts.update(r.relay_recent_measurement_attempt)
        kwargs["relay_recent_measurement_attempt_count"] = str(len(ts))

        success_results = [r for r in results if isinstance(r, ResultSuccess)]

        # NOTE: The following 4 conditions exclude relays from the bandwidth
        # file when the measurements do not satisfy some rules, what makes
        # the relay non-`eligible`.
        # In BWLINE_KEYS_V1_4 it is explained what they mean.
        # In HEADER_RECENT_MEASUREMENTS_EXCLUDED_KEYS it is also
        # explained what the strings returned mean.
        # The rules were introduced in #28061 and #27338
        # In #28565 we introduce the KeyValues to know why they're excluded.
        # In #28563 we report these relays, but make Tor ignore them.
        # This might confirm #28042.

        # If the relay is non-`eligible`:
        # Create a bandwidth line with the relay, but set ``vote=0`` so that
        # Tor versions with patch #29806 do not vote on the relay.
        # Set ``bw=1`` so that Tor versions without the patch,
        # will give the relay low bandwidth.
        # Include ``unmeasured=1`` in case Tor would vote on unmeasured relays
        # in future versions.
        # And return because there are not bandwidth values.
        # NOTE: the bandwidth values could still be obtained if:
        # 1. ``ResultError`` will store them
        # 2. assign ``results_recent = results`` when there is a ``exclusion
        # reason.
        # This could be done in a better way as part of a refactor #28684.
        kwargs['vote'] = 0
        kwargs['unmeasured'] = 1

        number_excluded_error = len(results) - len(success_results)
        if number_excluded_error > 0:
            # then the number of error results is the number of results
            kwargs['relay_recent_measurements_excluded_error_count'] = \
                number_excluded_error
        if not success_results:
            return (cls(node_id, 1, **kwargs),
                    'recent_measurements_excluded_error_count')

        results_away = \
            cls.results_away_each_other(success_results, secs_away)
        number_excluded_near = len(success_results) - len(results_away)
        if number_excluded_near > 0:
            kwargs['relay_recent_measurements_excluded_near_count'] = \
                number_excluded_near
        if not results_away:
            return (cls(node_id, 1, **kwargs),
                    'recent_measurements_excluded_near_count')

        results_recent = cls.results_recent_than(results_away, secs_recent)
        number_excluded_old = len(results_away) - len(results_recent)
        if number_excluded_old > 0:
            kwargs['relay_recent_measurements_excluded_old_count'] = \
                number_excluded_old
        if not results_recent:
            return (cls(node_id, 1, **kwargs),
                    'recent_measurements_excluded_old_count')

        if len(results_recent) < min_num:
            kwargs['relay_recent_measurements_excluded_few_count'] = \
                len(results_recent)
            return (cls(node_id, 1, **kwargs),
                    'recent_measurements_excluded_few_count')

        # Use the last consensus if available, since the results' consensus
        # values come from the moment the measurement was made.
        if router_statuses_d and node_id in router_statuses_d:
            # NOTE(review): presumably the factor converts the consensus
            # value from kilobytes to bytes; confirm against the consensus
            # document units.
            consensus_bandwidth = \
                router_statuses_d[node_id].bandwidth * 1000
            consensus_bandwidth_is_unmeasured = \
                router_statuses_d[node_id].is_unmeasured
        else:
            consensus_bandwidth = \
                cls.consensus_bandwidth_from_results(results_recent)
            consensus_bandwidth_is_unmeasured = \
                cls.consensus_bandwidth_is_unmeasured_from_results(
                    results_recent)
        # If there is no last observed bandwidth, there won't be mean either.
        desc_bw_obs_last = \
            cls.desc_bw_obs_last_from_results(results_recent)

        # Exclude also relays without consensus bandwidth nor observed
        # bandwidth, since they can't be scaled
        if desc_bw_obs_last is None and consensus_bandwidth is None:
            # This reason is not counted, not added in the file.
            # NOTE(review): the original comment says the line "will have
            # vote = 0", but the line is created without ``kwargs``, so no
            # ``vote`` KeyValue is set here; confirm which one is intended.
            return (cls(node_id, 1), "no_consensus_no_observed_bw")

        # For any line not excluded, do not include vote and unmeasured
        # KeyValues
        del kwargs['vote']
        del kwargs['unmeasured']

        rtt = cls.rtt_from_results(results_recent)
        if rtt:
            kwargs['rtt'] = rtt
        # The median is both the bandwidth of the line and the ``bw_median``
        # KeyValue, so it is calculated only once.
        bw = cls.bw_median_from_results(results_recent)
        # XXX: all the class functions could use the bw_measurements instead of
        # obtaining them each time or use a class Measurements.
        bw_measurements = scaling.bw_measurements_from_results(results_recent)
        kwargs['bw_mean'] = cls.bw_mean_from_results(results_recent)
        kwargs['bw_filt'] = scaling.bw_filt(bw_measurements)
        kwargs['bw_median'] = bw
        kwargs['desc_bw_avg'] = \
            cls.desc_bw_avg_from_results(results_recent)
        kwargs['desc_bw_bur'] = \
            cls.desc_bw_bur_from_results(results_recent)
        kwargs['consensus_bandwidth'] = consensus_bandwidth
        kwargs['consensus_bandwidth_is_unmeasured'] = \
            consensus_bandwidth_is_unmeasured
        kwargs['desc_bw_obs_last'] = desc_bw_obs_last
        kwargs['desc_bw_obs_mean'] = \
            cls.desc_bw_obs_mean_from_results(results_recent)

        bwl = cls(node_id, bw, **kwargs)
        return bwl, None
juga's avatar
juga committed
820
821
822
823
824
825

    @classmethod
    def from_data(cls, data, fingerprint):
        assert fingerprint in data
        return cls.from_results(data[fingerprint])

juga's avatar
juga committed
826
    @classmethod
    def from_bw_line_v1(cls, line):
        """Create the line from a Bandwidth Line string in v1.X.X format.

        The string is split in KeyValues by ``BWLINE_KEYVALUES_SEP_V1`` and
        each KeyValue in key and value by ``KEYVALUE_SEP_V1``. KeyValues
        whose key is not in ``BWLINE_KEYS_V1`` are ignored and the values of
        the keys in ``BWLINE_INT_KEYS`` are converted to ``int``.

        :param str line: the Bandwidth Line.
        :returns: the ``V3BWLine`` built from the KeyValues.
        :raises AssertionError: when ``line`` is not a string.
        :raises ValueError: when a known key has no separator or when an
            integer value can not be converted.
        :raises KeyError: when ``node_id`` or ``bw`` are missing.
        """
        assert isinstance(line, str)
        kwargs = {}
        for kv in line.split(BWLINE_KEYVALUES_SEP_V1):
            # Split only on the first separator, so that a value that
            # contains the separator itself does not break the parsing.
            k, sep, v = kv.partition(KEYVALUE_SEP_V1)
            if k not in BWLINE_KEYS_V1:
                continue
            if not sep:
                raise ValueError(
                    "KeyValue %r has no %r separator" % (kv, KEYVALUE_SEP_V1)
                )
            kwargs[k] = int(v) if k in BWLINE_INT_KEYS else v
        node_id = kwargs.pop('node_id')
        bw = kwargs.pop('bw')
        return cls(node_id, bw, **kwargs)

juga's avatar
juga committed
842
843
    @staticmethod
    def results_away_each_other(results, secs_away=None):
juga's avatar
juga committed
844
845
        # log.debug("Checking whether results are away from each other in %s "
        #           "secs.", secs_away)
juga's avatar
juga committed
846
847
        if secs_away is None or len(results) < 2:
            return results
juga's avatar
juga committed
848
849
850
851
852
        for a, b in combinations(results, 2):
            if abs(a.time - b.time) > secs_away:
                return results
        # log.debug("Results are NOT away from each other in at least %ss: %s",
        #           secs_away, [unixts_to_isodt_str(r.time) for r in results])
853
        return []
juga's avatar
juga committed
854
855
856
857
858

    @staticmethod
    def results_recent_than(results, secs_recent=None):
        if secs_recent is None:
            return results
juga's avatar
juga committed
859
        results_recent = list(filter(
juga's avatar
juga committed
860
                            lambda x: (now_unixts() - x.time) < secs_recent,
juga's avatar
juga committed
861
862
863
864
865
866
                            results))
        # if not results_recent:
        #     log.debug("Results are NOT more recent than %ss: %s",
        #               secs_recent,
        #               [unixts_to_isodt_str(r.time) for r in results])
        return results_recent
juga's avatar
juga committed
867

868
    @staticmethod
869
    def bw_median_from_results(results):
870
871
872
873
        return max(round(median([dl['amount'] / dl['duration']
                                 for r in results for dl in r.downloads])), 1)

    @staticmethod
874
    def bw_mean_from_results(results):
875
876
877
        return max(round(mean([dl['amount'] / dl['duration']
                               for r in results for dl in r.downloads])), 1)

juga's avatar
juga committed
878
879
880
881
882
883
884
885
    @staticmethod
    def last_time_from_results(results):
        """Return the greatest ``time`` of the results as a string.

        The greatest ``time`` is rounded and converted with
        ``unixts_to_isodt_str``.
        """
        latest = max(r.time for r in results)
        return unixts_to_isodt_str(round(latest))

    @staticmethod
    def rtt_from_results(results):
        # convert from miliseconds to seconds
        rtts = [(round(rtt * 1000)) for r in results for rtt in r.rtts]
juga's avatar
juga committed
886
        rtt = round(median(rtts)) if rtts else None
juga's avatar
juga committed
887
888
889
890
891
892
893
894
895
        return rtt

    @staticmethod
    def result_types_from_results(results):
        """Return a dictionary with the number of results of each type.

        For every member of ``_ResultType``, the key is obtained with
        ``result_type_to_key`` and the value with ``num_results_of_type``.
        """
        return {
            result_type_to_key(rt.value):
                num_results_of_type(results, rt.value)
            for rt in _ResultType
        }

896
897
898
899
900
901
    @staticmethod
    def desc_bw_avg_from_results(results):
        """Obtain the last descriptor bandwidth average from the results."""
        for r in reversed(results):
            if r.relay_average_bandwidth is not None:
                return r.relay_average_bandwidth
902
        log.warning("Descriptor average bandwidth is None.")
903
904
        return None

905
906
907
908
909
910
    @staticmethod
    def desc_bw_bur_from_results(results):
        """Obtain the last descriptor bandwidth burst from the results."""
        for r in reversed(results):
            if r.relay_burst_bandwidth is not None:
                return r.relay_burst_bandwidth
911
        log.warning("Descriptor burst bandwidth is None.")
912
913
        return None

914
915
916
917
918
919
    @staticmethod
    def consensus_bandwidth_from_results(results):
        """Obtain the last consensus bandwidth from the results."""
        for r in reversed(results):
            if r.consensus_bandwidth is not None:
                return r.consensus_bandwidth
920
        log.warning("Consensus bandwidth is None.")
921
922
923
924
925
926
927
928
        return None

    @staticmethod
    def consensus_bandwidth_is_unmeasured_from_results(results):
        """Obtain the last consensus unmeasured flag from the results."""
        for r in reversed(results):
            if r.consensus_bandwidth_is_unmeasured is not None:
                return r.consensus_bandwidth_is_unmeasured
929
            log.warning("Consensus bandwidth is unmeasured is None.")
930
931
        return None

932
    @staticmethod
933
934
    def desc_bw_obs_mean_from_results(results):
        desc_bw_obs_ls = []
935
936
        for r in results:
            if r.relay_observed_bandwidth is not None:
937
938
                desc_bw_obs_ls.append(r.relay_observed_bandwidth)
        if desc_bw_obs_ls:
939
940
            return round(mean(desc_bw_obs_ls))
        log.warning("Descriptor observed bandwidth is None.")
941
942
943
        return None

    @staticmethod
944
    def desc_bw_obs_last_from_results(results):
945
946
947
948
        # the last is at the end of the list
        for r in reversed(results):
            if r.relay_observed_bandwidth is not None:
                return r.relay_observed_bandwidth
949
        log.warning("Descriptor observed bandwidth is None.")
950
951
        return None

juga's avatar
juga committed
952
953
954
955
956
    @property
    def bw_keyvalue_tuple_ls(self):
        """Return list of KeyValue Bandwidth Line tuples."""
        # sort the list to generate determinist headers
        keyvalue_tuple_ls = sorted([(k, v) for k, v in self.__dict__.items()
957
                                    if k in BWLINE_KEYS_V1])
juga's avatar
juga committed
958
        return keyvalue_tuple_ls
juga's avatar
juga committed
959

juga's avatar
juga committed
960
    @property
961
    def bw_keyvalue_v1str_ls(self):
juga's avatar
juga committed
962
        """Return list of KeyValue Bandwidth Line strings following
963
        spec v1.X.X.
juga's avatar
juga committed
964
        """
965
        bw_keyvalue_str = [KEYVALUE_SEP_V1 .join([k, str(v)])
juga's avatar
juga committed
966
967
                           for k, v in self.bw_keyvalue_tuple_ls]
        return bw_keyvalue_str
juga's avatar
juga committed
968

juga's avatar
juga committed
969
    @property
    def bw_strv1(self):
        """Return Bandwidth Line string following spec v1.X.X.

        The KeyValue strings are joined by ``BWLINE_KEYVALUES_SEP_V1`` and
        terminated by ``LINE_SEP``. A warning is logged when the line is
        longer than ``BW_LINE_SIZE``, but the line is returned anyway.
        """
        bw_line_str = BWLINE_KEYVALUES_SEP_V1.join(
            self.bw_keyvalue_v1str_ls) + LINE_SEP
        if len(bw_line_str) > BW_LINE_SIZE:
            # if this is the case, probably there are too many KeyValues,
            # or the limit needs to be changed in Tor
            log.warning("The bandwidth line %s is longer than %s",
                        len(bw_line_str), BW_LINE_SIZE)
        return bw_line_str
juga's avatar
juga committed
980

juga's avatar
juga committed
981

982
class V3BWFile(object):
juga's avatar
juga committed
983
    """
    Create a Bandwidth List file following spec version 1.X.X

    :param V3BWHeader v3bwheader: header
    :param list v3bwlines: V3BWLines
    """
juga's avatar
juga committed
989
990
991
992
993
    def __init__(self, v3bwheader, v3bwlines):
        self.header = v3bwheader
        self.bw_lines = v3bwlines

    def __str__(self):
994
        return str(self.header) + ''.join([str(bw_line) or ''
juga's avatar
juga committed
995
996
                                           for bw_line in self.bw_lines])

juga's avatar
juga committed
997
    @classmethod
998
999
    def from_results(cls, results, scanner_country=None,
                     destinations_countries=None, state_fpath='',
1000
                     scale_constant=SBWS_SCALE_CONSTANT,
For faster browsing, not all history is shown. View entire blame