Commit a11531b5 authored by Karsten Loesing

Merge branch 'tgen-1.0.0' into develop

parents adece5d5 aea46c56
@@ -70,21 +70,20 @@ In this case the resulting `tor` binary can be found in `~/tor/src/app/tor` and
### TGen
OnionPerf uses TGen to generate traffic on client and server side for its measurements. Installing dependencies, cloning TGen to a subdirectory in the user's home directory, checking out version 0.0.1, and building TGen is done as follows:
OnionPerf uses TGen to generate traffic on client and server side for its measurements. Installing dependencies, cloning TGen to a subdirectory in the user's home directory, and building TGen is done as follows:
```shell
sudo apt install cmake libglib2.0-dev libigraph0-dev make
cd ~/
git clone https://github.com/shadow/tgen.git
cd tgen/
git checkout -b v0.0.1 v0.0.1
mkdir build
cd build/
cmake ..
make
```
The TGen binary will be contained in `~/tgen/build/tgen`, which is also the path that needs to be passed to OnionPerf's `--tgen` parameter when doing measurements.
The TGen binary will be contained in `~/tgen/build/src/tgen`, which is also the path that needs to be passed to OnionPerf's `--tgen` parameter when doing measurements.
### OnionPerf
@@ -281,7 +280,7 @@ The PDF output file contains visualizations of the following metrics:
The CSV output file contains the same data that is visualized in the PDF file. It contains the following columns:
- `transfer_id` is the identifier used in the TGen client logs which may be useful to look up more details about a specific measurement.
- `id` is the identifier used in the TGen client logs which may be useful to look up more details about a specific measurement.
- `error_code` is an optional error code if a measurement did not succeed.
- `filesize_bytes` is the requested file size in bytes.
- `label` is the data set label as given in the `--data/-d` parameter to the `visualize` mode.
@@ -4,11 +4,8 @@
See LICENSE for licensing information
'''
import sys, os, re, json, datetime, logging
import os, re, json, datetime, logging
from multiprocessing import Pool, cpu_count
from signal import signal, SIGINT, SIG_IGN
from socket import gethostname
from abc import ABCMeta, abstractmethod
# stem imports
@@ -16,49 +13,29 @@ from stem import CircEvent, CircStatus, CircPurpose, StreamStatus
from stem.response.events import CircuitEvent, CircMinorEvent, StreamEvent, BandwidthEvent, BuildTimeoutSetEvent
from stem.response import ControlMessage, convert
# tgentools imports
from tgentools.analysis import Analysis, TGenParser
# onionperf imports
from . import util
class Analysis(object):
class OPAnalysis(Analysis):
def __init__(self, nickname=None, ip_address=None):
self.nickname = nickname
self.measurement_ip = ip_address
self.hostname = gethostname().split('.')[0]
self.json_db = {'type':'onionperf', 'version':'2.0', 'data':{}}
self.tgen_filepaths = []
super().__init__(nickname, ip_address)
self.json_db = {'type': 'onionperf', 'version': '3.0', 'data': {}}
self.torctl_filepaths = []
self.date_filter = None
self.did_analysis = False
def add_tgen_file(self, filepath):
self.tgen_filepaths.append(filepath)
def add_torctl_file(self, filepath):
self.torctl_filepaths.append(filepath)
def get_nodes(self):
return list(self.json_db['data'].keys())
def get_tor_bandwidth_summary(self, node, direction):
try:
return self.json_db['data'][node]['tor']['bandwidth_summary'][direction]
except:
return None
def get_tgen_transfers(self, node):
try:
return self.json_db['data'][node]['tgen']['transfers']
except:
return None
def get_tgen_transfers_summary(self, node):
try:
return self.json_db['data'][node]['tgen']['transfers_summary']
except:
return None
def analyze(self, do_simple=True, date_filter=None):
def analyze(self, do_complete=False, date_filter=None):
if self.did_analysis:
return
@@ -70,7 +47,7 @@ class Analysis(object):
if len(filepaths) > 0:
for filepath in filepaths:
logging.info("parsing log file at {0}".format(filepath))
parser.parse(util.DataSource(filepath), do_simple=do_simple)
parser.parse(util.DataSource(filepath), do_complete=do_complete)
if self.nickname is None:
parsed_name = parser.get_name()
@@ -85,16 +62,11 @@ class Analysis(object):
self.measurement_ip = "unknown"
self.json_db['data'].setdefault(self.nickname, {'measurement_ip': self.measurement_ip}).setdefault(json_db_key, parser.get_data())
self.json_db['data'][self.nickname]["tgen"].pop("heartbeats")
self.json_db['data'][self.nickname]["tgen"].pop("init_ts")
self.json_db['data'][self.nickname]["tgen"].pop("stream_summary")
self.did_analysis = True
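For orientation, here is a rough sketch of what `self.json_db` holds once `analyze()` has pruned the TGen sub-keys above. Only keys visible in this diff are included; the nickname is a placeholder and the `tor` sub-dictionary is left empty, so this is not the full schema.

```python
# Hypothetical illustration only -- real analysis files contain more fields.
example_json_db = {
    "type": "onionperf",
    "version": "3.0",
    "data": {
        "op-example": {                  # node nickname (placeholder)
            "measurement_ip": "unknown",
            "tgen": {
                "streams": {},           # per-stream details, see get_tgen_streams()
                "transfers": {},         # legacy per-transfer details, see get_tgen_transfers()
                # "heartbeats", "init_ts" and "stream_summary" are popped above
            },
            "tor": {},                   # TorCtlParser results
        },
    },
}
```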
def merge(self, analysis):
for nickname in analysis.json_db['data']:
if nickname in self.json_db['data']:
raise Exception("Merge does not yet support multiple Analysis objects from the same node \
(add multiple files from the same node to the same Analysis object before calling analyze instead)")
else:
self.json_db['data'][nickname] = analysis.json_db['data'][nickname]
def save(self, filename=None, output_prefix=os.getcwd(), do_compress=True, date_prefix=None):
if filename is None:
@@ -118,6 +90,19 @@ class Analysis(object):
logging.info("done!")
def get_tgen_streams(self, node):
try:
return self.json_db['data'][node]['tgen']['streams']
except:
return None
def get_tgen_transfers(self, node):
try:
return self.json_db['data'][node]['tgen']['transfers']
except:
return None
@classmethod
def load(cls, filename="onionperf.analysis.json.xz", input_prefix=os.getcwd()):
filepath = os.path.abspath(os.path.expanduser("{0}".format(filename)))
@@ -139,7 +124,7 @@ class Analysis(object):
if 'type' not in db or 'version' not in db:
logging.warning("'type' or 'version' not present in database")
return None
elif db['type'] != 'onionperf' or str(db['version']) >= '3.':
elif db['type'] != 'onionperf' or str(db['version']) >= '4.':
logging.warning("type or version not supported (type={0}, version={1})".format(db['type'], db['version']))
return None
else:
@@ -147,153 +132,9 @@ class Analysis(object):
analysis_instance.json_db = db
return analysis_instance
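A minimal usage sketch of the reworked class, mirroring the `analyze` and `visualize` code paths changed further down in this commit; the log file paths and nickname are hypothetical.

```python
from onionperf.analysis import OPAnalysis

# Build and save an analysis from raw logs (paths are placeholders).
analysis = OPAnalysis(nickname="op-example")
analysis.add_tgen_file("onionperf.tgen.log")
analysis.add_torctl_file("onionperf.torctl.log")
analysis.analyze(do_complete=True)   # False keeps only summary statistics
analysis.save(filename="onionperf.analysis.json.xz", output_prefix=".")

# Later, load the saved file and query per-stream data for one node.
loaded = OPAnalysis.load(filename="onionperf.analysis.json.xz")
if loaded is not None:
    streams = loaded.get_tgen_streams("op-example")
    print(len(streams) if streams else 0)
```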
def subproc_analyze_func(analysis_args):
signal(SIGINT, SIG_IGN) # ignore interrupts
a = analysis_args[0]
do_simple = analysis_args[1]
a.analyze(do_simple=do_simple)
return a
class ParallelAnalysis(Analysis):
def analyze(self, search_path, do_simple=True, nickname=None, tgen_search_expressions=["tgen.*\.log"],
torctl_search_expressions=["torctl.*\.log"], num_subprocs=cpu_count()):
pathpairs = util.find_file_paths_pairs(search_path, tgen_search_expressions, torctl_search_expressions)
logging.info("processing input from {0} nodes...".format(len(pathpairs)))
analysis_jobs = []
for (tgen_filepaths, torctl_filepaths) in pathpairs:
a = Analysis()
for tgen_filepath in tgen_filepaths:
a.add_tgen_file(tgen_filepath)
for torctl_filepath in torctl_filepaths:
a.add_torctl_file(torctl_filepath)
analysis_args = [a, do_simple]
analysis_jobs.append(analysis_args)
analyses = None
pool = Pool(num_subprocs if num_subprocs > 0 else cpu_count())
try:
mr = pool.map_async(subproc_analyze_func, analysis_jobs)
pool.close()
while not mr.ready(): mr.wait(1)
analyses = mr.get()
except KeyboardInterrupt:
logging.info("interrupted, terminating process pool")
pool.terminate()
pool.join()
sys.exit()
logging.info("merging {0} analysis results now...".format(len(analyses)))
while analyses is not None and len(analyses) > 0:
self.merge(analyses.pop())
logging.info("done merging results: {0} total nicknames present in json db".format(len(self.json_db['data'])))
class TransferStatusEvent(object):
def __init__(self, line):
self.is_success = False
self.is_error = False
self.is_complete = False
parts = line.strip().split()
self.unix_ts_end = util.timestamp_to_seconds(parts[2])
transport_parts = parts[8].split(',')
self.endpoint_local = transport_parts[2]
self.endpoint_proxy = transport_parts[3]
self.endpoint_remote = transport_parts[4]
transfer_parts = parts[10].split(',')
# for id, combine the time with the transfer num; this is unique for each node,
# as long as the node was running tgen without restarting for 100 seconds or longer
# #self.transfer_id = "{0}-{1}".format(round(self.unix_ts_end, -2), transfer_num)
self.transfer_id = "{0}:{1}".format(transfer_parts[0], transfer_parts[1]) # id:count
self.hostname_local = transfer_parts[2]
self.method = transfer_parts[3] # 'GET' or 'PUT'
self.filesize_bytes = int(transfer_parts[4])
self.hostname_remote = transfer_parts[5]
self.error_code = transfer_parts[8].split('=')[1]
self.total_bytes_read = int(parts[11].split('=')[1])
self.total_bytes_write = int(parts[12].split('=')[1])
# the commander is the side that sent the command,
# i.e., the side that is driving the download, i.e., the client side
progress_parts = parts[13].split('=')
self.is_commander = (self.method == 'GET' and 'read' in progress_parts[0]) or \
(self.method == 'PUT' and 'write' in progress_parts[0])
self.payload_bytes_status = int(progress_parts[1].split('/')[0])
self.unconsumed_parts = None if len(parts) < 16 else parts[15:]
self.elapsed_seconds = {}
class TransferCompleteEvent(TransferStatusEvent):
def __init__(self, line):
super(TransferCompleteEvent, self).__init__(line)
self.is_complete = True
i = 0
elapsed_seconds = 0.0
# match up self.unconsumed_parts[0:11] with the events in the transfer_steps enum
for k in ['socket_create', 'socket_connect', 'proxy_init', 'proxy_choice', 'proxy_request',
'proxy_response', 'command', 'response', 'first_byte', 'last_byte', 'checksum']:
# parse out the elapsed time value
keyval = self.unconsumed_parts[i]
i += 1
val = float(int(keyval.split('=')[1]))
if val >= 0.0:
elapsed_seconds = val / 1000000.0 # usecs to secs
self.elapsed_seconds.setdefault(k, elapsed_seconds)
self.unix_ts_start = self.unix_ts_end - elapsed_seconds
del(self.unconsumed_parts)
class TransferSuccessEvent(TransferCompleteEvent):
def __init__(self, line):
super(TransferSuccessEvent, self).__init__(line)
self.is_success = True
class TransferErrorEvent(TransferCompleteEvent):
def __init__(self, line):
super(TransferErrorEvent, self).__init__(line)
self.is_error = True
class Transfer(object):
def __init__(self, tid):
self.id = tid
self.last_event = None
self.payload_progress = {decile:None for decile in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]}
self.payload_bytes = {partial:None for partial in [10240, 20480, 51200, 102400, 204800, 512000, 1048576, 2097152, 5242880]}
def add_event(self, status_event):
progress_frac = float(status_event.payload_bytes_status) / float(status_event.filesize_bytes)
progress = float(status_event.payload_bytes_status)
for partial in sorted(self.payload_bytes.keys()):
if progress >= partial and self.payload_bytes[partial] is None:
self.payload_bytes[partial] = status_event.unix_ts_end
for decile in sorted(self.payload_progress.keys()):
if progress_frac >= decile and self.payload_progress[decile] is None:
self.payload_progress[decile] = status_event.unix_ts_end
self.last_event = status_event
def get_data(self):
e = self.last_event
if e is None or not e.is_complete:
return None
d = e.__dict__
if not e.is_error:
d['elapsed_seconds']['payload_progress'] = {decile: round(self.payload_progress[decile] - e.unix_ts_start, 6) for decile in self.payload_progress if self.payload_progress[decile] is not None}
d['elapsed_seconds']['payload_bytes'] = {partial: round(self.payload_bytes[partial] - e.unix_ts_start, 6) for partial in self.payload_bytes if self.payload_bytes[partial] is not None}
return d
class Parser(object, metaclass=ABCMeta):
@abstractmethod
def parse(self, source, do_simple):
def parse(self, source, do_complete):
pass
@abstractmethod
def get_data(self):
@@ -302,105 +143,8 @@ class Parser(object, metaclass=ABCMeta):
def get_name(self):
pass
class TGenParser(Parser):
def __init__(self, date_filter=None):
''' date_filter should be given in UTC '''
self.state = {}
self.transfers = {}
self.transfers_summary = {'time_to_first_byte':{}, 'time_to_last_byte':{}, 'errors':{}}
self.name = None
self.date_filter = date_filter
def __is_date_valid(self, date_to_check):
if self.date_filter is None:
# we are not asked to filter, so every date is valid
return True
else:
# we are asked to filter, so the line is only valid if the date matches the filter
# both the filter and the unix timestamp should be in UTC at this point
return util.do_dates_match(self.date_filter, date_to_check)
def __parse_line(self, line, do_simple):
if self.name is None and re.search("Initializing traffic generator on host", line) is not None:
self.name = line.strip().split()[11]
if self.date_filter is not None:
parts = line.split(' ', 3)
if len(parts) < 4: # the 3rd is the timestamp, the 4th is the rest of the line
return True
unix_ts = float(parts[2])
line_date = datetime.datetime.utcfromtimestamp(unix_ts).date()
if not self.__is_date_valid(line_date):
return True
if not do_simple and re.search("state\sRESPONSE\sto\sstate\sPAYLOAD", line) is not None:
# another run of tgen starts the id over counting up from 1
# if a prev transfer with the same id did not complete, we can be sure it never will
parts = line.strip().split()
transfer_parts = parts[7].strip().split(',')
transfer_id = "{0}:{1}".format(transfer_parts[0], transfer_parts[1]) # id:count
if transfer_id in self.state:
self.state.pop(transfer_id)
elif not do_simple and re.search("transfer-status", line) is not None:
status = TransferStatusEvent(line)
xfer = self.state.setdefault(status.transfer_id, Transfer(status.transfer_id))
xfer.add_event(status)
elif re.search("transfer-complete", line) is not None:
complete = TransferSuccessEvent(line)
if not do_simple:
xfer = self.state.setdefault(complete.transfer_id, Transfer(complete.transfer_id))
xfer.add_event(complete)
self.transfers[xfer.id] = xfer.get_data()
self.state.pop(complete.transfer_id)
filesize, second = complete.filesize_bytes, int(complete.unix_ts_end)
fb_secs = complete.elapsed_seconds['first_byte'] - complete.elapsed_seconds['command']
lb_secs = complete.elapsed_seconds['last_byte'] - complete.elapsed_seconds['command']
fb_list = self.transfers_summary['time_to_first_byte'].setdefault(filesize, {}).setdefault(second, [])
fb_list.append(fb_secs)
lb_list = self.transfers_summary['time_to_last_byte'].setdefault(filesize, {}).setdefault(second, [])
lb_list.append(lb_secs)
elif re.search("transfer-error", line) is not None:
error = TransferErrorEvent(line)
if not do_simple:
xfer = self.state.setdefault(error.transfer_id, Transfer(error.transfer_id))
xfer.add_event(error)
self.transfers[xfer.id] = xfer.get_data()
self.state.pop(error.transfer_id)
err_code, filesize, second = error.error_code, error.filesize_bytes, int(error.unix_ts_end)
err_list = self.transfers_summary['errors'].setdefault(err_code, {}).setdefault(second, [])
err_list.append(filesize)
return True
def parse(self, source, do_simple=True):
source.open()
for line in source:
# ignore line parsing errors
try:
if not self.__parse_line(line, do_simple):
break
except:
logging.warning("TGenParser: skipping line due to parsing error: {0}".format(line))
continue
source.close()
def get_data(self):
return {'transfers':self.transfers, 'transfers_summary': self.transfers_summary}
def get_name(self):
return self.name
class Stream(object):
class TorStream(object):
def __init__(self, sid):
self.stream_id = sid
self.circuit_id = None
@@ -460,7 +204,7 @@ class Stream(object):
' '.join(['%s=%s' % (event, arrived_at)
for (event, arrived_at) in sorted(self.elapsed_seconds, key=lambda item: item[1])])))
class Circuit(object):
class TorCircuit(object):
def __init__(self, cid):
self.circuit_id = cid
self.unix_ts_start = None
@@ -526,7 +270,7 @@ class TorCtlParser(Parser):
def __init__(self, date_filter=None):
''' date_filter should be given in UTC '''
self.do_simple = True
self.do_complete = False
self.bandwidth_summary = {'bytes_read':{}, 'bytes_written':{}}
self.circuits_state = {}
self.circuits = {}
@@ -543,7 +287,7 @@ class TorCtlParser(Parser):
def __handle_circuit(self, event, arrival_dt):
# first make sure we have a circuit object
cid = int(event.id)
circ = self.circuits_state.setdefault(cid, Circuit(cid))
circ = self.circuits_state.setdefault(cid, TorCircuit(cid))
is_hs_circ = True if event.purpose in (CircPurpose.HS_CLIENT_INTRO, CircPurpose.HS_CLIENT_REND, \
CircPurpose.HS_SERVICE_INTRO, CircPurpose.HS_SERVICE_REND) else False
@@ -580,11 +324,11 @@ class TorCtlParser(Parser):
self.circuits_summary['buildtimes'].append(built - started)
if ended is not None and started is not None:
self.circuits_summary['lifetimes'].append(ended - started)
if not self.do_simple:
if self.do_complete:
self.circuits[cid] = data
self.circuits_state.pop(cid)
elif not self.do_simple and isinstance(event, CircMinorEvent):
elif self.do_complete and isinstance(event, CircMinorEvent):
if event.purpose != event.old_purpose or event.event != CircEvent.PURPOSE_CHANGED:
key = "{0}:{1}".format(event.event, event.purpose)
circ.add_event(key, arrival_dt)
@@ -597,7 +341,7 @@ class TorCtlParser(Parser):
def __handle_stream(self, event, arrival_dt):
sid = int(event.id)
strm = self.streams_state.setdefault(sid, Stream(sid))
strm = self.streams_state.setdefault(sid, TorStream(sid))
if event.circ_id is not None:
strm.set_circ_id(event.circ_id)
@@ -620,7 +364,7 @@ class TorCtlParser(Parser):
data = strm.get_data()
if data is not None:
if not self.do_simple:
if self.do_complete:
self.streams[sid] = data
self.streams_summary['lifetimes'].setdefault(stream_type, []).append(ended - started)
self.streams_state.pop(sid)
@@ -664,7 +408,7 @@ class TorCtlParser(Parser):
elif re.search("BOOTSTRAP", line) is not None and re.search("PROGRESS=100", line) is not None:
self.boot_succeeded = True
if self.do_simple is False or (self.do_simple is True and re.search("650\sBW", line) is not None):
if self.do_complete or (self.do_complete is False and re.search("650\sBW", line) is not None):
# parse with stem
timestamps, sep, raw_event_str = line.partition(" 650 ")
if sep == '':
@@ -683,8 +427,8 @@ class TorCtlParser(Parser):
self.__handle_event(event, unix_ts)
return True
def parse(self, source, do_simple=True):
self.do_simple = do_simple
def parse(self, source, do_complete=False):
self.do_complete = do_complete
source.open(newline='\r\n')
for line in source:
# ignore line parsing errors
@@ -74,7 +74,7 @@ class TorperfModel(GeneratableTGenModel):
else:
g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute")
g.add_node("pause", time="5 minutes")
g.add_node("transfer5m", type="get", protocol="tcp", size="5 MiB", timeout="270 seconds", stallout="0 seconds")
g.add_node("stream5m", sendsize="0", recvsize="5 mib", timeout="270 seconds", stallout="0 seconds")
g.add_edge("start", "pause")
@@ -83,7 +83,7 @@ class TorperfModel(GeneratableTGenModel):
g.add_edge("pause", "pause")
# these are chosen with weighted probability, change edge 'weight' attributes to adjust probability
g.add_edge("pause", "transfer5m")
g.add_edge("pause", "stream5m")
return g
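To make the model change concrete, here is a rough standalone sketch (assuming networkx, which the model classes build on) of the action graph the updated `TorperfModel` emits: this commit replaces the old `type="get"` transfer action with a stream action whose download size is given via `recvsize`. Port and peer values are placeholders, not OnionPerf defaults.

```python
import networkx as nx

# Hypothetical reconstruction of the graph built above, outside the model class.
g = nx.DiGraph()
g.add_node("start", serverport="8080", peers="server:8080",
           loglevel="info", heartbeat="1 minute")
g.add_node("pause", time="5 minutes")
g.add_node("stream5m", sendsize="0", recvsize="5 mib",
           timeout="270 seconds", stallout="0 seconds")
g.add_edge("start", "pause")
g.add_edge("pause", "pause")        # keep pausing between measurements
g.add_edge("pause", "stream5m")     # picked with weighted probability via 'weight'
nx.write_graphml(g, "torperf-model.graphml.xml")
```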
@@ -103,10 +103,10 @@ class OneshotModel(GeneratableTGenModel):
g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute", socksproxy=self.socksproxy)
else:
g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute")
g.add_node("transfer5m", type="get", protocol="tcp", size="5 MiB", timeout="15 seconds", stallout="10 seconds")
g.add_node("stream5m", sendsize="0", recvsize="5 mib", timeout="270 seconds", stallout="0 seconds")
g.add_edge("start", "transfer5m")
g.add_edge("transfer5m", "start")
g.add_edge("start", "stream5m")
g.add_edge("stream5m", "start")
return g
@@ -281,8 +281,8 @@ files generated by this script will be written""",
analyze_parser.add_argument('-s', '--do-simple-parse',
help="""parse and export only summary statistics rather than full transfer/circuit/stream data""",
action="store_true", dest="do_simple",
default=False)
action="store_false", dest="do_complete",
default=True)
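The flipped default is easy to misread, so here is a self-contained sketch (option names copied from the diff, help text shortened) showing that `--do-simple-parse` now switches the complete parse off instead of switching a simple parse on.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-s', '--do-simple-parse',
                    help="parse and export only summary statistics",
                    action="store_false", dest="do_complete",
                    default=True)

print(parser.parse_args([]).do_complete)       # True: full parse by default
print(parser.parse_args(['-s']).do_complete)   # False: summary-only parse
```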
# visualize
visualize_parser = sub_parser.add_parser('visualize', description=DESC_VISUALIZE, help=HELP_VISUALIZE,
@@ -381,13 +381,13 @@ def analyze(args):
logging.warning("No logfile paths were given, nothing will be analyzed")
elif (args.tgen_logpath is None or os.path.isfile(args.tgen_logpath)) and (args.torctl_logpath is None or os.path.isfile(args.torctl_logpath)):
from onionperf.analysis import Analysis
analysis = Analysis(nickname=args.nickname, ip_address=args.ip_address)
from onionperf.analysis import OPAnalysis
analysis = OPAnalysis(nickname=args.nickname, ip_address=args.ip_address)
if args.tgen_logpath is not None:
analysis.add_tgen_file(args.tgen_logpath)
if args.torctl_logpath is not None:
analysis.add_torctl_file(args.torctl_logpath)
analysis.analyze(args.do_simple, date_filter=args.date_filter)
analysis.analyze(args.do_complete, date_filter=args.date_filter)
analysis.save(output_prefix=args.prefix, date_prefix=args.date_prefix)
elif args.tgen_logpath is not None and os.path.isdir(args.tgen_logpath) and args.torctl_logpath is not None and os.path.isdir(args.torctl_logpath):
@@ -396,20 +396,20 @@ def analyze(args):
torctl_logs = reprocessing.collect_logs(args.torctl_logpath, '*torctl.log*')
log_pairs = reprocessing.match(tgen_logs, torctl_logs, args.date_filter)
logging.info("Found {0} matching log pairs to be reprocessed".format(len(log_pairs)))
reprocessing.multiprocess_logs(log_pairs, args.prefix, args.nickname, args.do_simple)
reprocessing.multiprocess_logs(log_pairs, args.prefix, args.nickname, args.do_complete)
else:
logging.error("Given paths were an unrecognized mix of file and directory paths, nothing will be analyzed")
def visualize(args):
from onionperf.visualization import TGenVisualization
from onionperf.analysis import Analysis
from onionperf.analysis import OPAnalysis
tgen_viz = TGenVisualization()
for (paths, label) in args.datasets:
analyses = []
for path in paths:
analysis = Analysis.load(filename=path)
analysis = OPAnalysis.load(filename=path)
if analysis is not None:
analyses.append(analysis)
tgen_viz.add_dataset(analyses, label)
from onionperf.analysis import Analysis
from onionperf.analysis import OPAnalysis
from onionperf import util
from functools import partial
from multiprocessing import Pool, cpu_count
@@ -46,21 +46,21 @@ def match(tgen_logs, tor_logs, date_filter):
return log_pairs
def analyze_func(prefix, nick, do_simple, pair):
analysis = Analysis(nickname=nick)
def analyze_func(prefix, nick, do_complete, pair):
analysis = OPAnalysis(nickname=nick)
logging.info('Analysing pair for date {0}'.format(pair[2]))
analysis.add_tgen_file(pair[0])
analysis.add_torctl_file(pair[1])
analysis.analyze(do_simple