Commit a2ef62c2 authored by Karsten Loesing

Merge branch 'task-40005' into develop

parents c761ac65 c1cd60e0
@@ -10,7 +10,7 @@ from abc import ABCMeta, abstractmethod

 # stem imports
 from stem import CircEvent, CircStatus, CircPurpose, StreamStatus
-from stem.response.events import CircuitEvent, CircMinorEvent, StreamEvent, BandwidthEvent, BuildTimeoutSetEvent
+from stem.response.events import CircuitEvent, CircMinorEvent, StreamEvent, BuildTimeoutSetEvent
 from stem.response import ControlMessage, convert

 # tgentools imports
@@ -29,25 +29,19 @@ class OPAnalysis(Analysis):
     def add_torctl_file(self, filepath):
         self.torctl_filepaths.append(filepath)

-    def get_tor_bandwidth_summary(self, node, direction):
-        try:
-            return self.json_db['data'][node]['tor']['bandwidth_summary'][direction]
-        except:
-            return None
-
-    def analyze(self, do_complete=False, date_filter=None):
+    def analyze(self, date_filter=None):
         if self.did_analysis:
             return

         self.date_filter = date_filter
-        tgen_parser = TGenParser(date_filter=self.date_filter)
+        super().analyze(do_complete=True)
         torctl_parser = TorCtlParser(date_filter=self.date_filter)

-        for (filepaths, parser, json_db_key) in [(self.tgen_filepaths, tgen_parser, 'tgen'), (self.torctl_filepaths, torctl_parser, 'tor')]:
+        for (filepaths, parser, json_db_key) in [(self.torctl_filepaths, torctl_parser, 'tor')]:
             if len(filepaths) > 0:
                 for filepath in filepaths:
                     logging.info("parsing log file at {0}".format(filepath))
-                    parser.parse(util.DataSource(filepath), do_complete=do_complete)
+                    parser.parse(util.DataSource(filepath))

         if self.nickname is None:
             parsed_name = parser.get_name()
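A minimal usage sketch of the simplified entry point; the onionperf.analysis import path, nickname, and log file names are assumptions, while the calls themselves mirror this diff:

# Sketch: drive the new analyze() API, which no longer takes do_complete.
from onionperf.analysis import OPAnalysis

analysis = OPAnalysis(nickname="op-example")      # hypothetical nickname
analysis.add_tgen_file("onionperf.tgen.log")      # hypothetical paths
analysis.add_torctl_file("onionperf.torctl.log")
analysis.analyze(date_filter=None)                # full data is always parsed now
analysis.save(output_prefix=".")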
@@ -134,7 +128,7 @@ class OPAnalysis(Analysis):
 class Parser(object, metaclass=ABCMeta):
     @abstractmethod
-    def parse(self, source, do_complete):
+    def parse(self, source):
         pass

     @abstractmethod
     def get_data(self):
@@ -270,14 +264,10 @@ class TorCtlParser(Parser):
     def __init__(self, date_filter=None):
         ''' date_filter should be given in UTC '''
-        self.do_complete = False
-        self.bandwidth_summary = {'bytes_read':{}, 'bytes_written':{}}
         self.circuits_state = {}
         self.circuits = {}
-        self.circuits_summary = {'buildtimes':[], 'lifetimes':[]}
         self.streams_state = {}
         self.streams = {}
-        self.streams_summary = {'lifetimes':{}}
         self.name = None
         self.boot_succeeded = False
         self.build_timeout_last = None
@@ -320,15 +310,10 @@ class TorCtlParser(Parser):
                 data = circ.get_data()
                 if data is not None:
-                    if built is not None and started is not None and len(data['path']) == 3:
-                        self.circuits_summary['buildtimes'].append(built - started)
-                    if ended is not None and started is not None:
-                        self.circuits_summary['lifetimes'].append(ended - started)
-                    if self.do_complete:
-                        self.circuits[cid] = data
+                    self.circuits[cid] = data
                 self.circuits_state.pop(cid)

-        elif self.do_complete and isinstance(event, CircMinorEvent):
+        elif isinstance(event, CircMinorEvent):
             if event.purpose != event.old_purpose or event.event != CircEvent.PURPOSE_CHANGED:
                 key = "{0}:{1}".format(event.event, event.purpose)
                 circ.add_event(key, arrival_dt)
@@ -364,15 +349,9 @@ class TorCtlParser(Parser):
             data = strm.get_data()
             if data is not None:
-                if self.do_complete:
-                    self.streams[sid] = data
-                self.streams_summary['lifetimes'].setdefault(stream_type, []).append(ended - started)
+                self.streams[sid] = data
             self.streams_state.pop(sid)

-    def __handle_bw(self, event, arrival_dt):
-        self.bandwidth_summary['bytes_read'][int(arrival_dt)] = event.read
-        self.bandwidth_summary['bytes_written'][int(arrival_dt)] = event.written
-
     def __handle_buildtimeout(self, event, arrival_dt):
         self.build_timeout_last = event.timeout
         self.build_quantile_last = event.quantile
@@ -382,8 +361,6 @@ class TorCtlParser(Parser):
             self.__handle_circuit(event, arrival_dt)
         elif isinstance(event, StreamEvent):
             self.__handle_stream(event, arrival_dt)
-        elif isinstance(event, BandwidthEvent):
-            self.__handle_bw(event, arrival_dt)
         elif isinstance(event, BuildTimeoutSetEvent):
             self.__handle_buildtimeout(event, arrival_dt)
@@ -408,27 +385,26 @@ class TorCtlParser(Parser):
         elif re.search("BOOTSTRAP", line) is not None and re.search("PROGRESS=100", line) is not None:
             self.boot_succeeded = True

-        if self.do_complete or (self.do_complete is False and re.search("650\sBW", line) is not None):
-            # parse with stem
-            timestamps, sep, raw_event_str = line.partition(" 650 ")
-            if sep == '':
-                return True
+        # parse with stem
+        timestamps, sep, raw_event_str = line.partition(" 650 ")
+        if sep == '':
+            return True

-            # event.arrived_at is also available but at worse granularity
-            unix_ts = float(timestamps.strip().split()[2])
+        # event.arrived_at is also available but at worse granularity
+        unix_ts = float(timestamps.strip().split()[2])

-            # check if we should ignore the line
-            line_date = datetime.datetime.utcfromtimestamp(unix_ts).date()
-            if not self.__is_date_valid(line_date):
-                return True
+        # check if we should ignore the line
+        line_date = datetime.datetime.utcfromtimestamp(unix_ts).date()
+        if not self.__is_date_valid(line_date):
+            return True

-            event = ControlMessage.from_str("{0} {1}".format(sep.strip(), raw_event_str))
-            convert('EVENT', event)
-            self.__handle_event(event, unix_ts)
+        event = ControlMessage.from_str("{0} {1}".format(sep.strip(), raw_event_str))
+        convert('EVENT', event)
+        self.__handle_event(event, unix_ts)
         return True

-    def parse(self, source, do_complete=False):
-        self.do_complete = do_complete
+    def parse(self, source):
         source.open(newline='\r\n')
         for line in source:
             # ignore line parsing errors
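For reference, a sketch of the stem round-trip the parser performs on each asynchronous "650" line; the log line below is made up, and the attribute names follow stem's documented event classes:

# Sketch: convert one raw control-port line into a typed stem event.
from stem.response import ControlMessage, convert

raw = ("650 BUILDTIMEOUT_SET COMPUTED TOTAL_TIMES=124 TIMEOUT_MS=9019 "
       "XM=1375 ALPHA=0.855662 CUTOFF_QUANTILE=0.800000")
event = ControlMessage.from_str(raw + "\r\n")  # stem expects CRLF termination
convert('EVENT', event)                        # now a BuildTimeoutSetEvent
print(event.timeout, event.quantile)           # 9019 0.8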
@@ -442,9 +418,7 @@ class TorCtlParser(Parser):
         source.close()

     def get_data(self):
-        return {'circuits': self.circuits, 'circuits_summary': self.circuits_summary,
-                'streams':self.streams, 'streams_summary': self.streams_summary,
-                'bandwidth_summary': self.bandwidth_summary}
+        return {'circuits': self.circuits, 'streams': self.streams}

     def get_name(self):
         return self.name
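Standalone use of the parser after this change would look roughly as below; util.DataSource is the same wrapper used in analyze() above, and the log path is hypothetical:

# Sketch: parse a single torctl log and read the trimmed-down results.
from onionperf import util
from onionperf.analysis import TorCtlParser

parser = TorCtlParser(date_filter=None)
parser.parse(util.DataSource("onionperf.torctl.log"))  # hypothetical path
data = parser.get_data()  # now just {'circuits': ..., 'streams': ...}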
@@ -157,7 +157,7 @@ def logrotate_thread_task(writables, tgen_writable, torctl_writable, docroot, nickname):
             anal.add_torctl_file(torctl_writable.rotate_file(filename_datetime=next_midnight))

             # run the analysis, i.e. parse the files
-            anal.analyze(do_simple=False)
+            anal.analyze()

             # save the results in onionperf json format in the www docroot
             anal.save(output_prefix=docroot, do_compress=True, date_prefix=next_midnight.date())
@@ -279,11 +279,6 @@ files generated by this script will be written""",
                                 action="store", dest="date_prefix",
                                 default=None)

-    analyze_parser.add_argument('-s', '--do-simple-parse',
-                                help="""parse and export only summary statistics rather than full transfer/circuit/stream data""",
-                                action="store_false", dest="do_complete",
-                                default=True)
-
     # visualize
     visualize_parser = sub_parser.add_parser('visualize', description=DESC_VISUALIZE, help=HELP_VISUALIZE,
                                              formatter_class=my_formatter_class)
@@ -387,7 +382,7 @@ def analyze(args):
         analysis.add_tgen_file(args.tgen_logpath)
         if args.torctl_logpath is not None:
             analysis.add_torctl_file(args.torctl_logpath)
-        analysis.analyze(args.do_complete, date_filter=args.date_filter)
+        analysis.analyze(date_filter=args.date_filter)
         analysis.save(output_prefix=args.prefix, date_prefix=args.date_prefix)

     elif args.tgen_logpath is not None and os.path.isdir(args.tgen_logpath) and args.torctl_logpath is not None and os.path.isdir(args.torctl_logpath):
@@ -396,7 +391,7 @@ def analyze(args):
         torctl_logs = reprocessing.collect_logs(args.torctl_logpath, '*torctl.log*')
         log_pairs = reprocessing.match(tgen_logs, torctl_logs, args.date_filter)
         logging.info("Found {0} matching log pairs to be reprocessed".format(len(log_pairs)))
-        reprocessing.multiprocess_logs(log_pairs, args.prefix, args.nickname, args.do_complete)
+        reprocessing.multiprocess_logs(log_pairs, args.prefix, args.nickname)
     else:
         logging.error("Given paths were an unrecognized mix of file and directory paths, nothing will be analyzed")
@@ -46,21 +46,21 @@ def match(tgen_logs, tor_logs, date_filter):
     return log_pairs

-def analyze_func(prefix, nick, do_complete, pair):
+def analyze_func(prefix, nick, pair):
     analysis = OPAnalysis(nickname=nick)
     logging.info('Analysing pair for date {0}'.format(pair[2]))
     analysis.add_tgen_file(pair[0])
     analysis.add_torctl_file(pair[1])
-    analysis.analyze(do_complete=do_complete, date_filter=pair[2])
+    analysis.analyze(date_filter=pair[2])
     analysis.save(output_prefix=prefix)
     return 1

-def multiprocess_logs(log_pairs, prefix, nick=None, do_complete=False):
+def multiprocess_logs(log_pairs, prefix, nick=None):
     pool = Pool(cpu_count())
     analyses = None
     try:
-        func = partial(analyze_func, prefix, nick, do_complete)
+        func = partial(analyze_func, prefix, nick)
         mr = pool.map_async(func, log_pairs)
         pool.close()
         while not mr.ready():
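Batch reprocessing with the trimmed signatures would be invoked roughly as follows; the directory names are hypothetical, while collect_logs, match, and multiprocess_logs all appear in this diff:

# Sketch: rebuild analyses for matched tgen/torctl log pairs.
from onionperf import reprocessing

tgen_logs = reprocessing.collect_logs("logs/", '*tgen.log*')
torctl_logs = reprocessing.collect_logs("logs/", '*torctl.log*')
log_pairs = reprocessing.match(tgen_logs, torctl_logs, None)  # no date filter
reprocessing.multiprocess_logs(log_pairs, "out/")  # nick defaults to None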
@@ -61,7 +61,7 @@ def test_log_match_with_wrong_filter_date():
 def test_analyze_func_json():
     pair = (DATA_DIR + 'logs/onionperf_2019-01-10_23:59:59.tgen.log', DATA_DIR + 'logs/onionperf_2019-01-10_23:59:59.torctl.log', datetime.datetime(2019, 1, 10, 0, 0))
     work_dir = tempfile.mkdtemp()
-    reprocessing.analyze_func(work_dir, None, False, pair)
+    reprocessing.analyze_func(work_dir, None, pair)
     json_file = os.path.join(work_dir, "2019-01-10.onionperf.analysis.json.xz")
     assert(os.path.exists(json_file))
     shutil.rmtree(work_dir)