Skip to content
Snippets Groups Projects
Commit 78d90195 authored by Matt Traudt's avatar Matt Traudt
Browse files

Everybody should use pastlylogger instead of print

parent c9273e60
No related branches found
No related tags found
No related merge requests found
......@@ -35,14 +35,16 @@ class CircuitBuilder:
them, but CircuitBuilder will keep track of existing circuits and close
them when it is deleted.
'''
def __init__(self, args, controller=None, close_circuits_on_exit=True):
def __init__(self, args, log, controller=None,
close_circuits_on_exit=True):
if controller is None:
self.controller = stem_utils.init_controller(
port=args.control[1] if args.control[0] == 'port' else None,
path=args.control[1] if args.control[0] == 'socket' else None)
else:
self.controller = controller
self.relay_list = RelayList(args, controller=self.controller)
self.log = log
self.relay_list = RelayList(args, log, controller=self.controller)
self.built_circuits = set()
self.close_circuits_on_exit = close_circuits_on_exit
......@@ -80,7 +82,7 @@ class CircuitBuilder:
try:
circ_id = c.new_circuit(path, await_build=True)
except (InvalidRequest, CircuitExtensionFailed) as e:
print(e)
self.log.info(e)
continue
self.built_circuits.add(circ_id)
return circ_id
......@@ -125,8 +127,9 @@ class GuardedCircuitBuilder(CircuitBuilder):
for g in guards]
if len(self.guards) > len([g for g in self.guards if g]):
self.guards = [g for g in self.guards if g]
print('Warning: couldn\'t find descriptors for all guards. Only '
'using:', ', '.join([g.nickname for g in self.guards]))
self.log.warn('Warning: couldn\'t find descriptors for all '
'guards. Only using:',
', '.join([g.nickname for g in self.guards]))
assert len(self.guards) > 0
def build_circuit(self, length=3):
......@@ -213,14 +216,14 @@ class GapsCircuitBuilder(CircuitBuilder):
insert_relays = self._random_sample_relays(
num_missing, [r for r in path if r is not None])
if insert_relays is None:
print('Problem building a circuit to satisfy',
self.log.warn('Problem building a circuit to satisfy',
[r.nickname if r else None for r in path], 'with available '
'relays in the network')
return None
assert len(insert_relays) == num_missing
path = [r.fingerprint if r else insert_relays.pop().fingerprint
for r in path]
#print('building', '->'.join([r[0:8] for r in path]))
#self.log.info('building', '->'.join([r[0:8] for r in path]))
return self._build_circuit_impl(path)
......
from datetime import datetime
from threading import Lock, current_thread
class PastlyLogger:
"""
PastlyLogger - logging class inspired by Tor's logging API
error, warn, etc. are file names to open for logging.
If a log level doesn't have a file name given for it, messages destined
for that level cascade down to the next noisiest level.
Example 1: warn=foo.txt, debug=bar.txt
error and warn messages go to foo.txt, all other messages to bar.txt
Example 2: notice=baz.txt
error, warn, and notice messages go to baz.txt, all others are lost
overwrite is a list of log levels that should overwrite their log file
when opening instead of appending.
Example: notice=a.txt, info=b.txt, overwrite=['info']
error, warn, and notice messages are appended to a.txt;
b.txt is overwritten and info messages are appended to it;
all debug messages are lost
log_threads tells the logger whether or not to log thread names
log_levels tells the logger whether or not to log the level (notice, info,
warn, etc.)
log_date tells the logger whether or not to log the date
default tells the logger what level to log at when called with
log('foobar') instead of log.info('foobar')
"""
def __init__(self, error=None, warn=None, notice=None, info=None,
debug=None, overwrite=[], log_threads=False, default='notice',
log_levels=True, log_date=True):
self.log_threads = log_threads
self.log_levels = log_levels
self.log_date = log_date
assert default in ['debug', 'info', 'notice', 'warn', 'error']
self.default_level = default
# buffering=1 means line-based buffering
if error:
self.error_fd = open(error, 'w' if 'error' in overwrite else 'a',
buffering=1)
self.error_fd_mutex = Lock()
else:
self.error_fd = None
self.error_fd_mutex = None
if warn:
self.warn_fd = open(warn, 'w' if 'warn' in overwrite else 'a',
buffering=1)
self.warn_fd_mutex = Lock()
else:
self.warn_fd = None
self.warn_fd_mutex = None
if notice:
self.notice_fd = open(notice, 'w' if 'notice' in overwrite else
'a', buffering=1)
self.notice_fd_mutex = Lock()
else:
self.notice_fd = None
self.notice_fd_mutex = None
if info:
self.info_fd = open(info, 'w' if 'info' in overwrite else 'a',
buffering=1)
self.info_fd_mutex = Lock()
else:
self.info_fd = None
self.info_fd_mutex = None
if debug:
self.debug_fd = open(debug, 'w' if 'debug' in overwrite else 'a',
buffering=1)
self.debug_fd_mutex = Lock()
else:
self.debug_fd = None
self.debug_fd_mutex = None
self.debug('Creating PastlyLogger instance')
def __call__(self, *s):
if self.default_level == 'debug': return self.debug(*s)
elif self.default_level == 'info': return self.info(*s)
elif self.default_level == 'notice': return self.notice(*s)
elif self.default_level == 'warn': return self.warn(*s)
elif self.default_level == 'error': return self.error(*s)
def __del__(self):
self.debug('Deleting PastlyLogger instance')
self.flush()
if self.error_fd: self.error_fd.close()
if self.warn_fd: self.warn_fd.close()
if self.notice_fd: self.notice_fd.close()
if self.info_fd: self.info_fd.close()
if self.debug_fd: self.debug_fd.close()
self.error_fd, self.warn_fd = None, None
self.notice_fd, self.info_fd, self.debug_fd = None, None, None
if self.error_fd_mutex:
if not self.error_fd_mutex.acquire(blocking=False):
self.error_fd_mutex.release()
if self.warn_fd_mutex:
if not self.warn_fd_mutex.acquire(blocking=False):
self.warn_fd_mutex.release()
if self.notice_fd_mutex:
if not self.notice_fd_mutex.acquire(blocking=False):
self.notice_fd_mutex.release()
if self.info_fd_mutex:
if not self.info_fd_mutex.acquire(blocking=False):
self.info_fd_mutex.release()
if self.debug_fd_mutex:
if not self.debug_fd_mutex.acquire(blocking=False):
self.debug_fd_mutex.release()
def _log_file(fd, lock, log_levels, log_threads, log_date, level, *s):
assert fd
prefix = []
if log_date: prefix.append('[{}]'.format(datetime.now()))
if log_levels: prefix.append('[{}]'.format(level))
if log_threads: prefix.append('[{}]'.format(current_thread().name))
prefix = ' '.join(prefix)
s = ' '.join([str(s_) for s_ in s])
if prefix: s = ' '.join([prefix, s])
lock.acquire()
fd.write('{}\n'.format(s))
lock.release()
def flush(self):
if self.error_fd: self.error_fd.flush()
if self.warn_fd: self.warn_fd.flush()
if self.notice_fd: self.notice_fd.flush()
if self.info_fd: self.info_fd.flush()
if self.debug_fd: self.debug_fd.flush()
def debug(self, *s, level='debug'):
if self.debug_fd: return PastlyLogger._log_file(
self.debug_fd, self.debug_fd_mutex, self.log_levels,
self.log_threads, self.log_date, level, *s)
return None
def info(self, *s, level='info'):
if self.info_fd: return PastlyLogger._log_file(
self.info_fd, self.info_fd_mutex, self.log_levels,
self.log_threads, self.log_date, level, *s)
else: return self.debug(*s, level=level)
def notice(self, *s, level='notice'):
if self.notice_fd: return PastlyLogger._log_file(
self.notice_fd, self.notice_fd_mutex, self.log_levels,
self.log_threads, self.log_date, level, *s)
else: return self.info(*s, level=level)
def warn(self, *s, level='warn'):
if self.warn_fd: return PastlyLogger._log_file(
self.warn_fd, self.warn_fd_mutex, self.log_levels,
self.log_threads, self.log_date, level, *s)
else: return self.notice(*s, level=level)
def error(self, *s, level='error'):
if self.error_fd: return PastlyLogger._log_file(
self.error_fd, self.error_fd_mutex, self.log_levels,
self.log_threads, self.log_date, level, *s)
else: return self.warn(*s, level=level)
# pylama:ignore=E701
......@@ -11,13 +11,14 @@ class RelayList:
'''
REFRESH_INTERVAL = 300 # seconds
def __init__(self, args, controller=None):
def __init__(self, args, log, controller=None):
if controller is None:
self._controller = stem_utils.init_controller(
port=args.control[1] if args.control[0] == 'port' else None,
path=args.control[1] if args.control[0] == 'socket' else None)
else:
self._controller = controller
self.log = log
self._refresh()
@property
......
......@@ -68,9 +68,10 @@ class Result:
class ResultDump:
''' Runs the enter() method in a new thread and collects new Results on its
queue. Writes them to daily result files in the data directory '''
def __init__(self, datadir, end_event):
def __init__(self, log, datadir, end_event):
assert os.path.isdir(datadir)
assert isinstance(end_event, Event)
self.log = log
self.datadir = datadir
self.end_event = end_event
self.thread = Thread(target=self.enter)
......@@ -96,10 +97,10 @@ class ResultDump:
if result is None:
continue
elif not isinstance(result, Result):
print('failure', result, type(result))
self.log.warn('failure', result, type(result))
continue
fp = result.fingerprint
nick = result.nickname
self.write_result(result)
dls = result.downloads
print(fp, nick, dls)
self.log.debug(fp, nick, dls)
......@@ -13,9 +13,11 @@ from lib.circuitbuilder import GapsCircuitBuilder as CB
from lib.resultdump import ResultDump
from lib.resultdump import Result
from lib.relaylist import RelayList
from lib.pastlylogger import PastlyLogger
end_event = Event()
stream_building_lock = RLock()
log = PastlyLogger(debug='/dev/stdout', overwrite=['debug'], log_threads=True)
# maximum we want to read per read() call
MAX_RECV_PER_READ = 1*1024*1024
......@@ -27,7 +29,7 @@ def fail_hard(*s):
''' Optionally log something to stdout ... and then exit as fast as
possible '''
if s:
print(*s)
log.error(*s)
exit(1)
......@@ -54,9 +56,9 @@ def socket_connect(s, addr, port):
IPv6. Works with IPv4 and hostnames '''
try:
s.connect((addr, port))
print('connected to', addr, port, 'via', s.fileno())
log.debug('Connected to', addr, port, 'via', s.fileno())
except (socks.GeneralProxyError, socks.ProxyConnectionError) as e:
print(e)
log.warn(e)
return False
return True
......@@ -71,7 +73,7 @@ def tell_server_amount(sock, expected_amount):
try:
sock.send(bytes(amount, 'utf-8'))
except socket.timeout as e:
print(e)
log.info(e)
return False
return True
......@@ -86,7 +88,7 @@ def timed_recv_from_server(sock, yet_to_read):
try:
read_this_time = len(sock.recv(limit))
except socket.timeout as e:
print(e)
log.info(e)
return
if read_this_time == 0:
return
......@@ -103,12 +105,12 @@ def measure_rtt_to_server(sock):
for _ in range(0, 10):
start_time = time.time()
if not tell_server_amount(sock, 1):
print('Unable to ping server on', sock.fileno())
log.info('Unable to ping server on', sock.fileno())
return
amount_read = len(sock.recv(1))
end_time = time.time()
if amount_read == 0:
print('No pong from server on', sock.fileno())
log.info('No pong from server on', sock.fileno())
return
rtts.append(end_time - start_time)
return rtts
......@@ -124,7 +126,7 @@ def measure_relay(args, cb, rl, relay):
# A function that attaches all streams that gets created on
# connect() to the given circuit
listener = stem_utils.attach_stream_to_circuit_listener(
cb.controller, circ_id)
cb.controller, circ_id, log_fn=log.debug)
with stream_building_lock:
# Tell stem about our listener so it can attach the stream to the
# circuit when we connect()
......@@ -134,9 +136,10 @@ def measure_relay(args, cb, rl, relay):
# This call blocks until we are connected (or give up). We get attched
# to the right circuit in the background.
connected = socket_connect(s, args.server_host, args.server_port)
stem_utils.remove_event_listener(cb.controller, listener)
stem_utils.remove_event_listener(cb.controller, listener,
log_fn=log.info)
if not connected:
print('Unable to connect to', args.server_host, args.server_port)
log.info('Unable to connect to', args.server_host, args.server_port)
cb.close_circuit(circ_id)
return
# FIRST: measure the end-to-end RTT many times
......@@ -194,7 +197,7 @@ def result_putter_error(target):
measurement -- and return that function so it can be used by someone else
'''
def closure(err):
print('Unhandled exception caught while measuring {}: {} {}'.format(
log.warn('Unhandled exception caught while measuring {}: {} {}'.format(
target.nickname, type(err), err))
return closure
......@@ -202,10 +205,11 @@ def result_putter_error(target):
def test_speedtest(args):
controller = stem_utils.init_controller(
port=args.control[1] if args.control[0] == 'port' else None,
path=args.control[1] if args.control[0] == 'socket' else None)
cb = CB(args, controller=controller)
rl = RelayList(args, controller=controller)
rd = ResultDump(args.result_directory, end_event)
path=args.control[1] if args.control[0] == 'socket' else None,
log_fn=log.debug)
cb = CB(args, log, controller=controller)
rl = RelayList(args, log, controller=controller)
rd = ResultDump(log, args.result_directory, end_event)
max_pending_results = args.threads
pool = Pool(max_pending_results)
pending_results = []
......@@ -221,10 +225,10 @@ def test_speedtest(args):
while len(pending_results) >= max_pending_results:
time.sleep(5)
pending_results = [r for r in pending_results if not r.ready()]
print('Waiting for all results')
log.notice('Waiting for all results')
for r in pending_results:
r.wait()
print('Got all results')
log.notice('Got all results')
def main(args):
......
......@@ -3,6 +3,9 @@ from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
import socket
import time
from threading import Thread
from lib.pastlylogger import PastlyLogger
log = PastlyLogger(debug='/dev/stdout', overwrite=['debug'], log_threads=True)
MAX_SEND_PER_WRITE = 100*1024*1024
MAX_SEND_PER_WRITE = 4096
......@@ -18,7 +21,7 @@ def read_line(s):
try:
c = s.recv(1)
except ConnectionResetError as e:
print(e)
log.info(e)
return None
if not c:
return chars
......@@ -32,7 +35,7 @@ def read_line(s):
def close_socket(s):
try:
print('Closing fd', s.fileno())
log.info('Closing fd', s.fileno())
s.shutdown(socket.SHUT_RDWR)
s.close()
except OSError:
......@@ -50,14 +53,14 @@ def get_send_amount(sock):
def write_to_client(sock, amount):
''' Returns True if successful; else False '''
print('Sending client no.', sock.fileno(), amount, 'bytes')
log.info('Sending client no.', sock.fileno(), amount, 'bytes')
while amount > 0:
amount_this_time = min(MAX_SEND_PER_WRITE, amount)
amount -= amount_this_time
try:
sock.send(b'a' * amount_this_time)
except (ConnectionResetError, BrokenPipeError) as e:
print('fd', sock.fileno(), ':', e)
log.info('fd', sock.fileno(), ':', e)
return False
return True
......@@ -67,7 +70,7 @@ def new_thread(sock):
while True:
send_amount = get_send_amount(sock)
if send_amount is None:
print('Couldn\'t get an amount to send to', sock.fileno())
log.info('Couldn\'t get an amount to send to', sock.fileno())
close_socket(sock)
return
write_to_client(sock, send_amount)
......@@ -75,24 +78,25 @@ def new_thread(sock):
thread = Thread(target=closure)
return thread
def main(args):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
h = (args.bind_ip, args.bind_port)
print('binding to', h)
log.notice('binding to', h)
while True:
try:
server.bind(h)
except OSError as e:
print(e)
log.warn(e)
time.sleep(5)
else:
break
print('listening on', h)
log.notice('listening on', h)
server.listen(5)
try:
while True:
sock, addr = server.accept()
print('accepting connection from', addr, 'as', sock.fileno())
log.info('accepting connection from', addr, 'as', sock.fileno())
t = new_thread(sock)
t.start()
except KeyboardInterrupt:
......@@ -108,4 +112,3 @@ if __name__ == '__main__':
parser.add_argument('bind_port', type=int, default=4444)
args = parser.parse_args()
main(args)
......@@ -22,18 +22,19 @@ def fp_or_nick_to_relay(controller, fp_nick):
return controller.get_network_status(fp_nick, default=None)
def attach_stream_to_circuit_listener(controller, circ_id):
def attach_stream_to_circuit_listener(controller, circ_id, log_fn=print):
''' Returns a function that should be given to add_event_listener(). It
looks for newly created streams and attaches them to the given circ_id '''
assert is_controller_okay(controller)
def closure_stream_event_listener(st):
if st.status == 'NEW' and st.purpose == 'USER':
print('Attaching stream {} to circ {}'.format(st.id, circ_id))
log_fn('Attaching stream {} to circ {}'.format(st.id, circ_id))
try:
controller.attach_stream(st.id, circ_id)
except (UnsatisfiableRequest, InvalidRequest) as e:
print('Couldn\'t attach stream to circ {}:'.format(circ_id), e)
log_fn('Couldn\'t attach stream to circ {}:'.format(circ_id),
e)
else:
pass
return closure_stream_event_listener
......@@ -44,14 +45,15 @@ def add_event_listener(controller, func, event):
controller.add_event_listener(func, event)
def remove_event_listener(controller, func):
def remove_event_listener(controller, func, log_fn=print):
if not is_controller_okay(controller):
print('Warning: controller not okay so not trying to remove event')
log_fn('Warning: controller not okay so not trying to remove event')
return
controller.remove_event_listener(func)
def init_controller(port=None, path=None, set_custom_stream_settings=True):
def init_controller(port=None, path=None, set_custom_stream_settings=True,
log_fn=print):
# make sure only one is set
assert port is not None or path is not None
assert not (port is not None and path is not None)
......@@ -68,7 +70,7 @@ def init_controller(port=None, path=None, set_custom_stream_settings=True):
if not c:
return None
assert c is not None
print('Connected to Tor via', port if port else path)
log_fn('Connected to Tor via', port if port else path)
if set_custom_stream_settings:
c.set_conf('__DisablePredictedCircuits', '1')
c.set_conf('__LeaveStreamsUnattached', '1')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment