Loading sbws/core/scanner.py +15 −1 Original line number Diff line number Diff line ''' Measure the relays. ''' import queue import signal import sys Loading Loading @@ -377,8 +378,21 @@ def _next_expected_amount(expected_amount, result_time, download_times, def result_putter(result_dump): ''' Create a function that takes a single argument -- the measurement result -- and return that function so it can be used by someone else ''' def closure(measurement_result): return result_dump.queue.put(measurement_result) # Since result_dump thread is calling queue.get() every second, # the queue should be full for only 1 second. # This call blocks at maximum timeout seconds. try: result_dump.queue.put(measurement_result, timeout=3) except queue.Full: # The result would be lost, the scanner will continue working. log.warning( "The queue with measurements is full, when adding %s.\n" "It is possible that the thread that get them to " "write them to the disk (ResultDump.enter) is stalled.", measurement_result ) return closure Loading sbws/lib/resultdump.py +16 −1 Original line number Diff line number Diff line Loading @@ -580,7 +580,22 @@ class ResultDump: log.info(msg) def enter(self): ''' Main loop for the ResultDump thread ''' """Main loop for the ResultDump thread. When there are results in the queue, queue.get will get them until there are not anymore or timeout happen. For every result it gets, it process it and store in the filesystem, which takes ~1 millisecond and will not trigger the timeout. It can then store in the filesystem ~1000 results per second. I does not accept any other data type than Results or list of Results, therefore is not possible to put big data types in the queue. If there are not any results in the queue, it waits 1 second and checks again. """ with self.data_lock: self.data = load_recent_results_in_datadir( self.fresh_days, self.datadir) Loading tests/unit/conftest.py +20 −0 Original line number Diff line number Diff line Loading @@ -267,3 +267,23 @@ def sbwshome_success_result_two_relays(sbwshome_only_datadir, conf): write_result_to_datadir(RESULT_SUCCESS2, dd) write_result_to_datadir(RESULT_SUCCESS2, dd) return sbwshome_only_datadir @pytest.fixture(scope='function') def end_event(): import threading return threading.Event() @pytest.fixture(scope='function') def rd(args, conf, end_event): from sbws.lib.resultdump import ResultDump # in Travis the next line gives the error: # TypeError: __init__() takes 3 positional arguments but 4 were given # No idea why. # Returning None to disable the test in case ResultDump can not be # initialized. try: return ResultDump(args, conf, end_event) except TypeError: return None tests/unit/core/test_scanner.py 0 → 100644 +28 −0 Original line number Diff line number Diff line """Unit tests for scanner.py.""" import pytest from sbws.core.scanner import result_putter def test_result_putter(sbwshome_only_datadir, result_success, rd, end_event): if rd is None: pytest.skip("ResultDump is None") # Put one item in the queue callback = result_putter(rd) callback(result_success) assert rd.queue.qsize() == 1 # Make queue maxsize 1, so that it'll be full after the first callback. # The second callback will wait 1 second, then the queue will be empty # again. rd.queue.maxsize = 1 callback(result_success) # after putting 1 result, the queue will be full assert rd.queue.qsize() == 1 assert rd.queue.full() # it's still possible to put another results, because the callback will # wait 1 second and the queue will be empty again. callback(result_success) assert rd.queue.qsize() == 1 assert rd.queue.full() end_event.set() Loading
sbws/core/scanner.py +15 −1 Original line number Diff line number Diff line ''' Measure the relays. ''' import queue import signal import sys Loading Loading @@ -377,8 +378,21 @@ def _next_expected_amount(expected_amount, result_time, download_times, def result_putter(result_dump): ''' Create a function that takes a single argument -- the measurement result -- and return that function so it can be used by someone else ''' def closure(measurement_result): return result_dump.queue.put(measurement_result) # Since result_dump thread is calling queue.get() every second, # the queue should be full for only 1 second. # This call blocks at maximum timeout seconds. try: result_dump.queue.put(measurement_result, timeout=3) except queue.Full: # The result would be lost, the scanner will continue working. log.warning( "The queue with measurements is full, when adding %s.\n" "It is possible that the thread that get them to " "write them to the disk (ResultDump.enter) is stalled.", measurement_result ) return closure Loading
sbws/lib/resultdump.py +16 −1 Original line number Diff line number Diff line Loading @@ -580,7 +580,22 @@ class ResultDump: log.info(msg) def enter(self): ''' Main loop for the ResultDump thread ''' """Main loop for the ResultDump thread. When there are results in the queue, queue.get will get them until there are not anymore or timeout happen. For every result it gets, it process it and store in the filesystem, which takes ~1 millisecond and will not trigger the timeout. It can then store in the filesystem ~1000 results per second. I does not accept any other data type than Results or list of Results, therefore is not possible to put big data types in the queue. If there are not any results in the queue, it waits 1 second and checks again. """ with self.data_lock: self.data = load_recent_results_in_datadir( self.fresh_days, self.datadir) Loading
tests/unit/conftest.py +20 −0 Original line number Diff line number Diff line Loading @@ -267,3 +267,23 @@ def sbwshome_success_result_two_relays(sbwshome_only_datadir, conf): write_result_to_datadir(RESULT_SUCCESS2, dd) write_result_to_datadir(RESULT_SUCCESS2, dd) return sbwshome_only_datadir @pytest.fixture(scope='function') def end_event(): import threading return threading.Event() @pytest.fixture(scope='function') def rd(args, conf, end_event): from sbws.lib.resultdump import ResultDump # in Travis the next line gives the error: # TypeError: __init__() takes 3 positional arguments but 4 were given # No idea why. # Returning None to disable the test in case ResultDump can not be # initialized. try: return ResultDump(args, conf, end_event) except TypeError: return None
tests/unit/core/test_scanner.py 0 → 100644 +28 −0 Original line number Diff line number Diff line """Unit tests for scanner.py.""" import pytest from sbws.core.scanner import result_putter def test_result_putter(sbwshome_only_datadir, result_success, rd, end_event): if rd is None: pytest.skip("ResultDump is None") # Put one item in the queue callback = result_putter(rd) callback(result_success) assert rd.queue.qsize() == 1 # Make queue maxsize 1, so that it'll be full after the first callback. # The second callback will wait 1 second, then the queue will be empty # again. rd.queue.maxsize = 1 callback(result_success) # after putting 1 result, the queue will be full assert rd.queue.qsize() == 1 assert rd.queue.full() # it's still possible to put another results, because the callback will # wait 1 second and the queue will be empty again. callback(result_success) assert rd.queue.qsize() == 1 assert rd.queue.full() end_event.set()