Commit 1114acbc authored by Arlo Breault

Add synchronization around destroying DataChannels and PeerConnections

From https://trac.torproject.org/projects/tor/ticket/21312#comment:33
parent 40bf7664
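Both halves of the change follow the same idea: Close may be invoked multiple times, possibly concurrently, so the irreversible PeerConnection teardown is wrapped in a sync.Once and the DataChannel state is guarded by a mutex. A minimal sketch of the sync.Once half of the pattern, using stand-in names (peerConn, conn) rather than the Snowflake types:

```go
package main

import (
	"log"
	"sync"
)

// peerConn stands in for a *webrtc.PeerConnection; Destroy is the
// irreversible teardown that must run at most once.
type peerConn struct{}

func (p *peerConn) Destroy() error {
	log.Println("destroying peer connection")
	return nil
}

type conn struct {
	pc   *peerConn
	once sync.Once // guards pc.Destroy against concurrent/repeated Close
}

// Close is safe to call from multiple goroutines and multiple times:
// only the first call runs Destroy; later calls return the zero error.
func (c *conn) Close() (err error) {
	c.once.Do(func() {
		err = c.pc.Destroy()
	})
	return
}

func main() {
	c := &conn{pc: &peerConn{}}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close() // all three calls are safe; Destroy runs once
		}()
	}
	wg.Wait()
}
```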
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"io"
 	"log"
+	"sync"
 	"time"

 	"github.com/dchest/uniuri"
@@ -35,6 +36,9 @@ type WebRTCPeer struct {
 	closed bool

+	lock sync.Mutex // Synchronization for DataChannel destruction
+	once sync.Once  // Synchronization for PeerConnection destruction
+
 	BytesLogger
 }
@@ -69,6 +73,8 @@ func (c *WebRTCPeer) Read(b []byte) (int, error) {
 // Writes bytes out to remote WebRTC.
 // As part of |io.ReadWriter|
 func (c *WebRTCPeer) Write(b []byte) (int, error) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	c.BytesLogger.AddOutbound(len(b))
 	// TODO: Buffering could be improved / separated out of WebRTCPeer.
 	if nil == c.transport {
@@ -82,14 +88,12 @@ func (c *WebRTCPeer) Write(b []byte) (int, error) {
 // As part of |Snowflake|
 func (c *WebRTCPeer) Close() error {
-	if c.closed { // Skip if already closed.
-		return nil
-	}
-	// Mark for deletion.
-	c.closed = true
-	c.cleanup()
-	c.Reset()
-	log.Printf("WebRTC: Closing")
+	c.once.Do(func() {
+		c.closed = true
+		c.cleanup()
+		c.Reset()
+		log.Printf("WebRTC: Closing")
+	})
 	return nil
 }
@@ -194,6 +198,8 @@ func (c *WebRTCPeer) preparePeerConnection() error {
 // Create a WebRTC DataChannel locally.
 func (c *WebRTCPeer) establishDataChannel() error {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	if c.transport != nil {
 		panic("Unexpected datachannel already exists!")
 	}
@@ -206,6 +212,8 @@ func (c *WebRTCPeer) establishDataChannel() error {
 		return err
 	}
 	dc.OnOpen = func() {
+		c.lock.Lock()
+		defer c.lock.Unlock()
 		log.Println("WebRTC: DataChannel.OnOpen")
 		if nil != c.transport {
 			panic("WebRTC: transport already exists.")
@@ -220,10 +228,12 @@ func (c *WebRTCPeer) establishDataChannel() error {
 		c.transport = dc
 	}
 	dc.OnClose = func() {
+		c.lock.Lock()
 		// Future writes will go to the buffer until a new DataChannel is available.
 		if nil == c.transport {
 			// Closed locally, as part of a reset.
 			log.Println("WebRTC: DataChannel.OnClose [locally]")
+			c.lock.Unlock()
 			return
 		}
 		// Closed remotely, need to reset everything.
@@ -231,6 +241,9 @@ func (c *WebRTCPeer) establishDataChannel() error {
 		log.Println("WebRTC: DataChannel.OnClose [remotely]")
 		c.transport = nil
 		c.pc.DeleteDataChannel(dc)
+		// Unlock before Close'ing, since it calls cleanup and asks for the
+		// lock to check if the transport needs to be deleted.
+		c.lock.Unlock()
 		c.Close()
 	}
 	dc.OnMessage = func(msg []byte) {
@@ -321,16 +334,23 @@ func (c *WebRTCPeer) cleanup() {
 		c.writePipe.Close()
 		c.writePipe = nil
 	}
+	c.lock.Lock()
 	if nil != c.transport {
 		log.Printf("WebRTC: closing DataChannel")
 		dataChannel := c.transport
 		// Setting transport to nil *before* dc Close indicates to OnClose that
 		// this was locally triggered.
 		c.transport = nil
+		// Release the lock before calling DeleteDataChannel (which in turn
+		// calls Close on the dataChannel), but after nil'ing out the transport,
+		// since otherwise we'll end up in the onClose handler in a deadlock.
+		c.lock.Unlock()
 		if c.pc == nil {
 			panic("DataChannel w/o PeerConnection, not good.")
 		}
 		c.pc.DeleteDataChannel(dataChannel.(*webrtc.DataChannel))
-	}
+	} else {
+		c.lock.Unlock()
+	}
 	if nil != c.pc {
 		log.Printf("WebRTC: closing PeerConnection")
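Worth spelling out from the WebRTCPeer hunks above: DeleteDataChannel ends up invoking the channel's OnClose handler synchronously, and OnClose takes the same mutex, so cleanup must release the lock before that call, but only after nil'ing out transport so OnClose can recognize a locally-triggered close. A self-contained sketch of that ordering, with hypothetical names (peer, channel, deleteChannel) standing in for the go-webrtc API:

```go
package main

import (
	"log"
	"sync"
)

type peer struct {
	mu        sync.Mutex
	transport *channel // nil signals "closed locally" to the handler
}

type channel struct{ owner *peer }

// deleteChannel stands in for pc.DeleteDataChannel: like the real call,
// it synchronously invokes the channel's close handler.
func deleteChannel(ch *channel) { ch.onClose() }

func (ch *channel) onClose() {
	p := ch.owner
	p.mu.Lock() // would deadlock if cleanup still held the lock
	if p.transport == nil {
		log.Println("OnClose: closed locally, nothing to do")
		p.mu.Unlock()
		return
	}
	p.transport = nil
	p.mu.Unlock()
	log.Println("OnClose: closed remotely")
}

func (p *peer) cleanup() {
	p.mu.Lock()
	if p.transport != nil {
		ch := p.transport
		// Nil out transport *before* unlocking so onClose sees a
		// locally-triggered close...
		p.transport = nil
		// ...and unlock *before* deleteChannel, which re-enters onClose
		// and takes the same mutex.
		p.mu.Unlock()
		deleteChannel(ch)
		return
	}
	p.mu.Unlock()
}

func main() {
	p := &peer{}
	p.transport = &channel{owner: p}
	p.cleanup() // prints "OnClose: closed locally, nothing to do"
}
```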
@@ -62,6 +62,9 @@ type webRTCConn struct {
 	dc *webrtc.DataChannel
 	pc *webrtc.PeerConnection
 	pr *io.PipeReader
+
+	lock sync.Mutex // Synchronization for DataChannel destruction
+	once sync.Once  // Synchronization for PeerConnection destruction
 }

 func (c *webRTCConn) Read(b []byte) (int, error) {
@@ -69,6 +72,8 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 }

 func (c *webRTCConn) Write(b []byte) (int, error) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	// log.Printf("webrtc Write %d %+q", len(b), string(b))
 	log.Printf("Write %d bytes --> WebRTC", len(b))
 	if c.dc != nil {
@@ -77,8 +82,11 @@ func (c *webRTCConn) Write(b []byte) (int, error) {
 	return len(b), nil
 }

-func (c *webRTCConn) Close() error {
-	return c.pc.Destroy()
+func (c *webRTCConn) Close() (err error) {
+	c.once.Do(func() {
+		err = c.pc.Destroy()
+	})
+	return
 }

 func (c *webRTCConn) LocalAddr() net.Addr {
@@ -255,17 +263,18 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.
 		log.Println("OnDataChannel")
 		pr, pw := io.Pipe()
 		conn := &webRTCConn{pc: pc, dc: dc, pr: pr}

 		dc.OnOpen = func() {
 			log.Println("OnOpen channel")
 		}
 		dc.OnClose = func() {
+			conn.lock.Lock()
+			defer conn.lock.Unlock()
 			log.Println("OnClose channel")
-			pw.Close()
 			conn.dc = nil
 			pc.DeleteDataChannel(dc)
+			pw.Close()
 		}

 		dc.OnMessage = func(msg []byte) {
 			log.Printf("OnMessage <--- %d bytes", len(msg))
@@ -43,6 +43,9 @@ type webRTCConn struct {
 	dc *webrtc.DataChannel
 	pc *webrtc.PeerConnection
 	pr *io.PipeReader
+
+	lock sync.Mutex // Synchronization for DataChannel destruction
+	once sync.Once  // Synchronization for PeerConnection destruction
 }

 func (c *webRTCConn) Read(b []byte) (int, error) {
@@ -50,6 +53,8 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 }

 func (c *webRTCConn) Write(b []byte) (int, error) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	// log.Printf("webrtc Write %d %+q", len(b), string(b))
 	log.Printf("Write %d bytes --> WebRTC", len(b))
 	if c.dc != nil {
@@ -58,8 +63,11 @@ func (c *webRTCConn) Write(b []byte) (int, error) {
 	return len(b), nil
 }

-func (c *webRTCConn) Close() error {
-	return c.pc.Destroy()
+func (c *webRTCConn) Close() (err error) {
+	c.once.Do(func() {
+		err = c.pc.Destroy()
+	})
+	return
 }

 func (c *webRTCConn) LocalAddr() net.Addr {
@@ -122,17 +130,18 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.
 		log.Println("OnDataChannel")
 		pr, pw := io.Pipe()
 		conn := &webRTCConn{pc: pc, dc: dc, pr: pr}

 		dc.OnOpen = func() {
 			log.Println("OnOpen channel")
 		}
 		dc.OnClose = func() {
+			conn.lock.Lock()
+			defer conn.lock.Unlock()
 			log.Println("OnClose channel")
-			pw.Close()
 			conn.dc = nil
 			pc.DeleteDataChannel(dc)
+			pw.Close()
 		}

 		dc.OnMessage = func(msg []byte) {
 			log.Printf("OnMessage <--- %d bytes", len(msg))
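In both copies of webRTCConn (the side that answers the SDP offer), the same mutex now covers Write and the OnClose handler, so a writer can never observe dc going nil mid-send, and pw.Close() moves after DeleteDataChannel so the read pipe is torn down only once the channel is gone. A small sketch of that guarded-writer shape, with illustrative stand-ins for the webrtc types:

```go
package main

import (
	"fmt"
	"sync"
)

type dataChannel struct{}

func (d *dataChannel) Send(b []byte) {} // stand-in for dc.Send

type wconn struct {
	mu sync.Mutex
	dc *dataChannel // set to nil by the close handler
}

// Write holds the lock across the nil check and the send, so a
// concurrent onClose cannot nil out dc between the two.
func (c *wconn) Write(b []byte) (int, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.dc != nil {
		c.dc.Send(b)
	}
	return len(b), nil
}

// onClose mirrors the dc.OnClose handler: it takes the same lock
// before tearing down the channel reference.
func (c *wconn) onClose() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.dc = nil
}

func main() {
	c := &wconn{dc: &dataChannel{}}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.Write([]byte("hello")) }()
	go func() { defer wg.Done(); c.onClose() }()
	wg.Wait()
	fmt.Println("no race between Write and onClose")
}
```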