Commit 236f7d4a authored by Matt Traudt's avatar Matt Traudt
Browse files

Find usable destinations; refactor a function to make that easier ...

Since connect_to_destination_over_circuit already determines if a
detination is usable, it made sense to move it and use it in more
places.

Fun fact:
During the process of determining if a destination is usable, a stream
is created to it. Given the same requests Session object, this stream
can be reused (and is by default in requests code) to make additional
requests to the web server. This is why the function is called
connect_to_destination_over_circuit. When first written, determining if
a destination is usbale was the side effect while opening the socket to
the destination was the primary effect.
parent d996a938
Loading
Loading
Loading
Loading
+9 −53
Original line number Diff line number Diff line
@@ -12,7 +12,6 @@ from ..lib.destination import DestinationList
# from ..util.sockio import (make_socket, close_socket)
from sbws.globals import (fail_hard, is_initted)
import sbws.util.stem as stem_utils
from stem.control import EventType
from argparse import ArgumentDefaultsHelpFormatter
from multiprocessing.dummy import Pool
from threading import Event
@@ -92,49 +91,6 @@ def measure_rtt_to_server(session, conf, dest, content_length):
    return rtts


def connect_to_destination_over_circuit(dest, circ_id, session, cont, conf):
    '''
    Connect to **dest* over the given **circ_id** using the given Requests
    **session**. Make sure everything seems in order. Return True and a
    dictionary of helpful information if we connected and everything looks
    fine.  Otherwise return False and a string stating what the issue is.

    :param dest Destination: the place to which we should connect
    :param circ_id str: the circuit we should connect over
    :param session Session: the Requests library session object to use to make
        the connection.
    :param cont Controller: them Stem library controller controlling Tor
    :returns: True and a dictionary if everything is in order and measurements
        should commence.  False and an error string otherwise.
    '''
    error_prefix = 'When sending HTTP HEAD to {}, '.format(dest.url)
    with stem_utils.stream_building_lock:
        listener = stem_utils.attach_stream_to_circuit_listener(cont, circ_id)
        stem_utils.add_event_listener(cont, listener, EventType.STREAM)
        try:
            # TODO:
            # - What other exceptions can this throw?
            # - Add timeout
            head = session.head(dest.url)
        except requests.exceptions.ConnectionError as e:
            return False, 'Could not connect to {} over circ {} {}: {}'.format(
                dest.url, circ_id, stem_utils.circuit_str(cont, circ_id), e)
        stem_utils.remove_event_listener(cont, listener)
    if head.status_code != requests.codes.ok:
        return False, error_prefix + 'we expected HTTP code '\
            '{} not {}'.format(requests.codes.ok, head.status_code)
    if 'content-length' not in head.headers:
        return False, error_prefix + 'we except the header Content-Length '\
                'to exist in the response'
    max_dl = conf.getint('scanner', 'max_download_size')
    content_length = int(head.headers['content-length'])
    if max_dl > content_length:
        return False, error_prefix + 'our maximum configured download size '\
            'is {} but the content is only {}'.format(max_dl, content_length)
    log.debug('Connected to %s over circuit %s', dest.url, circ_id)
    return True, {'content_length': content_length}


def measure_bandwidth_to_server(session, conf, dest, content_length):
    results = []
    num_downloads = conf.getint('scanner', 'num_downloads')
@@ -206,18 +162,18 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
              relay.fingerprint[0:8])
    # Make a connection to the destionation webserver and make sure it can
    # still help us measure
    success, details = connect_to_destination_over_circuit(
        dest, circ_id, s, cb.controller, conf)
    if not success:
        log.warning('When measuring %s %s: %s', relay.nickname,
                    relay.fingerprint[0:8], details)
    is_usable, usable_data = dest.is_usable(circ_id, s, cb.controller)
    if not is_usable:
        log.warning('When measuring %s %s the destination seemed to have '
                    'stopped being usable: %s', relay.nickname,
                    relay.fingerprint[0:8], usable_data)
        cb.close_circuit(circ_id)
        # TODO: Return ResultError of some sort???
        return None
    assert success
    assert 'content_length' in details
    assert is_usable
    assert 'content_length' in usable_data
    # FIRST: measure RTT
    rtts = measure_rtt_to_server(s, conf, dest, details['content_length'])
    rtts = measure_rtt_to_server(s, conf, dest, usable_data['content_length'])
    if rtts is None:
        log.warning('Unable to measure RTT to %s via relay %s %s',
                    dest.url, relay.nickname, relay.fingerprint[0:8])
@@ -226,7 +182,7 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
        return None
    # SECOND: measure bandwidth
    bw_results = measure_bandwidth_to_server(
        s, conf, dest, details['content_length'])
        s, conf, dest, usable_data['content_length'])
    circ_fps = cb.get_circuit_path(circ_id)
    our_nick = conf['scanner']['nickname']
    cb.close_circuit(circ_id)
+70 −5
Original line number Diff line number Diff line
@@ -2,14 +2,59 @@ import logging
import random
import time
from threading import RLock
import requests
from urllib.parse import urlparse
from stem.control import EventType
import sbws.util.stem as stem_utils

log = logging.getLogger(__name__)


def connect_to_destination_over_circuit(dest, circ_id, session, cont, max_dl):
    '''
    Connect to **dest* over the given **circ_id** using the given Requests
    **session**. Make sure everything seems in order. Return True and a
    dictionary of helpful information if we connected and everything looks
    fine.  Otherwise return False and a string stating what the issue is.

    :param dest Destination: the place to which we should connect
    :param circ_id str: the circuit we should connect over
    :param session Session: the Requests library session object to use to make
        the connection.
    :param cont Controller: them Stem library controller controlling Tor
    :returns: True and a dictionary if everything is in order and measurements
        should commence.  False and an error string otherwise.
    '''
    assert isinstance(dest, Destination)
    error_prefix = 'When sending HTTP HEAD to {}, '.format(dest.url)
    with stem_utils.stream_building_lock:
        listener = stem_utils.attach_stream_to_circuit_listener(cont, circ_id)
        stem_utils.add_event_listener(cont, listener, EventType.STREAM)
        try:
            # TODO:
            # - What other exceptions can this throw?
            # - Add timeout
            head = session.head(dest.url)
        except requests.exceptions.ConnectionError as e:
            return False, 'Could not connect to {} over circ {} {}: {}'.format(
                dest.url, circ_id, stem_utils.circuit_str(cont, circ_id), e)
        stem_utils.remove_event_listener(cont, listener)
    if head.status_code != requests.codes.ok:
        return False, error_prefix + 'we expected HTTP code '\
            '{} not {}'.format(requests.codes.ok, head.status_code)
    if 'content-length' not in head.headers:
        return False, error_prefix + 'we except the header Content-Length '\
                'to exist in the response'
    content_length = int(head.headers['content-length'])
    if max_dl > content_length:
        return False, error_prefix + 'our maximum configured download size '\
            'is {} but the content is only {}'.format(max_dl, content_length)
    log.debug('Connected to %s over circuit %s', dest.url, circ_id)
    return True, {'content_length': content_length}


class Destination:
    def __init__(self, url, default_path):
    def __init__(self, url, default_path, max_dl):
        u = urlparse(url)
        # these things should have been verified in verify_config
        assert u.scheme in ['http', 'https']
@@ -19,6 +64,14 @@ class Destination:
            u = urlparse('{}://{}{}{}{}{}'.format(
                *u[0:2], default_path, *u[2:]))
        self._url = u
        self._max_dl = max_dl

    def is_usable(self, circ_id, session, cont):
        ''' Use **connect_to_destination_over_circuit** to determine if this
        destination is usable and return what it returns. Just a small wrapper.
        '''
        return connect_to_destination_over_circuit(
            self, circ_id, session, cont, self._max_dl)

    @property
    def url(self):
@@ -43,10 +96,10 @@ class Destination:
        return p

    @staticmethod
    def from_config(conf_section, default_path):
    def from_config(conf_section, default_path, max_dl):
        assert 'url' in conf_section
        url = conf_section['url']
        return Destination(url, default_path)
        return Destination(url, default_path, max_dl)


class DestinationList:
@@ -72,6 +125,8 @@ class DestinationList:
        self._usability_lock.acquire()
        log.debug('Perform usability tests')
        cont = self._cont
        session = requests.Session()
        usable_dests = []
        for dest in self._all_dests:
            possible_exits = self._rl.exits_can_exit_to(
                dest.hostname, dest.port)
@@ -80,8 +135,8 @@ class DestinationList:
            possible_exits = sorted(
                possible_exits, key=lambda e: e.bandwidth, reverse=True)
            exits = possible_exits[0:num_keep]
            circ_id = None
            # Try three times to build a circuit to test this destination
            circ_id = None
            for _ in range(0, 3):
                # Pick a random exit
                exit = random.choice(exits)
@@ -94,7 +149,16 @@ class DestinationList:
                continue
            log.debug('Built circ %s %s to test usability of %s', circ_id,
                      stem_utils.circuit_str(cont, circ_id), dest.url)
            is_usable, data = dest.is_usable(circ_id, session, cont)
            if not is_usable:
                log.warning(data)
                continue
            assert is_usable
            log.debug('%s seems usable so we will keep it', dest.url)
            usable_dests.append(dest)
            self._cb.close_circuit(circ_id)
        self._usable_dests = usable_dests
        self._last_usability_test = time.time()
        self._usability_lock.release()

    @staticmethod
@@ -113,7 +177,8 @@ class DestinationList:
            assert dest_sec in conf  # validate_config should require this
            log.debug('Loading info for destination %s', key)
            dests.append(Destination.from_config(
                conf[dest_sec], default_path))
                conf[dest_sec], default_path,
                conf.getint('scanner', 'max_download_size')))
        if len(dests) < 1:
            return None, 'No enabled destinations in config'
        return DestinationList(conf, dests, circuit_builder, relay_list,