Commit 582ddfb0 authored by Rob Jansen

in progress overhaul

added tor-ctl-logger as monitor mode, parsing analysis mode for
both tgen and tor output, model generation mode for creating
default tgen models, and some util functions
parent 3e370de7
#!/usr/bin/python
import matplotlib; matplotlib.use('Agg') # for systems without X11
from matplotlib.backends.backend_pdf import PdfPages
import sys, os, argparse, pylab, numpy, time
from subprocess import Popen, PIPE
DESCRIPTION="""
A utility to help analyze onionperf output. Currently, this parses tgen transfer-complete messages and plots the results to a PDF file saved in the current directory.
"""
pylab.rcParams.update({
'backend': 'PDF',
'font.size': 16,
'figure.max_num_figures' : 50,
'figure.figsize': (6,4.5),
'figure.dpi': 100.0,
'figure.subplot.left': 0.10,
'figure.subplot.right': 0.95,
'figure.subplot.bottom': 0.13,
'figure.subplot.top': 0.92,
'grid.color': '0.1',
'axes.grid' : True,
'axes.titlesize' : 'small',
'axes.labelsize' : 'small',
'axes.formatter.limits': (-4,4),
'xtick.labelsize' : 'small',
'ytick.labelsize' : 'small',
'lines.linewidth' : 2.0,
'lines.markeredgewidth' : 0.5,
'lines.markersize' : 10,
'legend.fontsize' : 'x-small',
'legend.fancybox' : False,
'legend.shadow' : False,
'legend.ncol' : 1.0,
'legend.borderaxespad' : 0.5,
'legend.numpoints' : 1,
'legend.handletextpad' : 0.5,
'legend.handlelength' : 1.6,
'legend.labelspacing' : .75,
'legend.markerscale' : 1.0,
'ps.useafm' : True,
'pdf.use14corefonts' : True,
# 'text.usetex' : True,
})
def main():
parser = argparse.ArgumentParser(
description=DESCRIPTION,
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(
help="""The PATH to the shadow.log file, which may be '-'for STDIN, or may end in '.xz' to enable inline xz decompression""",
metavar="PATH",
action="store", dest="logpath")
args = parser.parse_args()
args.logpath = os.path.abspath(os.path.expanduser(args.logpath))
if args.logpath.endswith("-"):
args.datasource = sys.stdin
elif args.logpath.endswith(".xz"):
args.xzproc = Popen(["xz", "--decompress", "--stdout", args.logpath], stdout=PIPE)
args.datasource = args.xzproc.stdout
else: args.datasource = open(args.logpath, 'r')
for line in args.datasource:
try:
parts = line.strip('\n').split(' ')
parse(args, parts)
except: continue
if 'xzproc' in args: args.xzproc.wait()
page = PdfPages("onionperf.results.{0}.pdf".format(time.strftime("%Y-%m-%d_%H:%M:%S")))
plot(args, page)
page.close()
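# parse one whitespace-split tgen log line: transfer-complete lines carry the
# transfer size plus command/response/first-byte/last-byte/checksum times in
# milliseconds (converted to seconds here), while transfer-error lines carry an
# error code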
def parse(args, parts):
if len(parts) < 19: return
if 'transfer-complete' not in parts[5] and 'transfer-error' not in parts[5]: return
ioparts = parts[12].split('=')
iodirection = ioparts[0]
if 'read' not in iodirection: return
tstamp = float(parts[2])
bytes = int(ioparts[1].split('/')[0])
if 'results' not in args: args.results = {'firstbyte':{}, 'lastbyte':{}, 'errors':{}}
if 'transfer-complete' in parts[5]:
cmdtime = int(parts[14].split('=')[1])/1000.0
rsptime = int(parts[15].split('=')[1])/1000.0
fbtime = int(parts[16].split('=')[1])/1000.0
lbtime = int(parts[17].split('=')[1])/1000.0
chktime = int(parts[18].split('=')[1])/1000.0
if bytes not in args.results['firstbyte']: args.results['firstbyte'][bytes] = []
args.results['firstbyte'][bytes].append((tstamp, fbtime-cmdtime))
if bytes not in args.results['lastbyte']: args.results['lastbyte'][bytes] = []
args.results['lastbyte'][bytes].append((tstamp, lbtime-cmdtime))
elif 'transfer-error' in parts[5]:
code = parts[9].strip('()').split('-')[7].split('=')[1]
if code not in args.results['errors']: args.results['errors'][code] = []
args.results['errors'][code].append((tstamp, bytes))
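# plot the parsed results into the PDF, one figure per page: time series and
# CDFs of first-byte and last-byte download times (the per-size plots cover the
# 50 KiB, 1 MiB, and 5 MiB transfers), plus the number of errors per hour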
def plot(args, page):
colors = {51200:'r', 1048576:'b', 5242880:'g'}
hours = {}
# first byte, time series
d = {}
for bytes in args.results['firstbyte']:
for (ts, t) in args.results['firstbyte'][bytes]: d[ts] = t
pylab.figure()
keys = sorted(d.keys())
num_hours = (keys[-1]-keys[0])/3600.0
x = [(k - keys[0])/3600.0 for k in keys]
y = [d[k] for k in keys]
for i in x:
if int(i) not in hours: hours[int(i)] = True
y_ma = movingaverage(y, int(num_hours/10.0))
pylab.scatter(x, y, c='k', linewidths=0, s=3.0, alpha=0.5, label='raw')
pylab.plot(x, y_ma, 'r-', label='smoothed')
pylab.xlabel("Time Span (h)")
pylab.xlim(xmin=0, xmax=num_hours)
pylab.ylabel("Download Time (s)")
pylab.ylim(ymin=0, ymax=10.0)
pylab.title("OnionPerf: time to download first byte")
pylab.legend(loc="best")
page.savefig()
pylab.close()
# first byte, cdf
d = {}
for bytes in args.results['firstbyte']:
if bytes not in [51200, 1048576, 5242880]: continue
if bytes not in d: d[bytes] = {}
for (ts, t) in args.results['firstbyte'][bytes]: d[bytes][ts] = t
pylab.figure()
maxx = 0
for bytes in d:
keys = sorted(d[bytes].keys())
num_hours = (keys[-1]-keys[0])/3600.0
vals = [d[bytes][k] for k in keys]
x, y = getcdf(vals)
pylab.plot(x, y, '-', c=colors[bytes], label="{0} KiB".format(int(bytes/1024.0)))
if x[-1] > maxx: maxx = x[-1]
pylab.xlabel("Download Time (s)")
pylab.xlim(xmin=0, xmax=maxx)
pylab.ylabel("Cumulative Fraction")
pylab.ylim(ymin=0, ymax=1.0)
pylab.title("OnionPerf: time to download first byte")
pylab.legend(loc="lower right")
page.savefig()
pylab.close()
# last byte, time series
d = {}
for bytes in args.results['lastbyte']:
if bytes not in [51200, 1048576, 5242880]: continue
if bytes not in d: d[bytes] = {}
for (ts, t) in args.results['lastbyte'][bytes]: d[bytes][ts] = t
pylab.figure()
for bytes in d:
keys = sorted(d[bytes].keys())
num_hours = (keys[-1]-keys[0])/3600.0
x = [(k - keys[0])/3600.0 for k in keys]
y = [d[bytes][k] for k in keys]
y_ma = movingaverage(y, int(num_hours/10.0))
pylab.scatter(x, y, c=colors[bytes], edgecolor=colors[bytes], linewidths=0, s=3.0, alpha=0.5)
pylab.plot(x, y_ma, '-', c=colors[bytes], label="{0} KiB".format(int(bytes/1024.0)))
pylab.xlabel("Time Span (h)")
pylab.xlim(xmin=0, xmax=num_hours)
pylab.ylabel("Download Time (s)")
pylab.ylim(ymin=0, ymax=60.0)
pylab.title("OnionPerf: time to download last byte")
pylab.legend(loc="best")
page.savefig()
pylab.close()
# last byte, cdf
pylab.figure()
maxx = 0
for bytes in d:
keys = sorted(d[bytes].keys())
num_hours = (keys[-1]-keys[0])/3600.0
vals = [d[bytes][k] for k in keys]
x, y = getcdf(vals)
pylab.plot(x, y, '-', c=colors[bytes], label="{0} KiB".format(int(bytes/1024.0)))
if x[-1] > maxx: maxx = x[-1]
pylab.xlabel("Download Time (s)")
pylab.xlim(xmin=0, xmax=maxx)
pylab.ylabel("Cumulative Fraction")
pylab.ylim(ymin=0, ymax=1.0)
pylab.title("OnionPerf: time to download last byte")
pylab.legend(loc="lower right")
page.savefig()
pylab.close()
# errors, time series
d = {51200:{}, 1048576:{}, 5242880:{}}
for h in hours:
for b in d: d[b][int(h)] = 0
mints = None
for code in args.results['errors']:
for (ts, bytes) in args.results['errors'][code]:
if bytes not in d: continue
if mints == None: mints = ts
h = int((ts-mints)/3600.0)
d[bytes][h] += 1
pylab.figure()
maxy = 1
for bytes in d:
x = sorted(d[bytes].keys())
y = [d[bytes][k] for k in x]
if max(y) > maxy: maxy = max(y)
pylab.scatter(x, y, c=colors[bytes], edgecolor=colors[bytes], s=10.0, label="{0} KiB".format(int(bytes/1024.0)))
pylab.xlabel("Time Span (h)")
pylab.xlim(xmin=0, xmax=num_hours)
pylab.ylabel("Number of Errors Per Hour")
pylab.ylim(ymin=0, ymax=maxy)
pylab.title("OnionPerf: number of errors")
pylab.legend(loc="best")
page.savefig()
pylab.close()
# helper - compute the window_size moving average over the data in interval
def movingaverage(interval, window_size):
window = numpy.ones(int(window_size))/float(window_size)
return numpy.convolve(interval, window, 'same')
## helper - cumulative fraction for y axis
def cf(d): return pylab.arange(1.0,float(len(d))+1.0)/float(len(d))
## helper - return step-based CDF x and y values
## only show to the 99th percentile by default
def getcdf(data, shownpercentile=0.99, maxpoints=10000.0):
data.sort()
frac = cf(data)
k = len(data)/maxpoints
x, y, lasty = [], [], 0.0
for i in xrange(int(round(len(data)*shownpercentile))):
if i % k > 1.0: continue
assert not numpy.isnan(data[i])
x.append(data[i])
y.append(lasty)
x.append(data[i])
y.append(frac[i])
lasty = frac[i]
return x, y
if __name__ == '__main__': sys.exit(main())
__all__ = [
'analysis',
'measurement',
'model',
'util',
'visualization',
]
'''
Created on Oct 1, 2015
@author: rob
'''
from abc import ABCMeta, abstractmethod, abstractproperty
from __builtin__ import classmethod
import sys, os, re, json
from multiprocessing import Pool, cpu_count
from signal import signal, SIGINT, SIG_IGN
import util
class Analysis(object):
'''
A base class to help analyze onionperf output. Subclasses parse tgen or tor log output, and the merged results can be dumped to and loaded from a JSON file.
'''
__metaclass__ = ABCMeta
@abstractproperty
def default_filename(self):
pass
@abstractmethod
def new_parser(self, filepath):
pass
def __init__(self):
self.result = None
def analyze_file(self, filepath):
source = util.DataSource(filepath)
parser = self.new_parser(source)
parser.parse()
self.result = parser.merge([])
@classmethod
def __analyze_subproc_func(cls, parser):
signal(SIGINT, SIG_IGN) # ignore interrupts
parser.parse()
return parser
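# parse each matching log file in a separate worker process and merge the
# results; num_subprocs <= 0 means use one worker per CPU core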
def analyze_directory(self, search_path, search_expressions, num_subprocs=1):
logfilepaths = Analysis.__find_file_paths(search_path, search_expressions)
print >> sys.stderr, "processing input from {0} files...".format(len(logfilepaths))
if num_subprocs <= 0: num_subprocs = cpu_count()
pool = Pool(num_subprocs)
parsers = [self.new_parser(util.DataSource(filepath)) for filepath in logfilepaths]
try:
mr = pool.map_async(Analysis.__analyze_subproc_func, parsers)
pool.close()
while not mr.ready(): mr.wait(1)
parsers = mr.get()
except KeyboardInterrupt:
print >> sys.stderr, "interrupted, terminating process pool"
pool.terminate()
pool.join()
sys.exit()
if parsers is not None and len(parsers) > 0:
parser = parsers.pop()
self.result = parser.merge(parsers)
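# recursively walk searchpath and collect files whose base name matches any of
# the given regular expression patterns; a search path ending in '/-' yields
# the single path '-', i.e. STDIN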
@classmethod
def __find_file_paths(cls, searchpath, patterns):
paths = []
if searchpath.endswith("/-"): paths.append("-")
else:
for root, dirs, files in os.walk(searchpath):
for name in files:
found = False
fpath = os.path.join(root, name)
fbase = os.path.basename(fpath)
for pattern in patterns:
if re.search(pattern, fbase): found = True
if found: paths.append(fpath)
return paths
def dump_to_file(self, filename, output_prefix=os.getcwd(), compress=True):
if self.result == None:
print >> sys.stderr, "we have no analysis results to dump!"
return
print >> sys.stderr, "dumping stats in {0}".format(output_prefix)
if not os.path.exists(output_prefix): os.makedirs(output_prefix)
filepath = "{0}/{1}".format(output_prefix, filename)
output = util.DataSink(filepath, compress=compress)
output.open()
json.dump(self.result, output.get(), sort_keys=True, separators=(',', ': '), indent=2)
output.close()
print >> sys.stderr, "all done dumping stats to {0}".format(filepath)
@classmethod
def from_file(cls, input_prefix=os.getcwd(), filename=None):
analysis_instance = cls()
if filename is None:
filename = analysis_instance.default_filename
logpath = os.path.abspath(os.path.expanduser("{0}/{1}".format(input_prefix, filename)))
if not os.path.exists(logpath):
print >> sys.stderr, "unable to load analysis results from log file at '{0}'".format(logpath)
if not logpath.endswith(".xz"):
logpath += ".xz"
print >> sys.stderr, "trying '{0}'".format(logpath)
if not os.path.exists(logpath):
print >> sys.stderr, "unable to load analysis results from log file at '{0}'".format(logpath)
return None
s = util.DataSource(logpath)
s.open()
analysis_instance.result = json.load(s.get())
s.close()
return analysis_instance
# data = prune_data(data, skiptime, rskiptime)
# tgendata.append((data['nodes'], label, lfcycle.next()))
class TGenAnalysis(Analysis):
@property
def default_filename(self):
return "stats.tgen.json"
def new_parser(self, source):
return TGenParser(source)
class TorAnalysis(Analysis):
@property
def default_filename(self):
return "stats.tor.json"
def new_parser(self, source):
return TorParser(source)
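# a Parser does the file-level work: it reads a single log source and can merge
# its data with the results from other parsers of the same type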
class Parser(object):
__metaclass__ = ABCMeta
@abstractmethod
def parse(self):
pass
@abstractmethod
def merge(self, parsers):
pass
class TGenParser(Parser):
def __init__(self, source):
self.source = source
self.data = {'firstbyte':{}, 'lastbyte':{}, 'errors':{}}
self.parsed_name = None
self.num_successes = 0
self.num_errors = 0
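# parse the tgen log line by line: the host name comes from the "Initializing
# traffic generator on host" message, and transfer-complete/transfer-error
# lines are split on whitespace with timing fields reported in milliseconds
# (converted to seconds below); results are keyed by transfer size and then by
# the second at which the transfer was logged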
def parse(self):
self.source.open()
for line in self.source.get():
if self.parsed_name is None and re.search("Initializing traffic generator on host", line) is not None:
self.parsed_name = line.strip().split()[11]
elif re.search("transfer-complete", line) is not None or re.search("transfer-error", line) is not None:
parts = line.strip().split()
if len(parts) < 26: continue
sim_seconds = util.timestamp_to_seconds(parts[2])
second = int(sim_seconds)
ioparts = parts[13].split('=')
iodirection = ioparts[0]
if 'read' not in iodirection: return None # this is a server, do we want its stats?
bytes = int(ioparts[1].split('/')[0])
if 'transfer-complete' in parts[6]:
self.num_successes += 1
cmdtime = int(parts[21].split('=')[1]) / 1000.0
rsptime = int(parts[22].split('=')[1]) / 1000.0
fbtime = int(parts[23].split('=')[1]) / 1000.0
lbtime = int(parts[24].split('=')[1]) / 1000.0
chktime = int(parts[25].split('=')[1]) / 1000.0
if bytes not in self.data['firstbyte']: self.data['firstbyte'][bytes] = {}
if second not in self.data['firstbyte'][bytes]: self.data['firstbyte'][bytes][second] = []
self.data['firstbyte'][bytes][second].append(fbtime - cmdtime)
if bytes not in self.data['lastbyte']: self.data['lastbyte'][bytes] = {}
if second not in self.data['lastbyte'][bytes]: self.data['lastbyte'][bytes][second] = []
self.data['lastbyte'][bytes][second].append(lbtime - cmdtime)
elif 'transfer-error' in parts[6]:
self.num_errors += 1
code = parts[10].strip('()').split('-')[7].split('=')[1]
if code not in self.data['errors']: self.data['errors'][code] = {}
if second not in self.data['errors'][code]: self.data['errors'][code][second] = []
self.data['errors'][code][second].append(bytes)
self.source.close()
def merge(self, parsers):
d = {'nodes':{}}
name_count, noname_count, success_count, error_count = 0, 0, 0, 0
parsers.append(self)
print >> sys.stderr, "merging {0} parsed results now...".format(len(parsers))
for parser in parsers:
if parser is None:
continue
if parser.parsed_name is None:
noname_count += 1
continue
name_count += 1
d['nodes'][parser.parsed_name] = parser.data
success_count += parser.num_successes
error_count += parser.num_errors
print >> sys.stderr, "done merging results: {0} total successes, {1} total errors, {2} files with names, {3} files without names".format(success_count, error_count, name_count, noname_count)
return d
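# parses tor-ctl-logger output: records the host name, whether tor finished
# bootstrapping, and per-second bytes read/written taken from '650 BW' control
# events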
class TorParser(Parser):
def __init__(self, source):
self.source = source
self.data = {'bytes_read':{}, 'bytes_written':{}}
self.name = None
self.boot_succeeded = False
self.total_read = 0
self.total_write = 0
def parse(self):
self.source.open()
for line in self.source.get():
if self.name is None and re.search("Starting torctl program on host", line) is not None:
parts = line.strip().split()
if len(parts) < 11: continue
self.name = parts[10]
elif not self.boot_succeeded and re.search("Bootstrapped 100", line) is not None:
self.boot_succeeded = True
elif self.boot_succeeded and re.search("\s650\sBW\s", line) is not None:
parts = line.strip().split()
if len(parts) < 11: continue
if 'Outbound' in line: print line
second = int(float(parts[2]))
bwr = int(parts[9])
bww = int(parts[10])
if second not in self.data['bytes_read']: self.data['bytes_read'][second] = 0
self.data['bytes_read'][second] += bwr
self.total_read += bwr
if second not in self.data['bytes_written']: self.data['bytes_written'][second] = 0
self.data['bytes_written'][second] += bww
self.total_write += bww
self.source.close()
# XXX this is a hack to try to get the name;
# a better approach might be to get the Tor nickname from the control port
if self.name is None: self.name = os.path.dirname(self.source.filename)
def merge(self, parsers):
d = {'nodes':{}}
name_count, noname_count, success_count, error_count, total_read, total_write = 0, 0, 0, 0, 0, 0
parsers.append(self)
print >> sys.stderr, "merging {0} parsed results now...".format(len(parsers))
for parser in parsers:
if parser is None:
continue
if parser.name is not None:
name_count += 1
else:
noname_count += 1
continue
if parser.boot_succeeded:
success_count += 1
else:
error_count += 1
print >> sys.stderr, "warning: tor running on host '{0}' did not fully bootstrap".format(parser.name)
continue
d['nodes'][parser.name] = parser.data
total_read += parser.total_read
total_write += parser.total_write
print >> sys.stderr, "done merging results: {0} boot success count, {1} boot error count, {2} files with names, {3} files without names, {4} total bytes read, {5} total bytes written".format(success_count, error_count, name_count, noname_count, total_read, total_write)
return d
'''
from stem.response import ControlMessage, convert
msg = ControlMessage.from_str("650 BW 1532 2656\r\n")
convert('EVENT', msg)
print msg
'''
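# Illustrative usage sketch (not part of this module); the directory path and
# search pattern below are hypothetical:
#   analysis = TGenAnalysis()
#   analysis.analyze_directory("/path/to/logs", ["tgen.*\.log$"], num_subprocs=4)
#   analysis.dump_to_file(analysis.default_filename, output_prefix="/path/to/out")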
#!/usr/bin/env python
'''
Created on Oct 1, 2015
@author: rob
'''
import sys, os, shutil, subprocess, multiprocessing, argparse, logging, time
from signal import signal, SIGINT, SIG_IGN, SIGTERM
@@ -10,107 +14,12 @@ from stem import process, control
from stem.control import EventType, Controller
from stem.util import str_tools