Commit 0428797e authored by Cecylia Bocovich's avatar Cecylia Bocovich
Browse files

Modified proxy-go to use pion/webrtc

The API is very similar, differences were mostly due to:
- closing peer connections and datachannels (no destroy/delete methods)
- different way to set datachannel/peer connection callbacks
- differences in whether functions take pointers or values
- no serialize/deserialize functions in the API
parent 9e22af90
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"crypto/rand" "crypto/rand"
"encoding/base64" "encoding/base64"
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"io" "io"
...@@ -19,7 +20,7 @@ import ( ...@@ -19,7 +20,7 @@ import (
"time" "time"
"git.torproject.org/pluggable-transports/snowflake.git/common/safelog" "git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
"github.com/keroserene/go-webrtc" "github.com/pion/webrtc"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
) )
...@@ -43,7 +44,7 @@ const ( ...@@ -43,7 +44,7 @@ const (
var ( var (
tokens chan bool tokens chan bool
config *webrtc.Configuration config webrtc.Configuration
client http.Client client http.Client
) )
...@@ -92,7 +93,7 @@ func (c *webRTCConn) Write(b []byte) (int, error) { ...@@ -92,7 +93,7 @@ func (c *webRTCConn) Write(b []byte) (int, error) {
func (c *webRTCConn) Close() (err error) { func (c *webRTCConn) Close() (err error) {
c.once.Do(func() { c.once.Do(func() {
err = c.pc.Destroy() err = c.pc.Close()
}) })
return return
} }
...@@ -103,7 +104,7 @@ func (c *webRTCConn) LocalAddr() net.Addr { ...@@ -103,7 +104,7 @@ func (c *webRTCConn) LocalAddr() net.Addr {
func (c *webRTCConn) RemoteAddr() net.Addr { func (c *webRTCConn) RemoteAddr() net.Addr {
//Parse Remote SDP offer and extract client IP //Parse Remote SDP offer and extract client IP
clientIP := remoteIPFromSDP(c.pc.RemoteDescription().Sdp) clientIP := remoteIPFromSDP(c.pc.RemoteDescription().SDP)
if clientIP == nil { if clientIP == nil {
return nil return nil
} }
...@@ -178,7 +179,7 @@ func pollOffer(sid string) *webrtc.SessionDescription { ...@@ -178,7 +179,7 @@ func pollOffer(sid string) *webrtc.SessionDescription {
if err != nil { if err != nil {
log.Printf("error reading broker response: %s", err) log.Printf("error reading broker response: %s", err)
} else { } else {
return webrtc.DeserializeSessionDescription(string(body)) return deserializeSessionDescription(string(body))
} }
} }
} }
...@@ -187,7 +188,7 @@ func pollOffer(sid string) *webrtc.SessionDescription { ...@@ -187,7 +188,7 @@ func pollOffer(sid string) *webrtc.SessionDescription {
func sendAnswer(sid string, pc *webrtc.PeerConnection) error { func sendAnswer(sid string, pc *webrtc.PeerConnection) error {
broker := brokerURL.ResolveReference(&url.URL{Path: "answer"}) broker := brokerURL.ResolveReference(&url.URL{Path: "answer"})
body := bytes.NewBuffer([]byte(pc.LocalDescription().Serialize())) body := bytes.NewBuffer([]byte(serializeSessionDescription(pc.LocalDescription())))
req, _ := http.NewRequest("POST", broker.String(), body) req, _ := http.NewRequest("POST", broker.String(), body)
req.Header.Set("X-Session-ID", sid) req.Header.Set("X-Session-ID", sid)
resp, err := client.Do(req) resp, err := client.Do(req)
...@@ -275,71 +276,63 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) { ...@@ -275,71 +276,63 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
// candidates is complete and the answer is available in LocalDescription. // candidates is complete and the answer is available in LocalDescription.
// Installs an OnDataChannel callback that creates a webRTCConn and passes it to // Installs an OnDataChannel callback that creates a webRTCConn and passes it to
// datachannelHandler. // datachannelHandler.
func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) { func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) {
pc, err := webrtc.NewPeerConnection(config) pc, err := webrtc.NewPeerConnection(config)
if err != nil { if err != nil {
return nil, fmt.Errorf("accept: NewPeerConnection: %s", err) return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
} }
pc.OnNegotiationNeeded = func() { pc.OnDataChannel(func(dc *webrtc.DataChannel) {
panic("OnNegotiationNeeded")
}
pc.OnDataChannel = func(dc *webrtc.DataChannel) {
log.Println("OnDataChannel") log.Println("OnDataChannel")
close(dataChan) close(dataChan)
pr, pw := io.Pipe() pr, pw := io.Pipe()
conn := &webRTCConn{pc: pc, dc: dc, pr: pr} conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
dc.OnOpen = func() { dc.OnOpen(func() {
log.Println("OnOpen channel") log.Println("OnOpen channel")
} })
dc.OnClose = func() { dc.OnClose(func() {
conn.lock.Lock() conn.lock.Lock()
defer conn.lock.Unlock() defer conn.lock.Unlock()
log.Println("OnClose channel") log.Println("OnClose channel")
conn.dc = nil conn.dc = nil
pc.DeleteDataChannel(dc) dc.Close()
pw.Close() pw.Close()
} })
dc.OnMessage = func(msg []byte) { dc.OnMessage(func(msg webrtc.DataChannelMessage) {
log.Printf("OnMessage <--- %d bytes", len(msg)) log.Printf("OnMessage <--- %d bytes", len(msg.Data))
n, err := pw.Write(msg) n, err := pw.Write(msg.Data)
if err != nil { if err != nil {
pw.CloseWithError(err) pw.CloseWithError(err)
} }
if n != len(msg) { if n != len(msg.Data) {
panic("short write") panic("short write")
} }
} })
go datachannelHandler(conn, conn.RemoteAddr()) go datachannelHandler(conn, conn.RemoteAddr())
} })
err = pc.SetRemoteDescription(sdp) err = pc.SetRemoteDescription(*sdp)
if err != nil { if err != nil {
pc.Destroy() pc.Close()
return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err) return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
} }
log.Println("sdp offer successfully received.") log.Println("sdp offer successfully received.")
log.Println("Generating answer...") log.Println("Generating answer...")
answer, err := pc.CreateAnswer() answer, err := pc.CreateAnswer(nil)
// blocks on ICE gathering. we need to add a timeout if needed // blocks on ICE gathering. we need to add a timeout if needed
// not putting this in a separate go routine, because we need // not putting this in a separate go routine, because we need
// SetLocalDescription(answer) to be called before sendAnswer // SetLocalDescription(answer) to be called before sendAnswer
if err != nil { if err != nil {
pc.Destroy() pc.Close()
return nil, err return nil, err
} }
if answer == nil {
pc.Destroy()
return nil, fmt.Errorf("Failed gathering ICE candidates.")
}
err = pc.SetLocalDescription(answer) err = pc.SetLocalDescription(answer)
if err != nil { if err != nil {
pc.Destroy() pc.Close()
return nil, err return nil, err
} }
...@@ -363,7 +356,7 @@ func runSession(sid string) { ...@@ -363,7 +356,7 @@ func runSession(sid string) {
err = sendAnswer(sid, pc) err = sendAnswer(sid, pc)
if err != nil { if err != nil {
log.Printf("error sending answer to client through broker: %s", err) log.Printf("error sending answer to client through broker: %s", err)
pc.Destroy() pc.Close()
retToken() retToken()
return return
} }
...@@ -375,7 +368,7 @@ func runSession(sid string) { ...@@ -375,7 +368,7 @@ func runSession(sid string) {
log.Println("Connection successful.") log.Println("Connection successful.")
case <-time.After(dataChannelTimeout): case <-time.After(dataChannelTimeout):
log.Println("Timed out waiting for client to open data channel.") log.Println("Timed out waiting for client to open data channel.")
pc.Destroy() pc.Close()
retToken() retToken()
} }
} }
...@@ -422,7 +415,13 @@ func main() { ...@@ -422,7 +415,13 @@ func main() {
log.Fatalf("invalid relay url: %s", err) log.Fatalf("invalid relay url: %s", err)
} }
config = webrtc.NewConfiguration(webrtc.OptionIceServer(stunURL)) config = webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{stunURL},
},
},
}
tokens = make(chan bool, capacity) tokens = make(chan bool, capacity)
for i := uint(0); i < capacity; i++ { for i := uint(0); i < capacity; i++ {
tokens <- true tokens <- true
...@@ -434,3 +433,53 @@ func main() { ...@@ -434,3 +433,53 @@ func main() {
runSession(sessionID) runSession(sessionID)
} }
} }
func deserializeSessionDescription(msg string) *webrtc.SessionDescription {
var parsed map[string]interface{}
err := json.Unmarshal([]byte(msg), &parsed)
if nil != err {
log.Println(err)
return nil
}
if _, ok := parsed["type"]; !ok {
log.Println("Cannot deserialize SessionDescription without type field.")
return nil
}
if _, ok := parsed["sdp"]; !ok {
log.Println("Cannot deserialize SessionDescription without sdp field.")
return nil
}
var stype webrtc.SDPType
switch parsed["type"].(string) {
default:
log.Println("Unknown SDP type")
return nil
case "offer":
stype = webrtc.SDPTypeOffer
case "pranswer":
stype = webrtc.SDPTypePranswer
case "answer":
stype = webrtc.SDPTypeAnswer
case "rollback":
stype = webrtc.SDPTypeRollback
}
if err != nil {
log.Println(err)
return nil
}
return &webrtc.SessionDescription{
Type: stype,
SDP: parsed["sdp"].(string),
}
}
func serializeSessionDescription(desc *webrtc.SessionDescription) string {
bytes, err := json.Marshal(*desc)
if nil != err {
log.Println(err)
return ""
}
return string(bytes)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment