Commit e2dcdc52 authored by juga's avatar juga Committed by Georg Koppen
Browse files

fix: scanner: Stop attaching streams to same circuit

Block other threads to attach an stream to the same circuit.
Otherwise, if there're measurer threads trying to attach other streams,
the controller will have several listener for the same event type
(stream) and it might (will?) use the same listener (and circuit) for
the new streams.
Bugfix #40130.

Closes #40150
parent 43e8159a
Loading
Loading
Loading
Loading
+22 −6
Original line number Diff line number Diff line
@@ -297,11 +297,6 @@ def upload_data_multipart(session, conf, dest, cont, circ_id):
    """
    log.debug("Uploading data...")
    settings.stream_event[circ_id] = {}
    listener = stem_utils.attach_stream_to_circuit_listener(
        cont, circ_id, BWSCANNER_CC2
    )
    stem_utils.add_event_listener(cont, listener, EventType.STREAM)

    settings.circ_bw_event[circ_id] = {}
    circ_bw_listener = functools.partial(stem_utils.handle_circ_bw_event)
    stem_utils.add_event_listener(cont, circ_bw_listener, EventType.CIRC_BW)
@@ -318,6 +313,28 @@ def upload_data_multipart(session, conf, dest, cont, circ_id):
        data = str(data)
    size_ss0 = conf.getint("scanner", "http_post_initial_size_ss0")

    # Block other threads to attach an stream to the same circuit.
    # Otherwise, if there're measurer threads trying to attach other streams,
    # the controller will have several listener for the same event type
    # (stream) and it might (will?) use the same listener (and circuit) for
    # the new streams.
    with stem_utils.stream_building_lock:
        listener = stem_utils.attach_stream_to_circuit_listener(
            cont, circ_id, BWSCANNER_CC2
        )
        stem_utils.add_event_listener(cont, listener, EventType.STREAM)
        # There must be some stream (HTTP request) to actually attach the
        # stream.
        try:
            session.head(dest.url, verify=dest.verify)
        except requests.exceptions.RequestException as e:
            dest.add_failure()
            return None, "Could not connect to {} over circ {} {}: {}".format(
                dest.url, circ_id, stem_utils.circuit_str(cont, circ_id), e
            )
        finally:
            stem_utils.remove_event_listener(cont, listener)

    # Only used in this piece of code
    from requests_toolbelt.multipart import encoder

@@ -352,7 +369,6 @@ def upload_data_multipart(session, conf, dest, cont, circ_id):
        log.debug(e)
        return None, e
    finally:
        stem_utils.remove_event_listener(cont, listener)
        stem_utils.remove_event_listener(cont, circ_bw_listener)

    if response and response.status_code != requests.codes.ok: