// Client transport plugin for the Snowflake pluggable transport.
package main

import (
5
	"errors"
6
	"flag"
7
	"io"
8
	"io/ioutil"
9
10
11
12
	"log"
	"net"
	"os"
	"os/signal"
13
	"strings"
14
15
	"sync"
	"syscall"
16
	"time"
17

18
	"git.torproject.org/pluggable-transports/goptlib.git"
19
20
21
	"github.com/keroserene/go-webrtc"
)

22
// Tunables for the Snowflake client.
const (
	// ReconnectTimeout is the pause, in seconds, that ConnectLoop inserts
	// between successive attempts to collect a WebRTC peer.
	ReconnectTimeout = 10
	// DefaultSnowflakeCapacity is the default value of the -max flag, i.e.
	// the default capacity passed to NewPeers.
	DefaultSnowflakeCapacity = 1
	// SnowflakeTimeout is not referenced in this file.
	// NOTE(review): presumably a duration in seconds used by the WebRTC
	// peer code elsewhere in the package — confirm at the use site.
	SnowflakeTimeout = 30
)

28
29
30
31
// handlerChan carries handler lifetime events to main. When a connection
// handler starts, +1 is written to this channel; when it ends, -1 is written.
// main sums the values it receives to know how many handlers are still
// running. The channel is unbuffered, so each write blocks until main
// receives it.
var handlerChan = make(chan int)

32
33
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to the Tor SOCKS handler when needed.
34
func ConnectLoop(snowflakes SnowflakeCollector) {
35
	for {
36
		// Check if ending is necessary.
37
		_, err := snowflakes.Collect()
38
		if nil != err {
39
			log.Println("WebRTC:", err,
40
				" Retrying in", ReconnectTimeout, "seconds...")
41
42
43
		}
		select {
		case <-time.After(time.Second * ReconnectTimeout):
44
			continue
45
46
47
		case <-snowflakes.Melted():
			log.Println("ConnectLoop: stopped.")
			return
48
49
50
51
		}
	}
}

52
// Accept local SOCKS connections and pass them to the handler.
53
func socksAcceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error {
54
	defer ln.Close()
55
	log.Println("Started SOCKS listener.")
56
	for {
57
		log.Println("SOCKS listening...")
58
		conn, err := ln.AcceptSocks()
59
		log.Println("SOCKS accepted: ", conn.Req)
60
61
62
63
64
65
66
67
68
69
		if err != nil {
			if e, ok := err.(net.Error); ok && e.Temporary() {
				continue
			}
			return err
		}
		err = handler(conn, snowflakes)
		if err != nil {
			log.Printf("handler error: %s", err)
		}
70
	}
71
72
}

73
74
75
// handler serves one accepted SOCKS connection: it takes a WebRTC peer from
// snowflakes, grants the SOCKS request, and relays traffic between the two
// until copyLoop returns.
//
// It writes +1 to handlerChan on entry and -1 on exit so that main can count
// the running handlers. It returns an error when no usable peer could be
// obtained (the SOCKS request is rejected in that case) or when granting the
// SOCKS request fails; otherwise it returns nil once the exchange is over.
// On every path after a peer was obtained, both the SOCKS connection and the
// peer are closed before returning.
func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
	handlerChan <- 1
	defer func() {
		handlerChan <- -1
	}()

	// Obtain an available WebRTC remote. May block.
	snowflake := snowflakes.Pop()
	if nil == snowflake {
		// Best effort: the handler is already failing, so whatever Reject
		// reports is not acted upon.
		socks.Reject()
		return errors.New("handler: Received invalid Snowflake")
	}
	defer socks.Close()
	defer snowflake.Close()
	log.Println("---- Handler: snowflake assigned ----")

	if err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0}); err != nil {
		return err
	}

	go func() {
		// When WebRTC resets, close the SOCKS connection too.
		snowflake.WaitForReset()
		socks.Close()
	}()

	// Begin exchanging data. Either WebRTC or localhost SOCKS will close first.
	// In either case, this closes the handler and induces a new handler.
	copyLoop(socks, snowflake)
	log.Println("---- Handler: closed ---")
	return nil
}

107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// copyLoop relays data in both directions between a and b (here: a SOCKS
// connection and a WebRTC connection). Each direction runs in its own
// goroutine; the function blocks until both directions have finished and
// then logs that the loop ended. Errors reported by io.Copy are discarded.
func copyLoop(a, b io.ReadWriter) {
	var pending sync.WaitGroup

	// relay pumps src into dst until io.Copy returns, then marks one
	// direction as finished.
	relay := func(dst io.Writer, src io.Reader) {
		defer pending.Done()
		io.Copy(dst, src)
	}

	pending.Add(2)
	go relay(b, a)
	go relay(a, b)
	pending.Wait()

	log.Println("copy loop ended")
}

124
func main() {
125
	iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers")
126
127
	brokerURL := flag.String("url", "", "URL of signaling broker")
	frontDomain := flag.String("front", "", "front domain")
128
	logFilename := flag.String("log", "", "name of log file")
129
130
	max := flag.Int("max", DefaultSnowflakeCapacity,
		"capacity for number of multiplexed WebRTC peers")
131
132
	flag.Parse()

133
	webrtc.SetLoggingVerbosity(1)
134
	log.SetFlags(log.LstdFlags | log.LUTC)
135
136
137
138
139
140
141
142
143
144
145
146
	if *logFilename != "" {
		logFile, err := os.OpenFile(*logFilename,
			os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
		if err != nil {
			log.Fatal(err)
		}
		defer logFile.Close()
		log.SetOutput(logFile)
	}

	log.Println("\n\n\n --- Starting Snowflake Client ---")

147
148
149
150
151
152
153
154
155
	var iceServers IceServerList
	log.Println("IceServerList:")
	for _, server := range strings.Split(*iceServersCommas, ",") {
		// TODO: STUN / TURN url format validation?
		log.Println(server)
		option := webrtc.OptionIceServer(server)
		iceServers = append(iceServers, option)
	}

156
	// Prepare to collect remote WebRTC peers.
157
	snowflakes := NewPeers(*max)
158
159
160
161
162
163
164
165
	if "" != *brokerURL {
		// Use potentially domain-fronting broker to rendezvous.
		broker := NewBrokerChannel(*brokerURL, *frontDomain, CreateBrokerTransport())
		snowflakes.Tongue = NewWebRTCDialer(broker, iceServers)
	} else {
		// Otherwise, use manual copy and pasting of SDP messages.
		snowflakes.Tongue = NewCopyPasteDialer(iceServers)
	}
166
167
168
169
	if nil == snowflakes.Tongue {
		log.Fatal("Unable to prepare rendezvous method.")
		return
	}
170
	// Use a real logger to periodically output how much traffic is happening.
171
	snowflakes.BytesLogger = &BytesSyncLogger{
172
173
174
		inboundChan: make(chan int, 5), outboundChan: make(chan int, 5),
		inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
	}
175
	go snowflakes.BytesLogger.Log()
176

177
	go ConnectLoop(snowflakes)
178

179
180
	// Begin goptlib client process.
	ptInfo, err := pt.ClientSetup(nil)
181
	if err != nil {
David Fifield's avatar
David Fifield committed
182
		log.Fatal(err)
183
184
185
186
187
188
189
190
	}
	if ptInfo.ProxyURL != nil {
		pt.ProxyError("proxy is not supported")
		os.Exit(1)
	}
	listeners := make([]net.Listener, 0)
	for _, methodName := range ptInfo.MethodNames {
		switch methodName {
Arlo Breault's avatar
Arlo Breault committed
191
		case "snowflake":
192
			// TODO: Be able to recover when SOCKS dies.
193
194
195
196
197
			ln, err := pt.ListenSocks("tcp", "127.0.0.1:0")
			if err != nil {
				pt.CmethodError(methodName, err.Error())
				break
			}
198
			go socksAcceptLoop(ln, snowflakes)
199
200
201
202
203
204
205
206
207
208
209
			pt.Cmethod(methodName, ln.Version(), ln.Addr())
			listeners = append(listeners, ln)
		default:
			pt.CmethodError(methodName, "no such method")
		}
	}
	pt.CmethodsDone()

	var numHandlers int = 0
	var sig os.Signal
	sigChan := make(chan os.Signal, 1)
210
	signal.Notify(sigChan, syscall.SIGTERM)
211

212
213
214
215
216
217
218
219
220
221
	if os.Getenv("TOR_PT_EXIT_ON_STDIN_CLOSE") == "1" {
		// This environment variable means we should treat EOF on stdin
		// just like SIGTERM: https://bugs.torproject.org/15435.
		go func() {
			io.Copy(ioutil.Discard, os.Stdin)
			log.Printf("synthesizing SIGTERM because of stdin close")
			sigChan <- syscall.SIGTERM
		}()
	}

222
	// keep track of handlers and wait for a signal
223
224
225
226
227
228
229
230
	sig = nil
	for sig == nil {
		select {
		case n := <-handlerChan:
			numHandlers += n
		case sig = <-sigChan:
		}
	}
231
232

	// signal received, shut down
233
234
235
	for _, ln := range listeners {
		ln.Close()
	}
236
	snowflakes.End()
237
238
239
240
	for n := range handlerChan {
		numHandlers += n
		if numHandlers == 0 {
			break
241
242
		}
	}
243
	log.Println("snowflake is done.")
244
}