scanner.py 16.1 KB
Newer Older
1
''' Measure the relays. '''
2

Matt Traudt's avatar
Matt Traudt committed
3
4
from ..lib.circuitbuilder import GapsCircuitBuilder as CB
from ..lib.resultdump import ResultDump
Matt Traudt's avatar
Matt Traudt committed
5
6
from ..lib.resultdump import ResultSuccess, ResultErrorCircuit
from ..lib.resultdump import ResultErrorStream
Matt Traudt's avatar
Matt Traudt committed
7
from ..lib.relaylist import RelayList
8
from ..lib.relayprioritizer import RelayPrioritizer
9
from ..lib.destination import DestinationList
juga's avatar
juga committed
10
from ..util.timestamp import now_isodt_str
11
from ..util.state import State
12
from sbws.globals import (fail_hard, is_initted)
Matt Traudt's avatar
Matt Traudt committed
13
import sbws.util.stem as stem_utils
14
import sbws.util.requests as requests_utils
Matt Traudt's avatar
Matt Traudt committed
15
16
from argparse import ArgumentDefaultsHelpFormatter
from multiprocessing.dummy import Pool
Matt Traudt's avatar
Matt Traudt committed
17
from threading import Event
Matt Traudt's avatar
Matt Traudt committed
18
import time
Matt Traudt's avatar
Matt Traudt committed
19
import os
20
import logging
21
import requests
22
import random
Matt Traudt's avatar
Matt Traudt committed
23

24
# Operating-system-backed random source shared by this module's random picks
# (byte range offsets and helper relay selection).
rng = random.SystemRandom()
Matt Traudt's avatar
Matt Traudt committed
25
# Shutdown flag: handed to ResultDump in run_speedtest() and set in main()'s
# ``finally`` clause when the scanner stops.
end_event = Event()
26
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
Matt Traudt's avatar
Matt Traudt committed
27

28

29
30
31
def timed_recv_from_server(session, dest, byte_range):
    ''' Request the **byte_range** from the URL at **dest** using **session**
    and time how long the request takes.

    **byte_range** is sent verbatim as the HTTP ``Range`` header, and
    ``Accept-Encoding: identity`` is sent alongside it.

    Return ``(True, seconds)`` on success, where **seconds** is a float.
    Return ``(False, exception)`` if the request raised a requests
    ``ConnectionError`` or ``ReadTimeout``. Any other exception propagates to
    the caller. '''
    headers = {'Range': byte_range, 'Accept-Encoding': 'identity'}
    # Use the monotonic clock: a wall-clock adjustment in the middle of a
    # download must not distort (or make negative) the measured duration.
    start_time = time.monotonic()
    # TODO:
    # - What other exceptions can this throw?
    # - Do we have to read the content, or did requests already do so?
    try:
        requests_utils.get(
            session, dest.url, headers=headers, verify=dest.verify)
    except (requests.exceptions.ConnectionError,
            requests.exceptions.ReadTimeout) as e:
        return False, e
    return True, time.monotonic() - start_time
47
48


49
50
51
52
53
54
55
56
57
58
59
60
def get_random_range_string(content_length, size):
    '''
    Return an HTTP ``Range`` header value selecting a random run of **size**
    bytes. **content_length** is the size of the file we will be requesting a
    range of bytes from.

    For example, for content_length of 100 and size 10, this function will
    return one of the following: 'bytes=0-9', 'bytes=1-10', 'bytes=2-11',
    [...] 'bytes=89-98', 'bytes=90-99'

    Raise ValueError if **size** is smaller than 1 or larger than
    **content_length**, since no valid byte range exists in those cases.
    '''
    # Explicit exceptions instead of ``assert``: these checks validate the
    # caller's input and must survive running Python with -O.
    if size < 1:
        raise ValueError(
            'size must be at least 1 byte, got {}'.format(size))
    if size > content_length:
        raise ValueError(
            'size {} is larger than content_length {}'.format(
                size, content_length))
    # start can be anywhere in the content as long as it is **size** bytes
    # away from the end or more. randrange excludes its stop value, so add 1
    # to make the last valid start (content_length - size) reachable.
    start = rng.randrange(content_length - size + 1)
    # The byte range in an http header is [start, end] (it does include the
    # end value), so we subtract one.
    end = start + size - 1
    # start and end are indexes, while content_length is a length, therefore
    # the largest index end can ever be is content_length - 1.
    assert end < content_length
    return 'bytes={}-{}'.format(start, end)
71
72


73
74
75
76
77
def measure_rtt_to_server(session, conf, dest, content_length):
    ''' Collect end-to-end round-trip times by issuing several small HTTP
    range requests to **dest** over **session**. The circuit and stream are
    expected to exist already and to persist between requests.

    The request size is the scanner's ``min_download_size`` and the number of
    samples is the scanner's ``num_rtts``. Return the list of RTTs (in
    seconds), or None as soon as any single request fails. '''
    request_size = conf.getint('scanner', 'min_download_size')
    log.debug('Measuring RTT to %s', dest.url)
    wanted = conf.getint('scanner', 'num_rtts')
    samples = []
    # Every pass either appends exactly one sample or returns, so this runs
    # at most **wanted** times.
    while len(samples) < wanted:
        byte_range = get_random_range_string(content_length, request_size)
        ok, outcome = timed_recv_from_server(session, dest, byte_range)
        if not ok:
            # outcome is the exception that was caught
            log.warning('While measuring the RTT to %s we hit an exception '
                        '(does the webserver support Range requests?): %s',
                        dest.url, outcome)
            return None
        # outcome is the elapsed time of the request
        assert isinstance(outcome, (float, int))
        samples.append(outcome)
    return samples


Matt Traudt's avatar
Matt Traudt committed
97
def measure_bandwidth_to_server(session, conf, dest, content_length):
    ''' Download random byte ranges from **dest** over **session** until
    ``num_downloads`` acceptable results have been collected.

    The request size starts at the scanner's ``initial_read_request`` and is
    re-tuned after every download by _next_expected_amount(). Whether a
    download counts is decided by _should_keep_result().

    Return a list of ``{'duration': seconds, 'amount': bytes}`` dicts, or None
    as soon as any single download fails. '''
    wanted = conf.getint('scanner', 'num_downloads')
    expected_amount = conf.getint('scanner', 'initial_read_request')
    min_dl = conf.getint('scanner', 'min_download_size')
    max_dl = conf.getint('scanner', 'max_download_size')
    # Time thresholds, read from the ``download_<name>`` scanner options.
    download_times = {
        name: conf.getfloat('scanner', 'download_' + name)
        for name in ('toofast', 'min', 'target', 'max')
    }
    kept = []
    # NOTE(review): a rejected download does not count towards **wanted**, so
    # the number of requests made here has no fixed upper bound.
    while len(kept) < wanted:
        assert min_dl <= expected_amount <= max_dl
        byte_range = get_random_range_string(content_length, expected_amount)
        ok, outcome = timed_recv_from_server(session, dest, byte_range)
        if not ok:
            # outcome is the exception that was caught
            log.warning('While measuring the bandwidth to %s we hit an '
                        'exception (does the webserver support Range '
                        'requests?): %s', dest.url, outcome)
            return None
        # outcome is the download time
        assert isinstance(outcome, (float, int))
        asked_for_max = expected_amount == max_dl
        if _should_keep_result(asked_for_max, outcome, download_times):
            kept.append({'duration': outcome, 'amount': expected_amount})
        expected_amount = _next_expected_amount(
            expected_amount, outcome, download_times, min_dl, max_dl)
    return kept


132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def _pick_ideal_second_hop(relay, dest, rl, cont, is_exit):
    '''
    Choose the helper relay for the two hop circuit used to measure
    **relay**. The helper is taken from ``rl.exits`` when **is_exit** is true
    and from ``rl.non_exits`` otherwise.

    A random helper with at least 2x the bandwidth of **relay** is preferred;
    the requirement is relaxed step by step down to 1x. If even that finds
    nothing, the fastest candidate is returned. Return None when there are no
    candidates at all. **dest** is accepted but not used here.
    '''
    pool = list(rl.exits if is_exit else rl.non_exits)
    if not pool:
        return None
    log.debug('Picking a 2nd hop to measure %s from %d choices. is_exit=%s',
              relay.nickname, len(pool), is_exit)
    # Try the strictest bandwidth requirement first.
    for factor in (2, 1.75, 1.5, 1.25, 1):
        fast_enough = stem_utils.only_relays_with_bandwidth(
            cont, pool, min_bw=relay.bandwidth * factor)
        if len(fast_enough) == 0:
            continue
        pick = rng.choice(fast_enough)
        log.debug(
            'Found %d candidate 2nd hops with at least %sx the bandwidth '
            'of %s. Returning %s (bw=%s).',
            len(fast_enough), factor, relay.nickname,
            pick.nickname, pick.bandwidth)
        return pick
    # Nothing is as fast as the relay itself: fall back to the fastest
    # candidate (the first one listed if several tie).
    pick = max(pool, key=lambda r: r.bandwidth)
    log.debug(
        'Didn\'t find any 2nd hops at least as fast as %s (bw=%s). It\'s '
        'probably really fast. Returning %s (bw=%s), the fastest '
        'candidate we have.', relay.nickname, relay.bandwidth,
        pick.nickname, pick.bandwidth)
    return pick


166
def measure_relay(args, conf, destinations, cb, rl, relay):
    ''' Measure **relay** over a two hop circuit to one of the
    **destinations**.

    Steps: pick a destination, pick a helper relay, build the circuit with
    **cb**, check that the destination is usable through it, measure RTTs and
    then bandwidth.

    Return None if no destination or no helper relay could be picked.
    Otherwise return a one-element list holding a ResultErrorCircuit (the
    circuit could not be built), a ResultErrorStream (the destination was not
    usable, or an RTT/bandwidth measurement failed) or a ResultSuccess.

    Once a circuit has been built it is always closed again before returning,
    including when an unexpected exception propagates. **args** is accepted
    but not used here. '''
    s = requests_utils.make_session(
        cb.controller, conf.getfloat('general', 'http_timeout'))
    # Pick a destination
    dest = destinations.next()
    if not dest:
        log.warning('Unable to get destination to measure %s %s',
                    relay.nickname, relay.fingerprint[0:8])
        return None
    # Pick a relay to help us measure the given relay. If the given relay is an
    # exit, then pick a non-exit. Otherwise pick an exit.
    helper = None
    circ_fps = None
    if relay.can_exit_to(dest.hostname, dest.port) and \
            relay not in rl.bad_exits:
        helper = _pick_ideal_second_hop(
            relay, dest, rl, cb.controller, is_exit=False)
        if helper:
            # The relay being measured is the exit (last hop)
            circ_fps = [helper.fingerprint, relay.fingerprint]
    else:
        helper = _pick_ideal_second_hop(
            relay, dest, rl, cb.controller, is_exit=True)
        if helper:
            # The relay being measured is the first hop
            circ_fps = [relay.fingerprint, helper.fingerprint]
    if not helper:
        # TODO: Return ResultError of some sort
        log.warning('Unable to pick a 2nd hop to help measure %s %s',
                    relay.nickname, relay.fingerprint[0:8])
        return None
    assert helper
    assert circ_fps is not None and len(circ_fps) == 2
    # Build the circuit
    our_nick = conf['scanner']['nickname']
    circ_id = cb.build_circuit(circ_fps)
    if not circ_id:
        log.warning('Could not build circuit involving %s', relay.nickname)
        msg = 'Unable to complete circuit'
        return [
            ResultErrorCircuit(relay, circ_fps, dest.url, our_nick, msg=msg),
        ]
    # From here on the circuit exists. The finally clause guarantees it is
    # closed on every exit path, including unexpected exceptions raised by
    # the measurement helpers.
    try:
        log.debug('Built circ %s %s for relay %s %s', circ_id,
                  stem_utils.circuit_str(cb.controller, circ_id),
                  relay.nickname, relay.fingerprint[0:8])
        # Make a connection to the destination webserver and make sure it can
        # still help us measure
        is_usable, usable_data = dest.is_usable(circ_id, s, cb.controller)
        if not is_usable:
            log.warning('When measuring %s %s the destination seemed to have '
                        'stopped being usable: %s', relay.nickname,
                        relay.fingerprint[0:8], usable_data)
            # TODO: Return a different/new type of ResultError?
            msg = 'The destination seemed to have stopped being usable'
            return [
                ResultErrorStream(
                    relay, circ_fps, dest.url, our_nick, msg=msg),
            ]
        assert is_usable
        assert 'content_length' in usable_data
        content_length = usable_data['content_length']
        # FIRST: measure RTT
        rtts = measure_rtt_to_server(s, conf, dest, content_length)
        if rtts is None:
            log.warning('Unable to measure RTT to %s via relay %s %s',
                        dest.url, relay.nickname, relay.fingerprint[0:8])
            # TODO: Return a different/new type of ResultError?
            msg = 'Something bad happened while measuring RTTs'
            return [
                ResultErrorStream(
                    relay, circ_fps, dest.url, our_nick, msg=msg),
            ]
        # SECOND: measure bandwidth
        bw_results = measure_bandwidth_to_server(
            s, conf, dest, content_length)
        if bw_results is None:
            log.warning('Unable to measure bandwidth to %s via relay %s %s',
                        dest.url, relay.nickname, relay.fingerprint[0:8])
            # TODO: Return a different/new type of ResultError?
            msg = 'Something bad happened while measuring bandwidth'
            return [
                ResultErrorStream(
                    relay, circ_fps, dest.url, our_nick, msg=msg),
            ]
        # Finally: store result
        return [
            ResultSuccess(
                rtts, bw_results, relay, circ_fps, dest.url, our_nick),
        ]
    finally:
        cb.close_circuit(circ_id)
Matt Traudt's avatar
Matt Traudt committed
252

253

254
255
256
257
def dispatch_worker_thread(*a, **kw):
    ''' Entry point for a measurement worker: call measure_relay() with the
    given arguments and return its result.

    Any exception is logged with its traceback and then re-raised unchanged,
    so the caller still sees the failure. '''
    try:
        return measure_relay(*a, **kw)
    except Exception:
        log.exception('Unhandled exception in worker thread')
        # Bare raise re-raises the active exception as-is.
        raise


262
263
264
265
266
267
268
269
270
271
272
273
274
275
def _should_keep_result(did_request_maximum, result_time, download_times):
    ''' Decide whether a download that took **result_time** seconds counts
    as a measurement.

    When the maximum allowed amount was requested, the download only has to
    finish in less than ``download_times['max']``. Otherwise it must take at
    least ``download_times['min']`` and less than ``download_times['max']``.
    Return True to keep the result; log at debug level and return False to
    discard it. '''
    if did_request_maximum:
        # We cannot ask for more, so there is no lower bound on the time.
        keep = result_time < download_times['max']
    else:
        keep = download_times['min'] <= result_time < download_times['max']
    if keep:
        return True
    note = ' We requested the maximum amount allowed.' \
        if did_request_maximum else ''
    log.debug('Not keeping result time %f.%s', result_time, note)
    return False


282
283
def _next_expected_amount(expected_amount, result_time, download_times,
                          min_dl, max_dl):
284
285
286
287
288
289
290
291
292
293
    if result_time < download_times['toofast']:
        # Way too fast, greatly increase the amount we ask for
        expected_amount = int(expected_amount * 5)
    elif result_time < download_times['min'] or \
            result_time >= download_times['max']:
        # As long as the result is between min/max, keep the expected amount
        # the same. Otherwise, adjust so we are aiming for the target amount.
        expected_amount = int(
            expected_amount * download_times['target'] / result_time)
    # Make sure we don't request too much or too little
294
295
    expected_amount = max(min_dl, expected_amount)
    expected_amount = min(max_dl, expected_amount)
296
297
298
    return expected_amount


Matt Traudt's avatar
Matt Traudt committed
299
def result_putter(result_dump):
    ''' Build and return a one-argument callback that puts a measurement
    result on **result_dump**'s queue. '''
    def enqueue(measurement_result):
        # Look the queue up at call time, not when the callback is created.
        return result_dump.queue.put(measurement_result)
    return enqueue

Matt Traudt's avatar
Matt Traudt committed
306

307
def result_putter_error(target):
    ''' Build and return a one-argument callback that logs, at error level,
    an exception raised while measuring **target**. The callback names the
    relay by its nickname and reports the exception's type and value. '''
    def report(err):
        log.error('Unhandled exception caught while measuring %s: %s %s',
                  target.nickname, type(err), err)
    return report
315
316


317
def run_speedtest(args, conf):
    ''' Set up Tor and the scanner's helper objects, then measure relays
    forever.

    An already running Tor reachable at the configured control socket is
    reused (with a warning and a 15 second pause); otherwise Tor is launched.
    Measurements run on a thread pool of ``measurement_threads`` workers, and
    dispatching pauses while that many measurements are still pending. The
    loop only ends through an exception or a hard failure. '''
    control_socket = conf['tor']['control_socket']
    controller, _ = stem_utils.init_controller(path=control_socket)
    if controller:
        log.warning(
            'Is sbws already running? '
            'We found an existing Tor process at %s. We are not going to '
            'launch Tor, nor are we going to try to configure it to behave '
            'like we expect. This might work okay, but it also might not. '
            'If you experience problems, you should try letting sbws launch '
            'Tor for itself. The ability to use an already running Tor only '
            'exists for sbws developers. It is expected to be broken and may '
            'even lead to messed up results.', control_socket)
        time.sleep(15)
    else:
        controller = stem_utils.launch_tor(conf)
    rl = RelayList(args, conf, controller)
    cb = CB(args, conf, controller, rl)
    rd = ResultDump(args, conf, end_event)
    rp = RelayPrioritizer(args, conf, rl, rd)
    destinations, error_msg = DestinationList.from_config(
        conf, cb, rl, controller)
    if not destinations:
        fail_hard(error_msg)
    max_pending_results = conf.getint('scanner', 'measurement_threads')
    pool = Pool(max_pending_results)
    # Successful results all go to the same place, so one callback suffices.
    on_result = result_putter(rd)
    in_flight = []
    while True:
        for target in rp.best_priority():
            log.debug('Measuring %s %s', target.nickname,
                      target.fingerprint[0:8])
            in_flight.append(pool.apply_async(
                dispatch_worker_thread,
                args=[args, conf, destinations, cb, rl, target],
                kwds={},
                callback=on_result,
                error_callback=result_putter_error(target)))
            # Throttle: once the limit is reached, poll every 5 seconds and
            # drop finished measurements until there is room again.
            while len(in_flight) >= max_pending_results:
                time.sleep(5)
                in_flight = [r for r in in_flight if not r.ready()]
Matt Traudt's avatar
Matt Traudt committed
358
359


Matt Traudt's avatar
Matt Traudt committed
360
def gen_parser(sub):
    ''' Register the ``scanner`` sub-command, with its description, on the
    argparse sub-parsers object **sub**. '''
    description = (
        'The scanner side of sbws. This should be run on a well-connected '
        'machine on the Internet with a healthy amount of spare bandwidth. '
        'This continuously builds circuits, measures relays, and dumps '
        'results into a datadir, commonly found in ~/.sbws')
    sub.add_parser(
        'scanner',
        description=description,
        formatter_class=ArgumentDefaultsHelpFormatter)
Matt Traudt's avatar
Matt Traudt committed
367
368


369
def main(args, conf):
    ''' Entry point of the ``scanner`` command.

    Check that sbws is initialized in ``args.directory`` and that the scanner
    configuration is sane (at least one measurement thread, maximum download
    size not below the minimum), failing hard otherwise. Then create the data
    directory, record the start time in the state file and run the scanner.
    The module's ``end_event`` is set when the scanner stops for any reason,
    including an exception or Ctrl-C. '''
    if not is_initted(args.directory):
        fail_hard('Sbws isn\'t initialized. Try sbws init')

    if conf.getint('scanner', 'measurement_threads') < 1:
        # The check accepts 1, so the message must say so.
        fail_hard('Number of measurement threads must be at least 1')

    min_dl = conf.getint('scanner', 'min_download_size')
    max_dl = conf.getint('scanner', 'max_download_size')
    if max_dl < min_dl:
        fail_hard('Max download size %d cannot be smaller than min %d',
                  max_dl, min_dl)

    os.makedirs(conf['paths']['datadir'], exist_ok=True)

    state = State(conf['paths']['state_fname'])
    state['scanner_started'] = now_isodt_str()

    try:
        run_speedtest(args, conf)
    finally:
        # Runs for every way out, KeyboardInterrupt included; the exception
        # itself keeps propagating to the caller.
        end_event.set()