Commit 05eb9cdf authored by Ana Custura

Update do_simple analysis param to new do_complete tgen semantics

parent 25bae0dc
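This commit renames the boolean that selects parse depth and inverts its sense: do_simple=True (summary-only by default) becomes do_complete=False, so the default library behavior is unchanged and callers now opt in to full transfer/circuit/stream data. A minimal before/after sketch of a caller, assuming the Analysis class from this module; the import path, nickname, and log path are illustrative:

    from analysis import Analysis  # module path assumed

    analysis = Analysis(nickname="client1")    # hypothetical nickname
    analysis.add_tgen_file("tgen.client.log")  # hypothetical log path

    # Before: full data was requested by disabling the simple parse.
    # analysis.analyze(do_simple=False)

    # After: the same full parse is requested positively; omitting the
    # flag (do_complete=False) keeps the summary-only default.
    analysis.analyze(do_complete=True)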
@@ -58,7 +58,7 @@ class Analysis(object):
         except:
             return None
 
-    def analyze(self, do_simple=True, date_filter=None):
+    def analyze(self, do_complete=False, date_filter=None):
         if self.did_analysis:
             return
@@ -70,7 +70,7 @@ class Analysis(object):
         if len(filepaths) > 0:
             for filepath in filepaths:
                 logging.info("parsing log file at {0}".format(filepath))
-                parser.parse(util.DataSource(filepath), do_simple=do_simple)
+                parser.parse(util.DataSource(filepath), do_complete=do_complete)
 
         if self.nickname is None:
             parsed_name = parser.get_name()
@@ -150,13 +150,13 @@ class Analysis(object):
 def subproc_analyze_func(analysis_args):
     signal(SIGINT, SIG_IGN) # ignore interrupts
     a = analysis_args[0]
-    do_simple = analysis_args[1]
-    a.analyze(do_simple=do_simple)
+    do_complete = analysis_args[1]
+    a.analyze(do_complete=do_complete)
     return a
 
 class ParallelAnalysis(Analysis):
 
-    def analyze(self, search_path, do_simple=True, nickname=None, tgen_search_expressions=["tgen.*\.log"],
+    def analyze(self, search_path, do_complete=False, nickname=None, tgen_search_expressions=["tgen.*\.log"],
                 torctl_search_expressions=["torctl.*\.log"], num_subprocs=cpu_count()):
 
         pathpairs = util.find_file_paths_pairs(search_path, tgen_search_expressions, torctl_search_expressions)
@@ -169,7 +169,7 @@ class ParallelAnalysis(Analysis):
                 a.add_tgen_file(tgen_filepath)
             for torctl_filepath in torctl_filepaths:
                 a.add_torctl_file(torctl_filepath)
-            analysis_args = [a, do_simple]
+            analysis_args = [a, do_complete]
             analysis_jobs.append(analysis_args)
 
         analyses = None
@@ -293,7 +293,7 @@ class Transfer(object):
 
 class Parser(object, metaclass=ABCMeta):
     @abstractmethod
-    def parse(self, source, do_simple):
+    def parse(self, source, do_complete):
         pass
     @abstractmethod
     def get_data(self):
@@ -321,7 +321,7 @@ class TGenParser(Parser):
         # both the filter and the unix timestamp should be in UTC at this point
         return util.do_dates_match(self.date_filter, date_to_check)
 
-    def __parse_line(self, line, do_simple):
+    def __parse_line(self, line, do_complete):
         if self.name is None and re.search("Initializing traffic generator on host", line) is not None:
             self.name = line.strip().split()[11]
@@ -334,7 +334,7 @@ class TGenParser(Parser):
         if not self.__is_date_valid(line_date):
             return True
 
-        if not do_simple and re.search("state\sRESPONSE\sto\sstate\sPAYLOAD", line) is not None:
+        if do_complete and re.search("state\sRESPONSE\sto\sstate\sPAYLOAD", line) is not None:
             # another run of tgen starts the id over counting up from 1
             # if a prev transfer with the same id did not complete, we can be sure it never will
             parts = line.strip().split()
@@ -343,7 +343,7 @@ class TGenParser(Parser):
             if transfer_id in self.state:
                 self.state.pop(transfer_id)
 
-        elif not do_simple and re.search("transfer-status", line) is not None:
+        elif do_complete and re.search("transfer-status", line) is not None:
             status = TransferStatusEvent(line)
             xfer = self.state.setdefault(status.transfer_id, Transfer(status.transfer_id))
             xfer.add_event(status)
@@ -351,7 +351,7 @@ class TGenParser(Parser):
         elif re.search("transfer-complete", line) is not None:
             complete = TransferSuccessEvent(line)
 
-            if not do_simple:
+            if do_complete:
                 xfer = self.state.setdefault(complete.transfer_id, Transfer(complete.transfer_id))
                 xfer.add_event(complete)
                 self.transfers[xfer.id] = xfer.get_data()
@@ -369,7 +369,7 @@ class TGenParser(Parser):
         elif re.search("transfer-error", line) is not None:
             error = TransferErrorEvent(line)
 
-            if not do_simple:
+            if do_complete:
                 xfer = self.state.setdefault(error.transfer_id, Transfer(error.transfer_id))
                 xfer.add_event(error)
                 self.transfers[xfer.id] = xfer.get_data()
@@ -382,12 +382,12 @@ class TGenParser(Parser):
         return True
 
-    def parse(self, source, do_simple=True):
+    def parse(self, source, do_complete=False):
         source.open()
         for line in source:
             # ignore line parsing errors
             try:
-                if not self.__parse_line(line, do_simple):
+                if not self.__parse_line(line, do_complete):
                     break
             except:
                 logging.warning("TGenParser: skipping line due to parsing error: {0}".format(line))
@@ -526,7 +526,7 @@ class TorCtlParser(Parser):
 
     def __init__(self, date_filter=None):
         ''' date_filter should be given in UTC '''
-        self.do_simple = True
+        self.do_complete = False
         self.bandwidth_summary = {'bytes_read':{}, 'bytes_written':{}}
         self.circuits_state = {}
         self.circuits = {}
@@ -580,11 +580,11 @@ class TorCtlParser(Parser):
                 self.circuits_summary['buildtimes'].append(built - started)
             if ended is not None and started is not None:
                 self.circuits_summary['lifetimes'].append(ended - started)
-            if not self.do_simple:
+            if self.do_complete:
                 self.circuits[cid] = data
             self.circuits_state.pop(cid)
 
-        elif not self.do_simple and isinstance(event, CircMinorEvent):
+        elif self.do_complete and isinstance(event, CircMinorEvent):
             if event.purpose != event.old_purpose or event.event != CircEvent.PURPOSE_CHANGED:
                 key = "{0}:{1}".format(event.event, event.purpose)
                 circ.add_event(key, arrival_dt)
@@ -620,7 +620,7 @@ class TorCtlParser(Parser):
 
             data = strm.get_data()
             if data is not None:
-                if not self.do_simple:
+                if self.do_complete:
                     self.streams[sid] = data
                 self.streams_summary['lifetimes'].setdefault(stream_type, []).append(ended - started)
             self.streams_state.pop(sid)
@@ -664,7 +664,7 @@ class TorCtlParser(Parser):
         elif re.search("BOOTSTRAP", line) is not None and re.search("PROGRESS=100", line) is not None:
             self.boot_succeeded = True
 
-        if self.do_simple is False or (self.do_simple is True and re.search("650\sBW", line) is not None):
+        if self.do_complete or (self.do_complete is False and re.search("650\sBW", line) is not None):
             # parse with stem
             timestamps, sep, raw_event_str = line.partition(" 650 ")
             if sep == '':
@@ -683,8 +683,8 @@ class TorCtlParser(Parser):
                     self.__handle_event(event, unix_ts)
         return True
 
-    def parse(self, source, do_simple=True):
-        self.do_simple = do_simple
+    def parse(self, source, do_complete=False):
+        self.do_complete = do_complete
         source.open(newline='\r\n')
         for line in source:
             # ignore line parsing errors
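The rewritten TorCtlParser predicate above keeps the old behavior under the new name: a complete parse hands every 650 asynchronous event line to stem, while a simple parse only parses bandwidth (650 BW) events. A standalone sketch of just that condition, with assertions as a quick sanity check; the function name and the example event lines are fabricated:

    import re

    def should_parse_with_stem(line, do_complete):
        # Mirrors the new condition: complete parses take every 650 event,
        # simple parses keep only Tor's asynchronous bandwidth (650 BW) events.
        return do_complete or (do_complete is False and re.search(r"650\sBW", line) is not None)

    assert should_parse_with_stem("650 BW 1024 2048", False)      # kept in simple mode
    assert not should_parse_with_stem("650 CIRC 4 BUILT", False)  # skipped in simple mode
    assert should_parse_with_stem("650 CIRC 4 BUILT", True)       # kept in complete mode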
@@ -281,8 +281,8 @@ files generated by this script will be written""",
     analyze_parser.add_argument('-s', '--do-simple-parse',
         help="""parse and export only summary statistics rather than full transfer/circuit/stream data""",
-        action="store_true", dest="do_simple",
-        default=False)
+        action="store_false", dest="do_complete",
+        default=True)
 
     # visualize
     visualize_parser = sub_parser.add_parser('visualize', description=DESC_VISUALIZE, help=HELP_VISUALIZE,
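With store_false, the -s/--do-simple-parse flag now clears do_complete instead of setting do_simple, and the CLI defaults to a complete parse, matching the old CLI behavior where do_simple defaulted to False. A minimal argparse sketch of the inverted wiring; the parser object and shortened help string are illustrative:

    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument('-s', '--do-simple-parse',
                     help="parse and export only summary statistics",
                     action="store_false", dest="do_complete",
                     default=True)

    assert cli.parse_args([]).do_complete is True       # full parse by default
    assert cli.parse_args(['-s']).do_complete is False  # -s opts into the simple parse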
@@ -387,7 +387,7 @@ def analyze(args):
         analysis.add_tgen_file(args.tgen_logpath)
         if args.torctl_logpath is not None:
             analysis.add_torctl_file(args.torctl_logpath)
-        analysis.analyze(args.do_simple, date_filter=args.date_filter)
+        analysis.analyze(args.do_complete, date_filter=args.date_filter)
         analysis.save(output_prefix=args.prefix, date_prefix=args.date_prefix)
 
     elif args.tgen_logpath is not None and os.path.isdir(args.tgen_logpath) and args.torctl_logpath is not None and os.path.isdir(args.torctl_logpath):
@@ -396,7 +396,7 @@ def analyze(args):
         torctl_logs = reprocessing.collect_logs(args.torctl_logpath, '*torctl.log*')
         log_pairs = reprocessing.match(tgen_logs, torctl_logs, args.date_filter)
         logging.info("Found {0} matching log pairs to be reprocessed".format(len(log_pairs)))
-        reprocessing.multiprocess_logs(log_pairs, args.prefix, args.nickname, args.do_simple)
+        reprocessing.multiprocess_logs(log_pairs, args.prefix, args.nickname, args.do_complete)
 
     else:
         logging.error("Given paths were an unrecognized mix of file and directory paths, nothing will be analyzed")
@@ -46,21 +46,21 @@ def match(tgen_logs, tor_logs, date_filter):
     return log_pairs
 
-def analyze_func(prefix, nick, do_simple, pair):
+def analyze_func(prefix, nick, do_complete, pair):
     analysis = Analysis(nickname=nick)
     logging.info('Analysing pair for date {0}'.format(pair[2]))
     analysis.add_tgen_file(pair[0])
     analysis.add_torctl_file(pair[1])
-    analysis.analyze(do_simple=do_simple, date_filter=pair[2])
+    analysis.analyze(do_complete=do_complete, date_filter=pair[2])
     analysis.save(output_prefix=prefix)
     return 1
 
-def multiprocess_logs(log_pairs, prefix, nick=None, do_simple=False):
+def multiprocess_logs(log_pairs, prefix, nick=None, do_complete=False):
     pool = Pool(cpu_count())
     analyses = None
     try:
-        func = partial(analyze_func, prefix, nick, do_simple)
+        func = partial(analyze_func, prefix, nick, do_complete)
         mr = pool.map_async(func, log_pairs)
         pool.close()
         while not mr.ready():
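End to end, a reprocessing run with the renamed keyword looks like the sketch below; the log directory, glob patterns, output prefix, and import path are assumptions, while collect_logs, match, and multiprocess_logs are the functions shown in this diff:

    import reprocessing  # module path assumed

    tgen_logs = reprocessing.collect_logs('/srv/logs', '*tgen.log*')
    torctl_logs = reprocessing.collect_logs('/srv/logs', '*torctl.log*')
    log_pairs = reprocessing.match(tgen_logs, torctl_logs, None)  # no date filter

    # do_complete=True requests full transfer/circuit/stream data;
    # the default (False) exports only summary statistics.
    reprocessing.multiprocess_logs(log_pairs, '/srv/out/op', do_complete=True)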