Skip to content
Snippets Groups Projects
Commit c4ae6490 authored by Cecylia Bocovich's avatar Cecylia Bocovich
Browse files

Redo protocol for proxy--broker messages

Switch to containing all communication between the proxy and the broker
in the HTTP response body. This will make things easier if we ever use
something other than HTTP communicate between different actors in the
snowflake system.

Other changes to the protocol are as follows:
- requests are accompanied by a version number so the broker can be
backwards compatable if desired in the future
- all responses are 200 OK unless the request was badly formatted
parent abefae15
No related branches found
No related tags found
No related merge requests found
......@@ -21,6 +21,7 @@ import (
......@@ -151,15 +152,16 @@ func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake {
For snowflake proxies to request a client from the Broker.
func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
id := r.Header.Get("X-Session-ID")
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if nil != err {
if err != nil {
log.Println("Invalid data.")
if string(body) != id {
log.Println("Mismatched IDs!")
sid, err := messages.DecodePollRequest(body)
if err != nil {
log.Println("Invalid data.")
......@@ -173,14 +175,26 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
// Wait for a client to avail an offer to the snowflake, or timeout if nil.
offer := ctx.RequestOffer(id)
offer := ctx.RequestOffer(sid)
var b []byte
if nil == offer {
b, err = messages.EncodePollResponse("", false)
if err != nil {
b, err = messages.EncodePollResponse(string(offer), true)
if err != nil {
log.Println("Passing client offer to snowflake.")
if _, err := w.Write(offer); err != nil {
if _, err := w.Write(b); err != nil {
log.Printf("proxyPolls unable to write offer with error: %v", err)
......@@ -235,22 +249,40 @@ an offer from proxyHandler to respond with an answer in an HTTP POST,
which the broker will pass back to the original client.
func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
id := r.Header.Get("X-Session-ID")
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if nil != err || nil == body || len(body) <= 0 {
log.Println("Invalid data.")
answer, id, err := messages.DecodeAnswerRequest(body)
if err != nil || answer == "" {
log.Println("Invalid data.")
var success = true
snowflake, ok := ctx.idToSnowflake[id]
if !ok || nil == snowflake {
// The snowflake took too long to respond with an answer, so its client
// disappeared / the snowflake is no longer recognized by the Broker.
success = false
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if nil != err || nil == body || len(body) <= 0 {
log.Println("Invalid data.")
b, err := messages.EncodeAnswerResponse(success)
if err != nil {
log.Printf("Error encoding answer: %s", err.Error())
if success {
snowflake.answerChannel <- []byte(answer)
snowflake.answerChannel <- body
func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
......@@ -113,9 +113,8 @@ func TestBroker(t *testing.T) {
Convey("Responds to proxy polls...", func() {
done := make(chan bool)
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
data := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`))
r, err := http.NewRequest("POST", "", data)
r.Header.Set("X-Session-ID", "test")
So(err, ShouldBeNil)
Convey("with a client offer if available.", func() {
......@@ -125,57 +124,59 @@ func TestBroker(t *testing.T) {
// Pass a fake client offer to this proxy
p := <-ctx.proxyPolls
So(, ShouldEqual, "test")
So(, ShouldEqual, "ymbcCMto7KHNGYlp")
p.offerChannel <- []byte("fake offer")
So(w.Code, ShouldEqual, http.StatusOK)
So(w.Body.String(), ShouldEqual, "fake offer")
So(w.Body.String(), ShouldEqual, `{"Status":"client match","Offer":"fake offer"}`)
Convey("times out when no client offer is available.", func() {
Convey("return empty 200 OK when no client offer is available.", func() {
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
done <- true
p := <-ctx.proxyPolls
So(, ShouldEqual, "test")
So(, ShouldEqual, "ymbcCMto7KHNGYlp")
// nil means timeout
p.offerChannel <- nil
So(w.Body.String(), ShouldEqual, "")
So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
So(w.Body.String(), ShouldEqual, `{"Status":"no match","Offer":""}`)
So(w.Code, ShouldEqual, http.StatusOK)
Convey("Responds to proxy answers...", func() {
s := ctx.AddSnowflake("test")
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("fake answer"))
data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`))
Convey("by passing to the client if valid.", func() {
r, err := http.NewRequest("POST", "", data)
So(err, ShouldBeNil)
r.Header.Set("X-Session-ID", "test")
go func(ctx *BrokerContext) {
proxyAnswers(ctx, w, r)
answer := <-s.answerChannel
So(w.Code, ShouldEqual, http.StatusOK)
So(answer, ShouldResemble, []byte("fake answer"))
So(answer, ShouldResemble, []byte("test"))
Convey("with error if the proxy is not recognized", func() {
r, err := http.NewRequest("POST", "", nil)
Convey("with client gone status if the proxy is not recognized", func() {
data = bytes.NewReader([]byte(`{"Version":"1.0","Sid":"invalid","Answer":"test"}`))
r, err := http.NewRequest("POST", "", data)
So(err, ShouldBeNil)
r.Header.Set("X-Session-ID", "invalid")
proxyAnswers(ctx, w, r)
So(w.Code, ShouldEqual, http.StatusGone)
So(w.Code, ShouldEqual, http.StatusOK)
b, err := ioutil.ReadAll(w.Body)
So(err, ShouldBeNil)
So(b, ShouldResemble, []byte(`{"Status":"client gone"}`))
Convey("with error if the proxy gives invalid answer", func() {
data := bytes.NewReader(nil)
r, err := http.NewRequest("POST", "", data)
r.Header.Set("X-Session-ID", "test")
So(err, ShouldBeNil)
proxyAnswers(ctx, w, r)
So(w.Code, ShouldEqual, http.StatusBadRequest)
......@@ -184,7 +185,6 @@ func TestBroker(t *testing.T) {
Convey("with error if the proxy writes too much data", func() {
data := bytes.NewReader(make([]byte, 100001))
r, err := http.NewRequest("POST", "", data)
r.Header.Set("X-Session-ID", "test")
So(err, ShouldBeNil)
proxyAnswers(ctx, w, r)
So(w.Code, ShouldEqual, http.StatusBadRequest)
......@@ -199,11 +199,10 @@ func TestBroker(t *testing.T) {
ctx := NewBrokerContext(NullLogger())
// Proxy polls with its ID first...
dataP := bytes.NewReader([]byte("test"))
dataP := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`))
wP := httptest.NewRecorder()
rP, err := http.NewRequest("POST", "", dataP)
So(err, ShouldBeNil)
rP.Header.Set("X-Session-ID", "test")
go func() {
proxyPolls(ctx, wP, rP)
polled <- true
......@@ -211,13 +210,13 @@ func TestBroker(t *testing.T) {
// Manually do the Broker goroutine action here for full control.
p := <-ctx.proxyPolls
So(, ShouldEqual, "test")
So(, ShouldEqual, "ymbcCMto7KHNGYlp")
s := ctx.AddSnowflake(
go func() {
offer := <-s.offerChannel
p.offerChannel <- offer
So(ctx.idToSnowflake["test"], ShouldNotBeNil)
So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil)
// Client request blocks until proxy answer arrives.
dataC := bytes.NewReader([]byte("fake offer"))
......@@ -231,20 +230,19 @@ func TestBroker(t *testing.T) {
So(wP.Code, ShouldEqual, http.StatusOK)
So(wP.Body.String(), ShouldResemble, "fake offer")
So(ctx.idToSnowflake["test"], ShouldNotBeNil)
So(wP.Body.String(), ShouldResemble, `{"Status":"client match","Offer":"fake offer"}`)
So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil)
// Follow up with the answer request afterwards
wA := httptest.NewRecorder()
dataA := bytes.NewReader([]byte("fake answer"))
rA, err := http.NewRequest("POST", "", dataA)
dataA := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"ymbcCMto7KHNGYlp","Answer":"test"}`))
rA, err := http.NewRequest("POST", "", dataA)
So(err, ShouldBeNil)
rA.Header.Set("X-Session-ID", "test")
proxyAnswers(ctx, wA, rA)
So(wA.Code, ShouldEqual, http.StatusOK)
So(wC.Code, ShouldEqual, http.StatusOK)
So(wC.Body.String(), ShouldEqual, "fake answer")
So(wC.Body.String(), ShouldEqual, "test")
......@@ -408,7 +406,7 @@ func TestMetrics(t *testing.T) {
//Test addition of proxy polls
Convey("for proxy polls", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
data := bytes.NewReader([]byte("{\"Sid\":\"ymbcCMto7KHNGYlp\",\"Version\":\"1.0\"}"))
r, err := http.NewRequest("POST", "", data)
r.Header.Set("X-Session-ID", "test")
r.RemoteAddr = "" //CA geoip
......@@ -492,7 +490,7 @@ func TestMetrics(t *testing.T) {
//Test unique ip
Convey("proxy counts by unique ip", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
data := bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`))
r, err := http.NewRequest("POST", "", data)
r.Header.Set("X-Session-ID", "test")
r.RemoteAddr = "" //CA geoip
......@@ -505,7 +503,7 @@ func TestMetrics(t *testing.T) {
p.offerChannel <- nil
data = bytes.NewReader([]byte("test"))
data = bytes.NewReader([]byte(`{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`))
r, err = http.NewRequest("POST", "", data)
if err != nil {
log.Printf("unable to get NewRequest with error: %v", err)
//Package for communication with the snowflake broker
//import ""
package messages
import (
const version = "1.0"
/* Version 1.0 specification:
== ProxyPollRequest ==
Sid: [generated session id of proxy]
Version: 1.0
== ProxyPollResponse ==
1) If a client is matched:
Status: "client match",
type: offer,
sdp: [WebRTC SDP]
2) If a client is not matched:
Status: "no proxies"
3) If the request is malformed:
HTTP 400 BadRequest
== ProxyAnswerRequest ==
Sid: [generated session id of proxy]
Version: 1.0
type: answer
sdp: [WebRTC SDP]
== ProxyAnswerResponse ==
1) If the client retrieved the answer:
Status: "success"
2) If the client left:
Status: "client gone"
3) If the request is malformed:
HTTP 400 BadRequest
type ProxyPollRequest struct {
Sid string
Version string
func EncodePollRequest(sid string) ([]byte, error) {
return json.Marshal(ProxyPollRequest{
Sid: sid,
Version: version,
// Decodes a poll message from a snowflake proxy and returns the
// sid of the proxy on success and an error if it failed
func DecodePollRequest(data []byte) (string, error) {
var message ProxyPollRequest
err := json.Unmarshal(data, &message)
if err != nil {
return "", err
if message.Version != "1.0" {
return "", fmt.Errorf("using unknown version")
// Version 1.0 requires an Sid
if message.Sid == "" {
return "", fmt.Errorf("no supplied session id")
return message.Sid, nil
type ProxyPollResponse struct {
Status string
Offer string
func EncodePollResponse(offer string, success bool) ([]byte, error) {
if success {
return json.Marshal(ProxyPollResponse{
Status: "client match",
Offer: offer,
return json.Marshal(ProxyPollResponse{
Status: "no match",
// Decodes a poll response from the broker and returns an offer
// If there is a client match, the returned offer string will be non-empty
func DecodePollResponse(data []byte) (string, error) {
var message ProxyPollResponse
err := json.Unmarshal(data, &message)
if err != nil {
return "", err
if message.Status == "" {
return "", fmt.Errorf("received invalid data")
if message.Status == "client match" {
if message.Offer == "" {
return "", fmt.Errorf("no supplied offer")
} else {
message.Offer = ""
return message.Offer, nil
type ProxyAnswerRequest struct {
Version string
Sid string
Answer string
func EncodeAnswerRequest(answer string, sid string) ([]byte, error) {
return json.Marshal(ProxyAnswerRequest{
Version: "1.0",
Sid: sid,
Answer: answer,
// Returns the sdp answer and proxy sid
func DecodeAnswerRequest(data []byte) (string, string, error) {
var message ProxyAnswerRequest
err := json.Unmarshal(data, &message)
if err != nil {
return "", "", err
if message.Version != "1.0" {
return "", "", fmt.Errorf("using unknown version")
if message.Sid == "" || message.Answer == "" {
return "", "", fmt.Errorf("no supplied sid or answer")
return message.Answer, message.Sid, nil
type ProxyAnswerResponse struct {
Status string
func EncodeAnswerResponse(success bool) ([]byte, error) {
if success {
return json.Marshal(ProxyAnswerResponse{
Status: "success",
return json.Marshal(ProxyAnswerResponse{
Status: "client gone",
func DecodeAnswerResponse(data []byte) (bool, error) {
var message ProxyAnswerResponse
var success bool
err := json.Unmarshal(data, &message)
if err != nil {
return success, err
if message.Status == "" {
return success, fmt.Errorf("received invalid data")
if message.Status == "success" {
success = true
return success, nil
package messages
import (
. ""
func TestDecodeProxyPollRequest(t *testing.T) {
Convey("Context", t, func() {
for _, test := range []struct {
sid string
data string
err error
//Version 1.0 proxy message
//Version 0.X proxy message:
} {
sid, err := DecodePollRequest([]byte(
So(sid, ShouldResemble, test.sid)
So(err, ShouldHaveSameTypeAs, test.err)
func TestEncodeProxyPollRequests(t *testing.T) {
Convey("Context", t, func() {
b, err := EncodePollRequest("ymbcCMto7KHNGYlp")
So(err, ShouldEqual, nil)
sid, err := DecodePollRequest(b)
So(sid, ShouldEqual, "ymbcCMto7KHNGYlp")
So(err, ShouldEqual, nil)
func TestDecodeProxyAnswerRequest(t *testing.T) {
Convey("Context", t, func() {
for _, test := range []struct {
answer string
sid string
data string
err error
`{"type":"offer","sdp":"v=0\r\no=- 4358805017720277108 2 IN IP4 [scrubbed]\r\ns=-\r\nt=0 0\r\na=group:BUNDLE data\r\na=msid-semantic: WMS\r\nm=application 56688 DTLS/SCTP 5000\r\nc=IN IP4 [scrubbed]\r\na=candidate:3769337065 1 udp 2122260223 [scrubbed] 56688 typ host generation 0 network-id 1 network-cost 50\r\na=candidate:2921887769 1 tcp 1518280447 [scrubbed] 35441 typ host tcptype passive generation 0 network-id 1 network-cost 50\r\na=ice-ufrag:aMAZ\r\na=ice-pwd:jcHb08Jjgrazp2dzjdrvPPvV\r\na=ice-options:trickle\r\na=fingerprint:sha-256 C8:88:EE:B9:E7:02:2E:21:37:ED:7A:D1:EB:2B:A3:15:A2:3B:5B:1C:3D:D4:D5:1F:06:CF:52:40:03:F8:DD:66\r\na=setup:actpass\r\na=mid:data\r\na=sctpmap:5000 webrtc-datachannel 1024\r\n"}`,
} {
answer, sid, err := DecodeAnswerRequest([]byte(
So(answer, ShouldResemble, test.answer)
So(sid, ShouldResemble, test.sid)
So(err, ShouldHaveSameTypeAs, test.err)
func TestEncodeProxyAnswerRequest(t *testing.T) {
Convey("Context", t, func() {
b, err := EncodeAnswerRequest("test answer", "test sid")
So(err, ShouldEqual, nil)
answer, sid, err := DecodeAnswerRequest(b)
So(answer, ShouldEqual, "test answer")
So(sid, ShouldEqual, "test sid")
So(err, ShouldEqual, nil)
func TestDecodeProxyAnswerResponse(t *testing.T) {
Convey("Context", t, func() {
for _, test := range []struct {
success bool
data string
err error
`{"Status":"client gone"}`,
} {
success, err := DecodeAnswerResponse([]byte(
So(success, ShouldResemble, test.success)
So(err, ShouldHaveSameTypeAs, test.err)
func TestEncodeProxyAnswerResponse(t *testing.T) {
Convey("Context", t, func() {
b, err := EncodeAnswerResponse(true)
So(err, ShouldEqual, nil)
success, err := DecodeAnswerResponse(b)
So(success, ShouldEqual, true)
So(err, ShouldEqual, nil)
......@@ -19,6 +19,7 @@ import (
......@@ -168,7 +169,12 @@ func pollOffer(sid string) *webrtc.SessionDescription {
timeOfNextPoll = now
req, _ := http.NewRequest("POST", broker.String(), bytes.NewBuffer([]byte(sid)))
b, err := messages.EncodePollRequest(sid)
if err != nil {
log.Printf("Error encoding poll message: %s", err.Error())
return nil
req, _ := http.NewRequest("POST", broker.String(), bytes.NewBuffer(b))
req.Header.Set("X-Session-ID", sid)
resp, err := client.Do(req)
if err != nil {
......@@ -182,7 +188,16 @@ func pollOffer(sid string) *webrtc.SessionDescription {
if err != nil {
log.Printf("error reading broker response: %s", err)
} else {
return deserializeSessionDescription(string(body))
offer, err := messages.DecodePollResponse(body)
if err != nil {
log.Printf("error reading broker response: %s", err.Error())
log.Printf("body: %s", body)
return nil
if offer != "" {
return deserializeSessionDescription(offer)
......@@ -191,9 +206,12 @@ func pollOffer(sid string) *webrtc.SessionDescription {
func sendAnswer(sid string, pc *webrtc.PeerConnection) error {
broker := brokerURL.ResolveReference(&url.URL{Path: "answer"})
body := bytes.NewBuffer([]byte(serializeSessionDescription(pc.LocalDescription())))
req, _ := http.NewRequest("POST", broker.String(), body)
req.Header.Set("X-Session-ID", sid)
answer := string([]byte(serializeSessionDescription(pc.LocalDescription())))
b, err := messages.EncodeAnswerRequest(answer, sid)
if err != nil {
return err
req, _ := http.NewRequest("POST", broker.String(), bytes.NewBuffer(b))
resp, err := client.Do(req)
if err != nil {
return err
......@@ -201,6 +219,19 @@ func sendAnswer(sid string, pc *webrtc.PeerConnection) error {
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("broker returned %d", resp.StatusCode)
body, err := limitedRead(resp.Body, readLimit)
if err != nil {
return fmt.Errorf("error reading broker response: %s", err)
success, err := messages.DecodeAnswerResponse(body)
if err != nil {
return err
if !success {
return fmt.Errorf("broker returned client timeout")
return nil
translation @ bbf11bb0
Subproject commit 120578ec9dbf0975fc9ac573130282f628b9747a
Subproject commit bbf11bb0c9f1aca4f6b18c6505645f85e2fa1986
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment