Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
Snowflake
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
shelikhoo
Snowflake
Commits
19244c71
Commit
19244c71
authored
5 years ago
by
Cecylia Bocovich
Browse files
Options
Downloads
Plain Diff
Merge branch 'ticket21315'
parents
fdc10fd0
908cf3fc
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
broker/broker.go
+13
-10
13 additions, 10 deletions
broker/broker.go
broker/metrics.go
+105
-15
105 additions, 15 deletions
broker/metrics.go
broker/snowflake-broker_test.go
+129
-0
129 additions, 0 deletions
broker/snowflake-broker_test.go
with
247 additions
and
25 deletions
broker/broker.go
+
13
−
10
View file @
19244c71
...
...
@@ -149,10 +149,20 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
return
}
log
.
Println
(
"Received snowflake: "
,
id
)
// Log geoip stats
remoteIP
,
_
,
err
:=
net
.
SplitHostPort
(
r
.
RemoteAddr
)
if
err
!=
nil
{
log
.
Println
(
"Error processing proxy IP: "
,
err
.
Error
())
}
else
{
ctx
.
metrics
.
UpdateCountryStats
(
remoteIP
)
}
// Wait for a client to avail an offer to the snowflake, or timeout if nil.
offer
:=
ctx
.
RequestOffer
(
id
)
if
nil
==
offer
{
log
.
Println
(
"Proxy "
+
id
+
" did not receive a Client offer."
)
ctx
.
metrics
.
proxyIdleCount
++
w
.
WriteHeader
(
http
.
StatusGatewayTimeout
)
return
}
...
...
@@ -176,6 +186,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
// Immediately fail if there are no snowflakes available.
if
ctx
.
snowflakes
.
Len
()
<=
0
{
log
.
Println
(
"Client: No snowflake proxies available."
)
ctx
.
metrics
.
clientDeniedCount
++
w
.
WriteHeader
(
http
.
StatusServiceUnavailable
)
return
}
...
...
@@ -189,6 +200,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
select
{
case
answer
:=
<-
snowflake
.
answerChannel
:
log
.
Println
(
"Client: Retrieving answer"
)
ctx
.
metrics
.
clientProxyMatchCount
++
w
.
Write
(
answer
)
// Initial tracking of elapsed time.
ctx
.
metrics
.
clientRoundtripEstimate
=
time
.
Since
(
startTime
)
/
...
...
@@ -221,15 +233,6 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
return
}
// Get proxy country stats
remoteIP
,
_
,
err
:=
net
.
SplitHostPort
(
r
.
RemoteAddr
)
if
err
!=
nil
{
log
.
Println
(
"Error processing proxy IP: "
,
err
.
Error
())
}
else
{
ctx
.
metrics
.
UpdateCountryStats
(
remoteIP
)
}
log
.
Println
(
"Received answer."
)
snowflake
.
answerChannel
<-
body
}
...
...
@@ -291,7 +294,7 @@ func main() {
metricsFile
=
os
.
Stdout
}
metricsLogger
:=
log
.
New
(
metricsFile
,
""
,
log
.
LstdFlags
|
log
.
LUTC
)
metricsLogger
:=
log
.
New
(
metricsFile
,
""
,
0
)
ctx
:=
NewBrokerContext
(
metricsLogger
)
...
...
This diff is collapsed.
Click to expand it.
broker/metrics.go
+
105
−
15
View file @
19244c71
/*
We export metrics in the following format:
"snowflake-stats-end" YYYY-MM-DD HH:MM:SS (NSEC s) NL
[At most once.]
YYYY-MM-DD HH:MM:SS defines the end of the included measurement
interval of length NSEC seconds (86400 seconds by default).
"snowflake-ips" CC=NUM,CC=NUM,... NL
[At most once.]
List of mappings from two-letter country codes to the number of
unique IP addresses of snowflake proxies that have polled.
"snowflake-ips-total" NUM NL
[At most once.]
A count of the total number of unique IP addresses of snowflake
proxies that have polled.
"snowflake-idle-count" NUM NL
[At most once.]
A count of the number of times a proxy has polled but received
no client offer, rounded up to the nearest multiple of 8.
"client-denied-count" NUM NL
[At most once.]
A count of the number of times a client has requested a proxy
from the broker but no proxies were available, rounded up to
the nearest multiple of 8.
"client-snowflake-match-count" NUM NL
[At most once.]
A count of the number of times a client successfully received a
proxy from the broker, rounded up to the nearest multiple of 8.
*/
package
main
import
(
// "golang.org/x/net/internal/timeseries"
"fmt"
"log"
"math"
"net"
"sync"
"time"
...
...
@@ -13,21 +55,38 @@ var (
once
sync
.
Once
)
const
metricsResolution
=
60
*
60
*
24
*
time
.
Second
//86400 seconds
type
CountryStats
struct
{
addrs
map
[
string
]
bool
counts
map
[
string
]
int
}
// Implements Observable
type
Metrics
struct
{
tablev4
*
GeoIPv4Table
tablev6
*
GeoIPv6Table
countryStats
CountryStats
// snowflakes timeseries.Float
logger
*
log
.
Logger
tablev4
*
GeoIPv4Table
tablev6
*
GeoIPv6Table
countryStats
CountryStats
clientRoundtripEstimate
time
.
Duration
proxyIdleCount
uint
clientDeniedCount
uint
clientProxyMatchCount
uint
}
func
(
s
CountryStats
)
Display
()
string
{
return
fmt
.
Sprint
(
s
.
counts
)
output
:=
""
for
cc
,
count
:=
range
s
.
counts
{
output
+=
fmt
.
Sprintf
(
"%s=%d,"
,
cc
,
count
)
}
// cut off trailing ","
if
len
(
output
)
>
0
{
return
output
[
:
len
(
output
)
-
1
]
}
return
output
}
func
(
m
*
Metrics
)
UpdateCountryStats
(
addr
string
)
{
...
...
@@ -35,6 +94,10 @@ func (m *Metrics) UpdateCountryStats(addr string) {
var
country
string
var
ok
bool
if
m
.
countryStats
.
addrs
[
addr
]
{
return
}
ip
:=
net
.
ParseIP
(
addr
)
if
ip
.
To4
()
!=
nil
{
//This is an IPv4 address
...
...
@@ -54,8 +117,9 @@ func (m *Metrics) UpdateCountryStats(addr string) {
log
.
Println
(
"Unknown geoip"
)
}
//update map of
countrie
s and counts
//update map of
unique ip
s and counts
m
.
countryStats
.
counts
[
country
]
++
m
.
countryStats
.
addrs
[
addr
]
=
true
return
}
...
...
@@ -90,19 +154,45 @@ func NewMetrics(metricsLogger *log.Logger) (*Metrics, error) {
m
.
countryStats
=
CountryStats
{
counts
:
make
(
map
[
string
]
int
),
addrs
:
make
(
map
[
string
]
bool
),
}
m
.
logger
=
metricsLogger
// Write to log file every hour with updated metrics
go
once
.
Do
(
func
()
{
heartbeat
:=
time
.
Tick
(
time
.
Hour
)
for
range
heartbeat
{
metricsLogger
.
Println
(
"Country stats: "
,
m
.
countryStats
.
Display
())
go
once
.
Do
(
m
.
logMetrics
)
return
m
,
nil
}
//restore all metrics to original values
m
.
countryStats
.
counts
=
make
(
map
[
string
]
int
)
// Logs metrics in intervals specified by metricsResolution
func
(
m
*
Metrics
)
logMetrics
()
{
heartbeat
:=
time
.
Tick
(
metricsResolution
)
for
range
heartbeat
{
m
.
printMetrics
()
m
.
zeroMetrics
()
}
}
}
})
func
(
m
*
Metrics
)
printMetrics
()
{
m
.
logger
.
Println
(
"snowflake-stats-end"
,
time
.
Now
()
.
UTC
()
.
Format
(
"2006-01-02 15:04:05"
),
fmt
.
Sprintf
(
"(%d s)"
,
int
(
metricsResolution
.
Seconds
())))
m
.
logger
.
Println
(
"snowflake-ips"
,
m
.
countryStats
.
Display
())
m
.
logger
.
Println
(
"snowflake-ips-total"
,
len
(
m
.
countryStats
.
addrs
))
m
.
logger
.
Println
(
"snowflake-idle-count"
,
binCount
(
m
.
proxyIdleCount
))
m
.
logger
.
Println
(
"client-denied-count"
,
binCount
(
m
.
clientDeniedCount
))
m
.
logger
.
Println
(
"client-snowflake-match-count"
,
binCount
(
m
.
clientProxyMatchCount
))
}
return
m
,
nil
// Restores all metrics to original values
func
(
m
*
Metrics
)
zeroMetrics
()
{
m
.
proxyIdleCount
=
0
m
.
clientDeniedCount
=
0
m
.
clientProxyMatchCount
=
0
m
.
countryStats
.
counts
=
make
(
map
[
string
]
int
)
m
.
countryStats
.
addrs
=
make
(
map
[
string
]
bool
)
}
// Rounds up a count to the nearest multiple of 8.
func
binCount
(
count
uint
)
uint
{
return
uint
((
math
.
Ceil
(
float64
(
count
)
/
8
))
*
8
)
}
This diff is collapsed.
Click to expand it.
broker/snowflake-broker_test.go
+
129
−
0
View file @
19244c71
...
...
@@ -11,6 +11,7 @@ import (
"net/http/httptest"
"os"
"testing"
"time"
)
func
NullLogger
()
*
log
.
Logger
{
...
...
@@ -390,3 +391,131 @@ func TestGeoip(t *testing.T) {
})
}
func
TestMetrics
(
t
*
testing
.
T
)
{
Convey
(
"Test metrics..."
,
t
,
func
()
{
done
:=
make
(
chan
bool
)
buf
:=
new
(
bytes
.
Buffer
)
ctx
:=
NewBrokerContext
(
log
.
New
(
buf
,
""
,
0
))
err
:=
ctx
.
metrics
.
LoadGeoipDatabases
(
"test_geoip"
,
"test_geoip6"
)
So
(
err
,
ShouldEqual
,
nil
)
//Test addition of proxy polls
Convey
(
"for proxy polls"
,
func
()
{
w
:=
httptest
.
NewRecorder
()
data
:=
bytes
.
NewReader
([]
byte
(
"test"
))
r
,
err
:=
http
.
NewRequest
(
"POST"
,
"snowflake.broker/proxy"
,
data
)
r
.
Header
.
Set
(
"X-Session-ID"
,
"test"
)
r
.
RemoteAddr
=
"129.97.208.23:8888"
//CA geoip
So
(
err
,
ShouldBeNil
)
go
func
(
ctx
*
BrokerContext
)
{
proxyPolls
(
ctx
,
w
,
r
)
done
<-
true
}(
ctx
)
p
:=
<-
ctx
.
proxyPolls
//manually unblock poll
p
.
offerChannel
<-
nil
<-
done
ctx
.
metrics
.
printMetrics
()
So
(
buf
.
String
(),
ShouldResemble
,
"snowflake-stats-end "
+
time
.
Now
()
.
UTC
()
.
Format
(
"2006-01-02 15:04:05"
)
+
" (86400 s)
\n
snowflake-ips CA=1
\n
snowflake-ips-total 1
\n
snowflake-idle-count 8
\n
client-denied-count 0
\n
client-snowflake-match-count 0
\n
"
)
})
//Test addition of client failures
Convey
(
"for no proxies available"
,
func
()
{
w
:=
httptest
.
NewRecorder
()
data
:=
bytes
.
NewReader
([]
byte
(
"test"
))
r
,
err
:=
http
.
NewRequest
(
"POST"
,
"snowflake.broker/client"
,
data
)
So
(
err
,
ShouldBeNil
)
clientOffers
(
ctx
,
w
,
r
)
ctx
.
metrics
.
printMetrics
()
So
(
buf
.
String
(),
ShouldResemble
,
"snowflake-stats-end "
+
time
.
Now
()
.
UTC
()
.
Format
(
"2006-01-02 15:04:05"
)
+
" (86400 s)
\n
snowflake-ips
\n
snowflake-ips-total 0
\n
snowflake-idle-count 0
\n
client-denied-count 8
\n
client-snowflake-match-count 0
\n
"
)
// Test reset
buf
.
Reset
()
ctx
.
metrics
.
zeroMetrics
()
ctx
.
metrics
.
printMetrics
()
So
(
buf
.
String
(),
ShouldResemble
,
"snowflake-stats-end "
+
time
.
Now
()
.
UTC
()
.
Format
(
"2006-01-02 15:04:05"
)
+
" (86400 s)
\n
snowflake-ips
\n
snowflake-ips-total 0
\n
snowflake-idle-count 0
\n
client-denied-count 0
\n
client-snowflake-match-count 0
\n
"
)
})
//Test addition of client matches
Convey
(
"for client-proxy match"
,
func
()
{
w
:=
httptest
.
NewRecorder
()
data
:=
bytes
.
NewReader
([]
byte
(
"test"
))
r
,
err
:=
http
.
NewRequest
(
"POST"
,
"snowflake.broker/client"
,
data
)
So
(
err
,
ShouldBeNil
)
// Prepare a fake proxy to respond with.
snowflake
:=
ctx
.
AddSnowflake
(
"fake"
)
go
func
()
{
clientOffers
(
ctx
,
w
,
r
)
done
<-
true
}()
offer
:=
<-
snowflake
.
offerChannel
So
(
offer
,
ShouldResemble
,
[]
byte
(
"test"
))
snowflake
.
answerChannel
<-
[]
byte
(
"fake answer"
)
<-
done
ctx
.
metrics
.
printMetrics
()
So
(
buf
.
String
(),
ShouldResemble
,
"snowflake-stats-end "
+
time
.
Now
()
.
UTC
()
.
Format
(
"2006-01-02 15:04:05"
)
+
" (86400 s)
\n
snowflake-ips
\n
snowflake-ips-total 0
\n
snowflake-idle-count 0
\n
client-denied-count 0
\n
client-snowflake-match-count 8
\n
"
)
})
//Test rounding boundary
Convey
(
"binning boundary"
,
func
()
{
w
:=
httptest
.
NewRecorder
()
data
:=
bytes
.
NewReader
([]
byte
(
"test"
))
r
,
err
:=
http
.
NewRequest
(
"POST"
,
"snowflake.broker/client"
,
data
)
So
(
err
,
ShouldBeNil
)
clientOffers
(
ctx
,
w
,
r
)
clientOffers
(
ctx
,
w
,
r
)
clientOffers
(
ctx
,
w
,
r
)
clientOffers
(
ctx
,
w
,
r
)
clientOffers
(
ctx
,
w
,
r
)
clientOffers
(
ctx
,
w
,
r
)
clientOffers
(
ctx
,
w
,
r
)
clientOffers
(
ctx
,
w
,
r
)
ctx
.
metrics
.
printMetrics
()
So
(
buf
.
String
(),
ShouldResemble
,
"snowflake-stats-end "
+
time
.
Now
()
.
UTC
()
.
Format
(
"2006-01-02 15:04:05"
)
+
" (86400 s)
\n
snowflake-ips
\n
snowflake-ips-total 0
\n
snowflake-idle-count 0
\n
client-denied-count 8
\n
client-snowflake-match-count 0
\n
"
)
clientOffers
(
ctx
,
w
,
r
)
buf
.
Reset
()
ctx
.
metrics
.
printMetrics
()
So
(
buf
.
String
(),
ShouldResemble
,
"snowflake-stats-end "
+
time
.
Now
()
.
UTC
()
.
Format
(
"2006-01-02 15:04:05"
)
+
" (86400 s)
\n
snowflake-ips
\n
snowflake-ips-total 0
\n
snowflake-idle-count 0
\n
client-denied-count 16
\n
client-snowflake-match-count 0
\n
"
)
})
//Test unique ip
Convey
(
"proxy counts by unique ip"
,
func
()
{
w
:=
httptest
.
NewRecorder
()
data
:=
bytes
.
NewReader
([]
byte
(
"test"
))
r
,
err
:=
http
.
NewRequest
(
"POST"
,
"snowflake.broker/proxy"
,
data
)
r
.
Header
.
Set
(
"X-Session-ID"
,
"test"
)
r
.
RemoteAddr
=
"129.97.208.23:8888"
//CA geoip
So
(
err
,
ShouldBeNil
)
go
func
(
ctx
*
BrokerContext
)
{
proxyPolls
(
ctx
,
w
,
r
)
done
<-
true
}(
ctx
)
p
:=
<-
ctx
.
proxyPolls
//manually unblock poll
p
.
offerChannel
<-
nil
<-
done
data
=
bytes
.
NewReader
([]
byte
(
"test"
))
r
,
err
=
http
.
NewRequest
(
"POST"
,
"snowflake.broker/proxy"
,
data
)
r
.
Header
.
Set
(
"X-Session-ID"
,
"test"
)
r
.
RemoteAddr
=
"129.97.208.23:8888"
//CA geoip
go
func
(
ctx
*
BrokerContext
)
{
proxyPolls
(
ctx
,
w
,
r
)
done
<-
true
}(
ctx
)
p
=
<-
ctx
.
proxyPolls
//manually unblock poll
p
.
offerChannel
<-
nil
<-
done
ctx
.
metrics
.
printMetrics
()
So
(
buf
.
String
(),
ShouldResemble
,
"snowflake-stats-end "
+
time
.
Now
()
.
UTC
()
.
Format
(
"2006-01-02 15:04:05"
)
+
" (86400 s)
\n
snowflake-ips CA=1
\n
snowflake-ips-total 1
\n
snowflake-idle-count 8
\n
client-denied-count 0
\n
client-snowflake-match-count 0
\n
"
)
})
})
}
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment