scanner.py 31.3 KB
Newer Older
1
''' Measure the relays. '''
2
import queue
3

4
import signal
5
6
import sys
import threading
juga's avatar
juga committed
7
import traceback
8
import uuid
9

10
11
from multiprocessing.context import TimeoutError

Matt Traudt's avatar
Matt Traudt committed
12
13
from ..lib.circuitbuilder import GapsCircuitBuilder as CB
from ..lib.resultdump import ResultDump
14
15
from ..lib.resultdump import (
    ResultSuccess, ResultErrorCircuit, ResultErrorStream,
16
    ResultErrorSecondRelay,  ResultError, ResultErrorDestination
17
    )
Matt Traudt's avatar
Matt Traudt committed
18
from ..lib.relaylist import RelayList
19
from ..lib.relayprioritizer import RelayPrioritizer
20
21
from ..lib.destination import (DestinationList,
                               connect_to_destination_over_circuit)
juga's avatar
juga committed
22
from ..util.timestamp import now_isodt_str
23
from ..util.state import State
juga's avatar
juga committed
24
from sbws.globals import fail_hard, HTTP_GET_HEADERS, TIMEOUT_MEASUREMENTS
Matt Traudt's avatar
Matt Traudt committed
25
import sbws.util.stem as stem_utils
26
import sbws.util.requests as requests_utils
Matt Traudt's avatar
Matt Traudt committed
27
28
29
from argparse import ArgumentDefaultsHelpFormatter
from multiprocessing.dummy import Pool
import time
Matt Traudt's avatar
Matt Traudt committed
30
import os
31
import logging
32
import random
Matt Traudt's avatar
Matt Traudt committed
33

juga's avatar
juga committed
34
from .. import settings
35
from ..lib.heartbeat import Heartbeat
36

37
# Random source used to pick byte ranges and helper relays.
rng = random.SystemRandom()
log = logging.getLogger(__name__)

# Declare the objects that manage the threads global so that sbws can exit
# gracefully at any time.
pool = None
rd = None
controller = None

# Message logged before a traceback that the user is asked to report.
FILLUP_TICKET_MSG = """Something went wrong.
Please create an issue at
https://gitlab.torproject.org/tpo/network-health/sbws/-/issues with this
traceback."""
49

50

51
def stop_threads(signal, frame, exit_code=0):
    """Stop the scanner threads and exit the process.

    Used as the SIGTERM handler and also called directly by ``main_loop``.

    :param signal: signal number (handler signature); not used. Note that it
        shadows the ``signal`` module inside this function.
    :param frame: current stack frame (handler signature); not used.
    :param int exit_code: status passed to ``sys.exit``.
    :raises SystemExit: always, with ``exit_code``.
    """
    log.debug('Stopping sbws.')
    # Avoid new threads to start.
    settings.set_end_event()
    # This handler is registered at import time, while ``pool``, ``rd`` and
    # ``controller`` start as None at module level and are only assigned
    # later. Skip whatever has not been created yet so that an early signal
    # still exits cleanly instead of raising AttributeError.
    if pool is not None:
        # Stop Pool threads
        pool.close()
        pool.join()
    if rd is not None:
        # Stop ResultDump thread
        rd.thread.join()
    if controller is not None:
        # Stop Tor thread
        controller.close()
    sys.exit(exit_code)
64
65
66


# Run ``stop_threads`` when the process receives SIGTERM. This happens at
# import time, as a module-level side effect.
signal.signal(signal.SIGTERM, stop_threads)
Matt Traudt's avatar
Matt Traudt committed
67

68

69
def dumpstacks():
    """Log the current stack of every thread, then debug or exit.

    Each thread's name, id and formatted stack are logged at critical level.
    If the effective logging level is below ``logging.DEBUG``, ``pdb`` is
    started; otherwise the process exits with status 1.

    :raises SystemExit: when pdb is not started.
    """
    log.critical(FILLUP_TICKET_MSG)
    thread_id2name = {t.ident: t.name for t in threading.enumerate()}
    for thread_id, stack in sys._current_frames().items():
        log.critical("Thread: %s(%d)",
                     thread_id2name.get(thread_id, ""), thread_id)
        # ``stack`` is a frame object: ``format_stack`` turns it into a list
        # of strings, which is what must be joined (not the frame itself).
        log.critical("".join(traceback.format_stack(stack)))
    # If logging level is less than DEBUG (more verbose), start pdb so that
    # developers can debug the issue.
    if log.getEffectiveLevel() < logging.DEBUG:
        import pdb
        pdb.set_trace()
    # Otherwise exit.
    else:
        # Change to stop threads when #28869 is merged
        sys.exit(1)
85
86


87
88
89
def timed_recv_from_server(session, dest, byte_range):
    """Request **byte_range** from the URL at **dest** and time it.

    :param session: object whose ``get`` method performs the HTTP request.
    :param dest: destination providing ``url`` and ``verify``.
    :param str byte_range: value for the ``Range`` header.
    :returns tuple: ``(True, seconds)`` with the elapsed wall-clock time on
        success, or ``(False, exception)`` if the request raised.
    """
    # Build a per-call copy of the headers. This function runs concurrently
    # in the pool's worker threads, so writing the Range into the shared
    # module-level dict would let one thread send another thread's range.
    headers = dict(HTTP_GET_HEADERS, Range=byte_range)
    start_time = time.time()
    # - response.elapsed "measures the time taken between sending the first
    #   byte of the request and finishing parsing the headers.
    #   It is therefore unaffected by consuming the response content"
    #   If this means that the content has arrived, elapsed could be used to
    #   know the time it took.
    try:
        # headers are merged with the session ones, not overwritten.
        session.get(dest.url, headers=headers, verify=dest.verify)
    # All `requests` exceptions could be caught with
    # `requests.exceptions.RequestException`, but it seems that `requests`
    # does not catch all the ssl exceptions and urllib3 doesn't seem to have
    # a base exception class.
    except Exception as e:
        log.debug(e)
        return False, e
    end_time = time.time()
    return True, end_time - start_time
111
112


113
114
115
116
117
118
119
120
121
122
123
124
def get_random_range_string(content_length, size):
    """Build a random HTTP ``Range`` value covering **size** bytes.

    **content_length** is the size of the file the range is requested from.
    With a content_length of 100 and a size of 10 the result is one of
    'bytes=0-9', 'bytes=1-10', [...], 'bytes=90-99'.

    :raises AssertionError: if **size** is bigger than **content_length**.
    """
    assert size <= content_length
    # The first byte can be any index that still leaves **size** bytes
    # before the end. ``range`` excludes its upper bound, hence the + 1.
    first_byte = rng.choice(range(0, content_length - size + 1))
    # An HTTP byte range includes its last index, hence the - 1.
    last_byte = first_byte + size - 1
    # Both ends are indexes, so the last one must stay below the length.
    assert last_byte < content_length
    return 'bytes={}-{}'.format(first_byte, last_byte)
135
136


137
138
139
140
def measure_rtt_to_server(session, conf, dest, content_length):
    """Take several end-to-end RTT samples with small HTTP range requests.

    The requests go over a circuit + stream that should already exist,
    persist, and not need rebuilding. The number of samples is the
    ``num_rtts`` option and each request asks for ``min_download_size``
    bytes, both read from the ``scanner`` section of **conf**.

    :returns tuple: ``(rtts, None)`` with the list of RTTs in seconds when
        every request succeeded, or ``(None, exception)`` as soon as one
        request fails.
    """
    request_size = conf.getint('scanner', 'min_download_size')
    num_samples = conf.getint('scanner', 'num_rtts')
    samples = []
    for _ in range(0, num_samples):
        log.debug('Measuring RTT to %s', dest.url)
        byte_range = get_random_range_string(content_length, request_size)
        ok, outcome = timed_recv_from_server(session, dest, byte_range)
        if not ok:
            # outcome is the exception that was raised
            log.debug('While measuring the RTT to %s we hit an exception '
                      '(does the webserver support Range requests?): %s',
                      dest.url, outcome)
            return None, outcome
        # outcome is an RTT
        assert isinstance(outcome, (float, int))
        samples.append(outcome)
    return samples, None
Matt Traudt's avatar
Matt Traudt committed
164
165


Matt Traudt's avatar
Matt Traudt committed
166
def measure_bandwidth_to_server(session, conf, dest, content_length):
    """Download random byte ranges until enough timed results are kept.

    The amount requested starts at ``initial_read_request`` and is adapted
    after every download; it is kept between ``min_download_size`` and
    ``max_download_size``. The loop ends once ``num_downloads`` results were
    kept or the scanner's end event is set.

    :returns tuple: ``(results, None)`` where each result is a dict with the
        keys ``duration`` and ``amount``, or ``(None, exception)`` as soon as
        one download fails.
    """
    kept = []
    wanted = conf.getint('scanner', 'num_downloads')
    request_size = conf.getint('scanner', 'initial_read_request')
    min_dl = conf.getint('scanner', 'min_download_size')
    max_dl = conf.getint('scanner', 'max_download_size')
    download_times = {
        'toofast': conf.getfloat('scanner', 'download_toofast'),
        'min': conf.getfloat('scanner', 'download_min'),
        'target': conf.getfloat('scanner', 'download_target'),
        'max': conf.getfloat('scanner', 'download_max'),
    }
    while len(kept) < wanted and not settings.end_event.is_set():
        assert request_size >= min_dl
        assert request_size <= max_dl
        byte_range = get_random_range_string(content_length, request_size)
        ok, outcome = timed_recv_from_server(session, dest, byte_range)
        if not ok:
            # outcome is the exception that was raised
            log.debug('While measuring the bandwidth to %s we hit an '
                      'exception (does the webserver support Range '
                      'requests?): %s', dest.url, outcome)
            return None, outcome
        # outcome is a download time
        assert isinstance(outcome, (float, int))
        asked_for_maximum = request_size == max_dl
        if _should_keep_result(asked_for_maximum, outcome, download_times):
            kept.append({'duration': outcome, 'amount': request_size})
        request_size = _next_expected_amount(
            request_size, outcome, download_times, min_dl, max_dl)
    return kept, None
Matt Traudt's avatar
Matt Traudt committed
204
205


206
207
208
209
210
211
def _pick_ideal_second_hop(relay, dest, rl, cont, is_exit):
    """Choose the second relay of the two hop circuit that measures **relay**.

    The helper is taken from the exits allowing ``dest.port`` when
    **is_exit** is true and from the non-exits otherwise. Candidates with a
    bandwidth of at least 2x, 1.75x, 1.5x, 1.25x and 1x the one of **relay**
    are tried in that order, and a random one is returned from the first
    non-empty group. If no group has candidates, the candidate with the
    highest consensus bandwidth is returned.

    :returns: the chosen relay, or None when there are no candidates.
    """
    if is_exit:
        candidates = rl.exits_not_bad_allowing_port(dest.port)
    else:
        candidates = rl.non_exits
    if not len(candidates):
        return None
    if is_exit:
        min_relay_bw = rl.exit_min_bw()
    else:
        min_relay_bw = rl.non_exit_min_bw()
    log.debug('Picking a 2nd hop to measure %s from %d choices. is_exit=%s',
              relay.nickname, len(candidates), is_exit)
    for min_bw_factor in [2, 1.75, 1.5, 1.25, 1]:
        # We might have a really slow/new relay. Try to measure it properly by
        # never going below the calculated min_relay_bw (see:
        # _calculate_min_bw_second_hop() in relaylist.py).
        min_bw = max(relay.consensus_bandwidth * min_bw_factor, min_relay_bw)
        fast_enough = stem_utils.only_relays_with_bandwidth(
            cont, candidates, min_bw=min_bw)
        if len(fast_enough) > 0:
            chosen = rng.choice(fast_enough)
            log.debug(
                'Found %d candidate 2nd hops with at least %sx the bandwidth '
                'of %s. Returning %s (bw=%s).',
                len(fast_enough), min_bw_factor, relay.nickname,
                chosen.nickname, chosen.consensus_bandwidth)
            return chosen
    # Nothing is as fast as the relay to measure: fall back to the fastest
    # candidate (the first one in case of a tie).
    chosen = max(candidates, key=lambda r: r.consensus_bandwidth)
    log.debug(
        "Didn't find any 2nd hops at least as fast as %s (bw=%s). It's "
        "probably really fast. Returning %s (bw=%s), the fastest "
        "candidate we have.", relay.nickname, relay.consensus_bandwidth,
        chosen.nickname, chosen.consensus_bandwidth)
    return chosen


247
def measure_relay(args, conf, destinations, cb, rl, relay):
    """
    Select a Web server, a relay to build the circuit,
    build the circuit and measure the bandwidth of the given relay.

    The steps are: create an HTTP session, pick a destination, pick a helper
    relay, build a two hop circuit, check the destination is usable over it,
    measure RTT and then bandwidth. The circuit is always closed once it was
    built, also when an exception escapes from a measurement step.

    :param args: not used by this function.
    :param conf: configuration, read for the scanner nickname and the HTTP
        timeout.
    :param destinations: object whose ``next()`` returns a destination or a
        false value when none is functional.
    :param cb: circuit builder providing ``controller``, ``build_circuit``
        and ``close_circuit``.
    :param rl: relay list used to pick the helper relay.
    :param relay: the relay to measure.
    :return: a list with a single measurement Result object (success or
        error), or None if the session could not be created while the
        scanner is stopping.

    """
    log.debug('Measuring %s %s', relay.nickname, relay.fingerprint)
    our_nick = conf['scanner']['nickname']
    s = requests_utils.make_session(
        cb.controller, conf.getfloat('general', 'http_timeout'))
    # Probably because the scanner is stopping.
    if s is None:
        if settings.end_event.is_set():
            return None
        # In future refactor this should be returned from the make_session
        reason = "Unable to get proxies."
        log.debug(reason + ' to measure %s %s',
                  relay.nickname, relay.fingerprint)
        return [
            ResultError(relay, [], '', our_nick, msg=reason),
        ]
    # Pick a destination
    dest = destinations.next()
    # When there are not any functional destinations.
    if not dest:
        # NOTE: When there are still functional destinations but only one of
        # them fails, the error will be included in `ResultErrorStream`.
        # Since this is being executed in a thread, the scanner can not
        # be stopped here, but the `end_event` signal can be set so that the
        # main thread stops the scanner.
        # It might be useful to store the fact that the destinations fail,
        # so store here the error, and set the signal once the error is stored
        # (in `resultdump`).
        log.critical("There are not any functional destinations.\n"
                     "It is recommended to set several destinations so that "
                     "the scanner can continue if one fails.")
        reason = "No functional destinations"
        # Resultdump will set end_event after storing the error
        return [
            ResultErrorDestination(relay, [], '', our_nick, msg=reason),
        ]

    # Pick a relay to help us measure the given relay. If the given relay is an
    # exit, then pick a non-exit. Otherwise pick an exit.
    relay_is_exit = relay.is_exit_not_bad_allowing_port(dest.port)
    helper = _pick_ideal_second_hop(
        relay, dest, rl, cb.controller, is_exit=not relay_is_exit)
    if not helper:
        reason = 'Unable to select a second relay'
        log.debug(reason + ' to help measure %s (%s)',
                  relay.fingerprint, relay.nickname)
        return [
            ResultErrorSecondRelay(relay, [], dest.url, our_nick,
                                   msg=reason),
        ]
    # The exit goes last in the path.
    path = [helper, relay] if relay_is_exit else [relay, helper]
    circ_fps = [hop.fingerprint for hop in path]
    # stored for debugging
    nicknames = [hop.nickname for hop in path]

    # Build the circuit
    circ_id, reason = cb.build_circuit(circ_fps)
    if not circ_id:
        log.debug('Could not build circuit with path %s (%s): %s ',
                  circ_fps, nicknames, reason)
        return [
            ResultErrorCircuit(relay, circ_fps, dest.url, our_nick,
                               msg=reason),
        ]
    log.debug('Built circuit with path %s (%s) to measure %s (%s)',
              circ_fps, nicknames, relay.fingerprint, relay.nickname)
    # From here on the circuit exists: close it on every way out, including
    # exceptions raised by the measurement helpers.
    try:
        # Make a connection to the destination
        is_usable, usable_data = connect_to_destination_over_circuit(
            dest, circ_id, s, cb.controller, dest._max_dl)
        if not is_usable:
            log.debug('Destination %s unusable via circuit %s (%s), %s',
                      dest.url, circ_fps, nicknames, usable_data)
            return [
                ResultErrorStream(relay, circ_fps, dest.url, our_nick,
                                  msg=usable_data),
            ]
        assert 'content_length' in usable_data
        # FIRST: measure RTT
        rtts, reason = measure_rtt_to_server(s, conf, dest,
                                             usable_data['content_length'])
        if rtts is None:
            log.debug('Unable to measure RTT for %s (%s) to %s via circuit '
                      '%s (%s): %s', relay.fingerprint, relay.nickname,
                      dest.url, circ_fps, nicknames, reason)
            return [
                ResultErrorStream(relay, circ_fps, dest.url, our_nick,
                                  msg=str(reason)),
            ]
        # SECOND: measure bandwidth
        bw_results, reason = measure_bandwidth_to_server(
            s, conf, dest, usable_data['content_length'])
        if bw_results is None:
            log.debug('Unable to measure bandwidth for %s (%s) to %s via '
                      'circuit %s (%s): %s', relay.fingerprint,
                      relay.nickname, dest.url, circ_fps, nicknames, reason)
            return [
                ResultErrorStream(relay, circ_fps, dest.url, our_nick,
                                  msg=str(reason)),
            ]
    finally:
        cb.close_circuit(circ_id)
    # Finally: store result
    log.debug('Success measurement for %s (%s) via circuit %s (%s) to %s',
              relay.fingerprint, relay.nickname, circ_fps, nicknames, dest.url)
    return [
        ResultSuccess(rtts, bw_results, relay, circ_fps, dest.url, our_nick),
    ]
Matt Traudt's avatar
Matt Traudt committed
374

375

376
def dispatch_worker_thread(*a, **kw):
    """Run ``measure_relay`` unless it is already known to be pointless.

    The arguments are passed unchanged to ``measure_relay``.

    :returns: whatever ``measure_relay`` returns, or None when there are not
        functional destinations or the end event is set.
    """
    # If at the point where the relay is actually going to be measured there
    # are not any functional destinations or the `end_event` is set, do not
    # try to start measuring the relay, since it will fail anyway.
    try:
        # a[2] is the argument `destinations`
        functional_destinations = a[2].functional_destinations
    # In case the arguments or the attribute change, catch the possible
    # exceptions but ignore here that there are not destinations.
    # AttributeError covers the "wrong attribute" case of the log message.
    except (IndexError, TypeError, AttributeError):
        log.debug("Wrong argument or attribute.")
        functional_destinations = True
    if not functional_destinations or settings.end_event.is_set():
        return None
    return measure_relay(*a, **kw)
391
392


393
394
395
396
397
398
399
400
401
402
403
404
405
406
def _should_keep_result(did_request_maximum, result_time, download_times):
    """Tell whether a download that took **result_time** should be stored.

    :param did_request_maximum: whether the maximum allowed amount was
        requested for this download.
    :param result_time: time the download took.
    :param dict download_times: thresholds; the keys ``min`` and ``max`` are
        read here.
    :returns bool: True to keep the result. A rejection is logged.
    """
    if did_request_maximum:
        # Nothing bigger can be requested, so the only condition is that it
        # took less than the maximum amount of time.
        keep = result_time < download_times['max']
    else:
        # Normal case: the time must be between the min and max allowed.
        keep = result_time >= download_times['min'] and \
            result_time < download_times['max']
    if keep:
        return True
    log.debug('Not keeping result time %f.%s', result_time,
              '' if not did_request_maximum else ' We requested the maximum '
              'amount allowed.')
    return False


413
414
def _next_expected_amount(expected_amount, result_time, download_times,
                          min_dl, max_dl):
415
416
417
418
419
420
421
422
423
424
    if result_time < download_times['toofast']:
        # Way too fast, greatly increase the amount we ask for
        expected_amount = int(expected_amount * 5)
    elif result_time < download_times['min'] or \
            result_time >= download_times['max']:
        # As long as the result is between min/max, keep the expected amount
        # the same. Otherwise, adjust so we are aiming for the target amount.
        expected_amount = int(
            expected_amount * download_times['target'] / result_time)
    # Make sure we don't request too much or too little
425
426
    expected_amount = max(min_dl, expected_amount)
    expected_amount = min(max_dl, expected_amount)
427
428
429
    return expected_amount


Matt Traudt's avatar
Matt Traudt committed
430
def result_putter(result_dump):
    """Return a one-argument callback that queues a measurement result.

    The callback puts its argument in ``result_dump.queue``, waiting at most
    3 seconds. If the queue is still full after that, a warning is logged
    and the result is dropped.
    """
    results_queue_owner = result_dump

    def closure(measurement_result):
        # Since result_dump thread is calling queue.get() every second,
        # the queue should be full for only 1 second.
        # This call blocks at maximum timeout seconds.
        try:
            results_queue_owner.queue.put(measurement_result, timeout=3)
        except queue.Full:
            # The result is lost, but the scanner continues working.
            log.warning(
                "The queue with measurements is full, when adding %s.\n"
                "It is possible that the thread that get them to "
                "write them to the disk (ResultDump.enter) is stalled.",
                measurement_result
                )

    return closure

Matt Traudt's avatar
Matt Traudt committed
450

451
def result_putter_error(target):
    """Return a one-argument callback that logs an error from a measurement.

    The callback does nothing when the end event is set. Otherwise it logs
    a request to report the issue followed by the traceback of the exception
    it receives. **target** is not used.
    """
    def closure(error):
        if settings.end_event.is_set():
            return
        # The only object that can be here if there is not any uncaught
        # exception is stem.SocketClosed when stopping sbws.
        # An exception here means that the worker thread finished.
        log.warning(FILLUP_TICKET_MSG)
        # Format the traceback stored in the exception, to print what
        # happened in the thread and not here in the main process.
        formatted = traceback.format_exception(
            type(error), error, error.__traceback__)
        log.warning("".join(formatted))
    return closure
468
469


juga's avatar
juga committed
470
def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
              relay_prioritizer, destinations, pool):
    """Start and reuse the threads that measure the relays, forever.

    The outer loop runs while there is not an event signaling that sbws is
    stopping. The inner loop goes over the relays given by
    ``relay_prioritizer.best_priority()`` and queues one measurement per
    relay in the pool with ``apply_async``, which returns immediately an
    ``AsyncResult`` that can tell whether the measurement is ready.

    When a measurement finishes, the pool calls the ``result_putter``
    callback, which puts the result in the ``ResultDump`` queue. If the
    measurement raised an exception, the pool calls instead the
    ``result_putter_error`` callback, which logs it.

    Before the outer loop iterates again, ``wait_for_results`` is called
    with all the queued results, which avoids to start measuring a relay
    that might still be being measured.

    Every measurement attempt is registered in the heartbeat, which prints
    its message at the end of each loop.

    In a testing network (``TestingTorNetwork`` set to '1'), sbws is stopped
    after the first loop.

    """
    hbeat = Heartbeat(conf.getpath('paths', 'state_fname'))

    # Do not start a new loop if sbws is stopping.
    while not settings.end_event.is_set():
        log.debug("Starting a new measurement loop.")
        # Since the loop might finish before all the results are ready, due
        # to waiting too long, start with a new list in every loop.
        pending_results = []
        loop_tstart = time.time()

        # Register relay fingerprints to the heartbeat module
        hbeat.register_consensus_fprs(relay_list.relays_fingerprints)

        for target in relay_prioritizer.best_priority():
            # Don't start measuring a relay if sbws is stopping.
            if settings.end_event.is_set():
                break
            relay_list.increment_recent_measurement_attempt()
            target.increment_relay_recent_measurement_attempt()
            # callback and callback_err must be non-blocking
            callback = result_putter(result_dump)
            callback_err = result_putter_error(target)
            worker_args = [args, conf, destinations, circuit_builder,
                           relay_list, target]
            pending_results.append(pool.apply_async(
                dispatch_worker_thread, worker_args, {},
                callback, callback_err))

            # Register this measurement to the heartbeat module
            hbeat.register_measured_fpr(target.fingerprint)

        # At this point the pool has queued all the relays and
        # pending_results has one AsyncResult for each of them.
        # They could also be obtained with pool._cache, which contains
        # a dictionary with AsyncResults as items.
        num_relays = len(pending_results)
        wait_for_results(num_relays, pending_results)

        # Print the heartbeat message
        hbeat.print_heartbeat_message()

        loop_tdelta = (time.time() - loop_tstart) / 60
        # We know the relays that were queued to be measured.
        # That does not mean they were actually measured.
        log.debug("Attempted to measure %s relays in %s minutes",
                  num_relays, loop_tdelta)

        # In a testing network, exit after first loop
        if controller.get_conf('TestingTorNetwork') == '1':
            log.info("In a testing network, exiting after the first loop.")
            # Threads should be closed nicely in some refactor
            stop_threads(signal.SIGTERM, None)

juga's avatar
juga committed
563

564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
def wait_for_results(num_relays_to_measure, pending_results):
    """Wait for the pool to finish and log progress.

    While there are relays being measured, just log the progress
    and sleep :const:`~sbws.globals.TIMEOUT_MEASUREMENTS`, which is
    approximately the time it can take to measure a relay in
    the worst case.

    Return as soon as a check finds that no result is pending, without
    sleeping again.

    When there has not been any relay measured in ``TIMEOUT_MEASUREMENTS``
    and there are still relays pending to be measured, it means there is no
    progress and call :func:`~sbws.core.scanner.force_get_results`.

    This can happen in the case of a bug that makes either
    :func:`~sbws.core.scanner.measure_relay`,
    :func:`~sbws.core.scanner.result_putter` (callback) and/or
    :func:`~sbws.core.scanner.result_putter_error` (callback error) stall.

    :param int num_relays_to_measure: total number of queued measurements,
        only used for the progress message.
    :param list pending_results: objects with a ``ready()`` method, one per
        queued measurement. The list passed by the caller is not modified.

    .. note:: in a future refactor, this could be simpler by:

      1. Initializing the pool at the beginning of each loop
      2. Calling :meth:`~Pool.close`; :meth:`~Pool.join` after
         :meth:`~Pool.apply_async`,
         to ensure no new jobs are added until the pool has finished with all
         the ones in the queue.

      As currently, there would be still two cases when the pool could stall:

      1. There's an exception in ``measure_relay`` and another in
         ``callback_err``
      2. There's an exception in ``callback``.

      This could also be simpler by not having callback and callback error in
      ``apply_async`` and instead just calling callback with the
      ``pending_results``.

      (callback could be also simpler by not having a thread and queue and
      just storing to disk, since the time to write to disk is way smaller
      than the time to request over the network.)
    """
    num_last_measured = 1
    while num_last_measured > 0 and not settings.end_event.is_set():
        log.info("Pending measurements: %s out of %s: ",
                 len(pending_results), num_relays_to_measure)
        time.sleep(TIMEOUT_MEASUREMENTS)
        still_pending = [r for r in pending_results if not r.ready()]
        num_last_measured = len(pending_results) - len(still_pending)
        pending_results = still_pending
        if not pending_results:
            # Everything finished: do not sleep a whole extra timeout just
            # to find out that nothing else was measured.
            return
    if pending_results:
        force_get_results(pending_results)
613
614


615
def force_get_results(pending_results):
    """Try to get either the result or an exception, and log what happened.

    It is called by :func:`~sbws.core.scanner.wait_for_results` when
    the time waiting for the results was long.

    Every pending result is asked with :meth:`~AsyncResult.get` and a low
    timeout, since there was already a long wait. ``get`` is not called
    earlier because it blocks and the callbacks are not called.

    Nothing is stored nor returned; every outcome is only logged.
    """
    log.debug("Forcing get")
    for pending in pending_results:
        try:
            value = pending.get(timeout=0.1)
            log.warning("Result %s was not stored, it took too long.",
                        value)
        # TimeoutError is raised when the result is not ready, ie. has not
        # been processed yet
        except TimeoutError:
            log.warning("A result was not stored, it was not ready.")
        # If the measurement raised an exception, `get` raises it again;
        # log it so that it can be fixed.
        # This should not happen, since `callback_err` would have been
        # called first.
        except Exception as e:
            log.critical(FILLUP_TICKET_MSG)
            # If the exception happened in the threads, `log.exception` does
            # not have the traceback.
            # `format_exception` is used instead of `print_exception` to show
            # the traceback in all the log handlers.
            formatted = traceback.format_exception(
                type(e), e, e.__traceback__)
            log.warning("".join(formatted))
650
651


652
def run_speedtest(args, conf):
    """Initializes all the data and threads needed to measure the relays.

    It launches or connects to Tor in a thread.
    It initializes the list of relays seen in the Tor network.
    It starts a thread to read the previous measurements and wait for new
    measurements to write them to the disk.
    It initializes a class that will be used to order the relays depending
    on their measurements age.
    It initializes the list of destinations that will be used for the
    measurements.
    It initializes the thread pool that will launch the measurement threads.
    The pool starts 3 other threads that are not the measurement (worker)
    threads.
    Finally, it calls the function that will manage the measurement threads.

    :param args: parsed command line arguments, passed through to the
        helper classes.
    :param conf: configuration object; read with ``get``, ``getint`` and
        ``getpath``.

    Side effects: rebinds the module-level ``rd``, ``pool`` and
    ``controller`` so that the scanner can be stopped from elsewhere.
    """
    global rd, pool, controller
    controller, _ = stem_utils.init_controller(
        path=conf.getpath('tor', 'control_socket'))
    if not controller:
        controller = stem_utils.launch_tor(conf)
    else:
        log.warning(
            'Is sbws already running? '
            'We found an existing Tor process at %s. We are not going to '
            'launch Tor, nor are we going to try to configure it to behave '
            'like we expect. This might work okay, but it also might not. '
            'If you experience problems, you should try letting sbws launch '
            'Tor for itself. The ability to use an already running Tor only '
            'exists for sbws developers. It is expected to be broken and may '
            'even lead to messed up results.',
            conf.getpath('tor', 'control_socket'))
        # NOTE(review): presumably pauses so the warning can be read before
        # the scanner continues -- confirm.
        time.sleep(15)

    # When there will be a refactor where conf is global, this can be removed
    # from here.
    state = State(conf.getpath('paths', 'state_fname'))
    # XXX: tech-debt: create new function to obtain the controller and to
    # write the state, so that a unit test to check the state tor version can
    # be created
    # Store tor version whenever the scanner starts.
    state['tor_version'] = str(controller.get_version())
    # Call only once to initialize http_headers
    settings.init_http_headers(conf.get('scanner', 'nickname'), state['uuid'],
                               state['tor_version'])
    # To avoid having to pass args and conf to RelayList, pass an extra
    # argument with the data_period
    measurements_period = conf.getint('general', 'data_period')
    rl = RelayList(args, conf, controller, measurements_period, state)
    cb = CB(args, conf, controller, rl)
    rd = ResultDump(args, conf)
    rp = RelayPrioritizer(args, conf, rl, rd)
    destinations, error_msg = DestinationList.from_config(
        conf, cb, rl, controller)
    if not destinations:
        fail_hard(error_msg)
    # One pool worker per configured measurement thread.
    max_pending_results = conf.getint('scanner', 'measurement_threads')
    pool = Pool(max_pending_results)
    try:
        main_loop(args, conf, controller, rl, cb, rd, rp, destinations, pool)
    except KeyboardInterrupt:
        log.info("Interrupted by the user.")
        stop_threads(signal.SIGINT, None)
    # Any exception not caught at this point would make the scanner stall.
    # Log it and exit gracefully.
    except Exception as e:
        log.critical(FILLUP_TICKET_MSG)
        log.exception(e)
        stop_threads(signal.SIGTERM, None, 1)
Matt Traudt's avatar
Matt Traudt committed
722
723


Matt Traudt's avatar
Matt Traudt committed
724
def gen_parser(sub):
    """Register the ``scanner`` subcommand.

    :param sub: subparsers object (as returned by
        ``ArgumentParser.add_subparsers()``); its ``add_parser`` method is
        called once with the name ``scanner``.
    :returns: None. The created parser is not returned.
    """
    d = (
        'The scanner side of sbws. This should be run on a well-connected '
        'machine on the Internet with a healthy amount of spare bandwidth. '
        'This continuously builds circuits, measures relays, and dumps '
        'results into a datadir, commonly found in ~/.sbws'
    )
    sub.add_parser('scanner', formatter_class=ArgumentDefaultsHelpFormatter,
                   description=d)
Matt Traudt's avatar
Matt Traudt committed
731
732


733
def main(args, conf):
    """Validate the scanner configuration, prepare state and start measuring.

    Checks the ``scanner`` options, creates the data directory, records the
    start time and a per-scanner UUID in the state file, and then hands over
    to :func:`run_speedtest`.

    :param args: parsed command line arguments, passed on unchanged.
    :param conf: configuration object; read with ``getint`` and ``getpath``.
    """
    # NOTE(review): `fail_hard` comes from `sbws.globals` and is expected to
    # log the message and terminate -- confirm against its definition.
    if conf.getint('scanner', 'measurement_threads') < 1:
        # The guard accepts 1, so the message must say "at least 1".
        fail_hard('Number of measurement threads must be at least 1')

    min_dl = conf.getint('scanner', 'min_download_size')
    max_dl = conf.getint('scanner', 'max_download_size')
    if max_dl < min_dl:
        fail_hard('Max download size %d cannot be smaller than min %d',
                  max_dl, min_dl)

    os.makedirs(conf.getpath('paths', 'datadir'), exist_ok=True)

    state = State(conf.getpath('paths', 'state_fname'))
    state['scanner_started'] = now_isodt_str()
    # Generate a unique identifier for each scanner; an existing one is
    # kept.
    if 'uuid' not in state:
        state['uuid'] = str(uuid.uuid4())

    run_speedtest(args, conf)