v3bwfile.py 40.8 KB
Newer Older
juga  's avatar
juga committed
1
2
3
4
# -*- coding: utf-8 -*-
"""Classes and functions that create the bandwidth measurements document
(v3bw) used by bandwidth authorities."""

juga  's avatar
juga committed
5
import copy
juga  's avatar
juga committed
6
import logging
7
import math
juga  's avatar
juga committed
8
import os
9
from itertools import combinations
juga  's avatar
juga committed
10
from statistics import median, mean
11
from stem.descriptor import parse_file
12

13
from sbws import __version__
14
from sbws.globals import (SPEC_VERSION, BW_LINE_SIZE, SBWS_SCALE_CONSTANT,
15
16
                          TORFLOW_SCALING, SBWS_SCALING, TORFLOW_BW_MARGIN,
                          TORFLOW_OBS_LAST, TORFLOW_OBS_MEAN,
17
                          PROP276_ROUND_DIG, MIN_REPORT, MAX_BW_DIFF_PERC)
juga  's avatar
juga committed
18
from sbws.lib.resultdump import ResultSuccess, _ResultType
19
from sbws.util.filelock import DirectoryLock
juga  's avatar
juga committed
20
21
from sbws.util.timestamp import (now_isodt_str, unixts_to_isodt_str,
                                 now_unixts)
22
from sbws.util.state import State
juga  's avatar
juga committed
23
24
25

log = logging.getLogger(__name__)

LINE_SEP = '\n'

# Separator between key and value in header KeyValues, per spec version.
KEYVALUE_SEP_V1 = '='
KEYVALUE_SEP_V2 = ' '

# Extra header KeyValues accepted by the V3BWHeader constructor.
EXTRA_ARG_KEYVALUES = [
    'software',
    'software_version',
    'file_created',
    'earliest_bandwidth',
    'generator_started',
]

# Header KeyValues carrying statistics about the relays.
STATS_KEYVALUES = [
    'number_eligible_relays',
    'minimum_number_eligible_relays',
    'number_consensus_relays',
    'percent_eligible_relays',
    'minimum_percent_eligible_relays',
]
# Header KeyValues with integer values (the very same list object).
KEYVALUES_INT = STATS_KEYVALUES

# Header KeyValues written after ``version``, in no specific order.
UNORDERED_KEYVALUES = [*EXTRA_ARG_KEYVALUES, *STATS_KEYVALUES,
                       'latest_bandwidth']

# Every header KeyValue currently used to generate the file.
ALL_KEYVALUES = ['version', *UNORDERED_KEYVALUES]

TERMINATOR = '====='

# Number of header lines in v1.X.X when all the KeyValues are present,
# plus the timestamp line and the terminator line.
NUM_LINES_HEADER_V1 = len(ALL_KEYVALUES) + 2

LINE_TERMINATOR = TERMINATOR + LINE_SEP

# Separator between KeyValues inside a Bandwidth Line.
BW_KEYVALUE_SEP_V1 = ' '

# Bandwidth Line KeyValues.
# NOTE(review): an older comment said the extra bandwidths are not written
# to files yet, but BW_KEYVALUES (used when serializing lines) includes
# BW_KEYVALUES_EXTRA_BWS; confirm which is intended.
BW_KEYVALUES_BASIC = ['node_id', 'bw']
BW_KEYVALUES_FILE = [*BW_KEYVALUES_BASIC,
                     'master_key_ed25519', 'nick', 'rtt', 'time',
                     'success', 'error_stream', 'error_circ', 'error_misc']
BW_KEYVALUES_EXTRA_BWS = [
    'bw_median',
    'bw_mean',
    'desc_bw_avg',
    'desc_bw_bur',
    'desc_bw_obs_last',
    'desc_bw_obs_mean',
    'consensus_bandwidth',
    'consensus_bandwidth_is_unmeasured',
]
BW_KEYVALUES_EXTRA = [*BW_KEYVALUES_FILE, *BW_KEYVALUES_EXTRA_BWS]
# Bandwidth Line KeyValues whose values are parsed as integers.
BW_KEYVALUES_INT = ['bw', 'rtt', 'success', 'error_stream',
                    'error_circ', 'error_misc', *BW_KEYVALUES_EXTRA_BWS]
BW_KEYVALUES = [*BW_KEYVALUES_BASIC, *BW_KEYVALUES_EXTRA]
juga  's avatar
juga committed
61
62


63
def round_sig_dig(n, digits=PROP276_ROUND_DIG):
    """Round ``n`` to ``digits`` significant digits before the decimal point.

    Any value less than or equal to 1 becomes 1. The result is an int.

    ``digits`` must be at least 1, and ``n`` must not exceed 2**73 so that
    floating point errors are avoided.
    """
    sig_digits = int(digits)
    assert sig_digits >= 1
    if n <= 1:
        return 1
    # Number of digits in the integer part of n.
    n_digits = 1 + int(math.log10(n))
    # A negative ndigits makes round() work left of the decimal point.
    return int(round(n, -max(n_digits - sig_digits, 0)))


81
def kb_round_x_sig_dig(bw_bs, digits=PROP276_ROUND_DIG):
    """Convert ``bw_bs`` from bytes to kilobytes and round the result to
    ``digits`` significant digits using ``round_sig_dig``.

    Results less than or equal to 1 are rounded up to 1; the return value
    is an int. ``digits`` must be greater than 0 and ``bw_bs`` must be less
    than or equal to 2**82 to avoid floating point errors.
    """
    # Divide as a float and round only once, avoiding double rounding.
    return round_sig_dig(bw_bs / 1000.0, digits=digits)
93
94


juga  's avatar
juga committed
95
96
97
98
99
100
101
102
103
def num_results_of_type(results, type_str):
    """Count the results whose ``type`` equals ``type_str``."""
    return sum(1 for result in results if result.type == type_str)


# TODO: find a better way to map result types to keys (enums?).
def result_type_to_key(type_str):
    """Turn a result type such as ``error-stream`` into ``error_stream``."""
    return '_'.join(type_str.split('-'))


104
class V3BWHeader(object):
    """
    Create a bandwidth measurements (V3bw) header
    following bandwidth measurements document spec version 1.X.X.

    :param str timestamp: timestamp in Unix Epoch seconds of the most recent
        generator result.
    :param str version: the spec version
    :param str software: the name of the software that generates this
    :param str software_version: the version of the software
    :param dict kwargs: extra headers. Currently supported:

        - earliest_bandwidth: str, ISO 8601 timestamp in UTC time zone
          when the first bandwidth was obtained
        - generator_started: str, ISO 8601 timestamp in UTC time zone
          when the generator started

    Only ``version`` and the keys listed in ``EXTRA_ARG_KEYVALUES`` are
    taken from ``kwargs``; any other key is ignored.
    """
    def __init__(self, timestamp, **kwargs):
        assert isinstance(timestamp, str)
        for v in kwargs.values():
            assert isinstance(v, str)
        self.timestamp = timestamp
        # KeyValues with default value when not given by kwargs
        self.version = kwargs.get('version', SPEC_VERSION)
        self.software = kwargs.get('software', 'sbws')
        self.software_version = kwargs.get('software_version', __version__)
        self.file_created = kwargs.get('file_created', now_isodt_str())
        # latest_bandwidth should not be in kwargs, since it MUST be the
        # same as timestamp
        self.latest_bandwidth = unixts_to_isodt_str(timestamp)
        # Plain loop instead of a list comprehension used only for its
        # side effects.
        for k, v in kwargs.items():
            if k in EXTRA_ARG_KEYVALUES:
                setattr(self, k, v)

    def __str__(self):
        """Return the header rendered for its spec version."""
        if self.version.startswith('1.'):
            return self.strv1
        return self.strv2

    @classmethod
    def from_results(cls, results, state_fpath=''):
        """Create a header from sbws results.

        :param dict results: relay fingerprint -> list of Results
        :param str state_fpath: path to the state file that may hold
            ``scanner_started``
        :returns: V3BWHeader
        """
        kwargs = dict()
        latest_bandwidth = cls.latest_bandwidth_from_results(results)
        earliest_bandwidth = cls.earliest_bandwidth_from_results(results)
        generator_started = cls.generator_started_from_file(state_fpath)
        timestamp = str(latest_bandwidth)
        # __init__ does not take latest_bandwidth from kwargs (it is not in
        # EXTRA_ARG_KEYVALUES) and recomputes it from ``timestamp``.
        kwargs['latest_bandwidth'] = unixts_to_isodt_str(latest_bandwidth)
        kwargs['earliest_bandwidth'] = unixts_to_isodt_str(earliest_bandwidth)
        if generator_started is not None:
            kwargs['generator_started'] = generator_started
        return cls(timestamp, **kwargs)

    @classmethod
    def from_lines_v1(cls, lines):
        """
        :param list lines: list of lines to parse
        :returns: tuple of V3BWHeader object and non-header lines, or
            ``None`` when the header terminator is not found
        """
        assert isinstance(lines, list)
        try:
            index_terminator = lines.index(TERMINATOR)
        except ValueError:
            # is not a bw file or is v100
            log.warning('Terminator is not in lines')
            return None
        ts = lines[0]
        kwargs = {}
        for line in lines[:index_terminator]:
            # Split only on the first separator, so a value that contains
            # the separator does not break the (key, value) pairing.
            key, sep, value = line.partition(KEYVALUE_SEP_V1)
            if sep and key in ALL_KEYVALUES:
                kwargs[key] = value
        h = cls(ts, **kwargs)
        # last line is new line
        return h, lines[index_terminator + 1:-1]

    @classmethod
    def from_text_v1(cls, text):
        """
        :param str text: text to parse
        :returns: tuple of V3BWHeader object and non-header lines, or
            ``None`` when the header terminator is not found
        """
        assert isinstance(text, str)
        return cls.from_lines_v1(text.split(LINE_SEP))

    @classmethod
    def from_lines_v100(cls, lines):
        """
        :param list lines: list of lines to parse; the first one is the
            timestamp
        :returns: tuple of V3BWHeader object and non-header lines
        """
        assert isinstance(lines, list)
        h = cls(lines[0])
        # last line is new line
        return h, lines[1:-1]

    @staticmethod
    def generator_started_from_file(state_fpath):
        '''
        ISO formatted timestamp for the time when the scanner process most
        recently started, or None when the state file does not record it.
        '''
        state = State(state_fpath)
        if 'scanner_started' in state:
            return state['scanner_started']
        return None

    @staticmethod
    def latest_bandwidth_from_results(results):
        """Return the rounded ``time`` of the most recent result."""
        return round(max(r.time for rs in results.values() for r in rs))

    @staticmethod
    def earliest_bandwidth_from_results(results):
        """Return the rounded ``time`` of the oldest result."""
        return round(min(r.time for rs in results.values() for r in rs))

    @property
    def keyvalue_unordered_tuple_ls(self):
        """Return list of KeyValue tuples that do not have specific order."""
        # sort the list to generate determinist headers
        return sorted((k, v) for k, v in self.__dict__.items()
                      if k in UNORDERED_KEYVALUES)

    @property
    def keyvalue_tuple_ls(self):
        """Return list of all KeyValue tuples"""
        return [('version', self.version)] + self.keyvalue_unordered_tuple_ls

    @property
    def keyvalue_v1str_ls(self):
        """Return KeyValue list of strings following spec v1.X.X."""
        return [self.timestamp] + [KEYVALUE_SEP_V1.join([k, v])
                                   for k, v in self.keyvalue_tuple_ls]

    @property
    def strv1(self):
        """Return header string following spec v1.X.X."""
        return (LINE_SEP.join(self.keyvalue_v1str_ls) + LINE_SEP +
                LINE_TERMINATOR)

    @property
    def keyvalue_v2_ls(self):
        """Return KeyValue list of strings following spec v2.X.X."""
        return [self.timestamp] + [KEYVALUE_SEP_V2.join([k, v])
                                   for k, v in self.keyvalue_tuple_ls]

    @property
    def strv2(self):
        """Return header string following spec v2.X.X."""
        return (LINE_SEP.join(self.keyvalue_v2_ls) + LINE_SEP +
                LINE_TERMINATOR)

    @property
    def num_lines(self):
        """Number of LINE_SEP-separated chunks in the rendered header."""
        return len(str(self).split(LINE_SEP))

    def add_stats(self, **kwargs):
        """Set the statistics KeyValues (``STATS_KEYVALUES``) as strings.

        Using kwargs because attributes might change; other keys are ignored.
        """
        for k, v in kwargs.items():
            if k in STATS_KEYVALUES:
                setattr(self, k, str(v))

juga  's avatar
juga committed
267

juga  's avatar
juga committed
268
class V3BWLine(object):
    """
    Create a Bandwidth List line following the spec version 1.X.X.

    :param str node_id: relay fingerprint prefixed with ``$``
    :param int bw: bandwidth value of the line
    :param dict kwargs: extra KeyValues. Only keys in ``BW_KEYVALUES_EXTRA``
        are kept, currently:

        - nick, str
        - master_key_ed25519, str
        - rtt, int
        - time, str
        - success, int
        - error_stream, int
        - error_circ, int
        - error_misc, int
        - bw_median, bw_mean, desc_bw_avg, desc_bw_bur, desc_bw_obs_last,
          desc_bw_obs_mean, consensus_bandwidth,
          consensus_bandwidth_is_unmeasured
    """
    def __init__(self, node_id, bw, **kwargs):
        assert isinstance(node_id, str)
        assert isinstance(bw, int)
        assert node_id.startswith('$')
        self.node_id = node_id
        self.bw = bw
        for k, v in kwargs.items():
            if k in BW_KEYVALUES_EXTRA:
                setattr(self, k, v)

    def __str__(self):
        return self.bw_strv1

    @classmethod
    def from_results(cls, results, secs_recent=None, secs_away=None,
                     min_num=0):
        """Convert sbws results to relays' Bandwidth Lines

        ``bs`` stands for Bytes/seconds
        ``bw_mean`` means the bw is obtained from the mean of the all the
        downloads' bandwidth.
        Downloads' bandwidth are calculated as the amount of data received
        divided by the time it took to receive it.
        bw = data (Bytes) / time (seconds)

        :param list results: results of one relay; ``results[0]`` supplies
            the fingerprint and nickname
        :param secs_recent: passed to ``results_recent_than``
        :param secs_away: passed to ``results_away_each_other``
        :param int min_num: minimum number of recent successful results
        :returns: V3BWLine, or None when there are no successful results or
            the filters discard them
        """
        success_results = [r for r in results if isinstance(r, ResultSuccess)]
        node_id = '$' + results[0].fingerprint
        kwargs = dict()
        kwargs['nick'] = results[0].nickname
        # Default to None so a result without this attribute does not raise
        # AttributeError.
        if getattr(results[0], 'master_key_ed25519', None):
            kwargs['master_key_ed25519'] = results[0].master_key_ed25519
        kwargs['time'] = cls.last_time_from_results(results)
        kwargs.update(cls.result_types_from_results(results))
        if not success_results:
            return None
        # useful args for scaling
        results_away = cls.results_away_each_other(success_results, secs_away)
        if not results_away:
            return None
        results_recent = cls.results_recent_than(results_away, secs_recent)
        if not results_recent:
            return None
        if len(results_recent) < min_num:
            return None
        rtt = cls.rtt_from_results(results_recent)
        if rtt:
            kwargs['rtt'] = rtt
        # The median is both the line's bw and its bw_median KeyValue;
        # compute it only once.
        bw = cls.bw_median_from_results(results_recent)
        kwargs['bw_mean'] = cls.bw_mean_from_results(results_recent)
        kwargs['bw_median'] = bw
        kwargs['desc_bw_avg'] = cls.desc_bw_avg_from_results(results_recent)
        kwargs['desc_bw_bur'] = cls.desc_bw_bur_from_results(results_recent)
        kwargs['consensus_bandwidth'] = \
            cls.consensus_bandwidth_from_results(results_recent)
        kwargs['consensus_bandwidth_is_unmeasured'] = \
            cls.consensus_bandwidth_is_unmeasured_from_results(
                results_recent)
        kwargs['desc_bw_obs_last'] = \
            cls.desc_bw_obs_last_from_results(results_recent)
        kwargs['desc_bw_obs_mean'] = \
            cls.desc_bw_obs_mean_from_results(results_recent)
        return cls(node_id, bw, **kwargs)

    @classmethod
    def from_data(cls, data, fingerprint):
        """Build the line for ``fingerprint`` from a results dict."""
        assert fingerprint in data
        return cls.from_results(data[fingerprint])

    @classmethod
    def from_bw_line_v1(cls, line):
        """Parse a Bandwidth Line string in spec v1.X.X format.

        Integer KeyValues are converted with ``int``; a literal ``None``
        (how ``bw_keyvalue_v1str_ls`` writes unset values) becomes ``None``.
        """
        assert isinstance(line, str)
        kwargs = {}
        for kv in line.split(BW_KEYVALUE_SEP_V1):
            # Split only on the first separator so values containing it
            # do not break the (key, value) pairing.
            key, sep, value = kv.partition(KEYVALUE_SEP_V1)
            if not sep or key not in BW_KEYVALUES:
                continue
            if key in BW_KEYVALUES_INT:
                value = None if value == 'None' else int(value)
            kwargs[key] = value
        node_id = kwargs.pop('node_id')
        bw = kwargs.pop('bw')
        return cls(node_id, bw, **kwargs)

    @staticmethod
    def results_away_each_other(results, secs_away=None):
        """Return ``results`` when at least two of them have ``time`` values
        more than ``secs_away`` apart, otherwise None.

        With ``secs_away`` None or fewer than two results, ``results`` is
        returned unchanged.
        """
        if secs_away is None or len(results) < 2:
            return results
        # Some pair differs by more than secs_away exactly when the earliest
        # and the latest results do, so checking every pair is unnecessary.
        times = [r.time for r in results]
        if max(times) - min(times) > secs_away:
            return results
        return None

    @staticmethod
    def results_recent_than(results, secs_recent=None):
        """Return the results whose ``time`` is less than ``secs_recent``
        seconds before now; all of them when ``secs_recent`` is None.
        """
        if secs_recent is None:
            return results
        # Read the clock once so every result is judged against the same
        # instant.
        now = now_unixts()
        return [r for r in results if (now - r.time) < secs_recent]

    @staticmethod
    def _download_bws(results):
        """Return each download's bandwidth (amount / duration)."""
        return [dl['amount'] / dl['duration']
                for r in results for dl in r.downloads]

    @staticmethod
    def bw_median_from_results(results):
        """Median download bandwidth, rounded, and at least 1."""
        return max(round(median(V3BWLine._download_bws(results))), 1)

    @staticmethod
    def bw_mean_from_results(results):
        """Mean download bandwidth, rounded, and at least 1."""
        return max(round(mean(V3BWLine._download_bws(results))), 1)

    @staticmethod
    def last_time_from_results(results):
        """ISO timestamp of the most recent result."""
        return unixts_to_isodt_str(round(max(r.time for r in results)))

    @staticmethod
    def rtt_from_results(results):
        """Median of all the RTTs multiplied by 1000, or None if there are
        no RTTs.
        """
        # r.rtts are presumably in seconds; * 1000 converts to milliseconds.
        rtts = [round(rtt * 1000) for r in results for rtt in r.rtts]
        return round(median(rtts)) if rtts else None

    @staticmethod
    def result_types_from_results(results):
        """Map each result type key to the number of results of that type."""
        return {result_type_to_key(rt.value):
                num_results_of_type(results, rt.value)
                for rt in _ResultType}

    @staticmethod
    def _last_not_none(results, attr):
        """Return the last non-None value of ``attr`` in ``results``."""
        for r in reversed(results):
            value = getattr(r, attr)
            if value is not None:
                return value
        return None

    @staticmethod
    def desc_bw_avg_from_results(results):
        """Obtain the last descriptor bandwidth average from the results."""
        return V3BWLine._last_not_none(results, 'relay_average_bandwidth')

    @staticmethod
    def desc_bw_bur_from_results(results):
        """Obtain the last descriptor bandwidth burst from the results."""
        return V3BWLine._last_not_none(results, 'relay_burst_bandwidth')

    @staticmethod
    def consensus_bandwidth_from_results(results):
        """Obtain the last consensus bandwidth from the results."""
        return V3BWLine._last_not_none(results, 'consensus_bandwidth')

    @staticmethod
    def consensus_bandwidth_is_unmeasured_from_results(results):
        """Obtain the last consensus unmeasured flag from the results."""
        return V3BWLine._last_not_none(results,
                                       'consensus_bandwidth_is_unmeasured')

    @staticmethod
    def desc_bw_obs_mean_from_results(results):
        """Mean of the known descriptor observed bandwidths, rounded and at
        least 1, or None if none is known.
        """
        desc_bw_obs_ls = [r.relay_observed_bandwidth for r in results
                          if r.relay_observed_bandwidth is not None]
        if desc_bw_obs_ls:
            return max(round(mean(desc_bw_obs_ls)), 1)
        return None

    @staticmethod
    def desc_bw_obs_last_from_results(results):
        """Obtain the last descriptor observed bandwidth from the results."""
        # the last is at the end of the list
        return V3BWLine._last_not_none(results, 'relay_observed_bandwidth')

    @property
    def bw_keyvalue_tuple_ls(self):
        """Return list of KeyValue Bandwidth Line tuples."""
        # sort the list to generate determinist headers
        return sorted((k, v) for k, v in self.__dict__.items()
                      if k in BW_KEYVALUES)

    @property
    def bw_keyvalue_v1str_ls(self):
        """Return list of KeyValue Bandwidth Line strings following
        spec v1.X.X.
        """
        return [KEYVALUE_SEP_V1.join([k, str(v)])
                for k, v in self.bw_keyvalue_tuple_ls]

    @property
    def bw_strv1(self):
        """Return Bandwidth Line string following spec v1.X.X."""
        bw_line_str = BW_KEYVALUE_SEP_V1.join(
            self.bw_keyvalue_v1str_ls) + LINE_SEP
        if len(bw_line_str) > BW_LINE_SIZE:
            # if this is the case, probably there are too many KeyValues,
            # or the limit needs to be changed in Tor
            log.warning("The bandwidth line %s is longer than %s",
                        len(bw_line_str), BW_LINE_SIZE)
        return bw_line_str
juga  's avatar
juga committed
509

juga  's avatar
juga committed
510

511
class V3BWFile(object):
juga  's avatar
juga committed
512
    """
513
    Create a Bandwidth List file following spec version 1.X.X
juga  's avatar
juga committed
514
515
516
517

    :param V3BWHeader v3bwheader: header
    :param list v3bwlines: V3BWLines
    """
juga  's avatar
juga committed
518
519
520
521
522
    def __init__(self, v3bwheader, v3bwlines):
        """Keep the header and the bandwidth lines that form the file."""
        self.header, self.bw_lines = v3bwheader, v3bwlines

    def __str__(self):
        """Render the header followed by every bandwidth line."""
        parts = [str(self.header)]
        parts.extend(str(bw_line) for bw_line in self.bw_lines)
        return ''.join(parts)

juga  's avatar
juga committed
526
    @classmethod
    def from_results(cls, results, state_fpath='',
                     scale_constant=SBWS_SCALE_CONSTANT,
                     scaling_method=TORFLOW_SCALING,
                     torflow_obs=TORFLOW_OBS_LAST,
                     torflow_cap=TORFLOW_BW_MARGIN,
                     torflow_round_digs=PROP276_ROUND_DIG,
                     secs_recent=None, secs_away=None, min_num=0,
                     consensus_path=None, max_bw_diff_perc=MAX_BW_DIFF_PERC,
                     reverse=False):
        """Create V3BWFile class from sbws Results.

        :param dict results: see below
        :param str state_fpath: path to the state file
        :param int scale_constant: sbws scaling constant
        :param int scaling_method:
            Scaling method to obtain the bandwidth
            Possible values: {None, SBWS_SCALING, TORFLOW_SCALING} = {0, 1, 2}
        :param int torflow_obs: method to choose descriptor observed bandwidth
        :param torflow_cap: passed to ``bw_torflow_scale`` as ``cap``
        :param int torflow_round_digs: passed to ``bw_torflow_scale`` as
            ``num_round_dig``
        :param secs_recent: passed to ``V3BWLine.from_results``
        :param secs_away: passed to ``V3BWLine.from_results``
        :param int min_num: passed to ``V3BWLine.from_results``
        :param str consensus_path: passed to ``read_number_consensus_relays``
        :param max_bw_diff_perc: passed to ``is_max_bw_diff_perc_reached``
        :param bool reverse: whether to sort the bw lines descending or not
        :returns: V3BWFile

        Results are in the form::

            {'relay_fp1': [Result1, Result2, ...],
             'relay_fp2': [Result1, Result2, ...]}

        """
        log.info('Processing results to generate a bandwidth list file.')
        header = V3BWHeader.from_results(results, state_fpath)
        bw_lines_raw = []
        number_consensus_relays = cls.read_number_consensus_relays(
            consensus_path)
        state = State(state_fpath)
        for values in results.values():
            line = V3BWLine.from_results(values, secs_recent, secs_away,
                                         min_num)
            if line is not None:
                bw_lines_raw.append(line)
        if not bw_lines_raw:
            log.info("After applying restrictions to the raw results, "
                     "there is not any. Scaling can not be applied.")
            # NOTE(review): cls is passed explicitly; keep in sync with
            # update_progress's signature (defined outside this view).
            cls.update_progress(
                cls, bw_lines_raw, header, number_consensus_relays, state)
            return cls(header, [])
        # ``reverse`` used to be documented but never forwarded, so lines
        # were always sorted ascending.
        if scaling_method == SBWS_SCALING:
            bw_lines = cls.bw_sbws_scale(bw_lines_raw, scale_constant,
                                         reverse=reverse)
            cls.warn_if_not_accurate_enough(bw_lines, scale_constant)
        elif scaling_method == TORFLOW_SCALING:
            bw_lines = cls.bw_torflow_scale(bw_lines_raw, torflow_obs,
                                            torflow_cap, torflow_round_digs,
                                            reverse=reverse)
            cls.update_progress(
                cls, bw_lines, header, number_consensus_relays, state)
        else:
            bw_lines = cls.bw_kb(bw_lines_raw, reverse=reverse)
        # Not using the result for now, just warning
        cls.is_max_bw_diff_perc_reached(bw_lines, max_bw_diff_perc)
        return cls(header, bw_lines)

589
    @classmethod
    def from_v1_fpath(cls, fpath):
        """Read a spec v1.X.X bandwidth file and build a V3BWFile from it."""
        log.info('Parsing bandwidth file %s', fpath)
        with open(fpath) as fd:
            all_lines = fd.read().split(LINE_SEP)
        header, body_lines = V3BWHeader.from_lines_v1(all_lines)
        parsed = [V3BWLine.from_bw_line_v1(raw) for raw in body_lines]
        return cls(header, parsed)

599
600
601
602
603
604
605
    @classmethod
    def from_v100_fpath(cls, fpath):
        """Read a spec v1.0.0 bandwidth file; lines are sorted by bw."""
        log.info('Parsing bandwidth file %s', fpath)
        with open(fpath) as fd:
            all_lines = fd.read().split(LINE_SEP)
        header, body_lines = V3BWHeader.from_lines_v100(all_lines)
        parsed = [V3BWLine.from_bw_line_v1(raw) for raw in body_lines]
        parsed.sort(key=lambda bw_line: bw_line.bw)
        return cls(header, parsed)

juga  's avatar
juga committed
610
611
612
613
614
615
616
    @staticmethod
    def bw_kb(bw_lines, reverse=False):
        """Return copies of ``bw_lines`` with bw divided by 1000, rounded and
        at least 1, sorted by bw (descending when ``reverse``).
        """
        scaled = copy.deepcopy(bw_lines)
        for bw_line in scaled:
            bw_line.bw = max(round(bw_line.bw / 1000), 1)
        return sorted(scaled, key=lambda item: item.bw, reverse=reverse)

juga  's avatar
juga committed
617
    @staticmethod
    def bw_sbws_scale(bw_lines, scale_constant=SBWS_SCALE_CONSTANT,
                      reverse=False):
        """Return a new V3BwLine list scaled using sbws method.

        Each line's bw becomes ``bw * scale_constant / median(bw)``, capped
        by the line's ``desc_bw_avg`` when that value is known, then divided
        by 1000, rounded and floored at 1.

        :param list bw_lines:
            bw lines to scale, not self.bw_lines,
            since this method will be called before self.bw_lines have been
            initialized.
        :param int scale_constant:
            the constant to multiply by the ratio and
            the bandwidth to obtain the new bandwidth
        :param bool reverse: sort the result in descending order
        :returns list: V3BwLine list sorted by bw
        """
        log.debug('Scaling bandwidth using sbws method.')
        m = median([line.bw for line in bw_lines])
        bw_lines_scaled = copy.deepcopy(bw_lines)
        for line in bw_lines_scaled:
            scaled_bw = line.bw * scale_constant / m
            # min is to limit the bw to descriptor average-bandwidth.
            # desc_bw_avg can be None (no result reported it) or missing
            # (lines parsed from files); min() with None raises TypeError,
            # so only cap when the value is known.
            desc_bw_avg = getattr(line, 'desc_bw_avg', None)
            if desc_bw_avg is not None:
                scaled_bw = min(desc_bw_avg, scaled_bw)
            # max to avoid bandwidth with 0 value
            line.bw = max(round(scaled_bw / 1000), 1)
        return sorted(bw_lines_scaled, key=lambda x: x.bw, reverse=reverse)

    @staticmethod
    def warn_if_not_accurate_enough(bw_lines,
                                    scale_constant=SBWS_SCALE_CONSTANT):
        """Log how far the median bw is from ``scale_constant`` and warn
        when it is off by more than 0.1%.
        """
        tolerance = 0.001
        ratio = median(bw_line.bw for bw_line in bw_lines) / scale_constant
        error_perc = (1 - ratio) * 100
        log.info('The generated lines are within {:.5}% of what they should '
                 'be'.format(error_perc))
        too_low = ratio < 1 - tolerance
        too_high = ratio > 1 + tolerance
        if too_low or too_high:
            log.warning('There was %f%% error and only +/- %f%% is '
                        'allowed', error_perc, tolerance * 100)

653
    @staticmethod
    def is_max_bw_diff_perc_reached(bw_lines,
                                    max_bw_diff_perc=MAX_BW_DIFF_PERC):
        """Compare the summed descriptor observed bandwidth with the summed
        measured bandwidth, log it, and report whether the computed
        percentage exceeds ``max_bw_diff_perc``.

        Lines without a ``desc_bw_obs_last`` value are left out of both sums.

        :param list bw_lines: V3BWLine list
        :param max_bw_diff_perc: threshold, in percent
        :returns bool: True when the threshold is exceeded
        """
        lines_with_obs = [line for line in bw_lines
                          if getattr(line, 'desc_bw_obs_last', None)
                          is not None]
        sum_consensus_bw = sum(line.desc_bw_obs_last
                               for line in lines_with_obs)
        sum_bw = sum(line.bw for line in lines_with_obs)
        largest = max(sum_consensus_bw, sum_bw)
        if largest == 0:
            # Nothing to compare (e.g. no usable lines); dividing would
            # raise ZeroDivisionError.
            return False
        # NOTE(review): this is the ratio smaller/larger (100 means the sums
        # are equal), not a difference. Also bw_kb and bw_sbws_scale divide
        # bw by 1000 while desc_bw_obs_last is not scaled. Confirm the
        # intended metric before relying on the returned value.
        diff = min(sum_consensus_bw, sum_bw) / largest
        diff_perc = diff * 100
        log.info("The difference between the total consensus bandwidth "
                 "and the total measured bandwidth is %s%% percent",
                 diff_perc)
        # Use the caller's threshold; the module constant used to be
        # compared here, silently ignoring this argument.
        if diff_perc > max_bw_diff_perc:
            log.warning("It is more than %s%%", max_bw_diff_perc)
            return True
        return False

668
    @staticmethod
    def bw_torflow_scale(bw_lines, desc_bw_obs_type=TORFLOW_OBS_MEAN,
                         cap=TORFLOW_BW_MARGIN,
                         num_round_dig=PROP276_ROUND_DIG, reverse=False):
        """
        Obtain final bandwidth measurements applying Torflow's scaling
        method.

        From Torflow's README.spec.txt (section 2.2)::

            In this way, the resulting network status consensus bandwidth values  # NOQA
            are effectively re-weighted proportional to how much faster the node  # NOQA
            was as compared to the rest of the network.

        The variables and steps used in Torflow:

        **strm_bw**::

            The strm_bw field is the average (mean) of all the streams for the relay  # NOQA
            identified by the fingerprint field.
            strm_bw = sum(bw stream x)/|n stream|

        **filt_bw**::

            The filt_bw field is computed similarly, but only the streams equal to  # NOQA
            or greater than the strm_bw are counted in order to filter very slow  # NOQA
            streams due to slow node pairings.

        **filt_sbw and strm_sbw**::

            for rs in RouterStats.query.filter(stats_clause).\
                  options(eagerload_all('router.streams.circuit.routers')).all():  # NOQA
              tot_sbw = 0
              sbw_cnt = 0
              for s in rs.router.streams:
                if isinstance(s, ClosedStream):
                  skip = False
                  #for br in badrouters:
                  #  if br != rs:
                  #    if br.router in s.circuit.routers:
                  #      skip = True
                  if not skip:
                    # Throw out outliers < mean
                    # (too much variance for stddev to filter much)
                    if rs.strm_closed == 1 or s.bandwidth() >= rs.sbw:
                      tot_sbw += s.bandwidth()
                      sbw_cnt += 1

            if sbw_cnt: rs.filt_sbw = tot_sbw/sbw_cnt
            else: rs.filt_sbw = None

        **filt_avg, and strm_avg**::

            Once we have determined the most recent measurements for each node, we  # NOQA
            compute an average of the filt_bw fields over all nodes we have measured.  # NOQA

        ::

            filt_avg = sum(map(lambda n: n.filt_bw, nodes.itervalues()))/float(len(nodes))  # NOQA
            strm_avg = sum(map(lambda n: n.strm_bw, nodes.itervalues()))/float(len(nodes))  # NOQA

        **true_filt_avg and true_strm_avg**::

            for cl in ["Guard+Exit", "Guard", "Exit", "Middle"]:
                true_filt_avg[cl] = filt_avg
                true_strm_avg[cl] = strm_avg

        In the non-pid case, all types of nodes get the same avg.

        **n.fbw_ratio and n.sbw_ratio**::

            for n in nodes.itervalues():
                n.fbw_ratio = n.filt_bw/true_filt_avg[n.node_class()]
                n.sbw_ratio = n.strm_bw/true_strm_avg[n.node_class()]

        **n.ratio**::

            These averages are used to produce ratios for each node by dividing the  # NOQA
            measured value for that node by the network average.

        ::

            # Choose the larger between sbw and fbw
              if n.sbw_ratio > n.fbw_ratio:
                n.ratio = n.sbw_ratio
              else:
                n.ratio = n.fbw_ratio

        **desc_bw**:

        It is the minimum of all the descriptor bandwidth values::

            bws = map(int, g)
            bw_observed = min(bws)

            return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
            ns.flags, ip, version, os, uptime, published, contact, rate_limited,  # NOQA
            ns.orhash, ns.bandwidth, extra_info_digest, ns.unmeasured)

            self.desc_bw = max(bw,1) # Avoid div by 0

        **new_bw**::

            These ratios are then multiplied by the most recent observed descriptor  # NOQA
            bandwidth we have available for each node, to produce a new value for  # NOQA
            the network status consensus process.

        ::

            n.new_bw = n.desc_bw*n.ratio

        The descriptor observed bandwidth is multiplied by the ratio.

        **Limit the bandwidth to a maximum**::

            NODE_CAP = 0.05

        ::

            if n.new_bw > tot_net_bw*NODE_CAP:
              plog("INFO", "Clipping extremely fast "+n.node_class()+" node "+n.idhex+"="+n.nick+  # NOQA
                   " at "+str(100*NODE_CAP)+"% of network capacity ("+
                   str(n.new_bw)+"->"+str(int(tot_net_bw*NODE_CAP))+") "+
                   " pid_error="+str(n.pid_error)+
                   " pid_error_sum="+str(n.pid_error_sum))
              n.new_bw = int(tot_net_bw*NODE_CAP)

        However, tot_net_bw does not seem to be updated when not using pid.
        This clipping would make all the faster relays have the same value.

        All of that can be expressed as:

        .. math::

           bwn_i =& min\\left(bwnew_i,
                      \\sum_{i=1}^{n}bwnew_i \\times 0.05\\right) \\

                 &= min\\left(
                      \\left(min\\left(bwobs_i, bwavg_i, bwbur_i \\right) \\times r_i\\right),
                        \\sum_{i=1}^{n}\\left(min\\left(bwobs_i, bwavg_i, bwbur_i \\right) \\times r_i\\right)
                        \\times 0.05\\right)\\

                 &= min\\left(
                      \\left(min\\left(bwobs_i, bwavg_i, bwbur_i \\right) \\times max\\left(rf_i, rs_i\\right)\\right),
                        \\sum_{i=1}^{n}\\left(min\\left(bwobs_i, bwavg_i, bwbur_i \\right) \\times
                          max\\left(rf_i, rs_i\\right)\\right) \\times 0.05\\right)\\

                 &= min\\left(
                      \\left(min\\left(bwobs_i, bwavg_i, bwbur_i \\right) \\times max\\left(\\frac{bwfilt_i}{bwfilt},
                          \\frac{bw_i}{bwstrm}\\right)\\right),
                        \\sum_{i=1}^{n}\\left(min\\left(bwobs_i, bwavg_i, bwbur_i \\right) \\times
                          max\\left(\\frac{bwfilt_i}{bwfilt},
                            \\frac{bw_i}{bwstrm}\\right)\\right) \\times 0.05\\right)

        :param list bw_lines: bandwidth lines to scale. They are deep-copied,
            so the input lines are not modified.
        :param desc_bw_obs_type: ``TORFLOW_OBS_LAST`` to use each line's
            ``desc_bw_obs_last`` as the observed descriptor bandwidth, or
            ``TORFLOW_OBS_MEAN`` to use its ``desc_bw_obs_mean``.
        :param cap: fraction of the sum of all the lines' ``bw_mean`` that a
            single relay can get (Torflow's ``NODE_CAP``). ``None`` disables
            the clipping.
        :param int num_round_dig: number of significant digits passed to
            ``kb_round_x_sig_dig``.
        :param bool reverse: sort the result in descending order of ``bw``.
        :returns list: the scaled copies of the lines, sorted by ``bw``.
        :raises ValueError: if ``desc_bw_obs_type`` is not one of the
            supported types.

        """
        log.info("Calculating relays' bandwidth using Torflow method.")
        # Fail early: otherwise ``desc_bw_obs`` would be unbound in the loop.
        if desc_bw_obs_type not in (TORFLOW_OBS_LAST, TORFLOW_OBS_MEAN):
            raise ValueError("Unknown descriptor observed bandwidth type: "
                             "{}".format(desc_bw_obs_type))
        bw_lines_tf = copy.deepcopy(bw_lines)
        # mean (Torflow's strm_avg)
        mu = mean([l.bw_mean for l in bw_lines])
        # filtered mean (Torflow's filt_avg)
        muf = mean([max(l.bw_mean, mu) for l in bw_lines])
        # bw sum (Torflow's tot_net_bw or tot_sbw)
        sum_bw = sum([l.bw_mean for l in bw_lines])
        # Torflow's clipping. Honour the ``cap`` argument; it used to be
        # ignored in favour of the module default.
        hlimit = sum_bw * cap if cap is not None else None
        log.debug('sum %s', sum_bw)
        log.debug('mu %s', mu)
        log.debug('muf %s', muf)
        log.debug('hlimit %s', hlimit)
        for l in bw_lines_tf:
            if desc_bw_obs_type == TORFLOW_OBS_LAST:
                desc_bw_obs = l.desc_bw_obs_last
            else:
                desc_bw_obs = l.desc_bw_obs_mean
            # Excerpt from bandwidth-file-spec.txt section 2.3
            # A relay's MaxAdvertisedBandwidth limits the bandwidth-avg in its
            # descriptor.
            # Therefore generators MUST limit a relay's measured bandwidth to
            # its descriptor's bandwidth-avg.
            # Generators SHOULD NOT limit measured bandwidths based on
            # descriptors' bandwidth-observed, because that penalises new
            # relays.
            # See https://trac.torproject.org/projects/tor/ticket/8494
            if l.desc_bw_bur is not None:
                # Because in previous versions results were not storing
                # desc_bw_bur
                desc_bw = min(desc_bw_obs, l.desc_bw_bur, l.desc_bw_avg)
            else:
                desc_bw = min(desc_bw_obs, l.desc_bw_avg)
            # In previous versions results were not storing consensus_bandwidth
            if l.consensus_bandwidth_is_unmeasured \
                    or l.consensus_bandwidth is None:
                min_bandwidth = desc_bw
            # If the relay is measured, use the minimum between the descriptors
            # bandwidth and the consensus bandwidth, so that
            # MaxAdvertisedBandwidth limits the consensus weight
            # The consensus bandwidth in a measured relay has been obtained
            # doing the same calculation as here
            else:
                min_bandwidth = min(desc_bw, l.consensus_bandwidth)
            # Torflow's scaling
            ratio_stream = l.bw_mean / mu
            ratio_stream_filtered = max(l.bw_mean, mu) / muf
            ratio = max(ratio_stream, ratio_stream_filtered)
            bw_scaled = ratio * min_bandwidth
            # round and convert to KB
            bw_new = kb_round_x_sig_dig(bw_scaled, digits=num_round_dig)
            # Cap maximum bw.
            # NOTE(review): ``hlimit`` is in the unit of ``bw_mean`` while
            # ``bw_new`` has been converted to KB above; confirm whether the
            # cap should be applied before the conversion.
            if hlimit is not None:
                bw_new = min(hlimit, bw_new)
            # avoid 0
            l.bw = max(bw_new, 1)
        return sorted(bw_lines_tf, key=lambda x: x.bw, reverse=reverse)
881

882
    @staticmethod
    def read_number_consensus_relays(consensus_path):
        """Count the relays listed in the cached consensus file.

        :param str consensus_path: path to the cached consensus file.
        :returns: the number of entries ``parse_file`` yields for the file,
            or ``None`` when reading it raises ``FileNotFoundError`` or
            ``AttributeError``.
        """
        try:
            entries = list(parse_file(consensus_path))
        except (FileNotFoundError, AttributeError):
            log.info("It is not possible to obtain statistics about the "
                     "percentage of measured relays because the cached "
                     "consensus file is not found.")
            num = None
        else:
            num = len(entries)
        log.debug("Number of relays in the network %s", num)
        return num

896
    @staticmethod
    def measured_progress_stats(bw_lines, number_consensus_relays,
                                min_perc_reached_before):
        """Statistics about measurements progress, to be included in the
        header.

        :param list bw_lines: the bw_lines after scaling and applying filters.
        :param int number_consensus_relays: the number of relays in the
            cached consensus.
        :param min_perc_reached_before: the previously stored
            ``min_perc_reached`` value. When it is not ``None`` and the
            minimum is not reached now, a warning is logged; otherwise the
            message is logged at info level.
        :returns dict, bool: Statistics about the progress made with
            measurements and whether the percentage of measured relays has been
            reached.

        """
        # cached-consensus should be updated every time that scanner get the
        # network status or descriptors?
        # It will not be updated to the last consensus, but the list of
        # measured relays is not either.
        assert isinstance(number_consensus_relays, int)
        assert isinstance(bw_lines, list)
        statsd = {}
        statsd['number_eligible_relays'] = len(bw_lines)
        statsd['number_consensus_relays'] = number_consensus_relays
        statsd['minimum_number_eligible_relays'] = round(
            statsd['number_consensus_relays'] * MIN_REPORT / 100)
        # An empty cached consensus would otherwise raise ZeroDivisionError.
        if number_consensus_relays > 0:
            statsd['percent_eligible_relays'] = round(
                len(bw_lines) * 100 / statsd['number_consensus_relays'])
        else:
            statsd['percent_eligible_relays'] = 0
        statsd['minimum_percent_eligible_relays'] = MIN_REPORT
        if statsd['number_eligible_relays'] < \
                statsd['minimum_number_eligible_relays']:
            # if min percent was reached before, warn
            # otherwise, log at info level
            if min_perc_reached_before is not None:
                log.warning('The percentage of the measured relays is less '
                            'than the %s%% of the relays in the network (%s).',
                            MIN_REPORT, statsd['number_consensus_relays'])
            else:
                log.info('The percentage of the measured relays is less '
                         'than the %s%% of the relays in the network (%s).',
                         MIN_REPORT, statsd['number_consensus_relays'])
            return statsd, False
        return statsd, True

    @property
    def is_min_perc(self):
        """Whether the header reports at least the minimum number of
        eligible relays.

        Header attributes that are missing count as 0, so a header without
        progress statistics is considered to have reached the minimum.
        """
        eligible = getattr(self.header, 'number_eligible_relays', 0)
        required = getattr(self.header, 'minimum_number_eligible_relays', 0)
        return not eligible < required

juga  's avatar
juga committed
946
    @property
    def sum_bw(self):
        """Sum of the ``bw`` values of all bandwidth lines."""
        return sum(line.bw for line in self.bw_lines)
juga  's avatar
juga committed
949
950

    @property
    def num(self):
        """Number of bandwidth lines."""
        lines = self.bw_lines
        return len(lines)

    @property
    def mean_bw(self):
        """Arithmetic mean of the ``bw`` values of all bandwidth lines."""
        bandwidths = [line.bw for line in self.bw_lines]
        return mean(bandwidths)

    @property
    def median_bw(self):
        """Median of the ``bw`` values of all bandwidth lines."""
        bandwidths = [line.bw for line in self.bw_lines]
        return median(bandwidths)
juga  's avatar
juga committed
961

juga  's avatar
juga committed
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
    @property
    def max_bw(self):
        """Largest ``bw`` value among all bandwidth lines."""
        return max(line.bw for line in self.bw_lines)

    @property
    def min_bw(self):
        """Smallest ``bw`` value among all bandwidth lines."""
        return min(line.bw for line in self.bw_lines)

    @property
    def info_stats(self):
        """Log, at info level, summary statistics of the bandwidth lines.

        Nothing is logged when there are no bandwidth lines. Always evaluates
        to ``None``.
        """
        if not self.bw_lines:
            return
        # A plain loop: the logging is a side effect, so building a throwaway
        # list with a comprehension is not needed.
        for attr in ['sum_bw', 'mean_bw', 'median_bw', 'num',
                     'max_bw', 'min_bw']:
            log.info(': '.join([attr, str(getattr(self, attr))]))

juga  's avatar
juga committed
978
979
    def update_progress(self, bw_lines, header, number_consensus_relays,
                        state):
        """Add progress statistics to ``header`` and record the outcome in
        ``state``.

        :param list bw_lines: bandwidth lines after scaling and filtering.
        :param header: header object; its ``add_stats`` method receives the
            statistics.
        :param number_consensus_relays: number of relays in the cached
            consensus, or ``None``. With ``None`` nothing is computed and
            ``bw_lines`` is returned unchanged.
        :param state: mapping persisted between runs; its
            ``min_perc_reached`` key is set to ``now_isodt_str()`` when the
            minimum percentage is reached and to ``None`` otherwise.
        :returns list: ``bw_lines``, or an empty list when the minimum
            percentage of measured relays was not reached.
        """
        previously_reached = state.get('min_perc_reached')
        if number_consensus_relays is None:
            return bw_lines
        stats, reached = self.measured_progress_stats(
            bw_lines, number_consensus_relays, previously_reached)
        # The statistics go into the header whether or not the minimum
        # was reached.
        header.add_stats(**stats)
        if reached:
            state['min_perc_reached'] = now_isodt_str()
            return bw_lines
        state['min_perc_reached'] = None
        return []

juga  's avatar
juga committed
993
994
995
996
997
998
999
1000
1001
1002
    def bw_line_for_node_id(self, node_id):
        """Return the first bandwidth line whose ``node_id`` matches.

        Used to combine data when plotting.

        :param node_id: relay fingerprint to look up.
        :returns: the first matching line, or ``None`` if there is none.
        """
        matches = (line for line in self.bw_lines if line.node_id == node_id)
        return next(matches, None)

1003
1004
1005
1006
1007
1008
1009
1010
1011
    def to_plt(self, attrs=None, sorted_by=None):
        """Return bandwidth data in a format useful for matplotlib.

        Used from external tool to plot.

        :param list attrs: names of the bandwidth line attributes to extract;
            defaults to ``['bw']``.
        :param sorted_by: accepted for backwards compatibility; it is not
            used.
        :returns tuple: ``(x, ys, attrs)`` where ``x`` is the list of line
            indexes, ``ys`` holds one list of values per name in ``attrs``,
            and ``attrs`` is the list of attribute names used.
        """
        # Build a fresh default per call. The old default list was shared
        # between calls and returned to the caller, so mutating the returned
        # ``attrs`` changed the default for every later call.
        if attrs is None:
            attrs = ['bw']
        x = list(range(self.num))
        ys = [[getattr(l, k) for l in self.bw_lines] for k in attrs]
        return x, ys, attrs

juga  's avatar
juga committed
1012
1013
1014
1015
1016
    def write(self, output):
        if output == '/dev/stdout':
            log.info("Writing to stdout is not supported.")
            return
        log.info('Writing v3bw file to %s', output)
1017
1018
        # To avoid inconsistent reads, the bandwidth data is written to an
        # archive path, then atomically symlinked to 'latest.v3bw'
juga  's avatar
juga committed
1019
1020
        out_dir = os.path.dirname(output)
        out_link = os.path.join(out_dir, 'latest.v3bw')
1021
        out_link_tmp = out_link + '.tmp'
juga  's avatar
juga committed
1022
1023
1024
1025
1026
1027
        with DirectoryLock(out_dir):
            with open(output, 'wt') as fd:
                fd.write(str(self.header))
                for line in self.bw_lines:
                    fd.write(str(line))
            output_basename = os.path.basename(output)
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
            # To atomically symlink a file, we need to create a temporary link,
            # then rename it to the final link name. (POSIX guarantees that
            # rename is atomic.)
            log.debug('Creating symlink {} -> {}.'
                      .format(out_link_tmp, output_basename))
            os.symlink(output_basename, out_link_tmp)
            log.debug('Renaming symlink {} -> {} to {} -> {}.'
                      .format(out_link_tmp, output_basename,
                              out_link, output_basename))
            os.rename(out_link_tmp, out_link)