Commit c89a8a35 authored by juga

Merge branch 'maint-1.1'

parents 44489f37 02861e10
......@@ -45,6 +45,7 @@ Included in the
.. _torflow_aggr:
Torflow measurements aggregation
The goal of Torflow's aggregation, or scaling, is:
From Torflow's `README.spec.txt`_ (section 2.2)::
In this way, the resulting network status consensus bandwidth values
are effectively re-weighted proportional to how much faster the node
was as compared to the rest of the network.
With and without PID control
Per relay measurements' bandwidth
They are calculated in the same way whether or not `PID controller`_ feedback
is used.
From Torflow's `README.spec.txt`_ (section 1.6)::
The strm_bw field is the average (mean) of all the streams for the relay
identified by the fingerprint field.
The filt_bw field is computed similarly, but only the streams equal to
or greater than the strm_bw are counted in order to filter very slow
streams due to slow node pairings.
In the code, ``_, ``strm_bw`` is ``sbw`` and
``filt_bw`` is ``filt_sbw``::
for rs in RouterStats.query.filter(stats_clause).\
  tot_sbw = 0
  sbw_cnt = 0
  for s in rs.router.streams:
    if isinstance(s, ClosedStream):
      skip = False
      #for br in badrouters:
      #  if br != rs:
      #    if br.router in s.circuit.routers:
      #      skip = True
      if not skip:
        # Throw out outliers < mean
        # (too much variance for stddev to filter much)
        if rs.strm_closed == 1 or s.bandwidth() >= rs.sbw:
          tot_sbw += s.bandwidth()
          sbw_cnt += 1
  if sbw_cnt: rs.filt_sbw = tot_sbw/sbw_cnt
  else: rs.filt_sbw = None
This is also expressed in pseudocode in the `bandwidth file spec`_, section B.4
step 1.
Denoting ``strm_bw`` as ``bw_i`` and ``filt_bw`` as ``bwfilt_i``,
if ``bw_j`` is a measurement for relay ``i`` and ``m`` is the number of
measurements for that relay, then:
.. math::
bw_i = \mu(bw_j) = \frac{\sum_{j=1}^{m}bw_j}{m}
.. math::
bwfilt_i &= \mu(max(\mu(bw_j), bw_j)) \\
&= \frac{\sum_{j=1}^{m} max\left(\frac{\sum_{j=1}^{m}bw_j}{m}, bw_j\right)}{m}
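As a minimal sketch (not Torflow's code), the two per relay values can be computed from a list of stream bandwidths as follows. The function names and the sample values are hypothetical, and the filtering follows the Torflow snippet quoted above, which keeps only the streams at or above the mean:

```python
from statistics import mean


def strm_bw(stream_bws):
    """Mean of all the stream bandwidths measured for one relay."""
    return mean(stream_bws)


def filt_bw(stream_bws):
    """Mean of the streams whose bandwidth is >= the relay's mean."""
    mu = mean(stream_bws)
    return mean([bw for bw in stream_bws if bw >= mu])


# Hypothetical measurements for one relay, in bytes per second.
streams = [100, 200, 300, 600]
print(strm_bw(streams))  # mean of the four streams
print(filt_bw(streams))  # mean of the streams that are >= that mean
```

Since the maximum measurement is never below the mean, the filtered list is never empty as long as the relay has at least one measurement.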
Network measurements' bandwidth average
From `README.spec.txt`_ (section 2.1)::
Once we have determined the most recent measurements for each node, we
compute an average of the filt_bw fields over all nodes we have measured.
In Torflow's ``_ code::
filt_avg = sum(map(lambda n: n.filt_bw, nodes.itervalues()))/float(len(nodes))
strm_avg = sum(map(lambda n: n.strm_bw, nodes.itervalues()))/float(len(nodes))
In the code, both with and without PID, all types of nodes get the same average.
This is also expressed in pseudocode in the `bandwidth file spec`_, section B.4
step 2.
Denoting ``strm_avg`` as ``bwstrm`` and ``filt_avg`` as ``bwfilt``, if ``n`` is
the number of relays in the network, then:
.. math::
bwstrm &= \mu(bw_i) \\
&= \frac{\sum_{i=1}^{n}bw_i}{n} \\
&= \frac{\sum_{i=1}^{n} \frac{\sum_{j=1}^{m}bw_j}{m} }{n}
.. math::
bwfilt &= \mu(bwfilt_i) \\
&= \frac{\sum_{i=1}^{n}bwfilt_i}{n} \\
&= \frac{\sum_{i=1}^{n}\frac{\sum_{j=1}^{m}max(\mu(bw_j), bw_j)}{m}}{n} \\
&= \frac{\sum_{i=1}^{n}\frac{\sum_{j=1}^{m}max\left(\frac{\sum_{j=1}^{m}bw_j}{m}, bw_j\right)}{m}}{n}
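The network averages can be illustrated with a small sketch. The dictionary layout, the function name and the numbers are assumptions made for the example; only the arithmetic mirrors the quoted Torflow lines:

```python
def network_averages(nodes):
    """Return (strm_avg, filt_avg) over all the measured relays.

    ``nodes`` maps a relay fingerprint to a dict holding the relay's
    ``strm_bw`` and ``filt_bw`` (hypothetical layout).
    """
    n = float(len(nodes))
    strm_avg = sum(node["strm_bw"] for node in nodes.values()) / n
    filt_avg = sum(node["filt_bw"] for node in nodes.values()) / n
    return strm_avg, filt_avg


# Two hypothetical relays.
nodes = {
    "A": {"strm_bw": 300, "filt_bw": 450},
    "B": {"strm_bw": 100, "filt_bw": 150},
}
strm_avg, filt_avg = network_averages(nodes)
print(strm_avg, filt_avg)
```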
Per relay bandwidth ratio
From `README.spec.txt`_ (section 2.2)::
These averages are used to produce ratios for each node by dividing the
measured value for that node by the network average.
In Torflow's ``_ code::
for n in nodes.itervalues():
  n.fbw_ratio = n.filt_bw/true_filt_avg[n.node_class()]
  n.sbw_ratio = n.strm_bw/true_strm_avg[n.node_class()]
  # Choose the larger between sbw and fbw
  if n.sbw_ratio > n.fbw_ratio:
    n.ratio = n.sbw_ratio
  else:
    n.ratio = n.fbw_ratio
This is also expressed in pseudocode in the `bandwidth file spec`_, section B.4
steps 2 and 3.
Denoting ``fbw_ratio`` as ``rf_i``, ``sbw_ratio`` as ``rs_i`` and ``ratio`` as
``r_i``:
.. math::
rf_i = \frac{bwfilt_i}{bwfilt}
rs_i = \frac{bw_i}{bwstrm}
.. math::
r_i &= max(rf_i, rs_i) \\
&= max\left(\frac{bwfilt_i}{bwfilt}, \frac{bw_i}{bwstrm}\right) \\
&= max\left(\frac{bwfilt_i}{\mu(bwfilt_i)}, \frac{bw_i}{\mu(bw_i)}\right)
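A short sketch of the ratio selection, assuming the per relay values and the network averages are already known. The function name and the numbers are hypothetical:

```python
def bw_ratio(strm_bw, filt_bw, strm_avg, filt_avg):
    """Return the larger of the stream ratio and the filtered ratio."""
    rs = strm_bw / strm_avg  # rs_i
    rf = filt_bw / filt_avg  # rf_i
    return max(rf, rs)


# A relay whose stream ratio (1.5) is larger than its filtered ratio (1.2).
print(bw_ratio(strm_bw=300, filt_bw=360, strm_avg=200, filt_avg=300))
```

A ratio above 1 means the relay was measured faster than the network average, and a ratio below 1 means it was measured slower.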
Per relay descriptors bandwidth
From ``_ code, it is the minimum of all the descriptor bandwidth values::
bws = map(int, g)
bw_observed = min(bws)
return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
ns.flags, ip, version, os, uptime, published, contact, rate_limited,
ns.orhash, ns.bandwidth, extra_info_digest, ns.unmeasured)
Because of the matched regular expression, ``bws`` does **not** contain all the
descriptor bandwidth values, only the observed bandwidth and the burst
bandwidth, i.e., the average bandwidth is ignored, which seems to be a bug in
Torflow.
This is passed to ``Router``, which uses the descriptor bandwidth in place of the
consensus bandwidth when there is no consensus bandwidth::
(idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime,
published, contact, rate_limited, orhash,
ns_bandwidth,extra_info_digest,unmeasured) = args
if ns_bandwidth != None: = max(ns_bandwidth,1) # Avoid div by 0
else: = max(bw,1) # Avoid div by 0
self.desc_bw = max(bw,1) # Avoid div by 0
And written by ``_ as descriptor and consensus bandwidth::
f.write(" desc_bw="+str(int(cvt(s.avg_desc_bw,0))))
f.write(" ns_bw="+str(int(cvt(s.avg_bw,0)))+"\n")
Without PID control
Per relay scaled bandwidth
From `README.spec.txt`_ (section 2.2)::
These ratios are then multiplied by the most recent observed descriptor
bandwidth we have available for each node, to produce a new value for
the network status consensus process.
In ``_ code::
n.new_bw = n.desc_bw*n.ratio
This is also expressed in pseudocode in the `bandwidth file spec`_, section B.4
step 5.
Denoting ``new_bw`` as ``bwnew_i`` and ``use_bw`` as ``descbw_i``:
.. math::
descbw_i = min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right)
bwnew_i &= descbw_i \times r_i \\
&= min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right) \times max(rf_i, rs_i) \\
&= min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right) \times max\left(\frac{bwfilt_i}{bwfilt}, \frac{bw_i}{bwstrm}\right) \\
&= min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right) \times max\left(\frac{bwfilt_i}{\mu(bwfilt_i)}, \frac{bw_i}{\mu(bw_i)}\right)
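The formula above can be sketched as follows. The sketch follows the formula (the minimum of the four bandwidth values), not the Torflow regular expression discussed earlier; the function name, the argument names and the numbers are hypothetical:

```python
def scaled_bw(bw_obs, bw_avg, bw_burst, consensus_bw, ratio):
    """Scale the smallest known bandwidth of a relay by its ratio."""
    desc_bw = min(bw_obs, bw_avg, bw_burst, consensus_bw)  # descbw_i
    return desc_bw * ratio  # bwnew_i


# Hypothetical relay: the observed bandwidth is the smallest value.
print(scaled_bw(bw_obs=5000, bw_avg=8000, bw_burst=10000,
                consensus_bw=6000, ratio=1.5))
```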
With PID control
Per relay descriptors bandwidth
Even though `README.spec.txt`_ talks about the consensus bandwidth, in
``_ code, the consensus bandwidth is never used, since
``use_desc_bw`` is initialized to True and never changed::
self.use_desc_bw = True
if cs_junk.bwauth_pid_control:
  if cs_junk.use_desc_bw:
    n.use_bw = n.desc_bw
  else:
    n.use_bw = n.ns_bw
Per relay scaled bandwidth
From `README.spec.txt`_ section 3.1::
The bandwidth authorities measure F_node: the filtered stream
capacity through a given node (filtering is described in Section 1.6).
pid_error = e(t) = (F_node - F_avg)/F_avg.
new_consensus_bw = old_consensus_bw +
old_consensus_bw * K_p * e(t) +
old_consensus_bw * K_i * \integral{e(t)} +
old_consensus_bw * K_d * \derivative{e(t)}
For the case where K_p = 1, K_i=0, and K_d=0, it can be seen that this
system is equivalent to the one defined in 2.2, except using consensus
bandwidth instead of descriptor bandwidth:
new_bw = old_bw + old_bw*e(t)
new_bw = old_bw + old_bw*(F_node/F_avg - 1)
new_bw = old_bw*F_node/F_avg
new_bw = old_bw*ratio
In Torflow's code, this is actually the case, and most of the code is not
executed because of the default ``K`` values.
It seems then that ``F_node`` is ``filt_bw`` in Torflow's code or ``bwfilt_i``
here, and ``F_avg`` is ``filt_avg`` in Torflow's code or ``bwfilt`` here.
In ``_ code, pid error also depends on which of the ratios is greater::
if cs_junk.use_best_ratio and n.sbw_ratio > n.fbw_ratio:
  n.pid_error = (n.strm_bw - true_strm_avg[n.node_class()]) \
                  / true_strm_avg[n.node_class()]
else:
  n.pid_error = (n.filt_bw - true_filt_avg[n.node_class()]) \
                  / true_filt_avg[n.node_class()]
n.new_bw = n.use_bw + cs_junk.K_p*n.use_bw*n.pid_error
Denoting ``pid_error`` as ``e_i``, in the case that ``rs_i`` > ``rf_i``:
.. math::
e_i &= \frac{bw_i - bwstrm}{bwstrm} = \frac{bw_i}{bwstrm} - 1 \\
bwn_i &= descbw_i + descbw_i \times e_i = descbw_i \times (1 + e_i) \\
&= descbw_i \times \left(1 + \frac{bw_i}{bwstrm} - 1\right) \\
&= descbw_i \times \frac{bw_i}{bwstrm} = descbw_i \times rs_i
Otherwise, when ``rs_i`` <= ``rf_i``:
.. math::
e_i &= \frac{bwfilt_i - bwfilt}{bwfilt} = \frac{bwfilt_i}{bwfilt} - 1 \\
bwn_i &= descbw_i + descbw_i \times e_i = descbw_i \times (1 + e_i) \\
&= descbw_i \times \left(1 + \frac{bwfilt_i}{bwfilt} - 1\right) \\
&= descbw_i \times \frac{bwfilt_i}{bwfilt} = descbw_i \times rf_i
So it is the same as the scaled bandwidth in the case without the PID controller:
.. math::
bwn_i = descbw_i \times max(rf_i, rs_i)
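The equivalence can be checked numerically with a small sketch. It assumes ``K_p = 1`` (with ``K_i`` and ``K_d`` equal to 0, as described above); the function name and the numbers are hypothetical:

```python
import math


def pid_new_bw(use_bw, measured, average, k_p=1.0):
    """Proportional-only PID step, as in the quoted Torflow line."""
    pid_error = (measured - average) / average
    return use_bw + k_p * use_bw * pid_error


use_bw = 5000                 # descbw_i
measured, average = 300, 200  # bw_i and bwstrm, or bwfilt_i and bwfilt

with_pid = pid_new_bw(use_bw, measured, average)
without_pid = use_bw * (measured / average)  # descbw_i times the ratio

# Both approaches give the same scaled bandwidth.
print(with_pid, without_pid, math.isclose(with_pid, without_pid))
```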
With and without PID control
Per relay scaled bandwidth limit
Once each relay's bandwidth is scaled, it is capped at a maximum, which is
calculated as the sum of the scaled bandwidths of all the relays in the current
consensus multiplied by 0.05.
From ``_ code::
NODE_CAP = 0.05
if n.idhex in prev_consensus:
  if prev_consensus[n.idhex].bandwidth != None:
    prev_consensus[n.idhex].measured = True
    tot_net_bw += n.new_bw
if n.new_bw > tot_net_bw*NODE_CAP:
  n.new_bw = int(tot_net_bw*NODE_CAP)
.. math::
bwn_i &= min\left(bwnew_i,
\sum_{i=1}^{n}bwnew_i \times 0.05\right) \\
&= min\left(
\left(min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right) \times r_i\right),
\sum_{i=1}^{n}\left(min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right) \times r_i\right)
\times 0.05\right) \\
&= min\left(
\left(min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right) \times max\left(rf_i, rs_i\right)\right),
\sum_{i=1}^{n}\left(min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right) \times
max\left(rf_i, rs_i\right)\right) \times 0.05\right) \\
&= min\left(
\left(min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right) \times max\left(\frac{bwfilt_i}{bwfilt}, \frac{bw_i}{bwstrm}\right)\right),
\sum_{i=1}^{n}\left(min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right) \times
max\left(\frac{bwfilt_i}{bwfilt}, \frac{bw_i}{bwstrm}\right)\right) \times 0.05\right)
.. math::
bwn_i = min\left(
\left(min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right) \times max\left(\frac{bwfilt_i}{bwfilt}, \frac{bw_i}{bwstrm}\right)\right),
\sum_{i=1}^{n}\left(min\left(bwobs_i, bwavg_i, bwburst_i, measuredconsensusbw_i \right) \times
max\left(\frac{bwfilt_i}{bwfilt}, \frac{bw_i}{bwstrm}\right)\right) \times 0.05\right)
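The cap can be sketched as below. This is a simplification of the quoted Torflow lines: it assumes that every relay in the dictionary is in the previous consensus with a bandwidth, so all of them count towards the total. The function name and the numbers are hypothetical:

```python
NODE_CAP = 0.05


def cap_bandwidths(new_bws, node_cap=NODE_CAP):
    """Limit each scaled bandwidth to a fraction of the network total.

    ``new_bws`` maps a relay fingerprint to its scaled bandwidth.
    """
    limit = sum(new_bws.values()) * node_cap
    capped = {}
    for fp, bw in new_bws.items():
        capped[fp] = int(limit) if bw > limit else bw
    return capped


# 19 small relays and one very large relay; the total is 40000.
new_bws = {"r%d" % i: 1000 for i in range(19)}
new_bws["big"] = 21000
capped = cap_bandwidths(new_bws)
print(capped["big"], capped["r0"])
```

Only the large relay is above 5% of the total, so only its value is reduced.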
Per relay scaled bandwidth rounding
Finally, the new scaled bandwidth is expressed in kilobytes and rounded to a
given number of digits.
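A possible sketch of this last step is shown below. It is an assumption, not the Torflow or sbws implementation: it interprets "a number of digits" as significant digits, defaults to 3 of them, and uses a hypothetical function name:

```python
from math import floor, log10


def kb_round_sig_dig(bw_bytes, digits=3):
    """Convert bytes to kilobytes, keeping ``digits`` significant digits."""
    bw_kb = bw_bytes / 1000
    if bw_kb <= 0:
        return 0
    # Position of the most significant digit of the kilobyte value.
    magnitude = floor(log10(bw_kb))
    factor = 10 ** (magnitude - digits + 1)
    return int(round(bw_kb / factor) * factor)


print(kb_round_sig_dig(7654321))  # 7654.321 kB with 3 significant digits
print(kb_round_sig_dig(123456))   # 123.456 kB with 3 significant digits
```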
Differences between Torflow aggregation and sbws scaling (May 2020)
Torflow does not exclude relays for having "few" measurements or measurements
that are "close" to each other.
If there are no new measurements for a relay, Torflow uses the previously
calculated bandwidth instead of a new value::
# If there is a new sample, let's use it for all but guards
if n.measured_at > prev_votes.vote_map[n.idhex].measured_at:
# Reset values. Don't vote/sample this measurement round.
The oldest measurements Torflow seems to use are 4 weeks old, while the oldest
measurements sbws uses are 5 days old::
GUARD_SAMPLE_RATE = 2*7*24*60*60 # 2wks
# old measurements are probably
# better than no measurements. We may not
# measure hibernating routers for days.
# This filter is just to remove REALLY old files
if time.time() - timestamp > MAX_AGE:
.. _README.spec.txt:
.. _PID Controller:
.. _bandwidth file spec:
"""Expected bandwidth file values for KeyValues."""
# flake8: noqa: E741
# (E741 ambiguous variable name), when using l.
import logging
from stem import descriptor
from statistics import mean
def bw_measurements_from_results(results):
    # One bandwidth value (amount / duration) per download in every result.
    return [
        dl['amount'] / dl['duration']
        for r in results for dl in r.downloads
    ]

def bw_filt(bw_measurements):
    """Filtered bandwidth for a relay.

    It is the equivalent to Torflow's ``filt_sbw``.
    ``mu`` in this function is the equivalent to Torflow's ``sbw``.
    """
    mu = mean(bw_measurements)
    # Keep only the measurements at or above the mean.
    bws_gte_mean = filter(lambda bw: bw >= mu, bw_measurements)
    return mean(bws_gte_mean)
# -*- coding: utf-8 -*-
"""Classes and functions that create the bandwidth measurements document
(v3bw) used by bandwidth authorities."""
# flake8: noqa: E741
# (E741 ambiguous variable name), when using l.
import copy
import logging
......@@ -15,6 +17,7 @@ from sbws.globals import (SPEC_VERSION, BW_LINE_SIZE, SBWS_SCALE_CONSTANT,
from sbws.lib import scaling
from sbws.lib.resultdump import ResultSuccess, _ResultType
from sbws.util.filelock import DirectoryLock
from sbws.util.timestamp import (now_isodt_str, unixts_to_isodt_str,
......@@ -631,15 +634,17 @@ class V3BWLine(object):
assert node_id.startswith('$')
self.node_id = node_id = bw
# For now, we do not want to add ``bw_filt`` to the bandwidth file,
# therefore it is set here but not added to ``BWLINE_KEYS_V1``.
[setattr(self, k, v) for k, v in kwargs.items()
if k in BWLINE_KEYS_V1]
if k in BWLINE_KEYS_V1 + ["bw_filt"]]
def __str__(self):
return self.bw_strv1
def from_results(cls, results, secs_recent=None, secs_away=None,
min_num=0, router_statuses_d=None):
"""Convert sbws results to relays' Bandwidth Lines
``bs`` stands for Bytes/seconds
......@@ -753,6 +758,30 @@ class V3BWLine(object):
return (cls(node_id, 1, **kwargs), exclusion_reason)
# Use the last consensus if available, since the results' consensus
# values come from the moment the measurement was made.
if router_statuses_d and node_id in router_statuses_d:
consensus_bandwidth = \
router_statuses_d[node_id].bandwidth * 1000
consensus_bandwidth_is_unmeasured = \
consensus_bandwidth = \
consensus_bandwidth_is_unmeasured = \
# If there is no last observed bandwidth, there won't be mean either.
desc_bw_obs_last = \
# Exclude also relays without consensus bandwidth nor observed
# bandwidth, since they can't be scaled
if (desc_bw_obs_last is None and consensus_bandwidth is None):
# This reason is not counted, not added in the file, but it will
# have vote = 0
return(cls(node_id, 1), "no_consensus_no_observed_bw")
# For any line not excluded, do not include vote and unmeasured
# KeyValues
del kwargs['vote']
......@@ -762,22 +791,24 @@ class V3BWLine(object):
if rtt:
kwargs['rtt'] = rtt
bw = cls.bw_median_from_results(results_recent)
# XXX: all the class functions could use the bw_measurements instead of
# obtaining them each time or use a class Measurements.
bw_measurements = scaling.bw_measurements_from_results(results_recent)
kwargs['bw_mean'] = cls.bw_mean_from_results(results_recent)
kwargs['bw_filt'] = scaling.bw_filt(bw_measurements)
kwargs['bw_median'] = cls.bw_median_from_results(
kwargs['desc_bw_avg'] = \
kwargs['desc_bw_bur'] = \
kwargs['consensus_bandwidth'] = \
kwargs['consensus_bandwidth'] = consensus_bandwidth
kwargs['consensus_bandwidth_is_unmeasured'] = \
kwargs['desc_bw_obs_last'] = \
kwargs['desc_bw_obs_last'] = desc_bw_obs_last
kwargs['desc_bw_obs_mean'] = \
bwl = cls(node_id, bw, **kwargs)
return bwl, None
......@@ -862,6 +893,7 @@ class V3BWLine(object):
for r in reversed(results):
if r.relay_average_bandwidth is not None:
return r.relay_average_bandwidth
log.warning("Descriptor average bandwidth is None.")
return None
......@@ -870,6 +902,7 @@ class V3BWLine(object):
for r in reversed(results):
if r.relay_burst_bandwidth is not None:
return r.relay_burst_bandwidth
log.warning("Descriptor burst bandwidth is None.")
return None
......@@ -878,6 +911,7 @@ class V3BWLine(object):
for r in reversed(results):
if r.consensus_bandwidth is not None:
return r.consensus_bandwidth
log.warning("Consensus bandwidth is None.")
return None
......@@ -886,6 +920,7 @@ class V3BWLine(object):
for r in reversed(results):
if r.consensus_bandwidth_is_unmeasured is not None:
return r.consensus_bandwidth_is_unmeasured
log.warning("Consensus bandwidth is unmeasured is None.")
return None
......@@ -895,7 +930,8 @@ class V3BWLine(object):
if r.relay_observed_bandwidth is not None:
if desc_bw_obs_ls:
return max(round(mean(desc_bw_obs_ls)), 1)
return round(mean(desc_bw_obs_ls))
log.warning("Descriptor observed bandwidth is None.")
return None
......@@ -904,6 +940,7 @@ class V3BWLine(object):
for r in reversed(results):
if r.relay_observed_bandwidth is not None:
return r.relay_observed_bandwidth
log.warning("Descriptor observed bandwidth is None.")
return None
......@@ -984,8 +1021,10 @@ class V3BWFile(object):
destinations_countries, state_fpath)
bw_lines_raw = []
bw_lines_excluded = []
number_consensus_relays = cls.read_number_consensus_relays(
router_statuses_d = cls.read_router_statuses(consensus_path)
# XXX: Use router_statuses_d to not parse again the file.
number_consensus_relays = \
state = State(state_fpath)
# Create a dictionary with the number of relays excluded by any of the
......@@ -999,7 +1038,8 @@ class V3BWFile(object):
for fp, values in results.items():
# log.debug("Relay fp %s", fp)
line, reason = V3BWLine.from_results(values, secs_recent,
secs_away, min_num)
secs_away, min_num,
# If there is no reason it means the line will not be excluded.
if not reason:
......@@ -1029,8 +1069,10 @@ class V3BWFile(object):
cls.warn_if_not_accurate_enough(bw_lines, scale_constant)
# log.debug(bw_lines[-1])
elif scaling_method == TORFLOW_SCALING:
bw_lines = cls.bw_torflow_scale(bw_lines_raw, torflow_obs,
torflow_cap, round_digs)
bw_lines = cls.bw_torflow_scale(
bw_lines_raw, torflow_obs, torflow_cap, round_digs,
# log.debug(bw_lines[-1])
# Update the header and log the progress.
min_perc = cls.update_progress(
......@@ -1044,7 +1086,9 @@ class V3BWFile(object):