snowflake.go 11.9 KB
Newer Older
1
2
3
4
package main

import (
	"bytes"
David Fifield's avatar
fmt    
David Fifield committed
5
	"crypto/rand"
6
7
8
9
10
11
12
13
14
	"encoding/base64"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	"net/url"
David Fifield's avatar
David Fifield committed
15
	"os"
16
	"regexp"
17
18
19
20
	"strings"
	"sync"
	"time"

21
	"git.torproject.org/pluggable-transports/snowflake.git/common/messages"
22
	"git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
23
	"git.torproject.org/pluggable-transports/snowflake.git/common/util"
24
25
	"git.torproject.org/pluggable-transports/snowflake.git/common/websocketconn"
	"github.com/gorilla/websocket"
26
	"github.com/pion/webrtc/v2"
27
28
)

David Fifield's avatar
David Fifield committed
29
// Default endpoints and timing parameters used by the proxy.
const (
	// defaultBrokerURL is the default value of the -broker flag.
	defaultBrokerURL = "https://snowflake-broker.bamsoftware.com/"
	// defaultRelayURL is the default value of the -relay flag.
	defaultRelayURL = "wss://snowflake.bamsoftware.com/"
	// defaultSTUNURL is the default value of the -stun flag.
	defaultSTUNURL = "stun:stun.l.google.com:19302"

	// pollInterval is the minimum spacing between the start of two
	// consecutive broker polls.
	pollInterval = 5 * time.Second

	// dataChannelTimeout is how long the proxy waits, after sending an SDP
	// answer, before it assumes the client is not going to connect.
	dataChannelTimeout = 20 * time.Second

	// readLimit is the maximum number of bytes read from the body of a
	// broker HTTP response.
	readLimit = 100000
)

40
var broker *Broker
41
var relayURL string
42
43
44
45
46
47
48

// sessionIDLength is the number of random bytes that genSessionID encodes
// into a session ID.
const sessionIDLength = 16

var (
	tokens chan bool
49
	config webrtc.Configuration
50
51
52
	client http.Client
)

53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// remoteIPPatterns match the connection ("c=") line of an SDP description and
// capture the address as submatch 1. An optional "/ttl" (and, for IPv4,
// "/ttl/count") suffix is accepted, and the address must be followed by a
// space or a line ending.
//
// The trailing group is non-capturing "(?: |\r?\n)". It was previously
// written "(:? |\r?\n)", which is a capturing group that also tolerated a
// stray ':' before the space.
var remoteIPPatterns = []*regexp.Regexp{
	/* IPv4 */
	regexp.MustCompile(`(?m)^c=IN IP4 ([\d.]+)(?:(?:\/\d+)?\/\d+)?(?: |\r?\n)`),
	/* IPv6 */
	regexp.MustCompile(`(?m)^c=IN IP6 ([0-9A-Fa-f:.]+)(?:\/\d+)?(?: |\r?\n)`),
}

// remoteIPFromSDP extracts the address from a connection line of sdp. The
// patterns in remoteIPPatterns are tried in order and the first one that
// matches decides the result; nil is returned when none matches or when the
// captured text is not a valid IP address.
// https://tools.ietf.org/html/rfc4566#section-5.7
func remoteIPFromSDP(sdp string) net.IP {
	for i := range remoteIPPatterns {
		groups := remoteIPPatterns[i].FindStringSubmatch(sdp)
		if groups == nil {
			continue
		}
		// No separate error handling is needed: net.ParseIP yields nil
		// for text it cannot parse.
		return net.ParseIP(groups[1])
	}
	return nil
}

72
73
74
75
76
// Broker holds what is needed to talk to the broker over HTTP. pollOffer and
// sendAnswer are its methods.
type Broker struct {
	// url is the base URL; pollOffer resolves "proxy" and sendAnswer
	// resolves "answer" against it.
	url       *url.URL
	// transport performs the HTTP round trips to the broker.
	transport http.RoundTripper
}

77
78
79
80
type webRTCConn struct {
	dc *webrtc.DataChannel
	pc *webrtc.PeerConnection
	pr *io.PipeReader
81
82
83

	lock sync.Mutex // Synchronization for DataChannel destruction
	once sync.Once  // Synchronization for PeerConnection destruction
84
85
86
87
88
89
90
}

// Read reads from the pipe reader pr. The bytes it yields are those that the
// OnMessage callback in makePeerConnectionFromOffer wrote to the other end of
// the pipe.
func (c *webRTCConn) Read(b []byte) (int, error) {
	return c.pr.Read(b)
}

// Write sends b as one message on the data channel.
//
// The lock is held for the whole call so that the OnClose callback cannot
// clear c.dc in the middle of a send. Once the channel has been torn down
// (c.dc is nil) the data is discarded and the write is reported as
// successful, as before. An error returned by Send is now passed back to the
// caller instead of being dropped, so a failed send is no longer reported as
// a complete write.
func (c *webRTCConn) Write(b []byte) (int, error) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.dc == nil {
		return len(b), nil
	}
	if err := c.dc.Send(b); err != nil {
		return 0, err
	}
	return len(b), nil
}

99
100
// Close closes the underlying PeerConnection. Only the first call actually
// closes it and returns its error; every later call is a no-op that returns
// nil.
func (c *webRTCConn) Close() (err error) {
	closePC := func() {
		err = c.pc.Close()
	}
	c.once.Do(closePC)
	return err
}

// LocalAddr always returns nil; no local address is reported for a
// webRTCConn.
func (c *webRTCConn) LocalAddr() net.Addr {
	return nil
}

func (c *webRTCConn) RemoteAddr() net.Addr {
111
	//Parse Remote SDP offer and extract client IP
112
	clientIP := remoteIPFromSDP(c.pc.RemoteDescription().SDP)
113
114
115
	if clientIP == nil {
		return nil
	}
Arlo Breault's avatar
Arlo Breault committed
116
	return &net.IPAddr{IP: clientIP, Zone: ""}
117
118
119
}

// SetDeadline is not supported: it ignores t and always returns an error.
func (c *webRTCConn) SetDeadline(t time.Time) error {
	// nolint: golint
	err := fmt.Errorf("SetDeadline not implemented")
	return err
}

// SetReadDeadline is not supported: it ignores t and always returns an error.
func (c *webRTCConn) SetReadDeadline(t time.Time) error {
	// nolint: golint
	err := fmt.Errorf("SetReadDeadline not implemented")
	return err
}

// SetWriteDeadline is not supported: it ignores t and always returns an
// error.
func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
	// nolint: golint
	err := fmt.Errorf("SetWriteDeadline not implemented")
	return err
}

// getToken claims one client slot by receiving a value from tokens. It blocks
// until a value is available.
func getToken() {
	<-tokens
}

// retToken gives a client slot back by sending a value on tokens. The send
// blocks if the channel's buffer is already full.
func retToken() {
	tokens <- true
}

// genSessionID returns a fresh session ID: sessionIDLength bytes from
// crypto/rand, encoded as standard base64 with the trailing "=" padding
// removed. It panics if the random source fails.
func genSessionID() string {
	raw := make([]byte, sessionIDLength)
	if _, err := rand.Read(raw); err != nil {
		panic(err.Error())
	}
	encoded := base64.StdEncoding.EncodeToString(raw)
	return strings.TrimRight(encoded, "=")
}

151
// limitedRead reads r to the end but never consumes more than limit+1 bytes.
//
// If r ends within limit bytes, everything read is returned with a nil error.
// If r holds more than limit bytes, the first limit bytes are returned
// together with io.ErrUnexpectedEOF. A read error from r is returned as is,
// along with whatever had been read up to that point.
func limitedRead(r io.Reader, limit int64) ([]byte, error) {
	// Ask for one byte beyond the limit so that "exactly limit bytes" can
	// be told apart from "more than limit bytes".
	capped := io.LimitReader(r, limit+1)
	data, err := ioutil.ReadAll(capped)
	switch {
	case err != nil:
		return data, err
	case int64(len(data)) == limit+1:
		return data[0:limit], io.ErrUnexpectedEOF
	default:
		return data, err
	}
}

161
162
func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
	brokerPath := b.url.ResolveReference(&url.URL{Path: "proxy"})
163
	timeOfNextPoll := time.Now()
164
	for {
165
166
167
168
169
170
171
172
173
174
175
		// Sleep until we're scheduled to poll again.
		now := time.Now()
		time.Sleep(timeOfNextPoll.Sub(now))
		// Compute the next time to poll -- if it's in the past, that
		// means that the POST took longer than pollInterval, so we're
		// allowed to do another one immediately.
		timeOfNextPoll = timeOfNextPoll.Add(pollInterval)
		if timeOfNextPoll.Before(now) {
			timeOfNextPoll = now
		}

176
		body, err := messages.EncodePollRequest(sid, "standalone")
177
178
179
180
		if err != nil {
			log.Printf("Error encoding poll message: %s", err.Error())
			return nil
		}
181
182
		req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
		resp, err := b.transport.RoundTrip(req)
183
		if err != nil {
David Fifield's avatar
David Fifield committed
184
			log.Printf("error polling broker: %s", err)
185
186
187
188
189
		} else {
			defer resp.Body.Close()
			if resp.StatusCode != http.StatusOK {
				log.Printf("broker returns: %d", resp.StatusCode)
			} else {
190
				body, err := limitedRead(resp.Body, readLimit)
191
				if err != nil {
David Fifield's avatar
David Fifield committed
192
					log.Printf("error reading broker response: %s", err)
193
				} else {
194
195
196
197
198
199
200
201

					offer, err := messages.DecodePollResponse(body)
					if err != nil {
						log.Printf("error reading broker response: %s", err.Error())
						log.Printf("body: %s", body)
						return nil
					}
					if offer != "" {
202
						return util.DeserializeSessionDescription(offer)
203
					}
204
205
206
207
208
209
				}
			}
		}
	}
}

210
211
// sendAnswer posts the local session description of pc to the broker's
// "answer" endpoint on behalf of session sid.
//
// It returns nil only when the broker replies with status 200 and a response
// that decodes as success. Any failure to encode, build, send, read or decode
// is returned as an error, as is a decoded response that reports failure
// ("broker returned client timeout").
func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
	brokerPath := b.url.ResolveReference(&url.URL{Path: "answer"})
	answer := string([]byte(util.SerializeSessionDescription(pc.LocalDescription())))
	body, err := messages.EncodeAnswerRequest(answer, sid)
	if err != nil {
		return err
	}

	req, err := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
	if err != nil {
		// Previously ignored, which would have passed a nil request on
		// to the transport.
		return err
	}
	resp, err := b.transport.RoundTrip(req)
	if err != nil {
		return err
	}
	// The body must be closed on every path, including the non-200 one.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("broker returned %d", resp.StatusCode)
	}

	body, err = limitedRead(resp.Body, readLimit)
	if err != nil {
		return fmt.Errorf("error reading broker response: %s", err)
	}
	success, err := messages.DecodeAnswerResponse(body)
	if err != nil {
		return err
	}
	if !success {
		return fmt.Errorf("broker returned client timeout")
	}

	return nil
}

Arlo Breault's avatar
Arlo Breault committed
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
// CopyLoop relays data between c1 and c2 in both directions and returns once
// both directions have finished. When one direction ends, for whatever
// reason, both connections are closed, which brings the other direction to an
// end as well. A copy error is logged, not returned.
func CopyLoop(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) {
	var pending sync.WaitGroup
	pending.Add(2)

	pump := func(to io.ReadWriteCloser, from io.ReadWriteCloser) {
		defer pending.Done()
		_, err := io.Copy(to, from)
		if err != nil {
			log.Printf("io.Copy inside CopyLoop generated an error: %v", err)
		}
		to.Close()
		from.Close()
	}

	go pump(c1, c2)
	go pump(c2, c1)
	pending.Wait()
}

257
258
259
260
261
// We pass conn.RemoteAddr() as an additional parameter, rather than calling
// conn.RemoteAddr() inside this function, as a workaround for a hang that
// otherwise occurs inside of conn.pc.RemoteDescription() (called by
// RemoteAddr). https://bugs.torproject.org/18628#comment:8
func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
262
263
264
	defer conn.Close()
	defer retToken()

265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
	u, err := url.Parse(relayURL)
	if err != nil {
		log.Fatalf("invalid relay url: %s", err)
	}

	// Retrieve client IP address
	if remoteAddr != nil {
		// Encode client IP address in relay URL
		q := u.Query()
		clientIP := remoteAddr.String()
		q.Set("client_ip", clientIP)
		u.RawQuery = q.Encode()
	} else {
		log.Printf("no remote address given in websocket")
	}

281
	ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
282
	if err != nil {
David Fifield's avatar
David Fifield committed
283
		log.Printf("error dialing relay: %s", err)
284
285
		return
	}
286
	wsConn := websocketconn.New(ws)
287
288
	log.Printf("connected to relay")
	defer wsConn.Close()
289
	CopyLoop(conn, wsConn)
290
291
292
293
294
295
296
	log.Printf("datachannelHandler ends")
}

// Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE
// candidates is complete and the answer is available in LocalDescription.
// Installs an OnDataChannel callback that creates a webRTCConn and passes it to
// datachannelHandler.
297
func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) {
298
299
300
301
	pc, err := webrtc.NewPeerConnection(config)
	if err != nil {
		return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
	}
302
	pc.OnDataChannel(func(dc *webrtc.DataChannel) {
303
		log.Println("OnDataChannel")
Cecylia Bocovich's avatar
Cecylia Bocovich committed
304
		close(dataChan)
305
306

		pr, pw := io.Pipe()
307
308
		conn := &webRTCConn{pc: pc, dc: dc, pr: pr}

309
		dc.OnOpen(func() {
310
			log.Println("OnOpen channel")
311
312
		})
		dc.OnClose(func() {
313
314
			conn.lock.Lock()
			defer conn.lock.Unlock()
315
			log.Println("OnClose channel")
316
			conn.dc = nil
317
			dc.Close()
318
			pw.Close()
319
320
		})
		dc.OnMessage(func(msg webrtc.DataChannelMessage) {
321
322
			var n int
			n, err = pw.Write(msg.Data)
323
			if err != nil {
324
325
326
				if inerr := pw.CloseWithError(err); inerr != nil {
					log.Printf("close with error generated an error: %v", inerr)
				}
327
			}
328
			if n != len(msg.Data) {
329
330
				panic("short write")
			}
331
		})
332
333

		go datachannelHandler(conn, conn.RemoteAddr())
334
	})
335

336
	err = pc.SetRemoteDescription(*sdp)
337
	if err != nil {
338
339
340
		if inerr := pc.Close(); inerr != nil {
			log.Printf("unable to call pc.Close after pc.SetRemoteDescription with error: %v", inerr)
		}
341
342
343
344
		return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
	}
	log.Println("sdp offer successfully received.")

Cecylia Bocovich's avatar
Cecylia Bocovich committed
345
	log.Println("Generating answer...")
346
	answer, err := pc.CreateAnswer(nil)
347
348
349
	// blocks on ICE gathering. we need to add a timeout if needed
	// not putting this in a separate go routine, because we need
	// SetLocalDescription(answer) to be called before sendAnswer
Cecylia Bocovich's avatar
Cecylia Bocovich committed
350
	if err != nil {
351
352
353
		if inerr := pc.Close(); inerr != nil {
			log.Printf("ICE gathering has generated an error when calling pc.Close: %v", inerr)
		}
Cecylia Bocovich's avatar
Cecylia Bocovich committed
354
355
		return nil, err
	}
356

Cecylia Bocovich's avatar
Cecylia Bocovich committed
357
358
	err = pc.SetLocalDescription(answer)
	if err != nil {
359
360
361
		if err = pc.Close(); err != nil {
			log.Printf("pc.Close after setting local description returned : %v", err)
		}
362
363
		return nil, err
	}
364

365
366
367
368
	return pc, nil
}

func runSession(sid string) {
369
	offer := broker.pollOffer(sid)
370
371
372
373
374
	if offer == nil {
		log.Printf("bad offer from broker")
		retToken()
		return
	}
375
376
	dataChan := make(chan struct{})
	pc, err := makePeerConnectionFromOffer(offer, config, dataChan)
377
	if err != nil {
David Fifield's avatar
David Fifield committed
378
		log.Printf("error making WebRTC connection: %s", err)
379
380
381
		retToken()
		return
	}
382
	err = broker.sendAnswer(sid, pc)
383
	if err != nil {
David Fifield's avatar
David Fifield committed
384
		log.Printf("error sending answer to client through broker: %s", err)
385
386
387
		if inerr := pc.Close(); inerr != nil {
			log.Printf("error calling pc.Close: %v", inerr)
		}
388
389
390
		retToken()
		return
	}
391
392
393
394
395
396
397
398
	// Set a timeout on peerconnection. If the connection state has not
	// advanced to PeerConnectionStateConnected in this time,
	// destroy the peer connection and return the token.
	select {
	case <-dataChan:
		log.Println("Connection successful.")
	case <-time.After(dataChannelTimeout):
		log.Println("Timed out waiting for client to open data channel.")
399
400
401
		if err := pc.Close(); err != nil {
			log.Printf("error calling pc.Close: %v", err)
		}
402
403
		retToken()
	}
404
405
406
}

func main() {
407
408
	var capacity uint
	var stunURL string
David Fifield's avatar
David Fifield committed
409
	var logFilename string
410
	var rawBrokerURL string
411
412

	flag.UintVar(&capacity, "capacity", 10, "maximum concurrent clients")
413
	flag.StringVar(&rawBrokerURL, "broker", defaultBrokerURL, "broker URL")
414
415
	flag.StringVar(&relayURL, "relay", defaultRelayURL, "websocket relay URL")
	flag.StringVar(&stunURL, "stun", defaultSTUNURL, "stun URL")
David Fifield's avatar
David Fifield committed
416
	flag.StringVar(&logFilename, "log", "", "log filename")
417
418
	flag.Parse()

419
	var logOutput io.Writer = os.Stderr
David Fifield's avatar
David Fifield committed
420
	log.SetFlags(log.LstdFlags | log.LUTC)
David Fifield's avatar
David Fifield committed
421
	if logFilename != "" {
David Fifield's avatar
fmt    
David Fifield committed
422
		f, err := os.OpenFile(logFilename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
David Fifield's avatar
David Fifield committed
423
424
425
426
		if err != nil {
			log.Fatal(err)
		}
		defer f.Close()
427
		logOutput = io.MultiWriter(os.Stderr, f)
David Fifield's avatar
David Fifield committed
428
	}
429
430
	//We want to send the log output through our scrubber first
	log.SetOutput(&safelog.LogScrubber{Output: logOutput})
David Fifield's avatar
David Fifield committed
431

432
433
	log.Println("starting")

434
	var err error
435
436
	broker = new(Broker)
	broker.url, err = url.Parse(rawBrokerURL)
437
438
439
	if err != nil {
		log.Fatalf("invalid broker url: %s", err)
	}
440
	_, err = url.Parse(stunURL)
441
442
443
	if err != nil {
		log.Fatalf("invalid stun url: %s", err)
	}
444
	_, err = url.Parse(relayURL)
445
446
447
448
	if err != nil {
		log.Fatalf("invalid relay url: %s", err)
	}

449
	broker.transport = http.DefaultTransport.(*http.Transport)
450
451
452
453
454
455
456
	config = webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{stunURL},
			},
		},
	}
457
458
	tokens = make(chan bool, capacity)
	for i := uint(0); i < capacity; i++ {
459
460
461
462
463
464
465
466
467
		tokens <- true
	}

	for {
		getToken()
		sessionID := genSessionID()
		runSession(sessionID)
	}
}