Commit 083e7c70 authored by juga's avatar juga
Browse files

new: destination: Recover destination when it failed

Closes: #29589.
parent 7cc6de1d
......@@ -267,16 +267,13 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
log.critical("There are not any functional destinations.\n"
"It is recommended to set several destinations so that "
"the scanner can continue if one fails.")
# Exit the scanner with error stopping threads first.
stop_threads(signal.SIGTERM, None, 1)
# When the destinations can recover would be implemented;
# reason = 'Unable to get destination'
# log.debug(reason + ' to measure %s %s',
# relay.nickname, relay.fingerprint)
# return [
# ResultErrorDestination(relay, [], dest.url, our_nick,
# msg=reason),
# ]
# NOTE: Because this is executed in a thread, stop_threads can not
# be call from here, it has to be call from the main thread.
# Instead set the singleton end event, that will call stop_threads
# from the main process.
# Errors with only one destination are set in ResultErrorStream.
settings.end_event.set()
return None
# Pick a relay to help us measure the given relay. If the given relay is an
# exit, then pick a non-exit. Otherwise pick an exit.
helper = None
......@@ -321,10 +318,9 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
log.debug('Destination %s unusable via circuit %s (%s), %s',
dest.url, circ_fps, nicknames, usable_data)
cb.close_circuit(circ_id)
# TODO: Return a different/new type of ResultError?
msg = 'The destination seemed to have stopped being usable'
return [
ResultErrorStream(relay, circ_fps, dest.url, our_nick, msg=msg),
ResultErrorStream(relay, circ_fps, dest.url, our_nick,
msg=usable_data),
]
assert is_usable
assert 'content_length' in usable_data
......@@ -561,7 +557,7 @@ def wait_for_results(num_relays_to_measure, pending_results):
than the time to request over the network.)
"""
num_last_measured = 1
while num_last_measured > 0:
while num_last_measured > 0 and not settings.end_event.is_set():
log.info("Pending measurements: %s out of %s: ",
len(pending_results), num_relays_to_measure)
time.sleep(TIMEOUT_MEASUREMENTS)
......
import collections
import datetime
import logging
import random
import requests
......@@ -6,8 +8,13 @@ from stem.control import EventType
from sbws.globals import DESTINATION_VERIFY_CERTIFICATE
import sbws.util.stem as stem_utils
from ..globals import (
MAX_NUM_DESTINATION_FAILURES,
DELTA_SECONDS_RETRY_DESTINATION,
NUM_DESTINATION_ATTEMPTS_STORED,
FACTOR_INCREMENT_DESTINATION_RETRY
)
from ..globals import MAXIMUM_NUMBER_DESTINATION_FAILURES
log = logging.getLogger(__name__)
......@@ -98,73 +105,155 @@ def connect_to_destination_over_circuit(dest, circ_id, session, cont, max_dl):
try:
head = session.head(dest.url, verify=dest.verify)
except requests.exceptions.RequestException as e:
dest.set_failure()
dest.add_failure()
return False, 'Could not connect to {} over circ {} {}: {}'.format(
dest.url, circ_id, stem_utils.circuit_str(cont, circ_id), e)
finally:
stem_utils.remove_event_listener(cont, listener)
if head.status_code != requests.codes.ok:
dest.set_failure()
dest.add_failure()
return False, error_prefix + 'we expected HTTP code '\
'{} not {}'.format(requests.codes.ok, head.status_code)
if 'content-length' not in head.headers:
dest.set_failure()
dest.add_failure()
return False, error_prefix + 'we except the header Content-Length '\
'to exist in the response'
content_length = int(head.headers['content-length'])
if max_dl > content_length:
dest.set_failure()
dest.add_failure()
return False, error_prefix + 'our maximum configured download size '\
'is {} but the content is only {}'.format(max_dl, content_length)
log.debug('Connected to %s over circuit %s', dest.url, circ_id)
# Any failure connecting to the destination will call set_failure,
# which will set `failed` to True and count consecutives failures.
# It can not be set at the start, to be able to know if it failed a
# a previous time, which is checked by set_failure.
# Future improvement: use a list to count consecutive failures
# or calculate it from the results.
dest.failed = False
# Any failure connecting to the destination calls add_failure.
# Success can not be recorded at the start of the attempt, so that
# consecutive failures can still be detected.
dest.add_success()
return True, {'content_length': content_length}
class Destination:
def __init__(self, url, max_dl, verify):
"""Web server from which data is downloaded to measure bandwidth.
"""
# NOTE: max_dl and verify should be optional and have defaults
def __init__(self, url, max_dl, verify,
max_num_failures=MAX_NUM_DESTINATION_FAILURES,
delta_seconds_retry=DELTA_SECONDS_RETRY_DESTINATION,
num_attempts_stored=NUM_DESTINATION_ATTEMPTS_STORED,
factor_increment_retry=FACTOR_INCREMENT_DESTINATION_RETRY):
"""Initalizes the Web server from which the data is downloaded.
:param str url: Web server data URL to download.
:param int max_dl: Maximum size of the the data to download.
:param bool verify: Whether to verify or not the TLS certificate.
:param int max_num_failures: Number of consecutive failures when the
destination is not considered functional.
:param int delta_seconds_retry: Delta time to try a destination
that was not functional.
:param int num_attempts_stored: Number of attempts to store.
:param int factor_increment_retry: Factor to increment delta by
before trying to use a destination again.
"""
self._max_dl = max_dl
u = urlparse(url)
self._url = u
self._verify = verify
# Flag to record whether this destination failed in the last
# measurement.
# Failures can happen if:
# - an HTTPS request can not be made over Tor
# (which might be the relays fault, not the destination being
# unreachable)
# - the destination does not support HTTP Range requests.
self.failed = False
self.consecutive_failures = 0
@property
def is_functional(self):
# Attributes to decide whether a destination is functional or not.
self._max_num_failures = max_num_failures
self._num_attempts_stored = num_attempts_stored
# Default delta time to try a destination that was not functional.
self._default_delta_seconds_retry = delta_seconds_retry
self._delta_seconds_retry = delta_seconds_retry
# Use a deque (FIFO) so that it does not grow forever and old
# attempts do not need to be removed explicitly.
# Store tuples of (timestamp, result), where the result is 1 when
# the destination succeeded and 0 when it failed.
# Initialize it as if it never failed.
self._attempts = collections.deque([(datetime.datetime.utcnow(), 1), ],
maxlen=self._num_attempts_stored)
self._factor = factor_increment_retry
def _last_attempts(self, n=None):
"""Return the last ``n`` attempts the destination was used."""
# deque does not accept slices,
# a new deque is returned with the last n items
# (or less if there were less).
return collections.deque(self._attempts,
maxlen=(n or self._max_num_failures))
def _are_last_attempts_failures(self, n=None):
"""
Return True if the last`` n`` times the destination was used
and failed.
"""
# Count the number that there was a failure when used
n = n if n else self._max_num_failures
return ([i[1] for i in self._last_attempts(n)].count(0)
>= self._max_num_failures)
def _increment_time_to_retry(self, factor=None):
    """
    Increment the time a destination will be tried again by a ``factor``.

    :param float factor: multiplier for the retry delta; defaults to
        the ``factor_increment_retry`` the destination was built with.
    """
    # Exponential back-off: each call multiplies the current delta.
    self._delta_seconds_retry *= factor or self._factor
    log.info("Incremented the time to try destination %s to %s hours.",
             self.url, self._delta_seconds_retry / 60 / 60)
def _is_last_try_old_enough(self, n=None):
"""
Return True if the last time it was used it was ``n`` seconds ago.
"""
if self.consecutive_failures > MAXIMUM_NUMBER_DESTINATION_FAILURES:
log.warning("Destination %s is not functional. Please check that "
"it is correct.", self._url)
# Timestamp of the last attempt.
last_time = self._attempts[-1][0]
# If the last attempt is older than _delta_seconds_retry,
if (datetime.datetime.utcnow()
- datetime.timedelta(seconds=self._delta_seconds_retry)
> last_time):
# And try again.
return True
return False
def is_functional(self):
    """Whether this destination is usable for measurements.

    Return True when:

    - it did not fail the last ``_max_num_failures`` (by default 3)
      consecutive times it was used, or
    - it kept failing but the last attempt is older than
      ``_delta_seconds_retry`` seconds, so it is worth retrying.

    Return False otherwise.

    When the destination is retried after consecutive failures, the
    time to wait before the next retry is incremented; it is reset to
    the initial value as soon as the destination stops failing.
    """
    # Failed the last X consecutive times?
    if self._are_last_attempts_failures():
        log.warning("The last %s times the destination %s failed. "
                    "It will not be used again in %s hours.\n",
                    self._max_num_failures, self.url,
                    self._delta_seconds_retry / 60 / 60)
        log.warning("Please, add more destinations or increment the "
                    "number of maximum number of consecutive failures "
                    "in the configuration.")
        # It was failing, but it was not tried for a while: retry it.
        if self._is_last_try_old_enough():
            # BUG FIX: this log call had two %s placeholders but no
            # arguments, which made logging report an internal error.
            log.info("The destination %s was not tried for %s hours, "
                     "it is going to be tried again.", self.url,
                     self._delta_seconds_retry / 60 / 60)
            # Set the next time to retry higher, in case this attempt
            # fails too.
            self._increment_time_to_retry()
            return True
        return False
    # It is working; reset the time to retry to the initial value in
    # case it was incremented.
    self._delta_seconds_retry = self._default_delta_seconds_retry
    return True
def set_failure(self):
"""Set failed to True and increase the number of consecutive failures.
Only if it also failed in the previous measuremnt.
def add_failure(self, dt=None):
self._attempts.append((dt or datetime.datetime.utcnow(), 0))
"""
# if it failed in the last measurement
if self.failed:
self.consecutive_failures += 1
self.failed = True
def add_success(self, dt=None):
self._attempts.append((dt or datetime.datetime.utcnow(), 1))
@property
def url(self):
......@@ -213,7 +302,7 @@ class DestinationList:
@property
def functional_destinations(self):
return [d for d in self._all_dests if d.is_functional]
return [d for d in self._all_dests if d.is_functional()]
@staticmethod
def from_config(conf, circuit_builder, relay_list, controller):
......@@ -250,4 +339,8 @@ class DestinationList:
# This removes the need for an extra lock for every measurement.
# Do not change the order of the destinations, just return a
# destination.
return self._rng.choice(self.functional_destinations)
# random.choice raises IndexError with an empty list.
if self.functional_destinations:
return self._rng.choice(self.functional_destinations)
else:
return None
"""Integration tests for destination.py"""
from sbws.globals import MAXIMUM_NUMBER_DESTINATION_FAILURES
import sbws.util.requests as requests_utils
from sbws.lib.destination import (DestinationList, Destination,
connect_to_destination_over_circuit)
......@@ -36,16 +35,12 @@ def test_connect_to_destination_over_circuit_success(persistent_launch_tor,
destination, circuit_id, session, persistent_launch_tor, 1024)
assert is_usable is True
assert 'content_length' in response
assert not destination.failed
assert destination.consecutive_failures == 0
assert destination.is_functional
assert destination.is_functional()
def test_connect_to_destination_over_circuit_fail(persistent_launch_tor,
dests, cb, rl):
bad_destination = Destination('https://example.example', 1024, False)
# dests._all_dests.append(bad_destination)
# dests._usable_dests.append(bad_destination)
session = requests_utils.make_session(persistent_launch_tor, 10)
# Choose a relay that is not an exit
relay = [r for r in rl.relays
......@@ -61,35 +56,40 @@ def test_connect_to_destination_over_circuit_fail(persistent_launch_tor,
assert is_usable is False
# because it is the first time it fails, failures are not counted
assert bad_destination.failed
assert bad_destination.consecutive_failures == 0
assert bad_destination.is_functional
assert bad_destination.is_functional()
# fail twice in a row
# fail three times in a row
is_usable, response = connect_to_destination_over_circuit(
bad_destination, circuit_id, session, persistent_launch_tor, 1024)
assert bad_destination.failed
assert bad_destination.consecutive_failures == 1
assert bad_destination.is_functional
is_usable, response = connect_to_destination_over_circuit(
bad_destination, circuit_id, session, persistent_launch_tor, 1024)
assert not bad_destination.is_functional()
def test_functional_destinations(conf, cb, rl, persistent_launch_tor):
good_destination = Destination('https://127.0.0.1:28888', 1024, False)
# Mock that it failed before and just now, but it's still considered
# functional.
good_destination.consecutive_failures = 3
good_destination.failed = True
bad_destination = Destination('https://example.example', 1024, False)
# Mock that it didn't fail now, but it already failed 11 consecutive
# times.
bad_destination.consecutive_failures = \
MAXIMUM_NUMBER_DESTINATION_FAILURES + 1
bad_destination.failed = False
# None of the arguments are used, move to unit tests when this get
# refactored
session = requests_utils.make_session(persistent_launch_tor, 10)
# Choose a relay that is not an exit
relay = [r for r in rl.relays
if r.nickname == 'relay1mbyteMAB'][0]
# Choose an exit, for this test it does not matter the bandwidth
helper = rl.exits_not_bad_allowing_port(bad_destination.port)[0]
circuit_path = [relay.fingerprint, helper.fingerprint]
# Build a circuit.
circuit_id, _ = cb.build_circuit(circuit_path)
# fail three times in a row
is_usable, response = connect_to_destination_over_circuit(
bad_destination, circuit_id, session, persistent_launch_tor, 1024)
is_usable, response = connect_to_destination_over_circuit(
bad_destination, circuit_id, session, persistent_launch_tor, 1024)
is_usable, response = connect_to_destination_over_circuit(
bad_destination, circuit_id, session, persistent_launch_tor, 1024)
destination_list = DestinationList(
conf, [good_destination, bad_destination], cb, rl,
persistent_launch_tor)
expected_functional_destinations = [good_destination]
functional_destinations = destination_list.functional_destinations
assert expected_functional_destinations == functional_destinations
assert [good_destination] == functional_destinations
"""Unit tests for sbws.lib.destination."""
from datetime import datetime, timedelta
from sbws.lib import destination
def test_destination_is_functional():
    """A destination becomes non-functional after consecutive failures
    and recovers when enough time elapsed since the last attempt, or
    immediately after a success.
    """
    eight_hours_ago = datetime.utcnow() - timedelta(hours=8)
    four_hours_ago = datetime.utcnow() - timedelta(hours=4)
    two_hours_ago = datetime.utcnow() - timedelta(hours=2)
    d = destination.Destination('unexistenturl', 0, False)
    assert d.is_functional()

    # Three consecutive failures make it non-functional.
    for _ in range(3):
        d.add_failure()
    assert d._are_last_attempts_failures()
    assert not d._is_last_try_old_enough()
    assert not d.is_functional()

    # A single success makes it functional again.
    d.add_success()
    assert not d._are_last_attempts_failures()
    assert d.is_functional()

    # Three more consecutive failures, the last one only 2h ago:
    # still too recent to retry.
    d.add_failure()
    d.add_failure()
    d.add_failure(two_hours_ago)
    assert d._are_last_attempts_failures()
    assert not d._is_last_try_old_enough()
    assert not d.is_functional()

    # A last failure 4h ago is old enough: try to use it again.
    d.add_failure(four_hours_ago)
    assert d._is_last_try_old_enough()
    assert d.is_functional()

    # A last failure 8h ago is old enough too.
    d.add_failure(eight_hours_ago)
    assert d._is_last_try_old_enough()
    assert d.is_functional()

    # Whenever it does not fail again, the retry delta is reset,
    # so the fresh attempt is no longer "old enough".
    d.add_success()
    assert not d._are_last_attempts_failures()
    assert d.is_functional()
    assert not d._is_last_try_old_enough()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment