GitLab is used only for code review, issue tracking and project management. Canonical locations for source code are still https://gitweb.torproject.org/ https://git.torproject.org/ and git-rw.torproject.org.

Implement token bucket mechanism for API.

Unlike the Web API, we don't reject connections.  Instead, we try hard
to serve them all, even though some may be delayed.
parent 951ae39e
...@@ -8,12 +8,14 @@ import ( ...@@ -8,12 +8,14 @@ import (
"log" "log"
"net/http" "net/http"
"path" "path"
"sync"
"time" "time"
) )
var IndexPage string var IndexPage string
var SuccessPage string var SuccessPage string
var FailurePage string var FailurePage string
var reqChan = make(chan *TestRequest, 1000)
// TestResult represents the result of a test, sent back to the client as JSON // TestResult represents the result of a test, sent back to the client as JSON
// object. // object.
...@@ -25,6 +27,7 @@ type TestResult struct { ...@@ -25,6 +27,7 @@ type TestResult struct {
type TestRequest struct { type TestRequest struct {
BridgeLine string `json:"bridge_line"` BridgeLine string `json:"bridge_line"`
respChan chan *TestResult
} }
// limiter implements a rate limiter. We allow 1 request per second on average // limiter implements a rate limiter. We allow 1 request per second on average
...@@ -73,8 +76,37 @@ func Index(w http.ResponseWriter, r *http.Request) { ...@@ -73,8 +76,37 @@ func Index(w http.ResponseWriter, r *http.Request) {
SendHtmlResponse(w, IndexPage) SendHtmlResponse(w, IndexPage)
} }
func createJsonResult(err error, start time.Time) string { func dispatchRequests(shutdown chan bool, wg *sync.WaitGroup, numSeconds int) {
log.Printf("Starting request dispatcher.")
delay := time.Tick(time.Second * time.Duration(numSeconds))
wg.Add(1)
defer wg.Done()
defer log.Printf("Stopping request dispatcher.")
for {
select {
case <-shutdown:
return
case req := <-reqChan:
log.Printf("Fetching next request; %d requests remain buffered.", len(reqChan))
go processRequest(req)
// Either wait for the delay to expire or the service to shut down;
// whatever comes first.
select {
case <-shutdown:
return
case <-delay:
}
}
}
}
func processRequest(req *TestRequest) {
start := time.Now()
err := bootstrapTorOverBridge(req.BridgeLine)
end := time.Now() end := time.Now()
result := &TestResult{ result := &TestResult{
Functional: err == nil, Functional: err == nil,
...@@ -83,19 +115,11 @@ func createJsonResult(err error, start time.Time) string { ...@@ -83,19 +115,11 @@ func createJsonResult(err error, start time.Time) string {
if err != nil { if err != nil {
result.Error = err.Error() result.Error = err.Error()
} }
req.respChan <- result
jsonResult, err := json.Marshal(result)
if err != nil {
log.Printf("Bug: %s", err)
}
return string(jsonResult)
} }
func BridgeState(w http.ResponseWriter, r *http.Request) { func BridgeState(w http.ResponseWriter, r *http.Request) {
start := time.Now()
b, err := ioutil.ReadAll(r.Body) b, err := ioutil.ReadAll(r.Body)
defer r.Body.Close() defer r.Body.Close()
if err != nil { if err != nil {
...@@ -116,8 +140,28 @@ func BridgeState(w http.ResponseWriter, r *http.Request) { ...@@ -116,8 +140,28 @@ func BridgeState(w http.ResponseWriter, r *http.Request) {
http.Error(w, "no bridge line given", http.StatusBadRequest) http.Error(w, "no bridge line given", http.StatusBadRequest)
return return
} }
err = bootstrapTorOverBridge(req.BridgeLine)
SendJSONResponse(w, createJsonResult(err, start)) var res *TestResult
// Do we have the given bridge line cached? If so, we can respond right
// away.
if entry := cache.IsCached(req.BridgeLine); entry != nil {
res = &TestResult{
Functional: entry.Error == "",
Error: entry.Error,
Time: 0.0}
} else {
respChan := make(chan *TestResult)
req.respChan = respChan
reqChan <- req
res = <-respChan
}
jsonResult, err := json.Marshal(res)
if err != nil {
log.Printf("Bug: %s", err)
http.Error(w, "failed to marshal test tesult", http.StatusInternalServerError)
}
SendJSONResponse(w, string(jsonResult))
} }
func BridgeStateWeb(w http.ResponseWriter, r *http.Request) { func BridgeStateWeb(w http.ResponseWriter, r *http.Request) {
......
package main
import (
"fmt"
"testing"
"time"
)
func TestCreateJsonResult(t *testing.T) {
expected := `{"functional":false,"error":"test","time":1}`
now := time.Now()
then := now.Add(time.Duration(-1) * time.Second)
json := createJsonResult(fmt.Errorf("test"), then)
if json != expected {
t.Errorf("Got unexpected JSON: %s", json)
}
expected = `{"functional":true,"time":1}`
now = time.Now()
then = now.Add(time.Duration(-1) * time.Second)
json = createJsonResult(nil, then)
if json != expected {
t.Errorf("Got unexpected JSON: %s", json)
}
}
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"sync"
"syscall" "syscall"
"time" "time"
...@@ -85,6 +86,7 @@ func main() { ...@@ -85,6 +86,7 @@ func main() {
var certFilename, keyFilename string var certFilename, keyFilename string
var cacheFile string var cacheFile string
var templatesDir string var templatesDir string
var numSecs int
flag.StringVar(&addr, "addr", ":5000", "Address to listen on.") flag.StringVar(&addr, "addr", ":5000", "Address to listen on.")
flag.BoolVar(&web, "web", false, "Enable the web interface (in addition to the JSON API).") flag.BoolVar(&web, "web", false, "Enable the web interface (in addition to the JSON API).")
...@@ -92,6 +94,7 @@ func main() { ...@@ -92,6 +94,7 @@ func main() {
flag.StringVar(&keyFilename, "key", "", "TLS private key file.") flag.StringVar(&keyFilename, "key", "", "TLS private key file.")
flag.StringVar(&cacheFile, "cache", "bridgestrap-cache.bin", "Cache file that contains test results.") flag.StringVar(&cacheFile, "cache", "bridgestrap-cache.bin", "Cache file that contains test results.")
flag.StringVar(&templatesDir, "templates", "templates", "Path to directory that contains our web templates.") flag.StringVar(&templatesDir, "templates", "templates", "Path to directory that contains our web templates.")
flag.IntVar(&numSecs, "seconds", 2, "Number of seconds after two subsequent requests are handled.")
flag.Parse() flag.Parse()
var logOutput io.Writer = os.Stderr var logOutput io.Writer = os.Stderr
...@@ -131,12 +134,17 @@ func main() { ...@@ -131,12 +134,17 @@ func main() {
signalChan := make(chan os.Signal, 1) signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT) signal.Notify(signalChan, syscall.SIGINT)
signal.Notify(signalChan, syscall.SIGTERM) signal.Notify(signalChan, syscall.SIGTERM)
var wg sync.WaitGroup
shutdown := make(chan bool)
go dispatchRequests(shutdown, &wg, numSecs)
log.Printf("Waiting for signal to shut down.") log.Printf("Waiting for signal to shut down.")
<-signalChan <-signalChan
shutdown <- true
log.Printf("Received signal to shut down.") log.Printf("Received signal to shut down.")
// Give our Web server a maximum of a minute to shut down, and finish // Give our Web server a maximum of a minute to finish handling open
// handling open connections. // connections and shut down gracefully.
t := time.Now().Add(time.Minute) t := time.Now().Add(time.Minute)
ctx, cancel := context.WithDeadline(context.Background(), t) ctx, cancel := context.WithDeadline(context.Background(), t)
defer cancel() defer cancel()
...@@ -147,4 +155,5 @@ func main() { ...@@ -147,4 +155,5 @@ func main() {
if err := cache.WriteToDisk(cacheFile); err != nil { if err := cache.WriteToDisk(cacheFile); err != nil {
log.Printf("Failed to write cache to disk: %s", err) log.Printf("Failed to write cache to disk: %s", err)
} }
wg.Wait()
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment