Commit 568f6be6 authored by Barkin Simsek's avatar Barkin Simsek 🐢
Browse files

Let Onionoo fetch details for multiple relays

parent ec9b1869
import logging
from datetime import datetime
from sqlalchemy.orm import sessionmaker
from captchamonitor.utils.config import Config
from captchamonitor.utils.models import MetaData
from captchamonitor.utils.collector import Collector
class UpdateRelays:
"""
Fetches the latest consensus and inserts the relays listed there into the
database
"""
def __init__(
self,
config: Config,
db_session: sessionmaker,
) -> None:
# Private class attributes
self.__logger = logging.getLogger(__name__)
self.__config: Config = config
self.__db_session: sessionmaker = db_session
self.__collector: Collector = Collector()
self.__datetime_format: str = "%Y-%m-%d-%H-00-00"
self.__current_datetime: datetime
self.__current_datetime_str: str
# Execute the private methods
self.__update_current_time()
def __update_current_time(self) -> None:
"""
Updates the current time
"""
self.__current_datetime = datetime.now().strftime(
self.__datetime_format
)
self.__current_datetime_str = self.__current_datetime.strftime(
self.__datetime_format
)
def __check_last_relay_update(self) -> datetime:
"""
Connects to the database and check the last time relays were updated
:return: The last time relays were updated
:rtype: datetime
"""
metadata_key = "last_relay_update_datetime"
query = self.__db_session.query(MetaData).filter(
MetaData.key == metadata_key
)
# Check if it exists in the database
if query.count() == 0:
# Create a new one if it doesn't exist'
metadata = MetaData(
key=metadata_key, value=self.__current_datetime_str
)
self.__db_session.add(metadata)
return datetime.strptime(
self.__current_datetime_str, self.__datetime_format
)
else:
# Get and return the existing value
date = query.one().value
return datetime.strptime(date, self.__datetime_format)
......@@ -2,14 +2,35 @@ import json
import logging
from typing import Dict, List, Optional
from datetime import datetime, timezone
from dataclasses import dataclass
import requests
import country_converter as coco
from captchamonitor.utils.exceptions import (
OnionooConnectionError,
OnionooMissingRelayError,
)
from captchamonitor.utils.exceptions import OnionooConnectionError
@dataclass
class OnionooRelayEntry:
"""
Stores a Onionoo relay entry
"""
fingerprint: str
ipv4_exiting_allowed: bool
ipv6_exiting_allowed: bool
country: Optional[str]
country_name: Optional[str]
continent: Optional[str]
nickname: Optional[str]
first_seen: Optional[datetime]
last_seen: Optional[datetime]
version: Optional[str]
asn: Optional[str]
asn_name: Optional[str]
platform: Optional[str]
exit_policy_summary: Optional[dict]
exit_policy_v6_summary: Optional[dict]
class Onionoo:
......@@ -17,96 +38,85 @@ class Onionoo:
Uses Onionoo to get the details of the given relay
"""
def __init__(self, fingerprint: str) -> None:
def __init__(self, fingerprints: List[str]) -> None:
"""
Initialize, fetch, and parse the details
:param fingerprint: BASE64 encoded SHA256 hash of the relay
:type fingerprint: str
:param fingerprints: List of BASE64 encoded SHA256 hash of the relays
:type fingerprints: List[str]
"""
# Public class attributes
self.fingerprint: str = fingerprint
self.relay_data: Dict
self.ipv4_exiting_allowed: bool
self.ipv6_exiting_allowed: bool
self.country: Optional[str]
self.country_name: Optional[str]
self.continent: Optional[str]
self.nickname: Optional[str]
self.first_seen: Optional[datetime]
self.last_seen: Optional[datetime]
self.version: Optional[str]
self.asn: Optional[str]
self.asn_name: Optional[str]
self.platform: Optional[str]
self.exit_policy_summary: Optional[dict]
self.exit_policy_v6_summary: Optional[dict]
self.fingerprint_list: List[str] = fingerprints
self.relay_entries: List[OnionooRelayEntry] = []
# Private class attributes
self.__logger = logging.getLogger(__name__)
self.__lookup_fields: str = f"fingerprint,nickname,exit_policy_summary,exit_policy_v6_summary,first_seen,last_seen,country,country_name,as,as_name,version,platform&lookup={fingerprint}"
self.__fingerprint_list_str: str = "".join(
f"{str(fpr)}," for fpr in self.fingerprint_list
)[:-1]
self.__lookup_fields: str = f"fingerprint,nickname,exit_policy_summary,exit_policy_v6_summary,first_seen,last_seen,country,country_name,as,as_name,version,platform&lookup={self.__fingerprint_list_str}"
self.__lookup_url: str = (
f"https://onionoo.torproject.org/details?fields={self.__lookup_fields}"
)
self.__relay_data: Dict
self.__onionoo_datetime_format: str = "%Y-%m-%d %H:%M:%S"
self.__exit_ports: List[int] = [80, 443]
# Execute the private methods
self.__get_details()
self.__parse_details()
for relay in self.__relay_data:
self.relay_entries.append(self.__parse_details_of_relay(relay))
def __get_details(self) -> None:
"""
Performs a request to Onioon API
:raises OnionooMissingRelayError: If given relay is missing on Onionoo
:raises OnionooConnectionError: If cannot connect to the API
"""
try:
response = json.loads(requests.get(self.__lookup_url).text)
self.relay_data = response["relays"][0]
except IndexError as exception:
self.__logger.debug(
"Upps, this relay does not exist on Onionoo yet: %s",
self.fingerprint,
)
raise OnionooMissingRelayError from exception
self.__relay_data = response["relays"]
except Exception as exception:
self.__logger.debug("Could not connect to Onionoo: %s", exception)
raise OnionooConnectionError from exception
def __parse_details(self) -> None:
def __parse_details_of_relay(self, relay_data: Dict) -> OnionooRelayEntry:
"""
Parses the JSON response
Parses given Onionoo JSON response
:param relay_data: JSON data
:type relay_data: Dict
:return: OnionooRelayEntry object
:rtype: OnionooRelayEntry
"""
# TODO: Please refactor me into smaller functions
# pylint: disable=R0914
# Parse metadata
self.country = self.relay_data.get("country", None)
self.country_name = self.relay_data.get("country_name", None)
self.continent = coco.convert(names=self.country, to="continent")
self.nickname = self.relay_data.get("nickname", None)
self.version = self.relay_data.get("version", None)
self.asn = self.relay_data.get("as", None)
self.asn_name = self.relay_data.get("as_name", None)
self.platform = self.relay_data.get("platform", None)
fingerprint = relay_data.get("fingerprint", None)
country = relay_data.get("country", None)
country_name = relay_data.get("country_name", None)
continent = coco.convert(names=country, to="continent")
nickname = relay_data.get("nickname", None)
version = relay_data.get("version", None)
asn = relay_data.get("as", None)
asn_name = relay_data.get("as_name", None)
platform = relay_data.get("platform", None)
# Parse exit polices
self.exit_policy_summary = self.relay_data.get("exit_policy_summary", None)
self.exit_policy_v6_summary = self.relay_data.get(
"exit_policy_v6_summary", None
)
exit_policy_summary = relay_data.get("exit_policy_summary", None)
exit_policy_v6_summary = relay_data.get("exit_policy_v6_summary", None)
self.ipv4_exiting_allowed = self.__is_exiting_allowed(
self.exit_policy_summary, self.__exit_ports
ipv4_exiting_allowed = self.__is_exiting_allowed(
exit_policy_summary, self.__exit_ports
)
self.ipv6_exiting_allowed = self.__is_exiting_allowed(
self.exit_policy_v6_summary, self.__exit_ports
ipv6_exiting_allowed = self.__is_exiting_allowed(
exit_policy_v6_summary, self.__exit_ports
)
# Parse first and last seen fields
first_seen = self.relay_data.get("first_seen", None)
last_seen = self.relay_data.get("last_seen", None)
first_seen = relay_data.get("first_seen", None)
last_seen = relay_data.get("last_seen", None)
if first_seen is not None:
first_seen = datetime.strptime(
first_seen, self.__onionoo_datetime_format
......@@ -115,8 +125,24 @@ class Onionoo:
last_seen = datetime.strptime(
last_seen, self.__onionoo_datetime_format
).replace(tzinfo=timezone.utc)
self.first_seen = first_seen
self.last_seen = last_seen
return OnionooRelayEntry(
fingerprint=fingerprint,
ipv4_exiting_allowed=ipv4_exiting_allowed,
ipv6_exiting_allowed=ipv6_exiting_allowed,
country=country,
country_name=country_name,
continent=continent,
nickname=nickname,
first_seen=first_seen,
last_seen=last_seen,
version=version,
asn=asn,
asn_name=asn_name,
platform=platform,
exit_policy_summary=exit_policy_summary,
exit_policy_v6_summary=exit_policy_v6_summary,
)
def __is_exiting_allowed(
self, exit_policy_summary: Optional[Dict], exit_ports: List[int]
......
import unittest
import pytest
from captchamonitor.utils.onionoo import Onionoo
from captchamonitor.utils.exceptions import (
OnionooConnectionError,
OnionooMissingRelayError,
)
class TestOnionoo(unittest.TestCase):
def setUp(self):
self.valid_fingerprint = "A53C46F5B157DD83366D45A8E99A244934A14C46"
self.invalid_fingerprint = "2222222222222222222222222222222222222222"
self.csailmitexit_fpr = "A53C46F5B157DD83366D45A8E99A244934A14C46"
self.csailmitnoexit_fpr = "9715C81BA8C5B0C698882035F75C67D6D643DBE3"
self.exit_ports = [80, 443]
def test_init(self):
onionoo = Onionoo(self.valid_fingerprint)
def test_onionoo_init(self):
onionoo = Onionoo([self.csailmitexit_fpr])
relay = onionoo.relay_entries[0]
self.assertEqual(onionoo.fingerprint, self.valid_fingerprint)
self.assertEqual(onionoo.country.upper(), "US")
self.assertEqual(onionoo.continent, "America")
self.assertEqual(onionoo.nickname, "csailmitexit")
self.assertTrue(onionoo.ipv4_exiting_allowed)
self.assertFalse(onionoo.ipv6_exiting_allowed)
self.assertEqual(relay.fingerprint, self.csailmitexit_fpr)
self.assertEqual(relay.country.upper(), "US")
self.assertEqual(relay.continent, "America")
self.assertEqual(relay.nickname, "csailmitexit")
self.assertTrue(relay.ipv4_exiting_allowed)
self.assertFalse(relay.ipv6_exiting_allowed)
def test_invalid_fingerprint(self):
# Try intializing
with pytest.raises(OnionooMissingRelayError) as pytest_wrapped_e:
Onionoo(self.invalid_fingerprint)
def test_onionoo_init_multiple_relays(self):
onionoo = Onionoo([self.csailmitexit_fpr, self.csailmitnoexit_fpr])
# Check if the exception is correct
self.assertEqual(pytest_wrapped_e.type, OnionooMissingRelayError)
self.assertEqual(len(onionoo.relay_entries), 2)
def test_is_exiting_allowed(self):
onionoo = Onionoo(self.valid_fingerprint)
onionoo = Onionoo([self.csailmitexit_fpr])
self.assertFalse(onionoo._Onionoo__is_exiting_allowed({}, self.exit_ports))
......@@ -65,7 +57,7 @@ class TestOnionoo(unittest.TestCase):
)
def test_is_in_range(self):
onionoo = Onionoo(self.valid_fingerprint)
onionoo = Onionoo([self.csailmitexit_fpr])
port_list = ["22", "4661-4666", "6881-6999"]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment