Commit 692e1fac authored by juga's avatar juga
Browse files

Merge branch 'master' into HEAD

There was an ammended commit diverging from canonical master
parents 1bb248b8 6070d360
......@@ -159,7 +159,7 @@ When a PR is being reviewed, new changes might be needed:
- If the change does not modify a previous change, create new commits and push.
- If the change modifies a previous change and it's small,
`git commit fixup <>`_
`git commit fixup <>`_
should be used. When it is agreed that the PR is ready, create a new branch
named ``mybranch_02`` and run:
......@@ -3,13 +3,17 @@
[![Build Status](](
Simple Bandwidth Scanner (called `sbws`) is a Tor bandwidth scanner that
produces bandwidth files to be used by Directory Authorities.
generates bandwidth files to be used by Directory Authorities.
The scanner builds two hop circuits consisting of the relay being measured and
a fast exit. Over these circuits it measures bandwidth and store the results.
The scanner measures the bandwidth of each relay in the Tor network
(except the directory authorities) by creating a two hops circuit
with the relay. It then measures the bandwidth by downloading data
from a destination Web Server and stores the measurements.
The generator read the measurement results, scales them using torflow's
scaling method and creates the bandwidth file.
The generator read the measurements, aggregates, filters and
scales them using torflow's scaling method.
Then it generates a bandwidth list file that is read
by a directory authority to report relays’ bandwidth in its vote.
**WARNING**: This software is intended to be run by researchers using a test
Tor network, such as chutney or shadow, or by the Tor bandwidth authorities
......@@ -25,6 +25,10 @@ verify = False
# Use ZZ if the location is unknown (for instance, a CDN).
country = ZZ
# Number of consecutive times that a destination could not be used to measure
# before stopping to try to use it for a while that by default is 3h.
max_num_failures = 3
## The following logging options are set by default.
## There is no need to change them unless other options are prefered.
; [logging]
......@@ -7,12 +7,19 @@ Running the scanner
The :term:`scanner` obtain a list of relays from the Tor network.
It measures the bandwidth of each relay by creating a two hop circuit with the
relay to measure and download data from a :term:`destination` Web Server.
The :term:`generator` creates a :term:`bandwidth list file` that is read
by a :term:`directory authority` and used to report relays' bandwidth in its
.. The following text is part of the introduction in the README, but rst
The :term:`scanner` measures the bandwidth of each relay in the Tor network
(except the directory authorities) by creating a two hops circuit
with the relay. It then measures the bandwidth by downloading data
from a :term:`destination` Web Server and stores the measurements.
The :term:`generator` read the measurements, aggregates, filters and
scales them using torflow's scaling method.
Then it generates a :term:`bandwidth list file` that is read
by a :term:`directory authority` to report relays’ bandwidth in its vote.
.. image:: ./images/scanner.svg
:height: 200px
__version__ = '1.0.3'
__version__ = '1.1.0-dev0'
import threading # noqa
......@@ -7,10 +7,14 @@ import threading
import traceback
import uuid
from multiprocessing.context import TimeoutError
from ..lib.circuitbuilder import GapsCircuitBuilder as CB
from ..lib.resultdump import ResultDump
from ..lib.resultdump import ResultSuccess, ResultErrorCircuit
from ..lib.resultdump import ResultErrorStream
from ..lib.resultdump import (
ResultSuccess, ResultErrorCircuit, ResultErrorStream,
ResultErrorSecondRelay, ResultError, # ResultErrorDestination
from ..lib.relaylist import RelayList
from ..lib.relayprioritizer import RelayPrioritizer
from ..lib.destination import (DestinationList,
......@@ -238,11 +242,22 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
log.debug('Measuring %s %s', relay.nickname, relay.fingerprint)
our_nick = conf['scanner']['nickname']
s = requests_utils.make_session(
cb.controller, conf.getfloat('general', 'http_timeout'))
# Probably because the scanner is stopping.
if s is None:
return None
if settings.end_event.is_set():
return None
# In future refactor this should be returned from the make_session
reason = "Unable to get proxies."
log.debug(reason + ' to measure %s %s',
relay.nickname, relay.fingerprint)
return [
ResultError(relay, [], '', our_nick,
# Pick a destionation
dest =
# If there is no any destination at this point, it can not continue.
......@@ -252,8 +267,13 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
log.critical("There are not any functional destinations.\n"
"It is recommended to set several destinations so that "
"the scanner can continue if one fails.")
# Exit the scanner with error stopping threads first.
stop_threads(signal.SIGTERM, None, 1)
# NOTE: Because this is executed in a thread, stop_threads can not
# be call from here, it has to be call from the main thread.
# Instead set the singleton end event, that will call stop_threads
# from the main process.
# Errors with only one destination are set in ResultErrorStream.
return None
# Pick a relay to help us measure the given relay. If the given relay is an
# exit, then pick a non-exit. Otherwise pick an exit.
helper = None
......@@ -272,14 +292,15 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
circ_fps = [relay.fingerprint, helper.fingerprint]
nicknames = [relay.nickname, helper.nickname]
if not helper:
# TODO: Return ResultError of some sort
log.debug('Unable to pick a 2nd relay to help measure %s (%s)',
reason = 'Unable to select a second relay'
log.debug(reason + ' to help measure %s (%s)',
relay.fingerprint, relay.nickname)
return None
assert helper
assert circ_fps is not None and len(circ_fps) == 2
return [
ResultErrorSecondRelay(relay, [], dest.url, our_nick,
# Build the circuit
our_nick = conf['scanner']['nickname']
circ_id, reason = cb.build_circuit(circ_fps)
if not circ_id:
log.debug('Could not build circuit with path %s (%s): %s ',
......@@ -297,10 +318,9 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
log.debug('Destination %s unusable via circuit %s (%s), %s',
dest.url, circ_fps, nicknames, usable_data)
# TODO: Return a different/new type of ResultError?
msg = 'The destination seemed to have stopped being usable'
return [
ResultErrorStream(relay, circ_fps, dest.url, our_nick, msg=msg),
ResultErrorStream(relay, circ_fps, dest.url, our_nick,
assert is_usable
assert 'content_length' in usable_data
......@@ -448,33 +468,28 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
instead ``result_putter_error``, which logs the error and complete
Before iterating over the next relay, it waits (non blocking, since it
happens in the main thread) until one of the ``max_pending_results``
threads has finished.
This is not needed, since otherwise async_result will queue the relays to
measure in order and won't start reusing a thread to measure a relay until
other thread has finished. But it makes the logic a bit more sequential.
Before the outer loop iterates, it also waits (again non blocking) that all
the ``Results`` are ready.
Before the outer loop iterates, it waits (non blocking) that all
the ``Results`` are ready calling ``wait_for_results``.
This avoid to start measuring the same relay which might still being
pending_results = []
# Set the time to wait for a thread to finish as the half of an HTTP
# request timeout.
time_to_sleep = conf.getfloat('general', 'http_timeout') / 2
# Do not start a new loop if sbws is stopping.
while not settings.end_event.is_set():
log.debug("Starting a new measurement loop.")
num_relays = 0
# Since loop might finish before pending_results is 0 due waiting too
# long, set it here and not outside the loop.
pending_results = []
loop_tstart = time.time()
for target in relay_prioritizer.best_priority():
# Don't start measuring a relay if sbws is stopping.
if settings.end_event.is_set():
num_relays += 1
# callback and callback_err must be non-blocking
callback = result_putter(result_dump)
......@@ -484,22 +499,14 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
[args, conf, destinations, circuit_builder, relay_list,
target], {}, callback, callback_err)
# Instead of letting apply_async to queue the relays in order until
# a thread has finished, wait here until a thread has finished.
while len(pending_results) >= max_pending_results:
# sleep is non-blocking since happens in the main process.
pending_results = [r for r in pending_results if not r.ready()]
time_waiting = 0
while (len(pending_results) > 0
and time_waiting <= TIMEOUT_MEASUREMENTS):
log.debug("Number of pending measurement threads %s after "
"a prioritization loop.", len(pending_results))
time_waiting += time_to_sleep
pending_results = [r for r in pending_results if not r.ready()]
if time_waiting > TIMEOUT_MEASUREMENTS:
# After the for has finished, the pool has queued all the relays
# and pending_results has the list of all the AsyncResults.
# It could also be obtained with pool._cache, which contains
# a dictionary with AsyncResults as items.
num_relays_to_measure = len(pending_results)
wait_for_results(num_relays_to_measure, pending_results)
loop_tstop = time.time()
loop_tdelta = (loop_tstop - loop_tstart) / 60
log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta)
......@@ -510,6 +517,92 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
stop_threads(signal.SIGTERM, None)
def wait_for_results(num_relays_to_measure, pending_results):
"""Wait for the pool to finish and log progress.
While there are relays being measured, just log the progress
and sleep :const:`~sbws.globals.TIMEOUT_MEASUREMENTS` (3mins),
which is aproximately the time it can take to measure a relay in
the worst case.
When there has not been any relay measured in ``TIMEOUT_MEASUREMENTS``
and there are still relays pending to be measured, it means there is no
progress and call :func:`~sbws.core.scanner.force_get_results`.
This can happen in the case of a bug that makes either
:func:`~sbws.core.scanner.result_putter` (callback) and/or
:func:`~sbws.core.scanner.result_putter_error` (callback error) stall.
.. note:: in a future refactor, this could be simpler by:
1. Initializing the pool at the begingging of each loop
2. Callling :meth:`~Pool.close`; :meth:`~Pool.join` after
to ensure no new jobs are added until the pool has finished with all
the ones in the queue.
As currently, there would be still two cases when the pool could stall:
1. There's an exception in ``measure_relay`` and another in
2. There's an exception ``callback``.
This could also be simpler by not having callback and callback error in
``apply_async`` and instead just calling callback with the
(callback could be also simpler by not having a thread and queue and
just storing to disk, since the time to write to disk is way smaller
than the time to request over the network.)
num_last_measured = 1
while num_last_measured > 0 and not settings.end_event.is_set():"Pending measurements: %s out of %s: ",
len(pending_results), num_relays_to_measure)
old_pending_results = pending_results
pending_results = [r for r in pending_results if not r.ready()]
num_last_measured = len(old_pending_results) - len(pending_results)
if len(pending_results) > 0:
def force_get_results(pending_results):
"""Try to get either the result or an exception, which gets logged.
It is call by :func:`~sbws.core.scanner.wait_for_results` when
the time waiting for the results was long.
To get either the :class:`~sbws.lib.resultdump.Result` or an exception,
call :meth:`~AsyncResult.get` with timeout.
Timeout is low since we already waited.
``get`` is not call before, because it blocks and the callbacks
are not call.
log.debug("Forcing get")
for r in pending_results:
result = r.get(timeout=0.1)
log.warning("Result %s was not stored, it took too long.",
# TimeoutError is raised when the result is not ready, ie. has not
# been processed yet
except TimeoutError:
log.warning("A result was not stored, it was not ready.")
# If the result raised an exception, `get` returns it,
# then log any exception so that it can be fixed.
# This should not happen, since `callback_err` would have been call
# first.
except Exception as e:
# If the exception happened in the threads, `log.exception` does
# not have the traceback.
log.warning("traceback %s",
traceback.print_exception(type(e), e, e.__traceback__))
def run_speedtest(args, conf):
"""Initializes all the data and threads needed to measure the relays.
......@@ -551,8 +644,10 @@ def run_speedtest(args, conf):
# Call only once to initialize http_headers
settings.init_http_headers(conf.get('scanner', 'nickname'), state['uuid'],
rl = RelayList(args, conf, controller)
# To do not have to pass args and conf to RelayList, pass an extra
# argument with the data_period
measurements_period = conf.getint('general', 'data_period')
rl = RelayList(args, conf, controller, measurements_period, state)
cb = CB(args, conf, controller, rl)
rd = ResultDump(args, conf)
rp = RelayPrioritizer(args, conf, rl, rd)
......@@ -80,7 +80,19 @@ MIN_REPORT = 60
# in the bandwidth lines in percentage
# With the new KeyValues in #29591, the lines are greater than 510
# Tor already accept lines of any size, but leaving the limit anyway.
# RelayList, ResultDump, v3bwfile
# For how many seconds in the past the relays and measurements data is keep/
# considered valid.
# This is currently set by default in config.default.ini as ``date_period``,
# and used in ResultDump and v3bwfile.
# In a future refactor, constants in config.default.ini should be moved here,
# or calculated in settings, so that there's no need to pass the configuration
# to all the functions.
MEASUREMENTS_PERIOD = 5 * 24 * 60 * 60
# Metadata to send in every requests, so that data servers can know which
# scanners are using them.
......@@ -110,9 +122,18 @@ HTTP_GET_HEADERS = {
# This number might need adjusted depending on the percentage of circuits and
# HTTP requests failures.
# While the scanner can not recover from some/all failing destionations,
# set a big number so that it continues trying.
# Number of attempts to use a destination, that are stored, in order to decide
# whether the destination is functional or not.
# Time to wait before trying again a destination that wasn't functional.
# Number of consecutive times a destination can fail before considering it
# not functional.
# By which factor to multiply DELTA_SECONDS_RETRY_DESTINATION when the
# destination fail again.
def fail_hard(*a, **kw):
import collections
import datetime
import logging
import random
import requests
......@@ -6,8 +8,14 @@ from stem.control import EventType
import sbws.util.stem as stem_utils
from ..globals import (
from sbws import settings
log = logging.getLogger(__name__)
......@@ -91,6 +99,10 @@ def connect_to_destination_over_circuit(dest, circ_id, session, cont, max_dl):
should commence. False and an error string otherwise.
assert isinstance(dest, Destination)
log.debug("Connecting to destination over circuit.")
# Do not start if sbws is stopping
if settings.end_event.is_set():
return False, "Shutting down."
error_prefix = 'When sending HTTP HEAD to {}, '.format(dest.url)
with stem_utils.stream_building_lock:
listener = stem_utils.attach_stream_to_circuit_listener(cont, circ_id)
......@@ -98,73 +110,153 @@ def connect_to_destination_over_circuit(dest, circ_id, session, cont, max_dl):
head = session.head(dest.url, verify=dest.verify)
except requests.exceptions.RequestException as e:
return False, 'Could not connect to {} over circ {} {}: {}'.format(
dest.url, circ_id, stem_utils.circuit_str(cont, circ_id), e)
stem_utils.remove_event_listener(cont, listener)
if head.status_code !=
return False, error_prefix + 'we expected HTTP code '\
'{} not {}'.format(, head.status_code)
if 'content-length' not in head.headers:
return False, error_prefix + 'we except the header Content-Length '\
'to exist in the response'
content_length = int(head.headers['content-length'])
if max_dl > content_length:
return False, error_prefix + 'our maximum configured download size '\
'is {} but the content is only {}'.format(max_dl, content_length)
log.debug('Connected to %s over circuit %s', dest.url, circ_id)
# Any failure connecting to the destination will call set_failure,
# which will set `failed` to True and count consecutives failures.
# It can not be set at the start, to be able to know if it failed a
# a previous time, which is checked by set_failure.
# Future improvement: use a list to count consecutive failures
# or calculate it from the results.
dest.failed = False
# Any failure connecting to the destination will call add_failure,
# It can not be set at the start, to be able to know whether it is
# failing consecutive times.
return True, {'content_length': content_length}
class Destination:
def __init__(self, url, max_dl, verify):
"""Web server from which data is downloaded to measure bandwidth.
# NOTE: max_dl and verify should be optional and have defaults
def __init__(self, url, max_dl, verify,
"""Initalizes the Web server from which the data is downloaded.
:param str url: Web server data URL to download.
:param int max_dl: Maximum size of the the data to download.
:param bool verify: Whether to verify or not the TLS certificate.
:param int max_num_failures: Number of consecutive failures when the
destination is not considered functional.
:param int delta_seconds_retry: Delta time to try a destination
that was not functional.
:param int num_attempts_stored: Number of attempts to store.
:param int factor_increment_retry: Factor to increment delta by
before trying to use a destination again.
self._max_dl = max_dl
u = urlparse(url)
self._url = u
self._verify = verify
# Flag to record whether this destination failed in the last
# measurement.
# Failures can happen if:
# - an HTTPS request can not be made over Tor
# (which might be the relays fault, not the destination being
# unreachable)
# - the destination does not support HTTP Range requests.
self.failed = False
self.consecutive_failures = 0
def is_functional(self):
# Attributes to decide whether a destination is functional or not.
self._max_num_failures = max_num_failures
self._num_attempts_stored = num_attempts_stored
# Default delta time to try a destination that was not functional.
self._default_delta_seconds_retry = delta_seconds_retry
self._delta_seconds_retry = delta_seconds_retry
# Using a deque (FIFO) to do not grow forever and
# to do not have to remove old attempts.
# Store tuples of timestamp and whether the destination succed or not
# (succed, 1, failed, 0).
# Initialize it as if it never failed.
self._attempts = collections.deque([(datetime.datetime.utcnow(), 1), ],
self._factor = factor_increment_retry
def _last_attempts(self, n=None):
"""Return the last ``n`` attempts the destination was used."""
# deque does not accept slices,
# a new deque is returned with the last n items
# (or less if there were less).
return collections.deque(self._attempts,
maxlen=(n or self._max_num_failures))
def _are_last_attempts_failures(self, n=None):
Return True if the last`` n`` times the destination was used
and failed.
Returns True if there has not been a number consecutive measurements.
Otherwise warn about it and return False.
# Count the number that there was a failure when used
n = n if n else self._max_num_failures
return ([i[1] for i in self._last_attempts(n)].count(0)
>= self._max_num_failures)
def _increment_time_to_retry(self, factor=None):
if self.consecutive_failures > MAXIMUM_NUMBER_DESTINATION_FAILURES:
log.warning("Destination %s is not functional. Please check that "
"it is correct.", self._url)
Increment the time a destination will be tried again by a ``factor``.
self._delta_seconds_retry *= factor or self._factor"Incremented the time to try destination %s to %s hours.",
self.url, self._delta_seconds_retry / 60 / 60)
def _is_last_try_old_enough(self, n=None):
Return True if the last time it was used it was ``n`` seconds ago.
# Timestamp of the last attempt.
last_time = self._attempts[-1][0]
# If the last attempt is older than _delta_seconds_retry, try again
return (datetime.datetime.utcnow()
- datetime.timedelta(seconds=self._delta_seconds_retry)
> last_time)
return False
def is_functional(self):
"""Whether connections to a destination are failing or not.
Return True if:
- It did not fail more than n (by default 3) consecutive times.
- The last time the destination was tried
was x (by default 3h) seconds ago.
And False otherwise.
When the destination is tried again after the consecutive failures,
the time to try again is incremented and resetted as soon as the
destination does not fail.
# Failed the last X consecutive times
if self._are_last_attempts_failures():
log.warning("The last %s times the destination %s failed."
"It will not be used again in %s hours.\n",
self._max_num_failures, self.url,
self._delta_seconds_retry / 60 / 60)
log.warning("Please, add more destinations or increment the "
"number of maximum number of consecutive failures "
"in the configuration.")
# It was not used for a while and the last time it was used
# was long ago, then try again
if self._is_last_try_old_enough():"The destination %s was not tried for %s hours, "
"it is going to by tried again.")
# Set the next time to retry higher, in case this attempt fails
return True
return False
# Reset the time to retry to the initial value
# In case it was incrememented
self._delta_seconds_retry = self._default_delta_seconds_retry
return True
def set_failure(self):
"""Set failed to True and increase the number of consecutive failures.
Only if it also failed in the previous measuremnt.
def add_failure(self, dt=None):
self._attempts.append((dt or datetime.datetime.utcnow(), 0))
# if it failed in the last measurement
if self.failed:
self.consecutive_failures += 1
self.failed = True
def add_success(self, dt=None):
self._attempts.append((dt or datetime.datetime.utcnow(), 1))
def url(self):
......@@ -197,7 +289,15 @@ class Destination:
assert 'url' in conf_section
url = conf_section['url']
verify = _parse_verify_option(conf_section)
return Destination(url, max_dl, verify)
max_num_failures = conf_section.getint('max_num_failures')
except ValueError:
log.warning("Configuration max_num_failures is wrong, ignoring.")
max_num_failures = None
if max_num_failures:
return Destination(url, max_dl, verify, max_num_failures)