package lib

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"sync"
	"time"

	"git.torproject.org/pluggable-transports/snowflake.git/common/encapsulation"
	"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
	"git.torproject.org/pluggable-transports/snowflake.git/common/websocketconn"
	"github.com/gorilla/websocket"
)

// requestTimeout is a fixed duration of ten seconds.
// NOTE(review): nothing in the visible part of this file references it;
// presumably it bounds the handling of an HTTP request elsewhere in the
// package -- confirm against its users.
const requestTimeout = 10 * time.Second

// How long to remember outgoing packets for a client, when we don't currently
// have an active WebSocket connection corresponding to that client. Because a
// client session may span multiple WebSocket connections, we keep packets we
// aren't able to send immediately in memory, for a little while but not
// indefinitely.
const clientMapTimeout = 1 * time.Minute

// How big to make the map of ClientIDs to IP addresses. The map is used in
// turbotunnelMode to store a reasonable IP address for a client session that
// may outlive any single WebSocket connection.
const clientIDAddrMapCapacity = 1024

// How long to wait for ListenAndServe or ListenAndServeTLS to return an error
// before deciding that it's not going to return.
const listenAndServeErrorTimeout = 100 * time.Millisecond

// upgrader performs the HTTP-to-WebSocket handshake for ServeHTTP. Apart from
// CheckOrigin it uses the zero-value websocket.Upgrader settings.
var upgrader = websocket.Upgrader{
	// Report every request's origin as acceptable, so a handshake is never
	// refused because of its Origin header.
	CheckOrigin: func(*http.Request) bool {
		return true
	},
}

// clientIDAddrMap stores short-term mappings from ClientIDs to IP addresses.
// When we call pt.DialOr, tor wants us to provide a USERADDR string that
// represents the remote IP address of the client (for metrics purposes, etc.).
// This data structure bridges the gap between ServeHTTP, which knows about IP
// addresses, and handleStream, which is what calls pt.DialOr. The common piece
// of information linking both ends of the chain is the ClientID, which is
// attached to the WebSocket connection and every session.
//
// Entries are written by turbotunnelMode, which calls Set with the ClientID
// and the address string of the WebSocket connection it is serving. The map
// is constructed with clientIDAddrMapCapacity as its size argument.
var clientIDAddrMap = newClientIDMap(clientIDAddrMapCapacity)

// overrideReadConn is a net.Conn whose Read is served by a separate io.Reader
// rather than by the embedded connection; all other methods are promoted from
// Conn unchanged. Compare to recordingConn at
// https://dave.cheney.net/2015/05/22/struct-composition-with-go.
type overrideReadConn struct {
	net.Conn
	io.Reader
}

// Read fills buf from the embedded Reader, never from Conn. Declaring it
// explicitly also resolves the otherwise ambiguous choice between the Read
// methods of the two embedded interfaces.
func (c *overrideReadConn) Read(buf []byte) (int, error) {
	return c.Reader.Read(buf)
}

// HTTPHandler is the http.Handler (see ServeHTTP) that upgrades incoming
// requests to WebSocket connections and hands each connection to either
// turbotunnelMode or oneshotMode.
type HTTPHandler struct {
	// pconn is the adapter layer between stream-oriented WebSocket
	// connections and the packet-oriented KCP layer.
	pconn *turbotunnel.QueuePacketConn
	// ln is the listener onto which oneshotMode queues (via QueueConn)
	// connections from clients that did not send turbotunnel.Token.
	ln    *SnowflakeListener
}

// ServeHTTP upgrades the request to a WebSocket connection, reads the first
// len(turbotunnel.Token) bytes of the stream, and dispatches the connection to
// turbotunnelMode when those bytes equal turbotunnel.Token or to oneshotMode
// otherwise. The connection is closed when the chosen mode returns. Errors are
// logged here and not propagated.
func (handler *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}

	conn := websocketconn.New(ws)
	defer conn.Close()

	// The client's address arrives in the client_ip query parameter; it
	// becomes the remote address of the incoming connection.
	addr := clientAddr(r.URL.Query().Get("client_ip"))

	var token [len(turbotunnel.Token)]byte
	if _, err = io.ReadFull(conn, token[:]); err != nil {
		// EOF is not worth logging: it happens with an unused
		// connection, which clients make frequently as they maintain a
		// pool of proxies.
		if err != io.EOF {
			log.Printf("reading token: %v", err)
		}
		return
	}

	if bytes.Equal(token[:], turbotunnel.Token[:]) {
		err = turbotunnelMode(conn, addr, handler.pconn)
	} else {
		// No matching token, so this client does not know about
		// turbotunnel. "Unread" the bytes we consumed by placing them in
		// front of the connection in a new Reader, and fall back to the
		// old one-session-per-WebSocket mode.
		unread := io.MultiReader(bytes.NewReader(token[:]), conn)
		conn2 := &overrideReadConn{Conn: conn, Reader: unread}
		err = oneshotMode(conn2, addr, handler.ln)
	}
	if err != nil {
		log.Println(err)
	}
}

// oneshotMode serves clients whose stream did not begin with
// turbotunnel.Token. Such clients treat the WebSocket as a raw pipe, and their
// session begins and ends with this single WebSocket. The connection is
// wrapped, together with addr, in a SnowflakeClientConn and queued on ln; the
// result of QueueConn is returned as is.
func oneshotMode(conn net.Conn, addr net.Addr, ln *SnowflakeListener) error {
	clientConn := &SnowflakeClientConn{
		Conn:    conn,
		address: addr,
	}
	return ln.QueueConn(clientConn)
}

// turbotunnelMode handles clients that sent turbotunnel.Token at the start of
// their stream. These clients expect to send and receive encapsulated packets,
// with a long-lived session identified by ClientID.
//
// It reads the ClientID prefix from conn, records addr for that ClientID in
// clientIDAddrMap, and then runs a read loop and a write loop concurrently
// until both have stopped. The only error returned is a failure to read the
// ClientID; errors inside either loop end that loop without being reported.
func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacketConn) error {
	// Read the ClientID prefix. Every packet encapsulated in this WebSocket
	// connection pertains to the same ClientID.
	var clientID turbotunnel.ClientID
	_, err := io.ReadFull(conn, clientID[:])
	if err != nil {
		return fmt.Errorf("reading ClientID: %v", err)
	}

	// Store a short-term mapping from the ClientID to the client IP
	// address attached to this WebSocket connection. tor will want us to
	// provide a client IP address when we call pt.DialOr. But a KCP session
	// does not necessarily correspond to any single IP address--it's
	// composed of packets that are carried in possibly multiple WebSocket
	// streams. We apply the heuristic that the IP address of the most
	// recent WebSocket connection that has had to do with a session, at the
	// time the session is established, is the IP address that should be
	// credited for the entire KCP session.
	clientIDAddrMap.Set(clientID, addr.String())

	// Each loop stops the other when it exits: the read loop closes done,
	// which the write loop selects on, and the write loop closes conn,
	// which makes the read loop's next ReadData fail.
	var wg sync.WaitGroup
	wg.Add(2)
	done := make(chan struct{})

	// The remainder of the WebSocket stream consists of encapsulated
	// packets. We read them one by one and feed them into the
	// QueuePacketConn on which kcp.ServeConn was set up, which eventually
	// leads to KCP-level sessions in the acceptSessions function.
	go func() {
		defer wg.Done()
		defer close(done) // Signal the write loop to finish
		for {
			p, err := encapsulation.ReadData(conn)
			if err != nil {
				return
			}
			pconn.QueueIncoming(p, clientID)
		}
	}()

	// At the same time, grab packets addressed to this ClientID and
	// encapsulate them into the downstream.
	go func() {
		defer wg.Done()
		defer conn.Close() // Signal the read loop to finish

		// Buffer encapsulation.WriteData operations to keep length
		// prefixes in the same send as the data that follows.
		bw := bufio.NewWriter(conn)
		for {
			select {
			case <-done:
				return
			case p, ok := <-pconn.OutgoingQueue(clientID):
				if !ok {
					return
				}
				_, err := encapsulation.WriteData(bw, p)
				if err == nil {
					err = bw.Flush()
				}
				if err != nil {
					return
				}
			}
		}
	}()

	wg.Wait()

	return nil
}

// ClientMapAddr is a string that provides the Network and String methods of
// net.Addr. clientAddr returns it to carry a client's address, or the empty
// string when no usable address is known.
type ClientMapAddr string

// Network returns the constant network name "snowflake".
func (a ClientMapAddr) Network() string { return "snowflake" }

// String returns the underlying address string unchanged.
func (a ClientMapAddr) String() string { return string(a) }

// clientAddr converts the value of the client_ip query parameter into a
// net.Addr. It returns ClientMapAddr("") when the parameter is empty, is not a
// valid IP address, or is an unspecified address; otherwise it returns the
// address in host:port form with a stub port of 1.
func clientAddr(clientIPParam string) net.Addr {
	// ParseIP returns nil for the empty string as well as for malformed
	// input. Unspecified addresses (0.0.0.0 or [::]) are rejected too,
	// because some proxies erroneously report 0.0.0.0:
	// https://bugs.torproject.org/33157.
	ip := net.ParseIP(clientIPParam)
	if ip == nil || ip.IsUnspecified() {
		return ClientMapAddr("")
	}

	// USERADDR requires a port number, so attach a stub one.
	stub := net.TCPAddr{IP: ip, Port: 1}
	return ClientMapAddr(stub.String())
}