Commit 0a64de95 authored by Ana Custura
Browse files

Update Analysis and TGenParser classes to use TGenTools

parent 54911cd9
......@@ -16,48 +16,28 @@ from stem import CircEvent, CircStatus, CircPurpose, StreamStatus
from import CircuitEvent, CircMinorEvent, StreamEvent, BandwidthEvent, BuildTimeoutSetEvent
from stem.response import ControlMessage, convert
# tgentools imports
from tgentools.analysis import Analysis, TGenParser
# onionperf imports
from . import util
class Analysis(object):
class OPAnalysis(Analysis):
def __init__(self, nickname=None, ip_address=None):
self.nickname = nickname
self.measurement_ip = ip_address
self.hostname = gethostname().split('.')[0]
super().__init__(nickname, ip_address)
self.json_db = {'type':'onionperf', 'version':'2.0', 'data':{}}
self.tgen_filepaths = []
self.torctl_filepaths = []
self.date_filter = None
self.did_analysis = False
def add_tgen_file(self, filepath):
def add_torctl_file(self, filepath):
def get_nodes(self):
return list(self.json_db['data'].keys())
def get_tor_bandwidth_summary(self, node, direction):
return self.json_db['data'][node]['tor']['bandwidth_summary'][direction]
return None
def get_tgen_transfers(self, node):
return self.json_db['data'][node]['tgen']['transfers']
return None
def get_tgen_transfers_summary(self, node):
return self.json_db['data'][node]['tgen']['transfers_summary']
return None
def analyze(self, do_complete=False, date_filter=None):
if self.did_analysis:
......@@ -84,17 +64,11 @@ class Analysis(object):
if self.measurement_ip is None:
self.measurement_ip = "unknown"
self.json_db['data'].setdefault(self.nickname, {'measurement_ip': self.measurement_ip}).setdefault(json_db_key, parser.get_data())
self.json_db['data'].setdefault(self.nickname, {'measurement_ip' : self.measurement_ip}).setdefault(json_db_key, parser.get_data())
self.did_analysis = True
def merge(self, analysis):
for nickname in analysis.json_db['data']:
if nickname in self.json_db['data']:
raise Exception("Merge does not yet support multiple Analysis objects from the same node \
(add multiple files from the same node to the same Analysis object before calling analyze instead)")
self.json_db['data'][nickname] = analysis.json_db['data'][nickname]
def save(self, filename=None, output_prefix=os.getcwd(), do_compress=True, date_prefix=None):
if filename is None:
......@@ -147,150 +121,6 @@ class Analysis(object):
analysis_instance.json_db = db
return analysis_instance
def subproc_analyze_func(analysis_args):
    """Worker entry point: unpack the job arguments and return the analysis.

    ``analysis_args`` is indexed at positions 0 (the analysis object) and
    1 (the ``do_complete`` flag). SIGINT is set to be ignored in the calling
    process before anything else happens.
    """
    # Ignore interrupts in this process.
    signal(SIGINT, SIG_IGN)
    analysis = analysis_args[0]
    # NOTE(review): do_complete is read but never used in the visible code;
    # a statement that consumed it may be missing here - confirm.
    do_complete = analysis_args[1]
    return analysis
class ParallelAnalysis(Analysis):
def analyze(self, search_path, do_complete=False, nickname=None, tgen_search_expressions=["tgen.*\.log"],
torctl_search_expressions=["torctl.*\.log"], num_subprocs=cpu_count()):
pathpairs = util.find_file_paths_pairs(search_path, tgen_search_expressions, torctl_search_expressions)"processing input from {0} nodes...".format(len(pathpairs)))
analysis_jobs = []
for (tgen_filepaths, torctl_filepaths) in pathpairs:
a = Analysis()
for tgen_filepath in tgen_filepaths:
for torctl_filepath in torctl_filepaths:
analysis_args = [a, do_complete]
analyses = None
pool = Pool(num_subprocs if num_subprocs > 0 else cpu_count())
mr = pool.map_async(subproc_analyze_func, analysis_jobs)
while not mr.ready(): mr.wait(1)
analyses = mr.get()
except KeyboardInterrupt:"interrupted, terminating process pool")
sys.exit()"merging {0} analysis results now...".format(len(analyses)))
while analyses is not None and len(analyses) > 0:
self.merge(analyses.pop())"done merging results: {0} total nicknames present in json db".format(len(self.json_db['data'])))
class TransferStatusEvent(object):
    """One transfer status log line, split into named attributes.

    The line is tokenised on whitespace and every field is selected purely
    by position, so a line with too few fields raises IndexError and a
    non-numeric size or byte count raises ValueError.
    """

    def __init__(self, line):
        # Outcome flags; this base class leaves all of them False.
        self.is_success = False
        self.is_error = False
        self.is_complete = False

        tokens = line.strip().split()
        self.unix_ts_end = util.timestamp_to_seconds(tokens[2])

        # Token 8 is a comma-separated transport description.
        transport = tokens[8].split(',')
        self.endpoint_local = transport[2]
        self.endpoint_proxy = transport[3]
        self.endpoint_remote = transport[4]

        # Token 10 is a comma-separated transfer description; its first two
        # fields are joined into the transfer id as "id:count".
        xfer = tokens[10].split(',')
        self.transfer_id = "{0}:{1}".format(xfer[0], xfer[1])
        self.hostname_local = xfer[2]
        self.method = xfer[3]  # 'GET' or 'PUT'
        self.filesize_bytes = int(xfer[4])
        self.hostname_remote = xfer[5]
        self.error_code = xfer[8].split('=')[1]

        self.total_bytes_read = int(tokens[11].split('=')[1])
        self.total_bytes_write = int(tokens[12].split('=')[1])

        # The commander is the side that sent the command, i.e. the side
        # driving the download (the client): it reads on GET, writes on PUT.
        progress = tokens[13].split('=')
        direction = progress[0]
        if self.method == 'GET':
            self.is_commander = 'read' in direction
        elif self.method == 'PUT':
            self.is_commander = 'write' in direction
        else:
            self.is_commander = False

        # The value after '=' is split on '/'; its first piece is the number
        # of payload bytes transferred so far.
        self.payload_bytes_status = int(progress[1].split('/')[0])

        # Everything from token 15 onwards is kept for subclasses to parse.
        self.unconsumed_parts = tokens[15:] if len(tokens) >= 16 else None
        self.elapsed_seconds = {}
class TransferCompleteEvent(TransferStatusEvent):
    """A status event for a finished transfer, with per-step elapsed times.

    The first eleven entries of ``unconsumed_parts`` are read as
    ``key=value`` pairs holding microsecond counts, matched positionally to
    the step names below. Non-negative values are stored in
    ``elapsed_seconds`` (converted to seconds), and ``unix_ts_start`` is
    derived from the last non-negative value seen.
    """

    def __init__(self, line):
        super().__init__(line)
        self.is_complete = True

        step_names = (
            'socket_create', 'socket_connect', 'proxy_init', 'proxy_choice',
            'proxy_request', 'proxy_response', 'command', 'response',
            'first_byte', 'last_byte', 'checksum',
        )

        latest_secs = 0.0
        for position, step in enumerate(step_names):
            # Pull the numeric value out of the "key=value" token.
            raw_pair = self.unconsumed_parts[position]
            usecs = float(int(raw_pair.split('=')[1]))
            if usecs < 0.0:
                # Negative values are skipped and leave latest_secs untouched.
                continue
            latest_secs = usecs / 1000000.0  # usecs to secs
            self.elapsed_seconds.setdefault(step, latest_secs)

        self.unix_ts_start = self.unix_ts_end - latest_secs
class TransferSuccessEvent(TransferCompleteEvent):
    """A completed transfer event flagged as successful."""

    def __init__(self, line):
        super().__init__(line)
        self.is_success = True
class TransferErrorEvent(TransferCompleteEvent):
    """A completed transfer event flagged as having ended in an error."""

    def __init__(self, line):
        super().__init__(line)
        self.is_error = True
class Transfer(object):
    """Accumulates the status events of one transfer and records milestones.

    Two milestone tables are kept, each mapping a threshold to the
    ``unix_ts_end`` of the first event that reached it (or None if it was
    never reached):

    * ``payload_progress`` - fraction of the file transferred (0.0 to 1.0)
    * ``payload_bytes`` - absolute number of payload bytes transferred
    """

    def __init__(self, tid):
        # NOTE(review): the assignment target was lost from this line in the
        # visible source; ``self.id`` is reconstructed from the ``tid``
        # argument - confirm against the upstream file.
        self.id = tid
        self.last_event = None
        self.payload_progress = {decile: None for decile in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]}
        self.payload_bytes = {partial: None for partial in [10240, 20480, 51200, 102400, 204800, 512000, 1048576, 2097152, 5242880]}

    def add_event(self, status_event):
        """Record ``status_event`` and stamp any milestones it newly reaches.

        Raises ZeroDivisionError if ``status_event.filesize_bytes`` is 0.
        """
        progress = float(status_event.payload_bytes_status)
        progress_frac = progress / float(status_event.filesize_bytes)

        # Only the first event to reach a threshold sets its timestamp.
        for partial in sorted(self.payload_bytes):
            if progress >= partial and self.payload_bytes[partial] is None:
                self.payload_bytes[partial] = status_event.unix_ts_end

        for decile in sorted(self.payload_progress):
            if progress_frac >= decile and self.payload_progress[decile] is None:
                self.payload_progress[decile] = status_event.unix_ts_end

        self.last_event = status_event

    def get_data(self):
        """Return the final event's attributes as a dict, or None.

        None is returned when no event was added or the last event is not
        complete. For non-error events the returned ``elapsed_seconds`` also
        carries ``payload_progress`` and ``payload_bytes``: seconds from
        ``unix_ts_start`` to each milestone that was reached, rounded to
        six decimal places.

        The result is a copy, so the stored event is not modified.
        """
        event = self.last_event
        if event is None or not event.is_complete:
            return None

        # Copy instead of handing out (and mutating) the event's own __dict__.
        data = dict(vars(event))
        if not event.is_error:
            start = event.unix_ts_start
            elapsed = dict(data['elapsed_seconds'])
            elapsed['payload_progress'] = {
                decile: round(stamp - start, 6)
                for decile, stamp in self.payload_progress.items()
                if stamp is not None
            }
            elapsed['payload_bytes'] = {
                partial: round(stamp - start, 6)
                for partial, stamp in self.payload_bytes.items()
                if stamp is not None
            }
            data['elapsed_seconds'] = elapsed
        return data
class Parser(object, metaclass=ABCMeta):
def parse(self, source, do_complete):
......@@ -302,103 +132,6 @@ class Parser(object, metaclass=ABCMeta):
def get_name(self):
class TGenParser(Parser):
def __init__(self, date_filter=None):
''' date_filter should be given in UTC '''
self.state = {}
self.transfers = {}
self.transfers_summary = {'time_to_first_byte':{}, 'time_to_last_byte':{}, 'errors':{}} = None
self.date_filter = date_filter
def __is_date_valid(self, date_to_check):
if self.date_filter is None:
# we are not asked to filter, so every date is valid
return True
# we are asked to filter, so the line is only valid if the date matches the filter
# both the filter and the unix timestamp should be in UTC at this point
return util.do_dates_match(self.date_filter, date_to_check)
def __parse_line(self, line, do_complete):
if is None and"Initializing traffic generator on host", line) is not None: = line.strip().split()[11]
if self.date_filter is not None:
parts = line.split(' ', 3)
if len(parts) < 4: # the 3rd is the timestamp, the 4th is the rest of the line
return True
unix_ts = float(parts[2])
line_date = datetime.datetime.utcfromtimestamp(unix_ts).date()
if not self.__is_date_valid(line_date):
return True
if do_complete and"state\sRESPONSE\sto\sstate\sPAYLOAD", line) is not None:
# another run of tgen starts the id over counting up from 1
# if a prev transfer with the same id did not complete, we can be sure it never will
parts = line.strip().split()
transfer_parts = parts[7].strip().split(',')
transfer_id = "{0}:{1}".format(transfer_parts[0], transfer_parts[1]) # id:count
if transfer_id in self.state:
elif do_complete and"transfer-status", line) is not None:
status = TransferStatusEvent(line)
xfer = self.state.setdefault(status.transfer_id, Transfer(status.transfer_id))
elif"transfer-complete", line) is not None:
complete = TransferSuccessEvent(line)
if do_complete:
xfer = self.state.setdefault(complete.transfer_id, Transfer(complete.transfer_id))
self.transfers[] = xfer.get_data()
filesize, second = complete.filesize_bytes, int(complete.unix_ts_end)
fb_secs = complete.elapsed_seconds['first_byte'] - complete.elapsed_seconds['command']
lb_secs = complete.elapsed_seconds['last_byte'] - complete.elapsed_seconds['command']
fb_list = self.transfers_summary['time_to_first_byte'].setdefault(filesize, {}).setdefault(second, [])
lb_list = self.transfers_summary['time_to_last_byte'].setdefault(filesize, {}).setdefault(second, [])
elif"transfer-error", line) is not None:
error = TransferErrorEvent(line)
if do_complete:
xfer = self.state.setdefault(error.transfer_id, Transfer(error.transfer_id))
self.transfers[] = xfer.get_data()
err_code, filesize, second = error.error_code, error.filesize_bytes, int(error.unix_ts_end)
err_list = self.transfers_summary['errors'].setdefault(err_code, {}).setdefault(second, [])
return True
def parse(self, source, do_complete=False):
for line in source:
# ignore line parsing errors
if not self.__parse_line(line, do_complete):
logging.warning("TGenParser: skipping line due to parsing error: {0}".format(line))
def get_data(self):
return {'transfers':self.transfers, 'transfers_summary': self.transfers_summary}
def get_name(self):
class TorStream(object):
def __init__(self, sid):
