scanner.py 17.3 KB
Newer Older
1
''' Measure the relays. '''
2

3
4
5
import sys
import threading

Matt Traudt's avatar
Matt Traudt committed
6
7
from ..lib.circuitbuilder import GapsCircuitBuilder as CB
from ..lib.resultdump import ResultDump
Matt Traudt's avatar
Matt Traudt committed
8
9
from ..lib.resultdump import ResultSuccess, ResultErrorCircuit
from ..lib.resultdump import ResultErrorStream
Matt Traudt's avatar
Matt Traudt committed
10
from ..lib.relaylist import RelayList
11
from ..lib.relayprioritizer import RelayPrioritizer
12
from ..lib.destination import DestinationList
juga's avatar
juga committed
13
from ..util.timestamp import now_isodt_str
14
from ..util.state import State
juga's avatar
juga committed
15
from sbws.globals import fail_hard
Matt Traudt's avatar
Matt Traudt committed
16
import sbws.util.stem as stem_utils
17
import sbws.util.requests as requests_utils
Matt Traudt's avatar
Matt Traudt committed
18
19
from argparse import ArgumentDefaultsHelpFormatter
from multiprocessing.dummy import Pool
Matt Traudt's avatar
Matt Traudt committed
20
from threading import Event
Matt Traudt's avatar
Matt Traudt committed
21
import time
Matt Traudt's avatar
Matt Traudt committed
22
import os
23
import logging
24
import requests
25
import random
Matt Traudt's avatar
Matt Traudt committed
26

27

28
# Random number generator shared by this module; it is used to pick the byte
# ranges to download and to choose among candidate second hops.
rng = random.SystemRandom()
Matt Traudt's avatar
Matt Traudt committed
29
# Event handed to ResultDump by run_speedtest() and set by main() once the
# scanner stops running.
end_event = Event()
30
# Logger for this module, named after the module itself.
log = logging.getLogger(__name__)
Matt Traudt's avatar
Matt Traudt committed
31

32

33
34
35
36
37
38
39
40
41
42
43
44
45
46
def dumpstacks():
    ''' Log the current stack of every thread and then enter the debugger.

    The main loop calls this when measurement threads are still pending after
    its waiting period, which most likely means there is a bug or a deadlock.
    '''
    import pdb
    import traceback
    log.warning("sbws stop measuring relays, probably because of a bug. "
                "Please, open a ticket in trac.torproject.org with this "
                "backtrace.")
    thread_id2name = {t.ident: t.name for t in threading.enumerate()}
    for thread_id, stack in sys._current_frames().items():
        log.critical("Thread: %s(%d)",
                     thread_id2name.get(thread_id, ""), thread_id)
        # format_stack() returns the frames as strings. print_stack() writes
        # them to stderr and returns None, so logging its result would only
        # record "None" and the backtrace would never reach the log.
        log.critical("%s", "".join(traceback.format_stack(stack)))
    pdb.set_trace()


47
48
49
def timed_recv_from_server(session, dest, byte_range):
    ''' Fetch **byte_range** of the URL at **dest** through **session** and
    time how long the request takes.

    Return ``(True, seconds)`` when the request completes, where seconds is
    the elapsed wall-clock time. Return ``(False, exception)`` when it fails
    with a connection error or a read timeout. '''
    request_headers = {'Range': byte_range, 'Accept-Encoding': 'identity'}
    began = time.time()
    # TODO:
    # - What other exceptions can this throw?
    # - Do we have to read the content, or did requests already do so?
    try:
        requests_utils.get(
            session, dest.url, headers=request_headers, verify=dest.verify)
    except (requests.exceptions.ConnectionError,
            requests.exceptions.ReadTimeout) as error:
        return False, error
    finished = time.time()
    return True, finished - began
65
66


67
68
69
70
71
72
73
74
75
76
77
78
def get_random_range_string(content_length, size):
    '''
    Build the value of an HTTP Range header that selects **size** contiguous
    bytes, starting at a random offset, out of content that is
    **content_length** bytes long.

    For example, for content_length of 100 and size 10, the result is one of
    'bytes=0-9', 'bytes=1-10', 'bytes=2-11', [...] 'bytes=90-99'.
    '''
    assert size <= content_length
    # The first byte can be anywhere that still leaves **size** bytes before
    # the end of the content. range() leaves out its stop value, hence the +1.
    possible_starts = range(0, content_length - size + 1)
    first = rng.choice(possible_starts)
    # Both ends of an HTTP byte range are inclusive, so the last byte sits
    # size - 1 positions after the first one.
    last = first + size - 1
    # first and last are zero based indexes, so the largest valid index is one
    # less than the length of the content.
    assert last < content_length
    return 'bytes={}-{}'.format(first, last)
89
90


91
92
93
94
95
def measure_rtt_to_server(session, conf, dest, content_length):
    ''' Take several end-to-end RTT samples by making small HTTP requests to
    **dest** over **session**, whose circuit and stream are expected to exist
    already and to stay up.

    The number of samples is the ``num_rtts`` option and the size of each
    request is the ``min_download_size`` option, both from the ``scanner``
    section of **conf**.

    Return the list of RTTs (in seconds), or None as soon as any one of the
    requests fails. '''
    size = conf.getint('scanner', 'min_download_size')
    log.debug('Measuring RTT to %s', dest.url)
    samples_wanted = conf.getint('scanner', 'num_rtts')
    rtts = []
    for _ in range(samples_wanted):
        byte_range = get_random_range_string(content_length, size)
        ok, outcome = timed_recv_from_server(session, dest, byte_range)
        if not ok:
            # outcome is the exception that made the request fail
            log.warning('While measuring the RTT to %s we hit an exception '
                        '(does the webserver support Range requests?): %s',
                        dest.url, outcome)
            return None
        # outcome is the time the request took
        assert isinstance(outcome, (float, int))
        rtts.append(outcome)
    return rtts


Matt Traudt's avatar
Matt Traudt committed
115
def measure_bandwidth_to_server(session, conf, dest, content_length):
    ''' Download random byte ranges from **dest** over **session** until
    ``num_downloads`` results worth keeping have been collected.

    The first request asks for ``initial_read_request`` bytes. After every
    download the amount is adjusted according to how long the download took
    and is kept between ``min_download_size`` and ``max_download_size``. All
    these options are read from the ``scanner`` section of **conf**.

    Return a list of dictionaries with the keys ``duration`` (seconds) and
    ``amount`` (bytes requested), or None as soon as a download fails. '''
    kept = []
    wanted = conf.getint('scanner', 'num_downloads')
    amount = conf.getint('scanner', 'initial_read_request')
    smallest = conf.getint('scanner', 'min_download_size')
    largest = conf.getint('scanner', 'max_download_size')
    thresholds = {
        'toofast': conf.getfloat('scanner', 'download_toofast'),
        'min': conf.getfloat('scanner', 'download_min'),
        'target': conf.getfloat('scanner', 'download_target'),
        'max': conf.getfloat('scanner', 'download_max'),
    }
    while len(kept) < wanted:
        assert amount >= smallest
        assert amount <= largest
        byte_range = get_random_range_string(content_length, amount)
        ok, outcome = timed_recv_from_server(session, dest, byte_range)
        if not ok:
            # outcome is the exception that made the download fail
            log.warning('While measuring the bandwidth to %s we hit an '
                        'exception (does the webserver support Range '
                        'requests?): %s', dest.url, outcome)
            return None
        # outcome is the time the download took
        assert isinstance(outcome, (float, int))
        asked_for_maximum = amount == largest
        if _should_keep_result(asked_for_maximum, outcome, thresholds):
            kept.append({'duration': outcome, 'amount': amount})
        amount = _next_expected_amount(
            amount, outcome, thresholds, smallest, largest)
    return kept


150
151
152
153
154
155
def _pick_ideal_second_hop(relay, dest, rl, cont, is_exit):
    '''
    Sbws builds two hop circuits. Given the **relay** to measure with
    destination **dest**, pick a second relay that is or is not an exit
    according to **is_exit**.

    The candidates come from the relay list **rl**: the exits that are not
    bad and allow exiting to the port of **dest** when **is_exit** is true,
    the non-exits otherwise. A candidate with at least twice the consensus
    bandwidth of **relay** is preferred; if there is none, the requirement is
    lowered step by step down to the same bandwidth. When even that fails,
    the candidate with the highest consensus bandwidth is returned.

    **cont** is the controller handed to
    ``stem_utils.only_relays_with_bandwidth``.

    Return the chosen relay, or None if there are no candidates at all.
    '''
    candidates = rl.exits_not_bad_allowing_port(dest.port) if is_exit \
        else rl.non_exits
    if not len(candidates):
        return None
    log.debug('Picking a 2nd hop to measure %s from %d choices. is_exit=%s',
              relay.nickname, len(candidates), is_exit)
    for min_bw_factor in [2, 1.75, 1.5, 1.25, 1]:
        min_bw = relay.consensus_bandwidth * min_bw_factor
        new_candidates = stem_utils.only_relays_with_bandwidth(
            cont, candidates, min_bw=min_bw)
        if len(new_candidates) > 0:
            chosen = rng.choice(new_candidates)
            log.debug(
                'Found %d candidate 2nd hops with at least %sx the bandwidth '
                'of %s. Returning %s (bw=%s).',
                len(new_candidates), min_bw_factor, relay.nickname,
                chosen.nickname, chosen.consensus_bandwidth)
            return chosen
    # Rank by consensus bandwidth, the same attribute every other comparison
    # and log message in this function uses, so that the relay reported as
    # "the fastest candidate" really is the fastest by that measure.
    chosen = max(candidates, key=lambda r: r.consensus_bandwidth)
    log.debug(
        'Didn\'t find any 2nd hops at least as fast as %s (bw=%s). It\'s '
        'probably really fast. Returning %s (bw=%s), the fastest '
        'candidate we have.', relay.nickname, relay.consensus_bandwidth,
        chosen.nickname, chosen.consensus_bandwidth)
    return chosen


184
def measure_relay(args, conf, destinations, cb, rl, relay):
    '''
    Measure **relay** by downloading from a destination over a two hop
    circuit that contains it.

    A destination is taken from **destinations** and a second relay is picked
    from **rl**: a non-exit when **relay** is a usable exit for the port of
    the destination, an exit otherwise. The circuit is built with **cb**, then
    the RTT is measured first and the bandwidth second.

    Return None when no destination or no second hop is available. Otherwise
    return a list with a single result: ResultErrorCircuit when the circuit
    could not be built, ResultErrorStream when the destination is not usable
    or one of the measurements failed, and ResultSuccess when everything
    worked.

    Once the circuit has been built it is always closed before returning,
    also when an unexpected exception is raised.

    **args** is not used by this function.
    '''
    s = requests_utils.make_session(
        cb.controller, conf.getfloat('general', 'http_timeout'))
    # Pick a destination
    dest = destinations.next()
    if not dest:
        log.warning('Unable to get destination to measure %s %s',
                    relay.nickname, relay.fingerprint[0:8])
        return None
    # Pick a relay to help us measure the given relay. If the given relay is an
    # exit, then pick a non-exit. Otherwise pick an exit.
    helper = None
    circ_fps = None
    if relay.is_exit_not_bad_allowing_port(dest.port):
        helper = _pick_ideal_second_hop(
            relay, dest, rl, cb.controller, is_exit=False)
        if helper:
            circ_fps = [helper.fingerprint, relay.fingerprint]
    else:
        helper = _pick_ideal_second_hop(
            relay, dest, rl, cb.controller, is_exit=True)
        if helper:
            circ_fps = [relay.fingerprint, helper.fingerprint]
    if not helper:
        # TODO: Return ResultError of some sort
        log.warning('Unable to pick a 2nd hop to help measure %s %s',
                    relay.nickname, relay.fingerprint[0:8])
        return None
    assert helper
    assert circ_fps is not None and len(circ_fps) == 2
    # Build the circuit
    our_nick = conf['scanner']['nickname']
    circ_id = cb.build_circuit(circ_fps)
    if not circ_id:
        log.warning('Could not build circuit involving %s', relay.nickname)
        msg = 'Unable to complete circuit'
        return [
            ResultErrorCircuit(relay, circ_fps, dest.url, our_nick, msg=msg),
        ]
    # From here on the circuit exists. The finally clause closes it on every
    # way out, including exceptions raised while measuring, which would
    # otherwise leave the circuit open.
    try:
        log.debug('Built circ %s %s for relay %s %s', circ_id,
                  stem_utils.circuit_str(cb.controller, circ_id),
                  relay.nickname, relay.fingerprint[0:8])
        # Make a connection to the destination webserver and make sure it can
        # still help us measure
        is_usable, usable_data = dest.is_usable(circ_id, s, cb.controller)
        if not is_usable:
            log.warning('When measuring %s %s the destination seemed to have '
                        'stopped being usable: %s', relay.nickname,
                        relay.fingerprint[0:8], usable_data)
            # TODO: Return a different/new type of ResultError?
            msg = 'The destination seemed to have stopped being usable'
            return [
                ResultErrorStream(
                    relay, circ_fps, dest.url, our_nick, msg=msg),
            ]
        assert is_usable
        assert 'content_length' in usable_data
        # FIRST: measure RTT
        rtts = measure_rtt_to_server(
            s, conf, dest, usable_data['content_length'])
        if rtts is None:
            log.warning('Unable to measure RTT to %s via relay %s %s',
                        dest.url, relay.nickname, relay.fingerprint[0:8])
            # TODO: Return a different/new type of ResultError?
            msg = 'Something bad happened while measuring RTTs'
            return [
                ResultErrorStream(
                    relay, circ_fps, dest.url, our_nick, msg=msg),
            ]
        # SECOND: measure bandwidth
        bw_results = measure_bandwidth_to_server(
            s, conf, dest, usable_data['content_length'])
        if bw_results is None:
            log.warning('Unable to measure bandwidth to %s via relay %s %s',
                        dest.url, relay.nickname, relay.fingerprint[0:8])
            # TODO: Return a different/new type of ResultError?
            msg = 'Something bad happened while measuring bandwidth'
            return [
                ResultErrorStream(
                    relay, circ_fps, dest.url, our_nick, msg=msg),
            ]
        # Finally: store result
        return [
            ResultSuccess(
                rtts, bw_results, relay, circ_fps, dest.url, our_nick),
        ]
    finally:
        cb.close_circuit(circ_id)
Matt Traudt's avatar
Matt Traudt committed
269

270

271
272
273
274
def dispatch_worker_thread(*a, **kw):
    ''' Entry point of the measurement threads: call measure_relay with the
    given arguments and return its result.

    Any exception is logged together with its traceback and then raised
    again, so that whoever started the thread still gets to see it. '''
    try:
        return measure_relay(*a, **kw)
    except Exception:
        log.exception('Unhandled exception in worker thread')
        # A bare raise re-raises the exception being handled as it is
        raise


279
280
281
282
283
284
285
286
287
288
289
290
291
292
def _should_keep_result(did_request_maximum, result_time, download_times):
    # In the normal case, we didn't ask for the maximum allowed amount. So we
    # should only allow ourselves to keep results that are between the min and
    # max allowed time
    if not did_request_maximum and \
            result_time >= download_times['min'] and \
            result_time < download_times['max']:
        return True
    # If we did request the maximum amount, we should keep the result as long
    # as it took less than the maximum amount of time
    if did_request_maximum and \
            result_time < download_times['max']:
        return True
    # In all other cases, return false
293
    log.debug('Not keeping result time %f.%s', result_time,
Matt Traudt's avatar
Matt Traudt committed
294
295
              '' if not did_request_maximum else ' We requested the maximum '
              'amount allowed.')
296
297
298
    return False


299
300
def _next_expected_amount(expected_amount, result_time, download_times,
                          min_dl, max_dl):
301
302
303
304
305
306
307
308
309
310
    if result_time < download_times['toofast']:
        # Way too fast, greatly increase the amount we ask for
        expected_amount = int(expected_amount * 5)
    elif result_time < download_times['min'] or \
            result_time >= download_times['max']:
        # As long as the result is between min/max, keep the expected amount
        # the same. Otherwise, adjust so we are aiming for the target amount.
        expected_amount = int(
            expected_amount * download_times['target'] / result_time)
    # Make sure we don't request too much or too little
311
312
    expected_amount = max(min_dl, expected_amount)
    expected_amount = min(max_dl, expected_amount)
313
314
315
    return expected_amount


Matt Traudt's avatar
Matt Traudt committed
316
def result_putter(result_dump):
    ''' Build and return a one-argument function that puts the measurement
    result it receives on the queue of **result_dump**, so that it can be
    used as a callback by someone else '''
    def put_on_queue(measurement_result):
        results_queue = result_dump.queue
        return results_queue.put(measurement_result)

    return put_on_queue

Matt Traudt's avatar
Matt Traudt committed
323

324
def result_putter_error(target):
    ''' Build and return a one-argument function that logs the error it
    receives as an unhandled exception raised while measuring **target**, so
    that it can be used as an error callback by someone else
    '''
    def log_measurement_error(err):
        nickname = target.nickname
        log.error('Unhandled exception caught while measuring %s: %s %s',
                  nickname, type(err), err)

    return log_measurement_error
332
333


334
def run_speedtest(args, conf):
    ''' Set everything up and then measure relays forever.

    Connect to the Tor found at the configured control socket, or launch a
    new one when there is none. Create the relay list, the circuit builder,
    the result dump, the relay prioritizer and the destination list, and fail
    hard if no destinations could be created.

    After that, loop forever: every relay returned by the prioritizer is
    handed to a pool of ``measurement_threads`` threads, never keeping more
    than that number of measurements pending. At the end of each pass the
    remaining measurements are given some time to finish, and if they do not
    the stacks of all the threads are dumped. '''
    control_socket = conf.getpath('tor', 'control_socket')
    controller, _ = stem_utils.init_controller(path=control_socket)
    if controller:
        log.warning(
            'Is sbws already running? '
            'We found an existing Tor process at %s. We are not going to '
            'launch Tor, nor are we going to try to configure it to behave '
            'like we expect. This might work okay, but it also might not. '
            'If you experience problems, you should try letting sbws launch '
            'Tor for itself. The ability to use an already running Tor only '
            'exists for sbws developers. It is expected to be broken and may '
            'even lead to messed up results.',
            control_socket)
        time.sleep(15)
    else:
        controller = stem_utils.launch_tor(conf)
    relay_list = RelayList(args, conf, controller)
    circuit_builder = CB(args, conf, controller, relay_list)
    result_dump = ResultDump(args, conf, end_event)
    prioritizer = RelayPrioritizer(args, conf, relay_list, result_dump)
    destinations, error_msg = DestinationList.from_config(
        conf, circuit_builder, relay_list, controller)
    if not destinations:
        fail_hard(error_msg)
    max_pending_results = conf.getint('scanner', 'measurement_threads')
    pool = Pool(max_pending_results)
    pending = []
    while True:
        num_relays = 0
        loop_tstart = time.time()
        for target in prioritizer.best_priority():
            num_relays += 1
            log.debug('Measuring %s %s', target.nickname,
                      target.fingerprint[0:8])
            async_result = pool.apply_async(
                dispatch_worker_thread,
                args=[args, conf, destinations, circuit_builder, relay_list,
                      target],
                kwds={},
                callback=result_putter(result_dump),
                error_callback=result_putter_error(target))
            pending.append(async_result)
            # Do not queue more work while every thread is busy
            while len(pending) >= max_pending_results:
                time.sleep(5)
                pending = [r for r in pending if not r.ready()]
        # give it 3min, otherwise there's a bug or deadlock
        counter = 0
        while len(pending) > 0 and counter <= 36:
            counter += 1
            log.debug("Number of pending measurement threads %s after "
                      "a prioritization loop.", len(pending))
            time.sleep(5)
            pending = [r for r in pending if not r.ready()]
        if counter > 36:
            dumpstacks()
        loop_tdelta = (time.time() - loop_tstart) / 60
        log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta)
Matt Traudt's avatar
Matt Traudt committed
392
393


Matt Traudt's avatar
Matt Traudt committed
394
def gen_parser(sub):
    ''' Add the ``scanner`` command, with its description, to the
    subparsers object **sub** '''
    description = (
        'The scanner side of sbws. This should be run on a well-connected '
        'machine on the Internet with a healthy amount of spare bandwidth. '
        'This continuously builds circuits, measures relays, and dumps '
        'results into a datadir, commonly found in ~/.sbws')
    sub.add_parser('scanner', description=description,
                   formatter_class=ArgumentDefaultsHelpFormatter)
Matt Traudt's avatar
Matt Traudt committed
401
402


403
def main(args, conf):
    ''' Entry point of the scanner command.

    Check the scanner options in **conf**, failing hard when there is less
    than one measurement thread or when the maximum download size is smaller
    than the minimum. Then create the datadir if it does not exist, record in
    the state file when the scanner was started, and run the measurements.

    ``end_event`` is set when the measurements stop, whatever the reason. '''
    if conf.getint('scanner', 'measurement_threads') < 1:
        # One thread is accepted by the check above, so say so
        fail_hard('Number of measurement threads must be at least 1')

    min_dl = conf.getint('scanner', 'min_download_size')
    max_dl = conf.getint('scanner', 'max_download_size')
    if max_dl < min_dl:
        fail_hard('Max download size %d cannot be smaller than min %d',
                  max_dl, min_dl)

    os.makedirs(conf.getpath('paths', 'datadir'), exist_ok=True)

    state = State(conf.getpath('paths', 'state_fname'))
    state['scanner_started'] = now_isodt_str()

    try:
        run_speedtest(args, conf)
    finally:
        # Runs on every way out, KeyboardInterrupt included, which keeps
        # propagating to the caller after the event has been set.
        end_event.set()