Commit fe16d6d8 authored by juga's avatar juga
Browse files

scanner: catch SIGINT in the main loop

also split main function into an extra main_loop function to be
able to stop the threads after they have started.
Also check end event in the mean loop and before starting to
measure a new relay.

Fixes bug #28869. Bugfix v0.1.0.
parent b85fb114
Loading
Loading
Loading
Loading
+90 −25
Original line number Diff line number Diff line
@@ -354,6 +354,88 @@ def result_putter_error(target):
    return closure


def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
              relay_prioritizer, destinations, max_pending_results, pool):
    """Starts and reuse the threads that measure the relays forever.

    It starts a loop that will be run while there is not and event signaling
    that sbws is stopping (because of SIGTERM or SIGINT).

    Then, it starts a second loop with an ordered list (generator) of relays
    to measure that might a subset of all the current relays in the Network.

    For every relay, it starts a new thread which runs ``measure_relay`` to
    measure the relay until there are ``max_pending_results`` threads.
    After that, it will reuse a thread that has finished for every relay to
    measure.
    It is the the pool method ``apply_async`` which starts or reuse a thread.
    This method returns an ``ApplyResult`` immediately, which has a ``ready``
    methods that tells whether the thread has finished or not.

    When the thread finish, ie. ``ApplyResult`` is ``ready``, it triggers
    ``result_putter`` callback, which put the ``Result`` in ``ResultDump``
    queue and complete immediately.

    ``ResultDump`` thread (started before and out of this function) will get
    the ``Result`` from the queue and write it to disk, so this doesn't block
    the measurement threads.

    If there was an exception not catched by ``measure_relay``, it will call
    instead ``result_putter_error``, which logs the error and complete
    immediately.

    Before iterating over the next relay, it waits (non blocking, since it
    happens in the main thread) until one of the ``max_pending_results``
    threads has finished.

    This is not needed, since otherwise async_result will queue the relays to
    measure in order and won't start reusing a thread to measure a relay until
    other thread has finished. But it makes the logic a bit more sequential.

    Before the outer loop iterates, it also waits (again non blocking) that all
    the ``Results`` are ready.
    This avoid to start measuring the same relay which might still being
    measured.

    """
    pending_results = []
    # Do not start a new loop if sbws is stopping.
    while not settings.end_event.is_set():
        log.debug("Starting a new measurement loop.")
        num_relays = 0
        loop_tstart = time.time()
        for target in relay_prioritizer.best_priority():
            # Don't start measuring a relay if sbws is stopping.
            if settings.end_event.is_set():
                break
            num_relays += 1
            log.debug('Measuring %s %s', target.nickname,
                      target.fingerprint[0:8])
            # callback and callback_err must be non-blocking
            callback = result_putter(result_dump)
            callback_err = result_putter_error(target)
            async_result = pool.apply_async(
                dispatch_worker_thread,
                [args, conf, destinations, circuit_builder, relay_list,
                 target], {}, callback, callback_err)
            pending_results.append(async_result)
            # Instead of letting apply_async to queue the relays in order until
            # a thread has finished, wait here until a thread has finished.
            while len(pending_results) >= max_pending_results:
                # sleep is non-blocking sine happens in the main process
                time.sleep(5)
                pending_results = [r for r in pending_results if not r.ready()]
        while len(pending_results) > 0:
            log.debug("There are %s pending measurements.",
                      len(pending_results))
            # sleep is non-blocking sine happens in the main process
            time.sleep(5)
            pending_results = [r for r in pending_results if not r.ready()]
        loop_tstop = time.time()
        loop_tdelta = (loop_tstop - loop_tstart) / 60
        log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta)


def run_speedtest(args, conf):
    global rd, pool, controller
    controller, _ = stem_utils.init_controller(
@@ -382,31 +464,14 @@ def run_speedtest(args, conf):
        fail_hard(error_msg)
    max_pending_results = conf.getint('scanner', 'measurement_threads')
    pool = Pool(max_pending_results)
    pending_results = []
    while True:
        num_relays = 0
        loop_tstart = time.time()
        log.info("Starting a new loop to measure relays.")
        for target in rp.best_priority():
            num_relays += 1
            log.debug('Measuring %s %s', target.nickname,
                      target.fingerprint[0:8])
            callback = result_putter(rd)
            callback_err = result_putter_error(target)
            async_result = pool.apply_async(
                dispatch_worker_thread,
                [args, conf, destinations, cb, rl, target],
                {}, callback, callback_err)
            pending_results.append(async_result)
            while len(pending_results) >= max_pending_results:
                time.sleep(5)
                pending_results = [r for r in pending_results if not r.ready()]
        while len(pending_results) > 0:
            time.sleep(5)
            pending_results = [r for r in pending_results if not r.ready()]
        loop_tstop = time.time()
        loop_tdelta = (loop_tstop - loop_tstart) / 60
        log.info("Measured %s relays in %s minutes", num_relays, loop_tdelta)

    try:
        main_loop(args, conf, controller, rl, cb, rd, rp, destinations,
                  max_pending_results, pool)
    except KeyboardInterrupt:
        log.info("Interrupted by the user.")
    finally:
        stop_threads(signal.SIGINT, None)


def gen_parser(sub):