snowflake.go 5.12 KB
Newer Older
1
2
3
package lib

import (
4
	"context"
5
6
7
8
	"errors"
	"io"
	"log"
	"net"
9
	"sync"
10
	"time"
11
12
13
14

	"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
	"github.com/xtaci/kcp-go/v5"
	"github.com/xtaci/smux"
15
16
17
)

const (
	// ReconnectTimeout is a fixed ten-second duration. It is not referenced
	// in the visible part of this file; presumably it is the pause before a
	// reconnection attempt — confirm against its users.
	ReconnectTimeout = 10 * time.Second
	// SnowflakeTimeout is a fixed thirty-second duration. It is not
	// referenced in the visible part of this file; presumably it bounds how
	// long an idle Snowflake is kept — confirm against its users.
	SnowflakeTimeout = 30 * time.Second
)

22
23
24
25
26
// dummyAddr is an empty placeholder address type. Both of its methods report
// the fixed text "dummy".
type dummyAddr struct{}

// Network reports the placeholder network name.
func (dummyAddr) Network() string {
	return "dummy"
}

// String reports the placeholder address text.
func (dummyAddr) String() string {
	return "dummy"
}

27
28
29
30
// newSession returns a new smux.Session and the net.PacketConn it is running
// over. The net.PacketConn successively connects through Snowflake proxies
// pulled from snowflakes.
//
// On failure the returned PacketConn and Session are both nil, and whatever
// had been opened up to that point (the KCP conn and/or the PacketConn) has
// been closed again.
func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) {
	clientID := turbotunnel.NewClientID()

	// We build a persistent KCP session on a sequence of ephemeral WebRTC
	// connections. This dialContext tells RedialPacketConn how to get a new
	// WebRTC connection when the previous one dies. Inside each WebRTC
	// connection, we use EncapsulationPacketConn to encode packets into a
	// stream.
	//
	// NOTE(review): ctx is not consulted below, so a blocked Pop cannot be
	// interrupted through it.
	dialContext := func(ctx context.Context) (net.PacketConn, error) {
		log.Printf("redialing on same connection")
		// Obtain an available WebRTC remote. May block.
		conn := snowflakes.Pop()
		if conn == nil {
			return nil, errors.New("handler: Received invalid Snowflake")
		}
		log.Println("---- Handler: snowflake assigned ----")
		// Every fresh connection starts with the same two-part preamble:
		// the magic Turbo Tunnel token followed by this session's ClientID.
		//
		// NOTE(review): conn is not closed on the two error returns
		// below — confirm whether the collector reclaims it.
		_, err := conn.Write(turbotunnel.Token[:])
		if err != nil {
			return nil, err
		}
		// Send ClientID prefix.
		_, err = conn.Write(clientID[:])
		if err != nil {
			return nil, err
		}
		return NewEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil
	}
	pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext)

	// conn is built on the underlying RedialPacketConn—when one WebRTC
	// connection dies, another one will be found to take its place. The
	// sequence of packets across multiple WebRTC connections drives the KCP
	// engine.
	conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn)
	if err != nil {
		pconn.Close()
		return nil, nil, err
	}
	// Permit coalescing the payloads of consecutive sends.
	conn.SetStreamMode(true)
	// Disable the dynamic congestion window (limit only by the
	// maximum of local and remote static windows).
	conn.SetNoDelay(
		0, // default nodelay
		0, // default interval
		0, // default resend
		1, // nc=1 => congestion window off
	)
	// On the KCP connection we overlay an smux session and stream.
	smuxConfig := smux.DefaultConfig()
	smuxConfig.Version = 2
	smuxConfig.KeepAliveTimeout = 10 * time.Minute
	sess, err := smux.Client(conn, smuxConfig)
	if err != nil {
		// Tear down in reverse order of construction.
		conn.Close()
		pconn.Close()
		return nil, nil, err
	}

	// err is known to be nil here; say so explicitly.
	return pconn, sess, nil
}

// sessionManager_ maintains a single global smux.Session that is shared among
// incoming SOCKS connections.
type sessionManager_ struct {
	// mutex is held by Get while it reads or replaces sess, and by Get's
	// monitoring goroutine while it clears sess.
	mutex sync.Mutex
	// sess is the current shared session. It is nil until the first Get and
	// is reset to nil after the session has been found closed.
	sess  *smux.Session
}

// Get hands out the manager's shared smux.Session. When the manager already
// holds a session, that one is returned as is. Otherwise a session is built
// with newSession, stored, and returned; an error from newSession is passed
// through and nothing is stored.
//
// Each newly built session gets a background goroutine that polls it every
// five seconds. Once the session reports closed, the goroutine closes the
// session's PacketConn and clears the stored session so that the next Get
// builds a new one.
func (manager *sessionManager_) Get(snowflakes SnowflakeCollector) (*smux.Session, error) {
	manager.mutex.Lock()
	defer manager.mutex.Unlock()

	// Fast path: a session is already in place.
	if manager.sess != nil {
		log.Printf("reusing the existing session")
		return manager.sess, nil
	}

	log.Printf("starting a new session")
	pconn, sess, err := newSession(snowflakes)
	if err != nil {
		return nil, err
	}
	manager.sess = sess

	// If the session dies, set it to be recreated.
	watch := func() {
		// Sleep first, then check, and repeat for as long as the
		// session is still open.
		for open := true; open; open = !sess.IsClosed() {
			time.Sleep(5 * time.Second)
		}
		log.Printf("discarding finished session")
		// Close the underlying to force any ongoing WebRTC
		// connection to close as well, and relinquish the
		// SnowflakeCollector.
		pconn.Close()
		manager.mutex.Lock()
		defer manager.mutex.Unlock()
		manager.sess = nil
	}
	go watch()

	return sess, nil
}

// sessionManager is the package-level sessionManager_ instance, starting out
// with no session; Handler obtains its smux.Session through it.
var sessionManager = sessionManager_{}

// Handler serves one accepted SOCKS connection. It fetches the shared
// smux.Session from sessionManager (which creates it on first use), opens a
// new stream on that session, and relays bytes between socks and the stream
// until copyLoop returns.
//
// It returns the error from obtaining the session or from opening the stream,
// and nil once the relay has ended. The stream is closed on return; socks is
// not closed here.
func Handler(socks net.Conn, snowflakes SnowflakeCollector) error {
	// Return the global smux.Session.
	sess, err := sessionManager.Get(snowflakes)
	if err != nil {
		return err
	}

	// On the smux session we overlay a stream.
	stream, err := sess.OpenStream()
	if err != nil {
		return err
	}
	defer stream.Close()

	// Begin exchanging data.
	log.Printf("---- Handler: begin stream %v ---", stream.ID())
	copyLoop(socks, stream)
	log.Printf("---- Handler: closed stream %v ---", stream.ID())
	return nil
}

// copyLoop exchanges bytes between two ReadWriters.
// (In this case, between a SOCKS connection and smux stream.)
//
// One goroutine copies in each direction. copyLoop returns as soon as either
// direction finishes; it does not wait for the other one and closes neither
// argument. A copy error is logged, not returned.
func copyLoop(socks, stream io.ReadWriter) {
	// Buffered for both senders so the direction that finishes second never
	// blocks on a channel nobody reads any more.
	done := make(chan struct{}, 2)

	// relay runs one direction of the exchange and signals when it ends.
	relay := func(dst io.Writer, src io.Reader, format string) {
		if _, err := io.Copy(dst, src); err != nil {
			log.Printf(format, err)
		}
		done <- struct{}{}
	}
	go relay(socks, stream, "copying WebRTC to SOCKS resulted in error: %v")
	go relay(stream, socks, "copying SOCKS to stream resulted in error: %v")

	// The first direction to finish ends the loop.
	<-done
	log.Println("copy loop ended")
}