GitLab is used only for code review, issue tracking and project management. Canonical locations for source code are still https://gitweb.torproject.org/ https://git.torproject.org/ and git-rw.torproject.org.

Verified Commit d7395247 authored by Philipp Winter's avatar Philipp Winter
Browse files

Fix deadlock caused by accumulating events.

So far, bridgestrap would only consume Tor's controller events from
c.eventChan as long as the main execution flow was in TestBridgeLines.
Outside of that function, controller events would keep on coming but not
be consumed.  That worked fine for a while because c.eventChan has a
buffer of 100 events but occasionally it would result in the buffer
filling up, which would cause a deadlock because bulb, our controller
library, would block on sending bridgestrap its events.

This patch fixes the issue by also consuming events outside of
TestBridgeLines.  The patch further adds a new Prometheus time series
called pending_events, which captures the number of pending events.  The
time series will help us monitor if this patch got the job done.
parent d30ebe0f
...@@ -12,6 +12,7 @@ const ( ...@@ -12,6 +12,7 @@ const (
type Metrics struct { type Metrics struct {
CacheSize prometheus.Gauge CacheSize prometheus.Gauge
PendingReqs prometheus.Gauge PendingReqs prometheus.Gauge
PendingEvents prometheus.Gauge
FracFunctional prometheus.Gauge FracFunctional prometheus.Gauge
TorTestTime prometheus.Histogram TorTestTime prometheus.Histogram
Events *prometheus.CounterVec Events *prometheus.CounterVec
...@@ -33,6 +34,12 @@ func InitMetrics() { ...@@ -33,6 +34,12 @@ func InitMetrics() {
Help: "The number of pending requests", Help: "The number of pending requests",
}) })
metrics.PendingEvents = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: PrometheusNamespace,
Name: "pending_events",
Help: "The number of pending Tor controller events",
})
metrics.FracFunctional = promauto.NewGauge(prometheus.GaugeOpts{ metrics.FracFunctional = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: PrometheusNamespace, Namespace: PrometheusNamespace,
Name: "fraction_functional", Name: "fraction_functional",
......
...@@ -24,7 +24,9 @@ const ( ...@@ -24,7 +24,9 @@ const (
DefaultBridge2 = "obfs4 193.11.166.194:27015 2D82C2E354D531A68469ADF7F878FA6060C6BACA cert=4TLQPJrTSaDffMK7Nbao6LC7G9OW/NHkUwIdjLSS3KYf0Nv4/nQiiI8dY2TcsQx01NniOg iat-mode=0" DefaultBridge2 = "obfs4 193.11.166.194:27015 2D82C2E354D531A68469ADF7F878FA6060C6BACA cert=4TLQPJrTSaDffMK7Nbao6LC7G9OW/NHkUwIdjLSS3KYf0Nv4/nQiiI8dY2TcsQx01NniOg iat-mode=0"
DefaultBridge3 = "obfs4 37.218.245.14:38224 D9A82D2F9C2F65A18407B1D2B764F130847F8B5D cert=bjRaMrr1BRiAW8IE9U5z27fQaYgOhX1UCmOpg2pFpoMvo6ZgQMzLsaTzzQNTlm7hNcb+Sg iat-mode=0" DefaultBridge3 = "obfs4 37.218.245.14:38224 D9A82D2F9C2F65A18407B1D2B764F130847F8B5D cert=bjRaMrr1BRiAW8IE9U5z27fQaYgOhX1UCmOpg2pFpoMvo6ZgQMzLsaTzzQNTlm7hNcb+Sg iat-mode=0"
// The maximum amount of bridges per batch. // The maximum amount of bridges per batch.
MaxBridgesPerReq = 100 MaxBridgesPerReq = 100
MaxEventBacklog = 100
MaxRequestBacklog = 100
) )
// The amount of time we give Tor to test a batch of bridges. // The amount of time we give Tor to test a batch of bridges.
...@@ -141,8 +143,8 @@ func (c *TorContext) Start() error { ...@@ -141,8 +143,8 @@ func (c *TorContext) Start() error {
defer c.Unlock() defer c.Unlock()
log.Println("Starting Tor process.") log.Println("Starting Tor process.")
c.eventChan = make(chan *bulb.Response, 100) c.eventChan = make(chan *bulb.Response, MaxEventBacklog)
c.RequestQueue = make(chan *TestRequest, 100) c.RequestQueue = make(chan *TestRequest, MaxRequestBacklog)
c.shutdown = make(chan bool) c.shutdown = make(chan bool)
// Create Tor's data directory. // Create Tor's data directory.
...@@ -200,6 +202,17 @@ func (c *TorContext) TestBridgeLines(bridgeLines []string) *TestResult { ...@@ -200,6 +202,17 @@ func (c *TorContext) TestBridgeLines(bridgeLines []string) *TestResult {
result := NewTestResult() result := NewTestResult()
log.Printf("Testing %d bridge lines.", len(bridgeLines)) log.Printf("Testing %d bridge lines.", len(bridgeLines))
// We maintain per-bridge state machines that parse Tor's event output.
eventParsers := make(map[string]*TorEventState)
for _, bridgeLine := range bridgeLines {
identifier, err := getBridgeIdentifier(bridgeLine)
if err != nil {
log.Printf("Bug: Could not extract identifier from bridge line %q.", bridgeLine)
continue
}
eventParsers[bridgeLine] = NewTorEventState(identifier)
}
// By default, Tor enters dormant mode 24 hours after seeing no user // By default, Tor enters dormant mode 24 hours after seeing no user
// activity. Bridgestrap's control port interaction doesn't count as user // activity. Bridgestrap's control port interaction doesn't count as user
// activity, which is why we explicitly wake up Tor before issuing our // activity, which is why we explicitly wake up Tor before issuing our
...@@ -225,17 +238,6 @@ func (c *TorContext) TestBridgeLines(bridgeLines []string) *TestResult { ...@@ -225,17 +238,6 @@ func (c *TorContext) TestBridgeLines(bridgeLines []string) *TestResult {
return result return result
} }
// We maintain per-bridge state machines that parse Tor's event output.
eventParsers := make(map[string]*TorEventState)
for _, bridgeLine := range bridgeLines {
identifier, err := getBridgeIdentifier(bridgeLine)
if err != nil {
log.Printf("Bug: Could not extract identifier from bridge line %q.", bridgeLine)
continue
}
eventParsers[bridgeLine] = NewTorEventState(identifier)
}
log.Printf("Waiting for Tor to give us test results.") log.Printf("Waiting for Tor to give us test results.")
timeout := time.After(TorTestTimeout) timeout := time.After(TorTestTimeout)
for { for {
...@@ -311,6 +313,9 @@ func (c *TorContext) dispatcher() { ...@@ -311,6 +313,9 @@ func (c *TorContext) dispatcher() {
metrics.TorTestTime.Observe(elapsed.Seconds()) metrics.TorTestTime.Observe(elapsed.Seconds())
req.resultChan <- result req.resultChan <- result
case <-c.eventChan:
// Discard events that happen while we are not testing bridges.
log.Printf("Discarding event because we're not testing bridges.")
case <-c.shutdown: case <-c.shutdown:
return return
} }
...@@ -329,6 +334,7 @@ func (c *TorContext) eventReader() { ...@@ -329,6 +334,7 @@ func (c *TorContext) eventReader() {
close(c.eventChan) close(c.eventChan)
return return
} }
metrics.PendingEvents.Set(float64(len(c.eventChan)))
c.eventChan <- ev c.eventChan <- ev
} }
} }
...@@ -61,7 +61,16 @@ func TestBridgeTest(t *testing.T) { ...@@ -61,7 +61,16 @@ func TestBridgeTest(t *testing.T) {
t.Fatalf("Failed to start tor: %s", err) t.Fatalf("Failed to start tor: %s", err)
} }
result := torCtx.TestBridgeLines([]string{defaultBridge1, defaultBridge2, bogusBridge}) resultChan := make(chan *TestResult)
req := &TestRequest{
BridgeLines: []string{defaultBridge1, defaultBridge2, bogusBridge},
resultChan: resultChan,
}
// Submit the test request.
torCtx.RequestQueue <- req
// Now wait for the test result.
result := <-resultChan
r, _ := result.Bridges[defaultBridge1] r, _ := result.Bridges[defaultBridge1]
if !r.Functional { if !r.Functional {
t.Errorf("Default bridge #1 deemed non-functional.") t.Errorf("Default bridge #1 deemed non-functional.")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment