Commit 6b002c5f authored by Cecylia Bocovich's avatar Cecylia Bocovich 💬
Browse files

Merge branch 'geoip_squashed'

parents 1133e013 ba4fe1a7
Loading
Loading
Loading
Loading
+69 −4
Original line number Diff line number Diff line
@@ -13,9 +13,12 @@ import (
	"io"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
@@ -36,14 +39,24 @@ type BrokerContext struct {
	metrics       *Metrics
}

func NewBrokerContext() *BrokerContext {
func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
	snowflakes := new(SnowflakeHeap)
	heap.Init(snowflakes)
	metrics, err := NewMetrics(metricsLogger)

	if err != nil {
		panic(err.Error())
	}

	if metrics == nil {
		panic("Failed to create metrics")
	}

	return &BrokerContext{
		snowflakes:    snowflakes,
		idToSnowflake: make(map[string]*Snowflake),
		proxyPolls:    make(chan *ProxyPoll),
		metrics:       new(Metrics),
		metrics:       metrics,
	}
}

@@ -206,6 +219,16 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Get proxy country stats
	remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		log.Println("Error processing proxy IP: ", err.Error())
	} else {

		ctx.metrics.UpdateCountryStats(remoteIP)
	}

	log.Println("Received answer.")
	snowflake.answerChannel <- body
}
@@ -228,24 +251,53 @@ func main() {
	var acmeEmail string
	var acmeHostnamesCommas string
	var addr string
	var geoipDatabase string
	var geoip6Database string
	var disableTLS bool
	var certFilename, keyFilename string
	var disableGeoip bool
	var metricsFilename string

	flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications")
	flag.StringVar(&acmeHostnamesCommas, "acme-hostnames", "", "comma-separated hostnames for TLS certificate")
	flag.StringVar(&certFilename, "cert", "", "TLS certificate file")
	flag.StringVar(&keyFilename, "key", "", "TLS private key file")
	flag.StringVar(&addr, "addr", ":443", "address to listen on")
	flag.StringVar(&geoipDatabase, "geoipdb", "/usr/share/tor/geoip", "path to correctly formatted geoip database mapping IPv4 address ranges to country codes")
	flag.StringVar(&geoip6Database, "geoip6db", "/usr/share/tor/geoip6", "path to correctly formatted geoip database mapping IPv6 address ranges to country codes")
	flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS")
	flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection")
	flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output")
	flag.Parse()

	var err error
	var metricsFile io.Writer = os.Stdout
	var logOutput io.Writer = os.Stderr
	//We want to send the log output through our scrubber first
	log.SetOutput(&safelog.LogScrubber{Output: logOutput})

	log.SetFlags(log.LstdFlags | log.LUTC)

	ctx := NewBrokerContext()
	if metricsFilename != "" {
		metricsFile, err = os.OpenFile(metricsFilename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)

		if err != nil {
			log.Fatal(err.Error())
		}
	} else {
		metricsFile = os.Stdout
	}

	metricsLogger := log.New(metricsFile, "", log.LstdFlags|log.LUTC)

	ctx := NewBrokerContext(metricsLogger)

	if !disableGeoip {
		err := ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
		if err != nil {
			log.Fatal(err.Error())
		}
	}

	go ctx.Broker()

@@ -256,11 +308,24 @@ func main() {
	http.Handle("/answer", SnowflakeHandler{ctx, proxyAnswers})
	http.Handle("/debug", SnowflakeHandler{ctx, debugHandler})

	var err error
	server := http.Server{
		Addr: addr,
	}

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGHUP)

	// go routine to handle a SIGHUP signal to allow the broker operator to send
	// a SIGHUP signal when the geoip database files are updated, without requiring
	// a restart of the broker
	go func() {
		for {
			signal := <-sigChan
			log.Println("Received signal:", signal, ". Reloading geoip databases.")
			ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
		}
	}()

	// Handle the various ways of setting up TLS. The legal configurations
	// are:
	//   --acme-hostnames (with optional --acme-email)

broker/geoip.go

0 → 100644
+240 −0
Original line number Diff line number Diff line
/*
This code is for loading database data that maps ip addresses to countries
for collecting and presenting statistics on snowflake use that might alert us
to censorship events.

The functions here are heavily based off of how tor maintains and searches their
geoip database

The tables used for geoip data must be structured as follows:

Recognized line format for IPv4 is:
    INTIPLOW,INTIPHIGH,CC
        where INTIPLOW and INTIPHIGH are IPv4 addresses encoded as big-endian 4-byte unsigned
        integers, and CC is a country code.

Note that the IPv4 line format
    "INTIPLOW","INTIPHIGH","CC","CC3","COUNTRY NAME"
is not currently supported.

Recognized line format for IPv6 is:
    IPV6LOW,IPV6HIGH,CC
        where IPV6LOW and IPV6HIGH are IPv6 addresses and CC is a country code.

It also recognizes, and skips over, blank lines and lines that start
with '#' (comments).

*/
package main

import (
	"bufio"
	"bytes"
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"sort"
	"strconv"
	"strings"
	"sync"
)

type GeoIPTable interface {
	parseEntry(string) (*GeoIPEntry, error)
	Len() int
	Append(GeoIPEntry)
	ElementAt(int) GeoIPEntry
	Lock()
	Unlock()
}

type GeoIPEntry struct {
	ipLow   net.IP
	ipHigh  net.IP
	country string
}

type GeoIPv4Table struct {
	table []GeoIPEntry

	lock sync.Mutex // synchronization for geoip table accesses and reloads
}

type GeoIPv6Table struct {
	table []GeoIPEntry

	lock sync.Mutex // synchronization for geoip table accesses and reloads
}

func (table GeoIPv4Table) Len() int { return len(table.table) }
func (table GeoIPv6Table) Len() int { return len(table.table) }

func (table *GeoIPv4Table) Append(entry GeoIPEntry) {
	(*table).table = append(table.table, entry)
}
func (table *GeoIPv6Table) Append(entry GeoIPEntry) {
	(*table).table = append(table.table, entry)
}

func (table GeoIPv4Table) ElementAt(i int) GeoIPEntry { return table.table[i] }
func (table GeoIPv6Table) ElementAt(i int) GeoIPEntry { return table.table[i] }

func (table *GeoIPv4Table) Lock() { (*table).lock.Lock() }
func (table *GeoIPv6Table) Lock() { (*table).lock.Lock() }

func (table *GeoIPv4Table) Unlock() { (*table).lock.Unlock() }
func (table *GeoIPv6Table) Unlock() { (*table).lock.Unlock() }

// Convert a geoip IP address represented as a big-endian unsigned integer to net.IP
func geoipStringToIP(ipStr string) (net.IP, error) {
	ip, err := strconv.ParseUint(ipStr, 10, 32)
	if err != nil {
		return net.IPv4(0, 0, 0, 0), fmt.Errorf("Error parsing IP %s", ipStr)
	}
	var bytes [4]byte
	bytes[0] = byte(ip & 0xFF)
	bytes[1] = byte((ip >> 8) & 0xFF)
	bytes[2] = byte((ip >> 16) & 0xFF)
	bytes[3] = byte((ip >> 24) & 0xFF)

	return net.IPv4(bytes[3], bytes[2], bytes[1], bytes[0]), nil
}

//Parses a line in the provided geoip file that corresponds
//to an address range and a two character country code
func (table *GeoIPv4Table) parseEntry(candidate string) (*GeoIPEntry, error) {

	if candidate[0] == '#' {
		return nil, nil
	}

	parsedCandidate := strings.Split(candidate, ",")

	if len(parsedCandidate) != 3 {
		return nil, fmt.Errorf("Provided geoip file is incorrectly formatted. Could not parse line:\n%s", parsedCandidate)
	}

	low, err := geoipStringToIP(parsedCandidate[0])
	if err != nil {
		return nil, err
	}
	high, err := geoipStringToIP(parsedCandidate[1])
	if err != nil {
		return nil, err
	}

	geoipEntry := &GeoIPEntry{
		ipLow:   low,
		ipHigh:  high,
		country: parsedCandidate[2],
	}

	return geoipEntry, nil
}

//Parses a line in the provided geoip file that corresponds
//to an address range and a two character country code
func (table *GeoIPv6Table) parseEntry(candidate string) (*GeoIPEntry, error) {

	if candidate[0] == '#' {
		return nil, nil
	}

	parsedCandidate := strings.Split(candidate, ",")

	if len(parsedCandidate) != 3 {
		return nil, fmt.Errorf("")
	}

	low := net.ParseIP(parsedCandidate[0])
	if low == nil {
		return nil, fmt.Errorf("")
	}
	high := net.ParseIP(parsedCandidate[1])
	if high == nil {
		return nil, fmt.Errorf("")
	}

	geoipEntry := &GeoIPEntry{
		ipLow:   low,
		ipHigh:  high,
		country: parsedCandidate[2],
	}

	return geoipEntry, nil
}

//Loads provided geoip file into our tables
//Entries are stored in a table
func GeoIPLoadFile(table GeoIPTable, pathname string) error {
	//open file
	geoipFile, err := os.Open(pathname)
	if err != nil {
		return err
	}
	defer geoipFile.Close()

	hash := sha1.New()

	table.Lock()
	defer table.Unlock()

	hashedFile := io.TeeReader(geoipFile, hash)

	//read in strings and call parse function
	scanner := bufio.NewScanner(hashedFile)
	for scanner.Scan() {
		entry, err := table.parseEntry(scanner.Text())
		if err != nil {
			return fmt.Errorf("Provided geoip file is incorrectly formatted. Line is: %+q", scanner.Text())
		}

		if entry != nil {
			table.Append(*entry)
		}

	}
	if err := scanner.Err(); err != nil {
		return err
	}

	sha1Hash := hex.EncodeToString(hash.Sum(nil))
	log.Println("Using geoip file ", pathname, " with checksum", sha1Hash)
	log.Println("Loaded ", table.Len(), " entries into table")

	return nil
}

//Returns the country location of an IPv4 or IPv6 address, and a boolean value
//that indicates whether the IP address was present in the geoip database
func GetCountryByAddr(table GeoIPTable, ip net.IP) (string, bool) {

	table.Lock()
	defer table.Unlock()

	//look IP up in database
	index := sort.Search(table.Len(), func(i int) bool {
		entry := table.ElementAt(i)
		return (bytes.Compare(ip.To16(), entry.ipHigh.To16()) <= 0)
	})

	if index == table.Len() {
		return "", false
	}

	// check to see if addr is in the range specified by the returned index
	// search on IPs in invalid ranges (e.g., 127.0.0.0/8) will return the
	//country code of the next highest range
	entry := table.ElementAt(index)
	if !(bytes.Compare(ip.To16(), entry.ipLow.To16()) >= 0 &&
		bytes.Compare(ip.To16(), entry.ipHigh.To16()) <= 0) {
		return "", false
	}

	return table.ElementAt(index).country, true

}
+93 −2
Original line number Diff line number Diff line
@@ -2,16 +2,107 @@ package main

import (
	// "golang.org/x/net/internal/timeseries"
	"fmt"
	"log"
	"net"
	"sync"
	"time"
)

var (
	once sync.Once
)

type CountryStats struct {
	counts map[string]int
}

// Implements Observable
type Metrics struct {
	tablev4      *GeoIPv4Table
	tablev6      *GeoIPv6Table
	countryStats CountryStats
	// snowflakes	timeseries.Float
	clientRoundtripEstimate time.Duration
}

func NewMetrics() *Metrics {
func (s CountryStats) Display() string {
	return fmt.Sprint(s.counts)
}

func (m *Metrics) UpdateCountryStats(addr string) {

	var country string
	var ok bool

	ip := net.ParseIP(addr)
	if ip.To4() != nil {
		//This is an IPv4 address
		if m.tablev4 == nil {
			return
		}
		country, ok = GetCountryByAddr(m.tablev4, ip)
	} else {
		if m.tablev6 == nil {
			return
		}
		country, ok = GetCountryByAddr(m.tablev6, ip)
	}

	if !ok {
		country = "??"
		log.Println("Unknown geoip")
	}

	//update map of countries and counts
	m.countryStats.counts[country]++

	return
}

func (m *Metrics) LoadGeoipDatabases(geoipDB string, geoip6DB string) error {

	// Load geoip databases
	log.Println("Loading geoip databases")
	tablev4 := new(GeoIPv4Table)
	err := GeoIPLoadFile(tablev4, geoipDB)
	if err != nil {
		m.tablev4 = nil
		return err
	} else {
		m.tablev4 = tablev4
	}

	tablev6 := new(GeoIPv6Table)
	err = GeoIPLoadFile(tablev6, geoip6DB)
	if err != nil {
		m.tablev6 = nil
		return err
	} else {
		m.tablev6 = tablev6
	}

	return nil
}

func NewMetrics(metricsLogger *log.Logger) (*Metrics, error) {
	m := new(Metrics)
	return m

	m.countryStats = CountryStats{
		counts: make(map[string]int),
	}

	// Write to log file every hour with updated metrics
	go once.Do(func() {
		heartbeat := time.Tick(time.Hour)
		for range heartbeat {
			metricsLogger.Println("Country stats: ", m.countryStats.Display())

			//restore all metrics to original values
			m.countryStats.counts = make(map[string]int)

		}
	})

	return m, nil
}
+103 −0
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ import (
	"bytes"
	"container/heap"
	. "github.com/smartystreets/goconvey/convey"
	"net"
	"net/http"
	"net/http/httptest"
	"testing"
@@ -268,3 +269,105 @@ func TestSnowflakeHeap(t *testing.T) {
		So(r.index, ShouldEqual, -1)
	})
}

func TestGeoip(t *testing.T) {
	Convey("Geoip", t, func() {
		tv4 := new(GeoIPv4Table)
		err := GeoIPLoadFile(tv4, "test_geoip")
		So(err, ShouldEqual, nil)
		tv6 := new(GeoIPv6Table)
		err = GeoIPLoadFile(tv6, "test_geoip6")
		So(err, ShouldEqual, nil)

		Convey("IPv4 Country Mapping Tests", func() {
			for _, test := range []struct {
				addr, cc string
				ok       bool
			}{
				{
					"129.97.208.23", //uwaterloo
					"CA",
					true,
				},
				{
					"127.0.0.1",
					"",
					false,
				},
				{
					"255.255.255.255",
					"",
					false,
				},
				{
					"0.0.0.0",
					"",
					false,
				},
				{
					"223.252.127.255", //test high end of range
					"JP",
					true,
				},
				{
					"223.252.127.255", //test low end of range
					"JP",
					true,
				},
			} {
				country, ok := GetCountryByAddr(tv4, net.ParseIP(test.addr))
				So(country, ShouldEqual, test.cc)
				So(ok, ShouldResemble, test.ok)
			}
		})

		Convey("IPv6 Country Mapping Tests", func() {
			for _, test := range []struct {
				addr, cc string
				ok       bool
			}{
				{
					"2620:101:f000:0:250:56ff:fe80:168e", //uwaterloo
					"CA",
					true,
				},
				{
					"fd00:0:0:0:0:0:0:1",
					"",
					false,
				},
				{
					"0:0:0:0:0:0:0:0",
					"",
					false,
				},
				{
					"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
					"",
					false,
				},
				{
					"2a07:2e47:ffff:ffff:ffff:ffff:ffff:ffff", //test high end of range
					"FR",
					true,
				},
				{
					"2a07:2e40::", //test low end of range
					"FR",
					true,
				},
			} {
				country, ok := GetCountryByAddr(tv6, net.ParseIP(test.addr))
				So(country, ShouldEqual, test.cc)
				So(ok, ShouldResemble, test.ok)
			}
		})

		// Make sure things behave properly if geoip file fails to load
		ctx := NewBrokerContext()
		ctx.metrics.LoadGeoipDatabases("invalid_filename", "invalid_filename6")
		ctx.metrics.UpdateCountryStats("127.0.0.1")
		So(ctx.metrics.tablev4, ShouldEqual, nil)

	})
}

broker/test_geoip

0 → 100644
+1236 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading