Verified Commit 211254fa authored by shelikhoo
Browse files

Add distinct IP counter

parent 97dea533
package ipsetsink
import (
"crypto/hmac"
"hash"
"hash/crc64"
"github.com/clarkduvall/hyperloglog"
"golang.org/x/crypto/sha3"
)
// IPSetSink accumulates an approximate count of distinct IP addresses.
// Addresses are masked with a keyed hash before they are added to the
// HyperLogLog++ sketch held in countDistinct.
type IPSetSink struct {
	// ipMaskingKey is the HMAC key used by maskIPAddress.
	ipMaskingKey string
	// countDistinct is the sketch that receives the masked addresses.
	countDistinct *hyperloglog.HyperLogLogPlus
}

// NewIPSetSink returns an empty sink that masks addresses with maskingKey
// and counts them in a HyperLogLog++ sketch created with precision 18.
func NewIPSetSink(maskingKey string) *IPSetSink {
	sink := &IPSetSink{ipMaskingKey: maskingKey}
	// The error is discarded on purpose: NewPlus presumably only rejects an
	// out-of-range precision, and 18 is a fixed constant here (verify against
	// the hyperloglog documentation if the precision is ever changed).
	sink.countDistinct, _ = hyperloglog.NewPlus(18)
	return sink
}
// maskIPAddress returns the HMAC of ipAddress, keyed with the sink's masking
// key and using SHA3-256 as the underlying hash function.
func (s *IPSetSink) maskIPAddress(ipAddress string) []byte {
	newSHA3 := func() hash.Hash {
		return sha3.New256()
	}
	mac := hmac.New(newSHA3, []byte(s.ipMaskingKey))
	// hash.Hash documents that Write never returns an error.
	mac.Write([]byte(ipAddress))
	return mac.Sum(nil)
}
// AddIPToSet records ipAddress in the distinct counter. The address is first
// masked with maskIPAddress; the masked bytes are then wrapped so that their
// CRC-64 checksum is what the sketch receives.
func (s *IPSetSink) AddIPToSet(ipAddress string) {
	masked := hashValue(s.maskIPAddress(ipAddress))
	s.countDistinct.Add(crc64FromBytes{hashValue: masked})
}
// Dump returns the gob encoding of the underlying sketch, as produced by its
// GobEncode method, together with any error that method reports.
func (s *IPSetSink) Dump() ([]byte, error) {
return s.countDistinct.GobEncode()
}
// Reset clears the underlying sketch so that counting starts over. The
// masking key is left unchanged.
func (s *IPSetSink) Reset() {
s.countDistinct.Clear()
}
// hashValue is a raw digest, such as the output of maskIPAddress.
type hashValue []byte

// crc64FromBytes wraps a hashValue and exposes it through a Sum64 method, so
// it can be handed to code that expects a 64-bit hash provider.
type crc64FromBytes struct {
	hashValue
}

// Sum64 returns the CRC-64 checksum of the wrapped bytes, computed with the
// ECMA polynomial table.
func (c crc64FromBytes) Sum64() uint64 {
	ecmaTable := crc64.MakeTable(crc64.ECMA)
	return crc64.Checksum(c.hashValue, ecmaTable)
}
package ipsetsink
import (
"fmt"
"github.com/clarkduvall/hyperloglog"
"testing"
)
import . "github.com/smartystreets/goconvey/convey"
// TestSinkInit adds two addresses to a fresh sink, dumps it, restores the
// dump into a new sketch and checks that the restored count is between 1
// and 3.
func TestSinkInit(t *testing.T) {
	Convey("Context", t, func() {
		sink := NewIPSetSink("demo")
		for _, addr := range []string{"test1", "test2"} {
			sink.AddIPToSet(addr)
		}

		dumped, dumpErr := sink.Dump()
		So(dumpErr, ShouldBeNil)

		restored, newErr := hyperloglog.NewPlus(18)
		So(newErr, ShouldBeNil)

		decodeErr := restored.GobDecode(dumped)
		So(decodeErr, ShouldBeNil)

		So(restored.Count(), ShouldBeBetweenOrEqual, 1, 3)
	})
}
// TestSinkCounting checks the accuracy of the distinct count after a
// dump/restore round trip. For each itemCount from 300 to 10000 it inserts
// exactly itemCount distinct addresses, each one twice, and requires the
// restored estimate to be within 1% of itemCount.
func TestSinkCounting(t *testing.T) {
	Convey("Context", t, func() {
		for itemCount := 300; itemCount <= 10000; itemCount += 200 {
			sink := NewIPSetSink("demo")
			// Two identical passes: the second must not change the distinct
			// count. The bound is i < itemCount so that exactly itemCount
			// distinct values are inserted, matching the expectation below.
			for pass := 0; pass < 2; pass++ {
				for i := 0; i < itemCount; i++ {
					sink.AddIPToSet(fmt.Sprintf("demo%v", i))
				}
			}

			data, err := sink.Dump()
			So(err, ShouldBeNil)

			structure, err := hyperloglog.NewPlus(18)
			So(err, ShouldBeNil)
			err = structure.GobDecode(data)
			So(err, ShouldBeNil)

			count := structure.Count()
			So((float64(count)/float64(itemCount))-1.0, ShouldAlmostEqual, 0, 0.01)
		}
	})
}
package sinkcluster
import "time"
// SinkEntry is the JSON record for one recording window: the window's start
// and end times plus the serialized counter data for that window.
type SinkEntry struct {
// RecordingStart is the time at which this recording window began.
RecordingStart time.Time `json:"recordingStart"`
// RecordingEnd is the time at which this recording window was written out.
RecordingEnd time.Time `json:"recordingEnd"`
// Recorded holds the serialized counter; ClusterCounter.Count decodes it
// with GobDecode. encoding/json represents a []byte as a base64 string.
Recorded []byte `json:"recorded"`
}
package sinkcluster
import (
"bufio"
"encoding/json"
"github.com/clarkduvall/hyperloglog"
"io"
"time"
)
// ClusterCounter holds the time window used to select which recorded
// entries are merged by Count.
type ClusterCounter struct {
	// from is the earliest accepted recording start.
	from time.Time
	// to is the latest accepted recording end.
	to time.Time
}

// NewClusterCounter returns a counter for the window that starts at from and
// ends at to.
func NewClusterCounter(from time.Time, to time.Time) *ClusterCounter {
	counter := new(ClusterCounter)
	counter.from = from
	counter.to = to
	return counter
}
// ClusterCountResult is the outcome of ClusterCounter.Count.
type ClusterCountResult struct {
// Sum is the count reported by the merged counter.
Sum uint64
// ChunkIncluded is the number of entries that fell inside the window and
// were merged into Sum.
ChunkIncluded int64
}
// Count reads one JSON-encoded SinkEntry per line from reader, merges the
// counter of every entry whose recording window lies inside the counter's
// window, and returns the merged count together with the number of entries
// merged.
//
// An entry is skipped when its RecordingStart is before c.from or its
// RecordingEnd is after c.to; both bounds are inclusive. Count returns a nil
// result and an error if a line is not valid JSON, if a recorded counter
// cannot be decoded or merged, or if reading from reader fails (including a
// line longer than the scanner limit set below).
func (c ClusterCounter) Count(reader io.Reader) (*ClusterCountResult, error) {
	// maxEntrySize caps the length of a single input line. The default
	// bufio.Scanner limit (bufio.MaxScanTokenSize, 64 KiB) is presumably too
	// small for a base64-encoded precision-18 sketch, so allow much longer
	// lines instead of failing on them.
	const maxEntrySize = 16 * 1024 * 1024

	counter, err := hyperloglog.NewPlus(18)
	if err != nil {
		return nil, err
	}

	result := ClusterCountResult{}
	inputScanner := bufio.NewScanner(reader)
	inputScanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxEntrySize)
	for inputScanner.Scan() {
		sinkInfo := SinkEntry{}
		if err := json.Unmarshal(inputScanner.Bytes(), &sinkInfo); err != nil {
			return nil, err
		}

		// Before is already false for equal instants, so no separate
		// equality check is needed to keep the lower bound inclusive.
		if sinkInfo.RecordingStart.Before(c.from) || sinkInfo.RecordingEnd.After(c.to) {
			continue
		}

		restoredCounter, err := hyperloglog.NewPlus(18)
		if err != nil {
			return nil, err
		}
		if err := restoredCounter.GobDecode(sinkInfo.Recorded); err != nil {
			return nil, err
		}
		result.ChunkIncluded++
		if err := counter.Merge(restoredCounter); err != nil {
			return nil, err
		}
	}
	// Scan returns false on both EOF and failure; without this check a read
	// error or an over-long line would silently produce a partial count.
	if err := inputScanner.Err(); err != nil {
		return nil, err
	}

	result.Sum = counter.Count()
	return &result, nil
}
package sinkcluster
import (
"bytes"
"encoding/json"
"io"
"log"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink"
)
// ClusterWriter feeds addresses into an IPSetSink and periodically appends
// the sink's serialized state to a WriteSyncer.
type ClusterWriter struct {
	// writer receives one JSON line per recording window.
	writer WriteSyncer
	// lastWriteTime is the start of the current recording window.
	lastWriteTime time.Time
	// writeInterval is how long a window lasts before AddIPToSet flushes it.
	writeInterval time.Duration
	// current is the sink that collects addresses for the current window.
	current *ipsetsink.IPSetSink
}

// NewClusterWriter returns a ClusterWriter that records into sink and writes
// to writer. The first recording window starts at the time of the call.
func NewClusterWriter(writer WriteSyncer, writeInterval time.Duration, sink *ipsetsink.IPSetSink) *ClusterWriter {
	return &ClusterWriter{
		current:       sink,
		writeInterval: writeInterval,
		writer:        writer,
		lastWriteTime: time.Now(),
	}
}
// WriteSyncer is an io.Writer that also has a Sync method. ClusterWriter
// calls Sync after each record it writes.
type WriteSyncer interface {
// Sync is called after a record has been written; presumably it flushes
// buffered data to stable storage, as (*os.File).Sync does.
Sync() error
io.Writer
}
// WriteIPSetToDisk appends the current recording window to the writer as a
// single JSON-encoded SinkEntry followed by a newline, then starts a new
// window by updating the window start time and resetting the sink.
//
// If dumping, marshalling or writing fails, the error is logged and the
// current window is kept untouched so a later call can retry. A Sync failure
// is logged but does not stop the window from advancing.
func (c *ClusterWriter) WriteIPSetToDisk() {
	currentTime := time.Now()
	data, err := c.current.Dump()
	if err != nil {
		log.Println("unable able to write ipset to file:", err)
		return
	}

	entry := &SinkEntry{
		RecordingStart: c.lastWriteTime,
		RecordingEnd:   currentTime,
		Recorded:       data,
	}
	jsonData, err := json.Marshal(entry)
	if err != nil {
		log.Println("unable able to write ipset to file:", err)
		return
	}
	// One record per line, matching the line-based reader in Count.
	jsonData = append(jsonData, byte('\n'))

	if _, err := io.Copy(c.writer, bytes.NewReader(jsonData)); err != nil {
		log.Println("unable able to write ipset to file:", err)
		return
	}

	// The record has already been accepted by the writer, so report a Sync
	// failure instead of dropping it silently, and still start a new window.
	if err := c.writer.Sync(); err != nil {
		log.Println("unable to sync ipset file:", err)
	}
	c.lastWriteTime = currentTime
	c.current.Reset()
}
// AddIPToSet records ipAddress in the current sink. If more than
// writeInterval has passed since the current window started, the window is
// first written out with WriteIPSetToDisk.
func (c *ClusterWriter) AddIPToSet(ipAddress string) {
	windowEnd := c.lastWriteTime.Add(c.writeInterval)
	if time.Now().After(windowEnd) {
		c.WriteIPSetToDisk()
	}
	c.current.AddIPToSet(ipAddress)
}
package sinkcluster
import (
"bytes"
"io"
"testing"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink"
. "github.com/smartystreets/goconvey/convey"
)
// writerStub turns any io.Writer into a WriteSyncer for tests by adding a
// Sync method that does nothing.
type writerStub struct {
	io.Writer
}

// Sync always reports success.
func (writerStub) Sync() error {
	return nil
}
// TestSinkWriter adds one address through a ClusterWriter backed by an
// in-memory buffer, forces a write, and checks that the buffer's contents
// are not nil afterwards.
func TestSinkWriter(t *testing.T) {
	Convey("Context", t, func() {
		out := new(bytes.Buffer)
		stub := &writerStub{out}
		clusterWriter := NewClusterWriter(stub, time.Minute, ipsetsink.NewIPSetSink("demo"))

		clusterWriter.AddIPToSet("1")
		clusterWriter.WriteIPSetToDisk()

		So(out.Bytes(), ShouldNotBeNil)
	})
}
package main
import (
"flag"
"fmt"
"log"
"os"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/v2/common/ipsetsink/sinkcluster"
)
// main counts the distinct entries recorded in a sink file for a time window.
// It reads the file named by -in, merges every record that falls between
// -start and -end (both parsed with the time.UnixDate layout), and prints the
// resulting sum and the number of records included. Any failure is fatal.
func main() {
	inputFile := flag.String("in", "", "path to the recorded sink file, one JSON record per line")
	start := flag.String("start", "", `start of the counting window, in time.UnixDate layout ("`+time.UnixDate+`")`)
	end := flag.String("end", "", `end of the counting window, in time.UnixDate layout ("`+time.UnixDate+`")`)
	flag.Parse()

	startTime, err := time.Parse(time.UnixDate, *start)
	if err != nil {
		log.Fatal("unable to parse start time:", err)
	}
	endTime, err := time.Parse(time.UnixDate, *end)
	if err != nil {
		log.Fatal("unable to parse end time:", err)
	}

	fd, err := os.Open(*inputFile)
	if err != nil {
		log.Fatal("unable to open input file:", err)
	}

	counter := sinkcluster.NewClusterCounter(startTime, endTime)
	result, err := counter.Count(fd)
	// Close explicitly rather than with defer: log.Fatal exits the process
	// without running deferred calls. The file was only read, so a Close
	// error carries no information worth reporting.
	_ = fd.Close()
	if err != nil {
		log.Fatal("unable to count:", err)
	}

	fmt.Printf("sum = %v\n", result.Sum)
	fmt.Printf("chunkIncluded = %v\n", result.ChunkIncluded)
}
......@@ -4,6 +4,7 @@ go 1.13
require (
git.torproject.org/pluggable-transports/goptlib.git v1.1.0
github.com/clarkduvall/hyperloglog v0.0.0-20171127014514-a0107a5d8004 // indirect
github.com/gorilla/websocket v1.4.1
github.com/pion/ice/v2 v2.2.6
github.com/pion/sdp/v3 v3.0.5
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment