snowflake.go 6.08 KB
Newer Older
1
2
3
package lib

import (
4
	"context"
5
6
7
8
	"errors"
	"io"
	"log"
	"net"
9
	"sync"
10
	"time"
11
12
13
14

	"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
	"github.com/xtaci/kcp-go/v5"
	"github.com/xtaci/smux"
15
16
17
)

const (
18
	ReconnectTimeout = 10 * time.Second
19
	SnowflakeTimeout = 20 * time.Second
20
	// How long to wait for the OnOpen callback on a DataChannel.
21
	DataChannelTimeout = 10 * time.Second
22
23
)

24
25
26
27
28
type dummyAddr struct{}

func (addr dummyAddr) Network() string { return "dummy" }
func (addr dummyAddr) String() string  { return "dummy" }

29
30
31
32
// newSession returns a new smux.Session and the net.PacketConn it is running
// over. The net.PacketConn successively connects through Snowflake proxies
// pulled from snowflakes.
func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) {
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
	clientID := turbotunnel.NewClientID()

	// We build a persistent KCP session on a sequence of ephemeral WebRTC
	// connections. This dialContext tells RedialPacketConn how to get a new
	// WebRTC connection when the previous one dies. Inside each WebRTC
	// connection, we use EncapsulationPacketConn to encode packets into a
	// stream.
	dialContext := func(ctx context.Context) (net.PacketConn, error) {
		log.Printf("redialing on same connection")
		// Obtain an available WebRTC remote. May block.
		conn := snowflakes.Pop()
		if conn == nil {
			return nil, errors.New("handler: Received invalid Snowflake")
		}
		log.Println("---- Handler: snowflake assigned ----")
		// Send the magic Turbo Tunnel token.
		_, err := conn.Write(turbotunnel.Token[:])
		if err != nil {
			return nil, err
		}
		// Send ClientID prefix.
		_, err = conn.Write(clientID[:])
		if err != nil {
			return nil, err
		}
		return NewEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil
59
	}
60
	pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext)
61

62
63
64
65
66
67
	// conn is built on the underlying RedialPacketConn—when one WebRTC
	// connection dies, another one will be found to take its place. The
	// sequence of packets across multiple WebRTC connections drives the KCP
	// engine.
	conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn)
	if err != nil {
68
69
		pconn.Close()
		return nil, nil, err
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
	}
	// Permit coalescing the payloads of consecutive sends.
	conn.SetStreamMode(true)
	// Disable the dynamic congestion window (limit only by the
	// maximum of local and remote static windows).
	conn.SetNoDelay(
		0, // default nodelay
		0, // default interval
		0, // default resend
		1, // nc=1 => congestion window off
	)
	// On the KCP connection we overlay an smux session and stream.
	smuxConfig := smux.DefaultConfig()
	smuxConfig.Version = 2
	smuxConfig.KeepAliveTimeout = 10 * time.Minute
	sess, err := smux.Client(conn, smuxConfig)
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
	if err != nil {
		conn.Close()
		pconn.Close()
		return nil, nil, err
	}

	return pconn, sess, err
}

// sessionManager_ maintains a single global smux.Session that is shared among
// incoming SOCKS connections.
type sessionManager_ struct {
	mutex sync.Mutex
	sess  *smux.Session
}

// Get creates and returns a new global smux.Session if none exists yet. If one
// already exists, it returns the existing one. It monitors the returned session
// and if it ever fails, sets things up so the next call to Get will create a
// new session.
func (manager *sessionManager_) Get(snowflakes SnowflakeCollector) (*smux.Session, error) {
	manager.mutex.Lock()
	defer manager.mutex.Unlock()

	if manager.sess == nil {
		log.Printf("starting a new session")
		pconn, sess, err := newSession(snowflakes)
		if err != nil {
			return nil, err
		}
		manager.sess = sess
		go func() {
			// If the session dies, set it to be recreated.
			for {
				<-time.After(5 * time.Second)
				if sess.IsClosed() {
					break
				}
			}
			log.Printf("discarding finished session")
			// Close the underlying to force any ongoing WebRTC
			// connection to close as well, and relinquish the
			// SnowflakeCollector.
			pconn.Close()
			manager.mutex.Lock()
			manager.sess = nil
			manager.mutex.Unlock()
		}()
	} else {
		log.Printf("reusing the existing session")
	}

	return manager.sess, nil
}

var sessionManager = sessionManager_{}

// Given an accepted SOCKS connection, establish a WebRTC connection to the
// remote peer and exchange traffic.
145
146
147
148
149
150
151
152
153
154
155
func Handler(socks net.Conn, tongue Tongue) error {
	// Prepare to collect remote WebRTC peers.
	snowflakes := NewPeers(1)
	snowflakes.Tongue = tongue

	// Use a real logger to periodically output how much traffic is happening.
	snowflakes.BytesLogger = NewBytesSyncLogger()

	log.Printf("---- Handler: begin collecting snowflakes ---")
	go connectLoop(snowflakes)

156
157
	// Return the global smux.Session.
	sess, err := sessionManager.Get(snowflakes)
158
159
160
	if err != nil {
		return err
	}
161
162

	// On the smux session we overlay a stream.
163
164
165
166
167
	stream, err := sess.OpenStream()
	if err != nil {
		return err
	}
	defer stream.Close()
168

169
170
171
172
	// Begin exchanging data.
	log.Printf("---- Handler: begin stream %v ---", stream.ID())
	copyLoop(socks, stream)
	log.Printf("---- Handler: closed stream %v ---", stream.ID())
173
174
	snowflakes.End()
	log.Printf("---- Handler: end collecting snowflakes ---")
175
176
177
	return nil
}

178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to the Tor SOCKS handler when needed.
func connectLoop(snowflakes SnowflakeCollector) {
	for {
		// Check if ending is necessary.
		_, err := snowflakes.Collect()
		if err != nil {
			log.Printf("WebRTC: %v  Retrying in %v...",
				err, ReconnectTimeout)
		}
		select {
		case <-time.After(ReconnectTimeout):
			continue
		case <-snowflakes.Melted():
			log.Println("ConnectLoop: stopped.")
			return
		}
	}
}

198
// Exchanges bytes between two ReadWriters.
199
200
// (In this case, between a SOCKS connection and smux stream.)
func copyLoop(socks, stream io.ReadWriter) {
201
	done := make(chan struct{}, 2)
202
	go func() {
203
		if _, err := io.Copy(socks, stream); err != nil {
204
			log.Printf("copying WebRTC to SOCKS resulted in error: %v", err)
205
		}
206
		done <- struct{}{}
207
208
	}()
	go func() {
209
210
		if _, err := io.Copy(stream, socks); err != nil {
			log.Printf("copying SOCKS to stream resulted in error: %v", err)
211
		}
212
		done <- struct{}{}
213
	}()
214
	<-done
215
216
	log.Println("copy loop ended")
}