Skip to content
Snippets Groups Projects
Commit 3b13e13a authored by Matt Traudt's avatar Matt Traudt
Browse files

Lock ResultDump data; some additional logging/comments; code style

parent 54ab727b
No related branches found
No related tags found
No related merge requests found
......@@ -78,6 +78,7 @@ class CircuitBuilder:
raise PathLengthException()
c = self.controller
assert stem_utils.is_controller_okay(c)
self.log.debug('Building', [p[0:8] for p in path])
for _ in range(0, 3):
try:
circ_id = c.new_circuit(path, await_build=True)
......@@ -179,6 +180,7 @@ class GapsCircuitBuilder(CircuitBuilder):
continue
relay = stem_utils.fp_or_nick_to_relay(self.controller, fp)
if not relay:
self.log.debug('Failed to get descriptor for relay', fp)
return None
new_path.append(relay)
return new_path
......@@ -216,9 +218,10 @@ class GapsCircuitBuilder(CircuitBuilder):
insert_relays = self._random_sample_relays(
num_missing, [r for r in path if r is not None])
if insert_relays is None:
self.log.warn('Problem building a circuit to satisfy',
[r.nickname if r else None for r in path], 'with available '
'relays in the network')
self.log.warn(
'Problem building a circuit to satisfy',
[r.nickname if r else None for r in path], 'with available '
'relays in the network')
return None
assert len(insert_relays) == num_missing
path = [r.fingerprint if r else insert_relays.pop().fingerprint
......
......@@ -4,6 +4,7 @@ import json
from glob import glob
from threading import Thread
from threading import Event
from threading import RLock
from queue import Queue
from queue import Empty
from datetime import date
......@@ -94,16 +95,21 @@ class ResultDump:
self.fresh_days = args.data_period
self.datadir = args.result_directory
self.end_event = end_event
self.data = None
self.data_lock = RLock()
self.thread = Thread(target=self.enter)
self.queue = Queue()
self.thread.start()
def store_result(self, result):
''' Call from ResultDump thread '''
assert isinstance(result, Result)
self.data.append(result)
self.data = self._trim_stale_data(self.data)
with self.data_lock:
self.data.append(result)
self.data = self._trim_stale_data(self.data)
def write_result(self, result):
''' Call from ResultDump thread '''
assert isinstance(result, Result)
dt = date.fromtimestamp(result.time)
ext = '.txt'
......@@ -113,6 +119,7 @@ class ResultDump:
fd.write('{}\n'.format(str(result)))
def _load_data_file(self, fname):
''' Call from ResultDump thread '''
assert os.path.isfile(fname)
d = []
with open(fname, 'rt') as fd:
......@@ -122,6 +129,7 @@ class ResultDump:
return d
def _trim_stale_data(self, in_data):
''' Call from ResultDump thread '''
data = []
oldest_allowed = time.time() - (self.fresh_days*24*60*60)
for result in in_data:
......@@ -130,7 +138,8 @@ class ResultDump:
self.log.debug('Keeping {}/{} data'.format(len(data), len(in_data)))
return data
def load_fresh_data(self):
def _load_fresh_data(self):
''' Call from ResultDump thread '''
data = []
today = date.fromtimestamp(time.time())
# Load a day extra. It's okay: we'll trim it afterward. This should
......@@ -147,7 +156,9 @@ class ResultDump:
return data
def enter(self):
self.data = self.load_fresh_data()
''' Main loop for the ResultDump thread '''
with self.data_lock:
self.data = self._load_fresh_data()
while not (self.end_event.is_set() and self.queue.empty()):
try:
event = self.queue.get(timeout=1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment