package lib

import (
	"context"
	"errors"
	"io"
	"log"
	"net"
	"sync"
	"time"

	"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
	"github.com/xtaci/kcp-go/v5"
	"github.com/xtaci/smux"
)

const (
18
	ReconnectTimeout = 10 * time.Second
19
	SnowflakeTimeout = 20 * time.Second
20
	// How long to wait for the OnOpen callback on a DataChannel.
21
	DataChannelTimeout = 10 * time.Second
22
23
)

// dummyAddr is a placeholder net.Addr for packet conns whose endpoints have
// no meaningful network address (the Turbo Tunnel conns built in newSession).
type dummyAddr struct{}

func (addr dummyAddr) Network() string { return "dummy" }
func (addr dummyAddr) String() string  { return "dummy" }

// newSession returns a new smux.Session and the net.PacketConn it is running
// over. The net.PacketConn successively connects through Snowflake proxies
// pulled from snowflakes. On error, both return values are nil and any
// partially constructed conns have been closed.
func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) {
	clientID := turbotunnel.NewClientID()

	// We build a persistent KCP session on a sequence of ephemeral WebRTC
	// connections. This dialContext tells RedialPacketConn how to get a new
	// WebRTC connection when the previous one dies. Inside each WebRTC
	// connection, we use EncapsulationPacketConn to encode packets into a
	// stream.
	dialContext := func(ctx context.Context) (net.PacketConn, error) {
		log.Printf("redialing on same connection")
		// Obtain an available WebRTC remote. May block.
		conn := snowflakes.Pop()
		if conn == nil {
			return nil, errors.New("handler: Received invalid Snowflake")
		}
		log.Println("---- Handler: snowflake assigned ----")
		// Send the magic Turbo Tunnel token.
		_, err := conn.Write(turbotunnel.Token[:])
		if err != nil {
			return nil, err
		}
		// Send ClientID prefix.
		_, err = conn.Write(clientID[:])
		if err != nil {
			return nil, err
		}
		return NewEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil
	}
	pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext)

	// conn is built on the underlying RedialPacketConn—when one WebRTC
	// connection dies, another one will be found to take its place. The
	// sequence of packets across multiple WebRTC connections drives the KCP
	// engine.
	conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn)
	if err != nil {
		pconn.Close()
		return nil, nil, err
	}
	// Permit coalescing the payloads of consecutive sends.
	conn.SetStreamMode(true)
	// Disable the dynamic congestion window (limit only by the
	// maximum of local and remote static windows).
	conn.SetNoDelay(
		0, // default nodelay
		0, // default interval
		0, // default resend
		1, // nc=1 => congestion window off
	)
	// On the KCP connection we overlay an smux session and stream.
	smuxConfig := smux.DefaultConfig()
	smuxConfig.Version = 2
	smuxConfig.KeepAliveTimeout = 10 * time.Minute
	sess, err := smux.Client(conn, smuxConfig)
	if err != nil {
		conn.Close()
		pconn.Close()
		return nil, nil, err
	}

	// err is necessarily nil here; return it explicitly.
	return pconn, sess, nil
}

// sessionManager_ maintains a single global smux.Session that is shared among
// incoming SOCKS connections.
type sessionManager_ struct {
	// mutex guards sess, which is read and cleared from multiple
	// goroutines (Get callers and the per-session watcher).
	mutex sync.Mutex
	sess  *smux.Session
}

// Get creates and returns a new global smux.Session if none exists yet. If one
// already exists, it returns the existing one. It monitors the returned session
// and if it ever fails, sets things up so the next call to Get will create a
// new session.
func (manager *sessionManager_) Get(snowflakes SnowflakeCollector) (*smux.Session, error) {
	manager.mutex.Lock()
	defer manager.mutex.Unlock()

	// Treat an already-closed session the same as no session: the watcher
	// goroutine polls only every 5 seconds, so without this check Get could
	// hand out a dead session during that window.
	if manager.sess == nil || manager.sess.IsClosed() {
		log.Printf("starting a new session")
		pconn, sess, err := newSession(snowflakes)
		if err != nil {
			return nil, err
		}
		manager.sess = sess
		go func() {
			// If the session dies, set it to be recreated.
			for {
				<-time.After(5 * time.Second)
				if sess.IsClosed() {
					break
				}
			}
			log.Printf("discarding finished session")
			// Close the underlying to force any ongoing WebRTC
			// connection to close as well, and relinquish the
			// SnowflakeCollector.
			pconn.Close()
			manager.mutex.Lock()
			// Only clear the slot if it still holds the session this
			// watcher was started for; a newer session may have
			// replaced it in the meantime and must not be discarded.
			if manager.sess == sess {
				manager.sess = nil
			}
			manager.mutex.Unlock()
		}()
	} else {
		log.Printf("reusing the existing session")
	}

	return manager.sess, nil
}

var sessionManager = sessionManager_{}

// Given an accepted SOCKS connection, establish a WebRTC connection to the
// remote peer and exchange traffic.
145
146
func Handler(socks net.Conn, tongue Tongue) error {
	// Prepare to collect remote WebRTC peers.
147
148
149
150
	snowflakes, err := NewPeers(tongue)
	if err != nil {
		return err
	}
151
152
153
154
155
156
157

	// Use a real logger to periodically output how much traffic is happening.
	snowflakes.BytesLogger = NewBytesSyncLogger()

	log.Printf("---- Handler: begin collecting snowflakes ---")
	go connectLoop(snowflakes)

158
159
	// Return the global smux.Session.
	sess, err := sessionManager.Get(snowflakes)
160
161
162
	if err != nil {
		return err
	}
163
164

	// On the smux session we overlay a stream.
165
166
167
168
169
	stream, err := sess.OpenStream()
	if err != nil {
		return err
	}
	defer stream.Close()
170

171
172
173
174
	// Begin exchanging data.
	log.Printf("---- Handler: begin stream %v ---", stream.ID())
	copyLoop(socks, stream)
	log.Printf("---- Handler: closed stream %v ---", stream.ID())
175
176
	snowflakes.End()
	log.Printf("---- Handler: end collecting snowflakes ---")
177
178
179
	return nil
}

// connectLoop keeps |SnowflakeCapacity| WebRTC connections available for
// transfer to the Tor SOCKS handler when needed. It runs until the
// collector melts.
func connectLoop(snowflakes SnowflakeCollector) {
	for {
		// Attempt to collect one more snowflake; a failure is logged
		// and retried after the reconnect interval.
		if _, err := snowflakes.Collect(); err != nil {
			log.Printf("WebRTC: %v  Retrying in %v...",
				err, ReconnectTimeout)
		}
		// Sleep out the reconnect interval unless the collector is
		// shut down first.
		select {
		case <-snowflakes.Melted():
			log.Println("ConnectLoop: stopped.")
			return
		case <-time.After(ReconnectTimeout):
		}
	}
}

// Exchanges bytes between two ReadWriters.
201
202
// (In this case, between a SOCKS connection and smux stream.)
func copyLoop(socks, stream io.ReadWriter) {
203
	done := make(chan struct{}, 2)
204
	go func() {
205
		if _, err := io.Copy(socks, stream); err != nil {
206
			log.Printf("copying WebRTC to SOCKS resulted in error: %v", err)
207
		}
208
		done <- struct{}{}
209
210
	}()
	go func() {
211
212
		if _, err := io.Copy(stream, socks); err != nil {
			log.Printf("copying SOCKS to stream resulted in error: %v", err)
213
		}
214
		done <- struct{}{}
215
	}()
216
	<-done
217
218
	log.Println("copy loop ended")
}