GitLab is used only for code review, issue tracking and project management. Canonical locations for source code are still https://gitweb.torproject.org/ https://git.torproject.org/ and git-rw.torproject.org.

Commit c707674b authored by Karsten Loesing's avatar Karsten Loesing

Merge branch 'phw-enhancement-33432-3' into develop

parents dfec0b89 b8f1e5c2
......@@ -2,6 +2,10 @@
- Add `onionperf measure --drop-guards` parameter to use and drop
guards after a given number of hours. Implements #33399.
- Remove the `onionperf measure --oneshot` switch and replace it with
new switches `--tgen-pause-initial`, `--tgen-pause-between`,
`--tgen-transfer-size`, and `--tgen-num-transfers` to further
configure the generated TGen model.
# Changes in version 0.6 - 2020-08-08
......
......@@ -15,6 +15,16 @@ from stem.control import Controller
from stem.version import Version, Requirement, get_system_tor_version
from stem import __version__ as stem_version
class TGenConf(object):
    """Represents a TGen configuration, for both client and server.

    Parameters:
        listen_port: port the TGen process listens on; stored as a string
            because it is later interpolated into the TGen config.
        connect_ip: IP address TGen clients connect to (clients only).
        connect_port: port TGen clients connect to (clients only).
        tor_ctl_port: Tor control port associated with this TGen instance.
        tor_socks_port: Tor SOCKS port associated with this TGen instance.
    """

    def __init__(self, listen_port=None, connect_ip=None, connect_port=None, tor_ctl_port=None, tor_socks_port=None):
        # Preserve None rather than stringifying it: str(None) would yield
        # the bogus port string "None".
        self.listen_port = str(listen_port) if listen_port is not None else None
        self.tor_ctl_port = tor_ctl_port
        self.tor_socks_port = tor_socks_port
        # TGen clients use connect_ip and connect_port.
        self.connect_ip = connect_ip
        self.connect_port = connect_port
# onionperf imports
from . import analysis, monitor, model, util
......@@ -40,10 +50,11 @@ def readline_thread_task(instream, q):
# wait for lines from stdout until the EOF
for line in iter(instream.readline, b''): q.put(line)
def watchdog_thread_task(cmd, cwd, writable, done_ev, send_stdin, ready_search_str, ready_ev):
def watchdog_thread_task(cmd, cwd, writable, done_ev, send_stdin, ready_search_str, ready_ev, no_relaunch):
# launch or re-launch our sub process until we are told to stop
# if we fail too many times in too short of time, give up and exit
# launch or re-launch (or don't re-launch, if no_relaunch is set) our sub
# process until we are told to stop. If we fail too many times in too short
# of a time, give up and exit.
failure_times = []
pause_time_seconds = 0
while done_ev.is_set() is False:
......@@ -95,6 +106,10 @@ def watchdog_thread_task(cmd, cwd, writable, done_ev, send_stdin, ready_search_s
subp.wait()
elif done_ev.is_set():
logging.info("command '{}' finished as expected".format(cmd))
elif no_relaunch:
logging.info("command '{}' finished on its own".format(cmd))
# our command finished on its own. time to terminate.
done_ev.set()
else:
logging.warning("command '{}' finished before expected".format(cmd))
now = time.time()
......@@ -173,12 +188,11 @@ def logrotate_thread_task(writables, tgen_writable, torctl_writable, docroot, ni
class Measurement(object):
def __init__(self, tor_bin_path, tgen_bin_path, datadir_path, privatedir_path, nickname, oneshot, additional_client_conf=None, torclient_conf_file=None, torserver_conf_file=None, single_onion=False, drop_guards_interval_hours=0):
def __init__(self, tor_bin_path, tgen_bin_path, datadir_path, privatedir_path, nickname, additional_client_conf=None, torclient_conf_file=None, torserver_conf_file=None, single_onion=False, drop_guards_interval_hours=0):
self.tor_bin_path = tor_bin_path
self.tgen_bin_path = tgen_bin_path
self.datadir_path = datadir_path
self.privatedir_path = privatedir_path
self.oneshot = oneshot
self.nickname = nickname
self.threads = None
self.done_event = None
......@@ -191,20 +205,30 @@ class Measurement(object):
self.single_onion = single_onion
self.drop_guards_interval_hours = drop_guards_interval_hours
def run(self, do_onion=True, do_inet=True, client_tgen_listen_port=58888, client_tgen_connect_ip='0.0.0.0', client_tgen_connect_port=8080, client_tor_ctl_port=59050, client_tor_socks_port=59000,
server_tgen_listen_port=8080, server_tor_ctl_port=59051, server_tor_socks_port=59001):
def run(self, do_onion=True, do_inet=True, tgen_model=None, tgen_client_conf=None, tgen_server_conf=None):
'''
only `server_tgen_listen_port` are "public" and need to be opened on the firewall.
if `client_tgen_connect_port` != `server_tgen_listen_port`, then you should have installed a forwarding rule in the firewall.
only `tgen_server_conf.listen_port` is "public" and needs to be opened on the firewall.
if `tgen_client_conf.connect_port` != `tgen_server_conf.listen_port`, then you should have installed a forwarding rule in the firewall.
all ports need to be unique though, and unique among multiple onionperf instances.
here are some sane defaults:
client_tgen_listen_port=58888, client_tgen_connect_port=8080, client_tor_ctl_port=59050, client_tor_socks_port=59000,
server_tgen_listen_port=8080, server_tor_ctl_port=59051, server_tor_socks_port=59001
tgen_client_conf.listen_port=58888, tgen_client_conf.connect_port=8080, tgen_client_conf.tor_ctl_port=59050, tgen_client_conf.tor_socks_port=59000,
tgen_server_conf.listen_port=8080, tgen_server_conf.tor_ctl_port=59051, tgen_server_conf.tor_socks_port=59001
'''
self.threads = []
self.done_event = threading.Event()
if tgen_client_conf is None:
tgen_client_conf = TGenConf(listen_port=58888,
connect_ip='0.0.0.0',
connect_port=8080,
tor_ctl_port=59050,
tor_socks_port=59000)
if tgen_server_conf is None:
tgen_server_conf = TGenConf(listen_port=8080,
tor_ctl_port=59051,
tor_socks_port=59001)
# if ctrl-c is pressed, shutdown child processes properly
try:
# make sure stem and Tor supports ephemeral HS (version >= 0.2.7.1-alpha)
......@@ -226,54 +250,49 @@ class Measurement(object):
tgen_client_writable, torctl_client_writable = None, None
if do_onion or do_inet:
general_writables.append(self.__start_tgen_server(server_tgen_listen_port))
tgen_model.port = tgen_server_conf.listen_port
general_writables.append(self.__start_tgen_server(tgen_model))
if do_onion:
logging.info("Onion Service private keys will be placed in {0}".format(self.privatedir_path))
# one must not have an open socks port when running a single
# onion service. see tor's man page for more information.
if self.single_onion:
server_tor_socks_port = 0
tor_writable, torctl_writable = self.__start_tor_server(server_tor_ctl_port,
server_tor_socks_port,
{client_tgen_connect_port:server_tgen_listen_port})
tgen_server_conf.tor_socks_port = 0
tor_writable, torctl_writable = self.__start_tor_server(tgen_server_conf.tor_ctl_port,
tgen_server_conf.tor_socks_port,
{tgen_client_conf.connect_port:tgen_server_conf.listen_port})
general_writables.append(tor_writable)
general_writables.append(torctl_writable)
if do_onion or do_inet:
tor_writable, torctl_client_writable = self.__start_tor_client(client_tor_ctl_port, client_tor_socks_port)
tor_writable, torctl_client_writable = self.__start_tor_client(tgen_client_conf.tor_ctl_port, tgen_client_conf.tor_socks_port)
general_writables.append(tor_writable)
server_urls = []
if do_onion and self.hs_v3_service_id is not None:
server_urls.append("{0}.onion:{1}".format(self.hs_v3_service_id, client_tgen_connect_port))
server_urls.append("{0}.onion:{1}".format(self.hs_v3_service_id, tgen_client_conf.connect_port))
if do_inet:
connect_ip = client_tgen_connect_ip if client_tgen_connect_ip != '0.0.0.0' else util.get_ip_address()
server_urls.append("{0}:{1}".format(connect_ip, client_tgen_connect_port))
connect_ip = tgen_client_conf.connect_ip if tgen_client_conf.connect_ip != '0.0.0.0' else util.get_ip_address()
server_urls.append("{0}:{1}".format(connect_ip, tgen_client_conf.connect_port))
tgen_model.servers = server_urls
if do_onion or do_inet:
assert len(server_urls) > 0
tgen_client_writable = self.__start_tgen_client(server_urls, client_tgen_listen_port, client_tor_socks_port)
tgen_model.port = tgen_client_conf.listen_port
tgen_model.socks_port = tgen_client_conf.tor_socks_port
tgen_client_writable = self.__start_tgen_client(tgen_model)
self.__start_log_processors(general_writables, tgen_client_writable, torctl_client_writable)
logging.info("Bootstrapping finished, entering heartbeat loop")
time.sleep(1)
if self.oneshot:
logging.info("Onionperf is running in Oneshot mode. It will download a 5M file and shut down gracefully...")
while True:
# TODO add status update of some kind? maybe the number of files in the www directory?
# logging.info("Heartbeat: {0} downloads have completed successfully".format(self.__get_download_count(tgen_client_writable.filename)))
if self.oneshot:
downloads = 0
while True:
downloads = self.__get_download_count(tgen_client_writable.filename)
if downloads >= 1:
logging.info("Onionperf has downloaded a 5M file in oneshot mode, and will now shut down.")
break
else:
continue
if tgen_model.num_transfers:
# This function blocks until our TGen client process
# terminated on its own.
self.__wait_for_tgen_client()
break
if self.__is_alive():
......@@ -321,35 +340,25 @@ class Measurement(object):
logrotate.start()
self.threads.append(logrotate)
def __start_tgen_client(self, server_urls, tgen_port, socks_port):
return self.__start_tgen("client", tgen_port, socks_port, server_urls)
def __start_tgen_client(self, tgen_model_conf):
return self.__start_tgen("client", tgen_model_conf)
def __start_tgen_server(self, tgen_port):
return self.__start_tgen("server", tgen_port)
def __start_tgen_server(self, tgen_model_conf):
return self.__start_tgen("server", tgen_model_conf)
def __start_tgen(self, name, tgen_port, socks_port=None, server_urls=None):
logging.info("Starting TGen {0} process on port {1}...".format(name, tgen_port))
def __start_tgen(self, name, tgen_model_conf):
logging.info("Starting TGen {0} process on port {1}...".format(name, tgen_model_conf.port))
tgen_datadir = "{0}/tgen-{1}".format(self.datadir_path, name)
if not os.path.exists(tgen_datadir): os.makedirs(tgen_datadir)
tgen_confpath = "{0}/tgen.graphml.xml".format(tgen_datadir)
if os.path.exists(tgen_confpath): os.remove(tgen_confpath)
if socks_port is None:
model.ListenModel(tgen_port="{0}".format(tgen_port)).dump_to_file(tgen_confpath)
logging.info("TGen server running at 0.0.0.0:{0}".format(tgen_port))
if tgen_model_conf.socks_port is None:
model.ListenModel(tgen_port="{0}".format(tgen_model_conf.port)).dump_to_file(tgen_confpath)
logging.info("TGen server running at 0.0.0.0:{0}".format(tgen_model_conf.port))
else:
tgen_model_args = {
"tgen_port": "{0}".format(tgen_port),
"tgen_servers": server_urls,
"socksproxy": "127.0.0.1:{0}".format(socks_port)
}
if self.oneshot:
tgen_model = model.OneshotModel(**tgen_model_args)
else:
tgen_model = model.TorperfModel(**tgen_model_args)
tgen_model = model.TorperfModel(tgen_model_conf)
tgen_model.dump_to_file(tgen_confpath)
tgen_logpath = "{0}/onionperf.tgen.log".format(tgen_datadir)
......@@ -357,7 +366,10 @@ class Measurement(object):
logging.info("Logging TGen {1} process output to {0}".format(tgen_logpath, name))
tgen_cmd = "{0} {1}".format(self.tgen_bin_path, tgen_confpath)
tgen_args = (tgen_cmd, tgen_datadir, tgen_writable, self.done_event, None, None, None)
# If we're running in "one-shot mode", TGen client will terminate on
# its own and we don't need our watchdog to restart the process.
no_relaunch = (name == "client" and tgen_model_conf.num_transfers)
tgen_args = (tgen_cmd, tgen_datadir, tgen_writable, self.done_event, None, None, None, no_relaunch)
tgen_watchdog = threading.Thread(target=watchdog_thread_task, name="tgen_{0}_watchdog".format(name), args=tgen_args)
tgen_watchdog.start()
self.threads.append(tgen_watchdog)
......@@ -455,7 +467,7 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory
tor_stdin_bytes = str_tools._to_bytes(tor_config)
tor_ready_str = "Bootstrapped 100"
tor_ready_ev = threading.Event()
tor_args = (tor_cmd, tor_datadir, tor_writable, self.done_event, tor_stdin_bytes, tor_ready_str, tor_ready_ev)
tor_args = (tor_cmd, tor_datadir, tor_writable, self.done_event, tor_stdin_bytes, tor_ready_str, tor_ready_ev, False)
tor_watchdog = threading.Thread(target=watchdog_thread_task, name="tor_{0}_watchdog".format(name), args=tor_args)
tor_watchdog.start()
self.threads.append(tor_watchdog)
......@@ -482,14 +494,13 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory
return tor_writable, torctl_writable
def __get_download_count(self, tgen_logpath):
count = 0
if tgen_logpath is not None and os.path.exists(tgen_logpath):
with open(tgen_logpath, 'r') as fin:
for line in fin:
if re.search("transfer-complete", line) is not None:
count += 1
return count
def __wait_for_tgen_client(self):
logging.info("Waiting for TGen client to finish.")
for t in self.threads:
if t.getName() == "tgen_client_watchdog":
while t.is_alive():
time.sleep(1)
logging.info("TGen client finished.")
def __is_alive(self):
all_alive = True
......
......@@ -41,6 +41,21 @@ class TGenLoadableModel(TGenModel):
model_instance = cls(graph)
return model_instance
class TGenModelConf(object):
    """Represents a TGen traffic model configuration.

    Parameters:
        pause_initial: seconds TGen waits before starting its action graph.
        num_transfers: number of transfers to perform before stopping
            (ignored when continuous_transfers is True).
        transfer_size: size of each transfer, e.g. "5 MiB".
        continuous_transfers: if True, keep transferring indefinitely.
        pause_between: seconds TGen waits between two transfers.
        port: TGen server/listen port.
        servers: list of "host:port" peer strings; defaults to an empty list.
        socks_port: local Tor SOCKS port, or None when no proxy is used.
    """

    def __init__(self, pause_initial=300, num_transfers=1, transfer_size="5 MiB",
                 continuous_transfers=False, pause_between=300, port=None, servers=None,
                 socks_port=None):
        self.pause_initial = pause_initial
        self.pause_between = pause_between
        self.num_transfers = num_transfers
        self.transfer_size = transfer_size
        self.continuous_transfers = continuous_transfers
        self.port = port
        # Avoid the mutable-default-argument pitfall: with `servers=[]` in
        # the signature, every instance would share one list object.
        self.servers = servers if servers is not None else []
        self.socks_port = socks_port
class GeneratableTGenModel(TGenModel, metaclass=ABCMeta):
@abstractmethod
......@@ -58,61 +73,58 @@ class ListenModel(GeneratableTGenModel):
g.add_node("start", serverport=self.tgen_port, loglevel="info", heartbeat="1 minute")
return g
class TorperfModel(GeneratableTGenModel):
def __init__(self, tgen_port="8889", tgen_servers=["127.0.0.1:8888"], socksproxy=None):
self.tgen_port = tgen_port
self.tgen_servers = tgen_servers
self.socksproxy = socksproxy
self.graph = self.generate()
def generate(self):
server_str = ','.join(self.tgen_servers)
g = DiGraph()
if self.socksproxy is not None:
g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute", socksproxy=self.socksproxy)
else:
g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute")
g.add_node("pause", time="5 minutes")
g.add_node("stream5m", sendsize="0", recvsize="5 mib", timeout="270 seconds", stallout="0 seconds")
g.add_edge("start", "pause")
# after the pause, we start another pause timer while *at the same time* choosing one of
# the file sizes and downloading it from one of the servers in the server pool
g.add_edge("pause", "pause")
# these are chosen with weighted probability, change edge 'weight' attributes to adjust probability
g.add_edge("pause", "stream5m")
return g
class OneshotModel(GeneratableTGenModel):
class TorperfModel(GeneratableTGenModel):
def __init__(self, tgen_port="8889", tgen_servers=["127.0.0.1:8888"], socksproxy=None):
self.tgen_port = tgen_port
self.tgen_servers = tgen_servers
self.socksproxy = socksproxy
def __init__(self, config):
self.config = config
self.graph = self.generate()
def generate(self):
server_str = ','.join(self.tgen_servers)
server_str = ','.join(self.config.servers)
g = DiGraph()
if self.socksproxy is not None:
g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute", socksproxy=self.socksproxy)
if self.config.socks_port is not None:
g.add_node("start",
serverport=self.config.port,
peers=server_str,
loglevel="info",
heartbeat="1 minute",
socksproxy="127.0.0.1:{0}".format(self.config.socks_port))
else:
g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute")
g.add_node("stream5m", sendsize="0", recvsize="5 mib", timeout="270 seconds", stallout="0 seconds")
g.add_edge("start", "stream5m")
g.add_edge("stream5m", "start")
g.add_node("start",
serverport=self.config.port,
peers=server_str,
loglevel="info",
heartbeat="1 minute")
g.add_node("pause_initial",
time="%d seconds" % self.config.pause_initial)
g.add_node("stream",
sendsize="0",
recvsize=self.config.transfer_size,
timeout="270 seconds",
stallout="0 seconds")
g.add_node("pause_between",
time="%d seconds" % self.config.pause_between)
g.add_edge("start", "pause_initial")
g.add_edge("pause_initial", "stream")
g.add_edge("pause_initial", "pause_between")
g.add_edge("pause_between", "stream")
g.add_edge("pause_between", "pause_between")
# only add an end node if we need to stop
if not self.config.continuous_transfers:
# one-shot mode, i.e., end after configured number of transfers
g.add_node("end",
count="%d" % self.config.num_transfers)
# check for end condition after every transfer
g.add_edge("stream", "end")
return g
def dump_example_tgen_torperf_model(domain_name, onion_name):
# the server listens on 8888, the client uses Tor to come back directly, and using a hidden serv
server = ListenModel(tgen_port="8888")
......
......@@ -154,11 +154,6 @@ def main():
action="store", dest="tgenpath",
default=util.which("tgen"))
measure_parser.add_argument('--oneshot',
help="""Enables oneshot mode, onionperf closes on successfully downloading a file""",
action="store_true", dest="oneshot",
default=False)
measure_parser.add_argument('--additional-client-conf',
help="""Additional configuration lines for the Tor client, for example bridge lines""",
metavar="CONFIG", type=str,
......@@ -195,6 +190,30 @@ def main():
action="store", dest="tgenconnectport",
default=8080)
measure_parser.add_argument('--tgen-pause-initial',
help="""the number of seconds TGen should wait before walking through its action graph""",
metavar="N", type=int,
action="store", dest="tgenpauseinitial",
default=300)
measure_parser.add_argument('--tgen-pause-between',
help="""the number of seconds TGen should wait in between two transfers""",
metavar="N", type=int,
action="store", dest="tgenpausebetween",
default=300)
measure_parser.add_argument('--tgen-transfer-size',
help="""the size of the file transfer that TGen will perform (e.g., '5 MiB' or '10 KiB')""",
metavar="STRING", type=str,
action="store", dest="tgentransfersize",
default="5 MiB")
measure_parser.add_argument('--tgen-num-transfers',
help="""the number of file transfers that TGen will perform""",
metavar="N", type=int,
action="store", dest="tgennumtransfers",
default=0)
measure_parser.add_argument('--drop-guards',
help="""Use and drop guards every N > 0 hours, or do not use guards at all if N = 0""",
metavar="N", type=type_nonnegative_integer,
......@@ -333,7 +352,8 @@ def monitor(args):
writer.close()
def measure(args):
from onionperf.measurement import Measurement
from onionperf.measurement import Measurement, TGenConf
from onionperf.model import TGenModelConf
# check paths
args.torpath = util.find_path(args.torpath, "tor")
......@@ -353,12 +373,27 @@ def measure(args):
server_tor_ctl_port = util.get_random_free_port()
server_tor_socks_port = util.get_random_free_port()
tgen_client_conf = TGenConf(listen_port=client_tgen_port,
connect_ip=client_connect_ip,
connect_port=client_connect_port,
tor_ctl_port=client_tor_ctl_port,
tor_socks_port=client_tor_socks_port)
tgen_server_conf = TGenConf(listen_port=server_tgen_port,
tor_ctl_port=server_tor_ctl_port,
tor_socks_port=server_tor_socks_port)
tgen_model = TGenModelConf(pause_initial=args.tgenpauseinitial,
transfer_size=args.tgentransfersize,
num_transfers=args.tgennumtransfers,
continuous_transfers=args.tgennumtransfers == 0,
pause_between=args.tgenpausebetween)
meas = Measurement(args.torpath,
args.tgenpath,
args.prefix,
args.private_prefix,
args.nickname,
args.oneshot,
args.additional_client_conf,
args.torclient_conf_file,
args.torserver_conf_file,
......@@ -367,14 +402,9 @@ def measure(args):
meas.run(do_onion=not args.inet_only,
do_inet=not args.onion_only,
client_tgen_listen_port=client_tgen_port,
client_tgen_connect_ip=client_connect_ip,
client_tgen_connect_port=client_connect_port,
client_tor_ctl_port=client_tor_ctl_port,
client_tor_socks_port=client_tor_socks_port,
server_tgen_listen_port=server_tgen_port,
server_tor_ctl_port=server_tor_ctl_port,
server_tor_socks_port=server_tor_socks_port)
tgen_model=tgen_model,
tgen_client_conf=tgen_client_conf,
tgen_server_conf=tgen_server_conf)
else:
logging.info("Please fix path errors to continue")
......
......@@ -57,8 +57,8 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory
known_config_server = "RunAsDaemon 0\nORPort 0\nDirPort 0\nControlPort 9001\nSocksPort 9050\nSocksListenAddress 127.0.0.1\nClientOnly 1\n\
WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory /tmp/\nDataDirectoryGroupReadable 1\nLog INFO stdout\nUseEntryGuards 0\n"
meas = measurement.Measurement(None, None, None, None, None, None,
"UseBridges 1\n", None, None)
meas = measurement.Measurement(None, None, None, None, None,
"UseBridges 1\n", None, None, False)
config_client = meas.create_tor_config(9001, 9050, "/tmp/", "client")
config_server = meas.create_tor_config(9001, 9050, "/tmp/", "server")
assert_equals(config_client, known_config)
......@@ -80,8 +80,8 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory
known_config = "RunAsDaemon 0\nORPort 0\nDirPort 0\nControlPort 9001\nSocksPort 9050\nSocksListenAddress 127.0.0.1\nClientOnly 1\n\
WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory /tmp/\nDataDirectoryGroupReadable 1\nLog INFO stdout\nUseBridges 1\n"
meas = measurement.Measurement(None, None, None, None, None, None, None,
absolute_data_path("config"), None)
meas = measurement.Measurement(None, None, None, None, None, None,
absolute_data_path("config"), None, False)
config_client = meas.create_tor_config(9001, 9050, "/tmp/", "client")
config_server = meas.create_tor_config(9001, 9050, "/tmp/", "server")
assert_equals(config_client, known_config)
......@@ -103,8 +103,8 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory
known_config = "RunAsDaemon 0\nORPort 0\nDirPort 0\nControlPort 9001\nSocksPort 9050\nSocksListenAddress 127.0.0.1\nClientOnly 1\n\
WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory /tmp/\nDataDirectoryGroupReadable 1\nLog INFO stdout\nUseEntryGuards 0\n"
meas = measurement.Measurement(None, None, None, None, None, None, None, None,
absolute_data_path("config"))
meas = measurement.Measurement(None, None, None, None, None, None, None,
absolute_data_path("config"), False)
config_client = meas.create_tor_config(9001, 9050, "/tmp/", "client")
config_server = meas.create_tor_config(9001, 9050, "/tmp/", "server")
assert_equals(config_client, known_config)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment