Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tpo/anti-censorship/pluggable-transports/snowflake-webext
  • cohosh/snowflake-webext
  • meskio/snowflake-webext
  • shelikhoo/snowflake-webext
  • WofWca/snowflake-webext
  • emmapeel/snowflake-webext
  • gus/snowflake-webext
  • luciole/snowflake-webext
  • devilkiller-ag/snowflake-webext
  • dcf/snowflake-webext
  • lavamind/snowflake-webext
11 results
Show changes
Commits on Source (361)
build/
build-webext/
test/
webext/snowflake.js
snowflake-library.js
# FIXME: Whittle these away
spec/
......
......@@ -8,6 +8,10 @@
"indent": ["error", 2, {
"SwitchCase": 1,
"MemberExpression": 0
}]
}],
"semi": "error"
},
"parserOptions": {
"ecmaVersion": 8
}
}
......@@ -3,22 +3,11 @@
*.swn
*.swm
.DS_Store
datadir/
broker/broker
client/client
server-webrtc/server-webrtc
server/server
proxy-go/proxy-go
snowflake.log
proxy/test
proxy/build
proxy/node_modules
proxy/spec/support
proxy/webext/snowflake.js
proxy/webext/popup.js
proxy/webext/embed.html
proxy/webext/embed.css
proxy/webext/assets/
proxy/webext/_locales/
test
build
/build-webext
node_modules
snowflake-library.js
spec/support
ignore/
npm-debug.log
image: golang:1.10-stretch
---
variables:
SITE_URL: embed-snowflake.torproject.org
STAGING_URL: embed-snowflake.staging.torproject.net
SUBDIR: build/
SKIP_REVIEW_APPS: 1
cache:
paths:
- .gradle/wrapper
- .gradle/caches
include:
- project: tpo/tpa/ci-templates
file: scripts/apt.yml
- project: tpo/tpa/ci-templates
file: static-shim-deploy.yml
before_script:
# Create symbolic links under $GOPATH, this is needed for local build
- export src=$GOPATH/src
- mkdir -p $src/git.torproject.org/pluggable-transports
- mkdir -p $src/gitlab.com/$CI_PROJECT_NAMESPACE
- ln -s $CI_PROJECT_DIR $src/git.torproject.org/pluggable-transports/snowflake.git
- ln -s $CI_PROJECT_DIR $src/gitlab.com/$CI_PROJECT_PATH
stages:
- test
- build
- deploy
.npm-job:
image: containers.torproject.org/tpo/tpa/base-images/node:stable
interruptible: true
cache: # Cache modules in between jobs
key: $CI_COMMIT_REF_SLUG
paths:
- .npm/
before_script:
- npm ci --cache .npm --prefer-offline
lint:
extends: .npm-job
stage: test
script:
- npm run lint
test:
extends: .npm-job
stage: test
script:
- npm run test
build:
extends: .npm-job
stage: build
artifacts:
paths:
- build/
exclude:
- build/bootstrap.css
- build/index.css
- build/index.js
script:
- apt-get -qy update
- apt-get -qy install libx11-dev
- cd $src/gitlab.com/$CI_PROJECT_PATH/client
- go get ./...
- go build ./...
- go vet ./...
- go test -v -race ./...
- !reference [.apt-init]
- apt install -yqq git
- npm run build
- sed -i '/mv3messaging.js/d' build/embed.html
after_script:
# this file changes every time but should not be cached
- rm -f $GRADLE_USER_HOME/caches/modules-2/modules-2.lock
- rm -fr $GRADLE_USER_HOME/caches/*/plugin-resolution/
# never deploy this project to gitlab pages
.pages-deploy-rules:
- when: never
[submodule "proxy/translation"]
path = proxy/translation
url = https://git.torproject.org/translation.git
branch = snowflakeaddon-messages.json_completed
path = translation
url = https://gitlab.torproject.org/tpo/translation.git
branch = snowflake
language: go
language: node_js
dist: xenial
go_import_path: git.torproject.org/pluggable-transports/snowflake.git
addons:
apt:
sources:
- ubuntu-toolchain-r-test
packages:
- g++-5
- gcc-5
node_js:
- 10
go:
- 1.10.x
env:
- TRAVIS_NODE_VERSION="8" CC="gcc-5" CXX="g++-5"
before_install:
- nvm install $TRAVIS_NODE_VERSION
dist: xenial
install:
- go get -u github.com/smartystreets/goconvey
- go get -u github.com/keroserene/go-webrtc
- go get -u github.com/pion/webrtc
- go get -u github.com/dchest/uniuri
- go get -u git.torproject.org/pluggable-transports/goptlib.git
- go get -u git.torproject.org/pluggable-transports/websocket.git/websocket
- go get -u google.golang.org/appengine
- go get -u golang.org/x/crypto/acme/autocert
- go get -u golang.org/x/net/http2
- pushd proxy
- npm install
- popd
script:
- test -z "$(go fmt ./...)"
- go vet ./...
- go test -v -race ./...
- cd proxy
- npm run lint
- npm test
- When editing Go, please run `go fmt` before every commit.
- You may run tests locally with either `go test` or `npm test` for Go and
JavaScript, respectively.
- You may run tests locally with `npm test`
......@@ -3,7 +3,8 @@
================================================================================
Copyright (c) 2016, Serene Han, Arlo Breault
All rights reserved.
Copyright (c) 2019-2020, The Tor Project, Inc
Copyright (c) 2022, WofWca <wofwca@protonmail.com>
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
......
# Snowflake
[![Build Status](https://travis-ci.org/keroserene/snowflake.svg?branch=master)](https://travis-ci.org/keroserene/snowflake)
This is the browser proxy component of Snowflake.
Pluggable Transport using WebRTC, inspired by Flashproxy.
### Building the webextension
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents**
1.
```
npm install
```
2.
* For Gecko (e.g. Firefox):
- [Usage](#usage)
- [Dependencies](#dependencies)
- [More Info](#more-info)
- [Building](#building)
- [Test Environment](#test-environment)
- [FAQ](#faq)
- [Appendix](#appendix)
- [-- Testing directly via WebRTC Server --](#---testing-directly-via-webrtc-server---)
```bash
npm run webext gecko
```
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
* For Chromium (e.g. Chrome, Edge)
### Usage
```bash
npm run webext chromium
```
and then load the `build-webext/` directory as an unpacked extension.
* https://firefox-source-docs.mozilla.org/devtools-user/about_colon_debugging/index.html#loading-a-temporary-extension
* https://developer.chrome.com/docs/extensions/get-started/tutorial/hello-world#load-unpacked
if using `git`, be sure to first initialize the translations submodule before building:
```
cd client/
go get
go build
tor -f torrc
git submodule update --init
```
This should start the client plugin, bootstrapping to 100% using WebRTC.
#### Dependencies
### Embedding
Client:
- [pion/webrtc](https://github.com/pion/webrtc)
- Go 1.10+
See https://snowflake.torproject.org/ for more info:
```
<iframe src="https://snowflake.torproject.org/embed.html" width="320" height="240" frameborder="0" scrolling="no"></iframe>
```
Proxy:
- JavaScript
### Building the badge / snowflake.torproject.org
---
```
npm install
npm run build
```
#### More Info
which outputs to the `build/` directory.
Tor can plug in the Snowflake client via a correctly configured `torrc`.
For example:
### Testing
Unit testing with Jasmine are available with:
```
npm install
npm test
```
ClientTransportPlugin snowflake exec ./client \
-url https://snowflake-broker.azureedge.net/ \
-front ajax.aspnetcdn.com \
-ice stun:stun.l.google.com:19302
-max 3
To run locally, first build it with:
```
npm run build
```
Then start an HTTP server in `build/` and navigate to `/embed.html`.
The flags `-url` and `-front` allow the Snowflake client to speak to the Broker,
in order to get connected with some volunteer's browser proxy. `-ice` is a
comma-separated list of ICE servers, which are required for NAT traversal.
### Preparing to deploy
For logging, run `tail -F snowflake.log` in a second terminal.
Background information:
* https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/23947#note_2591838
* https://help.torproject.org/tsa/doc/static-sites/
* https://help.torproject.org/tsa/doc/ssh-jump-host/
You can modify the `torrc` to use your own broker:
You need to be in LDAP group "snowflake" and have set up an SSH key with your LDAP account.
In your ~/.ssh/config file, you should have something like:
```
ClientTransportPlugin snowflake exec ./client --meek
Host staticiforme
HostName staticiforme.torproject.org
User <your user name>
ProxyJump people.torproject.org
IdentityFile ~/.ssh/tor
```
### Deploying
#### Building
```
npm install
npm run build
```
Do a "dry run" rsync with `-n` to check that only expected files are being changed. If you don't understand why a file would be updated, you can add the `-i` option to see the reason.
This describes how to build the in-browser snowflake. For the client, see Usage,
above.
```
rsync -n --chown=:snowflake --chmod ug=rw,D+x --perms --delete -crv build/ staticiforme:/srv/snowflake.torproject.org/htdocs/
```
The client will only work if there are browser snowflakes available.
To run your own:
If it looks good, then repeat the rsync without `-n`.
```
cd proxy/
npm run build
rsync --chown=:snowflake --chmod ug=rw,D+x --perms --delete -crv build/ staticiforme:/srv/snowflake.torproject.org/htdocs/
```
Then, start a local http server in the `proxy/build/` in any way you like.
For instance:
You can ignore errors of the form `rsync: failed to set permissions on "<dirname>/": Operation not permitted (1)`.
Then run the command to copy the new files to the live web servers:
```
cd build/
python -m http.server
ssh staticiforme 'static-update-component snowflake.torproject.org'
```
Then, open a browser tab to `http://127.0.0.1:8000/embed.html` to view
the debug-console of the snowflake.,
So long as that tab is open, you are an ephemeral Tor bridge.
### Publishing
Making a new release involves updating a few places,
#### Test Environment
1. Uploading the webextension to the Firefox Add-ons and Chrome Web Store
2. Publishing the new version to the npm repository
3. Deploying the badge to snowflake.torproject.org
There is a Docker-based test environment at https://github.com/cohosh/snowbox.
The following is a rough guide to getting that done:
```
# Clean things up
npm run clean
# Maybe check what's left behind
git clean -n -d -x
# Be sure that website.json and messages.json in translation/en/ have been
# populated with any new strings that may have been merged in the recent
# patches. It may take some time for weblate to have updated. You can
# check with the following,
git submodule update --remote
### FAQ
# We manage the translations we include manually. These translations are
# tracked separately for the website and the badge. Visit weblate to see
# the percentage completion for each language and update `websiteLangs`
# and `badgeLangs` in `make.js` accordingly. The links for weblate are:
# https://hosted.weblate.org/projects/tor/snowflake-badge/
# https://hosted.weblate.org/projects/tor/snowflake-website/
**Q: How does it work?**
# But note that it's also run as part of the "pack-webext" script, so return
# it to previously committed state,
git submodule update
In the Tor use-case:
# Bump and pack the webext, where "x.y.z" is the version being released
npm run pack-webext x.y.z
1. Volunteers visit websites which host the "snowflake" proxy. (just
like flashproxy)
2. Tor clients automatically find available browser proxies via the Broker
(the domain fronted signaling channel).
3. Tor client and browser proxy establish a WebRTC peer connection.
4. Proxy connects to some relay.
5. Tor occurs.
# Push the bump commit and tags
git push origin main
git push origin --tags
More detailed information about how clients, snowflake proxies, and the Broker
fit together on the way...
# Upload the generated build-webext-chromium.zip, build-webext-gecko.zip (and source.zip) to the webextension stores, respectively,
# 1. https://chrome.google.com/webstore/devconsole/
# 2. https://addons.mozilla.org/en-US/developers/addon/torproject-snowflake/versions/submit/
**Q: What are the benefits of this PT compared with other PTs?**
# Note: Mozilla's add-on store has a very robust review process. Make sure to copy
# the build instructions in the notes to reviewers. These should be:
# - npm install
# - npm run webext gecko
Snowflake combines the advantages of flashproxy and meek. Primarily:
# This time, really clean, because we don't want any extraneous files uploaded
git clean -f -d -x
- It has the convenience of Meek, but can support magnitudes more
users with negligible CDN costs. (Domain fronting is only used for brief
signalling / NAT-piercing to setup the P2P WebRTC DataChannels which handle
the actual traffic.)
# Send it off to npm
npm publish
- Arbitrarily high numbers of volunteer proxies are possible like in
flashproxy, but NATs are no longer a usability barrier - no need for
manual port forwarding!
# Clean things up
npm run clean
# From here on out, follow the "Deploying" section of the README
```
**Q: Why is this called Snowflake?**
### Parameters
It utilizes the "ICE" negotiation via WebRTC, and also involves a great
abundance of ephemeral and short-lived (and special!) volunteer proxies...
With no parameters,
snowflake uses the default relay `snowflake.freehaven.net:443` and
uses automatic signaling with the default broker at
`https://snowflake-broker.freehaven.net/`.
### Appendix
### Reuse as a library
##### -- Testing with Standalone Proxy --
The badge and the webextension make use of the same underlying library and
only differ in their UI. That same library can be produced for use with other
interfaces, such as [Cupcake][1], by running,
```
cd proxy-go
go build
./proxy-go
npm install
npm run library
```
##### -- Testing directly via WebRTC Server --
which outputs a `./snowflake-library.js`.
You'd then want to create a subclass of `UI` to perform various actions as
the state of the snowflake changes,
```
class MyUI extends UI {
...
}
```
See `WebExtUI` in `init-webext.js` and `BadgeUI` in `init-badge.js` for
examples.
Finally, initialize the snowflake with,
```
var log = function(msg) {
return console.log('Snowflake: ' + msg);
};
var dbg = log;
var config = new Config("myui"); // NOTE: Set a unique proxy type for metrics
var ui = new MyUI(); // NOTE: Using the class defined above
var broker = new Broker(config.brokerUrl);
var snowflake = new Snowflake(config, ui, broker);
snowflake.beginServingClients();
```
See server-webrtc/README.md for information on connecting directly to a
WebRTC server transport plugin, bypassing the Broker and browser proxy.
This minimal setup is pretty much what's currently in `init-node.js`.
More documentation on the way.
When configuring the snowflake, set a unique `proxyType` (first argument
to `Config`) that will be used when recording metrics at the broker. Also,
it would be helpful to get in touch with the [Anti-Censorship Team][2] at the
Tor Project to let them know about your tool.
Also available at:
[torproject.org/pluggable-transports/snowflake](https://gitweb.torproject.org/pluggable-transports/snowflake.git/)
[1]: https://chrome.google.com/webstore/detail/cupcake/dajjbehmbnbppjkcnpdkaniapgdppdnc
[2]: https://gitlab.torproject.org/tpo/anti-censorship/team
This component runs on Google App Engine. It reflects domain-fronted
requests from a client to the Snowflake broker.
You need the Go App Engine SDK in order to deploy the app.
https://cloud.google.com/sdk/docs/#linux
After unpacking, install the app-engine-go component:
google-cloud-sdk/bin/gcloud components install app-engine-go
To test locally, run
google-cloud-sdk/bin/dev_appserver.py app.yaml
The app will be running at http://127.0.0.1:8080/.
To deploy to App Engine, first create a new project and app. You have to
think of a unique name (marked as "<appname>" in the commands). You only
have to do the "create" step once; subsequent times you can go straight
to the "deploy" step. The "gcloud auth login" command will open a
browser window so you can log in to a Google account.
google-cloud-sdk/bin/gcloud auth login
google-cloud-sdk/bin/gcloud projects create <appname>
google-cloud-sdk/bin/gcloud app create --project=<appname>
Then to deploy the project, run:
google-cloud-sdk/bin/gcloud app deploy --project=<appname>
To configure the Snowflake client to talk to the App Engine app, provide
"https://<appname>.appspot.com/" as the --url option.
UseBridges 1
Bridge snowflake 0.0.2.0:1
ClientTransportPlugin snowflake exec ./client -url https://<appname>.appspot.com/ -front www.google.com
runtime: go
api_version: go1
handlers:
- url: /.*
script: _go_app
secure: always
// A web app for Google App Engine that proxies HTTP requests and responses to
// the Snowflake broker.
package reflect
import (
"context"
"io"
"net/http"
"net/url"
"time"
"google.golang.org/appengine"
"google.golang.org/appengine/log"
"google.golang.org/appengine/urlfetch"
)
const (
forwardURL = "https://snowflake-broker.bamsoftware.com/"
// A timeout of 0 means to use the App Engine default (5 seconds).
urlFetchTimeout = 20 * time.Second
)
var ctx context.Context
// Join two URL paths.
func pathJoin(a, b string) string {
if len(a) > 0 && a[len(a)-1] == '/' {
a = a[:len(a)-1]
}
if len(b) == 0 || b[0] != '/' {
b = "/" + b
}
return a + b
}
// We reflect only a whitelisted set of header fields. Otherwise, we may copy
// headers like Transfer-Encoding that interfere with App Engine's own
// hop-by-hop headers.
var reflectedHeaderFields = []string{
"Content-Type",
"X-Session-Id",
}
// Make a copy of r, with the URL being changed to be relative to forwardURL,
// and including only the headers in reflectedHeaderFields.
func copyRequest(r *http.Request) (*http.Request, error) {
u, err := url.Parse(forwardURL)
if err != nil {
return nil, err
}
// Append the requested path to the path in forwardURL, so that
// forwardURL can be something like "https://example.com/reflect".
u.Path = pathJoin(u.Path, r.URL.Path)
c, err := http.NewRequest(r.Method, u.String(), r.Body)
if err != nil {
return nil, err
}
for _, key := range reflectedHeaderFields {
values, ok := r.Header[key]
if ok {
for _, value := range values {
c.Header.Add(key, value)
}
}
}
return c, nil
}
func handler(w http.ResponseWriter, r *http.Request) {
ctx = appengine.NewContext(r)
fr, err := copyRequest(r)
if err != nil {
log.Errorf(ctx, "copyRequest: %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if urlFetchTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, urlFetchTimeout)
defer cancel()
}
// Use urlfetch.Transport directly instead of urlfetch.Client because we
// want only a single HTTP transaction, not following redirects.
transport := urlfetch.Transport{
Context: ctx,
}
resp, err := transport.RoundTrip(fr)
if err != nil {
log.Errorf(ctx, "RoundTrip: %s", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer resp.Body.Close()
for _, key := range reflectedHeaderFields {
values, ok := resp.Header[key]
if ok {
for _, value := range values {
w.Header().Add(key, value)
}
}
}
w.WriteHeader(resp.StatusCode)
n, err := io.Copy(w, resp.Body)
if err != nil {
log.Errorf(ctx, "io.Copy after %d bytes: %s", n, err)
}
}
func init() {
http.HandleFunc("/", handler)
}
/* global log, dbg, snowflake */
/*
/**
Communication with the snowflake broker.
Browser snowflakes must register with the broker in order
......@@ -10,22 +10,21 @@ to get assigned to clients.
// Represents a broker running remotely.
class Broker {
// When interacting with the Broker, snowflake must generate a unique session
// ID so the Broker can keep track of each proxy's signalling channels.
// On construction, this Broker object does not do anything until
// |getClientOffer| is called.
constructor(url) {
// Promises some client SDP Offer.
// Registers this Snowflake with the broker using an HTTP POST request, and
// waits for a response containing some client offer that the Broker chooses
// for this proxy..
// TODO: Actually support multiple clients.
/**
* When interacting with the Broker, snowflake must generate a unique session
* ID so the Broker can keep track of each proxy's signalling channels.
* On construction, this Broker object does not do anything until
* `getClientOffer` is called.
* @param {Config} config
*/
constructor(config) {
this.getClientOffer = this.getClientOffer.bind(this);
// urlSuffix for the broker is different depending on what action
// is desired.
this._postRequest = this._postRequest.bind(this);
this.url = url;
this.clients = 0;
this.setNATType = this.setNATType.bind(this);
this.config = config;
this.url = config.brokerUrl;
this.natType = "unknown";
if (0 === this.url.indexOf('localhost', 0)) {
// Ensure url has the right protocol + trailing slash.
this.url = 'http://' + this.url;
......@@ -38,62 +37,95 @@ class Broker {
}
}
getClientOffer(id) {
/**
* Promises some client SDP Offer.
* Registers this Snowflake with the broker using an HTTP POST request, and
* waits for a response containing some client offer that the Broker chooses
* for this proxy..
* Rejects on timeout or on error.
* TODO: Actually support multiple clients.
*/
getClientOffer(id, numClientsConnected) {
return new Promise((fulfill, reject) => {
var xhr;
xhr = new XMLHttpRequest();
const xhr = new XMLHttpRequest();
xhr.onreadystatechange = function() {
if (xhr.DONE !== xhr.readyState) {
return;
}
switch (xhr.status) {
case Broker.STATUS.OK:
return fulfill(xhr.responseText); // Should contain offer.
case Broker.STATUS.GATEWAY_TIMEOUT:
return reject(Broker.MESSAGE.TIMEOUT);
default:
log('Broker ERROR: Unexpected ' + xhr.status + ' - ' + xhr.statusText);
snowflake.ui.setStatus(' failure. Please refresh.');
return reject(Broker.MESSAGE.UNEXPECTED);
if (xhr.status !== Broker.CODE.OK) {
log('Broker ERROR: Unexpected ' + xhr.status + ' - ' + xhr.statusText);
snowflake.ui.setStatus(' failure. Please refresh.');
reject(Broker.MESSAGE.UNEXPECTED);
return;
}
const response = JSON.parse(xhr.responseText);
switch (response.Status) {
case Broker.STATUS.MATCH: fulfill(response); return;
case Broker.STATUS.TIMEOUT: reject(Broker.MESSAGE.TIMEOUT); return;
default: {
log('Broker ERROR: Unexpected ' + response.Status);
reject(Broker.MESSAGE.UNEXPECTED);
return;
}
}
};
this._xhr = xhr; // Used by spec to fake async Broker interaction
return this._postRequest(id, xhr, 'proxy', id);
const clients = Math.floor(numClientsConnected / 8) * 8;
const data = {
Version: "1.3",
Sid: id,
Type: this.config.proxyType,
NAT: this.natType,
Clients: clients,
AcceptedRelayPattern: this.config.allowedRelayPattern,
};
this._postRequest(xhr, 'proxy', JSON.stringify(data));
});
}
// Assumes getClientOffer happened, and a WebRTC SDP answer has been generated.
// Sends it back to the broker, which passes it to back to the original client.
/**
* Assumes getClientOffer happened, and a WebRTC SDP answer has been generated.
* Sends it back to the broker, which passes it to back to the original client.
* @param {string} id
* @param {RTCSessionDescription} answer
*/
sendAnswer(id, answer) {
var xhr;
dbg(id + ' - Sending answer back to broker...\n');
dbg(answer.sdp);
xhr = new XMLHttpRequest();
const xhr = new XMLHttpRequest();
xhr.onreadystatechange = function() {
if (xhr.DONE !== xhr.readyState) {
return;
}
switch (xhr.status) {
case Broker.STATUS.OK:
case Broker.CODE.OK:
dbg('Broker: Successfully replied with answer.');
return dbg(xhr.responseText);
case Broker.STATUS.GONE:
return dbg('Broker: No longer valid to reply with answer.');
dbg(xhr.responseText);
break;
default:
dbg('Broker ERROR: Unexpected ' + xhr.status + ' - ' + xhr.statusText);
return snowflake.ui.setStatus(' failure. Please refresh.');
snowflake.ui.setStatus(' failure. Please refresh.');
break;
}
};
return this._postRequest(id, xhr, 'answer', JSON.stringify(answer));
const data = {"Version": "1.0", "Sid": id, "Answer": JSON.stringify(answer)};
this._postRequest(xhr, 'answer', JSON.stringify(data));
}
setNATType(natType) {
this.natType = natType;
}
_postRequest(id, xhr, urlSuffix, payload) {
var err;
/**
* @param {XMLHttpRequest} xhr
* @param {string} urlSuffix for the broker is different depending on what action
* is desired.
* @param {string} payload
*/
_postRequest(xhr, urlSuffix, payload) {
try {
xhr.open('POST', this.url + urlSuffix);
xhr.setRequestHeader('X-Session-ID', id);
} catch (error) {
err = error;
} catch (err) {
/*
An exception happens here when, for example, NoScript allows the domain
on which the proxy badge runs, but not the domain to which it's trying
......@@ -103,15 +135,20 @@ class Broker {
log('Broker: exception while connecting: ' + err.message);
return;
}
return xhr.send(payload);
xhr.send(payload);
}
}
Broker.STATUS = {
Broker.CODE = {
OK: 200,
GONE: 410,
GATEWAY_TIMEOUT: 504
BAD_REQUEST: 400,
INTERNAL_SERVER_ERROR: 500
};
Broker.STATUS = {
MATCH: "client match",
TIMEOUT: "no match"
};
Broker.MESSAGE = {
......@@ -119,4 +156,3 @@ Broker.MESSAGE = {
UNEXPECTED: 'Unexpected status.'
};
Broker.prototype.clients = 0;
This is the Broker component of Snowflake.
### Overview
The Broker handles the rendezvous by matching Snowflake
Clients with Proxies, and passing their WebRTC Session Descriptions
(the "signaling" step). This allows Clients and Proxies to establish
a Peer connection.
It is analogous to Flashproxy's
[Facilitator](https://trac.torproject.org/projects/tor/wiki/FlashProxyFAQ),
but bidirectional and domain-fronted.
The Broker expects:
- Clients to send their SDP offer in a POST request, which will then block
until the Broker responds with the answer of the matched Proxy.
- Proxies to announce themselves with a POST request, to which the Broker
responds with some Client's SDP offer. The Proxy should then send a second
POST request soon after containing its SDP answer, which the Broker passes
back to the same Client.
### Running your own
The server uses TLS by default.
There is a `--disable-tls` option for testing purposes,
but you should use TLS in production.
The server automatically fetches certificates
from [Let's Encrypt](https://en.wikipedia.org/wiki/Let's_Encrypt) as needed.
Use the `--acme-hostnames` option to tell the server
what hostnames it may request certificates for.
You can optionally provide a contact email address,
using the `--acme-email` option,
so that Let's Encrypt can inform you of any problems.
In order to fetch certificates automatically,
the server needs to open an additional HTTP listener on port 80.
On Linux, you can use the `setcap` program,
part of libcap2, to enable the broker to bind to low-numbered ports
without having to run as root:
```
setcap 'cap_net_bind_service=+ep' /usr/local/bin/broker
```
You can control the listening broker port with the --addr option.
Port 443 is the default.
You'll need to provide the URL of the custom broker
to the client plugin using the `--url $URL` flag.
/*
Broker acts as the HTTP signaling channel.
It matches clients and snowflake proxies by passing corresponding
SessionDescriptions in order to negotiate a WebRTC connection.
*/
package main
import (
"container/heap"
"crypto/tls"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
"golang.org/x/crypto/acme/autocert"
)
const (
ClientTimeout = 10
ProxyTimeout = 10
readLimit = 100000 //Maximum number of bytes to be read from an HTTP request
)
type BrokerContext struct {
snowflakes *SnowflakeHeap
// Map keeping track of snowflakeIDs required to match SDP answers from
// the second http POST.
idToSnowflake map[string]*Snowflake
proxyPolls chan *ProxyPoll
metrics *Metrics
}
func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
snowflakes := new(SnowflakeHeap)
heap.Init(snowflakes)
metrics, err := NewMetrics(metricsLogger)
if err != nil {
panic(err.Error())
}
if metrics == nil {
panic("Failed to create metrics")
}
return &BrokerContext{
snowflakes: snowflakes,
idToSnowflake: make(map[string]*Snowflake),
proxyPolls: make(chan *ProxyPoll),
metrics: metrics,
}
}
// Implements the http.Handler interface
type SnowflakeHandler struct {
*BrokerContext
handle func(*BrokerContext, http.ResponseWriter, *http.Request)
}
// Implements the http.Handler interface
type MetricsHandler struct {
logFilename string
handle func(string, http.ResponseWriter, *http.Request)
}
func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
// Return early if it's CORS preflight.
if "OPTIONS" == r.Method {
return
}
sh.handle(sh.BrokerContext, w, r)
}
func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
// Return early if it's CORS preflight.
if "OPTIONS" == r.Method {
return
}
mh.handle(mh.logFilename, w, r)
}
// Proxies may poll for client offers concurrently.
type ProxyPoll struct {
id string
offerChannel chan []byte
}
// Registers a Snowflake and waits for some Client to send an offer,
// as part of the polling logic of the proxy handler.
func (ctx *BrokerContext) RequestOffer(id string) []byte {
request := new(ProxyPoll)
request.id = id
request.offerChannel = make(chan []byte)
ctx.proxyPolls <- request
// Block until an offer is available, or timeout which sends a nil offer.
offer := <-request.offerChannel
return offer
}
// goroutine which matches clients to proxies and sends SDP offers along.
// Safely processes proxy requests, responding to them with either an available
// client offer or nil on timeout / none are available.
func (ctx *BrokerContext) Broker() {
for request := range ctx.proxyPolls {
snowflake := ctx.AddSnowflake(request.id)
// Wait for a client to avail an offer to the snowflake.
go func(request *ProxyPoll) {
select {
case offer := <-snowflake.offerChannel:
request.offerChannel <- offer
case <-time.After(time.Second * ProxyTimeout):
// This snowflake is no longer available to serve clients.
// TODO: Fix race using a delete channel
heap.Remove(ctx.snowflakes, snowflake.index)
delete(ctx.idToSnowflake, snowflake.id)
request.offerChannel <- nil
}
}(request)
}
}
// Create and add a Snowflake to the heap.
// Required to keep track of proxies between providing them
// with an offer and awaiting their second POST with an answer.
func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake {
snowflake := new(Snowflake)
snowflake.id = id
snowflake.clients = 0
snowflake.offerChannel = make(chan []byte)
snowflake.answerChannel = make(chan []byte)
heap.Push(ctx.snowflakes, snowflake)
ctx.idToSnowflake[id] = snowflake
return snowflake
}
/*
For snowflake proxies to request a client from the Broker.
*/
func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
id := r.Header.Get("X-Session-ID")
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if nil != err {
log.Println("Invalid data.")
w.WriteHeader(http.StatusBadRequest)
return
}
if string(body) != id {
log.Println("Mismatched IDs!")
w.WriteHeader(http.StatusBadRequest)
return
}
// Log geoip stats
remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
log.Println("Error processing proxy IP: ", err.Error())
} else {
ctx.metrics.UpdateCountryStats(remoteIP)
}
// Wait for a client to avail an offer to the snowflake, or timeout if nil.
offer := ctx.RequestOffer(id)
if nil == offer {
ctx.metrics.proxyIdleCount++
w.WriteHeader(http.StatusGatewayTimeout)
return
}
log.Println("Passing client offer to snowflake.")
if _, err := w.Write(offer); err != nil {
log.Printf("proxyPolls unable to write offer with error: %v", err)
}
}
/*
Expects a WebRTC SDP offer in the Request to give to an assigned
snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client.
*/
func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
offer, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if nil != err {
log.Println("Invalid data.")
w.WriteHeader(http.StatusBadRequest)
return
}
// Immediately fail if there are no snowflakes available.
if ctx.snowflakes.Len() <= 0 {
ctx.metrics.clientDeniedCount++
w.WriteHeader(http.StatusServiceUnavailable)
return
}
// Otherwise, find the most available snowflake proxy, and pass the offer to it.
// Delete must be deferred in order to correctly process answer request later.
snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
defer delete(ctx.idToSnowflake, snowflake.id)
snowflake.offerChannel <- offer
// Wait for the answer to be returned on the channel or timeout.
select {
case answer := <-snowflake.answerChannel:
ctx.metrics.clientProxyMatchCount++
if _, err := w.Write(answer); err != nil {
log.Printf("unable to write answer with error: %v", err)
}
// Initial tracking of elapsed time.
ctx.metrics.clientRoundtripEstimate = time.Since(startTime) /
time.Millisecond
case <-time.After(time.Second * ClientTimeout):
log.Println("Client: Timed out.")
w.WriteHeader(http.StatusGatewayTimeout)
if _, err := w.Write([]byte("timed out waiting for answer!")); err != nil {
log.Printf("unable to write timeout error, failed with error: %v", err)
}
}
}
/*
Expects snowflake proxes which have previously successfully received
an offer from proxyHandler to respond with an answer in an HTTP POST,
which the broker will pass back to the original client.
*/
func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
id := r.Header.Get("X-Session-ID")
snowflake, ok := ctx.idToSnowflake[id]
if !ok || nil == snowflake {
// The snowflake took too long to respond with an answer, so its client
// disappeared / the snowflake is no longer recognized by the Broker.
w.WriteHeader(http.StatusGone)
return
}
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if nil != err || nil == body || len(body) <= 0 {
log.Println("Invalid data.")
w.WriteHeader(http.StatusBadRequest)
return
}
snowflake.answerChannel <- body
}
func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len())
var browsers, standalones int
for _, snowflake := range ctx.idToSnowflake {
if len(snowflake.id) < 16 {
browsers++
} else {
standalones++
}
}
s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
if _, err := w.Write([]byte(s)); err != nil {
log.Printf("writing proxy information returned error: %v ", err)
}
}
func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
if _, err := w.Write([]byte("User-agent: *\nDisallow: /\n")); err != nil {
log.Printf("robotsTxtHandler unable to write, with this error: %v", err)
}
}
func metricsHandler(metricsFilename string, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
if metricsFilename == "" {
http.NotFound(w, r)
return
}
metricsFile, err := os.OpenFile(metricsFilename, os.O_RDONLY, 0644)
if err != nil {
log.Println("Error opening metrics file for reading")
http.NotFound(w, r)
return
}
if _, err := io.Copy(w, metricsFile); err != nil {
log.Printf("copying metricsFile returned error: %v", err)
}
}
func main() {
var acmeEmail string
var acmeHostnamesCommas string
var acmeCertCacheDir string
var addr string
var geoipDatabase string
var geoip6Database string
var disableTLS bool
var certFilename, keyFilename string
var disableGeoip bool
var metricsFilename string
flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications")
flag.StringVar(&acmeHostnamesCommas, "acme-hostnames", "", "comma-separated hostnames for TLS certificate")
flag.StringVar(&certFilename, "cert", "", "TLS certificate file")
flag.StringVar(&keyFilename, "key", "", "TLS private key file")
flag.StringVar(&acmeCertCacheDir, "acme-cert-cache", "acme-cert-cache", "directory in which certificates should be cached")
flag.StringVar(&addr, "addr", ":443", "address to listen on")
flag.StringVar(&geoipDatabase, "geoipdb", "/usr/share/tor/geoip", "path to correctly formatted geoip database mapping IPv4 address ranges to country codes")
flag.StringVar(&geoip6Database, "geoip6db", "/usr/share/tor/geoip6", "path to correctly formatted geoip database mapping IPv6 address ranges to country codes")
flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS")
flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection")
flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output")
flag.Parse()
var err error
var metricsFile io.Writer
var logOutput io.Writer = os.Stderr
//We want to send the log output through our scrubber first
log.SetOutput(&safelog.LogScrubber{Output: logOutput})
log.SetFlags(log.LstdFlags | log.LUTC)
if metricsFilename != "" {
metricsFile, err = os.OpenFile(metricsFilename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err.Error())
}
} else {
metricsFile = os.Stdout
}
metricsLogger := log.New(metricsFile, "", 0)
ctx := NewBrokerContext(metricsLogger)
if !disableGeoip {
err = ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
if err != nil {
log.Fatal(err.Error())
}
}
go ctx.Broker()
http.HandleFunc("/robots.txt", robotsTxtHandler)
http.Handle("/proxy", SnowflakeHandler{ctx, proxyPolls})
http.Handle("/client", SnowflakeHandler{ctx, clientOffers})
http.Handle("/answer", SnowflakeHandler{ctx, proxyAnswers})
http.Handle("/debug", SnowflakeHandler{ctx, debugHandler})
http.Handle("/metrics", MetricsHandler{metricsFilename, metricsHandler})
server := http.Server{
Addr: addr,
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
// go routine to handle a SIGHUP signal to allow the broker operator to send
// a SIGHUP signal when the geoip database files are updated, without requiring
// a restart of the broker
go func() {
for {
signal := <-sigChan
log.Printf("Received signal: %s. Reloading geoip databases.", signal)
if err = ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database); err != nil {
log.Fatalf("reload of Geo IP databases on signal %s returned error: %v", signal, err)
}
}
}()
// Handle the various ways of setting up TLS. The legal configurations
// are:
// --acme-hostnames (with optional --acme-email and/or --acme-cert-cache)
// --cert and --key together
// --disable-tls
// The outputs of this block of code are the disableTLS,
// needHTTP01Listener, certManager, and getCertificate variables.
if acmeHostnamesCommas != "" {
acmeHostnames := strings.Split(acmeHostnamesCommas, ",")
log.Printf("ACME hostnames: %q", acmeHostnames)
var cache autocert.Cache
if err = os.MkdirAll(acmeCertCacheDir, 0700); err != nil {
log.Printf("Warning: Couldn't create cache directory %q (reason: %s) so we're *not* using our certificate cache.", acmeCertCacheDir, err)
} else {
cache = autocert.DirCache(acmeCertCacheDir)
}
certManager := autocert.Manager{
Cache: cache,
Prompt: autocert.AcceptTOS,
HostPolicy: autocert.HostWhitelist(acmeHostnames...),
Email: acmeEmail,
}
go func() {
log.Printf("Starting HTTP-01 listener")
log.Fatal(http.ListenAndServe(":80", certManager.HTTPHandler(nil)))
}()
server.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate}
err = server.ListenAndServeTLS("", "")
} else if certFilename != "" && keyFilename != "" {
if acmeEmail != "" || acmeHostnamesCommas != "" {
log.Fatalf("The --cert and --key options are not allowed with --acme-email or --acme-hostnames.")
}
err = server.ListenAndServeTLS(certFilename, keyFilename)
} else if disableTLS {
err = server.ListenAndServe()
} else {
log.Fatal("the --acme-hostnames, --cert and --key, or --disable-tls option is required")
}
if err != nil {
log.Fatal(err)
}
}
/*
This code is for loading database data that maps ip addresses to countries
for collecting and presenting statistics on snowflake use that might alert us
to censorship events.
The functions here are heavily based off of how tor maintains and searches their
geoip database
The tables used for geoip data must be structured as follows:
Recognized line format for IPv4 is:
INTIPLOW,INTIPHIGH,CC
where INTIPLOW and INTIPHIGH are IPv4 addresses encoded as big-endian 4-byte unsigned
integers, and CC is a country code.
Note that the IPv4 line format
"INTIPLOW","INTIPHIGH","CC","CC3","COUNTRY NAME"
is not currently supported.
Recognized line format for IPv6 is:
IPV6LOW,IPV6HIGH,CC
where IPV6LOW and IPV6HIGH are IPv6 addresses and CC is a country code.
It also recognizes, and skips over, blank lines and lines that start
with '#' (comments).
*/
package main
import (
"bufio"
"bytes"
"crypto/sha1"
"encoding/hex"
"fmt"
"io"
"log"
"net"
"os"
"sort"
"strconv"
"strings"
"sync"
)
type GeoIPTable interface {
parseEntry(string) (*GeoIPEntry, error)
Len() int
Append(GeoIPEntry)
ElementAt(int) GeoIPEntry
Lock()
Unlock()
}
type GeoIPEntry struct {
ipLow net.IP
ipHigh net.IP
country string
}
type GeoIPv4Table struct {
table []GeoIPEntry
lock sync.Mutex // synchronization for geoip table accesses and reloads
}
type GeoIPv6Table struct {
table []GeoIPEntry
lock sync.Mutex // synchronization for geoip table accesses and reloads
}
func (table *GeoIPv4Table) Len() int { return len(table.table) }
func (table *GeoIPv6Table) Len() int { return len(table.table) }
func (table *GeoIPv4Table) Append(entry GeoIPEntry) {
(*table).table = append(table.table, entry)
}
func (table *GeoIPv6Table) Append(entry GeoIPEntry) {
(*table).table = append(table.table, entry)
}
func (table *GeoIPv4Table) ElementAt(i int) GeoIPEntry { return table.table[i] }
func (table *GeoIPv6Table) ElementAt(i int) GeoIPEntry { return table.table[i] }
func (table *GeoIPv4Table) Lock() { (*table).lock.Lock() }
func (table *GeoIPv6Table) Lock() { (*table).lock.Lock() }
func (table *GeoIPv4Table) Unlock() { (*table).lock.Unlock() }
func (table *GeoIPv6Table) Unlock() { (*table).lock.Unlock() }
// Convert a geoip IP address represented as a big-endian unsigned integer to net.IP
func geoipStringToIP(ipStr string) (net.IP, error) {
ip, err := strconv.ParseUint(ipStr, 10, 32)
if err != nil {
return net.IPv4(0, 0, 0, 0), fmt.Errorf("error parsing IP %s", ipStr)
}
var bytes [4]byte
bytes[0] = byte(ip & 0xFF)
bytes[1] = byte((ip >> 8) & 0xFF)
bytes[2] = byte((ip >> 16) & 0xFF)
bytes[3] = byte((ip >> 24) & 0xFF)
return net.IPv4(bytes[3], bytes[2], bytes[1], bytes[0]), nil
}
//Parses a line in the provided geoip file that corresponds
//to an address range and a two character country code
func (table *GeoIPv4Table) parseEntry(candidate string) (*GeoIPEntry, error) {
if candidate[0] == '#' {
return nil, nil
}
parsedCandidate := strings.Split(candidate, ",")
if len(parsedCandidate) != 3 {
return nil, fmt.Errorf("provided geoip file is incorrectly formatted. Could not parse line:\n%s", parsedCandidate)
}
low, err := geoipStringToIP(parsedCandidate[0])
if err != nil {
return nil, err
}
high, err := geoipStringToIP(parsedCandidate[1])
if err != nil {
return nil, err
}
geoipEntry := &GeoIPEntry{
ipLow: low,
ipHigh: high,
country: parsedCandidate[2],
}
return geoipEntry, nil
}
//Parses a line in the provided geoip file that corresponds
//to an address range and a two character country code
func (table *GeoIPv6Table) parseEntry(candidate string) (*GeoIPEntry, error) {
if candidate[0] == '#' {
return nil, nil
}
parsedCandidate := strings.Split(candidate, ",")
if len(parsedCandidate) != 3 {
return nil, fmt.Errorf("")
}
low := net.ParseIP(parsedCandidate[0])
if low == nil {
return nil, fmt.Errorf("")
}
high := net.ParseIP(parsedCandidate[1])
if high == nil {
return nil, fmt.Errorf("")
}
geoipEntry := &GeoIPEntry{
ipLow: low,
ipHigh: high,
country: parsedCandidate[2],
}
return geoipEntry, nil
}
//Loads provided geoip file into our tables
//Entries are stored in a table
func GeoIPLoadFile(table GeoIPTable, pathname string) error {
//open file
geoipFile, err := os.Open(pathname)
if err != nil {
return err
}
defer geoipFile.Close()
hash := sha1.New()
table.Lock()
defer table.Unlock()
hashedFile := io.TeeReader(geoipFile, hash)
//read in strings and call parse function
scanner := bufio.NewScanner(hashedFile)
for scanner.Scan() {
entry, err := table.parseEntry(scanner.Text())
if err != nil {
return fmt.Errorf("provided geoip file is incorrectly formatted. Line is: %+q", scanner.Text())
}
if entry != nil {
table.Append(*entry)
}
}
if err := scanner.Err(); err != nil {
return err
}
sha1Hash := hex.EncodeToString(hash.Sum(nil))
log.Println("Using geoip file ", pathname, " with checksum", sha1Hash)
log.Println("Loaded ", table.Len(), " entries into table")
return nil
}
//Returns the country location of an IPv4 or IPv6 address, and a boolean value
//that indicates whether the IP address was present in the geoip database
func GetCountryByAddr(table GeoIPTable, ip net.IP) (string, bool) {
table.Lock()
defer table.Unlock()
//look IP up in database
index := sort.Search(table.Len(), func(i int) bool {
entry := table.ElementAt(i)
return (bytes.Compare(ip.To16(), entry.ipHigh.To16()) <= 0)
})
if index == table.Len() {
return "", false
}
// check to see if addr is in the range specified by the returned index
// search on IPs in invalid ranges (e.g., 127.0.0.0/8) will return the
//country code of the next highest range
entry := table.ElementAt(index)
if !(bytes.Compare(ip.To16(), entry.ipLow.To16()) >= 0 &&
bytes.Compare(ip.To16(), entry.ipHigh.To16()) <= 0) {
return "", false
}
return table.ElementAt(index).country, true
}
/*
We export metrics in the following format:
"snowflake-stats-end" YYYY-MM-DD HH:MM:SS (NSEC s) NL
[At most once.]
YYYY-MM-DD HH:MM:SS defines the end of the included measurement
interval of length NSEC seconds (86400 seconds by default).
"snowflake-ips" CC=NUM,CC=NUM,... NL
[At most once.]
List of mappings from two-letter country codes to the number of
unique IP addresses of snowflake proxies that have polled.
"snowflake-ips-total" NUM NL
[At most once.]
A count of the total number of unique IP addresses of snowflake
proxies that have polled.
"snowflake-idle-count" NUM NL
[At most once.]
A count of the number of times a proxy has polled but received
no client offer, rounded up to the nearest multiple of 8.
"client-denied-count" NUM NL
[At most once.]
A count of the number of times a client has requested a proxy
from the broker but no proxies were available, rounded up to
the nearest multiple of 8.
"client-snowflake-match-count" NUM NL
[At most once.]
A count of the number of times a client successfully received a
proxy from the broker, rounded up to the nearest multiple of 8.
*/
package main
import (
// "golang.org/x/net/internal/timeseries"
"fmt"
"log"
"math"
"net"
"sync"
"time"
)
var (
once sync.Once
)
const metricsResolution = 60 * 60 * 24 * time.Second //86400 seconds
type CountryStats struct {
addrs map[string]bool
counts map[string]int
}
// Implements Observable
type Metrics struct {
logger *log.Logger
tablev4 *GeoIPv4Table
tablev6 *GeoIPv6Table
countryStats CountryStats
clientRoundtripEstimate time.Duration
proxyIdleCount uint
clientDeniedCount uint
clientProxyMatchCount uint
}
func (s CountryStats) Display() string {
output := ""
for cc, count := range s.counts {
output += fmt.Sprintf("%s=%d,", cc, count)
}
// cut off trailing ","
if len(output) > 0 {
return output[:len(output)-1]
}
return output
}
func (m *Metrics) UpdateCountryStats(addr string) {
var country string
var ok bool
if m.countryStats.addrs[addr] {
return
}
ip := net.ParseIP(addr)
if ip.To4() != nil {
//This is an IPv4 address
if m.tablev4 == nil {
return
}
country, ok = GetCountryByAddr(m.tablev4, ip)
} else {
if m.tablev6 == nil {
return
}
country, ok = GetCountryByAddr(m.tablev6, ip)
}
if !ok {
country = "??"
}
//update map of unique ips and counts
m.countryStats.counts[country]++
m.countryStats.addrs[addr] = true
}
func (m *Metrics) LoadGeoipDatabases(geoipDB string, geoip6DB string) error {
// Load geoip databases
log.Println("Loading geoip databases")
tablev4 := new(GeoIPv4Table)
err := GeoIPLoadFile(tablev4, geoipDB)
if err != nil {
m.tablev4 = nil
return err
}
m.tablev4 = tablev4
tablev6 := new(GeoIPv6Table)
err = GeoIPLoadFile(tablev6, geoip6DB)
if err != nil {
m.tablev6 = nil
return err
}
m.tablev6 = tablev6
return nil
}
func NewMetrics(metricsLogger *log.Logger) (*Metrics, error) {
m := new(Metrics)
m.countryStats = CountryStats{
counts: make(map[string]int),
addrs: make(map[string]bool),
}
m.logger = metricsLogger
// Write to log file every hour with updated metrics
go once.Do(m.logMetrics)
return m, nil
}
// Logs metrics in intervals specified by metricsResolution
func (m *Metrics) logMetrics() {
heartbeat := time.Tick(metricsResolution)
for range heartbeat {
m.printMetrics()
m.zeroMetrics()
}
}
func (m *Metrics) printMetrics() {
m.logger.Println("snowflake-stats-end", time.Now().UTC().Format("2006-01-02 15:04:05"), fmt.Sprintf("(%d s)", int(metricsResolution.Seconds())))
m.logger.Println("snowflake-ips", m.countryStats.Display())
m.logger.Println("snowflake-ips-total", len(m.countryStats.addrs))
m.logger.Println("snowflake-idle-count", binCount(m.proxyIdleCount))
m.logger.Println("client-denied-count", binCount(m.clientDeniedCount))
m.logger.Println("client-snowflake-match-count", binCount(m.clientProxyMatchCount))
}
// Restores all metrics to original values
func (m *Metrics) zeroMetrics() {
m.proxyIdleCount = 0
m.clientDeniedCount = 0
m.clientProxyMatchCount = 0
m.countryStats.counts = make(map[string]int)
m.countryStats.addrs = make(map[string]bool)
}
// Rounds up a count to the nearest multiple of 8.
func binCount(count uint) uint {
return uint((math.Ceil(float64(count) / 8)) * 8)
}
package main
import (
"bytes"
"container/heap"
"io/ioutil"
"log"
"net"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
)
func NullLogger() *log.Logger {
logger := log.New(os.Stdout, "", 0)
logger.SetOutput(ioutil.Discard)
return logger
}
func TestBroker(t *testing.T) {
Convey("Context", t, func() {
ctx := NewBrokerContext(NullLogger())
Convey("Adds Snowflake", func() {
So(ctx.snowflakes.Len(), ShouldEqual, 0)
So(len(ctx.idToSnowflake), ShouldEqual, 0)
ctx.AddSnowflake("foo")
So(ctx.snowflakes.Len(), ShouldEqual, 1)
So(len(ctx.idToSnowflake), ShouldEqual, 1)
})
Convey("Broker goroutine matches clients with proxies", func() {
p := new(ProxyPoll)
p.id = "test"
p.offerChannel = make(chan []byte)
go func(ctx *BrokerContext) {
ctx.proxyPolls <- p
close(ctx.proxyPolls)
}(ctx)
ctx.Broker()
So(ctx.snowflakes.Len(), ShouldEqual, 1)
snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
snowflake.offerChannel <- []byte("test offer")
offer := <-p.offerChannel
So(ctx.idToSnowflake["test"], ShouldNotBeNil)
So(offer, ShouldResemble, []byte("test offer"))
So(ctx.snowflakes.Len(), ShouldEqual, 0)
})
Convey("Request an offer from the Snowflake Heap", func() {
done := make(chan []byte)
go func() {
offer := ctx.RequestOffer("test")
done <- offer
}()
request := <-ctx.proxyPolls
request.offerChannel <- []byte("test offer")
offer := <-done
So(offer, ShouldResemble, []byte("test offer"))
})
Convey("Responds to client offers...", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
Convey("with 503 when no snowflakes are available.", func() {
clientOffers(ctx, w, r)
So(w.Code, ShouldEqual, http.StatusServiceUnavailable)
So(w.Body.String(), ShouldEqual, "")
})
Convey("with a proxy answer if available.", func() {
done := make(chan bool)
// Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake")
go func() {
clientOffers(ctx, w, r)
done <- true
}()
offer := <-snowflake.offerChannel
So(offer, ShouldResemble, []byte("test"))
snowflake.answerChannel <- []byte("fake answer")
<-done
So(w.Body.String(), ShouldEqual, "fake answer")
So(w.Code, ShouldEqual, http.StatusOK)
})
Convey("Times out when no proxy responds.", func() {
if testing.Short() {
return
}
done := make(chan bool)
snowflake := ctx.AddSnowflake("fake")
go func() {
clientOffers(ctx, w, r)
// Takes a few seconds here...
done <- true
}()
offer := <-snowflake.offerChannel
So(offer, ShouldResemble, []byte("test"))
<-done
So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
})
})
Convey("Responds to proxy polls...", func() {
done := make(chan bool)
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
r.Header.Set("X-Session-ID", "test")
So(err, ShouldBeNil)
Convey("with a client offer if available.", func() {
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
done <- true
}(ctx)
// Pass a fake client offer to this proxy
p := <-ctx.proxyPolls
So(p.id, ShouldEqual, "test")
p.offerChannel <- []byte("fake offer")
<-done
So(w.Code, ShouldEqual, http.StatusOK)
So(w.Body.String(), ShouldEqual, "fake offer")
})
Convey("times out when no client offer is available.", func() {
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
done <- true
}(ctx)
p := <-ctx.proxyPolls
So(p.id, ShouldEqual, "test")
// nil means timeout
p.offerChannel <- nil
<-done
So(w.Body.String(), ShouldEqual, "")
So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
})
})
Convey("Responds to proxy answers...", func() {
s := ctx.AddSnowflake("test")
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("fake answer"))
Convey("by passing to the client if valid.", func() {
r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
So(err, ShouldBeNil)
r.Header.Set("X-Session-ID", "test")
go func(ctx *BrokerContext) {
proxyAnswers(ctx, w, r)
}(ctx)
answer := <-s.answerChannel
So(w.Code, ShouldEqual, http.StatusOK)
So(answer, ShouldResemble, []byte("fake answer"))
})
Convey("with error if the proxy is not recognized", func() {
r, err := http.NewRequest("POST", "snowflake.broker/answer", nil)
So(err, ShouldBeNil)
r.Header.Set("X-Session-ID", "invalid")
proxyAnswers(ctx, w, r)
So(w.Code, ShouldEqual, http.StatusGone)
})
Convey("with error if the proxy gives invalid answer", func() {
data := bytes.NewReader(nil)
r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
r.Header.Set("X-Session-ID", "test")
So(err, ShouldBeNil)
proxyAnswers(ctx, w, r)
So(w.Code, ShouldEqual, http.StatusBadRequest)
})
Convey("with error if the proxy writes too much data", func() {
data := bytes.NewReader(make([]byte, 100001))
r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
r.Header.Set("X-Session-ID", "test")
So(err, ShouldBeNil)
proxyAnswers(ctx, w, r)
So(w.Code, ShouldEqual, http.StatusBadRequest)
})
})
})
Convey("End-To-End", t, func() {
done := make(chan bool)
polled := make(chan bool)
ctx := NewBrokerContext(NullLogger())
// Proxy polls with its ID first...
dataP := bytes.NewReader([]byte("test"))
wP := httptest.NewRecorder()
rP, err := http.NewRequest("POST", "snowflake.broker/proxy", dataP)
So(err, ShouldBeNil)
rP.Header.Set("X-Session-ID", "test")
go func() {
proxyPolls(ctx, wP, rP)
polled <- true
}()
// Manually do the Broker goroutine action here for full control.
p := <-ctx.proxyPolls
So(p.id, ShouldEqual, "test")
s := ctx.AddSnowflake(p.id)
go func() {
offer := <-s.offerChannel
p.offerChannel <- offer
}()
So(ctx.idToSnowflake["test"], ShouldNotBeNil)
// Client request blocks until proxy answer arrives.
dataC := bytes.NewReader([]byte("fake offer"))
wC := httptest.NewRecorder()
rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC)
So(err, ShouldBeNil)
go func() {
clientOffers(ctx, wC, rC)
done <- true
}()
<-polled
So(wP.Code, ShouldEqual, http.StatusOK)
So(wP.Body.String(), ShouldResemble, "fake offer")
So(ctx.idToSnowflake["test"], ShouldNotBeNil)
// Follow up with the answer request afterwards
wA := httptest.NewRecorder()
dataA := bytes.NewReader([]byte("fake answer"))
rA, err := http.NewRequest("POST", "snowflake.broker/proxy", dataA)
So(err, ShouldBeNil)
rA.Header.Set("X-Session-ID", "test")
proxyAnswers(ctx, wA, rA)
So(wA.Code, ShouldEqual, http.StatusOK)
<-done
So(wC.Code, ShouldEqual, http.StatusOK)
So(wC.Body.String(), ShouldEqual, "fake answer")
})
}
func TestSnowflakeHeap(t *testing.T) {
Convey("SnowflakeHeap", t, func() {
h := new(SnowflakeHeap)
heap.Init(h)
So(h.Len(), ShouldEqual, 0)
s1 := new(Snowflake)
s2 := new(Snowflake)
s3 := new(Snowflake)
s4 := new(Snowflake)
s1.clients = 4
s2.clients = 5
s3.clients = 3
s4.clients = 1
heap.Push(h, s1)
So(h.Len(), ShouldEqual, 1)
heap.Push(h, s2)
So(h.Len(), ShouldEqual, 2)
heap.Push(h, s3)
So(h.Len(), ShouldEqual, 3)
heap.Push(h, s4)
So(h.Len(), ShouldEqual, 4)
heap.Remove(h, 0)
So(h.Len(), ShouldEqual, 3)
r := heap.Pop(h).(*Snowflake)
So(h.Len(), ShouldEqual, 2)
So(r.clients, ShouldEqual, 3)
So(r.index, ShouldEqual, -1)
r = heap.Pop(h).(*Snowflake)
So(h.Len(), ShouldEqual, 1)
So(r.clients, ShouldEqual, 4)
So(r.index, ShouldEqual, -1)
r = heap.Pop(h).(*Snowflake)
So(h.Len(), ShouldEqual, 0)
So(r.clients, ShouldEqual, 5)
So(r.index, ShouldEqual, -1)
})
}
func TestGeoip(t *testing.T) {
Convey("Geoip", t, func() {
tv4 := new(GeoIPv4Table)
err := GeoIPLoadFile(tv4, "test_geoip")
So(err, ShouldEqual, nil)
tv6 := new(GeoIPv6Table)
err = GeoIPLoadFile(tv6, "test_geoip6")
So(err, ShouldEqual, nil)
Convey("IPv4 Country Mapping Tests", func() {
for _, test := range []struct {
addr, cc string
ok bool
}{
{
"129.97.208.23", //uwaterloo
"CA",
true,
},
{
"127.0.0.1",
"",
false,
},
{
"255.255.255.255",
"",
false,
},
{
"0.0.0.0",
"",
false,
},
{
"223.252.127.255", //test high end of range
"JP",
true,
},
{
"223.252.127.255", //test low end of range
"JP",
true,
},
} {
country, ok := GetCountryByAddr(tv4, net.ParseIP(test.addr))
So(country, ShouldEqual, test.cc)
So(ok, ShouldResemble, test.ok)
}
})
Convey("IPv6 Country Mapping Tests", func() {
for _, test := range []struct {
addr, cc string
ok bool
}{
{
"2620:101:f000:0:250:56ff:fe80:168e", //uwaterloo
"CA",
true,
},
{
"fd00:0:0:0:0:0:0:1",
"",
false,
},
{
"0:0:0:0:0:0:0:0",
"",
false,
},
{
"ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
"",
false,
},
{
"2a07:2e47:ffff:ffff:ffff:ffff:ffff:ffff", //test high end of range
"FR",
true,
},
{
"2a07:2e40::", //test low end of range
"FR",
true,
},
} {
country, ok := GetCountryByAddr(tv6, net.ParseIP(test.addr))
So(country, ShouldEqual, test.cc)
So(ok, ShouldResemble, test.ok)
}
})
// Make sure things behave properly if geoip file fails to load
ctx := NewBrokerContext(NullLogger())
if err := ctx.metrics.LoadGeoipDatabases("invalid_filename", "invalid_filename6"); err != nil {
log.Printf("loading geo ip databases returned error: %v", err)
}
ctx.metrics.UpdateCountryStats("127.0.0.1")
So(ctx.metrics.tablev4, ShouldEqual, nil)
})
}
func TestMetrics(t *testing.T) {
Convey("Test metrics...", t, func() {
done := make(chan bool)
buf := new(bytes.Buffer)
ctx := NewBrokerContext(log.New(buf, "", 0))
err := ctx.metrics.LoadGeoipDatabases("test_geoip", "test_geoip6")
So(err, ShouldEqual, nil)
//Test addition of proxy polls
Convey("for proxy polls", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
r.Header.Set("X-Session-ID", "test")
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
done <- true
}(ctx)
p := <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done
ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips CA=1\nsnowflake-ips-total 1\nsnowflake-idle-count 8\nclient-denied-count 0\nclient-snowflake-match-count 0\n")
})
//Test addition of client failures
Convey("for no proxies available", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 8\nclient-snowflake-match-count 0\n")
// Test reset
buf.Reset()
ctx.metrics.zeroMetrics()
ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 0\nclient-snowflake-match-count 0\n")
})
//Test addition of client matches
Convey("for client-proxy match", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
// Prepare a fake proxy to respond with.
snowflake := ctx.AddSnowflake("fake")
go func() {
clientOffers(ctx, w, r)
done <- true
}()
offer := <-snowflake.offerChannel
So(offer, ShouldResemble, []byte("test"))
snowflake.answerChannel <- []byte("fake answer")
<-done
ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 0\nclient-snowflake-match-count 8\n")
})
//Test rounding boundary
Convey("binning boundary", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/client", data)
So(err, ShouldBeNil)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
clientOffers(ctx, w, r)
ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 8\nclient-snowflake-match-count 0\n")
clientOffers(ctx, w, r)
buf.Reset()
ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips \nsnowflake-ips-total 0\nsnowflake-idle-count 0\nclient-denied-count 16\nclient-snowflake-match-count 0\n")
})
//Test unique ip
Convey("proxy counts by unique ip", func() {
w := httptest.NewRecorder()
data := bytes.NewReader([]byte("test"))
r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
r.Header.Set("X-Session-ID", "test")
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
So(err, ShouldBeNil)
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
done <- true
}(ctx)
p := <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done
data = bytes.NewReader([]byte("test"))
r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
if err != nil {
log.Printf("unable to get NewRequest with error: %v", err)
}
r.Header.Set("X-Session-ID", "test")
r.RemoteAddr = "129.97.208.23:8888" //CA geoip
go func(ctx *BrokerContext) {
proxyPolls(ctx, w, r)
done <- true
}(ctx)
p = <-ctx.proxyPolls //manually unblock poll
p.offerChannel <- nil
<-done
ctx.metrics.printMetrics()
So(buf.String(), ShouldResemble, "snowflake-stats-end "+time.Now().UTC().Format("2006-01-02 15:04:05")+" (86400 s)\nsnowflake-ips CA=1\nsnowflake-ips-total 1\nsnowflake-idle-count 8\nclient-denied-count 0\nclient-snowflake-match-count 0\n")
})
})
}
/*
Keeping track of pending available snowflake proxies.
*/
package main
/*
The Snowflake struct contains a single interaction
over the offer and answer channels.
*/
type Snowflake struct {
id string
offerChannel chan []byte
answerChannel chan []byte
clients int
index int
}
// Implements heap.Interface, and holds Snowflakes.
type SnowflakeHeap []*Snowflake
func (sh SnowflakeHeap) Len() int { return len(sh) }
func (sh SnowflakeHeap) Less(i, j int) bool {
// Snowflakes serving less clients should sort earlier.
return sh[i].clients < sh[j].clients
}
func (sh SnowflakeHeap) Swap(i, j int) {
sh[i], sh[j] = sh[j], sh[i]
sh[i].index = i
sh[j].index = j
}
func (sh *SnowflakeHeap) Push(s interface{}) {
n := len(*sh)
snowflake := s.(*Snowflake)
snowflake.index = n
*sh = append(*sh, snowflake)
}
// Only valid when Len() > 0.
func (sh *SnowflakeHeap) Pop() interface{} {
flakes := *sh
n := len(flakes)
snowflake := flakes[n-1]
snowflake.index = -1
*sh = flakes[0 : n-1]
return snowflake
}
This diff is collapsed.