Commit f20c2ebb authored by Matt Traudt

Generalize the loading of results from disk, and use the new funcs
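
The v3bw generation and stats commands each had their own code for finding
recent result files, parsing them, and trimming stale entries. Move that
logic into sbws.lib.resultdump as load_result_file, trim_results,
load_recent_results_in_datadir, and group_results_by_relay, and use the new
functions from both commands and from the ResultDump thread.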

parent 55cba83f
 from sbws.globals import (fail_hard, is_initted)
 from sbws.lib.resultdump import Result
 from sbws.lib.resultdump import ResultError
 from sbws.lib.resultdump import ResultSuccess
+from sbws.lib.resultdump import load_recent_results_in_datadir
+from sbws.lib.resultdump import group_results_by_relay
 from argparse import ArgumentDefaultsHelpFormatter
 from statistics import median
 import os
 import json
 import time
 
-def read_result_file(fname, starting_dict=None):
-    data = starting_dict if starting_dict else {}
-    with open(fname, 'rt') as fd:
-        for line in fd:
-            d = json.loads(line)
-            res = Result.from_dict(d)
-            if isinstance(res, ResultError):
-                continue
-            assert isinstance(res, ResultSuccess)
-            fp = d['fingerprint']
-            if fp not in data:
-                data[fp] = []
-            data[fp].append(res)
-    return data
 
 class V3BWLine:
     def __init__(self, fp, bw, nick, rtts):
         self.fp = fp
@@ -102,12 +86,10 @@ def main(args, conf, log_):
     if args.scale_constant < 1:
         fail_hard('--scale-constant must be positive')
-    data_fnames = sorted(os.listdir(datadir), reverse=True)
-    data_fnames = data_fnames[0:14]
-    data_fnames = [os.path.join(datadir, f) for f in data_fnames]
-    data = {}
-    for fname in data_fnames:
-        data = read_result_file(fname, data)
+    fresh_days = conf.getint('general', 'data_period')
+    results = load_recent_results_in_datadir(
+        fresh_days, datadir, success_only=True, log_fn=log.debug)
+    data = group_results_by_relay(results)
     data_lines = [result_data_to_v3bw_line(data, fp) for fp in data]
     data_lines = sorted(data_lines, key=lambda d: d.bw, reverse=True)
     data_lines = scale_lines(args, data_lines)
...
@@ -2,6 +2,8 @@ from sbws.globals import (fail_hard, is_initted)
 from sbws.lib.resultdump import Result
 from sbws.lib.resultdump import ResultError
 from sbws.lib.resultdump import ResultSuccess
+from sbws.lib.resultdump import load_recent_results_in_datadir
+from sbws.lib.resultdump import group_results_by_relay
 from argparse import ArgumentDefaultsHelpFormatter
 import os
 import json
@@ -59,10 +61,8 @@ def main(args, conf, log_):
     if not os.path.isdir(datadir):
         fail_hard(datadir, 'does not exist', log=log)
-    data_fnames = sorted(os.listdir(datadir), reverse=True)
-    data_fnames = data_fnames[0:14]
-    data_fnames = [os.path.join(datadir, f) for f in data_fnames]
-    data = {}
-    for fname in data_fnames:
-        data = read_result_file(fname, data)
+    fresh_days = conf.getint('general', 'data_period')
+    results = load_recent_results_in_datadir(
+        fresh_days, datadir, success_only=False, log_fn=log.debug)
+    data = group_results_by_relay(results)
     print_stats(data)
@@ -13,6 +13,76 @@ from enum import Enum
 from stem.descriptor.router_status_entry import RouterStatusEntryV3
 
 
+def group_results_by_relay(results, starting_dict=None):
+    ''' Given a list of Results, group them by the relay fingerprint they
+    measured and return the resulting dict. Optionally start with the given
+    dict instead of an empty one. '''
+    data = starting_dict if starting_dict else {}
+    assert isinstance(data, dict)
+    assert isinstance(results, list)
+    for result in results:
+        assert isinstance(result, Result)
+        fp = result.fingerprint
+        if fp not in data:
+            data[fp] = []
+        data[fp].append(result)
+    return data
+
+
+def load_result_file(fname, success_only=False, log_fn=print):
+    ''' Reads in all lines from the given file and parses them into Result
+    structures (or subclasses of Result). Optionally keeps only
+    ResultSuccess. Returns all kept Results as a list. This function does
+    not care about the age of the results. '''
+    assert os.path.isfile(fname)
+    d = []
+    with open(fname, 'rt') as fd:
+        for line in fd:
+            r = Result.from_dict(json.loads(line.strip()))
+            if success_only and isinstance(r, ResultError):
+                continue
+            d.append(r)
+    log_fn('Read', len(d), 'lines from', fname)
+    return d
+
+
+def trim_results(fresh_days, results, log_fn=print):
+    ''' Given a result list, remove all Results that are no longer valid and
+    return the new list. '''
+    assert isinstance(fresh_days, int)
+    assert isinstance(results, list)
+    data_period = fresh_days * 24*60*60
+    oldest_allowed = time.time() - data_period
+    out_results = []
+    for result in results:
+        if result.time >= oldest_allowed:
+            out_results.append(result)
+    log_fn('Keeping {}/{} results'.format(len(out_results), len(results)))
+    return out_results
+
+
+def load_recent_results_in_datadir(fresh_days, datadir, success_only=False,
+                                   log_fn=print):
+    ''' Given a data directory, read every results file in it that could
+    still contain valid results. Trim them, and return the valid Results as
+    a list. '''
+    assert isinstance(fresh_days, int)
+    assert os.path.isdir(datadir)
+    results = []
+    today = date.fromtimestamp(time.time())
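+    # Look back two extra days' worth of files. Results that are too old
+    # get trimmed below, and the slack covers any timezone weirdness.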
+    data_period = fresh_days + 2
+    oldest_day = today - timedelta(days=data_period)
+    working_day = oldest_day
+    while working_day <= today:
+        pattern = os.path.join(datadir, '**', '{}*.txt'.format(working_day))
+        for fname in glob(pattern, recursive=True):
+            results.extend(load_result_file(fname, success_only=success_only,
+                                            log_fn=log_fn))
+        working_day += timedelta(days=1)
+    results = trim_results(fresh_days, results, log_fn=log_fn)
+    return results
+
+
 class _StrEnum(str, Enum):
     pass
@@ -236,6 +306,7 @@ class ResultDump:
     def __init__(self, args, conf, log, end_event):
         assert os.path.isdir(conf['paths']['datadir'])
         assert isinstance(end_event, Event)
+        self.conf = conf
         self.log = log
         self.fresh_days = conf.getint('general', 'data_period')
         self.datadir = conf['paths']['datadir']
@@ -251,7 +322,8 @@ class ResultDump:
         assert isinstance(result, Result)
         with self.data_lock:
             self.data.append(result)
-            self.data = self._trim_stale_data(self.data)
+            self.data = trim_results(self.fresh_days, self.data,
+                                     self.log.debug)
 
     def write_result(self, result):
         ''' Call from ResultDump thread '''
@@ -263,47 +335,11 @@ class ResultDump:
         with open(result_fname, 'at') as fd:
             fd.write('{}\n'.format(str(result)))
 
-    def _load_data_file(self, fname):
-        ''' Call from ResultDump thread '''
-        assert os.path.isfile(fname)
-        d = []
-        with open(fname, 'rt') as fd:
-            for line in fd:
-                d.append(Result.from_dict(json.loads(line.strip())))
-        self.log.info('Read', len(d), 'lines from', fname)
-        return d
-
-    def _trim_stale_data(self, in_data):
-        ''' Call from ResultDump thread '''
-        data = []
-        oldest_allowed = time.time() - (self.fresh_days*24*60*60)
-        for result in in_data:
-            if result.time >= oldest_allowed:
-                data.append(result)
-        self.log.debug('Keeping {}/{} data'.format(len(data), len(in_data)))
-        return data
-
-    def _load_fresh_data(self):
-        ''' Call from ResultDump thread '''
-        data = []
-        today = date.fromtimestamp(time.time())
-        # Load a day extra. It's okay: we'll trim it afterward. This should
-        # cover any timezone weirdness.
-        oldest_day = today - timedelta(days=self.fresh_days+1)
-        working_day = oldest_day
-        while working_day <= today:
-            pattern = os.path.join(
-                self.datadir, '**', '{}*.txt'.format(working_day))
-            for fname in glob(pattern, recursive=True):
-                data.extend(self._load_data_file(fname))
-            working_day += timedelta(days=1)
-        data = self._trim_stale_data(data)
-        return data
-
     def enter(self):
         ''' Main loop for the ResultDump thread '''
         with self.data_lock:
-            self.data = self._load_fresh_data()
+            self.data = load_recent_results_in_datadir(
+                self.fresh_days, self.datadir, log_fn=self.log.debug)
         while not (self.end_event.is_set() and self.queue.empty()):
             try:
                 event = self.queue.get(timeout=1)
...
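
A minimal sketch of how callers are expected to combine the new helpers. The
config values and the use of print as the log function are assumptions for
illustration; the function names and signatures come from this diff.

    from sbws.lib.resultdump import (load_recent_results_in_datadir,
                                     group_results_by_relay)

    # Values that would normally come from the sbws config:
    fresh_days = 5                  # conf.getint('general', 'data_period')
    datadir = '/path/to/datadir'    # conf['paths']['datadir']

    # Read every results file young enough to possibly hold valid
    # measurements, keep only successful results, and trim anything stale.
    results = load_recent_results_in_datadir(fresh_days, datadir,
                                             success_only=True, log_fn=print)

    # Bucket the surviving results by the fingerprint of the measured relay,
    # as the v3bw and stats commands now do.
    data = group_results_by_relay(results)
    for fp, relay_results in data.items():
        print(fp, len(relay_results))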