GitLab is used only for code review, issue tracking and project management. Canonical locations for source code are still and

Commit fa1c0a4c authored by juga  's avatar juga
Browse files

fix: tests: Add tests loading results

in ResultDump and incrementing relay's monitoring KeyValues.
parent 5c45e321
...@@ -296,14 +296,6 @@ def end_event(): ...@@ -296,14 +296,6 @@ def end_event():
@pytest.fixture(scope='function') @pytest.fixture(scope='function')
def rd(args, conf, end_event): def rd(args, conf_results):
from sbws.lib.resultdump import ResultDump from sbws.lib.resultdump import ResultDump
# in Travis the next line gives the error: return ResultDump(args, conf_results)
# TypeError: __init__() takes 3 positional arguments but 4 were given
# No idea why.
# Returning None to disable the test in case ResultDump can not be
# initialized.
return ResultDump(args, conf, end_event)
except TypeError:
return None
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Unit tests for resultdump.""" """Unit tests for resultdump."""
from sbws.lib.resultdump import trim_results_ip_changed
import datetime
from sbws.lib.relaylist import Relay
from sbws.lib.resultdump import (
def test_trim_results_ip_changed_defaults(resultdict_ip_not_changed): def test_trim_results_ip_changed_defaults(resultdict_ip_not_changed):
...@@ -31,3 +41,57 @@ def test_trim_results_ip_changed_on_changed_ipv6(caplog, ...@@ -31,3 +41,57 @@ def test_trim_results_ip_changed_on_changed_ipv6(caplog,
assert record.levelname == 'WARNING' assert record.levelname == 'WARNING'
assert 'Reseting bandwidth results when IPv6 changes, ' \ assert 'Reseting bandwidth results when IPv6 changes, ' \
'is not yet implemented.\n' in caplog.text 'is not yet implemented.\n' in caplog.text
def test_resultdump(
rd, args, conf_results, controller, router_status, server_descriptor
from sbws import settings
relay = Relay(
r = ResultSuccess(
[], 2000, relay, ["A", "B"], "http://localhost/bw", "scanner_nick",
# Storing the result with `rd.queue.put` will not store the result to disk
# because the thread is not spawned with pytest.
results = rd.results_for_relay(relay)
# It has stored the result
assert 1 == len(results)
# The result has the correct attribute
assert 1 == len(results[0].relay_recent_priority_list)
# Store a second result for the sme relay
r = ResultError(
relay, ["A", "B"], "http://localhost/bw", "scanner_nick",
assert 2 == len(results)
assert 1 == len(results[1].relay_recent_priority_list)
def test_load(datadir):
results = load_result_file(str(datadir.join("results.txt")))
results = [v for values in results.values() for v in values]
r1 = results[1]
assert isinstance(r1, ResultSuccess)
assert isinstance(
r1.relay_recent_measurement_attempt[0], datetime.datetime
assert 2 == len(r1.relay_recent_measurement_attempt)
assert 3 == len(r1.relay_recent_priority_list)
assert 3 == len(r1.relay_in_recent_consensus)
r2 = results[2]
assert isinstance(r2, ResultErrorStream)
assert isinstance(
r2.relay_recent_measurement_attempt[0], datetime.datetime
assert 2 == len(r2.relay_recent_measurement_attempt)
assert 3 == len(r2.relay_recent_priority_list)
assert 3 == len(r2.relay_in_recent_consensus)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment