Commit 612807e3 authored by juga

fix: destination: Multiply errors by the threads

When a destination fails, all the threads using it at that moment will
also fail.
For now, do not check which threads are actually using it.
Also lower the time to retry a failed destination.

Closes: #29891.
parent eb749c09
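
To illustrate the reasoning in the commit message: with N measurement threads sharing one destination, a single momentary outage produces N near-simultaneous failures. A minimal sketch, with illustrative names rather than the sbws API, assuming the default of 3 failures and 3 threads:

# Illustrative sketch only; the names below are not the sbws API.
MAX_NUM_DESTINATION_FAILURES = 3   # per-destination baseline from the config
MEASUREMENT_THREADS = 3            # threads sharing the same destination

# One short outage makes every thread record a failure at roughly the same
# time, so a single outage can consume the whole baseline budget at once.
failures_per_outage = MEASUREMENT_THREADS

# Scaling the threshold keeps "3 independent outages" as the effective limit.
effective_max_failures = MAX_NUM_DESTINATION_FAILURES * MEASUREMENT_THREADS
assert effective_max_failures // failures_per_outage == MAX_NUM_DESTINATION_FAILURES
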
@@ -29,7 +29,6 @@ from multiprocessing.dummy import Pool
import time
import os
import logging
import requests
import random
from .. import settings
@@ -367,6 +366,19 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
def dispatch_worker_thread(*a, **kw):
# If, at the point where the relay is actually going to be measured, there
# are no functional destinations or the `end_event` is set, do not try to
# start measuring the relay, since it would fail anyway.
try:
# a[2] is the argument `destinations`
functional_destinations = a[2].functional_destinations
# In case the arguments or the method change, catch the possible exceptions,
# but do not check here whether there are destinations.
except (IndexError, TypeError):
log.debug("Wrong argument or attribute.")
functional_destinations = True
if not functional_destinations or settings.end_event.is_set():
return None
return measure_relay(*a, **kw)
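
As a rough usage sketch, assuming a wrapper like `dispatch_worker_thread` is handed to the thread pool (`multiprocessing.dummy.Pool`, imported above) roughly as follows. The argument position matches the hunk (`a[2]` being the destinations), but the surrounding scanner wiring is simplified and `FakeDestinations` is a hypothetical stand-in:

# Hedged sketch: a stand-in worker wired to a thread pool the way the wrapper
# above is used; FakeDestinations and the tuple arguments are hypothetical.
from multiprocessing.dummy import Pool


def worker(*a, **kw):
    destinations = a[2]  # same position as in dispatch_worker_thread
    if not getattr(destinations, 'functional_destinations', True):
        return None      # skip the measurement entirely
    return ('measured', a[3])


class FakeDestinations:
    functional_destinations = []  # empty and falsy: nothing usable


with Pool(2) as pool:
    results = [pool.apply_async(worker, ('args', 'conf', FakeDestinations(), r))
               for r in ('relayA', 'relayB')]
    print([r.get() for r in results])  # [None, None]: nothing was measured
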
@@ -530,7 +542,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
loop_tstop = time.time()
loop_tdelta = (loop_tstop - loop_tstart) / 60
log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta)
# At this point, we know the relays that were queued to be measured.
# That does not mean they were actually measured.
log.debug("Attempted to measure %s relays in %s minutes",
num_relays, loop_tdelta)
# In a testing network, exit after first loop
if controller.get_conf('TestingTorNetwork') == '1':
log.info("In a testing network, exiting after the first loop.")
@@ -127,7 +127,9 @@ DESTINATION_VERIFY_CERTIFICATE = True
# whether the destination is functional or not.
NUM_DESTINATION_ATTEMPTS_STORED = 10
# Time to wait before trying again a destination that wasn't functional.
DELTA_SECONDS_RETRY_DESTINATION = 60 * 60 * 3
# Because of intermittent failures with CDN destinations, start trying again
# after 5 minutes.
DELTA_SECONDS_RETRY_DESTINATION = 60 * 5
# Number of consecutive times a destination can fail before considering it
# not functional.
MAX_NUM_DESTINATION_FAILURES = 3
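
A small sketch of how a 5-minute retry delta behaves, approximating the check that `Destination._is_last_try_old_enough()` performs (the real method in sbws/lib/destination.py may also account for an incremented delta):

# Approximation of the retry-window check; the real _is_last_try_old_enough()
# in sbws/lib/destination.py may differ from this fixed-delta version.
from datetime import datetime, timedelta

DELTA_SECONDS_RETRY_DESTINATION = 60 * 5  # retry a failing destination after 5 min


def last_try_old_enough(last_failure, now=None):
    """Return True when enough time has passed to try the destination again."""
    now = now or datetime.utcnow()
    return now - last_failure >= timedelta(seconds=DELTA_SECONDS_RETRY_DESTINATION)


now = datetime.utcnow()
print(last_try_old_enough(now - timedelta(minutes=4), now))  # False: too soon
print(last_try_old_enough(now - timedelta(minutes=6), now))  # True: retry
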
@@ -229,12 +229,19 @@ class Destination:
the time to try again is incremented, and reset as soon as the
destination stops failing.
"""
# NOTE: does a destination fail because several threads are using
# it at the same time?
# If a destination fails for 1 minute and there are 3 threads, all
# 3 threads will fail.
# Failed the last X consecutive times
if self._are_last_attempts_failures():
# This log will appear for all the queued
# relays and threads.
log.warning("The last %s times the destination %s failed. "
"It will not be used again in %s hours.\n",
"Disabled for %s minutes.",
self._max_num_failures, self.url,
self._delta_seconds_retry / 60 / 60)
self._delta_seconds_retry / 60)
log.warning("Please, add more destinations or increment the "
"number of maximum number of consecutive failures "
"in the configuration.")
@@ -285,19 +292,22 @@ class Destination:
return p
@staticmethod
def from_config(conf_section, max_dl):
def from_config(conf_section, max_dl, number_threads):
assert 'url' in conf_section
url = conf_section['url']
verify = _parse_verify_option(conf_section)
try:
max_num_failures = conf_section.getint('max_num_failures')
# Because once a destination fails, all the threads that are using
# it at that moment will fail too, multiply by the number of
# threads.
max_num_failures = (conf_section.getint('max_num_failures')
or MAX_NUM_DESTINATION_FAILURES)
except ValueError:
log.warning("Configuration max_num_failures is wrong, ignoring.")
max_num_failures = None
if max_num_failures:
return Destination(url, max_dl, verify, max_num_failures)
else:
return Destination(url, max_dl, verify)
# If the operator did not set the number, use the default.
max_num_failures = MAX_NUM_DESTINATION_FAILURES
max_num_failures *= number_threads
return Destination(url, max_dl, verify, max_num_failures)
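
A quick restatement of what the new `from_config()` computes for `max_num_failures`, written as a standalone helper with assumed inputs (an operator-configured value, or `None` when the option is missing or invalid):

# Standalone restatement of the scaling above; `configured` stands for
# conf_section.getint('max_num_failures') and may be None when unset.
MAX_NUM_DESTINATION_FAILURES = 3


def effective_max_failures(configured, number_threads):
    return (configured or MAX_NUM_DESTINATION_FAILURES) * number_threads


print(effective_max_failures(None, 3))  # 9: default of 3 scaled by 3 threads
print(effective_max_failures(5, 2))     # 10: operator value 5 scaled by 2 threads
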
class DestinationList:
@@ -331,7 +341,10 @@ class DestinationList:
log.debug('Loading info for destination %s', key)
dests.append(Destination.from_config(
conf[dest_sec],
conf.getint('scanner', 'max_download_size')))
# Multiply by the number of threads since all the threads will
# fail at the same time.
conf.getint('scanner', 'max_download_size'),
conf.getint('scanner', 'measurement_threads')))
if len(dests) < 1:
msg = 'No enabled destinations in config. Please see '\
'docs/source/man_sbws.ini.rst" or "man 5 sbws.ini" ' \
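
For reference, the two `[scanner]` options passed above can be read from a minimal INI such as the following; the file layout is only an example, and sbws' real configuration has many more sections and defaults:

# Hypothetical minimal config showing where the two getint() calls above read
# their values from; real sbws config files contain many more options.
from configparser import ConfigParser

conf = ConfigParser()
conf.read_string("""
[scanner]
max_download_size = 1073741824
measurement_threads = 3

[destinations.example]
url = https://example.com/sbws.bin
""")

max_dl = conf.getint('scanner', 'max_download_size')
threads = conf.getint('scanner', 'measurement_threads')
print(max_dl, threads)  # the values handed to Destination.from_config(...)
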
@@ -5,9 +5,9 @@ from sbws.lib import destination
def test_destination_is_functional():
eight_hours_ago = datetime.utcnow() - timedelta(hours=8)
four_hours_ago = datetime.utcnow() - timedelta(hours=4)
two_hours_ago = datetime.utcnow() - timedelta(hours=2)
eleven_mins_ago = datetime.utcnow() - timedelta(minutes=11)
six_mins_ago = datetime.utcnow() - timedelta(minutes=6)
four_mins_ago = datetime.utcnow() - timedelta(minutes=4)
d = destination.Destination('unexistenturl', 0, False)
assert d.is_functional()
@@ -29,19 +29,19 @@ def test_destination_is_functional():
d.add_failure()
d.add_failure()
# And last failure was 2h ago
d.add_failure(two_hours_ago)
d.add_failure(four_mins_ago)
assert d._are_last_attempts_failures()
assert not d._is_last_try_old_enough()
assert not d.is_functional()
# But if the last failure was 4h ago, try to use it again
# And last failure was 4h ago
d.add_failure(four_hours_ago)
d.add_failure(six_mins_ago)
assert d._is_last_try_old_enough()
assert d.is_functional()
# If last failure was 8h ago, try to use it again
d.add_failure(eight_hours_ago)
d.add_failure(eleven_mins_ago)
assert d._is_last_try_old_enough()
assert d.is_functional()