Loading nickname_checker.py 0 → 100755 +127 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # Copyright 2013-2021, Damian Johnson and The Tor Project # See LICENSE for licensing information """ Simple script that checks to see if there has been a sudden influx of new relays with a specific nickname. If so then this sends an email notification. """ import os import re import time import traceback import util from stem.descriptor.remote import DescriptorDownloader EMAIL_SUBJECT = 'Possible Sybil Attack - By Nickname' EMAIL_BODY = """\ ALERT: %i new relays have appeared with nickname matching '%s'. """ RELAY_ENTRY = """\ * %s (%s) Address: %s:%i Version: %s Exit Policy: %s """ NICKNAME_FILE = util.get_path('data', 'nickname') log = util.get_logger('nickname_checker') def main(): nicknames_to_check = load_nicknames() downloader = DescriptorDownloader(timeout = 60, validate = True) dry_run = False if not nicknames_to_check: log.debug("We don't have any nicknames so stop. No notifications will be sent.") return query = downloader.get_consensus() query.run(True) if query.error: log.warning("Unable to retrieve the consensus: %s" % query.error) return # Build a dictionnary with the loaded nicknames with each an empty table # that will contain match relays. matches = {} for name in nicknames_to_check: if name not in matches: matches[name] = [] num_found = 0 for entry in query: # For each nickname, match it as a regex. for nickname in nicknames_to_check: if re.search(nickname, entry.nickname): matches[nickname].append(entry) num_found += 1 log.debug("%i new relays found" % num_found) if not dry_run and num_found > 0 : log.debug("Sending a notification...") send_email(matches) def send_email(matches): # Constructs a mapping of nicknames to router status entries so we can # provide a listing that's sorted by nicknames. for nickname in matches: relays = matches[nickname] if len(relays) == 0: # Ignore empty relays for the nickname. continue relay_entries = [] for relay in relays: relay_entries.append(RELAY_ENTRY % (relay.nickname, relay.fingerprint, \ relay.address, relay.or_port, relay.version, relay.exit_policy)) try: body = EMAIL_BODY % (len(relays), nickname) body += "\n".join(relay_entries) util.send(EMAIL_SUBJECT, body = body) except Exception as exc: log.warning("Unable to send email: %s" % exc) def load_nicknames(): log.debug("Loading nicknames...") if not os.path.exists(NICKNAME_FILE): log.debug(" '%s' doesn't exist" % NICKNAME_FILE) return set() try: with open(NICKNAME_FILE) as nickname_file: nicknames = nickname_file.read().strip() if not nicknames: log.debug(" '%s' is empty" % NICKNAME_FILE) return set() nicknames = nicknames.splitlines() log.debug(" %i nicknames found" % len(nicknames)) return set(nicknames) except Exception as exc: log.debug(" unable to read '%s': %s" % (NICKNAME_FILE, exc)) return set() if __name__ == '__main__': try: main() except: msg = "nickname_checker.py failed with:\n\n%s" % traceback.format_exc() log.error(msg) util.send("Script Error", body = msg, to = [util.ERROR_ADDRESS]) Loading
nickname_checker.py 0 → 100755 +127 −0 Original line number Diff line number Diff line #!/usr/bin/env python3 # Copyright 2013-2021, Damian Johnson and The Tor Project # See LICENSE for licensing information """ Simple script that checks to see if there has been a sudden influx of new relays with a specific nickname. If so then this sends an email notification. """ import os import re import time import traceback import util from stem.descriptor.remote import DescriptorDownloader EMAIL_SUBJECT = 'Possible Sybil Attack - By Nickname' EMAIL_BODY = """\ ALERT: %i new relays have appeared with nickname matching '%s'. """ RELAY_ENTRY = """\ * %s (%s) Address: %s:%i Version: %s Exit Policy: %s """ NICKNAME_FILE = util.get_path('data', 'nickname') log = util.get_logger('nickname_checker') def main(): nicknames_to_check = load_nicknames() downloader = DescriptorDownloader(timeout = 60, validate = True) dry_run = False if not nicknames_to_check: log.debug("We don't have any nicknames so stop. No notifications will be sent.") return query = downloader.get_consensus() query.run(True) if query.error: log.warning("Unable to retrieve the consensus: %s" % query.error) return # Build a dictionnary with the loaded nicknames with each an empty table # that will contain match relays. matches = {} for name in nicknames_to_check: if name not in matches: matches[name] = [] num_found = 0 for entry in query: # For each nickname, match it as a regex. for nickname in nicknames_to_check: if re.search(nickname, entry.nickname): matches[nickname].append(entry) num_found += 1 log.debug("%i new relays found" % num_found) if not dry_run and num_found > 0 : log.debug("Sending a notification...") send_email(matches) def send_email(matches): # Constructs a mapping of nicknames to router status entries so we can # provide a listing that's sorted by nicknames. for nickname in matches: relays = matches[nickname] if len(relays) == 0: # Ignore empty relays for the nickname. continue relay_entries = [] for relay in relays: relay_entries.append(RELAY_ENTRY % (relay.nickname, relay.fingerprint, \ relay.address, relay.or_port, relay.version, relay.exit_policy)) try: body = EMAIL_BODY % (len(relays), nickname) body += "\n".join(relay_entries) util.send(EMAIL_SUBJECT, body = body) except Exception as exc: log.warning("Unable to send email: %s" % exc) def load_nicknames(): log.debug("Loading nicknames...") if not os.path.exists(NICKNAME_FILE): log.debug(" '%s' doesn't exist" % NICKNAME_FILE) return set() try: with open(NICKNAME_FILE) as nickname_file: nicknames = nickname_file.read().strip() if not nicknames: log.debug(" '%s' is empty" % NICKNAME_FILE) return set() nicknames = nicknames.splitlines() log.debug(" %i nicknames found" % len(nicknames)) return set(nicknames) except Exception as exc: log.debug(" unable to read '%s': %s" % (NICKNAME_FILE, exc)) return set() if __name__ == '__main__': try: main() except: msg = "nickname_checker.py failed with:\n\n%s" % traceback.format_exc() log.error(msg) util.send("Script Error", body = msg, to = [util.ERROR_ADDRESS])