GitLab is used only for code review, issue tracking and project management. Canonical locations for source code are still https://gitweb.torproject.org/ https://git.torproject.org/ and git-rw.torproject.org.

Commit 047d3214 authored by David Fifield's avatar David Fifield

Wait for data channel OnOpen before returning from NewWebRTCPeer.

Now callers cannot call Write without there being a DataChannel to write
to. This lets us remove the internal buffer and checks for transport ==
nil.

Don't set internal fields like writePipe, transport, and pc to nil when
closing; just close them and let them return errors if further calls are
made on them.

There's now a constant DataChannelTimeout that's separate from
SnowflakeTimeout (the latter is what checkForStaleness uses). Now we can
set DataChannel timeout to a lower value, to quickly dispose of
unconnectable proxies, while still keeping the threshold for detecting
the failure of a once-working proxy at 30 seconds.

https://bugs.torproject.org/33897
parent e8c41650
......@@ -17,6 +17,8 @@ import (
const (
ReconnectTimeout = 10 * time.Second
SnowflakeTimeout = 30 * time.Second
// How long to wait for the OnOpen callback on a DataChannel.
DataChannelTimeout = 30 * time.Second
)
type dummyAddr struct{}
......
package lib
import (
"bytes"
"crypto/rand"
"encoding/hex"
"errors"
......@@ -25,12 +24,10 @@ type WebRTCPeer struct {
recvPipe *io.PipeReader
writePipe *io.PipeWriter
lastReceive time.Time
buffer bytes.Buffer
closed bool
lock sync.Mutex // Synchronization for DataChannel destruction
once sync.Once // Synchronization for PeerConnection destruction
once sync.Once // Synchronization for PeerConnection destruction
BytesLogger BytesLogger
}
......@@ -70,16 +67,11 @@ func (c *WebRTCPeer) Read(b []byte) (int, error) {
// Writes bytes out to remote WebRTC.
// As part of |io.ReadWriter|
func (c *WebRTCPeer) Write(b []byte) (int, error) {
c.lock.Lock()
defer c.lock.Unlock()
c.BytesLogger.AddOutbound(len(b))
// TODO: Buffering could be improved / separated out of WebRTCPeer.
if nil == c.transport {
log.Printf("Buffered %d bytes --> WebRTC", len(b))
c.buffer.Write(b)
} else {
c.transport.Send(b)
err := c.transport.Send(b)
if err != nil {
return 0, err
}
c.BytesLogger.AddOutbound(len(b))
return len(b), nil
}
......@@ -127,8 +119,9 @@ func (c *WebRTCPeer) connect(config *webrtc.Configuration, broker *BrokerChannel
log.Println("WebRTC: Unable to SetRemoteDescription:", err)
return err
}
err = c.establishDataChannel()
c.transport, err = c.establishDataChannel()
if err != nil {
log.Printf("establishDataChannel: %v", err)
// nolint: golint
return errors.New("WebRTC: Could not establish DataChannel")
}
......@@ -177,13 +170,9 @@ func preparePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection
return pc, nil
}
// Create a WebRTC DataChannel locally.
func (c *WebRTCPeer) establishDataChannel() error {
c.lock.Lock()
defer c.lock.Unlock()
if c.transport != nil {
panic("Unexpected datachannel already exists!")
}
// Create a WebRTC DataChannel locally. Blocks until the data channel is open,
// or a timeout or error occurs.
func (c *WebRTCPeer) establishDataChannel() (*webrtc.DataChannel, error) {
ordered := true
dataChannelOptions := &webrtc.DataChannelInit{
Ordered: &ordered,
......@@ -191,41 +180,15 @@ func (c *WebRTCPeer) establishDataChannel() error {
dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
if err != nil {
log.Printf("CreateDataChannel ERROR: %s", err)
return err
return nil, err
}
openChannel := make(chan struct{})
dc.OnOpen(func() {
c.lock.Lock()
defer c.lock.Unlock()
log.Println("WebRTC: DataChannel.OnOpen")
if nil != c.transport {
panic("WebRTC: transport already exists.")
}
// Flush buffered outgoing SOCKS data if necessary.
if c.buffer.Len() > 0 {
dc.Send(c.buffer.Bytes())
log.Println("Flushed", c.buffer.Len(), "bytes.")
c.buffer.Reset()
}
// Then enable the datachannel.
c.transport = dc
close(openChannel)
})
dc.OnClose(func() {
c.lock.Lock()
// Future writes will go to the buffer until a new DataChannel is available.
if nil == c.transport {
// Closed locally, as part of a reset.
log.Println("WebRTC: DataChannel.OnClose [locally]")
c.lock.Unlock()
return
}
// Closed remotely, need to reset everything.
// Disable the DataChannel as a write destination.
log.Println("WebRTC: DataChannel.OnClose [remotely]")
c.transport = nil
dc.Close()
// Unlock before Close'ing, since it calls cleanup and asks for the
// lock to check if the transport needs to be be deleted.
c.lock.Unlock()
log.Println("WebRTC: DataChannel.OnClose")
c.Close()
})
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
......@@ -244,7 +207,14 @@ func (c *WebRTCPeer) establishDataChannel() error {
c.lastReceive = time.Now()
})
log.Println("WebRTC: DataChannel created.")
return nil
select {
case <-openChannel:
return dc, nil
case <-time.After(DataChannelTimeout):
dc.Close()
return nil, errors.New("timeout waiting for DataChannel.OnOpen")
}
}
// exchangeSDP sends the local SDP offer to the Broker, awaits the SDP answer,
......@@ -266,27 +236,10 @@ func exchangeSDP(broker *BrokerChannel, offer *webrtc.SessionDescription) *webrt
// Close all channels and transports
func (c *WebRTCPeer) cleanup() {
// Close this side of the SOCKS pipe.
if nil != c.writePipe {
c.writePipe.Close()
c.writePipe = nil
}
c.lock.Lock()
c.writePipe.Close()
if nil != c.transport {
log.Printf("WebRTC: closing DataChannel")
dataChannel := c.transport
// Setting transport to nil *before* dc Close indicates to OnClose that
// this was locally triggered.
c.transport = nil
// Release the lock before calling DeleteDataChannel (which in turn
// calls Close on the dataChannel), but after nil'ing out the transport,
// since otherwise we'll end up in the onClose handler in a deadlock.
c.lock.Unlock()
if c.pc == nil {
panic("DataChannel w/o PeerConnection, not good.")
}
dataChannel.Close()
} else {
c.lock.Unlock()
c.transport.Close()
}
if nil != c.pc {
log.Printf("WebRTC: closing PeerConnection")
......@@ -294,6 +247,5 @@ func (c *WebRTCPeer) cleanup() {
if nil != err {
log.Printf("Error closing peerconnection...")
}
c.pc = nil
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment