Commit 0333fadc authored by juga's avatar juga
Browse files

Merge branch 'ticket28652_rebased'

parents 692e1fac 969bfbbf
......@@ -33,6 +33,7 @@ import requests
import random
from .. import settings
from ..lib.heartbeat import Heartbeat
rng = random.SystemRandom()
log = logging.getLogger(__name__)
......@@ -474,6 +475,8 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
measured.
"""
hbeat = Heartbeat(conf.getpath('paths', 'state_fname'))
# Set the time to wait for a thread to finish as the half of an HTTP
# request timeout.
# Do not start a new loop if sbws is stopping.
......@@ -484,6 +487,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
# long, set it here and not outside the loop.
pending_results = []
loop_tstart = time.time()
# Register relay fingerprints to the heartbeat module
hbeat.register_consensus_fprs(relay_list.relays_fingerprints)
for target in relay_prioritizer.best_priority():
# Don't start measuring a relay if sbws is stopping.
if settings.end_event.is_set():
......@@ -500,6 +507,9 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
target], {}, callback, callback_err)
pending_results.append(async_result)
# Register this measurement to the heartbeat module
hbeat.register_measured_fpr(target.fingerprint)
# After the for has finished, the pool has queued all the relays
# and pending_results has the list of all the AsyncResults.
# It could also be obtained with pool._cache, which contains
......@@ -507,6 +517,9 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
num_relays_to_measure = len(pending_results)
wait_for_results(num_relays_to_measure, pending_results)
# Print the heartbeat message
hbeat.print_heartbeat_message()
loop_tstop = time.time()
loop_tdelta = (loop_tstop - loop_tstart) / 60
log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta)
......
"""
Classes and functions to implement a heartbeat system to monitor the progress.
"""
import logging
import time
from ..util.state import State
log = logging.getLogger(__name__)
class Heartbeat(object):
"""
Tracks current status of sbws and is capable of printing periodic
information about the current state
"""
def __init__(self, state_path):
# Variable to count total progress in the last days:
# In case it is needed to see which relays are not being measured,
# store their fingerprint, not only their number.
self.measured_fp_set = set()
self.consensus_fp_set = set()
self.measured_percent = 0
self.main_loop_tstart = time.monotonic()
self.state_dict = State(state_path)
self.previous_measurement_percent = 0
def register_measured_fpr(self, async_result):
self.measured_fp_set.add(async_result)
def register_consensus_fprs(self, relay_fprs):
for r in relay_fprs:
self.consensus_fp_set.add(r)
def print_heartbeat_message(self):
"""Print the new percentage of the different relays that were measured.
This way it can be known whether the scanner is making progress
measuring all the Network.
Log the percentage, the number of relays measured and not measured,
the number of loops and the time elapsed since it started measuring.
"""
loops_count = self.state_dict.get('recent_priority_list_count', 0)
not_measured_fp_set = self.consensus_fp_set.difference(
self.measured_fp_set
)
main_loop_tdelta = (time.monotonic() - self.main_loop_tstart) / 60
new_measured_percent = round(
len(self.measured_fp_set) / len(self.consensus_fp_set) * 100
)
log.info("Run %s main loops.", loops_count)
log.info("Measured in total %s (%s%%) unique relays in %s minutes",
len(self.measured_fp_set), new_measured_percent,
main_loop_tdelta)
log.info("%s relays still not measured.", len(not_measured_fp_set))
# The case when it is equal will only happen when all the relays
# have been measured.
if (new_measured_percent <= self.previous_measurement_percent):
log.warning("There is no progress measuring relays!.")
self.previous_measurement_percent = new_measured_percent
......@@ -355,6 +355,13 @@ class RelayList:
def authorities(self):
return self._relays_with_flag(Flag.AUTHORITY)
@property
def relays_fingerprints(self):
# Using relays instead of _relays, so that the list get updated if
# needed, since this method is used to know which fingerprints are in
# the consensus.
return [r.fingerprint for r in self.relays]
def random_relay(self):
return self.rng.choice(self.relays)
......
"""Unit tests for heartbeat"""
import logging
from sbws.lib import heartbeat
def test_total_measured_percent(conf, caplog):
hbeat = heartbeat.Heartbeat(conf.getpath('paths', 'state_fname'))
hbeat.register_consensus_fprs(['A', 'B', 'C'])
hbeat.register_measured_fpr('A')
hbeat.register_measured_fpr('B')
caplog.set_level(logging.INFO)
assert hbeat.previous_measurement_percent == 0
hbeat.print_heartbeat_message()
assert hbeat.previous_measurement_percent == 67
caplog.records[1].getMessage().find("Measured in total 2 (67%)")
caplog.records[2].getMessage().find("1 relays still not measured")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment