scanner.py 24.5 KB
Newer Older
1
''' Measure the relays. '''
2

3
import signal
4
5
import sys
import threading
6
import uuid
7

Matt Traudt's avatar
Matt Traudt committed
8
9
from ..lib.circuitbuilder import GapsCircuitBuilder as CB
from ..lib.resultdump import ResultDump
Matt Traudt's avatar
Matt Traudt committed
10
11
from ..lib.resultdump import ResultSuccess, ResultErrorCircuit
from ..lib.resultdump import ResultErrorStream
Matt Traudt's avatar
Matt Traudt committed
12
from ..lib.relaylist import RelayList
13
from ..lib.relayprioritizer import RelayPrioritizer
14
15
from ..lib.destination import (DestinationList,
                               connect_to_destination_over_circuit)
juga's avatar
juga committed
16
from ..util.timestamp import now_isodt_str
17
from ..util.state import State
juga's avatar
juga committed
18
from sbws.globals import fail_hard, HTTP_GET_HEADERS, TIMEOUT_MEASUREMENTS
Matt Traudt's avatar
Matt Traudt committed
19
import sbws.util.stem as stem_utils
20
import sbws.util.requests as requests_utils
Matt Traudt's avatar
Matt Traudt committed
21
22
23
from argparse import ArgumentDefaultsHelpFormatter
from multiprocessing.dummy import Pool
import time
Matt Traudt's avatar
Matt Traudt committed
24
import os
25
import logging
26
import requests
27
import random
Matt Traudt's avatar
Matt Traudt committed
28

juga's avatar
juga committed
29
from .. import settings
30

31
# Source of randomness (``random.SystemRandom`` draws from os.urandom) used
# to pick byte ranges and helper relays.
rng = random.SystemRandom()
log = logging.getLogger(__name__)

# The objects that manage sbws' threads are module globals so that
# ``stop_threads`` can reach them and sbws can exit gracefully at any time.
# They stay ``None`` until ``run_speedtest`` creates them.
pool = rd = controller = None

# Message logged when an unexpected exception reaches ``run_speedtest``.
FILLUP_TICKET_MSG = ("Something went wrong.\n"
                     "Please create a ticket in https://trac.torproject.org "
                     "with this traceback.")

42

43
def stop_threads(signal, frame, exit_code=0):
    """Stop sbws' threads and exit the process with **exit_code**.

    Registered as the SIGTERM handler, hence the ``signal`` and ``frame``
    parameters. Both are unused here, and ``signal`` shadows the module
    inside this function. The function is also called directly with an
    explicit ``exit_code``.

    Steps: set the end event so no new measurement starts, close and join
    the measurement ``pool``, join the ``ResultDump`` thread, close the Tor
    ``controller`` and call ``sys.exit(exit_code)``.

    The module globals are ``None`` until ``run_speedtest`` creates them.
    Without the guards below, a SIGTERM received earlier (e.g. while Tor
    is being launched) would raise ``AttributeError`` inside the handler.
    Globals that are still ``None`` are therefore skipped.

    :raises SystemExit: always, via ``sys.exit``.
    """
    log.debug('Stopping sbws.')
    # Avoid new threads to start.
    settings.set_end_event()
    # Stop Pool threads
    if pool is not None:
        pool.close()
        pool.join()
    # Stop ResultDump thread
    if rd is not None:
        rd.thread.join()
    # Stop Tor thread
    if controller is not None:
        controller.close()
    sys.exit(exit_code)
56
57
58


# On SIGTERM, stop the pool, ResultDump and Tor controller, then exit
# (see stop_threads).
signal.signal(signal.SIGTERM, stop_threads)
Matt Traudt's avatar
Matt Traudt committed
59

60

61
62
def dumpstacks():
    import traceback
63
64
65
    log.critical("sbws stop measuring relays, probably because of a bug."
                 "Please, open a ticket in trac.torproject.org with this"
                 "backtrace.")
66
67
68
69
70
    thread_id2name = dict([(t.ident, t.name) for t in threading.enumerate()])
    for thread_id, stack in sys._current_frames().items():
        log.critical("Thread: %s(%d)",
                     thread_id2name.get(thread_id, ""), thread_id)
        log.critical(traceback.print_stack(stack))
71
72
73
74
75
76
77
78
79
    # If logging level is less than DEBUG (more verbose), start pdb so that
    # developers can debug the issue.
    if log.getEffectiveLevel() < logging.DEBUG:
        import pdb
        pdb.set_trace()
    # Otherwise exit.
    else:
        # Change to stop threads when #28869 is merged
        sys.exit(1)
80
81


82
83
84
def timed_recv_from_server(session, dest, byte_range):
    ''' Request the **byte_range** from the URL at **dest**. If successful,
    return True and the time it took to download. Otherwise return False and an
    exception.

    :param session: HTTP session; its ``get`` is called with ``headers`` and
        ``verify``.
    :param dest: destination whose ``url`` and ``verify`` attributes are used.
    :param str byte_range: value of the HTTP ``Range`` header, as returned by
        ``get_random_range_string``.
    :returns tuple: ``(True, seconds)`` on success, ``(False, exception)`` if
        the request failed.
    '''
    # Build a per-request copy of the headers. HTTP_GET_HEADERS is a
    # module-level dict shared by every measurement thread. Setting 'Range'
    # on it directly lets concurrent threads overwrite each other's byte
    # range before their request is sent.
    headers = dict(HTTP_GET_HEADERS)
    headers['Range'] = byte_range
    # Monotonic clock: the duration is not affected by wall-clock changes.
    start_time = time.monotonic()
    # TODO:
    # - response.elapsed "measures the time taken between sending the first
    #   byte of the request and finishing parsing the headers.
    #   It is therefore unaffected by consuming the response content"
    #   If this mean that the content has arrived, elapsed could be used to
    #   know the time it took.
    try:
        # headers are merged with the session ones, not overwritten.
        session.get(dest.url, headers=headers, verify=dest.verify)
    # RequestException is the base class of ConnectionError (which is what
    # requests raises for new-connection failures, e.g. when shutting down),
    # ReadTimeout and the other errors requests raises.
    except requests.exceptions.RequestException as e:
        log.debug(e)
        return False, e
    return True, time.monotonic() - start_time
107
108


109
110
111
112
113
114
115
116
117
118
119
120
def get_random_range_string(content_length, size):
    '''
    Pick a random window of **size** bytes inside a file of
    **content_length** bytes and return it as an HTTP ``Range`` value.

    Every window that fits entirely in the file is equally likely. For
    example, with content_length 100 and size 10 the result is one of
    'bytes=0-9', 'bytes=1-10', [...] 'bytes=89-98', 'bytes=90-99'.
    '''
    assert size <= content_length
    # Highest start offset that still leaves **size** bytes before the end.
    # range() excludes its stop value, hence the + 1.
    last_start = content_length - size
    start = rng.choice(range(last_start + 1))
    # HTTP byte ranges are inclusive on both ends, so the last byte of the
    # window is start + size - 1.
    end = start + size - 1
    # end is an index while content_length is a length, so a valid end is
    # always strictly below content_length.
    assert end < content_length
    return 'bytes={}-{}'.format(start, end)
131
132


133
134
135
136
def measure_rtt_to_server(session, conf, dest, content_length):
    ''' Time several small HTTP requests to **dest** to estimate the
    end-to-end round trip time.

    ``[scanner] num_rtts`` requests are made over a circuit + stream that
    should already exist, persist, and not need rebuilding. Each request is
    for a random range of ``[scanner] min_download_size`` bytes of the file
    at **dest**, whose size is **content_length**.

    :returns tuple: ``(rtts, None)`` with the list of RTTs in seconds, or
        ``(None, exception)`` as soon as one request fails.
    '''
    size = conf.getint('scanner', 'min_download_size')
    num_rtts = conf.getint('scanner', 'num_rtts')
    rtts = []
    for _ in range(num_rtts):
        log.debug('Measuring RTT to %s', dest.url)
        byte_range = get_random_range_string(content_length, size)
        ok, rtt = timed_recv_from_server(session, dest, byte_range)
        if not ok:
            # On failure ``rtt`` holds the exception instead of a time.
            log.debug('While measuring the RTT to %s we hit an exception '
                      '(does the webserver support Range requests?): %s',
                      dest.url, rtt)
            return None, rtt
        assert isinstance(rtt, (float, int))
        rtts.append(rtt)
    return rtts, None
Matt Traudt's avatar
Matt Traudt committed
160
161


Matt Traudt's avatar
Matt Traudt committed
162
def measure_bandwidth_to_server(session, conf, dest, content_length):
    """
    Download random byte ranges from **dest** until
    ``[scanner] num_downloads`` of them are kept or sbws is stopping.

    The amount requested starts at ``initial_read_request``. After every
    download it is adapted with ``_next_expected_amount`` to aim at
    ``download_target`` seconds, staying within
    ``[min_download_size, max_download_size]``. Only downloads accepted by
    ``_should_keep_result`` are recorded.

    :returns tuple: ``(results, None)``, where ``results`` is a list of
        ``{'duration': seconds, 'amount': bytes}`` dicts. The list is
        shorter than ``num_downloads`` if sbws is stopping. Returns
        ``(None, exception)`` as soon as a download fails.
    """
    num_downloads = conf.getint('scanner', 'num_downloads')
    expected_amount = conf.getint('scanner', 'initial_read_request')
    min_dl = conf.getint('scanner', 'min_download_size')
    max_dl = conf.getint('scanner', 'max_download_size')
    download_times = {
        key: conf.getfloat('scanner', option)
        for key, option in (('toofast', 'download_toofast'),
                            ('min', 'download_min'),
                            ('target', 'download_target'),
                            ('max', 'download_max'))
    }
    results = []
    while len(results) < num_downloads:
        if settings.end_event.is_set():
            break
        assert min_dl <= expected_amount <= max_dl
        byte_range = get_random_range_string(content_length, expected_amount)
        ok, elapsed = timed_recv_from_server(session, dest, byte_range)
        if not ok:
            # On failure ``elapsed`` holds the exception instead of a time.
            log.debug('While measuring the bandwidth to %s we hit an '
                      'exception (does the webserver support Range '
                      'requests?): %s', dest.url, elapsed)
            return None, elapsed
        assert isinstance(elapsed, (float, int))
        requested_maximum = expected_amount == max_dl
        if _should_keep_result(requested_maximum, elapsed, download_times):
            results.append({'duration': elapsed, 'amount': expected_amount})
        expected_amount = _next_expected_amount(
            expected_amount, elapsed, download_times, min_dl, max_dl)
    return results, None
Matt Traudt's avatar
Matt Traudt committed
200
201


202
203
204
205
206
207
def _pick_ideal_second_hop(relay, dest, rl, cont, is_exit):
    '''
    Choose the helper relay for the two hop circuit used to measure **relay**.

    The helper is an exit allowing **dest**'s port when **is_exit** is true,
    otherwise a non-exit. A random candidate with at least 2x the consensus
    bandwidth of **relay** is preferred. The factor is then relaxed to
    1.75x, 1.5x, 1.25x and 1x (filtering is done by
    ``stem_utils.only_relays_with_bandwidth``). If no candidate qualifies,
    the fastest candidate is returned.

    :returns: the chosen relay, or None when there are no candidates at all.
    '''
    if is_exit:
        candidates = rl.exits_not_bad_allowing_port(dest.port)
    else:
        candidates = rl.non_exits
    if len(candidates) == 0:
        return None
    log.debug('Picking a 2nd hop to measure %s from %d choices. is_exit=%s',
              relay.nickname, len(candidates), is_exit)
    for factor in (2, 1.75, 1.5, 1.25, 1):
        min_bw = relay.consensus_bandwidth * factor
        fast_enough = stem_utils.only_relays_with_bandwidth(
            cont, candidates, min_bw=min_bw)
        if len(fast_enough) == 0:
            continue
        chosen = rng.choice(fast_enough)
        log.debug(
            'Found %d candidate 2nd hops with at least %sx the bandwidth '
            'of %s. Returning %s (bw=%s).',
            len(fast_enough), factor, relay.nickname,
            chosen.nickname, chosen.consensus_bandwidth)
        return chosen
    # max() returns the first of several equally fast relays. That is the
    # same relay a stable descending sort followed by taking [0] would give.
    chosen = max(candidates, key=lambda r: r.consensus_bandwidth)
    log.debug(
        "Didn't find any 2nd hops at least as fast as %s (bw=%s). It's "
        "probably really fast. Returning %s (bw=%s), the fastest "
        "candidate we have.", relay.nickname, relay.consensus_bandwidth,
        chosen.nickname, chosen.consensus_bandwidth)
    return chosen


237
def measure_relay(args, conf, destinations, cb, rl, relay):
    """
    Select a Web server, a relay to build the circuit,
    build the circuit and measure the bandwidth of the given relay.

    The HTTP session created for the measurement is closed before returning.
    Once the circuit is built, it is closed on every path, including
    unexpected exceptions, so neither leaks across measurements.

    :param args: command line arguments. Not used here; kept so the
        positional arguments passed by ``main_loop`` keep working.
    :param conf: sbws configuration.
    :param destinations: list the Web server (destination) is picked from.
    :param cb: circuit builder used to build and close the circuit.
    :param rl: relay list the helper (2nd hop) relay is picked from.
    :param relay: the relay to measure.
    :return: ``None`` if no helper relay could be picked. Otherwise a list
        with a single ``ResultSuccess``, ``ResultErrorCircuit`` or
        ``ResultErrorStream``.
    """
    log.debug('Measuring %s %s', relay.nickname, relay.fingerprint)
    s = requests_utils.make_session(
        cb.controller, conf.getfloat('general', 'http_timeout'))
    try:
        return _measure_relay_with_session(
            conf, destinations, cb, rl, relay, s)
    finally:
        # NOTE(review): assumes make_session returns a requests.Session. It
        # is used through ``get(url, headers=..., verify=...)``, and close()
        # frees its pooled connections. Guarded in case it can return None.
        if s is not None:
            s.close()


def _measure_relay_with_session(conf, destinations, cb, rl, relay, s):
    """Do the work of ``measure_relay`` with the already created session."""
    # Pick a destination
    dest = destinations.next()
    # If there is no any destination at this point, it can not continue.
    if not dest:
        # XXX: this should return a ResultError
        # instead of stopping the scanner once a destination can be recovered.
        log.critical("There are not any functional destinations.\n"
                     "It is recommended to set several destinations so that "
                     "the scanner can continue if one fails.")
        # Exit the scanner with error stopping threads first.
        # NOTE(review): this runs in a pool worker thread while stop_threads
        # joins that same pool; confirm it cannot wait on itself (#28869).
        stop_threads(signal.SIGTERM, None, 1)
        # stop_threads ends with sys.exit(); never go on without a dest.
        return None

    # Pick a relay to help us measure the given relay. If the given relay is an
    # exit, then pick a non-exit. Otherwise pick an exit.
    relay_is_exit = relay.is_exit_not_bad_allowing_port(dest.port)
    helper = _pick_ideal_second_hop(
        relay, dest, rl, cb.controller, is_exit=not relay_is_exit)
    if not helper:
        # TODO: Return ResultError of some sort
        log.debug('Unable to pick a 2nd relay to help measure %s (%s)',
                  relay.fingerprint, relay.nickname)
        return None
    # The exit always goes last in the two hop path.
    path = [helper, relay] if relay_is_exit else [relay, helper]
    circ_fps = [r.fingerprint for r in path]
    # stored for debugging
    nicknames = [r.nickname for r in path]

    # Build the circuit
    our_nick = conf['scanner']['nickname']
    circ_id, reason = cb.build_circuit(circ_fps)
    if not circ_id:
        log.debug('Could not build circuit with path %s (%s): %s ',
                  circ_fps, nicknames, reason)
        return [
            ResultErrorCircuit(relay, circ_fps, dest.url, our_nick,
                               msg=reason),
        ]
    log.debug('Built circuit with path %s (%s) to measure %s (%s)',
              circ_fps, nicknames, relay.fingerprint, relay.nickname)

    try:
        # Make a connection to the destination
        is_usable, usable_data = connect_to_destination_over_circuit(
            dest, circ_id, s, cb.controller, dest._max_dl)
        if not is_usable:
            log.debug('Destination %s unusable via circuit %s (%s), %s',
                      dest.url, circ_fps, nicknames, usable_data)
            # TODO: Return a different/new type of ResultError?
            msg = 'The destination seemed to have stopped being usable'
            return [
                ResultErrorStream(relay, circ_fps, dest.url, our_nick,
                                  msg=msg),
            ]
        assert 'content_length' in usable_data
        content_length = usable_data['content_length']
        # FIRST: measure RTT
        rtts, reason = measure_rtt_to_server(s, conf, dest, content_length)
        if rtts is None:
            log.debug('Unable to measure RTT for %s (%s) to %s via circuit '
                      '%s (%s): %s', relay.fingerprint, relay.nickname,
                      dest.url, circ_fps, nicknames, reason)
            return [
                ResultErrorStream(relay, circ_fps, dest.url, our_nick,
                                  msg=str(reason)),
            ]
        # SECOND: measure bandwidth
        bw_results, reason = measure_bandwidth_to_server(
            s, conf, dest, content_length)
        if bw_results is None:
            log.debug('Unable to measure bandwidth for %s (%s) to %s via '
                      'circuit %s (%s): %s', relay.fingerprint,
                      relay.nickname, dest.url, circ_fps, nicknames, reason)
            return [
                ResultErrorStream(relay, circ_fps, dest.url, our_nick,
                                  msg=str(reason)),
            ]
    finally:
        # Close the circuit on every path, including unexpected exceptions
        # raised while measuring, so circuits are not leaked.
        cb.close_circuit(circ_id)
    # Finally: store result
    log.debug('Success measurement for %s (%s) via circuit %s (%s) to %s',
              relay.fingerprint, relay.nickname, circ_fps, nicknames, dest.url)
    return [
        ResultSuccess(rtts, bw_results, relay, circ_fps, dest.url, our_nick),
    ]
Matt Traudt's avatar
Matt Traudt committed
340

341

342
def dispatch_worker_thread(*args, **kwargs):
    """Run ``measure_relay`` inside a pool worker thread.

    This is a thin indirection used as the target of ``Pool.apply_async``.
    All positional and keyword arguments are forwarded unchanged, and
    ``measure_relay``'s return value is returned as is.
    """
    result = measure_relay(*args, **kwargs)
    return result
344
345


346
347
348
349
350
351
352
353
354
355
356
357
358
359
def _should_keep_result(did_request_maximum, result_time, download_times):
    # In the normal case, we didn't ask for the maximum allowed amount. So we
    # should only allow ourselves to keep results that are between the min and
    # max allowed time
    if not did_request_maximum and \
            result_time >= download_times['min'] and \
            result_time < download_times['max']:
        return True
    # If we did request the maximum amount, we should keep the result as long
    # as it took less than the maximum amount of time
    if did_request_maximum and \
            result_time < download_times['max']:
        return True
    # In all other cases, return false
360
    log.debug('Not keeping result time %f.%s', result_time,
Matt Traudt's avatar
Matt Traudt committed
361
362
              '' if not did_request_maximum else ' We requested the maximum '
              'amount allowed.')
363
364
365
    return False


366
367
def _next_expected_amount(expected_amount, result_time, download_times,
                          min_dl, max_dl):
368
369
370
371
372
373
374
375
376
377
    if result_time < download_times['toofast']:
        # Way too fast, greatly increase the amount we ask for
        expected_amount = int(expected_amount * 5)
    elif result_time < download_times['min'] or \
            result_time >= download_times['max']:
        # As long as the result is between min/max, keep the expected amount
        # the same. Otherwise, adjust so we are aiming for the target amount.
        expected_amount = int(
            expected_amount * download_times['target'] / result_time)
    # Make sure we don't request too much or too little
378
379
    expected_amount = max(min_dl, expected_amount)
    expected_amount = min(max_dl, expected_amount)
380
381
382
    return expected_amount


Matt Traudt's avatar
Matt Traudt committed
383
def result_putter(result_dump):
    ''' Build the success callback for a measurement.

    Returns a function that takes a single argument (the measurement result)
    and puts it in **result_dump**'s ``queue``, returning whatever
    ``queue.put`` returns. ``main_loop`` uses it as the ``apply_async``
    callback.
    '''
    def put_in_queue(measurement_result):
        target_queue = result_dump.queue
        return target_queue.put(measurement_result)
    return put_in_queue

Matt Traudt's avatar
Matt Traudt committed
390

391
def result_putter_error(target):
    ''' Create a function that takes a single argument -- an error from the
    measurement of **target** -- and return that function so it can be used
    by someone else (``main_loop`` passes it to ``apply_async`` as the error
    callback).
    '''
    def closure(error):
        # The only object that can be here if there is not any uncaught
        # exception is stem.SocketClosed when stopping sbws. Anything else is
        # a bug in the measurement code, so log which relay it happened with,
        # the message and the traceback, not only the exception type.
        # getattr() keeps this callback from ever raising itself.
        log.debug('%s while measuring %s (%s): %s', type(error).__name__,
                  getattr(target, 'fingerprint', target),
                  getattr(target, 'nickname', ''), error, exc_info=error)
    return closure
400
401


juga's avatar
juga committed
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
              relay_prioritizer, destinations, max_pending_results, pool):
    """Start and reuse the threads that measure the relays, forever.

    It runs a loop for as long as no event signals that sbws is stopping
    (because of SIGTERM or SIGINT).

    Inside, it runs a second loop over an ordered list (generator) of relays
    to measure, which might be a subset of all the current relays in the
    network.

    For every relay, it starts a new thread which runs ``measure_relay``,
    until there are ``max_pending_results`` threads. After that, it reuses a
    thread that has finished for every further relay. The pool method
    ``apply_async`` starts or reuses a thread. It returns an ``ApplyResult``
    immediately, whose ``ready`` method tells whether the thread has
    finished.

    When the thread finishes, the ``result_putter`` callback runs. It puts
    the ``Result`` in the ``ResultDump`` queue and completes immediately.

    The ``ResultDump`` thread (started before and outside this function)
    gets the ``Result`` from the queue and writes it to disk, so this
    doesn't block the measurement threads.

    If ``measure_relay`` raised an exception it did not catch,
    ``result_putter_error`` is called instead. It logs the error and
    completes immediately.

    Before iterating over the next relay, the main thread sleeps until one
    of the ``max_pending_results`` threads has finished. This only blocks
    the main thread, not the measurement threads.

    This is not strictly needed: otherwise ``apply_async`` would queue the
    relays in order and not start a measurement until another thread had
    finished. But it makes the logic a bit more sequential.

    Before the outer loop iterates, it also waits, for about
    ``TIMEOUT_MEASUREMENTS`` seconds at most, until all the pending
    ``Results`` are ready. This avoids starting to measure a relay that
    might still be being measured. If some results are still pending after
    that, ``dumpstacks`` is called.

    In a testing network (``TestingTorNetwork`` is ``1``) it stops after the
    first loop.
    """
    pending_results = []
    # Set the time to wait for a thread to finish as the half of an HTTP
    # request timeout.
    time_to_sleep = conf.getfloat('general', 'http_timeout') / 2
    # The success callback does not depend on the relay, so create it once.
    # It must be non-blocking.
    callback = result_putter(result_dump)
    # Do not start a new loop if sbws is stopping.
    while not settings.end_event.is_set():
        log.debug("Starting a new measurement loop.")
        num_relays = 0
        loop_tstart = time.time()
        for target in relay_prioritizer.best_priority():
            # Don't start measuring a relay if sbws is stopping.
            if settings.end_event.is_set():
                break
            num_relays += 1
            # callback_err must be non-blocking
            callback_err = result_putter_error(target)
            async_result = pool.apply_async(
                dispatch_worker_thread,
                [args, conf, destinations, circuit_builder, relay_list,
                 target], {}, callback, callback_err)
            pending_results.append(async_result)
            # Instead of letting apply_async to queue the relays in order until
            # a thread has finished, wait here until a thread has finished.
            while len(pending_results) >= max_pending_results:
                # sleep only blocks the main thread, not the workers.
                time.sleep(time_to_sleep)
                pending_results = [r for r in pending_results if not r.ready()]
        time_waiting = 0
        while (len(pending_results) > 0
               and time_waiting <= TIMEOUT_MEASUREMENTS):
            log.debug("Number of pending measurement threads %s after "
                      "a prioritization loop.", len(pending_results))
            time.sleep(time_to_sleep)
            time_waiting += time_to_sleep
            pending_results = [r for r in pending_results if not r.ready()]
        # The loop above ends either because every result is ready or because
        # of the timeout. Only the latter means threads are stuck. Checking
        # time_waiting alone would also fire when the last results became
        # ready during the final sleep.
        if pending_results:
            dumpstacks()
        loop_tstop = time.time()
        loop_tdelta = (loop_tstop - loop_tstart) / 60
        log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta)
        # In a testing network, exit after first loop
        if controller.get_conf('TestingTorNetwork') == '1':
            log.info("In a testing network, exiting after the first loop.")
            # Threads should be closed nicely in some refactor
            stop_threads(signal.SIGTERM, None)

juga's avatar
juga committed
493

494
def run_speedtest(args, conf):
    """Set up everything needed to measure the relays, then measure them.

    In order, it:

    - connects to an existing Tor through its control socket or, when there
      is none, launches Tor (the Tor controller is kept in the ``controller``
      global);
    - initializes the HTTP headers from the scanner nickname, the state
      ``uuid`` and the Tor version;
    - builds the list of relays seen in the Tor network and the circuit
      builder;
    - starts ``ResultDump`` (global ``rd``), which reads the previous
      measurements and waits for new ones to write them to disk;
    - creates the prioritizer that orders the relays depending on their
      measurements age;
    - loads the destinations used for the measurements, failing hard if
      there are none;
    - creates the thread ``pool`` (global) that launches the measurement
      threads; the pool starts 3 other threads that are not measurement
      (worker) threads;
    - finally runs ``main_loop``, stopping the threads on KeyboardInterrupt
      or on any unexpected exception.
    """
    global rd, pool, controller
    control_socket = conf.getpath('tor', 'control_socket')
    controller, _ = stem_utils.init_controller(path=control_socket)
    if controller:
        log.warning(
            'Is sbws already running? '
            'We found an existing Tor process at %s. We are not going to '
            'launch Tor, nor are we going to try to configure it to behave '
            'like we expect. This might work okay, but it also might not. '
            'If you experience problems, you should try letting sbws launch '
            'Tor for itself. The ability to use an already running Tor only '
            'exists for sbws developers. It is expected to be broken and may '
            'even lead to messed up results.',
            control_socket)
        time.sleep(15)
    else:
        controller = stem_utils.launch_tor(conf)

    # When there will be a refactor where conf is global, this can be removed
    # from here.
    scanner_state = State(conf.getpath('paths', 'state_fname'))
    # Call only once to initialize http_headers
    settings.init_http_headers(conf.get('scanner', 'nickname'),
                               scanner_state['uuid'],
                               str(controller.get_version()))

    relay_list = RelayList(args, conf, controller)
    circuit_builder = CB(args, conf, controller, relay_list)
    rd = ResultDump(args, conf)
    prioritizer = RelayPrioritizer(args, conf, relay_list, rd)
    destinations, error_msg = DestinationList.from_config(
        conf, circuit_builder, relay_list, controller)
    if not destinations:
        fail_hard(error_msg)
    max_pending_results = conf.getint('scanner', 'measurement_threads')
    pool = Pool(max_pending_results)
    try:
        main_loop(args, conf, controller, relay_list, circuit_builder, rd,
                  prioritizer, destinations, max_pending_results, pool)
    except KeyboardInterrupt:
        log.info("Interrupted by the user.")
        stop_threads(signal.SIGINT, None)
    # Any exception not caught at this point would make the scanner stall.
    # Log it and exit gracefully.
    except Exception as exc:
        log.critical(FILLUP_TICKET_MSG)
        log.exception(exc)
        stop_threads(signal.SIGTERM, None, 1)
Matt Traudt's avatar
Matt Traudt committed
558
559


Matt Traudt's avatar
Matt Traudt committed
560
def gen_parser(sub):
    """Register the ``scanner`` subcommand on the subparsers object **sub**."""
    description = (
        'The scanner side of sbws. This should be run on a well-connected '
        'machine on the Internet with a healthy amount of spare bandwidth. '
        'This continuously builds circuits, measures relays, and dumps '
        'results into a datadir, commonly found in ~/.sbws')
    sub.add_parser('scanner', description=description,
                   formatter_class=ArgumentDefaultsHelpFormatter)
Matt Traudt's avatar
Matt Traudt committed
567
568


569
def main(args, conf):
    """Run the ``scanner`` subcommand.

    Validates the thread and download size configuration, creates the data
    directory, records the start time in the state file (plus a unique
    scanner ``uuid`` the first time) and then runs ``run_speedtest``.
    """
    # 1 thread is valid, so the message says "at least 1", not "larger
    # than 1".
    if conf.getint('scanner', 'measurement_threads') < 1:
        fail_hard('Number of measurement threads must be at least 1')

    min_dl = conf.getint('scanner', 'min_download_size')
    max_dl = conf.getint('scanner', 'max_download_size')
    if max_dl < min_dl:
        fail_hard('Max download size %d cannot be smaller than min %d',
                  max_dl, min_dl)

    os.makedirs(conf.getpath('paths', 'datadir'), exist_ok=True)

    state = State(conf.getpath('paths', 'state_fname'))
    state['scanner_started'] = now_isodt_str()
    # Generate an unique identifier for each scanner
    if 'uuid' not in state:
        state['uuid'] = str(uuid.uuid4())

    run_speedtest(args, conf)