Commit 171c55a9 authored by Cecylia Bocovich's avatar Cecylia Bocovich
Browse files

Implemented geoip lookups for the snowflake broker. This heavily based off of...

Implemented geoip lookups for the snowflake broker. This is heavily based on how Tor maps IP addresses to country codes, and relies on the provided IPv4 and IPv6 files.
parent bf82ef51
......@@ -12,8 +12,12 @@ import (
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"golang.org/x/crypto/acme/autocert"
......@@ -36,11 +40,21 @@ type BrokerContext struct {
// NewBrokerContext builds the broker's shared state: an initialized snowflake
// heap, the id-to-snowflake index, the proxy poll channel, and the metrics
// collector returned by NewMetrics.
//
// It panics if NewMetrics returns an error or a nil *Metrics.
func NewBrokerContext() *BrokerContext {
	snowflakes := new(SnowflakeHeap)
	heap.Init(snowflakes)

	metrics, err := NewMetrics()
	if err != nil {
		panic(err.Error())
	}
	if metrics == nil {
		panic("Failed to create metrics")
	}

	// The metrics field is set exactly once, to the collector built above.
	// A struct literal may not name the same field twice.
	return &BrokerContext{
		snowflakes:    snowflakes,
		idToSnowflake: make(map[string]*Snowflake),
		proxyPolls:    make(chan *ProxyPoll),
		metrics:       metrics,
	}
}
......@@ -203,6 +217,16 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}
// Get proxy country stats
remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
log.Println("Error processing proxy IP: ", err.Error())
} else {
ctx.metrics.UpdateCountryStats(remoteIP)
}
log.Println("Received answer: ", body)
snowflake.answerChannel <- body
}
......@@ -213,6 +237,7 @@ func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
s += fmt.Sprintf("\nsnowflake %d: %s", snowflake.index, snowflake.id)
}
s += fmt.Sprintf("\n\nroundtrip avg: %d", ctx.metrics.clientRoundtripEstimate)
s += fmt.Sprintf("\n\nsnowflake country stats: %s", ctx.metrics.countryStats.Display())
w.Write([]byte(s))
}
......@@ -225,18 +250,31 @@ func main() {
var acmeEmail string
var acmeHostnamesCommas string
var addr string
var geoipDatabase string
var geoip6Database string
var disableTLS bool
var disableGeoip bool
flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications")
flag.StringVar(&acmeHostnamesCommas, "acme-hostnames", "", "comma-separated hostnames for TLS certificate")
flag.StringVar(&addr, "addr", ":443", "address to listen on")
flag.StringVar(&geoipDatabase, "geoipdb", "/usr/share/tor/geoip", "path to correctly formatted geoip database mapping IPv4 address ranges to country codes")
flag.StringVar(&geoip6Database, "geoip6db", "/usr/share/tor/geoip6", "path to correctly formatted geoip database mapping IPv6 address ranges to country codes")
flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS")
flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection")
flag.Parse()
log.SetFlags(log.LstdFlags | log.LUTC)
ctx := NewBrokerContext()
if !disableGeoip {
err := ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
if err != nil {
log.Fatal(err.Error())
}
}
go ctx.Broker()
http.HandleFunc("/robots.txt", robotsTxtHandler)
......@@ -251,6 +289,20 @@ func main() {
Addr: addr,
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
// go routine to handle a SIGHUP signal to allow the broker operator to send
// a SIGHUP signal when the geoip database files are updated, without requiring
// a restart of the broker
go func() {
for {
signal := <-sigChan
log.Println("Received signal:", signal, ". Reloading geoip databases.")
ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
}
}()
if acmeHostnamesCommas != "" {
acmeHostnames := strings.Split(acmeHostnamesCommas, ",")
log.Printf("ACME hostnames: %q", acmeHostnames)
......
/*
This code is for loading database data that maps ip addresses to countries
for collecting and presenting statistics on snowflake use that might alert us
to censorship events.
The functions here are closely modeled on how Tor maintains and searches its
geoip database.
The tables used for geoip data must be structured as follows:
Recognized line format for IPv4 is:
INTIPLOW,INTIPHIGH,CC
where INTIPLOW and INTIPHIGH are IPv4 addresses encoded as big-endian 4-byte unsigned
integers, and CC is a country code.
Note that the IPv4 line format
"INTIPLOW","INTIPHIGH","CC","CC3","COUNTRY NAME"
is not currently supported.
Recognized line format for IPv6 is:
IPV6LOW,IPV6HIGH,CC
where IPV6LOW and IPV6HIGH are IPv6 addresses and CC is a country code.
It also recognizes, and skips over, blank lines and lines that start
with '#' (comments).
*/
package main
import (
"bufio"
"bytes"
"crypto/sha1"
"encoding/hex"
"fmt"
"io"
"log"
"net"
"os"
"sort"
"strconv"
"strings"
"sync"
)
// GeoIPTable is the behaviour the loader and the lookup code need from a
// geoip table, independent of the address family it stores.
type GeoIPTable interface {
	// Lock and Unlock guard the table; GeoIPLoadFile and GetCountryByAddr
	// hold the lock for the whole load or lookup.
	sync.Locker

	// parseEntry turns one line of a geoip file into an entry. A nil entry
	// with a nil error means the line carries no entry.
	parseEntry(string) (*GeoIPEntry, error)

	// Append adds an entry at the end of the table.
	Append(GeoIPEntry)

	// Len reports how many entries the table holds.
	Len() int

	// ElementAt returns the entry stored at the given index.
	ElementAt(int) GeoIPEntry
}
// GeoIPEntry is one row of a geoip table: an address range, bounded by ipLow
// and ipHigh, and the country code assigned to that range.
type GeoIPEntry struct {
	ipLow   net.IP
	ipHigh  net.IP
	country string
}

// GeoIPv4Table holds the entries loaded from an IPv4 geoip file.
//
// All methods use pointer receivers: the struct embeds a sync.Mutex, and a
// value receiver would copy the mutex on every call.
type GeoIPv4Table struct {
	table []GeoIPEntry
	lock  sync.Mutex // synchronization for geoip table accesses and reloads
}

// GeoIPv6Table holds the entries loaded from an IPv6 geoip file.
//
// All methods use pointer receivers: the struct embeds a sync.Mutex, and a
// value receiver would copy the mutex on every call.
type GeoIPv6Table struct {
	table []GeoIPEntry
	lock  sync.Mutex // synchronization for geoip table accesses and reloads
}

// Len reports the number of entries in the table.
func (t *GeoIPv4Table) Len() int { return len(t.table) }

// Len reports the number of entries in the table.
func (t *GeoIPv6Table) Len() int { return len(t.table) }

// Append adds entry at the end of the table.
func (t *GeoIPv4Table) Append(entry GeoIPEntry) {
	t.table = append(t.table, entry)
}

// Append adds entry at the end of the table.
func (t *GeoIPv6Table) Append(entry GeoIPEntry) {
	t.table = append(t.table, entry)
}

// ElementAt returns the entry at index i; it panics if i is out of range.
func (t *GeoIPv4Table) ElementAt(i int) GeoIPEntry { return t.table[i] }

// ElementAt returns the entry at index i; it panics if i is out of range.
func (t *GeoIPv6Table) ElementAt(i int) GeoIPEntry { return t.table[i] }

// Lock acquires the table's mutex.
func (t *GeoIPv4Table) Lock() { t.lock.Lock() }

// Lock acquires the table's mutex.
func (t *GeoIPv6Table) Lock() { t.lock.Lock() }

// Unlock releases the table's mutex.
func (t *GeoIPv4Table) Unlock() { t.lock.Unlock() }

// Unlock releases the table's mutex.
func (t *GeoIPv6Table) Unlock() { t.lock.Unlock() }
// geoipStringToIP converts an IPv4 address written as a base-10 unsigned
// 32-bit integer (the form used in IPv4 geoip files, most significant octet
// first) into a net.IP.
//
// If ipStr is not a valid 32-bit unsigned decimal number, it returns 0.0.0.0
// together with an error that names the offending string.
func geoipStringToIP(ipStr string) (net.IP, error) {
	value, parseErr := strconv.ParseUint(ipStr, 10, 32)
	if parseErr != nil {
		return net.IPv4(0, 0, 0, 0), fmt.Errorf("Error parsing IP %s", ipStr)
	}

	// Peel the four octets off the integer from the top byte down.
	addr := uint32(value)
	return net.IPv4(byte(addr>>24), byte(addr>>16), byte(addr>>8), byte(addr)), nil
}
// parseEntry parses one line of an IPv4 geoip file in the form
// INTIPLOW,INTIPHIGH,CC, where both bounds are decimal 32-bit integers and CC
// is a country code.
//
// It returns (nil, nil) for lines that carry no entry: blank lines and lines
// starting with '#'. Surrounding whitespace (including a trailing carriage
// return) is ignored. A line with the wrong number of fields or an
// unparsable bound yields an error.
func (table *GeoIPv4Table) parseEntry(candidate string) (*GeoIPEntry, error) {
	// Trim before looking at the first byte: indexing an empty string
	// panics, and blank lines are documented as skippable.
	line := strings.TrimSpace(candidate)
	if line == "" || line[0] == '#' {
		return nil, nil
	}

	fields := strings.Split(line, ",")
	if len(fields) != 3 {
		return nil, fmt.Errorf("Provided geoip file is incorrectly formatted. Could not parse line:\n%s", line)
	}

	low, err := geoipStringToIP(fields[0])
	if err != nil {
		return nil, err
	}
	high, err := geoipStringToIP(fields[1])
	if err != nil {
		return nil, err
	}

	return &GeoIPEntry{
		ipLow:   low,
		ipHigh:  high,
		country: fields[2],
	}, nil
}
// parseEntry parses one line of an IPv6 geoip file in the form
// IPV6LOW,IPV6HIGH,CC, where both bounds are textual IP addresses accepted by
// net.ParseIP and CC is a country code.
//
// It returns (nil, nil) for lines that carry no entry: blank lines and lines
// starting with '#'. Surrounding whitespace (including a trailing carriage
// return) is ignored. A line with the wrong number of fields or an
// unparsable bound yields an error describing the problem.
func (table *GeoIPv6Table) parseEntry(candidate string) (*GeoIPEntry, error) {
	// Trim before looking at the first byte: indexing an empty string
	// panics, and blank lines are documented as skippable.
	line := strings.TrimSpace(candidate)
	if line == "" || line[0] == '#' {
		return nil, nil
	}

	fields := strings.Split(line, ",")
	if len(fields) != 3 {
		return nil, fmt.Errorf("Provided geoip file is incorrectly formatted. Could not parse line:\n%s", line)
	}

	low := net.ParseIP(fields[0])
	if low == nil {
		return nil, fmt.Errorf("Error parsing IP %s", fields[0])
	}
	high := net.ParseIP(fields[1])
	if high == nil {
		return nil, fmt.Errorf("Error parsing IP %s", fields[1])
	}

	return &GeoIPEntry{
		ipLow:   low,
		ipHigh:  high,
		country: fields[2],
	}, nil
}
// GeoIPLoadFile reads the geoip file at pathname line by line and appends
// every entry that table.parseEntry produces to table. Entries are added to
// whatever the table already holds; nothing is cleared first.
//
// The table's lock is held for the duration of the read. On success the
// SHA-1 checksum of the file and the table's entry count are logged.
//
// It returns the error from opening or scanning the file, or a formatting
// error quoting the first line that parseEntry rejects.
func GeoIPLoadFile(table GeoIPTable, pathname string) error {
	file, err := os.Open(pathname)
	if err != nil {
		return err
	}
	defer file.Close()

	digest := sha1.New()

	table.Lock()
	defer table.Unlock()

	// Everything the scanner consumes is also fed into the digest, so the
	// checksum is computed in the same pass as the parse.
	lines := bufio.NewScanner(io.TeeReader(file, digest))
	for lines.Scan() {
		line := lines.Text()
		entry, parseErr := table.parseEntry(line)
		if parseErr != nil {
			return fmt.Errorf("Provided geoip file is incorrectly formatted. Line is: %+q", line)
		}
		if entry == nil {
			// Line carried no entry.
			continue
		}
		table.Append(*entry)
	}
	if scanErr := lines.Err(); scanErr != nil {
		return scanErr
	}

	log.Println("Using geoip file ", pathname, " with checksum", hex.EncodeToString(digest.Sum(nil)))
	log.Println("Loaded ", table.Len(), " entries into table")
	return nil
}
// GetCountryByAddr looks up the country code for an IPv4 or IPv6 address in
// table. The second result is false when no entry's range contains ip.
//
// The table's lock is held for the duration of the lookup. The binary search
// presumes the entries are ordered by ascending ipHigh (sort.Search requires
// a monotonic predicate); the loader does not sort, so that ordering has to
// come from the geoip file itself.
func GetCountryByAddr(table GeoIPTable, ip net.IP) (string, bool) {
	table.Lock()
	defer table.Unlock()

	needle := ip.To16()
	size := table.Len()

	// Find the first entry whose upper bound is not below the address.
	pos := sort.Search(size, func(i int) bool {
		return bytes.Compare(needle, table.ElementAt(i).ipHigh.To16()) <= 0
	})
	if pos == size {
		return "", false
	}

	// The search alone is not enough: an address that falls in a gap between
	// ranges (e.g., 127.0.0.0/8) lands on the next highest range, so confirm
	// that the address really lies within the bounds of the entry found.
	match := table.ElementAt(pos)
	if bytes.Compare(needle, match.ipLow.To16()) < 0 ||
		bytes.Compare(needle, match.ipHigh.To16()) > 0 {
		return "", false
	}
	return match.country, true
}
......@@ -2,16 +2,118 @@ package main
import (
// "golang.org/x/net/internal/timeseries"
"fmt"
"log"
"net"
"os"
"sync"
"time"
)
var (
once sync.Once
)
// CountryStats tallies how many addresses have been counted per country.
// Keys are the country strings produced by the geoip lookup, plus "??" for
// addresses whose lookup failed (see UpdateCountryStats).
type CountryStats struct {
// counts maps a country string to the number of times it has been counted.
counts map[string]int
}
// Metrics holds the broker's statistics: the geoip tables used to map
// addresses to countries, the per-country tallies, and a client round-trip
// estimate.
//
// Implements Observable
// NOTE(review): no Observable interface is visible in this file — confirm
// that the line above is still accurate.
type Metrics struct {
// tablev4 and tablev6 are set by LoadGeoipDatabases; either is nil when its
// database has not been loaded or failed to load.
tablev4 *GeoIPv4Table
tablev6 *GeoIPv6Table
// countryStats is updated by UpdateCountryStats.
countryStats CountryStats
// snowflakes timeseries.Float
clientRoundtripEstimate time.Duration
}
func NewMetrics() *Metrics {
// Display renders the per-country tallies in fmt's default map notation,
// terminated by a newline.
func (s CountryStats) Display() string {
	return fmt.Sprintf("%v\n", s.counts)
}
// UpdateCountryStats resolves addr (a textual IP address without a port) to
// a country and increments that country's tally.
//
// If the geoip table for the address's family is not loaded, the call does
// nothing. If the lookup finds no matching range, the address is counted
// under "??" and "Unknown geoip" is logged. An entry with an empty country
// string is not counted.
//
// NOTE(review): the tally map is updated here without a lock, and the hourly
// logger started in NewMetrics reads and replaces it — confirm whether
// callers can run concurrently with each other or with that logger.
func (m *Metrics) UpdateCountryStats(addr string) {
	var (
		country string
		ok      bool
	)

	// net.ParseIP returns nil for text that is not an IP address, and To4 on
	// a nil IP is nil as well, so such input takes the IPv6 branch.
	ip := net.ParseIP(addr)
	if ip.To4() != nil {
		// This is an IPv4 address.
		if m.tablev4 == nil {
			return
		}
		country, ok = GetCountryByAddr(m.tablev4, ip)
	} else {
		if m.tablev6 == nil {
			return
		}
		country, ok = GetCountryByAddr(m.tablev6, ip)
	}

	if !ok {
		country = "??"
		log.Println("Unknown geoip")
	}
	if country == "" {
		return
	}

	// A Metrics value that was not built by NewMetrics has a nil map, and
	// writing to a nil map panics; allocate it on first use.
	if m.countryStats.counts == nil {
		m.countryStats.counts = make(map[string]int)
	}
	m.countryStats.counts[country]++
}
// LoadGeoipDatabases loads the IPv4 geoip file at geoipDB and the IPv6 geoip
// file at geoip6DB into fresh tables and installs them on m.
//
// The files are loaded in that order. If one fails to load, the matching
// table on m is set to nil and the error is returned immediately; when the
// IPv4 load fails, the IPv6 table on m is left as it was.
func (m *Metrics) LoadGeoipDatabases(geoipDB string, geoip6DB string) error {
	log.Println("Loading geoip databases")

	v4 := new(GeoIPv4Table)
	if err := GeoIPLoadFile(v4, geoipDB); err != nil {
		m.tablev4 = nil
		return err
	}
	m.tablev4 = v4

	v6 := new(GeoIPv6Table)
	if err := GeoIPLoadFile(v6, geoip6DB); err != nil {
		m.tablev6 = nil
		return err
	}
	m.tablev6 = v6

	return nil
}
// NewMetrics creates a Metrics value with an empty per-country tally and
// opens (creating it if needed) the append-only log file "metrics.log" in
// the working directory.
//
// The first call also starts a goroutine that, once per hour, writes the
// country tallies of that first Metrics value to the log file and then
// resets them. The package-level once guards the start, so later calls do
// not start another logger and the values they return are not logged by it.
//
// It returns an error if the log file cannot be opened.
func NewMetrics() (*Metrics, error) {
	m := new(Metrics)

	f, err := os.OpenFile("metrics.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return nil, err
	}
	metricsLogger := log.New(f, "", log.LstdFlags|log.LUTC)

	m.countryStats = CountryStats{
		counts: make(map[string]int),
	}

	// Write to log file every hour with updated metrics
	go once.Do(func() {
		heartbeat := time.Tick(time.Hour)
		for range heartbeat {
			metricsLogger.Println("Country stats: ", m.countryStats.Display())

			//restore all metrics to original values
			m.countryStats.counts = make(map[string]int)
		}
	})

	return m, nil
}
......@@ -4,6 +4,7 @@ import (
"bytes"
"container/heap"
. "github.com/smartystreets/goconvey/convey"
"net"
"net/http"
"net/http/httptest"
"testing"
......@@ -268,3 +269,105 @@ func TestSnowflakeHeap(t *testing.T) {
So(r.index, ShouldEqual, -1)
})
}
// TestGeoip loads the fixture files "test_geoip" and "test_geoip6" (relative
// paths) into fresh tables, checks a set of IPv4 and IPv6 lookups against the
// expected country code and found-flag, and finally checks that a Metrics
// value whose databases failed to load can still be updated.
func TestGeoip(t *testing.T) {
Convey("Geoip", t, func() {
// Both fixtures must load without error before any lookup is attempted.
tv4 := new(GeoIPv4Table)
err := GeoIPLoadFile(tv4, "test_geoip")
So(err, ShouldEqual, nil)
tv6 := new(GeoIPv6Table)
err = GeoIPLoadFile(tv6, "test_geoip6")
So(err, ShouldEqual, nil)
Convey("IPv4 Country Mapping Tests", func() {
// Each case: address to look up, expected country code, expected found-flag.
for _, test := range []struct {
addr, cc string
ok bool
}{
{
"129.97.208.23", //uwaterloo
"CA",
true,
},
{
"127.0.0.1",
"",
false,
},
{
"255.255.255.255",
"",
false,
},
{
"0.0.0.0",
"",
false,
},
{
"223.252.127.255", //test high end of range
"JP",
true,
},
{
"223.252.127.255", //test low end of range -- NOTE(review): same address as the high-end case above; a distinct low-end address was probably intended
"JP",
true,
},
} {
country, ok := GetCountryByAddr(tv4, net.ParseIP(test.addr))
So(country, ShouldEqual, test.cc)
So(ok, ShouldResemble, test.ok)
}
})
Convey("IPv6 Country Mapping Tests", func() {
// Each case: address to look up, expected country code, expected found-flag.
for _, test := range []struct {
addr, cc string
ok bool
}{
{
"2620:101:f000:0:250:56ff:fe80:168e", //uwaterloo
"CA",
true,
},
{
"fd00:0:0:0:0:0:0:1",
"",
false,
},
{
"0:0:0:0:0:0:0:0",
"",
false,
},
{
"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
"",
false,
},
{
"2a07:2e47:ffff:ffff:ffff:ffff:ffff:ffff", //test high end of range
"FR",
true,
},
{
"2a07:2e40::", //test low end of range
"FR",
true,
},
} {
country, ok := GetCountryByAddr(tv6, net.ParseIP(test.addr))
So(country, ShouldEqual, test.cc)
So(ok, ShouldResemble, test.ok)
}
})
// Make sure things behave properly if geoip file fails to load
// The load error is deliberately ignored; the update that follows must not
// panic, and the IPv4 table must be left nil.
ctx := NewBrokerContext()
ctx.metrics.LoadGeoipDatabases("invalid_filename", "invalid_filename6")
ctx.metrics.UpdateCountryStats("127.0.0.1")
So(ctx.metrics.tablev4, ShouldEqual, nil)
})
}
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment