Skip to content
Snippets Groups Projects
Commit 596393ef authored by David Fifield's avatar David Fifield
Browse files

Use multiple parallel KCP state machines in the server.

parent e851861e
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,10 @@ package snowflake_server
import (
"bufio"
"bytes"
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"encoding/binary"
"fmt"
"io"
"log"
......@@ -49,9 +53,45 @@ var upgrader = websocket.Upgrader{
var clientIDAddrMap = newClientIDMap(clientIDAddrMapCapacity)
type httpHandler struct {
// pconn is the adapter layer between stream-oriented WebSocket
// connections and the packet-oriented KCP layer.
pconn *turbotunnel.QueuePacketConn
// pconns is the adapter layer between stream-oriented WebSocket
// connections and the packet-oriented KCP layer. There are multiple of
// these, corresponding to the multiple kcp.ServeConn in
// Transport.Listen. Clients are assigned to a particular instance by a
// hash of ClientID, indexed by a hash of the ClientID, in order to
// distribute KCP processing load across CPU cores.
pconns []*turbotunnel.QueuePacketConn
// clientIDLookupKey is a secret key used to tweak the hash-based
// assignement of ClientID to pconn, in order to avoid manipulation of
// hash assignments.
clientIDLookupKey []byte
}
// newHTTPHandler creates a new http.Handler that exchanges encapsulated packets
// over incoming WebSocket connections.
func newHTTPHandler(localAddr net.Addr, numInstances int) *httpHandler {
pconns := make([]*turbotunnel.QueuePacketConn, 0, numInstances)
for i := 0; i < numInstances; i++ {
pconns = append(pconns, turbotunnel.NewQueuePacketConn(localAddr, clientMapTimeout))
}
clientIDLookupKey := make([]byte, 16)
_, err := rand.Read(clientIDLookupKey)
if err != nil {
panic(err)
}
return &httpHandler{
pconns: pconns,
clientIDLookupKey: clientIDLookupKey,
}
}
// lookupPacketConn returns the element of pconns that corresponds to client ID,
// according to the hash-based mapping.
func (handler *httpHandler) lookupPacketConn(clientID turbotunnel.ClientID) *turbotunnel.QueuePacketConn {
s := hmac.New(sha256.New, handler.clientIDLookupKey).Sum(clientID[:])
return handler.pconns[binary.LittleEndian.Uint64(s)%uint64(len(handler.pconns))]
}
func (handler *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
......@@ -82,7 +122,7 @@ func (handler *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch {
case bytes.Equal(token[:], turbotunnel.Token[:]):
err = turbotunnelMode(conn, addr, handler.pconn)
err = handler.turbotunnelMode(conn, addr)
default:
// We didn't find a matching token, which means that we are
// dealing with a client that doesn't know about such things.
......@@ -100,7 +140,7 @@ func (handler *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// turbotunnelMode handles clients that sent turbotunnel.Token at the start of
// their stream. These clients expect to send and receive encapsulated packets,
// with a long-lived session identified by ClientID.
func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacketConn) error {
func (handler *httpHandler) turbotunnelMode(conn net.Conn, addr net.Addr) error {
// Read the ClientID prefix. Every packet encapsulated in this WebSocket
// connection pertains to the same ClientID.
var clientID turbotunnel.ClientID
......@@ -120,6 +160,8 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacke
// credited for the entire KCP session.
clientIDAddrMap.Set(clientID, addr)
pconn := handler.lookupPacketConn(clientID)
var wg sync.WaitGroup
wg.Add(2)
done := make(chan struct{})
......
......@@ -55,6 +55,11 @@ const (
WindowSize = 65535
// StreamSize controls the maximum amount of in flight data between a client and server.
StreamSize = 1048576 //1MB
// numKCPInstances is the number of parallel KCP state machines to run.
// Clients are assigned to a particular KCP instance by a hash of their
// ClientID.
numKCPInstances = 2
)
// Transport is a structure with methods that conform to the Go PT v2.1 API
......@@ -76,17 +81,13 @@ func (t *Transport) Listen(addr net.Addr) (*SnowflakeListener, error) {
addr: addr,
queue: make(chan net.Conn, 65534),
closed: make(chan struct{}),
ln: make([]*kcp.Listener, 0, numKCPInstances),
}
handler := httpHandler{
// pconn is shared among all connections to this server. It
// overlays packet-based client sessions on top of ephemeral
// WebSocket connections.
pconn: turbotunnel.NewQueuePacketConn(addr, clientMapTimeout),
}
handler := newHTTPHandler(addr, numKCPInstances)
server := &http.Server{
Addr: addr.String(),
Handler: &handler,
Handler: handler,
ReadTimeout: requestTimeout,
}
// We need to override server.TLSConfig.GetCertificate--but first
......@@ -139,25 +140,26 @@ func (t *Transport) Listen(addr net.Addr) (*SnowflakeListener, error) {
listener.server = server
// Start a KCP engine, set up to read and write its packets over the
// Start the KCP engines, set up to read and write its packets over the
// WebSocket connections that arrive at the web server.
// handler.ServeHTTP is responsible for encapsulation/decapsulation of
// packets on behalf of KCP. KCP takes those packets and turns them into
// sessions which appear in the acceptSessions function.
ln, err := kcp.ServeConn(nil, 0, 0, handler.pconn)
if err != nil {
server.Close()
return nil, err
}
go func() {
defer ln.Close()
err := listener.acceptSessions(ln)
for i, pconn := range handler.pconns {
ln, err := kcp.ServeConn(nil, 0, 0, pconn)
if err != nil {
log.Printf("acceptSessions: %v", err)
server.Close()
return nil, err
}
}()
listener.ln = ln
go func() {
defer ln.Close()
err := listener.acceptSessions(ln)
if err != nil {
log.Printf("acceptSessions %d: %v", i, err)
}
}()
listener.ln = append(listener.ln, ln)
}
return listener, nil
......@@ -167,7 +169,7 @@ type SnowflakeListener struct {
addr net.Addr
queue chan net.Conn
server *http.Server
ln *kcp.Listener
ln []*kcp.Listener
closed chan struct{}
closeOnce sync.Once
}
......@@ -196,7 +198,9 @@ func (l *SnowflakeListener) Close() error {
l.closeOnce.Do(func() {
close(l.closed)
l.server.Close()
l.ln.Close()
for _, ln := range l.ln {
ln.Close()
}
})
return nil
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment