resultdump.py 14.6 KB
Newer Older
Matt Traudt's avatar
Matt Traudt committed
1
import os
Matt Traudt's avatar
Matt Traudt committed
2
import json
Matt Traudt's avatar
Matt Traudt committed
3
import time
4
import logging
5
from glob import glob
Matt Traudt's avatar
Matt Traudt committed
6
7
from threading import Thread
from threading import Event
8
from threading import RLock
Matt Traudt's avatar
Matt Traudt committed
9
10
from queue import Queue
from queue import Empty
Matt Traudt's avatar
Matt Traudt committed
11
from datetime import datetime
12
from datetime import timedelta
13
from enum import Enum
14
from stem.descriptor.router_status_entry import RouterStatusEntryV3
15
from sbws.globals import RESULT_VERSION
16
from sbws.util.filelock import DirectoryLock
Matt Traudt's avatar
Matt Traudt committed
17

18
19
log = logging.getLogger(__name__)

Matt Traudt's avatar
Matt Traudt committed
20

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def group_results_by_relay(results, starting_dict=None):
    ''' Given a list of Results, group them by the fingerprint of the relay
    that they measured and return the resulting dict, which maps each
    fingerprint to the list of Results for that relay.

    If starting_dict is given (even an empty one), it is updated in place and
    returned. Otherwise a new dict is created. '''
    # Compare against None instead of relying on truthiness so that an empty
    # dict handed in by the caller is filled in place rather than being
    # silently swapped for a brand new one.
    data = {} if starting_dict is None else starting_dict
    assert isinstance(data, dict)
    assert isinstance(results, list)
    for result in results:
        assert isinstance(result, Result)
        data.setdefault(result.fingerprint, []).append(result)
    return data


37
def load_result_file(fname, success_only=False):
    ''' Reads in all lines from the given file, and parses them into Result
    structures (or subclasses of Result). Optionally drops every ResultError
    (and subclass) so that only successful results are kept.
    Returns all kept Results as a list. This function does not care about the
    age of the results.

    Blank lines are skipped. Lines for which Result.from_dict() returns None
    are counted and reported in a single warning. The file is read while
    holding the DirectoryLock of the directory that contains it. '''
    assert os.path.isfile(fname)
    d = []
    num_ignored = 0
    with DirectoryLock(os.path.dirname(fname)):
        with open(fname, 'rt') as fd:
            for line in fd:
                line = line.strip()
                if not line:
                    # json.loads('') raises; an empty line carries no result
                    continue
                r = Result.from_dict(json.loads(line))
                if r is None:
                    num_ignored += 1
                    continue
                if success_only and isinstance(r, ResultError):
                    continue
                d.append(r)
    log.debug('Read %d lines from %s', len(d), fname)
    if num_ignored > 0:
        log.warning('Had to ignore %d results due to not knowing how to '
                    'parse them.', num_ignored)
    return d


62
def trim_results(fresh_days, results):
    ''' Given a result list, remove all Results that are no longer valid and
    return the new list. A Result is kept when its time is no older than
    fresh_days days before now. The input list is not modified. '''
    assert isinstance(fresh_days, int)
    assert isinstance(results, list)
    data_period = fresh_days * 24*60*60
    oldest_allowed = time.time() - data_period
    out_results = [r for r in results if r.time >= oldest_allowed]
    log.info('Keeping %d/%d results', len(out_results), len(results))
    return out_results


77
def load_recent_results_in_datadir(fresh_days, datadir, success_only=False):
    ''' Given a data directory, read all results files in it that could have
    results in them that are still valid. Trim them, and return the valid
    Results as a list.

    Files are looked up by the UTC date in their name, from fresh_days + 2
    days ago up to and including today. success_only is passed through to
    load_result_file(). '''
    assert isinstance(fresh_days, int)
    assert os.path.isdir(datadir)
    results = []
    today = datetime.utcfromtimestamp(time.time())
    # Look a couple of days further back than strictly needed; anything that
    # turns out to be too old is removed again by trim_results() below.
    data_period = fresh_days + 2
    oldest_day = today - timedelta(days=data_period)
    working_day = oldest_day
    while working_day <= today:
        # Cannot use ** and recursive=True in glob() because we support 3.4
        # So instead settle on finding files in the datadir and one
        # subdirectory below the datadir that fit the form of YYYY-MM-DD*.txt
        d = working_day.date()
        patterns = [os.path.join(datadir, '{}*.txt'.format(d)),
                    os.path.join(datadir, '*', '{}*.txt'.format(d))]
        for pattern in patterns:
            for fname in glob(pattern):
                results.extend(load_result_file(
                    fname, success_only=success_only))
        working_day += timedelta(days=1)
    results = trim_results(fresh_days, results)
    if not results:
        # Report fresh_days, not the wider file search window: results older
        # than fresh_days are trimmed even when their files were found.
        log.warning('Results files that are valid not found. '
                    'Probably sbws scanner was not run first or '
                    'it ran more than %d days ago or '
                    'it was using a different datadir than %s.', fresh_days,
                    datadir)
    return results


110
111
112
113
def write_result_to_datadir(result, datadir):
    ''' Append the given Result, as one line of text, to the daily results
    file in datadir. The file is named after the UTC date of the result's
    time (YYYY-MM-DD.txt) and is written while holding the datadir's
    DirectoryLock.

    Can be called from any thread '''
    assert isinstance(result, Result)
    assert os.path.isdir(datadir)
    dt = datetime.utcfromtimestamp(result.time)
    ext = '.txt'
    result_fname = os.path.join(
        datadir, '{}{}'.format(dt.date(), ext))
    with DirectoryLock(datadir):
        log.debug('Writing a result to %s', result_fname)
        with open(result_fname, 'at') as fd:
            fd.write('{}\n'.format(str(result)))
122
123


124
125
126
127
128
129
130
131
132
133
134
135
class _StrEnum(str, Enum):
    ''' Base class for enums whose members are also str instances. '''


class _ResultType(_StrEnum):
    ''' The kinds of Result that exist. The string values are what ends up
    under the 'type' key of a serialized Result and what Result.from_dict()
    matches against to pick the class to build. '''
    # Returned by ResultSuccess.type
    Success = 'success'
    # Returned by ResultError.type
    Error = 'error-misc'
    # Returned by ResultErrorCircuit.type
    ErrorCircuit = 'error-circ'
    # Returned by ResultErrorStream.type
    ErrorStream = 'error-stream'
    # Returned by ResultErrorAuth.type
    ErrorAuth = 'error-auth'


Matt Traudt's avatar
Matt Traudt committed
136
class Result:
    ''' A simple struct to pack a measurement result into so that other code
    can be confident it is handling a well-formed result.

    This base class holds the fields common to every result. Subclasses
    provide the type property and add their own fields. '''

    class Relay:
        ''' Implements just enough of a stem RouterStatusEntryV3 for this
        Result class to be happy '''
        def __init__(self, fingerprint, nickname, address):
            self.fingerprint = fingerprint
            self.nickname = nickname
            self.address = address

    def __init__(self, relay, circ, server_host, scanner_nick, t=None):
        ''' relay is any object with fingerprint, nickname and address
        attributes; only those three values are copied. t is the time of the
        result and defaults to the current time. '''
        self._relay = Result.Relay(relay.fingerprint, relay.nickname,
                                   relay.address)
        self._circ = circ
        self._server_host = server_host
        self._scanner = scanner_nick
        self._time = time.time() if t is None else t

    @property
    def type(self):
        ''' Must be overridden by subclasses '''
        raise NotImplementedError()

    @property
    def fingerprint(self):
        return self._relay.fingerprint

    @property
    def nickname(self):
        return self._relay.nickname

    @property
    def address(self):
        return self._relay.address

    @property
    def circ(self):
        return self._circ

    @property
    def server_host(self):
        return self._server_host

    @property
    def scanner(self):
        return self._scanner

    @property
    def time(self):
        return self._time

    @property
    def version(self):
        return RESULT_VERSION

    def to_dict(self):
        ''' Returns the common fields of this Result as a dict. Subclasses
        extend the dict with their own fields. '''
        return {
            'fingerprint': self.fingerprint,
            'nickname': self.nickname,
            'address': self.address,
            'circ': self.circ,
            'server_host': self.server_host,
            'time': self.time,
            'type': self.type,
            'scanner': self.scanner,
            'version': self.version,
        }

    @staticmethod
    def from_dict(d):
        ''' Given a dict, returns the Result* subtype that is represented by
        the dict. If we don't know how to parse the dict into a Result and it's
        likely because the programmer forgot to implement something, raises
        NotImplementedError. If we can't parse the dict for some other reason,
        return None. '''
        assert 'version' in d
        if d['version'] != RESULT_VERSION:
            # Written with a different result format; caller should skip it
            return None
        assert 'type' in d
        if d['type'] == _ResultType.Success.value:
            return ResultSuccess.from_dict(d)
        elif d['type'] == _ResultType.Error.value:
            return ResultError.from_dict(d)
        elif d['type'] == _ResultType.ErrorCircuit.value:
            return ResultErrorCircuit.from_dict(d)
        elif d['type'] == _ResultType.ErrorStream.value:
            return ResultErrorStream.from_dict(d)
        elif d['type'] == _ResultType.ErrorAuth.value:
            return ResultErrorAuth.from_dict(d)
        else:
            raise NotImplementedError(
                'Unknown result type {}'.format(d['type']))

    def __str__(self):
        ''' The JSON encoding of to_dict() '''
        return json.dumps(self.to_dict())


class ResultError(Result):
    ''' A Result for a measurement that failed. In addition to the fields of
    Result it carries an optional message. This is also the base class of the
    more specific ResultError* classes. '''
    def __init__(self, *a, msg=None, **kw):
        super().__init__(*a, **kw)
        self._msg = msg

    @property
    def type(self):
        return _ResultType.Error

    @property
    def freshness_reduction_factor(self):
        '''
        When the RelayPrioritizer encounters this Result, how much should it
        adjust its freshness? (See RelayPrioritizer.best_priority() for more
        information about "freshness")

        A higher factor makes the freshness lower (making the Result seem
        older). A lower freshness leads to the relay having better priority,
        and better priority means it will be measured again sooner.

        The value 0.5 was chosen somewhat arbitrarily, but a few weeks of live
        network testing verifies that sbws is still able to perform useful
        measurements in a reasonable amount of time.
        '''
        return 0.5

    @property
    def msg(self):
        return self._msg

    @staticmethod
    def from_dict(d):
        ''' Builds a ResultError from a dict such as the one returned by
        to_dict(). Raises KeyError if a needed key is missing. '''
        assert isinstance(d, dict)
        return ResultError(
            Result.Relay(d['fingerprint'], d['nickname'], d['address']),
            d['circ'], d['server_host'], d['scanner'],
            msg=d['msg'], t=d['time'])

    def to_dict(self):
        ''' The dict of Result.to_dict() plus the 'msg' key '''
        d = super().to_dict()
        d.update({
            'msg': self.msg,
        })
        return d


class ResultErrorCircuit(ResultError):
    ''' A ResultError of type _ResultType.ErrorCircuit. The constructor and
    to_dict() are inherited unchanged from ResultError. '''

    @property
    def type(self):
        return _ResultType.ErrorCircuit

    @property
    def freshness_reduction_factor(self):
        '''
        There are a few instances when it isn't the relay's fault that the
        circuit failed to get built. Maybe someday we'll try detecting whose
        fault it most likely was and subclassing ResultErrorCircuit. But for
        now we don't. So reduce the freshness slightly more than ResultError
        does by default so priority isn't hurt quite as much.

        A (hopefully very very rare) example of when a circuit would fail to
        get built is when the sbws client machine suddenly loses Internet
        access.
        '''
        return 0.6

    @staticmethod
    def from_dict(d):
        ''' Builds a ResultErrorCircuit from a dict such as the one returned
        by to_dict(). Raises KeyError if a needed key is missing. '''
        assert isinstance(d, dict)
        return ResultErrorCircuit(
            Result.Relay(d['fingerprint'], d['nickname'], d['address']),
            d['circ'], d['server_host'], d['scanner'],
            msg=d['msg'], t=d['time'])


class ResultErrorStream(ResultError):
    ''' A ResultError of type _ResultType.ErrorStream. The constructor,
    to_dict() and freshness_reduction_factor are inherited unchanged from
    ResultError. '''

    @property
    def type(self):
        return _ResultType.ErrorStream

    @staticmethod
    def from_dict(d):
        ''' Builds a ResultErrorStream from a dict such as the one returned
        by to_dict(). Raises KeyError if a needed key is missing. '''
        assert isinstance(d, dict)
        return ResultErrorStream(
            Result.Relay(d['fingerprint'], d['nickname'], d['address']),
            d['circ'], d['server_host'], d['scanner'],
            msg=d['msg'], t=d['time'])


class ResultErrorAuth(ResultError):
    ''' A ResultError of type _ResultType.ErrorAuth. The constructor and
    to_dict() are inherited unchanged from ResultError. '''

    @property
    def type(self):
        return _ResultType.ErrorAuth

    @property
    def freshness_reduction_factor(self):
        '''
        Override the default ResultError.freshness_reduction_factor because a
        ResultErrorAuth is most likely not the measured relay's fault, so we
        shouldn't hurt its priority as much. A higher reduction factor means a
        Result's effective freshness is reduced more, which makes the relay's
        priority better.

        The value 0.9 was chosen somewhat arbitrarily.
        '''
        return 0.9

    @staticmethod
    def from_dict(d):
        ''' Builds a ResultErrorAuth from a dict such as the one returned by
        to_dict(). Raises KeyError if a needed key is missing. '''
        assert isinstance(d, dict)
        return ResultErrorAuth(
            Result.Relay(d['fingerprint'], d['nickname'], d['address']),
            d['circ'], d['server_host'], d['scanner'],
            msg=d['msg'], t=d['time'])


class ResultSuccess(Result):
    ''' A Result for a measurement that worked. In addition to the fields of
    Result it carries the rtts and downloads that were given to it. '''
    def __init__(self, rtts, downloads, *a, **kw):
        super().__init__(*a, **kw)
        self._rtts = rtts
        self._downloads = downloads

    @property
    def type(self):
        return _ResultType.Success

    @property
    def rtts(self):
        return self._rtts

    @property
    def downloads(self):
        return self._downloads

    @staticmethod
    def from_dict(d):
        ''' Builds a ResultSuccess from a dict such as the one returned by
        to_dict(). Raises KeyError if a needed key is missing. '''
        assert isinstance(d, dict)
        return ResultSuccess(
            d['rtts'], d['downloads'],
            Result.Relay(d['fingerprint'], d['nickname'], d['address']),
            d['circ'], d['server_host'], d['scanner'],
            t=d['time'])

    def to_dict(self):
        ''' The dict of Result.to_dict() plus the 'rtts' and 'downloads'
        keys '''
        d = super().to_dict()
        d.update({
            'rtts': self.rtts,
            'downloads': self.downloads,
        })
        return d
Matt Traudt's avatar
Matt Traudt committed
405
406
407


class ResultDump:
    ''' Runs the enter() method in a new thread and collects new Results on its
    queue. Writes them to daily result files in the data directory '''
    def __init__(self, args, conf, end_event):
        ''' Reads the data directory and the data period from conf and
        starts the ResultDump thread. end_event is the Event that tells the
        thread to stop once its queue is empty. args is not used here. '''
        assert os.path.isdir(conf['paths']['datadir'])
        assert isinstance(end_event, Event)
        self.conf = conf
        self.fresh_days = conf.getint('general', 'data_period')
        self.datadir = conf['paths']['datadir']
        self.end_event = end_event
        # Stays None until the ResultDump thread has loaded the results that
        # are already on disk
        self.data = None
        self.data_lock = RLock()
        self.thread = Thread(target=self.enter)
        self.queue = Queue()
        self.thread.start()

    def store_result(self, result):
        ''' Call from ResultDump thread '''
        assert isinstance(result, Result)
        with self.data_lock:
            self.data.append(result)
            self.data = trim_results(self.fresh_days, self.data)

    def enter(self):
        ''' Main loop for the ResultDump thread '''
        with self.data_lock:
            self.data = load_recent_results_in_datadir(
                self.fresh_days, self.datadir)
        # Keep going until we have been told to stop AND the queue has been
        # drained, so no queued result is lost.
        while not (self.end_event.is_set() and self.queue.empty()):
            try:
                result = self.queue.get(timeout=1)
            except Empty:
                # Nothing arrived within the timeout; check end_event again
                continue
            if result is None:
                log.debug('Got None in ResultDump')
                continue
            elif not isinstance(result, Result):
                log.warning('The only thing we should ever receive in the '
                            'result thread is a Result type. Ignoring %s',
                            type(result))
                continue
            fp = result.fingerprint
            nick = result.nickname
            self.store_result(result)
            write_result_to_datadir(result, self.datadir)
            log.debug('%s %s finished measurement', fp, nick)

    def results_for_relay(self, relay):
        ''' Returns the list of stored Results whose fingerprint is the
        given relay's. Returns an empty list if the results on disk have not
        been loaded yet. '''
        assert isinstance(relay, RouterStatusEntryV3)
        with self.data_lock:
            if self.data is None:
                # The ResultDump thread has not finished its initial load
                return []
            return [r for r in self.data if r.fingerprint == relay.fingerprint]