Commit 491e2dbb authored by Barkin Simsek's avatar Barkin Simsek 🐢
Browse files

Add job scheduler

parent aba597f5
import time
import logging
import argparse
from random import randint
import schedule
......@@ -28,6 +29,9 @@ logging.basicConfig(format="%(asctime)s %(module)s [%(levelname)s] %(message)s")
logger = logging.getLogger("captchamonitor")
logger.setLevel(logging.DEBUG)
# Add a random startup delay to prevent possible race conditions
time.sleep(randint(1, 5))
cm = CaptchaMonitor()
# Run in the specified mode
......@@ -39,6 +43,7 @@ elif args.update:
schedule.every().day.do(cm.update_domains)
schedule.every().hour.do(cm.update_relays)
schedule.every().day.do(cm.update_fetchers)
schedule.every().hour.do(cm.schedule_jobs)
# Run all scheduled jobs at the beginning
schedule.run_all()
......
......@@ -7,6 +7,7 @@ from captchamonitor.core.worker import Worker
from captchamonitor.utils.config import Config
from captchamonitor.utils.database import Database
from captchamonitor.utils.exceptions import ConfigInitError, DatabaseInitError
from captchamonitor.core.schedule_jobs import ScheduleJobs
from captchamonitor.core.update_relays import UpdateRelays
from captchamonitor.core.update_domains import UpdateDomains
from captchamonitor.utils.small_scripts import node_id
......@@ -64,11 +65,16 @@ class CaptchaMonitor:
# Obtain the session from database module
self.__db_session = self.__database.session()
def add_jobs(self) -> None:
def schedule_jobs(self) -> None:
"""
Adds new jobs to the database
"""
self.__logger.info("Adding new jobs")
self.__logger.info("Scheduling new jobs")
ScheduleJobs(
config=self.__config,
db_session=self.__db_session,
)
def worker(self) -> None:
"""
......
import time
import logging
from typing import Optional
from sqlalchemy.orm import sessionmaker
from captchamonitor.utils.config import Config
from captchamonitor.utils.models import Relay, Domain, Fetcher, FetchQueue
class ScheduleJobs:
    """
    Uses the list of domains, fetchers, and relays to intelligently schedule new
    jobs for workers to process.
    """

    def __init__(
        self,
        config: Config,
        db_session: sessionmaker,
        loop: Optional[bool] = True,
    ):
        """
        Initializes a new job scheduler

        Note that when ``loop`` is truthy this constructor never returns: it
        keeps scheduling a batch and then sleeping for ``job_queue_delay``
        seconds.

        :param config: The config class instance that contains global configuration values
        :type config: Config
        :param db_session: Database session used to connect to the database
        :type db_session: sessionmaker
        :param loop: Should I keep adding multiple batches of jobs, defaults to True
        :type loop: bool, optional
        """
        # Private class attributes
        self.__logger = logging.getLogger(__name__)  # pylint: disable=R0801
        self.__config: Config = config
        self.__db_session: sessionmaker = db_session
        self.__job_queue_delay: float = float(self.__config["job_queue_delay"])

        # Loop over the jobs
        while loop:
            self.schedule_next_batch()
            time.sleep(self.__job_queue_delay)

    @staticmethod
    def _missing_prerequisites(**rows: object) -> list:
        """
        Returns the names of the given keyword arguments whose value is None

        :param rows: Named database rows (or None when the lookup found nothing)
        :return: Names of the missing rows, in the order they were passed
        :rtype: list
        """
        return [name for name, row in rows.items() if row is None]

    def schedule_next_batch(self) -> None:
        """
        Goes over all available domains and inserts a new job for fetching them
        with Tor Browser and Firefox Browser

        If either fetcher or an IPv4 exit relay cannot be found in the database,
        nothing is inserted and a warning is logged instead.
        """
        # Get the list of domains
        domains = self.__db_session.query(Domain).all()

        tor_browser = (
            self.__db_session.query(Fetcher)
            .filter(Fetcher.method == "tor_browser")
            .first()
        )
        firefox_browser = (
            self.__db_session.query(Fetcher)
            .filter(Fetcher.method == "firefox_browser")
            .first()
        )
        relay = (
            self.__db_session.query(Relay)
            .filter(Relay.ipv4_exiting_allowed == True)  # pylint: disable=C0121
            .first()
        )

        # .first() yields None when no row matches; accessing .id on it would
        # raise AttributeError and take the whole scheduler down
        missing = self._missing_prerequisites(
            tor_browser=tor_browser,
            firefox_browser=firefox_browser,
            relay=relay,
        )
        if missing:
            self.__logger.warning(
                "Skipping job scheduling, no database entry found for: %s",
                ", ".join(missing),
            )
            return

        new_jobs = []
        for domain in domains:
            url = f"https://www.{domain.domain}"
            # Tor Browser job, pinned to the chosen exit relay
            new_jobs.append(
                FetchQueue(
                    url=url,
                    fetcher_id=tor_browser.id,
                    domain_id=domain.id,
                    relay_id=relay.id,
                )
            )
            # Firefox job, no relay involved
            new_jobs.append(
                FetchQueue(
                    url=url,
                    fetcher_id=firefox_browser.id,
                    domain_id=domain.id,
                )
            )

        self.__db_session.add_all(new_jobs)

        # Save changes
        self.__db_session.commit()
import unittest
from captchamonitor.utils.config import Config
from captchamonitor.utils.models import Relay, Domain, Fetcher, FetchQueue
from captchamonitor.utils.database import Database
from captchamonitor.core.schedule_jobs import ScheduleJobs
class TestScheduleJobs(unittest.TestCase):
    """
    Exercises ScheduleJobs against the database described by the config
    """

    def setUp(self):
        """
        Opens a database session, seeds it with two domains, one relay and two
        fetchers, and creates a ScheduleJobs instance with looping disabled
        """
        self.config = Config()
        connection_keys = ("db_host", "db_port", "db_name", "db_user", "db_password")
        self.database = Database(*[self.config[key] for key in connection_keys])
        self.db_session = self.database.session()

        # Both test domains are created with the same capability flags
        domain_flags = {
            "supports_http": True,
            "supports_https": True,
            "supports_ftp": False,
            "supports_ipv4": True,
            "supports_ipv6": False,
            "requires_multiple_requests": True,
        }
        seed_rows = [
            Domain(domain="check.torproject.org", **domain_flags),
            Domain(domain="duckduckgo.com", **domain_flags),
            Relay(
                fingerprint="A53C46F5B157DD83366D45A8E99A244934A14C46",
                ipv4_address="128.31.0.13",
                ipv4_exiting_allowed=True,
                ipv6_exiting_allowed=False,
            ),
            Fetcher(method="firefox_browser", uses_tor=False, version="82"),
            Fetcher(method="tor_browser", uses_tor=True, version="82"),
        ]

        # Commit the seed rows to the database
        for row in seed_rows:
            self.db_session.add(row)
        self.db_session.commit()

        # loop=False keeps the constructor from scheduling batches by itself
        self.schedule_jobs = ScheduleJobs(
            config=self.config,
            db_session=self.db_session,
            loop=False,
        )
        self.db_fetch_queue_query = self.db_session.query(FetchQueue)

    def tearDown(self):
        """
        Closes the database session opened in setUp
        """
        self.db_session.close()

    def test_schedule_jobs_init(self):
        """
        Scheduling one batch should take the fetch queue from 0 to 4 entries
        """
        # Nothing should be queued before a batch is scheduled
        queued_before = self.db_fetch_queue_query.count()
        self.assertEqual(queued_before, 0)

        # Schedule a single batch
        self.schedule_jobs.schedule_next_batch()

        # Two seeded domains, each fetched with two browsers
        queued_after = self.db_fetch_queue_query.count()
        self.assertEqual(queued_after, 4)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment