Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
Snowflake
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
David Fifield
Snowflake
Commits
17dc8cad
Commit
17dc8cad
authored
Oct 1, 2022
by
David Fifield
Browse files
Options
Downloads
Patches
Plain Diff
Use multiple parallel KCP state machines.
To distribute CPU load.
parent
57c9aa34
No related branches found
No related tags found
No related merge requests found
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
server/lib/http.go
+47
-5
47 additions, 5 deletions
server/lib/http.go
server/lib/snowflake.go
+26
-22
26 additions, 22 deletions
server/lib/snowflake.go
with
73 additions
and
27 deletions
server/lib/http.go
+
47
−
5
View file @
17dc8cad
...
...
@@ -3,6 +3,10 @@ package snowflake_server
import
(
"bufio"
"bytes"
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"encoding/binary"
"fmt"
"io"
"log"
...
...
@@ -49,9 +53,45 @@ var upgrader = websocket.Upgrader{
var
clientIDAddrMap
=
newClientIDMap
(
clientIDAddrMapCapacity
)
type
httpHandler
struct
{
// pconn is the adapter layer between stream-oriented WebSocket
// connections and the packet-oriented KCP layer.
pconn
*
turbotunnel
.
QueuePacketConn
// pconns is the adapter layer between stream-oriented WebSocket
// connections and the packet-oriented KCP layer. There are multiple of
// these, corresponding to the multiple kcp.ServeConn in
// Transport.Listen. Clients are assigned to a particular instance by a
// hash of ClientID, indexed by a hash of the ClientID, in order to
// distribute KCP processing load across CPU cores.
pconns
[]
*
turbotunnel
.
QueuePacketConn
// clientIDLookupKey is a secret key used to tweak the hash-based
// assignement of ClientID to pconn, in order to avoid manipulation of
// hash assignments.
clientIDLookupKey
[]
byte
}
// newHTTPHandler creates a new http.Handler that exchanges encapsulated packets
// over incoming WebSocket connections.
func
newHTTPHandler
(
localAddr
net
.
Addr
,
numInstances
int
)
*
httpHandler
{
pconns
:=
make
([]
*
turbotunnel
.
QueuePacketConn
,
0
,
numInstances
)
for
i
:=
0
;
i
<
numInstances
;
i
++
{
pconns
=
append
(
pconns
,
turbotunnel
.
NewQueuePacketConn
(
localAddr
,
clientMapTimeout
))
}
clientIDLookupKey
:=
make
([]
byte
,
16
)
_
,
err
:=
rand
.
Read
(
clientIDLookupKey
)
if
err
!=
nil
{
panic
(
err
)
}
return
&
httpHandler
{
pconns
:
pconns
,
clientIDLookupKey
:
clientIDLookupKey
,
}
}
// lookupPacketConn returns the element of pconns that corresponds to client ID,
// according to the hash-based mapping.
func
(
handler
*
httpHandler
)
lookupPacketConn
(
clientID
turbotunnel
.
ClientID
)
*
turbotunnel
.
QueuePacketConn
{
s
:=
hmac
.
New
(
sha256
.
New
,
handler
.
clientIDLookupKey
)
.
Sum
(
clientID
[
:
])
return
handler
.
pconns
[
binary
.
LittleEndian
.
Uint64
(
s
)
%
uint64
(
len
(
handler
.
pconns
))]
}
func
(
handler
*
httpHandler
)
ServeHTTP
(
w
http
.
ResponseWriter
,
r
*
http
.
Request
)
{
...
...
@@ -82,7 +122,7 @@ func (handler *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch
{
case
bytes
.
Equal
(
token
[
:
],
turbotunnel
.
Token
[
:
])
:
err
=
turbotunnelMode
(
conn
,
addr
,
handler
.
pconn
)
err
=
handler
.
turbotunnelMode
(
conn
,
addr
)
default
:
// We didn't find a matching token, which means that we are
// dealing with a client that doesn't know about such things.
...
...
@@ -100,7 +140,7 @@ func (handler *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// turbotunnelMode handles clients that sent turbotunnel.Token at the start of
// their stream. These clients expect to send and receive encapsulated packets,
// with a long-lived session identified by ClientID.
func
turbotunnelMode
(
conn
net
.
Conn
,
addr
net
.
Addr
,
pconn
*
turbotunnel
.
QueuePacketConn
)
error
{
func
(
handler
*
httpHandler
)
turbotunnelMode
(
conn
net
.
Conn
,
addr
net
.
Addr
)
error
{
// Read the ClientID prefix. Every packet encapsulated in this WebSocket
// connection pertains to the same ClientID.
var
clientID
turbotunnel
.
ClientID
...
...
@@ -120,6 +160,8 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacke
// credited for the entire KCP session.
clientIDAddrMap
.
Set
(
clientID
,
addr
)
pconn
:=
handler
.
lookupPacketConn
(
clientID
)
var
wg
sync
.
WaitGroup
wg
.
Add
(
2
)
done
:=
make
(
chan
struct
{})
...
...
This diff is collapsed.
Click to expand it.
server/lib/snowflake.go
+
26
−
22
View file @
17dc8cad
...
...
@@ -58,6 +58,11 @@ const (
WindowSize
=
65535
// StreamSize controls the maximum amount of in flight data between a client and server.
StreamSize
=
1048576
//1MB
// numKCPInstances is the number of parallel KCP state machines to run.
// Clients are assigned to a particular KCP instance by a hash of their
// ClientID.
numKCPInstances
=
2
)
// Transport is a structure with methods that conform to the Go PT v2.1 API
...
...
@@ -79,17 +84,13 @@ func (t *Transport) Listen(addr net.Addr) (*SnowflakeListener, error) {
addr
:
addr
,
queue
:
make
(
chan
net
.
Conn
,
65534
),
closed
:
make
(
chan
struct
{}),
ln
:
make
([]
*
kcp
.
Listener
,
0
,
numKCPInstances
),
}
handler
:=
httpHandler
{
// pconn is shared among all connections to this server. It
// overlays packet-based client sessions on top of ephemeral
// WebSocket connections.
pconn
:
turbotunnel
.
NewQueuePacketConn
(
addr
,
clientMapTimeout
),
}
handler
:=
newHTTPHandler
(
addr
,
numKCPInstances
)
server
:=
&
http
.
Server
{
Addr
:
addr
.
String
(),
Handler
:
&
handler
,
Handler
:
handler
,
ReadTimeout
:
requestTimeout
,
}
// We need to override server.TLSConfig.GetCertificate--but first
...
...
@@ -142,12 +143,13 @@ func (t *Transport) Listen(addr net.Addr) (*SnowflakeListener, error) {
listener
.
server
=
server
// Start
a
KCP engine, set up to read and write its packets over the
// Start
the
KCP engine
s
, set up to read and write its packets over the
// WebSocket connections that arrive at the web server.
// handler.ServeHTTP is responsible for encapsulation/decapsulation of
// packets on behalf of KCP. KCP takes those packets and turns them into
// sessions which appear in the acceptSessions function.
ln
,
err
:=
kcp
.
ServeConn
(
nil
,
0
,
0
,
handler
.
pconn
)
for
i
,
pconn
:=
range
handler
.
pconns
{
ln
,
err
:=
kcp
.
ServeConn
(
nil
,
0
,
0
,
pconn
)
if
err
!=
nil
{
server
.
Close
()
return
nil
,
err
...
...
@@ -156,11 +158,11 @@ func (t *Transport) Listen(addr net.Addr) (*SnowflakeListener, error) {
defer
ln
.
Close
()
err
:=
listener
.
acceptSessions
(
ln
)
if
err
!=
nil
{
log
.
Printf
(
"acceptSessions: %v"
,
err
)
log
.
Printf
(
"acceptSessions
%d
: %v"
,
i
,
err
)
}
}()
listener
.
ln
=
ln
listener
.
ln
=
append
(
listener
.
ln
,
ln
)
}
return
listener
,
nil
...
...
@@ -170,7 +172,7 @@ type SnowflakeListener struct {
addr
net
.
Addr
queue
chan
net
.
Conn
server
*
http
.
Server
ln
*
kcp
.
Listener
ln
[]
*
kcp
.
Listener
closed
chan
struct
{}
closeOnce
sync
.
Once
}
...
...
@@ -199,7 +201,9 @@ func (l *SnowflakeListener) Close() error {
l
.
closeOnce
.
Do
(
func
()
{
close
(
l
.
closed
)
l
.
server
.
Close
()
l
.
ln
.
Close
()
for
_
,
ln
:=
range
l
.
ln
{
ln
.
Close
()
}
})
return
nil
}
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment