Commit 76d600f7 authored by Barkin Simsek 🐢
Browse files

Add skeleton for data analyzer class

parent 7b750ceb
Pipeline #7427 passed with stages
in 2 minutes and 35 seconds
......@@ -8,7 +8,7 @@ build:
docker-compose build
up:
docker-compose up -d --scale cm-worker=2 --scale cm-update=1
docker-compose up -d --scale cm-worker=2 --scale cm-update=1 --scale cm-analyze=1
down:
ifneq ($(shell docker ps -f ancestor="captchamonitor-tor-container" -q),)
......@@ -18,11 +18,11 @@ endif
docker-compose down --remove-orphans
test: down
docker-compose up -d --scale cm-worker=0 --scale cm-update=0
docker-compose up -d --scale cm-worker=0 --scale cm-update=0 --scale cm-analyze=0
docker-compose run --rm --no-deps --entrypoint="pytest -v --reruns 3 --reruns-delay 3 --cov=/src/captchamonitor/ --cov-report term-missing" captchamonitor /tests
logs:
docker-compose logs --tail=100 captchamonitor cm-worker cm-update
docker-compose logs --tail=100 captchamonitor cm-worker cm-update cm-analyze
init: check_root
@echo "\e[93m>> Creating .env file\e[0m"
......
......@@ -37,6 +37,15 @@ services:
- -m
- captchamonitor
- --worker
# Data analyzer service: reuses the shared base service definition, is always
# restarted, and launches `python -m captchamonitor --analyze`
cm-analyze:
<<: *captchamonitor_base_service
restart: always
entrypoint:
- python
- -m
- captchamonitor
- --analyze
cm-update:
<<: *captchamonitor_base_service
......
......@@ -15,6 +15,13 @@ parser.add_argument(
default=False,
help="Run an instance of worker",
)
# Boolean switch (-a / --analyze) that selects the data analyzer mode;
# it stays False unless the flag is passed on the command line
parser.add_argument(
    "-a", "--analyze",
    action="store_true", default=False,
    help="Run an instance of data analyzer",
)
parser.add_argument(
"-u",
"--update",
......@@ -38,6 +45,9 @@ cm = CaptchaMonitor()
# Pick the run mode from the parsed command line flags
if args.worker:
    logger.info("Intializing CAPTCHA Monitor in worker mode")
    cm.worker()
elif args.analyze:
    # The option is registered as "--analyze", so argparse stores it under
    # "analyze"; reading any other spelling raises AttributeError for every
    # mode that is checked after the worker branch
    logger.info("Intializing CAPTCHA Monitor in data analysis mode")
    cm.analyze()
elif args.update:
logger.info("Intializing CAPTCHA Monitor in update mode")
schedule.every().day.do(cm.update_domains)
......
......@@ -5,6 +5,7 @@ from typing import Optional
from captchamonitor.core.worker import Worker
from captchamonitor.utils.config import Config
from captchamonitor.core.analyzer import Analyzer
from captchamonitor.utils.database import Database
from captchamonitor.utils.exceptions import ConfigInitError, DatabaseInitError
from captchamonitor.core.schedule_jobs import ScheduleJobs
......@@ -65,30 +66,6 @@ class CaptchaMonitor:
# Obtain the session from database module
self.__db_session = self.__database.session()
def schedule_jobs(self) -> None:
    """
    Creates a ScheduleJobs instance with the current config and database
    session in order to add new jobs to the database
    """
    self.__logger.info("Scheduling new jobs")
    ScheduleJobs(config=self.__config, db_session=self.__db_session)
def worker(self) -> None:
    """
    Starts a worker identified by this node's ID. The worker fetches a job
    from the database, processes it using Tor Browser or other specified
    browsers, and inserts the result back into the database.
    """
    node_id = self.__node_id
    self.__logger.info("Running worker %s", node_id)
    Worker(worker_id=node_id, config=self.__config, db_session=self.__db_session)
def update_domains(self) -> None:
"""
Updates the list of domains in the database
......@@ -113,12 +90,42 @@ class CaptchaMonitor:
UpdateFetchers(config=self.__config, db_session=self.__db_session)
def schedule_jobs(self) -> None:
    """
    Creates a ScheduleJobs instance with the current config and database
    session in order to add new jobs to the database
    """
    self.__logger.info("Scheduling new jobs")
    ScheduleJobs(config=self.__config, db_session=self.__db_session)
def worker(self) -> None:
    """
    Starts a worker identified by this node's ID. The worker fetches a job
    from the database, processes it using Tor Browser or other specified
    browsers, and inserts the result back into the database.
    """
    node_id = self.__node_id
    self.__logger.info("Running worker %s", node_id)
    Worker(worker_id=node_id, config=self.__config, db_session=self.__db_session)
def analyze(self) -> None:
    """
    Analyzes the data recorded in the database by starting an Analyzer that
    is identified by this node's ID
    """
    self.__logger.debug("Started data analysis")
    node_id = self.__node_id
    Analyzer(analyzer_id=node_id, config=self.__config, db_session=self.__db_session)
def __del__(self) -> None:
"""
Do cleaning before going out of scope
......
import logging
from sqlalchemy.orm import sessionmaker
from captchamonitor.utils.config import Config
from captchamonitor.utils.models import Domain, Fetcher, FetchCompleted
class Analyzer:
    """
    Analyzes completed jobs
    """

    def __init__(
        self,
        analyzer_id: str,
        config: Config,
        db_session: sessionmaker,
    ) -> None:
        """
        Initializes a new analyzer and immediately loops over the domains once

        :param analyzer_id: Analyzer ID assigned for this analyzer
        :type analyzer_id: str
        :param config: The config class instance that contains global configuration values
        :type config: Config
        :param db_session: Database session used to connect to the database
        :type db_session: sessionmaker
        """
        # Private class attributes for analyzer
        self.__logger = logging.getLogger(__name__)
        self.__config: Config = config
        self.__db_session: sessionmaker = db_session
        self.__analyzer_id: str = analyzer_id

        self.__loop_over_domains()

    def __loop_over_domains(self) -> None:
        """
        Loop over the domain list and get corresponding website data from the database

        Domains that do not have both a Tor and a non-Tor completed fetch, or
        whose Tor fetch has no relay attached, are skipped.
        """
        # C0121: SQLAlchemy needs "== True" / "== False" to build the SQL filter
        # W0104: the bare attribute reads below are placeholders for the analysis
        # pylint: disable=C0121,W0104
        domains = self.__db_session.query(Domain).all()

        for domain in domains:
            query_by_domain = (
                self.__db_session.query(FetchCompleted)
                .join(Fetcher)
                .filter(FetchCompleted.ref_domain == domain)
            )

            tor = query_by_domain.filter(Fetcher.uses_tor == True).first()
            non_tor = query_by_domain.filter(Fetcher.uses_tor == False).first()

            # first() yields None when nothing matches, so a domain that has
            # not been fetched both ways yet cannot be compared
            if tor is None or non_tor is None:
                self.__logger.debug(
                    "Skipping %s since it lacks a completed Tor or non-Tor fetch",
                    domain.domain,
                )
                continue

            exit_relay = tor.ref_relay

            # Without a linked relay there is no fingerprint to analyze
            if exit_relay is None:
                self.__logger.debug(
                    "Skipping %s since its Tor fetch has no exit relay",
                    domain.domain,
                )
                continue

            # Use the data for analysis
            tor.html_data
            tor.http_requests
            non_tor.html_data
            non_tor.http_requests
            exit_relay.fingerprint
import unittest
from captchamonitor.core.worker import Worker
from captchamonitor.utils.config import Config
from captchamonitor.utils.models import (
Relay,
Domain,
Fetcher,
FetchQueue,
FetchCompleted,
)
from captchamonitor.core.analyzer import Analyzer
from captchamonitor.utils.database import Database
class TestAnalyzer(unittest.TestCase):
    """
    Exercises the Analyzer against a database that holds one completed
    non-Tor fetch and one completed Tor fetch for the same test domain
    """

    def setUp(self):
        """
        Connects to the database, inserts the fixture rows, and lets a
        non-looping worker process the two queued jobs
        """
        self.config = Config()
        cfg = self.config
        self.database = Database(
            cfg["db_host"],
            cfg["db_port"],
            cfg["db_name"],
            cfg["db_user"],
            cfg["db_password"],
        )
        self.db_session = self.database.session()
        self.analyzer_id = "0"
        self.test_domain = "duckduckgo.com"
        self.worker = Worker(
            worker_id="0",
            config=self.config,
            db_session=self.db_session,
            loop=False,
        )

        target_url = f"https://www.{self.test_domain}"

        # NOTE(review): the hard-coded ids in the queue rows presumably rely on
        # these being the first rows of their tables - confirm the test database
        # starts out empty
        fixtures = (
            Domain(
                domain=self.test_domain,
                supports_http=True,
                supports_https=True,
                supports_ftp=False,
                supports_ipv4=True,
                supports_ipv6=False,
                requires_multiple_requests=True,
            ),
            Relay(
                fingerprint="A53C46F5B157DD83366D45A8E99A244934A14C46",
                ipv4_address="128.31.0.13",
                ipv4_exiting_allowed=True,
                ipv6_exiting_allowed=False,
            ),
            Fetcher(method="firefox_browser", uses_tor=False, version="82"),
            Fetcher(method="tor_browser", uses_tor=True, version="82"),
            FetchQueue(url=target_url, fetcher_id=1, domain_id=1),
            FetchQueue(url=target_url, fetcher_id=2, domain_id=1, relay_id=1),
        )

        # Commit changes to the database, keeping the insertion order above
        for record in fixtures:
            self.db_session.add(record)
        self.db_session.commit()

        # Process the non tor job first and the tor job second
        for _ in range(2):
            self.worker.process_next_job()

        # Make sure that the jobs are processed
        completed_count = self.db_session.query(FetchCompleted).count()
        self.assertEqual(completed_count, 2)

    def tearDown(self):
        """
        Closes the database session opened in setUp
        """
        self.db_session.close()

    def test_analyzer_init(self):
        """
        Constructing the Analyzer on the prepared data should not raise
        """
        Analyzer(
            analyzer_id=self.analyzer_id,
            config=self.config,
            db_session=self.db_session,
        )
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment