GitLab is used only for code review, issue tracking and project management. Canonical locations for source code are still https://gitweb.torproject.org/, https://git.torproject.org/ and git-rw.torproject.org.

scanner.py 31 KB
Newer Older
1
''' Measure the relays. '''
2
import queue
3

4
import signal
5 6
import sys
import threading
juga  's avatar
juga committed
7
import traceback
8
import uuid
9

10 11
from multiprocessing.context import TimeoutError

Matt Traudt's avatar
Matt Traudt committed
12 13
from ..lib.circuitbuilder import GapsCircuitBuilder as CB
from ..lib.resultdump import ResultDump
14 15
from ..lib.resultdump import (
    ResultSuccess, ResultErrorCircuit, ResultErrorStream,
16
    ResultErrorSecondRelay,  ResultError, ResultErrorDestination
17
    )
Matt Traudt's avatar
Matt Traudt committed
18
from ..lib.relaylist import RelayList
19
from ..lib.relayprioritizer import RelayPrioritizer
20 21
from ..lib.destination import (DestinationList,
                               connect_to_destination_over_circuit)
22
from ..util.timestamp import now_isodt_str
23
from ..util.state import State
juga  's avatar
juga committed
24
from sbws.globals import fail_hard, HTTP_GET_HEADERS, TIMEOUT_MEASUREMENTS
Matt Traudt's avatar
Matt Traudt committed
25
import sbws.util.stem as stem_utils
26
import sbws.util.requests as requests_utils
Matt Traudt's avatar
Matt Traudt committed
27 28 29
from argparse import ArgumentDefaultsHelpFormatter
from multiprocessing.dummy import Pool
import time
Matt Traudt's avatar
Matt Traudt committed
30
import os
31
import logging
32
import random
Matt Traudt's avatar
Matt Traudt committed
33

juga  's avatar
juga committed
34
from .. import settings
35
from ..lib.heartbeat import Heartbeat
36

37
rng = random.SystemRandom()
log = logging.getLogger(__name__)

# The objects that own threads (the measurement pool, the ResultDump and the
# Tor controller) are module globals so that ``stop_threads`` can shut them
# down from anywhere, letting sbws exit gracefully at any time.
# ``run_speedtest`` declares them ``global`` in order to assign them.
pool = rd = controller = None

# Logged right before an unexpected traceback is dumped.
FILLUP_TICKET_MSG = """Something went wrong.
Please create a ticket in https://trac.torproject.org with this traceback."""

48

49
def stop_threads(signal, frame, exit_code=0):
    """Stop every sbws thread and exit the process.

    Installed below as the SIGTERM handler. It is also called directly, for
    instance by ``main_loop`` in a testing network.

    :param signal: signal number (unused; required by the handler signature).
    :param frame: current stack frame (unused; required by the handler
        signature).
    :param int exit_code: status passed to :func:`sys.exit`.
    """
    log.debug('Stopping sbws.')
    # Avoid new threads to start.
    settings.set_end_event()
    # The handler is installed at import time, while ``pool``, ``rd`` and
    # ``controller`` are only assigned later by ``run_speedtest``. A SIGTERM
    # received before that must not crash on ``None``.
    # Stop Pool threads
    if pool is not None:
        pool.close()
        pool.join()
    # Stop ResultDump thread
    if rd is not None:
        rd.thread.join()
    # Stop Tor thread
    if controller is not None:
        controller.close()
    sys.exit(exit_code)


signal.signal(signal.SIGTERM, stop_threads)
Matt Traudt's avatar
Matt Traudt committed
65

66

67
def dumpstacks():
    """Log the stack of every running thread, then debug or exit.

    If the effective logging level is below DEBUG (more verbose), start
    :mod:`pdb` so that developers can debug the issue. Otherwise exit the
    process with status 1.
    """
    log.critical(FILLUP_TICKET_MSG)
    thread_id2name = {t.ident: t.name for t in threading.enumerate()}
    for thread_id, frame in sys._current_frames().items():
        log.critical("Thread: %s(%d)",
                     thread_id2name.get(thread_id, ""), thread_id)
        # ``format_stack`` takes the frame object itself and returns a list
        # of formatted lines; join them into a single log message.
        log.critical("".join(traceback.format_stack(frame)))
    # If logging level is less than DEBUG (more verbose), start pdb so that
    # developers can debug the issue.
    if log.getEffectiveLevel() < logging.DEBUG:
        import pdb
        pdb.set_trace()
    # Otherwise exit.
    else:
        # Change to stop threads when #28869 is merged
        sys.exit(1)
83 84


85 86 87
def timed_recv_from_server(session, dest, byte_range):
    ''' Request the **byte_range** from the URL at **dest**. If successful,
    return True and the time it took to download. Otherwise return False and an
    exception.

    :param session: the session the request is made with (the one created by
        ``requests_utils.make_session`` in ``measure_relay``).
    :param dest: the destination; ``dest.url`` and ``dest.verify`` are used.
    :param str byte_range: value of the HTTP ``Range`` header, as built by
        ``get_random_range_string`` (e.g. ``'bytes=0-9'``).
    :returns tuple: ``(True, seconds)`` on success, ``(False, exception)``
        on failure.
    '''
    # Build a per-request copy of the headers instead of writing the Range
    # into the shared module-level HTTP_GET_HEADERS. This function runs
    # concurrently in the pool's worker threads, so mutating the shared dict
    # lets one thread send another thread's (differently sized) range.
    headers = dict(HTTP_GET_HEADERS)
    headers['Range'] = byte_range
    # A monotonic clock is not affected by wall-clock adjustments, so the
    # measured duration can never be negative or jump.
    start_time = time.monotonic()
    # - response.elapsed "measures the time taken between sending the first
    #   byte of the request and finishing parsing the headers.
    #   It is therefore unaffected by consuming the response content"
    #   If this mean that the content has arrived, elapsed could be used to
    #   know the time it took.
    try:
        # headers are merged with the session ones, not overwritten.
        session.get(dest.url, headers=headers, verify=dest.verify)
    # All `requests` exceptions could be caught with
    # `requests.exceptions.RequestException`, but it seems that `requests`
    # does not catch all the ssl exceptions and urllib3 doesn't seem to have
    # a base exception class.
    except Exception as e:
        log.debug(e)
        return False, e
    end_time = time.monotonic()
    return True, end_time - start_time
109 110


111 112 113 114 115 116 117 118 119 120 121 122
def get_random_range_string(content_length, size):
    '''
    Pick a random window of **size** contiguous bytes inside a resource that
    is **content_length** bytes long, formatted as an HTTP ``Range`` value.

    For example, for content_length of 100 and size 10, the window is one of
    '0-9', '1-10', '2-11', [...] '89-98', '90-99' and the returned string is
    ``'bytes=<start>-<end>'``.
    '''
    assert size <= content_length
    # Any start from 0 up to content_length - size (inclusive) still leaves
    # room for **size** bytes; range() excludes its stop value, hence + 1.
    candidate_starts = range(0, content_length - size + 1)
    start = rng.choice(candidate_starts)
    # HTTP byte ranges include both ends ([start, end]), so the last byte of
    # a **size** byte window is start + size - 1.
    end = start + size - 1
    # end is an index, so it must be strictly below the length: for a
    # content_length of 10, end is somewhere between 0 and 9.
    assert end < content_length
    return 'bytes={}-{}'.format(start, end)
133 134


135 136 137 138
def measure_rtt_to_server(session, conf, dest, content_length):
    ''' Estimate the end-to-end RTT with several small HTTP Range requests.

    The requests go over a circuit + stream that should already exist,
    persist, and not need rebuilding. ``num_rtts`` requests of
    ``min_download_size`` bytes each are made (both read from the
    ``scanner`` configuration section).

    :returns tuple: ``(rtts, None)`` with the list of RTTs (in seconds) when
        every request succeeds, or ``(None, exception)`` as soon as one of
        them fails.
    '''
    size = conf.getint('scanner', 'min_download_size')
    rtts = []
    for _ in range(0, conf.getint('scanner', 'num_rtts')):
        log.debug('Measuring RTT to %s', dest.url)
        byte_range = get_random_range_string(content_length, size)
        ok, outcome = timed_recv_from_server(session, dest, byte_range)
        if not ok:
            # On failure, outcome is the exception raised by the request.
            log.debug('While measuring the RTT to %s we hit an exception '
                      '(does the webserver support Range requests?): %s',
                      dest.url, outcome)
            return None, outcome
        # On success, outcome is the RTT.
        assert isinstance(outcome, (float, int))
        rtts.append(outcome)
    return rtts, None
Matt Traudt's avatar
Matt Traudt committed
162 163


Matt Traudt's avatar
Matt Traudt committed
164
def measure_bandwidth_to_server(session, conf, dest, content_length):
    """Download random byte ranges from **dest** until enough are kept.

    After every download, the amount to request next is adapted with
    ``_next_expected_amount``. Only downloads accepted by
    ``_should_keep_result`` count towards ``num_downloads``. The loop also
    stops as soon as ``settings.end_event`` is set.

    :returns tuple: ``(results, None)``, where each result is a dict with
        ``'duration'`` (seconds) and ``'amount'`` (bytes requested), or
        ``(None, exception)`` when a download fails.
    """
    results = []
    num_downloads = conf.getint('scanner', 'num_downloads')
    expected_amount = conf.getint('scanner', 'initial_read_request')
    min_dl = conf.getint('scanner', 'min_download_size')
    max_dl = conf.getint('scanner', 'max_download_size')
    download_times = {
        'toofast': conf.getfloat('scanner', 'download_toofast'),
        'min': conf.getfloat('scanner', 'download_min'),
        'target': conf.getfloat('scanner', 'download_target'),
        'max': conf.getfloat('scanner', 'download_max'),
    }
    while len(results) < num_downloads and not settings.end_event.is_set():
        assert min_dl <= expected_amount <= max_dl
        byte_range = get_random_range_string(content_length, expected_amount)
        ok, outcome = timed_recv_from_server(session, dest, byte_range)
        if not ok:
            # On failure, outcome is the exception raised by the request.
            log.debug('While measuring the bandwidth to %s we hit an '
                      'exception (does the webserver support Range '
                      'requests?): %s', dest.url, outcome)
            return None, outcome
        # On success, outcome is the download time.
        assert isinstance(outcome, (float, int))
        requested_maximum = expected_amount == max_dl
        if _should_keep_result(requested_maximum, outcome, download_times):
            results.append({'duration': outcome, 'amount': expected_amount})
        expected_amount = _next_expected_amount(
            expected_amount, outcome, download_times, min_dl, max_dl)
    return results, None
Matt Traudt's avatar
Matt Traudt committed
202 203


204 205 206 207 208 209
def _pick_ideal_second_hop(relay, dest, rl, cont, is_exit):
    '''
    Choose the helper relay of the two hop circuit used to measure **relay**
    towards **dest**.

    When **is_exit** is true the helper comes from the exits (not marked bad)
    that allow ``dest.port``; otherwise from the non-exits. A random
    candidate with at least 2x, then 1.75x, 1.5x, 1.25x and finally 1x the
    consensus bandwidth of **relay** is preferred. If none qualifies, the
    fastest candidate is returned. Return None when there are no candidates.
    '''
    if is_exit:
        candidates = rl.exits_not_bad_allowing_port(dest.port)
    else:
        candidates = rl.non_exits
    if len(candidates) == 0:
        return None
    log.debug('Picking a 2nd hop to measure %s from %d choices. is_exit=%s',
              relay.nickname, len(candidates), is_exit)
    for factor in (2, 1.75, 1.5, 1.25, 1):
        fast_enough = stem_utils.only_relays_with_bandwidth(
            cont, candidates, min_bw=relay.consensus_bandwidth * factor)
        if len(fast_enough) > 0:
            picked = rng.choice(fast_enough)
            log.debug(
                'Found %d candidate 2nd hops with at least %sx the bandwidth '
                'of %s. Returning %s (bw=%s).',
                len(fast_enough), factor, relay.nickname,
                picked.nickname, picked.consensus_bandwidth)
            return picked
    # max() returns the first maximal candidate, the same one a stable
    # descending sort would put first.
    picked = max(candidates, key=lambda r: r.consensus_bandwidth)
    log.debug(
        'Didn\'t find any 2nd hops at least as fast as %s (bw=%s). It\'s '
        'probably really fast. Returning %s (bw=%s), the fastest '
        'candidate we have.', relay.nickname, relay.consensus_bandwidth,
        picked.nickname, picked.consensus_bandwidth)
    return picked


239
def measure_relay(args, conf, destinations, cb, rl, relay):
    """
    Select a Web server, a relay to build the circuit,
    build the circuit and measure the bandwidth of the given relay.

    :param args: command line arguments (not used by this function).
    :param conf: sbws configuration.
    :param destinations: list of destinations to pick the Web server from.
    :param cb: circuit builder used to build and close the circuit.
    :param rl: relay list the helper (second hop) relay is picked from.
    :param relay: the relay to measure.
    :return: a list with a single measurement Result (``ResultSuccess`` or
        one of the ``ResultError*`` classes), or None when the scanner is
        stopping.
    """
    log.debug('Measuring %s %s', relay.nickname, relay.fingerprint)
    our_nick = conf['scanner']['nickname']
    s = requests_utils.make_session(
        cb.controller, conf.getfloat('general', 'http_timeout'))
    # Probably because the scanner is stopping.
    if s is None:
        if settings.end_event.is_set():
            return None
        else:
            # In future refactor this should be returned from the make_session
            reason = "Unable to get proxies."
            log.debug(reason + ' to measure %s %s',
                      relay.nickname, relay.fingerprint)
            return [
                ResultError(relay, [], '', our_nick,
                            msg=reason),
                ]
    # Pick a destination
    dest = destinations.next()
    # When there are not any functional destinations.
    if not dest:
        # NOTE: When there are still functional destinations but only one of
        # them fails, the error will be included in `ResultErrorStream`.
        # Since this is being executed in a thread, the scanner can not
        # be stopped here, but the `end_event` signal can be set so that the
        # main thread stops the scanner.
        # It might be useful to store the fact that the destinations fail,
        # so store here the error, and set the signal once the error is stored
        # (in `resultdump`).
        log.critical("There are not any functional destinations.\n"
                     "It is recommended to set several destinations so that "
                     "the scanner can continue if one fails.")
        reason = "No functional destinations"
        # Resultdump will set end_event after storing the error
        return [
            ResultErrorDestination(relay, [], '', our_nick, msg=reason),
        ]

    # Pick a relay to help us measure the given relay. If the given relay is an
    # exit, then pick a non-exit. Otherwise pick an exit.
    helper = None
    circ_fps = None
    # stored for debugging
    nicknames = None
    if relay.is_exit_not_bad_allowing_port(dest.port):
        helper = _pick_ideal_second_hop(
            relay, dest, rl, cb.controller, is_exit=False)
        if helper:
            circ_fps = [helper.fingerprint, relay.fingerprint]
            nicknames = [helper.nickname, relay.nickname]
    else:
        helper = _pick_ideal_second_hop(
            relay, dest, rl, cb.controller, is_exit=True)
        if helper:
            circ_fps = [relay.fingerprint, helper.fingerprint]
            nicknames = [relay.nickname, helper.nickname]
    if not helper:
        reason = 'Unable to select a second relay'
        log.debug(reason + ' to help measure %s (%s)',
                  relay.fingerprint, relay.nickname)
        return [
            ResultErrorSecondRelay(relay, [], dest.url, our_nick,
                                   msg=reason),
            ]

    # Build the circuit
    circ_id, reason = cb.build_circuit(circ_fps)
    if not circ_id:
        log.debug('Could not build circuit with path %s (%s): %s ',
                  circ_fps, nicknames, reason)
        return [
            ResultErrorCircuit(relay, circ_fps, dest.url, our_nick,
                               msg=reason),
        ]
    log.debug('Built circuit with path %s (%s) to measure %s (%s)',
              circ_fps, nicknames, relay.fingerprint, relay.nickname)
    # From here on the circuit exists. Close it on every way out of this
    # block, including an unexpected exception (e.g. an AssertionError from
    # get_random_range_string), so that it is never leaked.
    try:
        # Make a connection to the destination
        is_usable, usable_data = connect_to_destination_over_circuit(
            dest, circ_id, s, cb.controller, dest._max_dl)
        if not is_usable:
            log.debug('Destination %s unusable via circuit %s (%s), %s',
                      dest.url, circ_fps, nicknames, usable_data)
            return [
                ResultErrorStream(relay, circ_fps, dest.url, our_nick,
                                  msg=usable_data),
            ]
        assert is_usable
        assert 'content_length' in usable_data
        # FIRST: measure RTT
        rtts, reason = measure_rtt_to_server(s, conf, dest,
                                             usable_data['content_length'])
        if rtts is None:
            log.debug('Unable to measure RTT for %s (%s) to %s via circuit '
                      '%s (%s): %s', relay.fingerprint, relay.nickname,
                      dest.url, circ_fps, nicknames, reason)
            return [
                ResultErrorStream(relay, circ_fps, dest.url, our_nick,
                                  msg=str(reason)),
            ]
        # SECOND: measure bandwidth
        bw_results, reason = measure_bandwidth_to_server(
            s, conf, dest, usable_data['content_length'])
        if bw_results is None:
            log.debug('Unable to measure bandwidth for %s (%s) to %s via '
                      'circuit %s (%s): %s', relay.fingerprint, relay.nickname,
                      dest.url, circ_fps, nicknames, reason)
            return [
                ResultErrorStream(relay, circ_fps, dest.url, our_nick,
                                  msg=str(reason)),
            ]
    finally:
        cb.close_circuit(circ_id)
    # Finally: store result
    log.debug('Success measurement for %s (%s) via circuit %s (%s) to %s',
              relay.fingerprint, relay.nickname, circ_fps, nicknames, dest.url)
    return [
        ResultSuccess(rtts, bw_results, relay, circ_fps, dest.url, our_nick),
    ]
Matt Traudt's avatar
Matt Traudt committed
366

367

368
def dispatch_worker_thread(*a, **kw):
    """Run ``measure_relay(*a, **kw)`` unless it would fail anyway.

    If at the point where the relay is actually going to be measured there
    are not any functional destinations, or the ``end_event`` is set, do not
    try to measure the relay and return None instead.
    """
    try:
        # a[2] is the argument `destinations`
        functional_destinations = a[2].functional_destinations
    # In case the arguments or the attribute change, catch the possible
    # exceptions: too few positional arguments (IndexError), a
    # non-subscriptable value (TypeError) or a missing attribute
    # (AttributeError). Do not treat that as "no destinations".
    except (IndexError, TypeError, AttributeError):
        log.debug("Wrong argument or attribute.")
        functional_destinations = True
    if not functional_destinations or settings.end_event.is_set():
        return None
    return measure_relay(*a, **kw)
383 384


385 386 387 388 389 390 391 392 393 394 395 396 397 398
def _should_keep_result(did_request_maximum, result_time, download_times):
    # In the normal case, we didn't ask for the maximum allowed amount. So we
    # should only allow ourselves to keep results that are between the min and
    # max allowed time
    if not did_request_maximum and \
            result_time >= download_times['min'] and \
            result_time < download_times['max']:
        return True
    # If we did request the maximum amount, we should keep the result as long
    # as it took less than the maximum amount of time
    if did_request_maximum and \
            result_time < download_times['max']:
        return True
    # In all other cases, return false
399
    log.debug('Not keeping result time %f.%s', result_time,
Matt Traudt's avatar
Matt Traudt committed
400 401
              '' if not did_request_maximum else ' We requested the maximum '
              'amount allowed.')
402 403 404
    return False


405 406
def _next_expected_amount(expected_amount, result_time, download_times,
                          min_dl, max_dl):
407 408 409 410 411 412 413 414 415 416
    if result_time < download_times['toofast']:
        # Way too fast, greatly increase the amount we ask for
        expected_amount = int(expected_amount * 5)
    elif result_time < download_times['min'] or \
            result_time >= download_times['max']:
        # As long as the result is between min/max, keep the expected amount
        # the same. Otherwise, adjust so we are aiming for the target amount.
        expected_amount = int(
            expected_amount * download_times['target'] / result_time)
    # Make sure we don't request too much or too little
417 418
    expected_amount = max(min_dl, expected_amount)
    expected_amount = min(max_dl, expected_amount)
419 420 421
    return expected_amount


Matt Traudt's avatar
Matt Traudt committed
422
def result_putter(result_dump):
    ''' Build the success callback for the measurement jobs.

    The returned function takes a single argument -- the measurement result
    -- and puts it on ``result_dump.queue``, waiting at most 3 seconds for a
    free slot. If the queue stays full the result is dropped with a warning
    and the scanner keeps working.
    '''

    def closure(measurement_result):
        # The ResultDump thread is meant to keep draining this queue, so a
        # full queue should be short-lived; never block longer than 3 s.
        try:
            result_dump.queue.put(measurement_result, timeout=3)
        except queue.Full:
            # The result would be lost, the scanner will continue working.
            log.warning(
                "The queue with measurements is full, when adding %s.\n"
                "It is possible that the thread that get them to "
                "write them to the disk (ResultDump.enter) is stalled.",
                measurement_result
                )

    return closure

Matt Traudt's avatar
Matt Traudt committed
442

443
def result_putter_error(target):
    ''' Build the error callback for the measurement jobs.

    The returned function takes a single argument -- the exception that
    escaped a measurement worker -- and logs its traceback, unless sbws is
    already stopping. **target** is currently not used.
    '''
    def closure(err):
        if not settings.end_event.is_set():
            # Without uncaught exceptions, the only object expected here is
            # stem.SocketClosed while stopping sbws (when end_event is set).
            # Getting here means the worker thread finished with an error.
            log.warning(FILLUP_TICKET_MSG)
            # Print the traceback raised in the worker thread, not the one of
            # this callback.
            log.warning("".join(traceback.format_exception(
                type(err), err, err.__traceback__))
                )
    return closure
460 461


462
def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
              relay_prioritizer, destinations, pool):
    """Start and reuse the threads that measure the relays, forever.

    The outer loop runs until an event signals that sbws is stopping
    (because of SIGTERM or SIGINT).

    Each iteration walks the ordered relays yielded by
    ``relay_prioritizer.best_priority()``, which might be a subset of all
    the relays in the network. For every relay, ``pool.apply_async`` starts
    or reuses a pool thread to run ``dispatch_worker_thread`` (which calls
    ``measure_relay``). It returns an ``AsyncResult`` immediately, whose
    ``ready`` method tells whether the job has finished.

    When a job finishes, the ``result_putter`` callback puts the ``Result``
    in the ``ResultDump`` queue and completes immediately. The
    ``ResultDump`` thread (started before, outside this function) gets it
    from the queue and writes it to disk, so the measurement threads are not
    blocked. If ``measure_relay`` raised an uncaught exception,
    ``result_putter_error`` is called instead; it logs the error and
    completes immediately.

    Before the outer loop iterates again, ``wait_for_results`` waits for the
    pending ``AsyncResult``s. This avoids starting to measure a relay that
    might still be being measured.

    In a testing Tor network (``TestingTorNetwork`` set to ``1``) the scanner
    is stopped after the first iteration.
    """
    hbeat = Heartbeat(conf.getpath('paths', 'state_fname'))

    # Do not start a new loop if sbws is stopping.
    while not settings.end_event.is_set():
        log.debug("Starting a new measurement loop.")
        num_relays = 0
        # Reset on every loop: the previous wait may have given up before
        # every result was ready.
        pending_results = []
        loop_tstart = time.time()

        # Register relay fingerprints to the heartbeat module
        hbeat.register_consensus_fprs(relay_list.relays_fingerprints)

        for target in relay_prioritizer.best_priority():
            # Don't start measuring a relay if sbws is stopping.
            if settings.end_event.is_set():
                break
            relay_list.increment_recent_measurement_attempt_count()
            target.increment_relay_recent_measurement_attempt_count()
            num_relays += 1
            # Both callbacks must be non-blocking.
            pending_results.append(pool.apply_async(
                dispatch_worker_thread,
                [args, conf, destinations, circuit_builder, relay_list,
                 target],
                {},
                result_putter(result_dump),
                result_putter_error(target)))

            # Register this measurement to the heartbeat module
            hbeat.register_measured_fpr(target.fingerprint)

        # Every relay of this loop is now queued in the pool, and
        # pending_results holds one AsyncResult per relay (pool._cache also
        # holds them, as a dictionary).
        wait_for_results(len(pending_results), pending_results)

        # Print the heartbeat message
        hbeat.print_heartbeat_message()

        elapsed_minutes = (time.time() - loop_tstart) / 60
        # These relays were queued to be measured, which does not mean they
        # were actually measured.
        log.debug("Attempted to measure %s relays in %s minutes",
                  num_relays, elapsed_minutes)
        # In a testing network, exit after first loop
        if controller.get_conf('TestingTorNetwork') == '1':
            log.info("In a testing network, exiting after the first loop.")
            # Threads should be closed nicely in some refactor
            stop_threads(signal.SIGTERM, None)

555

556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595
def wait_for_results(num_relays_to_measure, pending_results):
    """Wait for the pool to finish and log progress.

    While relays are being measured, log the progress and sleep
    :const:`~sbws.globals.TIMEOUT_MEASUREMENTS` (documented as about 3
    minutes, roughly the worst-case time to measure a relay).

    If no relay finished during the last ``TIMEOUT_MEASUREMENTS`` and some
    are still pending, there is no progress, so
    :func:`~sbws.core.scanner.force_get_results` is called.

    That can happen with a bug that makes
    :func:`~sbws.core.scanner.measure_relay`,
    :func:`~sbws.core.scanner.result_putter` (callback) and/or
    :func:`~sbws.core.scanner.result_putter_error` (callback error) stall.

    .. note:: in a future refactor, this could be simpler by:

      1. Initializing the pool at the beginning of each loop
      2. Calling :meth:`~Pool.close`; :meth:`~Pool.join` after
         :meth:`~Pool.apply_async`,
         to ensure no new jobs are added until the pool has finished with all
         the ones in the queue.

      Even then, the pool could still stall in two cases:

      1. There's an exception in ``measure_relay`` and another in
         ``callback_err``
      2. There's an exception in ``callback``.

      This could also be simpler by not having callback and callback error in
      ``apply_async`` and instead just calling callback with the
      ``pending_results``.

      (callback could be also simpler by not having a thread and queue and
      just storing to disk, since the time to write to disk is way smaller
      than the time to request over the network.)
    """
    progressed = True
    while progressed and not settings.end_event.is_set():
        log.info("Pending measurements: %s out of %s: ",
                 len(pending_results), num_relays_to_measure)
        time.sleep(TIMEOUT_MEASUREMENTS)
        num_before = len(pending_results)
        pending_results = [r for r in pending_results if not r.ready()]
        # Keep waiting only while at least one job finished in this round.
        progressed = len(pending_results) < num_before
    if pending_results:
        force_get_results(pending_results)
605 606


607
def force_get_results(pending_results):
    """Try to get either the result or an exception, which gets logged.

    Called by :func:`~sbws.core.scanner.wait_for_results` when waiting for
    the results took too long.

    Each :class:`~sbws.lib.resultdump.Result` (or exception) is obtained with
    :meth:`~AsyncResult.get` and a low timeout, since we already waited.
    ``get`` is not called earlier because it blocks and the callbacks would
    not be called.
    """
    log.debug("Forcing get")
    for pending in pending_results:
        try:
            outcome = pending.get(timeout=0.1)
            log.warning("Result %s was not stored, it took too long.",
                        outcome)
        # The job has not been processed yet. This is multiprocessing's own
        # TimeoutError, imported at the top of the module.
        except TimeoutError:
            log.warning("A result was not stored, it was not ready.")
        # ``get`` re-raises the exception raised by the job. Log it so that
        # it can be fixed. This should not happen, since ``callback_err``
        # would have been called first.
        except Exception as err:
            log.critical(FILLUP_TICKET_MSG)
            # For an exception raised in the threads, `log.exception` would
            # not have the traceback. `format_exception` (rather than
            # `print_exception`) sends it to every log handler.
            log.warning("".join(traceback.format_exception(
                        type(err), err, err.__traceback__)))
642 643


644
def run_speedtest(args, conf):
    """Set up everything the measurements need, then start measuring.

    In order, this:

    - connects to a Tor already listening on the configured control socket
      or, when none answers, launches Tor;
    - stores the Tor version in the state file and initializes the HTTP
      headers sent with the measurements;
    - builds the list of relays seen in the Tor network and the circuit
      builder;
    - creates the ``ResultDump``, which starts a thread that reads the
      previous measurements and writes the new ones to disk;
    - creates the prioritizer that orders relays by the age of their
      measurements;
    - builds the list of destinations, failing hard when there is none;
    - creates the thread pool for the measurement (worker) threads; the
      pool also starts 3 other threads that are not workers;
    - finally hands everything over to :func:`main_loop`.

    ``controller``, ``rd`` and ``pool`` are kept in module globals so that
    sbws can exit gracefully at any time.
    """
    global rd, pool, controller
    control_socket = conf.getpath('tor', 'control_socket')
    controller, _ = stem_utils.init_controller(path=control_socket)
    if controller:
        # Reusing someone else's Tor is only meant for development: warn
        # loudly and give the user some time to read the warning.
        log.warning(
            'Is sbws already running? '
            'We found an existing Tor process at %s. We are not going to '
            'launch Tor, nor are we going to try to configure it to behave '
            'like we expect. This might work okay, but it also might not. '
            'If you experience problems, you should try letting sbws launch '
            'Tor for itself. The ability to use an already running Tor only '
            'exists for sbws developers. It is expected to be broken and may '
            'even lead to messed up results.',
            control_socket)
        time.sleep(15)
    else:
        controller = stem_utils.launch_tor(conf)

    # The state is opened again here because conf is not global yet; a
    # future refactor could remove this.
    state = State(conf.getpath('paths', 'state_fname'))
    # XXX: tech-debt: create new function to obtain the controller and to
    # write the state, so that a unit test to check the state tor version can
    # be created
    # Record the tor version every time the scanner starts.
    state['tor_version'] = str(controller.get_version())
    # The HTTP headers are initialized exactly once, here.
    settings.init_http_headers(conf.get('scanner', 'nickname'),
                               state['uuid'], state['tor_version'])

    # RelayList receives the data_period directly so that it does not need
    # to dig it out of args/conf itself.
    relay_list = RelayList(args, conf, controller,
                           conf.getint('general', 'data_period'), state)
    circuit_builder = CB(args, conf, controller, relay_list)
    rd = ResultDump(args, conf)
    prioritizer = RelayPrioritizer(args, conf, relay_list, rd)
    destinations, error_msg = DestinationList.from_config(
        conf, circuit_builder, relay_list, controller)
    if not destinations:
        fail_hard(error_msg)

    pool = Pool(conf.getint('scanner', 'measurement_threads'))
    try:
        main_loop(args, conf, controller, relay_list, circuit_builder, rd,
                  prioritizer, destinations, pool)
    except KeyboardInterrupt:
        log.info("Interrupted by the user.")
        stop_threads(signal.SIGINT, None)
    # An exception that escapes to this point would leave the scanner
    # stalled, so log it and exit gracefully.
    except Exception as e:
        log.critical(FILLUP_TICKET_MSG)
        log.exception(e)
        stop_threads(signal.SIGTERM, None, 1)
Matt Traudt's avatar
Matt Traudt committed
714 715


Matt Traudt's avatar
Matt Traudt committed
716
def gen_parser(sub):
    """Register the ``scanner`` subcommand.

    :param sub: the subparsers action returned by
        :meth:`argparse.ArgumentParser.add_subparsers`.
    """
    d = ('The scanner side of sbws. This should be run on a well-connected '
         'machine on the Internet with a healthy amount of spare bandwidth. '
         'This continuously builds circuits, measures relays, and dumps '
         'results into a datadir, commonly found in ~/.sbws')
    sub.add_parser('scanner', formatter_class=ArgumentDefaultsHelpFormatter,
                   description=d)
Matt Traudt's avatar
Matt Traudt committed
723 724


725
def main(args, conf):
    """Validate the scanner configuration, update the state and start.

    Fails hard when ``measurement_threads`` is lower than 1 or when
    ``max_download_size`` is smaller than ``min_download_size``.
    Creates the data directory if it does not exist, records the start
    time in the state file and, only when the state has none yet, assigns
    the scanner a UUID (later read by :func:`run_speedtest`).

    :param args: the parsed command line arguments.
    :param conf: the sbws configuration.
    """
    # The check accepts exactly 1 thread, so the message says "at least 1"
    # (it previously claimed the number had to be larger than 1).
    if conf.getint('scanner', 'measurement_threads') < 1:
        fail_hard('Number of measurement threads must be at least 1')

    min_dl = conf.getint('scanner', 'min_download_size')
    max_dl = conf.getint('scanner', 'max_download_size')
    if max_dl < min_dl:
        fail_hard('Max download size %d cannot be smaller than min %d',
                  max_dl, min_dl)

    os.makedirs(conf.getpath('paths', 'datadir'), exist_ok=True)

    state = State(conf.getpath('paths', 'state_fname'))
    state['scanner_started'] = now_isodt_str()
    # Generate a unique identifier for each scanner; an existing one is
    # kept.
    if 'uuid' not in state:
        state['uuid'] = str(uuid.uuid4())

    run_speedtest(args, conf)