package lib

import (
	"context"
	"errors"
	"io"
	"log"
	"net"
	"sync"
	"time"

	"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
	"github.com/xtaci/kcp-go/v5"
	"github.com/xtaci/smux"
)

// Timeouts exported by this package.
const (
	// ReconnectTimeout is a fixed 10-second duration. It is not referenced
	// in this file. NOTE(review): presumably the wait between reconnection
	// attempts — confirm at its call sites.
	ReconnectTimeout = 10 * time.Second
	// SnowflakeTimeout is a fixed 20-second duration. It is not referenced
	// in this file. NOTE(review): presumably how long an idle Snowflake is
	// tolerated before it is considered dead — confirm at its call sites.
	SnowflakeTimeout = 20 * time.Second
	// DataChannelTimeout is how long to wait for the OnOpen callback on a
	// DataChannel.
	DataChannelTimeout = 10 * time.Second
)

// dummyAddr is a placeholder address type. This file passes it wherever a
// local or remote address argument is required; both of its methods return
// the fixed string "dummy".
type dummyAddr struct{}

// Network reports the placeholder network name.
func (dummyAddr) Network() string {
	return "dummy"
}

// String reports the placeholder address text.
func (dummyAddr) String() string {
	return "dummy"
}

29
30
31
32
// newSession returns a new smux.Session and the net.PacketConn it is running
// over. The net.PacketConn successively connects through Snowflake proxies
// pulled from snowflakes.
func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) {
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
	clientID := turbotunnel.NewClientID()

	// We build a persistent KCP session on a sequence of ephemeral WebRTC
	// connections. This dialContext tells RedialPacketConn how to get a new
	// WebRTC connection when the previous one dies. Inside each WebRTC
	// connection, we use EncapsulationPacketConn to encode packets into a
	// stream.
	dialContext := func(ctx context.Context) (net.PacketConn, error) {
		log.Printf("redialing on same connection")
		// Obtain an available WebRTC remote. May block.
		conn := snowflakes.Pop()
		if conn == nil {
			return nil, errors.New("handler: Received invalid Snowflake")
		}
		log.Println("---- Handler: snowflake assigned ----")
		// Send the magic Turbo Tunnel token.
		_, err := conn.Write(turbotunnel.Token[:])
		if err != nil {
			return nil, err
		}
		// Send ClientID prefix.
		_, err = conn.Write(clientID[:])
		if err != nil {
			return nil, err
		}
		return NewEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil
59
	}
60
	pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext)
61

62
63
64
65
66
67
	// conn is built on the underlying RedialPacketConn—when one WebRTC
	// connection dies, another one will be found to take its place. The
	// sequence of packets across multiple WebRTC connections drives the KCP
	// engine.
	conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn)
	if err != nil {
68
69
		pconn.Close()
		return nil, nil, err
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
	}
	// Permit coalescing the payloads of consecutive sends.
	conn.SetStreamMode(true)
	// Disable the dynamic congestion window (limit only by the
	// maximum of local and remote static windows).
	conn.SetNoDelay(
		0, // default nodelay
		0, // default interval
		0, // default resend
		1, // nc=1 => congestion window off
	)
	// On the KCP connection we overlay an smux session and stream.
	smuxConfig := smux.DefaultConfig()
	smuxConfig.Version = 2
	smuxConfig.KeepAliveTimeout = 10 * time.Minute
	sess, err := smux.Client(conn, smuxConfig)
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
	if err != nil {
		conn.Close()
		pconn.Close()
		return nil, nil, err
	}

	return pconn, sess, err
}

// sessionManager_ maintains a single global smux.Session that is shared among
// incoming SOCKS connections.
type sessionManager_ struct {
	// mutex guards sess; Get and the monitor goroutine it starts both take
	// it before touching sess.
	mutex sync.Mutex
	// sess is the current shared session, or nil when none exists and the
	// next call to Get must create one.
	sess  *smux.Session
}

// Get returns the global smux.Session, creating a new one if none exists yet
// or if the existing one has already closed. It monitors each session it
// creates and, once that session is closed, closes the underlying
// net.PacketConn and sets things up so the next call to Get will create a new
// session.
//
// It returns a nil session and the error from newSession when a new session
// cannot be created.
func (manager *sessionManager_) Get(snowflakes SnowflakeCollector) (*smux.Session, error) {
	manager.mutex.Lock()
	defer manager.mutex.Unlock()

	// The monitor goroutine only polls every 5 seconds, so manager.sess may
	// refer to a session that is already closed. Handing that out would
	// only make the caller's OpenStream fail; treat it as absent instead.
	if manager.sess != nil && !manager.sess.IsClosed() {
		log.Printf("reusing the existing session")
		return manager.sess, nil
	}

	log.Printf("starting a new session")
	pconn, sess, err := newSession(snowflakes)
	if err != nil {
		return nil, err
	}
	manager.sess = sess

	go func() {
		// If the session dies, set it to be recreated.
		for {
			time.Sleep(5 * time.Second)
			if sess.IsClosed() {
				break
			}
		}
		log.Printf("discarding finished session")
		// Close the underlying to force any ongoing WebRTC
		// connection to close as well, and relinquish the
		// SnowflakeCollector.
		pconn.Close()
		manager.mutex.Lock()
		// Get may already have replaced this session with a newer one;
		// only clear the slot if it still holds the session monitored
		// here.
		if manager.sess == sess {
			manager.sess = nil
		}
		manager.mutex.Unlock()
	}()

	return sess, nil
}

// sessionManager is the package-wide manager whose session Handler shares
// across SOCKS connections. Its zero value is ready to use.
var sessionManager sessionManager_

// Handler relays traffic between an accepted SOCKS connection and a new
// stream opened on the global smux.Session obtained from sessionManager.
//
// It returns an error if the session cannot be obtained or the stream cannot
// be opened. Errors that occur while copying data are logged by copyLoop and
// are not returned; in that case Handler returns nil once copying has ended.
// The stream is closed before Handler returns; socks is not closed here.
func Handler(socks net.Conn, snowflakes SnowflakeCollector) error {
	// Return the global smux.Session.
	sess, err := sessionManager.Get(snowflakes)
	if err != nil {
		return err
	}

	// On the smux session we overlay a stream.
	stream, err := sess.OpenStream()
	if err != nil {
		return err
	}
	defer stream.Close()

	// Begin exchanging data.
	log.Printf("---- Handler: begin stream %v ---", stream.ID())
	copyLoop(socks, stream)
	log.Printf("---- Handler: closed stream %v ---", stream.ID())
	return nil
}

// copyLoop exchanges bytes between two ReadWriters.
// (In this case, between a SOCKS connection and smux stream.)
//
// Each direction is copied in its own goroutine, and a copy error is logged
// rather than returned. copyLoop returns as soon as either direction
// finishes; the other goroutine keeps running until its own io.Copy returns.
func copyLoop(socks, stream io.ReadWriter) {
	// Capacity 2 lets both goroutines signal completion without blocking,
	// even though only the first signal is ever received.
	done := make(chan struct{}, 2)
	go func() {
		if _, err := io.Copy(socks, stream); err != nil {
			log.Printf("copying WebRTC to SOCKS resulted in error: %v", err)
		}
		done <- struct{}{}
	}()
	go func() {
		if _, err := io.Copy(stream, socks); err != nil {
			log.Printf("copying SOCKS to stream resulted in error: %v", err)
		}
		done <- struct{}{}
	}()
	<-done
	log.Println("copy loop ended")
}