Commit b5c50b69 authored by Cecylia Bocovich's avatar Cecylia Bocovich
Browse files

Ported snowflake client to work with pion/webrtc

Modified the snowflake client to use pion/webrtc as the webrtc library.
This involved a few small changes to match function signatures as well
as several larger ones:
- OnNegotiationNeeded is no longer supported, so CreateOffer and
SetLocalDescription have been moved to a go routine called after the
other peer connection callbacks are set
- We need our own deserialize/serialize functions
- We need to use a SettingEngine in order to access the
OnICEGatheringStateChange callback
parent 0428797e
...@@ -52,5 +52,5 @@ type SocksConnector interface { ...@@ -52,5 +52,5 @@ type SocksConnector interface {
// Interface for the Snowflake's transport. (Typically just webrtc.DataChannel) // Interface for the Snowflake's transport. (Typically just webrtc.DataChannel)
type SnowflakeDataChannel interface { type SnowflakeDataChannel interface {
io.Closer io.Closer
Send([]byte) Send([]byte) error
} }
...@@ -8,7 +8,7 @@ import ( ...@@ -8,7 +8,7 @@ import (
"net/http" "net/http"
"testing" "testing"
"github.com/keroserene/go-webrtc" "github.com/pion/webrtc"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
) )
...@@ -17,9 +17,10 @@ type MockDataChannel struct { ...@@ -17,9 +17,10 @@ type MockDataChannel struct {
done chan bool done chan bool
} }
func (m *MockDataChannel) Send(data []byte) { func (m *MockDataChannel) Send(data []byte) error {
m.destination.Write(data) m.destination.Write(data)
m.done <- true m.done <- true
return nil
} }
func (*MockDataChannel) Close() error { return nil } func (*MockDataChannel) Close() error { return nil }
...@@ -217,11 +218,11 @@ func TestSnowflakeClient(t *testing.T) { ...@@ -217,11 +218,11 @@ func TestSnowflakeClient(t *testing.T) {
c.offerChannel = make(chan *webrtc.SessionDescription, 1) c.offerChannel = make(chan *webrtc.SessionDescription, 1)
c.answerChannel = make(chan *webrtc.SessionDescription, 1) c.answerChannel = make(chan *webrtc.SessionDescription, 1)
c.config = webrtc.NewConfiguration() c.config = &webrtc.Configuration{}
c.preparePeerConnection() c.preparePeerConnection()
c.offerChannel <- nil c.offerChannel <- nil
answer := webrtc.DeserializeSessionDescription( answer := deserializeSessionDescription(
`{"type":"answer","sdp":""}`) `{"type":"answer","sdp":""}`)
c.answerChannel <- answer c.answerChannel <- answer
c.exchangeSDP() c.exchangeSDP()
...@@ -264,12 +265,11 @@ func TestSnowflakeClient(t *testing.T) { ...@@ -264,12 +265,11 @@ func TestSnowflakeClient(t *testing.T) {
}) })
Convey("Rendezvous", t, func() { Convey("Rendezvous", t, func() {
webrtc.SetLoggingVerbosity(0)
transport := &MockTransport{ transport := &MockTransport{
http.StatusOK, http.StatusOK,
[]byte(`{"type":"answer","sdp":"fake"}`), []byte(`{"type":"answer","sdp":"fake"}`),
} }
fakeOffer := webrtc.DeserializeSessionDescription("test") fakeOffer := deserializeSessionDescription(`{"type":"offer","sdp":"test"}`)
Convey("Construct BrokerChannel with no front domain", func() { Convey("Construct BrokerChannel with no front domain", func() {
b := NewBrokerChannel("test.broker", "", transport) b := NewBrokerChannel("test.broker", "", transport)
...@@ -291,7 +291,7 @@ func TestSnowflakeClient(t *testing.T) { ...@@ -291,7 +291,7 @@ func TestSnowflakeClient(t *testing.T) {
answer, err := b.Negotiate(fakeOffer) answer, err := b.Negotiate(fakeOffer)
So(err, ShouldBeNil) So(err, ShouldBeNil)
So(answer, ShouldNotBeNil) So(answer, ShouldNotBeNil)
So(answer.Sdp, ShouldResemble, "fake") So(answer.SDP, ShouldResemble, "fake")
}) })
Convey("BrokerChannel.Negotiate fails with 503", func() { Convey("BrokerChannel.Negotiate fails with 503", func() {
......
...@@ -17,7 +17,7 @@ import ( ...@@ -17,7 +17,7 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"github.com/keroserene/go-webrtc" "github.com/pion/webrtc"
) )
const ( const (
...@@ -84,7 +84,7 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( ...@@ -84,7 +84,7 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
*webrtc.SessionDescription, error) { *webrtc.SessionDescription, error) {
log.Println("Negotiating via BrokerChannel...\nTarget URL: ", log.Println("Negotiating via BrokerChannel...\nTarget URL: ",
bc.Host, "\nFront URL: ", bc.url.Host) bc.Host, "\nFront URL: ", bc.url.Host)
data := bytes.NewReader([]byte(offer.Serialize())) data := bytes.NewReader([]byte(serializeSessionDescription(offer)))
// Suffix with broker's client registration handler. // Suffix with broker's client registration handler.
clientURL := bc.url.ResolveReference(&url.URL{Path: "client"}) clientURL := bc.url.ResolveReference(&url.URL{Path: "client"})
request, err := http.NewRequest("POST", clientURL.String(), data) request, err := http.NewRequest("POST", clientURL.String(), data)
...@@ -107,7 +107,7 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( ...@@ -107,7 +107,7 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
if nil != err { if nil != err {
return nil, err return nil, err
} }
answer := webrtc.DeserializeSessionDescription(string(body)) answer := deserializeSessionDescription(string(body))
return answer, nil return answer, nil
case http.StatusServiceUnavailable: case http.StatusServiceUnavailable:
...@@ -126,15 +126,18 @@ type WebRTCDialer struct { ...@@ -126,15 +126,18 @@ type WebRTCDialer struct {
} }
func NewWebRTCDialer( func NewWebRTCDialer(
broker *BrokerChannel, iceServers IceServerList) *WebRTCDialer { broker *BrokerChannel, iceServers []webrtc.ICEServer) *WebRTCDialer {
config := webrtc.NewConfiguration(iceServers...) var config webrtc.Configuration
if nil == config { if iceServers == nil {
log.Println("Unable to prepare WebRTC configuration.") config = webrtc.Configuration{
return nil ICEServers: iceServers,
}
} else {
config = webrtc.Configuration{}
} }
return &WebRTCDialer{ return &WebRTCDialer{
BrokerChannel: broker, BrokerChannel: broker,
webrtcConfig: config, webrtcConfig: &config,
} }
} }
......
package lib package lib
import ( import (
"fmt" "encoding/json"
"log" "log"
"time" "time"
"github.com/keroserene/go-webrtc" "github.com/pion/webrtc"
) )
const ( const (
LogTimeInterval = 5 LogTimeInterval = 5
) )
type IceServerList []webrtc.ConfigurationOption
func (i *IceServerList) String() string {
return fmt.Sprint(*i)
}
type BytesLogger interface { type BytesLogger interface {
Log() Log()
AddOutbound(int) AddOutbound(int)
...@@ -93,3 +87,52 @@ func (b *BytesSyncLogger) AddInbound(amount int) { ...@@ -93,3 +87,52 @@ func (b *BytesSyncLogger) AddInbound(amount int) {
} }
b.InboundChan <- amount b.InboundChan <- amount
} }
func deserializeSessionDescription(msg string) *webrtc.SessionDescription {
var parsed map[string]interface{}
err := json.Unmarshal([]byte(msg), &parsed)
if nil != err {
log.Println(err)
return nil
}
if _, ok := parsed["type"]; !ok {
log.Println("Cannot deserialize SessionDescription without type field.")
return nil
}
if _, ok := parsed["sdp"]; !ok {
log.Println("Cannot deserialize SessionDescription without sdp field.")
return nil
}
var stype webrtc.SDPType
switch parsed["type"].(string) {
default:
log.Println("Unknown SDP type")
return nil
case "offer":
stype = webrtc.SDPTypeOffer
case "pranswer":
stype = webrtc.SDPTypePranswer
case "answer":
stype = webrtc.SDPTypeAnswer
case "rollback":
stype = webrtc.SDPTypeRollback
}
if err != nil {
log.Println(err)
return nil
}
return &webrtc.SessionDescription{
Type: stype,
SDP: parsed["sdp"].(string),
}
}
func serializeSessionDescription(desc *webrtc.SessionDescription) string {
bytes, err := json.Marshal(*desc)
if nil != err {
log.Println(err)
return ""
}
return string(bytes)
}
...@@ -9,7 +9,7 @@ import ( ...@@ -9,7 +9,7 @@ import (
"time" "time"
"github.com/dchest/uniuri" "github.com/dchest/uniuri"
"github.com/keroserene/go-webrtc" "github.com/pion/webrtc"
) )
// Remote WebRTC peer. // Remote WebRTC peer.
...@@ -151,48 +151,54 @@ func (c *WebRTCPeer) Connect() error { ...@@ -151,48 +151,54 @@ func (c *WebRTCPeer) Connect() error {
// Create and prepare callbacks on a new WebRTC PeerConnection. // Create and prepare callbacks on a new WebRTC PeerConnection.
func (c *WebRTCPeer) preparePeerConnection() error { func (c *WebRTCPeer) preparePeerConnection() error {
if nil != c.pc { if nil != c.pc {
c.pc.Destroy() c.pc.Close()
c.pc = nil c.pc = nil
} }
pc, err := webrtc.NewPeerConnection(c.config) s := webrtc.SettingEngine{}
s.SetTrickle(true)
api := webrtc.NewAPI(webrtc.WithSettingEngine(s))
pc, err := api.NewPeerConnection(*c.config)
if err != nil { if err != nil {
log.Printf("NewPeerConnection ERROR: %s", err) log.Printf("NewPeerConnection ERROR: %s", err)
return err return err
} }
// Prepare PeerConnection callbacks. // Prepare PeerConnection callbacks.
pc.OnNegotiationNeeded = func() { // Allow candidates to accumulate until ICEGatheringStateComplete.
log.Println("WebRTC: OnNegotiationNeeded") pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
if candidate == nil {
log.Printf("WebRTC: Done gathering candidates")
} else {
log.Printf("WebRTC: Got ICE candidate: %s", candidate.String())
}
})
pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
if state == webrtc.ICEGathererStateComplete {
log.Println("WebRTC: ICEGatheringStateComplete")
c.offerChannel <- pc.LocalDescription()
}
})
// This callback is not expected, as the Client initiates the creation
// of the data channel, not the remote peer.
pc.OnDataChannel(func(channel *webrtc.DataChannel) {
log.Println("OnDataChannel")
panic("Unexpected OnDataChannel!")
})
c.pc = pc
go func() { go func() {
offer, err := pc.CreateOffer() offer, err := pc.CreateOffer(nil)
// TODO: Potentially timeout and retry if ICE isn't working. // TODO: Potentially timeout and retry if ICE isn't working.
if err != nil { if err != nil {
c.errorChannel <- err c.errorChannel <- err
return return
} }
log.Println("WebRTC: Created offer")
err = pc.SetLocalDescription(offer) err = pc.SetLocalDescription(offer)
if err != nil { if err != nil {
c.errorChannel <- err c.errorChannel <- err
return return
} }
log.Println("WebRTC: Set local description")
}() }()
}
// Allow candidates to accumulate until IceGatheringStateComplete.
pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
log.Printf(candidate.Candidate)
}
pc.OnIceGatheringStateChange = func(state webrtc.IceGatheringState) {
if state == webrtc.IceGatheringStateComplete {
log.Printf("WebRTC: IceGatheringStateComplete")
c.offerChannel <- pc.LocalDescription()
}
}
// This callback is not expected, as the Client initiates the creation
// of the data channel, not the remote peer.
pc.OnDataChannel = func(channel *webrtc.DataChannel) {
log.Println("OnDataChannel")
panic("Unexpected OnDataChannel!")
}
c.pc = pc
log.Println("WebRTC: PeerConnection created.") log.Println("WebRTC: PeerConnection created.")
return nil return nil
} }
...@@ -204,7 +210,11 @@ func (c *WebRTCPeer) establishDataChannel() error { ...@@ -204,7 +210,11 @@ func (c *WebRTCPeer) establishDataChannel() error {
if c.transport != nil { if c.transport != nil {
panic("Unexpected datachannel already exists!") panic("Unexpected datachannel already exists!")
} }
dc, err := c.pc.CreateDataChannel(c.id) ordered := true
dataChannelOptions := &webrtc.DataChannelInit{
Ordered: &ordered,
}
dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
// Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
// an SDP offer while other goroutines operating on this struct handle the // an SDP offer while other goroutines operating on this struct handle the
// signaling. Eventually fires "OnOpen". // signaling. Eventually fires "OnOpen".
...@@ -212,7 +222,7 @@ func (c *WebRTCPeer) establishDataChannel() error { ...@@ -212,7 +222,7 @@ func (c *WebRTCPeer) establishDataChannel() error {
log.Printf("CreateDataChannel ERROR: %s", err) log.Printf("CreateDataChannel ERROR: %s", err)
return err return err
} }
dc.OnOpen = func() { dc.OnOpen(func() {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
log.Println("WebRTC: DataChannel.OnOpen") log.Println("WebRTC: DataChannel.OnOpen")
...@@ -227,8 +237,8 @@ func (c *WebRTCPeer) establishDataChannel() error { ...@@ -227,8 +237,8 @@ func (c *WebRTCPeer) establishDataChannel() error {
} }
// Then enable the datachannel. // Then enable the datachannel.
c.transport = dc c.transport = dc
} })
dc.OnClose = func() { dc.OnClose(func() {
c.lock.Lock() c.lock.Lock()
// Future writes will go to the buffer until a new DataChannel is available. // Future writes will go to the buffer until a new DataChannel is available.
if nil == c.transport { if nil == c.transport {
...@@ -241,29 +251,29 @@ func (c *WebRTCPeer) establishDataChannel() error { ...@@ -241,29 +251,29 @@ func (c *WebRTCPeer) establishDataChannel() error {
// Disable the DataChannel as a write destination. // Disable the DataChannel as a write destination.
log.Println("WebRTC: DataChannel.OnClose [remotely]") log.Println("WebRTC: DataChannel.OnClose [remotely]")
c.transport = nil c.transport = nil
c.pc.DeleteDataChannel(dc) dc.Close()
// Unlock before Close'ing, since it calls cleanup and asks for the // Unlock before Close'ing, since it calls cleanup and asks for the
// lock to check if the transport needs to be be deleted. // lock to check if the transport needs to be be deleted.
c.lock.Unlock() c.lock.Unlock()
c.Close() c.Close()
} })
dc.OnMessage = func(msg []byte) { dc.OnMessage(func(msg webrtc.DataChannelMessage) {
if len(msg) <= 0 { if len(msg.Data) <= 0 {
log.Println("0 length message---") log.Println("0 length message---")
} }
c.BytesLogger.AddInbound(len(msg)) c.BytesLogger.AddInbound(len(msg.Data))
n, err := c.writePipe.Write(msg) n, err := c.writePipe.Write(msg.Data)
if err != nil { if err != nil {
// TODO: Maybe shouldn't actually close. // TODO: Maybe shouldn't actually close.
log.Println("Error writing to SOCKS pipe") log.Println("Error writing to SOCKS pipe")
c.writePipe.CloseWithError(err) c.writePipe.CloseWithError(err)
} }
if n != len(msg) { if n != len(msg.Data) {
log.Println("Error: short write") log.Println("Error: short write")
panic("short write") panic("short write")
} }
c.lastReceive = time.Now() c.lastReceive = time.Now()
} })
log.Println("WebRTC: DataChannel created.") log.Println("WebRTC: DataChannel created.")
return nil return nil
} }
...@@ -304,7 +314,7 @@ func (c *WebRTCPeer) exchangeSDP() error { ...@@ -304,7 +314,7 @@ func (c *WebRTCPeer) exchangeSDP() error {
} }
} }
log.Printf("Received Answer.\n") log.Printf("Received Answer.\n")
err := c.pc.SetRemoteDescription(answer) err := c.pc.SetRemoteDescription(*answer)
if nil != err { if nil != err {
log.Println("WebRTC: Unable to SetRemoteDescription:", err) log.Println("WebRTC: Unable to SetRemoteDescription:", err)
return err return err
...@@ -342,13 +352,13 @@ func (c *WebRTCPeer) cleanup() { ...@@ -342,13 +352,13 @@ func (c *WebRTCPeer) cleanup() {
if c.pc == nil { if c.pc == nil {
panic("DataChannel w/o PeerConnection, not good.") panic("DataChannel w/o PeerConnection, not good.")
} }
c.pc.DeleteDataChannel(dataChannel.(*webrtc.DataChannel)) dataChannel.(*webrtc.DataChannel).Close()
} else { } else {
c.lock.Unlock() c.lock.Unlock()
} }
if nil != c.pc { if nil != c.pc {
log.Printf("WebRTC: closing PeerConnection") log.Printf("WebRTC: closing PeerConnection")
err := c.pc.Destroy() err := c.pc.Close()
if nil != err { if nil != err {
log.Printf("Error closing peerconnection...") log.Printf("Error closing peerconnection...")
} }
......
...@@ -17,7 +17,7 @@ import ( ...@@ -17,7 +17,7 @@ import (
"git.torproject.org/pluggable-transports/goptlib.git" "git.torproject.org/pluggable-transports/goptlib.git"
sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib" sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib"
"git.torproject.org/pluggable-transports/snowflake.git/common/safelog" "git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
"github.com/keroserene/go-webrtc" "github.com/pion/webrtc"
) )
const ( const (
...@@ -65,6 +65,25 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes sf.SnowflakeCollector) err ...@@ -65,6 +65,25 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes sf.SnowflakeCollector) err
} }
} }
//s is a comma-separated list of ICE server URLs
func parseIceServers(s string) []webrtc.ICEServer {
var servers []webrtc.ICEServer
log.Println(s)
s = strings.TrimSpace(s)
if len(s) == 0 {
return nil
}
urls := strings.Split(s, ",")
log.Printf("Using ICE Servers:")
for _, url := range urls {
log.Printf("url: %s", url)
servers = append(servers, webrtc.ICEServer{
URLs: []string{url},
})
}
return servers
}
func main() { func main() {
iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers") iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers")
brokerURL := flag.String("url", "", "URL of signaling broker") brokerURL := flag.String("url", "", "URL of signaling broker")
...@@ -75,7 +94,6 @@ func main() { ...@@ -75,7 +94,6 @@ func main() {
"capacity for number of multiplexed WebRTC peers") "capacity for number of multiplexed WebRTC peers")
flag.Parse() flag.Parse()
webrtc.SetLoggingVerbosity(1)
log.SetFlags(log.LstdFlags | log.LUTC) log.SetFlags(log.LstdFlags | log.LUTC)
// Don't write to stderr; versions of tor earlier than about // Don't write to stderr; versions of tor earlier than about
...@@ -105,11 +123,7 @@ func main() { ...@@ -105,11 +123,7 @@ func main() {
log.Println("\n\n\n --- Starting Snowflake Client ---") log.Println("\n\n\n --- Starting Snowflake Client ---")
var iceServers sf.IceServerList iceServers := parseIceServers(*iceServersCommas)
if len(strings.TrimSpace(*iceServersCommas)) > 0 {
option := webrtc.OptionIceServer(*iceServersCommas)
iceServers = append(iceServers, option)
}
// Prepare to collect remote WebRTC peers. // Prepare to collect remote WebRTC peers.
snowflakes := sf.NewPeers(*max) snowflakes := sf.NewPeers(*max)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment