From cf1bce743a83cc349e85675d2e193879871c0e06 Mon Sep 17 00:00:00 2001 From: Philipp Winter Date: Fri, 20 Nov 2020 13:46:17 -0800 Subject: [PATCH] Use channel-based request dispatcher. So far, bridgestrap would serialise requests by relying on a mutex's locking mechanism. That's dirty. This patch implements a channel-based dispatching mechanism that guarantees order (mutexes don't). In addition to that, we can now log the number of outstanding requests, which is good to know. --- handlers.go | 13 ++++++++----- tor.go | 40 ++++++++++++++++++++++++++++++++++------ 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/handlers.go b/handlers.go index 5f126d0..b26681a 100644 --- a/handlers.go +++ b/handlers.go @@ -35,6 +35,7 @@ type TestResult struct { // TestRequest represents a client's request to test a batch of bridges. type TestRequest struct { BridgeLines []string `json:"bridge_lines"` + resultChan chan *TestResult } // limiter implements a rate limiter. We allow 1 request per second on average @@ -90,13 +91,13 @@ func Index(w http.ResponseWriter, r *http.Request) { SendHtmlResponse(w, IndexPage) } -func testBridgeLines(bridgeLines []string) *TestResult { +func testBridgeLines(req *TestRequest) *TestResult { // Add cached bridge lines to the result. result := NewTestResult() remainingBridgeLines := []string{} numCached := 0 - for _, bridgeLine := range bridgeLines { + for _, bridgeLine := range req.BridgeLines { if entry := cache.IsCached(bridgeLine); entry != nil { numCached++ result.Bridges[bridgeLine] = &BridgeTest{ @@ -115,7 +116,9 @@ func testBridgeLines(bridgeLines []string) *TestResult { numCached, len(remainingBridgeLines)) start := time.Now() - partialResult := torCtx.TestBridgeLines(remainingBridgeLines) + req.resultChan = make(chan *TestResult) + torCtx.RequestQueue <- req + partialResult := <-req.resultChan result.Time = float64(time.Now().Sub(start).Seconds()) result.Error = partialResult.Error @@ -177,7 +180,7 @@ func BridgeState(w http.ResponseWriter, r *http.Request) { } log.Printf("Got %d bridge lines from %s.", len(req.BridgeLines), r.RemoteAddr) - result := testBridgeLines(req.BridgeLines) + result := testBridgeLines(req) jsonResult, err := json.Marshal(result) if err != nil { @@ -203,7 +206,7 @@ func BridgeStateWeb(w http.ResponseWriter, r *http.Request) { return } - result := testBridgeLines([]string{bridgeLine}) + result := testBridgeLines(&TestRequest{BridgeLines: []string{bridgeLine}}) bridgeResult, exists := result.Bridges[bridgeLine] if !exists { log.Printf("Bug: Test result not part of our result map.") diff --git a/tor.go b/tor.go index a907065..0686764 100644 --- a/tor.go +++ b/tor.go @@ -102,12 +102,14 @@ func makeControlConnection(domainSocket string) (*bulb.Conn, error) { // Tor process. type TorContext struct { sync.Mutex - Ctrl *bulb.Conn - DataDir string - Cancel context.CancelFunc - Context context.Context - TorBinary string - eventChan chan *bulb.Response + Ctrl *bulb.Conn + DataDir string + Cancel context.CancelFunc + Context context.Context + RequestQueue chan *TestRequest + TorBinary string + eventChan chan *bulb.Response + shutdown chan bool } // Stop stops the Tor process. Errors during cleanup are logged and the last @@ -117,6 +119,7 @@ func (c *TorContext) Stop() error { defer c.Unlock() var err error + close(c.shutdown) log.Println("Stopping Tor process.") c.Cancel() @@ -139,6 +142,8 @@ func (c *TorContext) Start() error { log.Println("Starting Tor process.") c.eventChan = make(chan *bulb.Response, 100) + c.RequestQueue = make(chan *TestRequest, 100) + c.shutdown = make(chan bool) // Create Tor's data directory. var err error @@ -173,6 +178,7 @@ func (c *TorContext) Start() error { } c.Ctrl.StartAsyncReader() go c.eventReader() + go c.dispatcher() if _, err := c.Ctrl.Request("SETEVENTS ORCONN NEWDESC"); err != nil { return err @@ -277,6 +283,28 @@ func (c *TorContext) TestBridgeLines(bridgeLines []string) *TestResult { return result } +// dispatcher reads new bridge test requests, triggers the test, and writes the +// result to the given channel. +func (c *TorContext) dispatcher() { + log.Printf("Starting request dispatcher.") + defer log.Printf("Stopping request dispatcher.") + for { + select { + case req := <-c.RequestQueue: + log.Printf("%d pending test requests.", len(c.RequestQueue)) + + start := time.Now() + result := c.TestBridgeLines(req.BridgeLines) + elapsed := time.Since(start) + metrics.TorTestTime.Observe(elapsed.Seconds()) + + req.resultChan <- result + case <-c.shutdown: + return + } + } +} + // eventReader reads events from Tor's control port and writes them to // c.eventChan, allowing TestBridgeLines to read Tor's events in a select // statement. -- GitLab