Commit 046dab86 authored by Cecylia Bocovich's avatar Cecylia Bocovich
Browse files

Have broker pass client NAT type to proxy

This will allow browser-based proxies that are unable to determine their
NAT type to conservatively label themselves as restricted NATs if they
fail to work with clients that have restricted NATs.
parent 0052c0e1
...@@ -111,17 +111,17 @@ type ProxyPoll struct { ...@@ -111,17 +111,17 @@ type ProxyPoll struct {
id string id string
proxyType string proxyType string
natType string natType string
offerChannel chan []byte offerChannel chan *ClientOffer
} }
// Registers a Snowflake and waits for some Client to send an offer, // Registers a Snowflake and waits for some Client to send an offer,
// as part of the polling logic of the proxy handler. // as part of the polling logic of the proxy handler.
func (ctx *BrokerContext) RequestOffer(id string, proxyType string, natType string) []byte { func (ctx *BrokerContext) RequestOffer(id string, proxyType string, natType string) *ClientOffer {
request := new(ProxyPoll) request := new(ProxyPoll)
request.id = id request.id = id
request.proxyType = proxyType request.proxyType = proxyType
request.natType = natType request.natType = natType
request.offerChannel = make(chan []byte) request.offerChannel = make(chan *ClientOffer)
ctx.proxyPolls <- request ctx.proxyPolls <- request
// Block until an offer is available, or timeout which sends a nil offer. // Block until an offer is available, or timeout which sends a nil offer.
offer := <-request.offerChannel offer := <-request.offerChannel
...@@ -165,7 +165,7 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType stri ...@@ -165,7 +165,7 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType stri
snowflake.id = id snowflake.id = id
snowflake.clients = 0 snowflake.clients = 0
snowflake.proxyType = proxyType snowflake.proxyType = proxyType
snowflake.offerChannel = make(chan []byte) snowflake.offerChannel = make(chan *ClientOffer)
snowflake.answerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte)
ctx.snowflakeLock.Lock() ctx.snowflakeLock.Lock()
if natType == NATRestricted { if natType == NATRestricted {
...@@ -213,7 +213,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { ...@@ -213,7 +213,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
ctx.metrics.proxyIdleCount++ ctx.metrics.proxyIdleCount++
ctx.metrics.lock.Unlock() ctx.metrics.lock.Unlock()
b, err = messages.EncodePollResponse("", false) b, err = messages.EncodePollResponse("", false, "")
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
...@@ -222,7 +222,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { ...@@ -222,7 +222,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
w.Write(b) w.Write(b)
return return
} }
b, err = messages.EncodePollResponse(string(offer), true) b, err = messages.EncodePollResponse(string(offer.sdp), true, offer.natType)
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
return return
...@@ -232,28 +232,37 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { ...@@ -232,28 +232,37 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
} }
} }
// Client offer contains an SDP and the NAT type of the client
type ClientOffer struct {
natType string
sdp []byte
}
/* /*
Expects a WebRTC SDP offer in the Request to give to an assigned Expects a WebRTC SDP offer in the Request to give to an assigned
snowflake proxy, which responds with the SDP answer to be sent in snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client. the HTTP response back to the client.
*/ */
func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
var err error
startTime := time.Now() startTime := time.Now()
offer, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) offer := &ClientOffer{}
offer.sdp, err = ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if nil != err { if nil != err {
log.Println("Invalid data.") log.Println("Invalid data.")
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
natType := r.Header.Get("Snowflake-NAT-Type") offer.natType = r.Header.Get("Snowflake-NAT-Type")
if natType == "" { if offer.natType == "" {
natType = NATUnknown offer.natType = NATUnknown
} }
// Only hand out known restricted snowflakes to unrestricted clients // Only hand out known restricted snowflakes to unrestricted clients
var snowflakeHeap *SnowflakeHeap var snowflakeHeap *SnowflakeHeap
if natType == NATUnrestricted { if offer.natType == NATUnrestricted {
snowflakeHeap = ctx.restrictedSnowflakes snowflakeHeap = ctx.restrictedSnowflakes
} else { } else {
snowflakeHeap = ctx.snowflakes snowflakeHeap = ctx.snowflakes
......
...@@ -37,7 +37,7 @@ func TestBroker(t *testing.T) { ...@@ -37,7 +37,7 @@ func TestBroker(t *testing.T) {
Convey("Broker goroutine matches clients with proxies", func() { Convey("Broker goroutine matches clients with proxies", func() {
p := new(ProxyPoll) p := new(ProxyPoll)
p.id = "test" p.id = "test"
p.offerChannel = make(chan []byte) p.offerChannel = make(chan *ClientOffer)
go func(ctx *BrokerContext) { go func(ctx *BrokerContext) {
ctx.proxyPolls <- p ctx.proxyPolls <- p
close(ctx.proxyPolls) close(ctx.proxyPolls)
...@@ -45,23 +45,23 @@ func TestBroker(t *testing.T) { ...@@ -45,23 +45,23 @@ func TestBroker(t *testing.T) {
ctx.Broker() ctx.Broker()
So(ctx.snowflakes.Len(), ShouldEqual, 1) So(ctx.snowflakes.Len(), ShouldEqual, 1)
snowflake := heap.Pop(ctx.snowflakes).(*Snowflake) snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
snowflake.offerChannel <- []byte("test offer") snowflake.offerChannel <- &ClientOffer{sdp: []byte("test offer")}
offer := <-p.offerChannel offer := <-p.offerChannel
So(ctx.idToSnowflake["test"], ShouldNotBeNil) So(ctx.idToSnowflake["test"], ShouldNotBeNil)
So(offer, ShouldResemble, []byte("test offer")) So(offer.sdp, ShouldResemble, []byte("test offer"))
So(ctx.snowflakes.Len(), ShouldEqual, 0) So(ctx.snowflakes.Len(), ShouldEqual, 0)
}) })
Convey("Request an offer from the Snowflake Heap", func() { Convey("Request an offer from the Snowflake Heap", func() {
done := make(chan []byte) done := make(chan *ClientOffer)
go func() { go func() {
offer := ctx.RequestOffer("test", "", NATUnknown) offer := ctx.RequestOffer("test", "", NATUnknown)
done <- offer done <- offer
}() }()
request := <-ctx.proxyPolls request := <-ctx.proxyPolls
request.offerChannel <- []byte("test offer") request.offerChannel <- &ClientOffer{sdp: []byte("test offer")}
offer := <-done offer := <-done
So(offer, ShouldResemble, []byte("test offer")) So(offer.sdp, ShouldResemble, []byte("test offer"))
}) })
Convey("Responds to client offers...", func() { Convey("Responds to client offers...", func() {
...@@ -85,7 +85,7 @@ func TestBroker(t *testing.T) { ...@@ -85,7 +85,7 @@ func TestBroker(t *testing.T) {
done <- true done <- true
}() }()
offer := <-snowflake.offerChannel offer := <-snowflake.offerChannel
So(offer, ShouldResemble, []byte("test")) So(offer.sdp, ShouldResemble, []byte("test"))
snowflake.answerChannel <- []byte("fake answer") snowflake.answerChannel <- []byte("fake answer")
<-done <-done
So(w.Body.String(), ShouldEqual, "fake answer") So(w.Body.String(), ShouldEqual, "fake answer")
...@@ -104,7 +104,7 @@ func TestBroker(t *testing.T) { ...@@ -104,7 +104,7 @@ func TestBroker(t *testing.T) {
done <- true done <- true
}() }()
offer := <-snowflake.offerChannel offer := <-snowflake.offerChannel
So(offer, ShouldResemble, []byte("test")) So(offer.sdp, ShouldResemble, []byte("test"))
<-done <-done
So(w.Code, ShouldEqual, http.StatusGatewayTimeout) So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
}) })
...@@ -125,10 +125,10 @@ func TestBroker(t *testing.T) { ...@@ -125,10 +125,10 @@ func TestBroker(t *testing.T) {
// Pass a fake client offer to this proxy // Pass a fake client offer to this proxy
p := <-ctx.proxyPolls p := <-ctx.proxyPolls
So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp")
p.offerChannel <- []byte("fake offer") p.offerChannel <- &ClientOffer{sdp: []byte("fake offer")}
<-done <-done
So(w.Code, ShouldEqual, http.StatusOK) So(w.Code, ShouldEqual, http.StatusOK)
So(w.Body.String(), ShouldEqual, `{"Status":"client match","Offer":"fake offer"}`) So(w.Body.String(), ShouldEqual, `{"Status":"client match","Offer":"fake offer","NAT":""}`)
}) })
Convey("return empty 200 OK when no client offer is available.", func() { Convey("return empty 200 OK when no client offer is available.", func() {
...@@ -141,7 +141,7 @@ func TestBroker(t *testing.T) { ...@@ -141,7 +141,7 @@ func TestBroker(t *testing.T) {
// nil means timeout // nil means timeout
p.offerChannel <- nil p.offerChannel <- nil
<-done <-done
So(w.Body.String(), ShouldEqual, `{"Status":"no match","Offer":""}`) So(w.Body.String(), ShouldEqual, `{"Status":"no match","Offer":"","NAT":""}`)
So(w.Code, ShouldEqual, http.StatusOK) So(w.Code, ShouldEqual, http.StatusOK)
}) })
}) })
...@@ -279,7 +279,7 @@ func TestBroker(t *testing.T) { ...@@ -279,7 +279,7 @@ func TestBroker(t *testing.T) {
<-polled <-polled
So(wP.Code, ShouldEqual, http.StatusOK) So(wP.Code, ShouldEqual, http.StatusOK)
So(wP.Body.String(), ShouldResemble, `{"Status":"client match","Offer":"fake offer"}`) So(wP.Body.String(), ShouldResemble, `{"Status":"client match","Offer":"fake offer","NAT":"unknown"}`)
So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil)
// Follow up with the answer request afterwards // Follow up with the answer request afterwards
wA := httptest.NewRecorder() wA := httptest.NewRecorder()
...@@ -543,7 +543,7 @@ func TestMetrics(t *testing.T) { ...@@ -543,7 +543,7 @@ func TestMetrics(t *testing.T) {
done <- true done <- true
}() }()
offer := <-snowflake.offerChannel offer := <-snowflake.offerChannel
So(offer, ShouldResemble, []byte("test")) So(offer.sdp, ShouldResemble, []byte("test"))
snowflake.answerChannel <- []byte("fake answer") snowflake.answerChannel <- []byte("fake answer")
<-done <-done
......
...@@ -11,7 +11,7 @@ over the offer and answer channels. ...@@ -11,7 +11,7 @@ over the offer and answer channels.
type Snowflake struct { type Snowflake struct {
id string id string
proxyType string proxyType string
offerChannel chan []byte offerChannel chan *ClientOffer
answerChannel chan []byte answerChannel chan []byte
clients int clients int
index int index int
......
...@@ -29,7 +29,8 @@ HTTP 200 OK ...@@ -29,7 +29,8 @@ HTTP 200 OK
{ {
type: offer, type: offer,
sdp: [WebRTC SDP] sdp: [WebRTC SDP]
} },
NAT: ["unknown"|"restricted"|"unrestricted"]
} }
2) If a client is not matched: 2) If a client is not matched:
...@@ -120,13 +121,15 @@ func DecodePollRequest(data []byte) (string, string, string, error) { ...@@ -120,13 +121,15 @@ func DecodePollRequest(data []byte) (string, string, string, error) {
type ProxyPollResponse struct { type ProxyPollResponse struct {
Status string Status string
Offer string Offer string
NAT string
} }
func EncodePollResponse(offer string, success bool) ([]byte, error) { func EncodePollResponse(offer string, success bool, natType string) ([]byte, error) {
if success { if success {
return json.Marshal(ProxyPollResponse{ return json.Marshal(ProxyPollResponse{
Status: "client match", Status: "client match",
Offer: offer, Offer: offer,
NAT: natType,
}) })
} }
...@@ -135,28 +138,33 @@ func EncodePollResponse(offer string, success bool) ([]byte, error) { ...@@ -135,28 +138,33 @@ func EncodePollResponse(offer string, success bool) ([]byte, error) {
}) })
} }
// Decodes a poll response from the broker and returns an offer // Decodes a poll response from the broker and returns an offer and the client's NAT type
// If there is a client match, the returned offer string will be non-empty // If there is a client match, the returned offer string will be non-empty
func DecodePollResponse(data []byte) (string, error) { func DecodePollResponse(data []byte) (string, string, error) {
var message ProxyPollResponse var message ProxyPollResponse
err := json.Unmarshal(data, &message) err := json.Unmarshal(data, &message)
if err != nil { if err != nil {
return "", err return "", "", err
} }
if message.Status == "" { if message.Status == "" {
return "", fmt.Errorf("received invalid data") return "", "", fmt.Errorf("received invalid data")
} }
if message.Status == "client match" { if message.Status == "client match" {
if message.Offer == "" { if message.Offer == "" {
return "", fmt.Errorf("no supplied offer") return "", "", fmt.Errorf("no supplied offer")
} }
} else { } else {
message.Offer = "" message.Offer = ""
} }
return message.Offer, nil natType := message.NAT
if natType == "" {
natType = "unknown"
}
return message.Offer, natType, nil
} }
type ProxyAnswerRequest struct { type ProxyAnswerRequest struct {
......
...@@ -109,7 +109,7 @@ func TestDecodeProxyPollResponse(t *testing.T) { ...@@ -109,7 +109,7 @@ func TestDecodeProxyPollResponse(t *testing.T) {
}{ }{
{ {
"fake offer", "fake offer",
`{"Status":"client match","Offer":"fake offer"}`, `{"Status":"client match","Offer":"fake offer","NAT":"unknown"}`,
nil, nil,
}, },
{ {
...@@ -128,9 +128,9 @@ func TestDecodeProxyPollResponse(t *testing.T) { ...@@ -128,9 +128,9 @@ func TestDecodeProxyPollResponse(t *testing.T) {
fmt.Errorf(""), fmt.Errorf(""),
}, },
} { } {
offer, err := DecodePollResponse([]byte(test.data)) offer, _, err := DecodePollResponse([]byte(test.data))
So(offer, ShouldResemble, test.offer)
So(err, ShouldHaveSameTypeAs, test.err) So(err, ShouldHaveSameTypeAs, test.err)
So(offer, ShouldResemble, test.offer)
} }
}) })
...@@ -138,16 +138,18 @@ func TestDecodeProxyPollResponse(t *testing.T) { ...@@ -138,16 +138,18 @@ func TestDecodeProxyPollResponse(t *testing.T) {
func TestEncodeProxyPollResponse(t *testing.T) { func TestEncodeProxyPollResponse(t *testing.T) {
Convey("Context", t, func() { Convey("Context", t, func() {
b, err := EncodePollResponse("fake offer", true) b, err := EncodePollResponse("fake offer", true, "restricted")
So(err, ShouldEqual, nil) So(err, ShouldEqual, nil)
offer, err := DecodePollResponse(b) offer, natType, err := DecodePollResponse(b)
So(offer, ShouldEqual, "fake offer") So(offer, ShouldEqual, "fake offer")
So(natType, ShouldEqual, "restricted")
So(err, ShouldEqual, nil) So(err, ShouldEqual, nil)
b, err = EncodePollResponse("", false) b, err = EncodePollResponse("", false, "unknown")
So(err, ShouldEqual, nil) So(err, ShouldEqual, nil)
offer, err = DecodePollResponse(b) offer, natType, err = DecodePollResponse(b)
So(offer, ShouldEqual, "") So(offer, ShouldEqual, "")
So(natType, ShouldEqual, "unknown")
So(err, ShouldEqual, nil) So(err, ShouldEqual, nil)
}) })
} }
......
...@@ -249,7 +249,7 @@ func TestBrokerInteractions(t *testing.T) { ...@@ -249,7 +249,7 @@ func TestBrokerInteractions(t *testing.T) {
Convey("polls broker correctly", func() { Convey("polls broker correctly", func() {
var err error var err error
b, err := messages.EncodePollResponse(sampleOffer, true) b, err := messages.EncodePollResponse(sampleOffer, true, "unknown")
So(err, ShouldEqual, nil) So(err, ShouldEqual, nil)
broker.transport = &MockTransport{ broker.transport = &MockTransport{
http.StatusOK, http.StatusOK,
......
...@@ -201,7 +201,7 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription { ...@@ -201,7 +201,7 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
log.Printf("error reading broker response: %s", err) log.Printf("error reading broker response: %s", err)
} else { } else {
offer, err := messages.DecodePollResponse(body) offer, _, err := messages.DecodePollResponse(body)
if err != nil { if err != nil {
log.Printf("error reading broker response: %s", err.Error()) log.Printf("error reading broker response: %s", err.Error())
log.Printf("body: %s", body) log.Printf("body: %s", body)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment