Commit 149e236e authored by juga's avatar juga
Browse files

Merge branch 'bug28869_squashed'

Solved merge conflicts in sbws/ and sbws/core/
after merging bug28741
parents c523619e c24b2354
__version__ = '1.0.3-dev0'
import threading # noqa
from . import globals # noqa
class Settings:
"""Singleton settings for all the packages.
This way change settings can be seen by all the packages that import it.
It lives in ```` to leave open the possibility of having a
```` module for user settings.
.. note:: After refactoring, globals should only have constants.
Any other variable that needs to be modified when initializing
should be initialized here.
def __init__(self):
# update this dict from globals (but only for ALL_CAPS settings)
for setting in dir(globals):
if setting.isupper():
setattr(self, setting, getattr(globals, setting))
self.end_event = threading.Event()
def init_http_headers(self, nickname, uuid, tor_version):
self.HTTP_HEADERS['Tor-Bandwidth-Scanner-Nickname'] = nickname
self.HTTP_HEADERS['Tor-Bandwidth-Scanner-UUID'] = uuid
self.HTTP_HEADERS['User-Agent'] += tor_version
def set_end_event(self):
settings = Settings() # noqa
''' Measure the relays. '''
import signal
import sys
import threading
import uuid
......@@ -13,24 +14,44 @@ from ..lib.relayprioritizer import RelayPrioritizer
from ..lib.destination import DestinationList
from ..util.timestamp import now_isodt_str
from ..util.state import State
from sbws.globals import fail_hard, TIMEOUT_MEASUREMENTS, HTTP_GET_HEADERS
from sbws.globals import fail_hard, HTTP_GET_HEADERS
import sbws.util.stem as stem_utils
import sbws.util.requests as requests_utils
from argparse import ArgumentDefaultsHelpFormatter
from multiprocessing.dummy import Pool
from threading import Event
import time
import os
import logging
import requests
import random
from sbws import settings
from .. import settings
rng = random.SystemRandom()
end_event = Event()
log = logging.getLogger(__name__)
# Declare the objects that manage the threads global so that sbws can exit
# gracefully at any time.
pool = None
rd = None
controller = None
def stop_threads(signal, frame):
global rd, pool
log.debug('Stopping sbws.')
# Avoid new threads to start.
# Stop Pool threads
# Stop ResultDump thread
# Stop Tor thread
signal.signal(signal.SIGTERM, stop_threads)
def dumpstacks():
......@@ -71,8 +92,11 @@ def timed_recv_from_server(session, dest, byte_range):
# headers are merged with the session ones, not overwritten.
session.get(dest.url, headers=HTTP_GET_HEADERS, verify=dest.verify)
# NewConnectionError will be raised when shutting down.
except (requests.exceptions.ConnectionError,
requests.exceptions.ReadTimeout) as e:
requests.exceptions.NewConnectionError) as e:
return False, e
end_time = time.time()
return True, end_time - start_time
......@@ -148,7 +172,7 @@ def measure_bandwidth_to_server(session, conf, dest, content_length):
'target': conf.getfloat('scanner', 'download_target'),
'max': conf.getfloat('scanner', 'download_max'),
while len(results) < num_downloads:
while len(results) < num_downloads and not settings.end_event.is_set():
assert expected_amount >= min_dl
assert expected_amount <= max_dl
random_range = get_random_range_string(content_length, expected_amount)
......@@ -207,6 +231,7 @@ def _pick_ideal_second_hop(relay, dest, rl, cont, is_exit):
def measure_relay(args, conf, destinations, cb, rl, relay):
log.debug('Measuring %s %s', relay.nickname, relay.fingerprint)
s = requests_utils.make_session(
cb.controller, conf.getfloat('general', 'http_timeout'))
# Pick a destionation
......@@ -300,11 +325,7 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
def dispatch_worker_thread(*a, **kw):
return measure_relay(*a, **kw)
except Exception as err:
log.exception('Unhandled exception in worker thread')
raise err
return measure_relay(*a, **kw)
def _should_keep_result(did_request_maximum, result_time, download_times):
......@@ -356,13 +377,114 @@ def result_putter_error(target):
''' Create a function that takes a single argument -- an error from a
measurement -- and return that function so it can be used by someone else
def closure(err):
log.error('Unhandled exception caught while measuring %s: %s %s',
target.nickname, type(err), err)
def closure(object):
# The only object that can be here if there is not any uncatched
# exception is stem.SocketClosed when stopping sbws
return closure
def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
relay_prioritizer, destinations, max_pending_results, pool):
"""Starts and reuse the threads that measure the relays forever.
It starts a loop that will be run while there is not and event signaling
that sbws is stopping (because of SIGTERM or SIGINT).
Then, it starts a second loop with an ordered list (generator) of relays
to measure that might a subset of all the current relays in the Network.
For every relay, it starts a new thread which runs ``measure_relay`` to
measure the relay until there are ``max_pending_results`` threads.
After that, it will reuse a thread that has finished for every relay to
It is the the pool method ``apply_async`` which starts or reuse a thread.
This method returns an ``ApplyResult`` immediately, which has a ``ready``
methods that tells whether the thread has finished or not.
When the thread finish, ie. ``ApplyResult`` is ``ready``, it triggers
``result_putter`` callback, which put the ``Result`` in ``ResultDump``
queue and complete immediately.
``ResultDump`` thread (started before and out of this function) will get
the ``Result`` from the queue and write it to disk, so this doesn't block
the measurement threads.
If there was an exception not catched by ``measure_relay``, it will call
instead ``result_putter_error``, which logs the error and complete
Before iterating over the next relay, it waits (non blocking, since it
happens in the main thread) until one of the ``max_pending_results``
threads has finished.
This is not needed, since otherwise async_result will queue the relays to
measure in order and won't start reusing a thread to measure a relay until
other thread has finished. But it makes the logic a bit more sequential.
Before the outer loop iterates, it also waits (again non blocking) that all
the ``Results`` are ready.
This avoid to start measuring the same relay which might still being
pending_results = []
# Set the time to wait for a thread to finish as the half of an HTTP
# request timeout.
time_to_sleep = conf.getfloat('general', 'http_timeout') / 2
# Do not start a new loop if sbws is stopping.
while not settings.end_event.is_set():
log.debug("Starting a new measurement loop.")
num_relays = 0
loop_tstart = time.time()
for target in relay_prioritizer.best_priority():
# Don't start measuring a relay if sbws is stopping.
if settings.end_event.is_set():
num_relays += 1
# callback and callback_err must be non-blocking
callback = result_putter(result_dump)
callback_err = result_putter_error(target)
async_result = pool.apply_async(
[args, conf, destinations, circuit_builder, relay_list,
target], {}, callback, callback_err)
# Instead of letting apply_async to queue the relays in order until
# a thread has finished, wait here until a thread has finished.
while len(pending_results) >= max_pending_results:
# sleep is non-blocking since happens in the main process.
pending_results = [r for r in pending_results if not r.ready()]
while len(pending_results) > 0:
log.debug("There are %s pending measurements.",
# sleep is non-blocking since happens in the main process.
pending_results = [r for r in pending_results if not r.ready()]
loop_tstop = time.time()
loop_tdelta = (loop_tstop - loop_tstart) / 60
log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta)
def run_speedtest(args, conf):
"""Initializes all the data and threads needed to measure the relays.
It launches or connect to Tor in a thread.
It initializes the list of relays seen in the Tor network.
It starts a thread to read the previous measurements and wait for new
measurements to write them to the disk.
It initializes a class that will be used to order the relays depending
on their measurements age.
It initializes the list of destinations that will be used for the
It initializes the thread pool that will launch the measurement threads.
The pool starts 3 other threads that are not the measurement (worker)
Finally, it calls the function that will manage the measurement threads.
global rd, pool, controller
controller, _ = stem_utils.init_controller(
path=conf.getpath('tor', 'control_socket'))
if not controller:
......@@ -389,7 +511,7 @@ def run_speedtest(args, conf):
rl = RelayList(args, conf, controller)
cb = CB(args, conf, controller, rl)
rd = ResultDump(args, conf, end_event)
rd = ResultDump(args, conf)
rp = RelayPrioritizer(args, conf, rl, rd)
destinations, error_msg = DestinationList.from_config(
conf, cb, rl, controller)
......@@ -397,38 +519,14 @@ def run_speedtest(args, conf):
max_pending_results = conf.getint('scanner', 'measurement_threads')
pool = Pool(max_pending_results)
pending_results = []
while True:
num_relays = 0
loop_tstart = time.time()"Starting a new loop to measure relays.")
for target in rp.best_priority():
num_relays += 1
log.debug('Measuring %s %s', target.nickname,
callback = result_putter(rd)
callback_err = result_putter_error(target)
async_result = pool.apply_async(
[args, conf, destinations, cb, rl, target],
{}, callback, callback_err)
while len(pending_results) >= max_pending_results:
pending_results = [r for r in pending_results if not r.ready()]
time_waiting = 0
while (len(pending_results) > 0
and time_waiting <= TIMEOUT_MEASUREMENTS):
log.debug("Number of pending measurement threads %s after "
"a prioritization loop.", len(pending_results))
time_waiting += 5
pending_results = [r for r in pending_results if not r.ready()]
if time_waiting > TIMEOUT_MEASUREMENTS:
loop_tstop = time.time()
loop_tdelta = (loop_tstop - loop_tstart) / 60"Measured %s relays in %s minutes", num_relays, loop_tdelta)
main_loop(args, conf, controller, rl, cb, rd, rp, destinations,
max_pending_results, pool)
except KeyboardInterrupt:"Interrupted by the user.")
stop_threads(signal.SIGINT, None)
def gen_parser(sub):
......@@ -458,9 +556,4 @@ def main(args, conf):
if 'uuid' not in state:
state['uuid'] = str(uuid.uuid4())
run_speedtest(args, conf)
except KeyboardInterrupt as e:
raise e
run_speedtest(args, conf)
from stem import CircuitExtensionFailed, InvalidRequest, ProtocolError, Timeout
from stem import InvalidArguments, ControllerError
from stem import InvalidArguments, ControllerError, SocketClosed
import random
from .relaylist import Relay
import logging
......@@ -56,16 +56,12 @@ class CircuitBuilder:
raise NotImplementedError()
def close_circuit(self, circ_id):
c = self.controller
c.get_circuit(circ_id, default=None)
except (InvalidArguments, InvalidRequest):
except (ControllerError, ValueError) as e:
log.exception("Error trying to get circuit to close it: %s.", e)
# SocketClosed will be raised when stopping sbws
except (InvalidArguments, InvalidRequest, SocketClosed) as e:
def _build_circuit_impl(self, path):
......@@ -4,7 +4,6 @@ import time
import logging
from glob import glob
from threading import Thread
from threading import Event
from threading import RLock
from queue import Queue
from queue import Empty
......@@ -14,6 +13,7 @@ from enum import Enum
from sbws.globals import RESULT_VERSION, fail_hard
from sbws.util.filelock import DirectoryLock
from sbws.lib.relaylist import Relay
from .. import settings
log = logging.getLogger(__name__)
......@@ -527,13 +527,11 @@ class ResultSuccess(Result):
class ResultDump:
''' Runs the enter() method in a new thread and collects new Results on its
queue. Writes them to daily result files in the data directory '''
def __init__(self, args, conf, end_event):
def __init__(self, args, conf):
assert os.path.isdir(conf.getpath('paths', 'datadir'))
assert isinstance(end_event, Event)
self.conf = conf
self.fresh_days = conf.getint('general', 'data_period')
self.datadir = conf.getpath('paths', 'datadir')
self.end_event = end_event = {}
self.data_lock = RLock()
self.thread = Thread(target=self.enter)
......@@ -563,7 +561,7 @@ class ResultDump:
assert isinstance(result, Result)
fp = result.fingerprint
nick = result.nickname
if isinstance(result, ResultError) and self.end_event.is_set():
if isinstance(result, ResultError) and settings.end_event.is_set():
log.debug('Ignoring %s for %s %s because we are shutting down',
type(result).__name__, nick, fp)
......@@ -586,7 +584,7 @@ class ResultDump:
with self.data_lock: = load_recent_results_in_datadir(
self.fresh_days, self.datadir)
while not (self.end_event.is_set() and self.queue.empty()):
while not (settings.end_event.is_set() and self.queue.empty()):
event = self.queue.get(timeout=1)
except Empty:
import socks
from stem.control import (Controller, Listener)
from stem import (SocketError, InvalidRequest, UnsatisfiableRequest,
OperationFailed, ControllerError, InvalidArguments,
ProtocolError, SocketClosed)
from stem.connection import IncorrectSocketType
import stem.process
from configparser import ConfigParser
......@@ -249,8 +251,9 @@ def circuit_str(controller, circ_id):
log.warning('Circuit %s no longer seems to exist so can\'t return '
'a valid circuit string for it: %s', circ_id, e)
return None
except ControllerError as e:
log.exception("Exception trying to get circuit string %s", e)
# exceptions raised when stopping the scanner
except (ControllerError, SocketClosed, socks.GeneralProxyError) as e:
return None
return '[' +\
' -> '.join(['{} ({})'.format(n, fp[0:8]) for fp, n in circ.path]) +\
from sbws.lib.resultdump import ResultDump
from sbws.lib.resultdump import ResultSuccess, ResultErrorCircuit
from sbws.lib.relayprioritizer import RelayPrioritizer
from threading import Event
from unittest.mock import patch
from sbws import settings
def static_time(value):
while True:
......@@ -41,8 +42,7 @@ def test_relayprioritizer_general(time_mock, sbwshome_empty, args,
now = 1000000
time_mock.side_effect = static_time(now)
end_event = Event()
rd = ResultDump(args, conf, end_event)
rd = ResultDump(args, conf)
rp = RelayPrioritizer(args, conf, rl, rd)
results = []
......@@ -66,4 +66,4 @@ def test_relayprioritizer_general(time_mock, sbwshome_empty, args,
relay = best_list[pos]
assert relay.nickname == nick
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment