Commit 44489f37 authored by juga's avatar juga
Browse files

Merge branch 'maint-1.1'

parents e7bc95ca 30334218
......@@ -36,7 +36,7 @@ A first solution would be to obtain the git revision at runtime, but:
the git revision of that other repository.
So next solution was to obtain the git revision at build/install time.
To achieve this, a script should be called from the installer or at runtime
whenever `__version__` needs to be read.
While it could be implemented by us, there are two external tools that achieve
......@@ -95,4 +95,92 @@ git or python versions or we find a way to make `setuptools_scm` to detect
the same version at buildtime and runtime.
See `<https://github.com/MartinThoma/MartinThoma.github.io/blob/1235fcdecda4d71b42fc07bfe7db327a27e7bcde/content/2018-11-13-python-package-versions.md>`_
for other comparative versioning python packages.
Changing Bandwidth file monitoring KeyValues
--------------------------------------------
In version 1.1.0 we added KeyValues called ``recent_X_count`` and
``relay_X_count``, which required modifying several parts of the code.
For simplicity we only stored numbers, but these numbers accumulate over time
and there is no way to know by how much to decrease them, since some of the
main objects are not recreated at runtime and do not have attributes
recording when they were created or updated.
The relations between the objects do not follow the usual one-to-many or
many-to-many patterns either, so the numbers cannot be derived from the
related objects.
The only way we could think of to solve this is to store lists of timestamps,
instead of just numbers, as an attribute of the objects that need to keep a
count.
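The idea of replacing plain counters with bounded timestamp lists can be
sketched as follows. This is an illustrative sketch only: the class name and
API here are hypothetical, not the actual ``timestamps.DateTimeSeq`` that
sbws uses.

```python
import datetime


class TimestampSeq:
    """Hypothetical sketch of a timestamp sequence that prunes old entries.

    Instead of incrementing an integer, each event appends a timestamp,
    and entries older than the measurement period are dropped on update.
    """

    def __init__(self, period_seconds, items=None):
        self._period = datetime.timedelta(seconds=period_seconds)
        self._items = list(items or [])

    def update(self, ts=None):
        now = datetime.datetime.utcnow().replace(microsecond=0)
        # Drop timestamps older than the measurement period, then append.
        self._items = [t for t in self._items if now - t < self._period]
        self._items.append(ts or now)

    def __len__(self):
        # The "count" is simply how many timestamps are still recent.
        return len(self._items)


seq = TimestampSeq(5 * 24 * 60 * 60)  # a 5-day measurement period
seq.update()
assert len(seq) == 1
```

With this representation the counts decrease naturally as timestamps age
out, which is exactly what plain integers could not do.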
Where do the values of the keys come from?
``````````````````````````````````````````
In the file system, there are only two types of files where these values can be
stored:
- the results files in ``datadir``
- the ``state.dat`` file
Because of the structure of the content in the results files, they can store
KeyValues for the relays, but not for the headers, which need to be stored in
the ``state.dat`` file.
The classes that manage these KeyValues are:
``RelayList``:
- recent_consensus_count
- recent_measurement_attempt_count
``RelayPrioritizer``:
- recent_priority_list_count
- recent_priority_relay_count
``Relay`` and ``Result``:
- relay_in_recent_consensus_count
- relay_recent_measurement_attempt_count
- relay_recent_priority_list_count
Transition from numbers to datetimes
````````````````````````````````````
The KeyValues named ``_count`` in the results and the state will be ignored
when sbws is restarted with this change, since they will be written without
the ``_count`` suffix in these JSON files.
We could add code to convert the counts during the transition to this version,
but these numbers are wrong anyway and we don't think it's worth the effort,
since they will be correct after 5 days and they have been wrong for a long
time.
Additionally ``recent_measurement_failure_count`` will be negative, since it's
calculated as ``recent_measurement_attempt_count`` minus all the results.
While the total number of results in the last 5 days is correct, the number of
attempts won't be until 5 days have passed.
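A worked example with hypothetical numbers shows why the derived failure
count goes negative right after the transition:

```python
# Hypothetical values just after restarting with the new version: the
# attempt count starts from zero again, while the results still cover the
# full 5-day period.
recent_measurement_attempt_count = 500   # only counted since the restart
num_results_last_5_days = 2000           # still complete for the period

# The failure count is derived as attempts minus results, so it is
# negative until the attempt count covers a full 5 days again.
recent_measurement_failure_count = (
    recent_measurement_attempt_count - num_results_last_5_days
)
assert recent_measurement_failure_count == -1500
```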
Disadvantages
`````````````
``sbws generate``, with 27795 measurement attempts, takes 1 minute instead of
a few seconds.
The same happens with ``RelayPrioritizer.best_priority``, though so far
that seems ok since it's a Python generator in a thread and the measurements
start before it has calculated all the priorities.
The same happens with the ``ResultDump`` that reads/writes the data in a
thread.
Conclusion
```````````
All these changes required a lot of effort and are not optimal, but they were
the way we could correct and maintain version 1.1.0.
If a 2.0 version happens, we highly recommend redesigning the data structures
to use a database with a well-maintained ORM library, which would avoid the
limitations of JSON files and errors in data type conversions, and which is
optimized for the kind of counting and statistics we aim for.
.. note:: Documentation about a possible version 2.0 and the steps to change
the code from 1.X needs to be created.
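As an illustration of why a database would help here, the counting that
currently requires pruning in-memory timestamp lists becomes a single
aggregate query. A minimal sketch using the standard library's ``sqlite3``
(the table and column names are invented for illustration; an ORM would sit
on top of something like this):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE measurement_attempt ("
    "  fingerprint TEXT, attempted_at TEXT)"
)
conn.executemany(
    "INSERT INTO measurement_attempt VALUES (?, ?)",
    [("A" * 40, "2020-01-01T00:00:00"),
     ("A" * 40, "2020-01-03T00:00:00"),
     ("B" * 40, "2020-01-06T00:00:00")],
)
# A "recent_measurement_attempt_count" equivalent: one aggregate query
# over a cutoff, instead of maintaining timestamp lists per object.
(count,) = conn.execute(
    "SELECT COUNT(*) FROM measurement_attempt "
    "WHERE attempted_at >= '2020-01-02T00:00:00'"
).fetchone()
assert count == 2
```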
#!/usr/bin/env python3
""""""
import argparse
from sbws.lib.bwfile_health import BwFile
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--file-path", help="Bandwidth file path.")
args = parser.parse_args()
header_health = BwFile.load(args.file_path)
header_health.report
if __name__ == "__main__":
main()
......@@ -211,10 +211,16 @@ def _pick_ideal_second_hop(relay, dest, rl, cont, is_exit):
else rl.non_exits
if not len(candidates):
return None
min_relay_bw = rl.exit_min_bw() if is_exit else rl.non_exit_min_bw()
log.debug('Picking a 2nd hop to measure %s from %d choices. is_exit=%s',
relay.nickname, len(candidates), is_exit)
for min_bw_factor in [2, 1.75, 1.5, 1.25, 1]:
min_bw = relay.consensus_bandwidth * min_bw_factor
# We might have a really slow/new relay. Try to measure it properly by
# using only relays with or above our calculated min_relay_bw (see:
# _calculate_min_bw_second_hop() in relaylist.py).
if min_bw < min_relay_bw:
min_bw = min_relay_bw
new_candidates = stem_utils.only_relays_with_bandwidth(
cont, candidates, min_bw=min_bw)
if len(new_candidates) > 0:
......@@ -515,8 +521,8 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
# Don't start measuring a relay if sbws is stopping.
if settings.end_event.is_set():
break
relay_list.increment_recent_measurement_attempt_count()
target.increment_relay_recent_measurement_attempt_count()
relay_list.increment_recent_measurement_attempt()
target.increment_relay_recent_measurement_attempt()
num_relays += 1
# callback and callback_err must be non-blocking
callback = result_putter(result_dump)
......
......@@ -148,6 +148,26 @@ MAX_NUM_DESTINATION_FAILURES = 3
# destination fail again.
FACTOR_INCREMENT_DESTINATION_RETRY = 2
# Constants to check health KeyValues in the bandwidth file
PERIOD_DAYS = int(MEASUREMENTS_PERIOD / (24 * 60 * 60))
MAX_RECENT_CONSENSUS_COUNT = PERIOD_DAYS * 24 # 120
# XXX: This was only defined in `config.default.ini`, it should be read from
# here.
FRACTION_RELAYS = 0.05
# A priority list currently takes more than 3h, ideally it should only take 1h.
MIN_HOURS_PRIORITY_LIST = 1
# As of 2020, there are fewer than 7000 relays.
MAX_RELAYS = 8000
# 120
MAX_RECENT_PRIORITY_LIST_COUNT = int(
PERIOD_DAYS * 24 / MIN_HOURS_PRIORITY_LIST
)
MAX_RELAYS_PER_PRIORITY_LIST = int(MAX_RELAYS * FRACTION_RELAYS) # 400
# 48000
MAX_RECENT_PRIORITY_RELAY_COUNT = (
MAX_RECENT_PRIORITY_LIST_COUNT * MAX_RELAYS_PER_PRIORITY_LIST
)
def fail_hard(*a, **kw):
''' Log something ... and then exit as fast as possible '''
......
"""Expected bandwidth file values for KeyValues."""
import logging
from stem import descriptor
from sbws.globals import (
PERIOD_DAYS,
FRACTION_RELAYS,
MAX_RECENT_PRIORITY_RELAY_COUNT, # 48000
MAX_RECENT_PRIORITY_LIST_COUNT, # 120
MAX_RECENT_CONSENSUS_COUNT, # 120
)
from sbws.lib.v3bwfile import HEADER_INT_KEYS, BWLINE_KEYS_V1_4
logging.basicConfig(level=logging.INFO,)
logger = logging.getLogger(__name__)
# Based on observation
MAX_HOURS_PRIORITY_LIST = 5
MIN_RECENT_CONSENSUS_COUNT = PERIOD_DAYS * 12 # 60
MIN_RELAYS = 6000
# 24
MIN_RECENT_PRIORITY_LIST_COUNT = int(
    PERIOD_DAYS * 24 / MAX_HOURS_PRIORITY_LIST
)
MIN_RELAYS_PER_PRIORITY_LIST = int(MIN_RELAYS * FRACTION_RELAYS) # 300
# 7200
MIN_RECENT_PRIORITY_RELAY_COUNT = (
MIN_RECENT_PRIORITY_LIST_COUNT * MIN_RELAYS_PER_PRIORITY_LIST
)
# If the number of attempts is not equal to the number of relays being in the
# priority list, there's a bug.
MIN_RECENT_MEASUREMENT_ATTEMPT_COUNT = MIN_RECENT_PRIORITY_RELAY_COUNT
MAX_RECENT_MEASUREMENT_ATTEMPT_COUNT = MAX_RECENT_PRIORITY_RELAY_COUNT
# noqa
REPORT_TEMPLATE_BWFILE = (
"sum(relay_recent_measurement_attempt_count) "
"<= recent_measurement_attempt_count, "
"{self.is_sum_relay_recent_measurement_attempt_count_lte_recent_measurement_attempt_count}\n" # noqa
)
REPORT_TEMPLATE_BWHEADER = """
Header,
recent_consensus_count >= min, {self.is_consensus_gte_min}
recent_consensus_count <= max, {self.is_consensus_lte_max}
recent_priority_list_count >= min, {self.is_priority_list_gte_min}
recent_priority_list_count <= max, {self.is_priority_list_lte_max}
recent_priority_relay_count >= min, {self.is_priority_relay_gte_min}
recent_priority_relay_count <= max, {self.is_priority_relay_lte_max}
""" + (
"recent_measurement_attempt_count >= min, "
"{self.is_measurement_attempt_gte_min}\n"
"recent_measurement_attempt_count <= max, "
"{self.is_measurement_attempt_lte_max}\n"
"recent_measurement_attempt_count == recent_priority_relay_count, "
"{self.is_attempt_e_priority_relay}\n"
"recent_measurement_attempt_count >= total excluded, "
"{self.is_attempt_gte_failure_exclude}\n"
)
REPORT_TEMPLATE_BWLINES = """
relays correct, {self.are_bwlines_correct}
"""
REPORT_TEMPLATE_BWLINE = """
relay_recent_measurement_attempt_count <= relay_recent_priority_list_count,
{self.is_relay_recent_measurement_attempt_count_lte_relay_recent_priority_list_count}
relay_recent_priority_list_count <= relay_recent_consensus_count,
{self.is_relay_recent_priority_list_count_lte_relay_recent_consensus_count}
"""
class BwFile:
def __init__(self, header, bwlines):
self.header = BwHeader(header)
self.bwlines = [BwLine(line) for line in bwlines]
@classmethod
def load(cls, file_path):
logger.info("Parsing content of %s.", file_path)
document = descriptor.parse_file(file_path)
bwfiles = list(document)
if bwfiles:
# When parsing one file, there is only 1 bwfile
bwfile = bwfiles[0]
return cls(bwfile.header, bwfile.measurements.values())
@property
def sum_relay_recent_measurement_attempt_count(self):
return sum(
[l.relay_recent_measurement_attempt_count for l in self.bwlines]
)
@property
def is_sum_relay_recent_measurement_attempt_count_lte_recent_measurement_attempt_count( # noqa
self,
):
return (
self.sum_relay_recent_measurement_attempt_count
<= self.header.recent_measurement_attempt_count
)
@property
def are_bwlines_correct(self):
return not list(filter(lambda x: not x.is_correct, self.bwlines))
@property
def is_correct(self):
methods = [m for m in dir(self) if m.startswith("is_")]
methods.remove("is_correct")
return not list(filter(lambda x: not getattr(self, x), methods))
@property
def report(self):
print(REPORT_TEMPLATE_BWFILE.format(self=self))
self.header.report
print(REPORT_TEMPLATE_BWLINES.format(self=self))
class BwLine:
def __init__(self, line):
for k, v in line.items():
if k in BWLINE_KEYS_V1_4:
setattr(self, k, int(v))
else:
setattr(self, k, v)
@property
def is_relay_recent_priority_list_count_lte_relay_recent_consensus_count(
self,
):
return (
self.relay_recent_priority_list_count
<= self.relay_in_recent_consensus_count
)
@property
def is_relay_recent_measurement_attempt_count_lte_relay_recent_priority_list_count( # noqa
self,
):
return (
self.relay_recent_measurement_attempt_count
<= self.relay_recent_priority_list_count
)
def is_relay_recent_consensus_count_lte_recent_consensus_count(
self, recent_consensus_count
):
return self.relay_in_recent_consensus_count <= recent_consensus_count
@property
def is_correct(self):
methods = [m for m in dir(self) if m.startswith("is_")]
methods.remove("is_correct")
return not list(filter(lambda x: not getattr(self, x), methods))
@property
def report(self):
print(REPORT_TEMPLATE_BWLINE.format(self=self))
class BwHeader:
def __init__(self, header):
for k, v in header.items():
if k in HEADER_INT_KEYS:
setattr(self, k, int(v))
else:
setattr(self, k, v)
# logger.info(self.__dict__)
@classmethod
def load(cls, file_path):
logger.info("Parsing content of %s.", file_path)
document = descriptor.parse_file(file_path)
bwfiles = list(document)
if bwfiles:
bwfile = bwfiles[0]
return cls(bwfile.header)
@property
def is_consensus_lte_max(self):
return self.recent_consensus_count <= MAX_RECENT_CONSENSUS_COUNT
@property
def is_consensus_gte_min(self):
return self.recent_consensus_count >= MIN_RECENT_CONSENSUS_COUNT
@property
def is_priority_list_lte_max(self):
return (
self.recent_priority_list_count <= MAX_RECENT_PRIORITY_LIST_COUNT
)
@property
def is_priority_list_gte_min(self):
return (
self.recent_priority_list_count >= MIN_RECENT_PRIORITY_LIST_COUNT
)
@property
def is_priority_relay_lte_max(self):
return (
self.recent_priority_relay_count <= MAX_RECENT_PRIORITY_RELAY_COUNT
)
@property
def is_priority_relay_gte_min(self):
return (
self.recent_priority_relay_count >= MIN_RECENT_PRIORITY_RELAY_COUNT
)
@property
def is_measurement_attempt_gte_min(self):
return (
self.recent_measurement_attempt_count
>= MIN_RECENT_MEASUREMENT_ATTEMPT_COUNT
)
@property
def is_measurement_attempt_lte_max(self):
return (
self.recent_measurement_attempt_count
<= MAX_RECENT_MEASUREMENT_ATTEMPT_COUNT
)
@property
def is_attempt_e_priority_relay(self):
return (
self.recent_measurement_attempt_count
== self.recent_priority_relay_count
)
@property
def is_attempt_gte_failure(self):
return (
self.recent_measurement_attempt_count
>= self.recent_measurement_failure_count
)
@property
def total_excluded(self):
return sum(
[
self.recent_measurements_excluded_error_count,
self.recent_measurements_excluded_few_count,
self.recent_measurements_excluded_near_count,
self.recent_measurements_excluded_old_count,
]
)
@property
def total_excluded_failure(self):
return sum(
[self.total_excluded, self.recent_measurement_failure_count]
)
@property
def is_attempt_gte_failure_exclude(self):
return (
self.recent_measurement_attempt_count
>= self.total_excluded_failure
)
@property
def is_correct(self):
methods = [m for m in dir(self) if m.startswith("is_")]
methods.remove("is_correct")
return not list(filter(lambda x: not getattr(self, x), methods))
@property
def report(self):
print(REPORT_TEMPLATE_BWHEADER.format(self=self))
......@@ -45,7 +45,7 @@ class Heartbeat(object):
Log the percentage, the number of relays measured and not measured,
the number of loops and the time elapsed since it started measuring.
"""
loops_count = self.state_dict.get('recent_priority_list_count', 0)
loops_count = self.state_dict.count('recent_priority_list')
not_measured_fp_set = self.consensus_fp_set.difference(
self.measured_fp_set
......
......@@ -8,30 +8,17 @@ import random
import logging
from threading import Lock
from ..globals import MEASUREMENTS_PERIOD
from ..util import timestamp
from ..globals import (
MAX_RECENT_CONSENSUS_COUNT,
MAX_RECENT_PRIORITY_RELAY_COUNT,
MAX_RECENT_PRIORITY_LIST_COUNT,
MEASUREMENTS_PERIOD
)
from ..util import timestamp, timestamps
log = logging.getLogger(__name__)
def remove_old_consensus_timestamps(
consensus_timestamps, measurements_period=MEASUREMENTS_PERIOD):
"""
Remove the consensus timestamps that are older than the period for which
the measurements are kept from a list of consensus_timestamps.
:param list consensus_timestamps:
:param int measurements_period:
:returns list: a new list of ``consensus_timestamps``
"""
new_consensus_timestamps = [
t
for t in consensus_timestamps
if not timestamp.is_old(t, measurements_period)
]
return new_consensus_timestamps
def valid_after_from_network_statuses(network_statuses):
"""Obtain the consensus Valid-After datetime from the ``document``
attribute of a ``stem.descriptor.RouterStatusEntryV3``.
......@@ -80,14 +67,20 @@ class Relay:
self._desc = cont.get_server_descriptor(fp, default=None)
except (DescriptorUnavailable, ControllerError) as e:
log.exception("Exception trying to get desc %s", e)
self._consensus_timestamps = []
self._add_consensus_timestamp(timestamp)
self.relay_in_recent_consensus = timestamps.DateTimeSeq(
[], MAX_RECENT_CONSENSUS_COUNT
)
self.update_relay_in_recent_consensus()
# The number of times that a relay is "prioritized" to be measured.
# It is incremented in ``RelayPrioritizer.best_priority``
self.relay_recent_priority_list_count = 0
self.relay_recent_priority_list = timestamps.DateTimeSeq(
[], MAX_RECENT_PRIORITY_LIST_COUNT
)
# The number of times that a relay has been queued to be measured.
# It is incremented in ``scanner.main_loop``
self.relay_recent_measurement_attempt_count = 0
self.relay_recent_measurement_attempt = timestamps.DateTimeSeq(
[], MAX_RECENT_PRIORITY_LIST_COUNT
)
def _from_desc(self, attr):
if not self._desc:
......@@ -174,68 +167,15 @@ class Relay:
@property
def last_consensus_timestamp(self):
if len(self._consensus_timestamps) >= 1:
return self._consensus_timestamps[-1]
return None
return self.relay_in_recent_consensus.last()
def _append_consensus_timestamp_if_later(self, timestamp):
"""Append timestamp to the list of consensus timestamps, if it is later
than the most recent existing timestamp, or there are no timestamps.
Should only be called by _add_consensus_timestamp().
timestamp must not be None, and it must not be zero.
"""
if not timestamp:
log.info('Bad timestamp %s, skipping consensus timestamp '
'update for relay %s', timestamp, self.fingerprint)
return
# The consensus timestamp list was initialized.
if self.last_consensus_timestamp is not None:
# timestamp is more recent than the most recent stored
# consensus timestamp.
if timestamp > self.last_consensus_timestamp:
# Add timestamp
self._consensus_timestamps.append(timestamp)
# The consensus timestamp list was not initialized.
else:
# Add timestamp
self._consensus_timestamps.append(timestamp)
def _add_consensus_timestamp(self, timestamp=None):
"""Add the consensus timestamp in which this relay is present.
"""
# It is possible to access the relay's consensus Valid-After,
# so believe it rather than the supplied timestamp
if self.consensus_valid_after is not None:
self._append_consensus_timestamp_if_later(
self.consensus_valid_after
)
elif timestamp:
# Add the arg timestamp.
self._append_consensus_timestamp_if_later(timestamp)
# In any other case
else:
log.warning('Bad timestamp %s, using current time for consensus '
'timestamp update for relay %s',
timestamp, self.fingerprint)
# Add the current datetime
self._append_consensus_timestamp_if_later(
datetime.utcnow().replace(microsecond=0))
def _remove_old_consensus_timestamps(
self, measurements_period=MEASUREMENTS_PERIOD):
self._consensus_timestamps = \
remove_old_consensus_timestamps(
copy.deepcopy(self._consensus_timestamps), measurements_period
)
def update_consensus_timestamps(self, timestamp=None):
self._add_consensus_timestamp(timestamp)
self._remove_old_consensus_timestamps()
def update_relay_in_recent_consensus(self, timestamp=None):
self.relay_in_recent_consensus.update(timestamp)
@property
def relay_in_recent_consensus_count(self):
"""Number of times the relay was in a conensus."""
return len(self._consensus_timestamps)
return len(self.relay_in_recent_consensus)
def can_exit_to_port(self, port):
"""
......@@ -267,19 +207,20 @@ class Relay:
Flag.EXIT in self.flags and
self.can_exit_to_port(port))
def increment_relay_recent_measurement_attempt_count(self):
def increment_relay_recent_measurement_attempt(self):
"""
Increment the number of times that a relay has been queued
to be measured.
It is called from :func:`~sbws.core.scanner.main_loop`.
"""
# If it was not in the previous measurements version, start counting
if self.relay_recent_measurement_attempt_count is None:
self.relay_recent_measurement_attempt_count = 0
self.relay_recent_measurement_attempt_count += 1
self.relay_recent_measurement_attempt.update()
def increment_relay_recent_priority_list_count(self):
@property
def relay_recent_measurement_attempt_count(self):
return len(self.relay_recent_measurement_attempt)
def increment_relay_recent_priority_list(self):
"""
The number of times that a relay is "prioritized" to be measured.
......@@ -287,9 +228,11 @@ class Relay:
:meth:`~sbws.lib.relayprioritizer.RelayPrioritizer.best_priority`.