Commit 99b4fdc2 authored by Philipp Winter's avatar Philipp Winter
Browse files

Merge branch 'issue/33' into 'master'

Add dummy distributor for educational purposes.

Closes #33

See merge request tpo/anti-censorship/rdsys!9
parents 777ceec3 245f5bc4
......@@ -41,3 +41,4 @@ More documentation
* [Design and architecture](doc/architecture.md)
* [Resource testing](doc/resource-testing.md)
* [Implementing new distributors](doc/new-distributor.md)
......@@ -7,8 +7,10 @@ import (
"gitlab.torproject.org/tpo/anti-censorship/rdsys/internal"
httpsUI "gitlab.torproject.org/tpo/anti-censorship/rdsys/pkg/presentation/distributors/https"
salmonWeb "gitlab.torproject.org/tpo/anti-censorship/rdsys/pkg/presentation/distributors/salmon"
stubWeb "gitlab.torproject.org/tpo/anti-censorship/rdsys/pkg/presentation/distributors/stub"
"gitlab.torproject.org/tpo/anti-censorship/rdsys/pkg/usecases/distributors/https"
"gitlab.torproject.org/tpo/anti-censorship/rdsys/pkg/usecases/distributors/salmon"
"gitlab.torproject.org/tpo/anti-censorship/rdsys/pkg/usecases/distributors/stub"
)
func main() {
......@@ -32,6 +34,7 @@ func main() {
var constructors = map[string]func(*internal.Config){
salmon.DistName: salmonWeb.InitFrontend,
https.DistName: httpsUI.InitFrontend,
stub.DistName: stubWeb.InitFrontend,
}
runFunc, exists := constructors[distName]
if !exists {
......
......@@ -10,7 +10,8 @@
"supported_resources": ["vanilla", "obfs2", "obfs3", "obfs4", "scramblesuit"],
"api_tokens": {
"https": "HttpsApiTokenPlaceholder",
"salmon": "SalmonApiTokenPlaceholder"
"salmon": "SalmonApiTokenPlaceholder",
"stub": "StubApiTokenPlaceholder"
},
"web_api": {
"api_address": "127.0.0.1:7100",
......@@ -19,7 +20,8 @@
},
"distribution_proportions": {
"https": 1,
"salmon": 5
"salmon": 5,
"stub": 3
}
},
"distributors": {
......@@ -39,6 +41,14 @@
"cert_file": "",
"key_file": ""
}
},
"stub": {
"resources": ["obfs4"],
"web_api": {
"api_address": "127.0.0.1:7400",
"cert_file": "",
"key_file": ""
}
}
}
}
Implementing new distributors
=============================
This document explains the process of building a new rdsys distributor. In a
nutshell, a new distributor requires the following code:
1. [Backend code](https://gitlab.torproject.org/tpo/anti-censorship/rdsys/-/blob/master/pkg/usecases/distributors/dummy/dummy.go)
that handles the distribution logic and the interaction with the backend.
2. [Frontend code](https://gitlab.torproject.org/tpo/anti-censorship/rdsys/-/blob/master/pkg/presentation/distributors/dummy/web.go)
that hands out resources to users.
3. Simple configuration and command line code.
Note that it's convenient to implement new distributors in Go (because one can
re-use existing code like the IPC API) but it's not necessary. Distributors are
stand-alone processes that can be implemented in any language.
Conceptually, a distributor has backend and frontend code. Backend code (part
of rdsys's
[usecases](https://gitlab.torproject.org/tpo/anti-censorship/rdsys/-/tree/master/pkg/usecases)
layer) takes care of distribution logic and frontend code (part of rdsys's
[presentation](https://gitlab.torproject.org/tpo/anti-censorship/rdsys/-/tree/master/pkg/presentation)
layer) takes care of handing resources out to users. Some distributors, like
Salmon, have complex backend code but simple frontend code. Other distributors,
like HTTPS, have simple backend code but more complex frontend code.
The separation between backend and frontend code brings with it the flexibility
to build multiple ways to access a distributor. For example, one could write
different frontends for Salmon: In addition to the domain-fronted API, one could
build a command line interface or an SMTP-based interface. The backend code
remains the same but the means via which users access the backend code differs.
......@@ -26,14 +26,24 @@ type BackendConfig struct {
StatusEndpoint string `json:"web_endpoint_status"`
MetricsEndpoint string `json:"web_endpoint_metrics"`
BridgestrapEndpoint string `json:"bridgestrap_endpoint"`
DistProportions map[string]int `json:"distribution_proportions"`
SupportedResources []string `json:"supported_resources"`
WebApi WebApiConfig `json:"web_api"`
// DistProportions contains the proportion of resources that each
// distributor should get. E.g. if the HTTPS distributor is set to x and
// the Salmon distributor is set to y, then HTTPS gets x/(x+y) of all
// resources and Salmon gets y/(x+y).
DistProportions map[string]int `json:"distribution_proportions"`
SupportedResources []string `json:"supported_resources"`
WebApi WebApiConfig `json:"web_api"`
}
type Distributors struct {
Https HttpsDistConfig `json:"https"`
Salmon SalmonDistConfig `json:"salmon"`
Stub StubDistConfig `json:"stub"`
}
type StubDistConfig struct {
Resources []string `json:"resources"`
WebApi WebApiConfig `json:"web_api"`
}
type HttpsDistConfig struct {
......
package stub
import (
"fmt"
"net/http"
"gitlab.torproject.org/tpo/anti-censorship/rdsys/internal"
"gitlab.torproject.org/tpo/anti-censorship/rdsys/pkg/presentation/distributors/common"
"gitlab.torproject.org/tpo/anti-censorship/rdsys/pkg/usecases/distributors/stub"
)
var dist *stub.StubDistributor
// RequestHandler handles requests for /.
func RequestHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
// Call our distributor backend to get bridges.
resources, err := dist.RequestBridges(0)
if err != nil {
fmt.Fprintf(w, err.Error())
} else {
for _, res := range resources {
fmt.Fprintf(w, fmt.Sprintln(res.String()))
}
}
}
// InitFrontend is the entry point to stub's Web frontend. It spins up a Web
// server and then waits until it receives a SIGINT. Note that we can
// implement all sorts of user-facing frontends here. It doesn't have to be a
// Web server. It could be an SMTP server, BitTorrent tracker, message board,
// etc.
func InitFrontend(cfg *internal.Config) {
// Start our distributor backend, which takes care of the distribution
// logic. This file implements the user-facing distribution code.
dist = &stub.StubDistributor{}
handlers := map[string]http.HandlerFunc{
"/": http.HandlerFunc(RequestHandler),
}
common.StartWebServer(
&cfg.Distributors.Stub.WebApi,
cfg,
dist,
handlers,
)
}
package stub
import (
"errors"
"log"
"sync"
"gitlab.torproject.org/tpo/anti-censorship/rdsys/internal"
"gitlab.torproject.org/tpo/anti-censorship/rdsys/pkg/core"
"gitlab.torproject.org/tpo/anti-censorship/rdsys/pkg/delivery"
"gitlab.torproject.org/tpo/anti-censorship/rdsys/pkg/delivery/mechanisms"
)
const (
DistName = "stub"
)
// StubDistributor contains the context that the distributor needs. This
// structure must implement the Distributor interface.
type StubDistributor struct {
// ring contains the resources that we are going to distribute.
ring *core.Hashring
// ipc represents the IPC mechanism that we use to talk to the backend.
ipc delivery.Mechanism
// cfg represents our configuration file.
cfg *internal.Config
// shutdown is used to let housekeeping know when it's time to finish.
shutdown chan bool
// wg is used to figure out when our housekeeping method is finished.
wg sync.WaitGroup
}
// housekeeping keeps track of periodic tasks.
func (d *StubDistributor) housekeeping(rStream chan *core.ResourceDiff) {
defer d.wg.Done()
defer close(rStream)
defer d.ipc.StopStream()
for {
select {
case diff := <-rStream:
// We got a resource update from the backend. Let's add it to our
// hashring.
d.ring.ApplyDiff(diff)
case <-d.shutdown:
// We are told to shut down.
log.Printf("Shutting down housekeeping.")
return
}
}
}
// RequestBridges takes as input a hashkey (it is the frontend's responsibility
// to derive the hashkey) and uses it to return a slice of resources.
func (d *StubDistributor) RequestBridges(key core.Hashkey) ([]core.Resource, error) {
if d.ring.Len() == 0 {
return nil, errors.New("no bridges available")
}
r, err := d.ring.Get(key)
return []core.Resource{r}, err
}
// Init initialises the distributor. Along with Shutdown, it's the only method
// that a distributor must implement to satisfy the Distributor interface.
func (d *StubDistributor) Init(cfg *internal.Config) {
log.Printf("Initialising %s distributor.", DistName)
d.cfg = cfg
d.shutdown = make(chan bool)
d.ring = core.NewHashring()
// Request resources from the backend. The backend will respond with an
// initial batch of resources and then follow up with incremental updates
// as resources change (e.g. some resources may disappear, others appear,
// and others may change their state). We will receive resources at the
// rStream channel.
log.Printf("Initialising resource stream.")
d.ipc = mechanisms.NewHttpsIpc("http://" + cfg.Backend.WebApi.ApiAddress + cfg.Backend.ResourceStreamEndpoint)
rStream := make(chan *core.ResourceDiff)
req := core.ResourceRequest{
RequestOrigin: DistName,
ResourceTypes: d.cfg.Distributors.Stub.Resources,
BearerToken: d.cfg.Backend.ApiTokens[DistName],
Receiver: rStream,
}
d.ipc.StartStream(&req)
d.wg.Add(1)
go d.housekeeping(rStream)
}
// Shutdown shuts down the distributor. This method is required to satisfy the
// Distributor interface.
func (d *StubDistributor) Shutdown() {
log.Printf("Shutting down %s distributor.", DistName)
// Signal to housekeeping that it's time to stop and wait until the
// goroutine is done.
close(d.shutdown)
d.wg.Wait()
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment