Commit 969bfbbf authored by George Kadianakis's avatar George Kadianakis Committed by juga
Browse files

Bake more details into the heartbeat module and out of the main loop.

parent ea37bc9f
......@@ -33,7 +33,7 @@ import requests
import random
from .. import settings
from ..lib import heartbeat
from ..lib.heartbeat import Heartbeat
rng = random.SystemRandom()
log = logging.getLogger(__name__)
......@@ -479,12 +479,7 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
measured.
"""
# Variable to count total progress in the last days:
# In case it is needed to see which relays are not being measured,
# store their fingerprint, not only their number.
measured_fp_set = set()
measured_percent = 0
main_loop_tstart = time.monotonic()
hbeat = Heartbeat(conf.getpath('paths', 'state_fname'))
# Set the time to wait for a thread to finish as the half of an HTTP
# request timeout.
......@@ -496,6 +491,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
# long, set it here and not outside the loop.
pending_results = []
loop_tstart = time.time()
# Register relay fingerprints to the heartbeat module
hbeat.register_consensus_fprs(relay_list.relays_fingerprints)
for target in relay_prioritizer.best_priority():
# Don't start measuring a relay if sbws is stopping.
if settings.end_event.is_set():
......@@ -511,7 +510,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
[args, conf, destinations, circuit_builder, relay_list,
target], {}, callback, callback_err)
pending_results.append(async_result)
measured_fp_set.add(async_result)
# Register this measurement to the heartbeat module
hbeat.register_measured_fpr(target.fingerprint)
# After the for has finished, the pool has queued all the relays
# and pending_results has the list of all the AsyncResults.
# It could also be obtained with pool._cache, which contains
......@@ -519,10 +521,8 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
num_relays_to_measure = len(pending_results)
wait_for_results(num_relays_to_measure, pending_results)
measured_percent = heartbeat.total_measured_percent(
measured_percent, relay_list.relays_fingerprints, measured_fp_set,
main_loop_tstart, conf.getpath('paths', 'state_fname')
)
# Print the heartbeat message
hbeat.print_heartbeat_message()
loop_tstop = time.time()
loop_tdelta = (loop_tstop - loop_tstart) / 60
......
......@@ -9,38 +9,61 @@ from ..util.state import State
log = logging.getLogger(__name__)
# NOTE tech-debt: this could go be tracked globally as a singleton
consensus_fp_set = set()
class Heartbeat(object):
"""
Tracks current status of sbws and is capable of printing periodic
information about the current state
"""
def total_measured_percent(measured_percent, relays_fingerprints,
measured_fp_set, main_loop_tstart, state_path):
"""Returns the new percentage of the different relays that were measured.
def __init__(self, state_path):
# Variable to count total progress in the last days:
# In case it is needed to see which relays are not being measured,
# store their fingerprint, not only their number.
self.measured_fp_set = set()
self.consensus_fp_set = set()
self.measured_percent = 0
self.main_loop_tstart = time.monotonic()
This way it can be known whether the scanner is making progress measuring
all the Network.
self.state_dict = State(state_path)
Log the percentage, the number of relays measured and not measured,
the number of loops and the time elapsed since it started measuring.
"""
global consensus_fp_set
# NOTE: in a future refactor make State a singleton in __init__.py
state_dict = State(state_path)
loops_count = state_dict.get('recent_priority_list_count', 0)
# Store all the relays seen in all the consensuses.
[consensus_fp_set.add(r) for r in relays_fingerprints]
not_measured_fp_set = consensus_fp_set.difference(measured_fp_set)
main_loop_tdelta = (time.monotonic() - main_loop_tstart) / 60
new_measured_percent = round(
len(measured_fp_set) / len(consensus_fp_set) * 100)
log.info("Run %s main loops.", loops_count)
log.info("Measured in total %s (%s%%) unique relays in %s minutes",
len(measured_fp_set), new_measured_percent, main_loop_tdelta)
log.info("%s relays still not measured.", len(not_measured_fp_set))
# The case when it is equal will only happen when all the relays have been
# measured.
if (new_measured_percent <= measured_percent):
log.warning("There is no progress measuring relays!.")
return new_measured_percent
self.previous_measurement_percent = 0
def register_measured_fpr(self, async_result):
self.measured_fp_set.add(async_result)
def register_consensus_fprs(self, relay_fprs):
for r in relay_fprs:
self.consensus_fp_set.add(r)
def print_heartbeat_message(self):
"""Print the new percentage of the different relays that were measured.
This way it can be known whether the scanner is making progress
measuring all the Network.
Log the percentage, the number of relays measured and not measured,
the number of loops and the time elapsed since it started measuring.
"""
loops_count = self.state_dict.get('recent_priority_list_count', 0)
not_measured_fp_set = self.consensus_fp_set.difference(
self.measured_fp_set
)
main_loop_tdelta = (time.monotonic() - self.main_loop_tstart) / 60
new_measured_percent = round(
len(self.measured_fp_set) / len(self.consensus_fp_set) * 100
)
log.info("Run %s main loops.", loops_count)
log.info("Measured in total %s (%s%%) unique relays in %s minutes",
len(self.measured_fp_set), new_measured_percent,
main_loop_tdelta)
log.info("%s relays still not measured.", len(not_measured_fp_set))
# The case when it is equal will only happen when all the relays
# have been measured.
if (new_measured_percent <= self.previous_measurement_percent):
log.warning("There is no progress measuring relays!.")
self.previous_measurement_percent = new_measured_percent
"""Unit tests for heartbeat"""
import logging
import time
from sbws.lib import heartbeat
def test_total_measured_percent(conf, caplog):
measured_percent = 0
measured_fp_set = set(['A', 'B'])
main_loop_tstart = time.monotonic()
relays_fingerprints = set(['A', 'B', 'C'])
hbeat = heartbeat.Heartbeat(conf.getpath('paths', 'state_fname'))
hbeat.register_consensus_fprs(['A', 'B', 'C'])
hbeat.register_measured_fpr('A')
hbeat.register_measured_fpr('B')
caplog.set_level(logging.INFO)
new_measured_percent = heartbeat.total_measured_percent(
measured_percent, relays_fingerprints, measured_fp_set,
main_loop_tstart, conf.getpath('paths', 'state_fname')
)
assert new_measured_percent == 67
assert hbeat.previous_measurement_percent == 0
hbeat.print_heartbeat_message()
assert hbeat.previous_measurement_percent == 67
caplog.records[1].getMessage().find("Measured in total 2 (67%)")
caplog.records[2].getMessage().find("1 relays still not measured")
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment