Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/*
Package snowflake_server implements the functionality necessary to accept Snowflake
connections from Snowflake clients.
Included in the package is a Transport type that implements the Pluggable Transports v2.1 Go API
specification. To start a TLS Snowflake server using the golang.org/x/crypto/acme/autocert
library, configure a certificate manager for the server's domain name and then create a new
Transport as follows:

	// The snowflake server runs a websocket server. To run this securely, you will
	// need a valid certificate.
	certManager := &autocert.Manager{
		Prompt: autocert.AcceptTOS,
		HostPolicy: autocert.HostWhitelist("snowflake.yourdomain.com"),
		Email: "you@yourdomain.com",
	}

	transport := snowflake_server.NewSnowflakeServer(certManager.GetCertificate)

The Listen function starts a new listener, and Accept will return incoming Snowflake connections:

	ln, err := transport.Listen(addr)
	if err != nil {
		// handle error
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			// handle error
		}
		// handle conn
	}

*/
package snowflake_server
"fmt"
"io"
"log"
"net"
"net/http"
"sync"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/v2/common/turbotunnel"
"github.com/xtaci/kcp-go/v5"
"github.com/xtaci/smux"
"golang.org/x/net/http2"
)
// WindowSize is the number of packets in the send and receive window of a KCP connection.
// StreamSize controls the maximum amount of in flight data between a client and server.
StreamSize = 1048576 //1MB
)
// Transport is a structure with methods that conform to the Go PT v2.1 API
// https://github.com/Pluggable-Transports/Pluggable-Transports-spec/blob/master/releases/PTSpecV2.1/Pluggable%20Transport%20Specification%20v2.1%20-%20Go%20Transport%20API.pdf
type Transport struct {
// getCertificate is the certificate callback supplied to
// NewSnowflakeServer. Listen installs it as the HTTP server's
// TLSConfig.GetCertificate, and serves plain HTTP instead of HTTPS when
// it is nil.
getCertificate func(*tls.ClientHelloInfo) (*tls.Certificate, error)
}
// NewSnowflakeServer builds the server-side Snowflake Transport.
//
// getCertificate is stored on the Transport unchanged and is later used by
// Listen as the TLS certificate callback. Passing nil makes Listen serve plain
// HTTP.
func NewSnowflakeServer(getCertificate func(*tls.ClientHelloInfo) (*tls.Certificate, error)) *Transport {
	transport := new(Transport)
	transport.getCertificate = getCertificate
	return transport
}
// Listen starts a listener on addr that will accept both turbotunnel
// and legacy Snowflake connections.
func (t *Transport) Listen(addr net.Addr) (*SnowflakeListener, error) {
listener := &SnowflakeListener{addr: addr, queue: make(chan net.Conn, 65534)}
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// pconn is shared among all connections to this server. It
// overlays packet-based client sessions on top of ephemeral
// WebSocket connections.
pconn: turbotunnel.NewQueuePacketConn(addr, clientMapTimeout),
}
server := &http.Server{
Addr: addr.String(),
Handler: &handler,
ReadTimeout: requestTimeout,
}
// We need to override server.TLSConfig.GetCertificate--but first
// server.TLSConfig needs to be non-nil. If we just create our own new
// &tls.Config, it will lack the default settings that the net/http
// package sets up for things like HTTP/2. Therefore we first call
// http2.ConfigureServer for its side effect of initializing
// server.TLSConfig properly. An alternative would be to make a dummy
// net.Listener, call Serve on it, and let it return.
// https://github.com/golang/go/issues/16588#issuecomment-237386446
err := http2.ConfigureServer(server, nil)
if err != nil {
return nil, err
}
server.TLSConfig.GetCertificate = t.getCertificate
// Another unfortunate effect of the inseparable net/http ListenAndServe
// is that we can't check for Listen errors like "permission denied" and
// "address already in use" without potentially entering the infinite
// loop of Serve. The hack we apply here is to wait a short time,
// listenAndServeErrorTimeout, to see if an error is returned (because
// it's better if the error message goes to the tor log through
// SMETHOD-ERROR than if it only goes to the snowflake log).
errChan := make(chan error)
go func() {
if t.getCertificate == nil {
// TLS is disabled
log.Printf("listening with plain HTTP on %s", addr)
err := server.ListenAndServe()
if err != nil {
log.Printf("error in ListenAndServe: %s", err)
}
errChan <- err
} else {
log.Printf("listening with HTTPS on %s", addr)
err := server.ListenAndServeTLS("", "")
if err != nil {
log.Printf("error in ListenAndServeTLS: %s", err)
}
errChan <- err
}
}()
select {
case err = <-errChan:
break
case <-time.After(listenAndServeErrorTimeout):
break
}
listener.server = server
// Start a KCP engine, set up to read and write its packets over the
// WebSocket connections that arrive at the web server.
// handler.ServeHTTP is responsible for encapsulation/decapsulation of
// packets on behalf of KCP. KCP takes those packets and turns them into
// sessions which appear in the acceptSessions function.
ln, err := kcp.ServeConn(nil, 0, 0, handler.pconn)
if err != nil {
server.Close()
return nil, err
}
go func() {
defer ln.Close()
err := listener.acceptSessions(ln)
if err != nil {
log.Printf("acceptSessions: %v", err)
}
}()
listener.ln = ln
return listener, nil
}
// SnowflakeListener is the listener returned by Transport.Listen. Incoming
// Snowflake connections are delivered to the caller through Accept.
type SnowflakeListener struct {
// addr is the address given to Listen; Addr returns it.
addr net.Addr
// queue carries connections from queueConn to Accept.
queue chan net.Conn
// server is the HTTP server started by Listen; Close closes it.
server *http.Server
// ln is the KCP listener created by Listen; Close closes it.
ln *kcp.Listener
// closed is closed by Close to signal that the listener has shut down.
closed chan struct{}
// closeOnce makes the shutdown steps in Close run at most once.
closeOnce sync.Once
}
// Accept blocks until a Snowflake connection is available on the listener's
// queue and returns it. The queue is shared by incoming smux streams and
// legacy non-turbotunnel connections.
//
// If the listener has been closed, Accept returns io.ErrClosedPipe. A
// connection that is already queued may still be returned first, because
// select chooses among ready cases at random.
func (l *SnowflakeListener) Accept() (net.Conn, error) {
	select {
	case next := <-l.queue:
		return next, nil
	case <-l.closed:
		// The listener was shut down; stop handing out connections.
		return nil, io.ErrClosedPipe
	}
}
// Addr returns the address the SnowflakeListener was created with.
func (l *SnowflakeListener) Addr() net.Addr {
return l.addr
}
// Close shuts down the SnowflakeListener, closing its HTTP server and KCP listener.
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
// Close shuts the listener down. The first call closes the closed channel,
// which releases Accept and queueConn, and then closes the HTTP server and the
// KCP listener. Later calls do nothing.
//
// Errors from the underlying Close calls are discarded; the returned error is
// always nil.
func (l *SnowflakeListener) Close() error {
	shutdown := func() {
		close(l.closed)
		l.server.Close()
		l.ln.Close()
	}
	l.closeOnce.Do(shutdown)
	return nil
}
// acceptStreams layers an smux.Session on the KCP connection and awaits streams
// on it. Each stream is wrapped in a SnowflakeClientConn carrying the client
// address and passed to our SnowflakeListener accept queue.
//
// It returns an error when the smux session cannot be created, when
// AcceptStream fails with a non-temporary error, or when a stream cannot be
// queued because the listener has been closed.
func (l *SnowflakeListener) acceptStreams(conn *kcp.UDPSession) error {
	// Look up the IP address associated with this KCP session, via the
	// ClientID that is returned by the session's RemoteAddr method.
	addr, ok := clientIDAddrMap.Get(conn.RemoteAddr().(turbotunnel.ClientID))
	if !ok {
		// This means that the map is tending to run over capacity, not
		// just that there was no client_ip on the incoming connection.
		// We store "" in the map in the absence of client_ip. This log
		// message means you should increase clientIDAddrMapCapacity.
		log.Printf("no address in clientID-to-IP map (capacity %d)", clientIDAddrMapCapacity)
	}

	smuxConfig := smux.DefaultConfig()
	smuxConfig.Version = 2
	smuxConfig.KeepAliveTimeout = 10 * time.Minute
	smuxConfig.MaxStreamBuffer = StreamSize
	sess, err := smux.Server(conn, smuxConfig)
	if err != nil {
		return err
	}

	for {
		stream, err := sess.AcceptStream()
		if err != nil {
			if err, ok := err.(net.Error); ok && err.Temporary() {
				continue
			}
			return err
		}
		if err := l.queueConn(&SnowflakeClientConn{Conn: stream, address: addr}); err != nil {
			// The listener is closed, so nothing will ever Accept this
			// stream. Close it instead of dropping it on the floor, and
			// stop accepting further streams on this session.
			stream.Close()
			return err
		}
	}
}
// acceptSessions accepts KCP sessions from ln in a loop, configures each one,
// and hands it to acceptStreams on its own goroutine. It is
// handler.ServeHTTP that provides the network interface driving this function.
//
// Temporary accept errors are retried; any other accept error ends the loop
// and is returned.
func (l *SnowflakeListener) acceptSessions(ln *kcp.Listener) error {
	for {
		session, err := ln.AcceptKCP()
		if err != nil {
			netErr, isNetErr := err.(net.Error)
			if isNetErr && netErr.Temporary() {
				continue
			}
			return err
		}

		// Stream mode permits coalescing the payloads of consecutive sends.
		session.SetStreamMode(true)
		// Use large send and receive windows to remove KCP bottlenecks:
		// https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/40026
		session.SetWindowSize(WindowSize, WindowSize)
		// nodelay, interval and resend keep their defaults (0). nc=1 turns
		// off the dynamic congestion window, so throughput is limited only
		// by the maximum of the local and remote static windows.
		session.SetNoDelay(0, 0, 0, 1)

		go func() {
			defer session.Close()
			if err := l.acceptStreams(session); err != nil && !errors.Is(err, io.ErrClosedPipe) {
				log.Printf("acceptStreams: %v", err)
			}
		}()
	}
}
// queueConn offers conn to the accept queue, blocking until the send on the
// queue succeeds or the listener's closed channel is closed. It returns nil
// when the connection was queued and an error when the listener was closed
// instead.
func (l *SnowflakeListener) queueConn(conn net.Conn) error {
	select {
	case l.queue <- conn:
		return nil
	case <-l.closed:
		return fmt.Errorf("accepted connection on closed listener")
	}
}
// SnowflakeClientConn is a wrapper for the underlying turbotunnel
// conn. It carries the client address taken from our client address map so
// that RemoteAddr can report it.
type SnowflakeClientConn struct {
// Conn is the wrapped connection; its methods are promoted by embedding.
net.Conn
// address is the value returned by RemoteAddr.
address net.Addr
}
// RemoteAddr returns the mapped client address of the Snowflake connection.
// It takes the place of the RemoteAddr method promoted from the embedded
// net.Conn.
func (conn *SnowflakeClientConn) RemoteAddr() net.Addr {
return conn.address
}