webrtc.go 8.87 KB
Newer Older
1
2
3
4
package main

import (
	"bytes"
5
	"errors"
6
7
8
	"io"
	"log"
	"time"
9
10
11

	"github.com/dchest/uniuri"
	"github.com/keroserene/go-webrtc"
12
13
)

14
15
16
// Remote WebRTC peer.
// Implements the |Snowflake| interface, which includes
// |io.ReadWriter|, |Resetter|, and |Connector|.
17
18
19
20
21
//
// Handles preparation of go-webrtc PeerConnection. Only ever has
// one DataChannel.
type WebRTCPeer struct {
	id        string
22
23
	config    *webrtc.Configuration
	pc        *webrtc.PeerConnection
24
	transport SnowflakeDataChannel // Holds the WebRTC DataChannel.
25
26
	broker    *BrokerChannel

27
28
29
30
31
	offerChannel  chan *webrtc.SessionDescription
	answerChannel chan *webrtc.SessionDescription
	errorChannel  chan error
	recvPipe      *io.PipeReader
	writePipe     *io.PipeWriter
32
	lastReceive   time.Time
33
34
	buffer        bytes.Buffer
	reset         chan struct{}
35
36

	closed bool
37
38

	BytesLogger
39
40
}

41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// NewWebRTCPeer builds a WebRTCPeer for the given configuration and broker.
// No PeerConnection is created here; that happens in Connect.
//
// The offer, answer, error and reset channels each have a capacity of one.
// The error channel is mostly for reporting during the initial SDP offer
// creation & local description setting, which happens asynchronously.
// The byte logger starts out as a BytesNullLogger; override it with
// something else to have real logging.
func NewWebRTCPeer(config *webrtc.Configuration,
	broker *BrokerChannel) *WebRTCPeer {
	peer := &WebRTCPeer{
		id:     "snowflake-" + uniuri.New(),
		config: config,
		broker: broker,

		offerChannel:  make(chan *webrtc.SessionDescription, 1),
		answerChannel: make(chan *webrtc.SessionDescription, 1),
		errorChannel:  make(chan error, 1),
		reset:         make(chan struct{}, 1),

		BytesLogger: &BytesNullLogger{},
	}
	// Pipes remain the same even when DataChannel gets switched.
	peer.recvPipe, peer.writePipe = io.Pipe()
	return peer
}

63
// Read bytes from local SOCKS.
64
// As part of |io.ReadWriter|
65
func (c *WebRTCPeer) Read(b []byte) (int, error) {
66
67
68
	return c.recvPipe.Read(b)
}

69
70
// Writes bytes out to remote WebRTC.
// As part of |io.ReadWriter|
71
func (c *WebRTCPeer) Write(b []byte) (int, error) {
72
	c.BytesLogger.AddOutbound(len(b))
73
	// TODO: Buffering could be improved / separated out of WebRTCPeer.
74
	if nil == c.transport {
75
76
77
		log.Printf("Buffered %d bytes --> WebRTC", len(b))
		c.buffer.Write(b)
	} else {
78
		c.transport.Send(b)
79
	}
80
81
82
	return len(b), nil
}

83
// As part of |Snowflake|
84
func (c *WebRTCPeer) Close() error {
85
86
87
	if c.closed { // Skip if already closed.
		return nil
	}
88
89
	// Mark for deletion.
	c.closed = true
90
91
92
	c.cleanup()
	c.Reset()
	log.Printf("WebRTC: Closing")
93
	return nil
94
95
}

96
// As part of |Resetter|
97
func (c *WebRTCPeer) Reset() {
98
99
100
101
	if nil == c.reset {
		return
	}
	c.reset <- struct{}{}
102
103
}

104
// As part of |Resetter|
105
func (c *WebRTCPeer) WaitForReset() { <-c.reset }
106

107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// checkForStaleness prevents long-lived broken remotes: once per second it
// compares the time since the last received message against
// SnowflakeTimeout (in seconds) and closes the peer when that is exceeded.
// It returns as soon as the peer is closed, by itself or by anyone else.
//
// Should also update the DataChannel in underlying go-webrtc's to make Closes
// more immediate / responsive.
func (c *WebRTCPeer) checkForStaleness() {
	// Start the clock now so a fresh connection is not considered stale.
	c.lastReceive = time.Now()
	for !c.closed {
		idle := time.Since(c.lastReceive).Seconds()
		if idle > SnowflakeTimeout {
			log.Println("WebRTC: No messages received for", SnowflakeTimeout,
				"seconds -- closing stale connection.")
			c.Close()
			return
		}
		time.Sleep(time.Second)
	}
}

126
// As part of |Connector| interface.
127
128
func (c *WebRTCPeer) Connect() error {
	log.Println(c.id, " connecting...")
129
130
131
132
	// TODO: When go-webrtc is more stable, it's possible that a new
	// PeerConnection won't need to be re-prepared each time.
	err := c.preparePeerConnection()
	if err != nil {
133
		return err
134
	}
135
136
	err = c.establishDataChannel()
	if err != nil {
137
		return errors.New("WebRTC: Could not establish DataChannel.")
138
	}
139
140
141
142
	err = c.exchangeSDP()
	if err != nil {
		return err
	}
143
	go c.checkForStaleness()
144
	return nil
145
146
}

147
// Create and prepare callbacks on a new WebRTC PeerConnection.
148
func (c *WebRTCPeer) preparePeerConnection() error {
149
150
151
152
153
154
	if nil != c.pc {
		c.pc.Close()
		c.pc = nil
	}
	pc, err := webrtc.NewPeerConnection(c.config)
	if err != nil {
155
156
		log.Printf("NewPeerConnection ERROR: %s", err)
		return err
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
	}
	// Prepare PeerConnection callbacks.
	pc.OnNegotiationNeeded = func() {
		log.Println("WebRTC: OnNegotiationNeeded")
		go func() {
			offer, err := pc.CreateOffer()
			// TODO: Potentially timeout and retry if ICE isn't working.
			if err != nil {
				c.errorChannel <- err
				return
			}
			err = pc.SetLocalDescription(offer)
			if err != nil {
				c.errorChannel <- err
				return
			}
		}()
	}
	// Allow candidates to accumulate until OnIceComplete.
	pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
		log.Printf(candidate.Candidate)
	}
	// TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
	pc.OnIceComplete = func() {
		log.Printf("WebRTC: OnIceComplete")
		c.offerChannel <- pc.LocalDescription()
	}
	// This callback is not expected, as the Client initiates the creation
	// of the data channel, not the remote peer.
	pc.OnDataChannel = func(channel *webrtc.DataChannel) {
		log.Println("OnDataChannel")
		panic("Unexpected OnDataChannel!")
	}
	c.pc = pc
	log.Println("WebRTC: PeerConnection created.")
192
	return nil
193
194
195
}

// Create a WebRTC DataChannel locally.
196
func (c *WebRTCPeer) establishDataChannel() error {
197
	if c.transport != nil {
198
199
		panic("Unexpected datachannel already exists!")
	}
200
	dc, err := c.pc.CreateDataChannel(c.id, webrtc.Init{})
201
202
203
204
	// Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
	// an SDP offer while other goroutines operating on this struct handle the
	// signaling. Eventually fires "OnOpen".
	if err != nil {
205
		log.Printf("CreateDataChannel ERROR: %s", err)
206
207
208
209
		return err
	}
	dc.OnOpen = func() {
		log.Println("WebRTC: DataChannel.OnOpen")
210
211
		if nil != c.transport {
			panic("WebRTC: transport already exists.")
212
		}
213
		// Flush buffered outgoing SOCKS data if necessary.
214
215
216
217
218
219
		if c.buffer.Len() > 0 {
			dc.Send(c.buffer.Bytes())
			log.Println("Flushed", c.buffer.Len(), "bytes.")
			c.buffer.Reset()
		}
		// Then enable the datachannel.
220
		c.transport = dc
221
222
223
	}
	dc.OnClose = func() {
		// Future writes will go to the buffer until a new DataChannel is available.
224
		if nil == c.transport {
225
226
227
			// Closed locally, as part of a reset.
			log.Println("WebRTC: DataChannel.OnClose [locally]")
			return
228
		}
229
230
231
		// Closed remotely, need to reset everything.
		// Disable the DataChannel as a write destination.
		log.Println("WebRTC: DataChannel.OnClose [remotely]")
232
		c.transport = nil
233
		c.Close()
234
235
	}
	dc.OnMessage = func(msg []byte) {
236
		if len(msg) <= 0 {
237
			log.Println("0 length message---")
238
		}
239
		c.BytesLogger.AddInbound(len(msg))
240
241
242
243
244
245
246
		n, err := c.writePipe.Write(msg)
		if err != nil {
			// TODO: Maybe shouldn't actually close.
			log.Println("Error writing to SOCKS pipe")
			c.writePipe.CloseWithError(err)
		}
		if n != len(msg) {
247
			log.Println("Error: short write")
248
249
			panic("short write")
		}
250
		c.lastReceive = time.Now()
251
252
253
254
255
	}
	log.Println("WebRTC: DataChannel created.")
	return nil
}

256
// sendOfferToBroker hands the local SDP offer to the broker and delivers the
// outcome on answerChannel: the broker's answer, or nil when negotiation
// failed or produced no answer. Without a broker it does nothing, and in
// particular sends nothing on answerChannel.
func (c *WebRTCPeer) sendOfferToBroker() {
	if nil == c.broker {
		return
	}
	offer := c.pc.LocalDescription()
	answer, err := c.broker.Negotiate(offer)
	switch {
	case nil != err:
		log.Printf("BrokerChannel Error: %s", err)
		// Never pass on an answer that came with an error.
		answer = nil
	case nil == answer:
		// There is no error value to format in this case.
		log.Printf("BrokerChannel Error: no answer received")
	}
	c.answerChannel <- answer
}

// Block until an SDP offer is available, send it to either
// the Broker or signal pipe, then await for the SDP answer.
271
func (c *WebRTCPeer) exchangeSDP() error {
272
273
	select {
	case offer := <-c.offerChannel:
274
		// Display for copy-paste when no broker available.
275
		if nil == c.broker {
276
277
			log.Printf("Please Copy & Paste the following to the peer:")
			log.Printf("----------------")
278
			log.Printf("\n\n" + offer.Serialize() + "\n\n")
279
280
281
			log.Printf("----------------")
		}
	case err := <-c.errorChannel:
282
		log.Println("Failed to prepare offer", err)
283
		c.Close()
284
285
		return err
	}
286
287
288
289
290
291
	// Keep trying the same offer until a valid answer arrives.
	var ok bool
	var answer *webrtc.SessionDescription = nil
	for nil == answer {
		go c.sendOfferToBroker()
		answer, ok = <-c.answerChannel // Blocks...
292
293
294
		if !ok || nil == answer {
			log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout)
			<-time.After(time.Second * ReconnectTimeout)
295
			answer = nil
296
		}
297
298
299
300
	}
	log.Printf("Received Answer:\n\n%s\n", answer.Sdp)
	err := c.pc.SetRemoteDescription(answer)
	if nil != err {
301
302
		log.Println("WebRTC: Unable to SetRemoteDescription:", err)
		return err
303
304
	}
	return nil
305
306
}

307
// Close all channels and transports
308
func (c *WebRTCPeer) cleanup() {
309
310
311
312
313
314
315
316
317
	if nil != c.offerChannel {
		close(c.offerChannel)
	}
	if nil != c.answerChannel {
		close(c.answerChannel)
	}
	if nil != c.errorChannel {
		close(c.errorChannel)
	}
318
319
320
321
322
	// Close this side of the SOCKS pipe.
	if nil != c.writePipe {
		c.writePipe.Close()
		c.writePipe = nil
	}
323
	if nil != c.transport {
324
		log.Printf("WebRTC: closing DataChannel")
325
		dataChannel := c.transport
326
327
		// Setting transport to nil *before* dc Close indicates to OnClose that
		// this was locally triggered.
328
		c.transport = nil
329
		dataChannel.Close()
330
331
332
	}
	if nil != c.pc {
		log.Printf("WebRTC: closing PeerConnection")
333
334
335
336
		err := c.pc.Close()
		if nil != err {
			log.Printf("Error closing peerconnection...")
		}
337
338
339
		c.pc = nil
	}
}