GitLab is used only for code review, issue tracking and project management. Canonical locations for source code are still https://gitweb.torproject.org/ https://git.torproject.org/ and git-rw.torproject.org.

Use channel-based request dispatcher.

So far, bridgestrap would serialise requests by relying on a mutex's
locking mechanism.  That's dirty.  This patch implements a channel-based
dispatching mechanism that guarantees order (mutexes don't).  In
addition to that, we can now log the number of outstanding requests,
which is good to know.
parent 92752a12
...@@ -35,6 +35,7 @@ type TestResult struct { ...@@ -35,6 +35,7 @@ type TestResult struct {
// TestRequest represents a client's request to test a batch of bridges. // TestRequest represents a client's request to test a batch of bridges.
type TestRequest struct { type TestRequest struct {
BridgeLines []string `json:"bridge_lines"` BridgeLines []string `json:"bridge_lines"`
resultChan chan *TestResult
} }
// limiter implements a rate limiter. We allow 1 request per second on average // limiter implements a rate limiter. We allow 1 request per second on average
...@@ -90,13 +91,13 @@ func Index(w http.ResponseWriter, r *http.Request) { ...@@ -90,13 +91,13 @@ func Index(w http.ResponseWriter, r *http.Request) {
SendHtmlResponse(w, IndexPage) SendHtmlResponse(w, IndexPage)
} }
func testBridgeLines(bridgeLines []string) *TestResult { func testBridgeLines(req *TestRequest) *TestResult {
// Add cached bridge lines to the result. // Add cached bridge lines to the result.
result := NewTestResult() result := NewTestResult()
remainingBridgeLines := []string{} remainingBridgeLines := []string{}
numCached := 0 numCached := 0
for _, bridgeLine := range bridgeLines { for _, bridgeLine := range req.BridgeLines {
if entry := cache.IsCached(bridgeLine); entry != nil { if entry := cache.IsCached(bridgeLine); entry != nil {
numCached++ numCached++
result.Bridges[bridgeLine] = &BridgeTest{ result.Bridges[bridgeLine] = &BridgeTest{
...@@ -115,7 +116,9 @@ func testBridgeLines(bridgeLines []string) *TestResult { ...@@ -115,7 +116,9 @@ func testBridgeLines(bridgeLines []string) *TestResult {
numCached, len(remainingBridgeLines)) numCached, len(remainingBridgeLines))
start := time.Now() start := time.Now()
partialResult := torCtx.TestBridgeLines(remainingBridgeLines) req.resultChan = make(chan *TestResult)
torCtx.RequestQueue <- req
partialResult := <-req.resultChan
result.Time = float64(time.Now().Sub(start).Seconds()) result.Time = float64(time.Now().Sub(start).Seconds())
result.Error = partialResult.Error result.Error = partialResult.Error
...@@ -177,7 +180,7 @@ func BridgeState(w http.ResponseWriter, r *http.Request) { ...@@ -177,7 +180,7 @@ func BridgeState(w http.ResponseWriter, r *http.Request) {
} }
log.Printf("Got %d bridge lines from %s.", len(req.BridgeLines), r.RemoteAddr) log.Printf("Got %d bridge lines from %s.", len(req.BridgeLines), r.RemoteAddr)
result := testBridgeLines(req.BridgeLines) result := testBridgeLines(req)
jsonResult, err := json.Marshal(result) jsonResult, err := json.Marshal(result)
if err != nil { if err != nil {
...@@ -203,7 +206,7 @@ func BridgeStateWeb(w http.ResponseWriter, r *http.Request) { ...@@ -203,7 +206,7 @@ func BridgeStateWeb(w http.ResponseWriter, r *http.Request) {
return return
} }
result := testBridgeLines([]string{bridgeLine}) result := testBridgeLines(&TestRequest{BridgeLines: []string{bridgeLine}})
bridgeResult, exists := result.Bridges[bridgeLine] bridgeResult, exists := result.Bridges[bridgeLine]
if !exists { if !exists {
log.Printf("Bug: Test result not part of our result map.") log.Printf("Bug: Test result not part of our result map.")
......
...@@ -102,12 +102,14 @@ func makeControlConnection(domainSocket string) (*bulb.Conn, error) { ...@@ -102,12 +102,14 @@ func makeControlConnection(domainSocket string) (*bulb.Conn, error) {
// Tor process. // Tor process.
type TorContext struct { type TorContext struct {
sync.Mutex sync.Mutex
Ctrl *bulb.Conn Ctrl *bulb.Conn
DataDir string DataDir string
Cancel context.CancelFunc Cancel context.CancelFunc
Context context.Context Context context.Context
TorBinary string RequestQueue chan *TestRequest
eventChan chan *bulb.Response TorBinary string
eventChan chan *bulb.Response
shutdown chan bool
} }
// Stop stops the Tor process. Errors during cleanup are logged and the last // Stop stops the Tor process. Errors during cleanup are logged and the last
...@@ -117,6 +119,7 @@ func (c *TorContext) Stop() error { ...@@ -117,6 +119,7 @@ func (c *TorContext) Stop() error {
defer c.Unlock() defer c.Unlock()
var err error var err error
close(c.shutdown)
log.Println("Stopping Tor process.") log.Println("Stopping Tor process.")
c.Cancel() c.Cancel()
...@@ -139,6 +142,8 @@ func (c *TorContext) Start() error { ...@@ -139,6 +142,8 @@ func (c *TorContext) Start() error {
log.Println("Starting Tor process.") log.Println("Starting Tor process.")
c.eventChan = make(chan *bulb.Response, 100) c.eventChan = make(chan *bulb.Response, 100)
c.RequestQueue = make(chan *TestRequest, 100)
c.shutdown = make(chan bool)
// Create Tor's data directory. // Create Tor's data directory.
var err error var err error
...@@ -173,6 +178,7 @@ func (c *TorContext) Start() error { ...@@ -173,6 +178,7 @@ func (c *TorContext) Start() error {
} }
c.Ctrl.StartAsyncReader() c.Ctrl.StartAsyncReader()
go c.eventReader() go c.eventReader()
go c.dispatcher()
if _, err := c.Ctrl.Request("SETEVENTS ORCONN NEWDESC"); err != nil { if _, err := c.Ctrl.Request("SETEVENTS ORCONN NEWDESC"); err != nil {
return err return err
...@@ -277,6 +283,28 @@ func (c *TorContext) TestBridgeLines(bridgeLines []string) *TestResult { ...@@ -277,6 +283,28 @@ func (c *TorContext) TestBridgeLines(bridgeLines []string) *TestResult {
return result return result
} }
// dispatcher reads new bridge test requests, triggers the test, and writes the
// result to the given channel.
func (c *TorContext) dispatcher() {
log.Printf("Starting request dispatcher.")
defer log.Printf("Stopping request dispatcher.")
for {
select {
case req := <-c.RequestQueue:
log.Printf("%d pending test requests.", len(c.RequestQueue))
start := time.Now()
result := c.TestBridgeLines(req.BridgeLines)
elapsed := time.Since(start)
metrics.TorTestTime.Observe(elapsed.Seconds())
req.resultChan <- result
case <-c.shutdown:
return
}
}
}
// eventReader reads events from Tor's control port and writes them to // eventReader reads events from Tor's control port and writes them to
// c.eventChan, allowing TestBridgeLines to read Tor's events in a select // c.eventChan, allowing TestBridgeLines to read Tor's events in a select
// statement. // statement.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment