Commit 3653396c authored by juga's avatar juga
Browse files

fix: scanner: Increase time getting measurements

- Increase the time spent waiting for the last queued measurements, to
  avoid cancelling unfinished measurements and the garbage collector
  possibly not releasing thread variables
- Use the already declared global pool instead of passing it by args
- Log more information when the last measurements time out

Closes: #40087
parent f3c07f29
Loading
Loading
Loading
Loading
+25 −5
Original line number Diff line number Diff line
@@ -15,7 +15,12 @@ from multiprocessing.dummy import Pool

import sbws.util.requests as requests_utils
import sbws.util.stem as stem_utils
from sbws.globals import HTTP_GET_HEADERS, TIMEOUT_MEASUREMENTS, fail_hard
from sbws.globals import (
    HTTP_GET_HEADERS,
    SOCKET_TIMEOUT,
    TIMEOUT_MEASUREMENTS,
    fail_hard,
)

from .. import settings
from ..lib.circuitbuilder import GapsCircuitBuilder as CB
@@ -77,7 +82,7 @@ def dumpstacks():
        log.critical(
            "Thread: %s(%d)", thread_id2name.get(thread_id, ""), thread_id
        )
        log.critical(traceback.format_stack("".join(stack)))
        log.critical("Traceback: %s", "".join(traceback.format_stack(stack)))
    # If logging level is less than DEBUG (more verbose), start pdb so that
    # developers can debug the issue.
    if log.getEffectiveLevel() < logging.DEBUG:
@@ -660,7 +665,6 @@ def main_loop(
    result_dump,
    relay_prioritizer,
    destinations,
    pool,
):
    """Starts and reuse the threads that measure the relays forever.

@@ -696,6 +700,7 @@ def main_loop(
    measured.

    """
    global pool
    log.info("Started the main loop to measure the relays.")
    hbeat = Heartbeat(conf.getpath("paths", "state_fname"))

@@ -743,6 +748,7 @@ def main_loop(
            # Register this measurement to the heartbeat module
            hbeat.register_measured_fpr(target.fingerprint)

        log.debug("Measurements queued.")
        # After the for has finished, the pool has queued all the relays
        # and pending_results has the list of all the AsyncResults.
        # It could also be obtained with pool._cache, which contains
@@ -815,6 +821,7 @@ def wait_for_results(num_relays_to_measure, pending_results):
            len(pending_results),
            num_relays_to_measure,
        )
        log.info("Last measured: %s", num_last_measured)
        time.sleep(TIMEOUT_MEASUREMENTS)
        old_pending_results = pending_results
        pending_results = [r for r in pending_results if not r.ready()]
@@ -836,15 +843,28 @@ def force_get_results(pending_results):
    ``get`` is not call before, because it blocks and the callbacks
    are not call.
    """
    global pool
    log.debug("Forcing get")
    # In case there are no finished AsyncResults, print the cache here
    # at level info so that is visible even if debug is not enabled.
    log.info("Pool cache %s", pool._cache)
    for r in pending_results:
        try:
            result = r.get(timeout=0.1)
            # HTTP timeout is 10
            result = r.get(timeout=SOCKET_TIMEOUT + 10)
            log.warning("Result %s was not stored, it took too long.", result)
        # TimeoutError is raised when the result is not ready, ie. has not
        # been processed yet
        except TimeoutError:
            log.warning("A result was not stored, it was not ready.")
            # This is the only place where using psutil so far.
            import psutil

            log.warning(psutil.Process(os.getpid()).memory_full_info())
            virtualMemoryInfo = psutil.virtual_memory()
            availableMemory = virtualMemoryInfo.available
            log.warning("Memory available %s MB.", availableMemory / 1024 ** 2)
            dumpstacks()
        # If the result raised an exception, `get` returns it,
        # then log any exception so that it can be fixed.
        # This should not happen, since `callback_err` would have been call
@@ -910,7 +930,7 @@ def run_speedtest(args, conf):
    max_pending_results = conf.getint("scanner", "measurement_threads")
    pool = Pool(max_pending_results)
    try:
        main_loop(args, conf, controller, rl, cb, rd, rp, destinations, pool)
        main_loop(args, conf, controller, rl, cb, rd, rp, destinations)
    except KeyboardInterrupt:
        log.info("Interrupted by the user.")
        stop_threads(signal.SIGINT, None)
+1 −0
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@ include_package_data = True
# See stable releases at https://www.python.org/downloads/
python_requires = >= 3.6
install_requires =
    psutil >= 5.5
    stem >= 1.7.0
    ; # Now versioneer is also needed as dependency
    versioneer