webrtc.go 9.71 KB
Newer Older
1
package lib
2
3
4

import (
	"bytes"
5
	"errors"
6
7
	"io"
	"log"
8
	"sync"
9
	"time"
10
11

	"github.com/dchest/uniuri"
12
	"github.com/pion/webrtc"
13
14
)

15
16
17
// Remote WebRTC peer.
// Implements the |Snowflake| interface, which includes
// |io.ReadWriter|, |Resetter|, and |Connector|.
18
19
20
21
22
//
// Handles preparation of go-webrtc PeerConnection. Only ever has
// one DataChannel.
type WebRTCPeer struct {
	id        string
23
24
	config    *webrtc.Configuration
	pc        *webrtc.PeerConnection
25
	transport SnowflakeDataChannel // Holds the WebRTC DataChannel.
26
27
	broker    *BrokerChannel

28
29
30
31
32
	offerChannel  chan *webrtc.SessionDescription
	answerChannel chan *webrtc.SessionDescription
	errorChannel  chan error
	recvPipe      *io.PipeReader
	writePipe     *io.PipeWriter
33
	lastReceive   time.Time
34
35
	buffer        bytes.Buffer
	reset         chan struct{}
36
37

	closed bool
38

39
40
41
	lock sync.Mutex // Synchronization for DataChannel destruction
	once sync.Once  // Synchronization for PeerConnection destruction

42
	BytesLogger
43
44
}

45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// NewWebRTCPeer constructs a WebRTCPeer for the given configuration and
// broker. No PeerConnection is created here; that is done by Connect.
func NewWebRTCPeer(config *webrtc.Configuration,
	broker *BrokerChannel) *WebRTCPeer {
	// Pipes remain the same even when DataChannel gets switched.
	reader, writer := io.Pipe()

	return &WebRTCPeer{
		id:            "snowflake-" + uniuri.New(),
		config:        config,
		broker:        broker,
		offerChannel:  make(chan *webrtc.SessionDescription, 1),
		answerChannel: make(chan *webrtc.SessionDescription, 1),
		// Error channel is mostly for reporting during the initial SDP offer
		// creation & local description setting, which happens asynchronously.
		errorChannel: make(chan error, 1),
		reset:        make(chan struct{}, 1),
		recvPipe:     reader,
		writePipe:    writer,
		// Override with something that's not NullLogger to have real logging.
		BytesLogger: &BytesNullLogger{},
	}
}

67
// Read bytes from local SOCKS.
68
// As part of |io.ReadWriter|
69
func (c *WebRTCPeer) Read(b []byte) (int, error) {
70
71
72
	return c.recvPipe.Read(b)
}

73
74
// Writes bytes out to remote WebRTC.
// As part of |io.ReadWriter|
75
func (c *WebRTCPeer) Write(b []byte) (int, error) {
76
77
	c.lock.Lock()
	defer c.lock.Unlock()
78
	c.BytesLogger.AddOutbound(len(b))
79
	// TODO: Buffering could be improved / separated out of WebRTCPeer.
80
	if nil == c.transport {
81
82
83
		log.Printf("Buffered %d bytes --> WebRTC", len(b))
		c.buffer.Write(b)
	} else {
84
		c.transport.Send(b)
85
	}
86
87
88
	return len(b), nil
}

89
// As part of |Snowflake|
90
func (c *WebRTCPeer) Close() error {
91
92
93
94
95
96
	c.once.Do(func() {
		c.closed = true
		c.cleanup()
		c.Reset()
		log.Printf("WebRTC: Closing")
	})
97
	return nil
98
99
}

100
// As part of |Resetter|
101
func (c *WebRTCPeer) Reset() {
102
103
104
105
	if nil == c.reset {
		return
	}
	c.reset <- struct{}{}
106
107
}

108
// As part of |Resetter|
109
func (c *WebRTCPeer) WaitForReset() { <-c.reset }
110

111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// checkForStaleness guards against long-lived broken remotes. Once per second
// it checks how long ago the last message was received and closes the peer
// when that exceeds SnowflakeTimeout seconds. It returns as soon as the peer
// is closed.
func (c *WebRTCPeer) checkForStaleness() {
	c.lastReceive = time.Now()
	for !c.closed {
		idle := time.Since(c.lastReceive).Seconds()
		if idle > SnowflakeTimeout {
			log.Println("WebRTC: No messages received for", SnowflakeTimeout,
				"seconds -- closing stale connection.")
			c.Close()
			return
		}
		time.Sleep(time.Second)
	}
}

130
// As part of |Connector| interface.
131
132
func (c *WebRTCPeer) Connect() error {
	log.Println(c.id, " connecting...")
133
134
135
136
	// TODO: When go-webrtc is more stable, it's possible that a new
	// PeerConnection won't need to be re-prepared each time.
	err := c.preparePeerConnection()
	if err != nil {
137
		return err
138
	}
139
140
	err = c.establishDataChannel()
	if err != nil {
141
		return errors.New("WebRTC: Could not establish DataChannel.")
142
	}
143
144
145
146
	err = c.exchangeSDP()
	if err != nil {
		return err
	}
147
	go c.checkForStaleness()
148
	return nil
149
150
}

151
// Create and prepare callbacks on a new WebRTC PeerConnection.
152
func (c *WebRTCPeer) preparePeerConnection() error {
153
	if nil != c.pc {
154
		c.pc.Close()
155
156
		c.pc = nil
	}
157
158
159
160
	s := webrtc.SettingEngine{}
	s.SetTrickle(true)
	api := webrtc.NewAPI(webrtc.WithSettingEngine(s))
	pc, err := api.NewPeerConnection(*c.config)
161
	if err != nil {
162
163
		log.Printf("NewPeerConnection ERROR: %s", err)
		return err
164
165
	}
	// Prepare PeerConnection callbacks.
166
167
168
169
170
171
172
173
174
175
176
	// Allow candidates to accumulate until ICEGatheringStateComplete.
	pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
		if candidate == nil {
			log.Printf("WebRTC: Done gathering candidates")
		} else {
			log.Printf("WebRTC: Got ICE candidate: %s", candidate.String())
		}
	})
	pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
		if state == webrtc.ICEGathererStateComplete {
			log.Println("WebRTC: ICEGatheringStateComplete")
177
178
			c.offerChannel <- pc.LocalDescription()
		}
179
	})
180
181
	// This callback is not expected, as the Client initiates the creation
	// of the data channel, not the remote peer.
182
	pc.OnDataChannel(func(channel *webrtc.DataChannel) {
183
184
		log.Println("OnDataChannel")
		panic("Unexpected OnDataChannel!")
185
	})
186
	c.pc = pc
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
	go func() {
		offer, err := pc.CreateOffer(nil)
		// TODO: Potentially timeout and retry if ICE isn't working.
		if err != nil {
			c.errorChannel <- err
			return
		}
		log.Println("WebRTC: Created offer")
		err = pc.SetLocalDescription(offer)
		if err != nil {
			c.errorChannel <- err
			return
		}
		log.Println("WebRTC: Set local description")
	}()
202
	log.Println("WebRTC: PeerConnection created.")
203
	return nil
204
205
206
}

// Create a WebRTC DataChannel locally.
207
func (c *WebRTCPeer) establishDataChannel() error {
208
209
	c.lock.Lock()
	defer c.lock.Unlock()
210
	if c.transport != nil {
211
212
		panic("Unexpected datachannel already exists!")
	}
213
214
215
216
217
	ordered := true
	dataChannelOptions := &webrtc.DataChannelInit{
		Ordered: &ordered,
	}
	dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
218
219
220
221
	// Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
	// an SDP offer while other goroutines operating on this struct handle the
	// signaling. Eventually fires "OnOpen".
	if err != nil {
222
		log.Printf("CreateDataChannel ERROR: %s", err)
223
224
		return err
	}
225
	dc.OnOpen(func() {
226
227
		c.lock.Lock()
		defer c.lock.Unlock()
228
		log.Println("WebRTC: DataChannel.OnOpen")
229
230
		if nil != c.transport {
			panic("WebRTC: transport already exists.")
231
		}
232
		// Flush buffered outgoing SOCKS data if necessary.
233
234
235
236
237
238
		if c.buffer.Len() > 0 {
			dc.Send(c.buffer.Bytes())
			log.Println("Flushed", c.buffer.Len(), "bytes.")
			c.buffer.Reset()
		}
		// Then enable the datachannel.
239
		c.transport = dc
240
241
	})
	dc.OnClose(func() {
242
		c.lock.Lock()
243
		// Future writes will go to the buffer until a new DataChannel is available.
244
		if nil == c.transport {
245
246
			// Closed locally, as part of a reset.
			log.Println("WebRTC: DataChannel.OnClose [locally]")
247
			c.lock.Unlock()
248
			return
249
		}
250
251
252
		// Closed remotely, need to reset everything.
		// Disable the DataChannel as a write destination.
		log.Println("WebRTC: DataChannel.OnClose [remotely]")
253
		c.transport = nil
254
		dc.Close()
255
256
257
		// Unlock before Close'ing, since it calls cleanup and asks for the
		// lock to check if the transport needs to be be deleted.
		c.lock.Unlock()
258
		c.Close()
259
260
261
	})
	dc.OnMessage(func(msg webrtc.DataChannelMessage) {
		if len(msg.Data) <= 0 {
262
			log.Println("0 length message---")
263
		}
264
265
		c.BytesLogger.AddInbound(len(msg.Data))
		n, err := c.writePipe.Write(msg.Data)
266
267
268
269
270
		if err != nil {
			// TODO: Maybe shouldn't actually close.
			log.Println("Error writing to SOCKS pipe")
			c.writePipe.CloseWithError(err)
		}
271
		if n != len(msg.Data) {
272
			log.Println("Error: short write")
273
274
			panic("short write")
		}
275
		c.lastReceive = time.Now()
276
	})
277
278
279
280
	log.Println("WebRTC: DataChannel created.")
	return nil
}

281
// sendOfferToBroker negotiates the local description with the broker and
// forwards the result on answerChannel. A nil answer is forwarded when the
// negotiation fails; nothing is sent when no broker is configured.
func (c *WebRTCPeer) sendOfferToBroker() {
	if c.broker == nil {
		return
	}
	answer, err := c.broker.Negotiate(c.pc.LocalDescription())
	if err != nil || answer == nil {
		log.Printf("BrokerChannel Error: %s", err)
		answer = nil
	}
	c.answerChannel <- answer
}

// Block until an SDP offer is available, send it to either
// the Broker or signal pipe, then await for the SDP answer.
296
func (c *WebRTCPeer) exchangeSDP() error {
297
	select {
Arlo Breault's avatar
Arlo Breault committed
298
	case <-c.offerChannel:
299
	case err := <-c.errorChannel:
300
		log.Println("Failed to prepare offer", err)
301
		c.Close()
302
303
		return err
	}
304
305
306
307
308
309
	// Keep trying the same offer until a valid answer arrives.
	var ok bool
	var answer *webrtc.SessionDescription = nil
	for nil == answer {
		go c.sendOfferToBroker()
		answer, ok = <-c.answerChannel // Blocks...
310
311
312
		if !ok || nil == answer {
			log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout)
			<-time.After(time.Second * ReconnectTimeout)
313
			answer = nil
314
		}
315
	}
316
	log.Printf("Received Answer.\n")
317
	err := c.pc.SetRemoteDescription(*answer)
318
	if nil != err {
319
320
		log.Println("WebRTC: Unable to SetRemoteDescription:", err)
		return err
321
322
	}
	return nil
323
324
}

325
// Close all channels and transports
326
func (c *WebRTCPeer) cleanup() {
327
328
329
330
331
332
333
334
335
	if nil != c.offerChannel {
		close(c.offerChannel)
	}
	if nil != c.answerChannel {
		close(c.answerChannel)
	}
	if nil != c.errorChannel {
		close(c.errorChannel)
	}
336
337
338
339
340
	// Close this side of the SOCKS pipe.
	if nil != c.writePipe {
		c.writePipe.Close()
		c.writePipe = nil
	}
341
	c.lock.Lock()
342
	if nil != c.transport {
343
		log.Printf("WebRTC: closing DataChannel")
344
		dataChannel := c.transport
345
346
		// Setting transport to nil *before* dc Close indicates to OnClose that
		// this was locally triggered.
347
		c.transport = nil
348
349
350
351
		// Release the lock before calling DeleteDataChannel (which in turn
		// calls Close on the dataChannel), but after nil'ing out the transport,
		// since otherwise we'll end up in the onClose handler in a deadlock.
		c.lock.Unlock()
Arlo Breault's avatar
Arlo Breault committed
352
353
354
		if c.pc == nil {
			panic("DataChannel w/o PeerConnection, not good.")
		}
355
		dataChannel.(*webrtc.DataChannel).Close()
356
357
	} else {
		c.lock.Unlock()
358
359
360
	}
	if nil != c.pc {
		log.Printf("WebRTC: closing PeerConnection")
361
		err := c.pc.Close()
362
363
364
		if nil != err {
			log.Printf("Error closing peerconnection...")
		}
365
366
367
		c.pc = nil
	}
}