GitLab is used only for code review, issue tracking and project management. Canonical locations for source code are still https://gitweb.torproject.org/ https://git.torproject.org/ and git-rw.torproject.org.

Make models more configurable.

This patch removes the --oneshot command-line option and replaces it with
several new options for OnionPerf's "measure" command:

    --tgen-start-pause          (Initial pause before file transfers.)
    --tgen-num-transfers        (Number of file transfers.)
    --tgen-intertransfer-pause  (Pause in between file transfers.)
    --tgen-transfer-size        (Size of each file transfer.)

By default, OnionPerf continues to run in "continuous" mode.  One can
simulate oneshot mode by running onionperf with the following flags:

    onionperf measure --tgen-num-transfers=1

In addition to the above options, this patch improves the code base
by 1) adding a TGenConf class to hold TGen's configuration and by 2)
adding a TGenModelConf class to hold TGen's traffic model.

This fixes #33432.
parent 251fd9c2
......@@ -15,6 +15,16 @@ from stem.control import Controller
from stem.version import Version, Requirement, get_system_tor_version
from stem import __version__ as stem_version
class TGenConf(object):
    """Represents a TGen configuration, for both client and server.

    Parameters:
      listen_port    -- port the TGen process listens on; stored as a string
                        because downstream consumers expect string ports.
      connect_ip     -- IP address a TGen client connects to (clients only).
      connect_port   -- port a TGen client connects to (clients only).
      tor_ctl_port   -- Tor control port used by this TGen instance.
      tor_socks_port -- Tor SOCKS port used by this TGen instance.
    """
    def __init__(self, listen_port=None, connect_ip=None, connect_port=None, tor_ctl_port=None, tor_socks_port=None):
        # Preserve None instead of coercing it to the string "None",
        # which str(listen_port) would otherwise produce.
        self.listen_port = str(listen_port) if listen_port is not None else None
        self.tor_ctl_port = tor_ctl_port
        self.tor_socks_port = tor_socks_port
        # TGen clients use connect_ip and connect_port.
        self.connect_ip = connect_ip
        self.connect_port = connect_port
# onionperf imports
from . import analysis, monitor, model, util
......@@ -173,12 +183,11 @@ def logrotate_thread_task(writables, tgen_writable, torctl_writable, docroot, ni
class Measurement(object):
def __init__(self, tor_bin_path, tgen_bin_path, datadir_path, privatedir_path, nickname, oneshot, additional_client_conf=None, torclient_conf_file=None, torserver_conf_file=None, single_onion=False):
def __init__(self, tor_bin_path, tgen_bin_path, datadir_path, privatedir_path, nickname, additional_client_conf=None, torclient_conf_file=None, torserver_conf_file=None, single_onion=False):
self.tor_bin_path = tor_bin_path
self.tgen_bin_path = tgen_bin_path
self.datadir_path = datadir_path
self.privatedir_path = privatedir_path
self.oneshot = oneshot
self.nickname = nickname
self.threads = None
self.done_event = None
......@@ -190,20 +199,30 @@ class Measurement(object):
self.torserver_conf_file = torserver_conf_file
self.single_onion = single_onion
def run(self, do_onion=True, do_inet=True, client_tgen_listen_port=58888, client_tgen_connect_ip='0.0.0.0', client_tgen_connect_port=8080, client_tor_ctl_port=59050, client_tor_socks_port=59000,
server_tgen_listen_port=8080, server_tor_ctl_port=59051, server_tor_socks_port=59001):
def run(self, do_onion=True, do_inet=True, tgen_model=None, tgen_client_conf=None, tgen_server_conf=None):
'''
only `server_tgen_listen_port` are "public" and need to be opened on the firewall.
if `client_tgen_connect_port` != `server_tgen_listen_port`, then you should have installed a forwarding rule in the firewall.
only `tgen_server_conf.listen_port` are "public" and need to be opened on the firewall.
if `tgen_client_conf.connect_port` != `tgen_server_conf.listen_port`, then you should have installed a forwarding rule in the firewall.
all ports need to be unique though, and unique among multiple onionperf instances.
here are some sane defaults:
client_tgen_listen_port=58888, client_tgen_connect_port=8080, client_tor_ctl_port=59050, client_tor_socks_port=59000,
server_tgen_listen_port=8080, server_tor_ctl_port=59051, server_tor_socks_port=59001
tgen_client_conf.listen_port=58888, tgen_client_conf.connect_port=8080, tgen_client_conf.tor_ctl_port=59050, tgen_client_conf.tor_socks_port=59000,
tgen_server_conf.listen_port=8080, tgen_server_conf.tor_ctl_port=59051, tgen_server_conf.tor_socks_port=59001
'''
self.threads = []
self.done_event = threading.Event()
if tgen_client_conf is None:
tgen_client_conf = TGenConf(listen_port=58888,
connect_ip='0.0.0.0',
connect_port=8080,
tor_ctl_port=59050,
tor_socks_port=59000)
if tgen_server_conf is None:
tgen_server_conf = TGenConf(listen_port=8080,
tor_ctl_port=59051,
tor_socks_port=59001)
# if ctrl-c is pressed, shutdown child processes properly
try:
# make sure stem and Tor supports ephemeral HS (version >= 0.2.7.1-alpha)
......@@ -225,52 +244,53 @@ class Measurement(object):
tgen_client_writable, torctl_client_writable = None, None
if do_onion or do_inet:
general_writables.append(self.__start_tgen_server(server_tgen_listen_port))
tgen_model.port = tgen_server_conf.listen_port
general_writables.append(self.__start_tgen_server(tgen_model))
if do_onion:
logging.info("Onion Service private keys will be placed in {0}".format(self.privatedir_path))
# one must not have an open socks port when running a single
# onion service. see tor's man page for more information.
if self.single_onion:
server_tor_socks_port = 0
tor_writable, torctl_writable = self.__start_tor_server(server_tor_ctl_port,
server_tor_socks_port,
{client_tgen_connect_port:server_tgen_listen_port})
tgen_server_conf.tor_socks_port = 0
tor_writable, torctl_writable = self.__start_tor_server(tgen_server_conf.tor_ctl_port,
tgen_server_conf.tor_socks_port,
{tgen_client_conf.connect_port:tgen_server_conf.listen_port})
general_writables.append(tor_writable)
general_writables.append(torctl_writable)
if do_onion or do_inet:
tor_writable, torctl_client_writable = self.__start_tor_client(client_tor_ctl_port, client_tor_socks_port)
tor_writable, torctl_client_writable = self.__start_tor_client(tgen_client_conf.tor_ctl_port, tgen_client_conf.tor_socks_port)
general_writables.append(tor_writable)
server_urls = []
if do_onion and self.hs_v3_service_id is not None:
server_urls.append("{0}.onion:{1}".format(self.hs_v3_service_id, client_tgen_connect_port))
server_urls.append("{0}.onion:{1}".format(self.hs_v3_service_id, tgen_client_conf.connect_port))
if do_inet:
connect_ip = client_tgen_connect_ip if client_tgen_connect_ip != '0.0.0.0' else util.get_ip_address()
server_urls.append("{0}:{1}".format(connect_ip, client_tgen_connect_port))
connect_ip = tgen_client_conf.connect_ip if tgen_client_conf.connect_ip != '0.0.0.0' else util.get_ip_address()
server_urls.append("{0}:{1}".format(connect_ip, tgen_client_conf.connect_port))
tgen_model.servers = server_urls
if do_onion or do_inet:
assert len(server_urls) > 0
tgen_client_writable = self.__start_tgen_client(server_urls, client_tgen_listen_port, client_tor_socks_port)
tgen_model.port = tgen_client_conf.listen_port
tgen_model.socks_port = tgen_client_conf.tor_socks_port
tgen_client_writable = self.__start_tgen_client(tgen_model)
self.__start_log_processors(general_writables, tgen_client_writable, torctl_client_writable)
logging.info("Bootstrapping finished, entering heartbeat loop")
time.sleep(1)
if self.oneshot:
logging.info("Onionperf is running in Oneshot mode. It will download a 5M file and shut down gracefully...")
while True:
# TODO add status update of some kind? maybe the number of files in the www directory?
# logging.info("Heartbeat: {0} downloads have completed successfully".format(self.__get_download_count(tgen_client_writable.filename)))
if self.oneshot:
if tgen_model.num_transfers:
downloads = 0
while True:
downloads = self.__get_download_count(tgen_client_writable.filename)
if downloads >= 1:
logging.info("Onionperf has downloaded a 5M file in oneshot mode, and will now shut down.")
break
time.sleep(1)
if downloads >= tgen_model.num_transfers:
logging.info("Onionperf has downloaded %d files and will now shut down." % tgen_model.num_transfers)
break
else:
continue
break
......@@ -320,35 +340,25 @@ class Measurement(object):
logrotate.start()
self.threads.append(logrotate)
def __start_tgen_client(self, server_urls, tgen_port, socks_port):
return self.__start_tgen("client", tgen_port, socks_port, server_urls)
def __start_tgen_client(self, tgen_model_conf):
return self.__start_tgen("client", tgen_model_conf)
def __start_tgen_server(self, tgen_port):
return self.__start_tgen("server", tgen_port)
def __start_tgen_server(self, tgen_model_conf):
return self.__start_tgen("server", tgen_model_conf)
def __start_tgen(self, name, tgen_port, socks_port=None, server_urls=None):
logging.info("Starting TGen {0} process on port {1}...".format(name, tgen_port))
def __start_tgen(self, name, tgen_model_conf):
logging.info("Starting TGen {0} process on port {1}...".format(name, tgen_model_conf.port))
tgen_datadir = "{0}/tgen-{1}".format(self.datadir_path, name)
if not os.path.exists(tgen_datadir): os.makedirs(tgen_datadir)
tgen_confpath = "{0}/tgen.graphml.xml".format(tgen_datadir)
if os.path.exists(tgen_confpath): os.remove(tgen_confpath)
if socks_port is None:
model.ListenModel(tgen_port="{0}".format(tgen_port)).dump_to_file(tgen_confpath)
logging.info("TGen server running at 0.0.0.0:{0}".format(tgen_port))
if tgen_model_conf.socks_port is None:
model.ListenModel(tgen_port="{0}".format(tgen_model_conf.port)).dump_to_file(tgen_confpath)
logging.info("TGen server running at 0.0.0.0:{0}".format(tgen_model_conf.port))
else:
tgen_model_args = {
"tgen_port": "{0}".format(tgen_port),
"tgen_servers": server_urls,
"socksproxy": "127.0.0.1:{0}".format(socks_port)
}
if self.oneshot:
tgen_model = model.OneshotModel(**tgen_model_args)
else:
tgen_model = model.TorperfModel(**tgen_model_args)
tgen_model = model.TorperfModel(tgen_model_conf)
tgen_model.dump_to_file(tgen_confpath)
tgen_logpath = "{0}/onionperf.tgen.log".format(tgen_datadir)
......
......@@ -41,6 +41,21 @@ class TGenLoadableModel(TGenModel):
model_instance = cls(graph)
return model_instance
class TGenModelConf(object):
    """Represents a TGen traffic model configuration.

    Parameters:
      initial_pause        -- seconds to wait before starting transfers.
      num_transfers        -- number of transfers to perform (0 means none;
                              see continuous_transfers).
      transfer_size        -- size of each transfer, e.g. "5 MiB".
      continuous_transfers -- if True, transfer indefinitely instead of a
                              fixed number of times.
      inter_transfer_pause -- seconds to pause between two transfers.
      port                 -- TGen listen port.
      servers              -- list of server addresses to transfer from.
      socks_port           -- SOCKS proxy port for client transfers.
    """
    def __init__(self, initial_pause=0, num_transfers=1, transfer_size="5 MiB",
                 continuous_transfers=False, inter_transfer_pause=5, port=None,
                 servers=None, socks_port=None):
        self.initial_pause = initial_pause
        self.num_transfers = num_transfers
        self.transfer_size = transfer_size
        self.continuous_transfers = continuous_transfers
        self.inter_transfer_pause = inter_transfer_pause
        self.port = port
        # Use a None sentinel so each instance gets its own list; a
        # mutable default argument ([]) would be shared across instances.
        self.servers = [] if servers is None else servers
        self.socks_port = socks_port
class GeneratableTGenModel(TGenModel, metaclass=ABCMeta):
@abstractmethod
......@@ -58,61 +73,74 @@ class ListenModel(GeneratableTGenModel):
g.add_node("start", serverport=self.tgen_port, loglevel="info", heartbeat="1 minute")
return g
class TorperfModel(GeneratableTGenModel):
def __init__(self, tgen_port="8889", tgen_servers=["127.0.0.1:8888"], socksproxy=None):
self.tgen_port = tgen_port
self.tgen_servers = tgen_servers
self.socksproxy = socksproxy
def __init__(self, config):
self.config = config
self.graph = self.generate()
def generate(self):
server_str = ','.join(self.tgen_servers)
server_str = ','.join(self.config.servers)
g = DiGraph()
if self.socksproxy is not None:
g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute", socksproxy=self.socksproxy)
if self.config.socks_port is not None:
g.add_node("start",
serverport=self.config.port,
peers=server_str,
loglevel="info",
heartbeat="1 minute",
socksproxy="127.0.0.1:{0}".format(self.config.socks_port))
else:
g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute")
g.add_node("pause", time="5 minutes")
g.add_node("stream5m", sendsize="0", recvsize="5 mib", timeout="270 seconds", stallout="0 seconds")
g.add_node("start",
serverport=self.config.port,
peers=server_str,
loglevel="info",
heartbeat="1 minute")
g.add_node("pause", time="%d seconds" % self.config.initial_pause)
g.add_edge("start", "pause")
# after the pause, we start another pause timer while *at the same time* choosing one of
# the file sizes and downloading it from one of the servers in the server pool
g.add_edge("pause", "pause")
# these are chosen with weighted probability, change edge 'weight' attributes to adjust probability
g.add_edge("pause", "stream5m")
return g
class OneshotModel(GeneratableTGenModel):
def __init__(self, tgen_port="8889", tgen_servers=["127.0.0.1:8888"], socksproxy=None):
self.tgen_port = tgen_port
self.tgen_servers = tgen_servers
self.socksproxy = socksproxy
self.graph = self.generate()
def generate(self):
server_str = ','.join(self.tgen_servers)
g = DiGraph()
if self.socksproxy is not None:
g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute", socksproxy=self.socksproxy)
else:
g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute")
g.add_node("stream5m", sendsize="0", recvsize="5 mib", timeout="270 seconds", stallout="0 seconds")
g.add_edge("start", "stream5m")
g.add_edge("stream5m", "start")
# "One-shot mode," i.e., onionperf will stop after the given number of
# iterations. The idea is:
# start -> pause -> stream-1 -> pause-1 -> ... -> stream-n -> pause-n -> end
if self.config.num_transfers > 0:
for i in range(self.config.num_transfers):
g.add_node("stream-%d" % i,
sendsize="0",
recvsize=self.config.transfer_size,
timeout="15 seconds",
stallout="10 seconds")
g.add_node("pause-%d" % i,
time="%d seconds" % self.config.inter_transfer_pause)
g.add_edge("stream-%d" % i, "pause-%d" % i)
if i > 0:
g.add_edge("pause-%d" % (i-1), "stream-%d" % i)
g.add_node("end")
g.add_edge("pause", "stream-0")
g.add_edge("pause-%d" % (self.config.num_transfers - 1), "end")
# Continuous mode, i.e., onionperf will not stop. The idea is:
# start -> pause -> stream -> pause
# ^ |
# +-------+
elif self.config.continuous_transfers:
g.add_node("stream",
sendsize="0",
recvsize=self.config.transfer_size,
timeout="15 seconds",
stallout="10 seconds")
g.add_node("pause",
time="%d seconds" % self.config.inter_transfer_pause)
g.add_edge("pause", "stream")
g.add_edge("stream", "pause")
g.add_edge("pause", "stream")
return g
def dump_example_tgen_torperf_model(domain_name, onion_name):
# the server listens on 8888, the client uses Tor to come back directly, and using a hidden serv
server = ListenModel(tgen_port="8888")
......
......@@ -154,11 +154,6 @@ def main():
action="store", dest="tgenpath",
default=util.which("tgen"))
measure_parser.add_argument('--oneshot',
help="""Enables oneshot mode, onionperf closes on successfully downloading a file""",
action="store_true", dest="oneshot",
default=False)
measure_parser.add_argument('--additional-client-conf',
help="""Additional configuration lines for the Tor client, for example bridge lines""",
metavar="CONFIG", type=str,
......@@ -195,6 +190,30 @@ def main():
action="store", dest="tgenconnectport",
default=8080)
measure_parser.add_argument('--tgen-start-pause',
help="""the number of seconds TGen should wait before walking through its action graph""",
metavar="N", type=int,
action="store", dest="tgenstartpause",
default=5)
measure_parser.add_argument('--tgen-intertransfer-pause',
help="""the number of seconds TGen should wait in between two transfers""",
metavar="N", type=int,
action="store", dest="tgenintertransferpause",
default=300)
measure_parser.add_argument('--tgen-transfer-size',
help="""the size of the file transfer that TGen will perform (e.g., '5 MiB' or '10 KiB')""",
metavar="STRING", type=str,
action="store", dest="tgentransfersize",
default="5 MiB")
measure_parser.add_argument('--tgen-num-transfers',
help="""the number of file transfers that TGen will perform""",
metavar="N", type=int,
action="store", dest="tgennumtransfers",
default=0)
onion_or_inet_only_group = measure_parser.add_mutually_exclusive_group()
onion_or_inet_only_group.add_argument('-o', '--onion-only',
......@@ -327,7 +346,8 @@ def monitor(args):
writer.close()
def measure(args):
from onionperf.measurement import Measurement
from onionperf.measurement import Measurement, TGenConf
from onionperf.model import TGenModelConf
# check paths
args.torpath = util.find_path(args.torpath, "tor")
......@@ -347,12 +367,27 @@ def measure(args):
server_tor_ctl_port = util.get_random_free_port()
server_tor_socks_port = util.get_random_free_port()
tgen_client_conf = TGenConf(listen_port=client_tgen_port,
connect_ip=client_connect_ip,
connect_port=client_connect_port,
tor_ctl_port=client_tor_ctl_port,
tor_socks_port=client_tor_socks_port)
tgen_server_conf = TGenConf(listen_port=server_tgen_port,
tor_ctl_port=server_tor_ctl_port,
tor_socks_port=server_tor_socks_port)
tgen_model = TGenModelConf(initial_pause=args.tgenstartpause,
transfer_size=args.tgentransfersize,
num_transfers=args.tgennumtransfers,
continuous_transfers=args.tgennumtransfers == 0,
inter_transfer_pause=args.tgenintertransferpause)
meas = Measurement(args.torpath,
args.tgenpath,
args.prefix,
args.private_prefix,
args.nickname,
args.oneshot,
args.additional_client_conf,
args.torclient_conf_file,
args.torserver_conf_file,
......@@ -360,14 +395,9 @@ def measure(args):
meas.run(do_onion=not args.inet_only,
do_inet=not args.onion_only,
client_tgen_listen_port=client_tgen_port,
client_tgen_connect_ip=client_connect_ip,
client_tgen_connect_port=client_connect_port,
client_tor_ctl_port=client_tor_ctl_port,
client_tor_socks_port=client_tor_socks_port,
server_tgen_listen_port=server_tgen_port,
server_tor_ctl_port=server_tor_ctl_port,
server_tor_socks_port=server_tor_socks_port)
tgen_model=tgen_model,
tgen_client_conf=tgen_client_conf,
tgen_server_conf=tgen_server_conf)
else:
logging.info("Please fix path errors to continue")
......
......@@ -57,8 +57,8 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory
known_config_server = "RunAsDaemon 0\nORPort 0\nDirPort 0\nControlPort 9001\nSocksPort 9050\nSocksListenAddress 127.0.0.1\nClientOnly 1\n\
WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory /tmp/\nDataDirectoryGroupReadable 1\nLog INFO stdout\nUseEntryGuards 0\n"
meas = measurement.Measurement(None, None, None, None, None, None,
"UseBridges 1\n", None, None)
meas = measurement.Measurement(None, None, None, None, None,
"UseBridges 1\n", None, None, False)
config_client = meas.create_tor_config(9001, 9050, "/tmp/", "client")
config_server = meas.create_tor_config(9001, 9050, "/tmp/", "server")
assert_equals(config_client, known_config)
......@@ -80,8 +80,8 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory
known_config = "RunAsDaemon 0\nORPort 0\nDirPort 0\nControlPort 9001\nSocksPort 9050\nSocksListenAddress 127.0.0.1\nClientOnly 1\n\
WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory /tmp/\nDataDirectoryGroupReadable 1\nLog INFO stdout\nUseBridges 1\n"
meas = measurement.Measurement(None, None, None, None, None, None, None,
absolute_data_path("config"), None)
meas = measurement.Measurement(None, None, None, None, None, None,
absolute_data_path("config"), None, False)
config_client = meas.create_tor_config(9001, 9050, "/tmp/", "client")
config_server = meas.create_tor_config(9001, 9050, "/tmp/", "server")
assert_equals(config_client, known_config)
......@@ -103,8 +103,8 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory
known_config = "RunAsDaemon 0\nORPort 0\nDirPort 0\nControlPort 9001\nSocksPort 9050\nSocksListenAddress 127.0.0.1\nClientOnly 1\n\
WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory /tmp/\nDataDirectoryGroupReadable 1\nLog INFO stdout\nUseEntryGuards 0\n"
meas = measurement.Measurement(None, None, None, None, None, None, None, None,
absolute_data_path("config"))
meas = measurement.Measurement(None, None, None, None, None, None, None,
absolute_data_path("config"), False)
config_client = meas.create_tor_config(9001, 9050, "/tmp/", "client")
config_server = meas.create_tor_config(9001, 9050, "/tmp/", "server")
assert_equals(config_client, known_config)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment