Commit 6edd956e authored by Barkin Simsek 🐢
Browse files

Add UpdateRelays class for updating list of relays in the db

parent 568f6be6
import logging
from typing import Dict, List
from datetime import datetime
from sqlalchemy.orm import sessionmaker
from captchamonitor.utils.config import Config
from captchamonitor.utils.models import MetaData
from captchamonitor.utils.models import Relay, MetaData
from captchamonitor.utils.onionoo import Onionoo, OnionooRelayEntry
from captchamonitor.utils.collector import Collector
from captchamonitor.utils.consensus_parser import ConsensusV3Parser, ConsensusRelayEntry
class UpdateRelays:
......@@ -18,29 +21,47 @@ class UpdateRelays:
self,
config: Config,
db_session: sessionmaker,
auto_update: bool = True,
) -> None:
"""
Initializes UpdateRelays
:param config: The config class instance that contains global configuration values
:type config: Config
:param db_session: Database session used to connect to the database
:type db_session: sessionmaker
:param auto_update: Should I update the relay list when __init__ is called, defaults to True
:type auto_update: bool
"""
# Private class attributes
self.__logger = logging.getLogger(__name__)
self.__config: Config = config
self.__db_session: sessionmaker = db_session
self.__collector: Collector = Collector()
self.__datetime_format: str = "%Y-%m-%d-%H-00-00"
self.__current_datetime: datetime
self.__current_datetime_str: str
# Execute the private methods
self.__update_current_time()
self.__check_last_relay_update()
def __update_current_time(self) -> None:
if auto_update:
if self.__hours_since_last_update() >= 1:
self.__logger.info("Updating the relay list using the latest consensus")
self.update()
else:
self.__logger.info(
"Did not update the relay list since less than an hour passed since last update"
)
def __hours_since_last_update(self) -> int:
    """
    Calculates the number of hours passed since the last relay update

    :return: Number of hours passed since last relay update
    :rtype: int
    """
    # Truncate the current time to the top of the hour so that it is compared
    # at the same granularity as the stored timestamp (the class' datetime
    # format zeroes out minutes and seconds)
    current_datetime = datetime.now().replace(minute=0, second=0, microsecond=0)

    # Timestamp of the last update as recorded in the metadata table
    last_datetime = self.__check_last_relay_update()

    # Whole hours only, the remainder is discarded by the floor division
    time_difference = current_datetime - last_datetime
    return int(time_difference.total_seconds() // 3600)
def __check_last_relay_update(self) -> datetime:
"""
......@@ -51,22 +72,91 @@ class UpdateRelays:
"""
metadata_key = "last_relay_update_datetime"
query = self.__db_session.query(MetaData).filter(
MetaData.key == metadata_key
)
query = self.__db_session.query(MetaData).filter(MetaData.key == metadata_key)
# Check if it exists in the database
if query.count() == 0:
# Create a new one if it doesn't exist'
# Create a new one if it doesn't exist
current_datetime = datetime.now()
current_datetime_str = current_datetime.strftime(self.__datetime_format)
metadata = MetaData(
key=metadata_key, value=self.__current_datetime_str
key=metadata_key,
value=current_datetime_str,
)
self.__db_session.add(metadata)
return datetime.strptime(
self.__current_datetime_str, self.__datetime_format
return datetime.strptime(current_datetime_str, self.__datetime_format)
# Get and return the existing value
date_from_db = query.one().value
return datetime.strptime(date_from_db, self.__datetime_format)
def __insert_batch_into_db(
    self,
    onionoo_relay_data: List[OnionooRelayEntry],
    parsed_consensus: Dict[str, ConsensusRelayEntry],
) -> None:
    """
    Inserts given batch of data into the database

    :param onionoo_relay_data: List of OnionooRelayEntry objects
    :type onionoo_relay_data: List[OnionooRelayEntry]
    :param parsed_consensus: Dictionary of ConsensusRelayEntry, keyed by relay fingerprint
    :type parsed_consensus: Dict[str, ConsensusRelayEntry]
    :raises KeyError: If a relay's fingerprint is not a key of parsed_consensus
    """
    # Iterate over the relays returned by Onionoo
    for onionoo_relay in onionoo_relay_data:
        # The IP addresses are taken from the consensus entry that has the
        # same fingerprint, everything else comes from Onionoo
        consensus_relay = parsed_consensus[onionoo_relay.fingerprint]

        db_relay = Relay(
            fingerprint=onionoo_relay.fingerprint,
            ipv4_address=consensus_relay.IP,
            ipv6_address=consensus_relay.IPv6,
            ipv4_exiting_allowed=onionoo_relay.ipv4_exiting_allowed,
            ipv6_exiting_allowed=onionoo_relay.ipv6_exiting_allowed,
            country=onionoo_relay.country,
            country_name=onionoo_relay.country_name,
            continent=onionoo_relay.continent,
            status=True,
            nickname=onionoo_relay.nickname,
            first_seen=onionoo_relay.first_seen,
            last_seen=onionoo_relay.last_seen,
            version=onionoo_relay.version,
            asn=onionoo_relay.asn,
            asn_name=onionoo_relay.asn_name,
            platform=onionoo_relay.platform,
        )

        # Add to the database
        self.__db_session.add(db_relay)

    # Commit changes to the database
    self.__db_session.commit()
def update(self, batch_size: int = 30) -> None:
    """
    Fetches the consensus for the current time, parses the relays listed in
    it and stores them in the database. The relay details are requested from
    Onionoo in batches so that the Onionoo API is not overwhelmed.

    :param batch_size: Number of relays to process in a single batch, defaults to 30
    :type batch_size: int
    """
    # Fetch the consensus for the current time
    consensus_file = self.__collector.get_consensus(datetime.now())

    # Index the parsed consensus entries by their fingerprint
    parsed_consensus = {}
    for entry in ConsensusV3Parser(consensus_file).relay_entries:
        parsed_consensus[str(entry.fingerprint)] = entry

    fingerprints = list(parsed_consensus)

    # Walk over the fingerprints one batch at a time
    for start in range(0, len(fingerprints), batch_size):
        chunk = fingerprints[start : start + batch_size]

        # Ask Onionoo for the details of this batch and store the result
        details = Onionoo(chunk).relay_entries
        self.__insert_batch_into_db(details, parsed_consensus)
......@@ -68,6 +68,7 @@ class ConsensusRelayEntry:
DirPort: int
bandwidth: float
flags: List
fingerprint: Optional[str] = None
guard_probability: float = 0.0
middle_probability: float = 0.0
exit_probability: float = 0.0
......
......@@ -33,6 +33,20 @@ class BaseModel(Model): # type: ignore
# fmt: on
class MetaData(BaseModel):
    """
    Key:Value store for metadata related to CAPTCHA Monitor's progress across runs
    Every piece of metadata is kept as a single key and value pair
    """

    __tablename__ = "metadata"

    # fmt: off
    key = Column(String, nullable=False, unique=True)  # Key for the metadata, required and unique
    value = Column(String)                             # Value for the metadata
    # fmt: on
class URL(BaseModel):
"""
Stores list of tracked URLs and metadata related to them
......
......@@ -96,7 +96,10 @@ class Onionoo:
fingerprint = relay_data.get("fingerprint", None)
country = relay_data.get("country", None)
country_name = relay_data.get("country_name", None)
continent = coco.convert(names=country, to="continent")
if country_name is not None:
continent = coco.convert(names=country, to="continent")
else:
continent = None
nickname = relay_data.get("nickname", None)
version = relay_data.get("version", None)
asn = relay_data.get("as", None)
......
import unittest
from datetime import datetime
import pytest
from freezegun import freeze_time
from captchamonitor.utils.config import Config
from captchamonitor.utils.models import Relay, MetaData
from captchamonitor.utils.onionoo import Onionoo
from captchamonitor.utils.database import Database
from captchamonitor.core.update_relays import UpdateRelays
from captchamonitor.utils.consensus_parser import ConsensusRelayEntry
class TestUpdateRelays(unittest.TestCase):
    """
    Tests for UpdateRelays that run against the configured database and
    fetch relay details through Onionoo
    """

    def setUp(self):
        self.config = Config()

        # Connection parameters are passed positionally, in this exact order
        db_setting_names = ("db_host", "db_port", "db_name", "db_user", "db_password")
        self.database = Database(*[self.config[name] for name in db_setting_names])

        self.db_session = self.database.session()
        self.db_metadata_query = self.db_session.query(MetaData)
        self.db_relay_query = self.db_session.query(Relay)

        # Fingerprint used for the Onionoo lookup and the matching consensus entry
        self.csailmitexit_fpr = "A53C46F5B157DD83366D45A8E99A244934A14C46"
        consensus_fields = dict(
            nickname="csailmitexit",
            identity="B2GX5y1BRdre5CumPWY/sdc6RPs",
            digest="test",
            publication=datetime.now(),
            IP="127.0.0.1",
            IPv6="4ffa:49b3:6b32:0bd7:c8af:45ee:5c22:16c5",
            IPv6ORPort="0",
            is_exit=True,
            ORPort=0,
            DirPort=0,
            bandwidth=0,
            flags=["test"],
        )
        self.consensus_relay_entry = ConsensusRelayEntry(**consensus_fields)

    def tearDown(self):
        self.db_session.close()

    def test_update_relays_init(self):
        metadata_rows = self.db_metadata_query

        # The metadata table is expected to be empty before initialization
        self.assertEqual(metadata_rows.count(), 0)

        updater = UpdateRelays(
            config=self.config, db_session=self.db_session, auto_update=False
        )

        # Exactly one metadata row should exist now
        self.assertEqual(metadata_rows.count(), 1)

        hours_since_last_update = updater._UpdateRelays__hours_since_last_update
        self.assertEqual(hours_since_last_update(), 0)

        # Ask again from a simulated point in the far future
        with freeze_time("2100-01-01"):
            self.assertGreater(hours_since_last_update(), 100)

    def test__insert_batch_into_db(self):
        updater = UpdateRelays(
            config=self.config, db_session=self.db_session, auto_update=False
        )

        # Relay details as returned by Onionoo
        onionoo_entries = Onionoo([self.csailmitexit_fpr]).relay_entries

        # Stand-in for the parsed consensus, keyed by fingerprint
        consensus_by_fpr = {self.csailmitexit_fpr: self.consensus_relay_entry}

        relay_rows = self.db_relay_query

        # The relay table is expected to be empty before the insert
        self.assertEqual(relay_rows.count(), 0)

        updater._UpdateRelays__insert_batch_into_db(onionoo_entries, consensus_by_fpr)

        # A single relay with the expected fingerprint should be stored
        self.assertEqual(relay_rows.count(), 1)
        self.assertEqual(relay_rows.first().fingerprint, self.csailmitexit_fpr)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment