Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • tpo/anti-censorship/pluggable-transports/snowflake
  • cohosh/snowflake
  • eighthave/snowflake
  • phw/snowflake
  • dcf/snowflake
  • meskio/snowflake
  • sbs/snowflake
  • idk/snowflake
  • CyberTailor/snowflake
  • Flo418/snowflake
  • shelikhoo/snowflake
  • WofWca/snowflake
  • pjsier/snowflake
  • eta/snowflake
  • dangowrt/snowflake
  • luciole/snowflake
  • KokaKiwi/snowflake
  • trinity-1686a/snowflake
  • sambhavsaxena/snowflake
  • nomandhoni/snowflake
  • pranitpatil3112/snowflake
  • Vort/snowflake
  • mpu/snowflake
  • Gedsh/snowflake
  • mingyech/snowflake
  • just1602/snowflake
  • theodorsm/snowflake
  • obble/snowflake
  • tgragnato/snowflake
  • anarcat/snowflake
  • neel/snowflake
  • akiel/snowflake
  • onyinyang/snowflake
  • opara/snowflake
34 results
Show changes
Commits on Source (575)
Showing with 2017 additions and 408 deletions
stages:
- test
- deploy
- container-build
- container-mirror
variables:
DEBIAN_FRONTEND: noninteractive
DEBIAN_OLD_STABLE: buster
DEBIAN_STABLE: bullseye
REPRODUCIBLE_FLAGS: -trimpath -ldflags=-buildid=
# set up apt for automated use
.apt-template: &apt-template
- export LC_ALL=C.UTF-8
- export DEBIAN_FRONTEND=noninteractive
- echo Etc/UTC > /etc/timezone
- ln -fs /usr/share/zoneinfo/Etc/UTC /etc/localtime
- echo 'quiet "1";'
'APT::Install-Recommends "0";'
'APT::Install-Suggests "0";'
......@@ -48,7 +55,6 @@ variables:
golang-golang-x-sys-dev
golang-golang-x-text-dev
golang-golang-x-xerrors-dev
lbzip2
# use Go installed as part of the official, Debian-based Docker images
.golang-docker-debian-template: &golang-docker-debian-template
......@@ -57,9 +63,9 @@ variables:
- apt-get -qy install --no-install-recommends
ca-certificates
git
lbzip2
.go-test: &go-test
- gofmt -d .
- test -z "$(go fmt ./...)"
- go vet ./...
- go test -v -race ./...
......@@ -86,12 +92,10 @@ variables:
# -- jobs ------------------------------------------------------------
android:
image: debian:bullseye-backports
image: containers.torproject.org/tpo/anti-censorship/duplicatedcontainerimages:golang-1.23-$DEBIAN_STABLE
variables:
ANDROID_HOME: /usr/lib/android-sdk
GOPATH: "/go"
LANG: C.UTF-8
PATH: "/go/bin:/usr/lib/go-1.16/bin:/usr/bin:/bin"
cache:
paths:
- .gradle/wrapper
......@@ -109,7 +113,7 @@ android:
gnupg
unzip
wget
- apt-get install -t bullseye-backports golang-1.16
ca-certificates
- ndk=android-ndk-r21e-linux-x86_64.zip
- wget --continue --no-verbose https://dl.google.com/android/repository/$ndk
......@@ -120,7 +124,6 @@ android:
- mv android-ndk-* $ANDROID_HOME/ndk-bundle/
- chmod -R a+rX $ANDROID_HOME
script:
- *go-test
- export GRADLE_USER_HOME=$CI_PROJECT_DIR/.gradle
......@@ -135,26 +138,19 @@ android:
- cd $CI_PROJECT_DIR/client
# gomobile builds a shared library not a CLI executable
- sed -i 's,^package main$,package snowflakeclient,' snowflake.go
- sed -i 's,^package main$,package snowflakeclient,' *.go
- go get golang.org/x/mobile/bind
- gomobile bind -v -target=android $REPRODUCIBLE_FLAGS .
go-1.15:
image: golang:1.15-stretch
go-1.21:
image: containers.torproject.org/tpo/anti-censorship/duplicatedcontainerimages:golang-1.21-$DEBIAN_STABLE
<<: *golang-docker-debian-template
<<: *test-template
script:
- *go-test
go-1.16:
image: golang:1.16-stretch
<<: *golang-docker-debian-template
<<: *test-template
script:
- *go-test
go-1.17:
image: golang:1.17-stretch
go-1.23:
image: containers.torproject.org/tpo/anti-censorship/duplicatedcontainerimages:golang-1.23-$DEBIAN_STABLE
<<: *golang-docker-debian-template
<<: *test-template
script:
......@@ -166,3 +162,239 @@ debian-testing:
<<: *test-template
script:
- *go-test
shadow-integration:
image: containers.torproject.org/tpo/anti-censorship/duplicatedcontainerimages:golang-1.21-$DEBIAN_STABLE
variables:
SHADOW_VERSION: "193924aae0dab30ffda0abe29467f552949849fa"
TGEN_VERSION: "v1.1.2"
cache:
key: sf-integration-$SHADOW_VERSION-$TGEN_VERSION
paths:
- /opt/
artifacts:
paths:
- shadow.data.tar.gz
when: on_failure
tags:
- amd64
- tpa
script:
- apt-get update
- apt-get install -y git tor
- mkdir -p ~/.local/bin
- mkdir -p ~/.local/src
- export PATH=$PATH:$CI_PROJECT_DIR/opt/bin/
# Install shadow and tgen
- pushd ~/.local/src
- |
if [ ! -f opt/shadow/bin/shadow ]
then
echo "The required version of shadow was not cached, building from source"
git clone --shallow-since=2021-08-01 https://github.com/shadow/shadow.git
pushd shadow/
git checkout $SHADOW_VERSION
CONTAINER=debian:stable-slim ci/container_scripts/install_deps.sh
CC=gcc CONTAINER=debian:stable-slim ci/container_scripts/install_extra_deps.sh
export PATH="$HOME/.cargo/bin:${PATH}"
./setup build --jobs $(nproc) --prefix $CI_PROJECT_DIR/opt/
./setup install
popd
fi
- |
if [ ! -f opt/shadow/bin/tgen ]
then
echo "The required version of tgen was not cached, building from source"
git clone --branch $TGEN_VERSION --depth 1 https://github.com/shadow/tgen.git
pushd tgen/
apt-get install -y cmake libglib2.0-dev libigraph-dev
mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=$CI_PROJECT_DIR/opt/
make
make install
popd
fi
install $CI_PROJECT_DIR/opt/bin/tgen ~/.local/bin/tgen
- popd
# Apply snowflake patch(es)
- |
git clone --depth 1 https://github.com/cohosh/shadow-snowflake-minimal
git am -3 shadow-snowflake-minimal/*.patch
# Install snowflake binaries to .local folder
- |
for app in "proxy" "client" "server" "broker" "probetest"; do
pushd $app
go build
install $app ~/.local/bin/snowflake-$app
popd
done
# Install stun server
- GOBIN=~/.local/bin go install github.com/gortc/stund@latest
# Run a minimal snowflake shadow experiment
- pushd shadow-snowflake-minimal/
- shadow --log-level=debug --model-unblocked-syscall-latency=true snowflake-minimal.yaml > shadow.log
# Check to make sure streams succeeded
- |
if [ $(grep -c "stream-success" shadow.data/hosts/snowflakeclient/tgen.*.stdout) = 10 ]
then
echo "All streams in shadow completed successfully"
else
echo "Shadow simulation failed"
exit 1
fi
after_script:
- tar -czvf $CI_PROJECT_DIR/shadow.data.tar.gz shadow-snowflake-minimal/shadow.data/ shadow-snowflake-minimal/shadow.log
generate_tarball:
stage: deploy
image: golang:1.21-$DEBIAN_STABLE
rules:
- if: $CI_COMMIT_TAG
script:
- go mod vendor
- tar czf ${CI_PROJECT_NAME}-${CI_COMMIT_TAG}.tar.gz --transform "s,^,${CI_PROJECT_NAME}-${CI_COMMIT_TAG}/," *
after_script:
- echo TAR_JOB_ID=$CI_JOB_ID >> generate_tarball.env
artifacts:
paths:
- ${CI_PROJECT_NAME}-${CI_COMMIT_TAG}.tar.gz
reports:
dotenv: generate_tarball.env
release-job:
stage: deploy
image: registry.gitlab.com/gitlab-org/release-cli:latest
rules:
- if: $CI_COMMIT_TAG
needs:
- job: generate_tarball
artifacts: true
script:
- echo "running release_job"
release:
name: 'Release $CI_COMMIT_TAG'
description: 'Created using the release-cli'
tag_name: '$CI_COMMIT_TAG'
ref: '$CI_COMMIT_TAG'
assets:
links:
- name: '${CI_PROJECT_NAME}-${CI_COMMIT_TAG}.tar.gz'
url: '${CI_PROJECT_URL}/-/jobs/${TAR_JOB_ID}/artifacts/file/${CI_PROJECT_NAME}-${CI_COMMIT_TAG}.tar.gz'
# Build the container only if the commit is to main, or it is a tag.
# If the commit is to main, then the docker image tag should be set to `nightly`.
# If it is a tag, then the docker image tag should be set to the tag name.
build-container:
variables:
TAG: $CI_COMMIT_TAG # Will not be set on a non-tag build, will be set later
stage: container-build
parallel:
matrix:
- ARCH: amd64
- ARCH: arm64
- ARCH: s390x
tags:
- $ARCH
image:
name: gcr.io/kaniko-project/executor:debug
entrypoint: [""]
script:
- if [ $CI_COMMIT_REF_NAME == "main" ]; then export TAG='nightly'; fi
- >-
/kaniko/executor
--context "${CI_PROJECT_DIR}"
--dockerfile "${CI_PROJECT_DIR}/Dockerfile"
--destination "${CI_REGISTRY_IMAGE}:${TAG}_${ARCH}"
rules:
- if: $CI_COMMIT_REF_NAME == "main"
- if: $CI_COMMIT_TAG
merge-manifests:
variables:
TAG: $CI_COMMIT_TAG
stage: container-build
needs:
- job: build-container
artifacts: false
image:
name: containers.torproject.org/tpo/anti-censorship/duplicatedcontainerimages:mplatform-manifest-tool-alpine
entrypoint: [""]
script:
- if [ $CI_COMMIT_REF_NAME == "main" ]; then export TAG='nightly'; fi
- >-
manifest-tool
--username="${CI_REGISTRY_USER}"
--password="${CI_REGISTRY_PASSWORD}"
push from-args
--platforms linux/amd64,linux/arm64,linux/s390x
--template "${CI_REGISTRY_IMAGE}:${TAG}_ARCH"
--target "${CI_REGISTRY_IMAGE}:${TAG}"
rules:
- if: $CI_COMMIT_REF_NAME == "main"
when: always
- if: $CI_COMMIT_TAG
when: always
# If this is a tag, then we want to additionally tag the image as `latest`
tag-container-release:
stage: container-build
needs:
- job: merge-manifests
artifacts: false
image:
name: gcr.io/go-containerregistry/crane:debug
entrypoint: [""]
allow_failure: false
variables:
CI_REGISTRY: $CI_REGISTRY
IMAGE_TAG: $CI_REGISTRY_IMAGE:$CI_COMMIT_TAG
RELEASE_TAG: $CI_REGISTRY_IMAGE:latest
script:
- echo "Tagging docker image with stable tag with crane"
- echo -n "$CI_JOB_TOKEN" | crane auth login $CI_REGISTRY -u gitlab-ci-token --password-stdin
- crane cp $IMAGE_TAG $RELEASE_TAG
rules:
- if: $CI_COMMIT_TAG
when: always
clean-image-tags:
stage: container-build
needs:
- job: merge-manifests
artifacts: false
image: containers.torproject.org/tpo/tpa/base-images/debian:bookworm
before_script:
- *apt-template
- apt-get install -y jq curl
script:
- "REGISTRY_ID=$(curl --silent --request GET --header \"JOB-TOKEN: ${CI_JOB_TOKEN}\" \"https://gitlab.torproject.org/api/v4/projects/${CI_PROJECT_ID}/registry/repositories\" | jq '.[].id')"
- "curl --request DELETE --data \"name_regex_delete=(latest|${CI_COMMIT_TAG})_.*\" --header \"JOB-TOKEN: ${CI_JOB_TOKEN}\" \"https://gitlab.torproject.org/api/v4/projects/${CI_PROJECT_ID}/registry/repositories/${REGISTRY_ID}/tags\""
rules:
- if: $CI_COMMIT_REF_NAME == "main"
when: always
- if: $CI_COMMIT_TAG
when: always
mirror-image-to-dockerhub:
stage: container-mirror
variables:
DOCKERHUB_MIRROR_REPOURL: $DOCKERHUB_MIRROR_REPOURL
DOCKERHUB_USERNAME: $DOCKERHUB_MIRROR_USERNAME
DOCKERHUB_PASSWORD: $DOCKERHUB_MIRROR_PASSWORD
image:
name: gcr.io/go-containerregistry/crane:debug
entrypoint: [""]
rules:
- if: $CI_COMMIT_REF_NAME == "main"
when: always
- if: $CI_COMMIT_TAG
when: always
script:
- echo "$DOCKERHUB_PASSWORD" | crane auth login docker.io -u $DOCKERHUB_MIRROR_USERNAME --password-stdin
- crane cp -a containers.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake $DOCKERHUB_MIRROR_REPOURL
Changes in version v2.11.0 - 2025-03-18
- Fix data race warnings for tokens_t
- Fix race condition in proxy connection count stats
- Make NATPolicy thread-safe
- Fix race conditions with error scope
- Fix race condition with proxy isClosing variable
- Issue 40454: Update broker metrics to count matches, denials, and timeouts
- Add proxy event and metrics for failed connections
- Issue 40377: Create CI artifact if shadow fails
- Issue 40438: Copy base client config for each SOCKS connection
- Fix minor data race in Snowflake broker metrics
- Issue 40363: Process and read broker SQS messages more quickly
- Issue 40419: delay before calling dc.Close() to improve NAT test on proxy
- Add country stats to proxy prometheus metrics
- Issue 40381: Avoid snowflake client dependency in proxy
- Issue 40446: Lower broker ClientTimeout to 5 seconds in line with CDN77 defaults
- Refactor out utls library into ptutil/utls
- Issue 40414: Use /etc/localtime for CI
- Issue 40440: Add LE self-signed ISRG Root X1 to cert pool
- Proxy refactor to simplify tokens.ret() on error
- Clarify ephemeral-ports-range proxy option
- Issue 40417: Fixes and updates to CI containers
- Issue 40178: Handle unknown client type better
- Issue 40304: Update STUN server list
- Issue 40210: Remove proxy log when offer is nil
- Issue 40413: Log EventOnCurrentNATTypeDetermined for proxy
- Use named return for some functions to improve readability
- Issue 40271: Use pion SetIPFilter rather than our own StripLocalAddress
- Issue 40413: Suppress logs of proxy events by default
- Add IsLinkLocalUnicast in IsLocal
- Fix comments
- Bump versions of dependencies
Changes in version v2.10.1 - 2024-11-11
- Issue 40406: Update version string
Changes in version v2.10.0 - 2024-11-07
- Issue 40402: Add proxy event for when client has connected
- Issue 40405: Prevent panic for duplicate SnowflakeConn.Close() calls
- Enable local time for proxy logging
- Have proxy summary statistics log average transfer rate
- Issue 40210: Remove duplicate poll interval loop in proxy
- Issue 40371: Prevent broker and proxy from rejecting clients without ICE candidates
- Issue 40392: Allow the proxy and probetest to set multiple STUN URLs
- Issue 40387: Fix error in probetest NAT check
- Fix proxy panic on invalid relayURL
- Set empty pattern if broker bridge-list is empty
- Improve documentation of Ephemeral[Min,Max]Port
- Fix resource leak and NAT check in probetest
- Fix memory leak from failed NAT check
- Improve NAT check logging
- Issue 40230: Send answer even if ICE gathering is not complete
- Improve broker error message on unknown bridge fingerprint
- Don't proxy private IP addresses
- Only accept ws:// and wss:// relay addresses
- Issue 40373: Add cli flag and SnowflakeProxy field to modify proxy poll interval
- Use %w not $v in fmt.Errorf
- Updates to documentation
- Adjust copy buffer size to improve proxy performance
- Improve descriptions of cli flags
- Cosmetic changes for code readability
- Issue 40367: Deduplicate prometheus metrics names
- Report the version of snowflake to the tor process
- Issue 40365: Indicate whether the repo was modified in the version string
- Simplify NAT checking logic
- Issue 40354: Use ptutil library for safelog and prometheus metrics
- Add cli flag to set a listen address for proxy prometheus metrics
- Issue 40345: Integrate docker image with release process
- Bump versions of dependencies
Changes in version v2.9.2 - 2024-03-18
- Issue 40288: Add integration testing with Shadow
- Issue 40345: Automatically build and push containers to our registry
- Issue 40339: Fix client ID reuse bug in SQS rendezvous
- Issue 40341: Modify SQS rendezvous arguments to use b64 encoded parameters
- Issue 40330: Add new metrics at the broker for per-country rendezvous stats
- Issue 40345: Update docker container tags
- Bump versions of dependencies
Changes in version v2.9.1 - 2024-02-27
- Issue 40335: Fix release job
- Change deprecated io/ioutil package to io package
- Bump versions of dependencies
Changes in version v2.9.0 - 2024-02-05
- Issue 40285: Add vcs revision to version string
- Issue 40294: Update recommended torrc options in client README
- Issue 40306: Scrub space-separated IP addresses
- Add proxy commandline option for probe server URL
- Use SetNet setting in probest to ignore net.Interfaces error
- Add probetest commandline option for STUN URL
- Issue 26151: Implement SQS rendezvous in client and broker
- Add broker metrics to track rendezvous method
- Cosmetic code quality fixes
- Bump versions of dependencies
Changes in version v2.8.1 - 2023-12-21
- Issue 40276: Reduce allocations in encapsulation.ReadData
- Issue 40310: Remove excessive logging for closed proxy connections
- Issue 40278: Add network fix for old version of android to proxy
- Bump versions of dependencies
Changes in version v2.8.0 - 2023-11-20
- Issue 40069: Add outbound proxy support
- Issue 40301: Fix for a bug in domain fronting configurations
- Issue 40302: Remove throughput summary from proxy logger
- Issue 40302: Change proxy stats logging to only log stats for traffic that occurred in the summary interval
- Update renovate bot configuration to use Go 1.21
- Bump versions of dependencies
Changes in version v2.7.0 - 2023-10-16
7142fa3 fix(proxy): Correctly close connection pipe when dealing with error
6393af6 Remove proxy churn measurements from broker.
a615e8b fix(proxy): remove _potential_ deadlock
d434549 Maintain backward compatability with old clients
9fdfb3d Randomly select front domain from comma-separated list
5cdf52c Update dependencies
1559963 chore(deps): update module github.com/xtaci/kcp-go/v5 to v5.6.3
60e66be Remove Golang 1.20 from CI Testing
1d069ca Update CI targets to test android from golang 1.21
3a050c6 Use ShouldBeNil to check for nil values
e45e8e5 chore(deps): update module github.com/smartystreets/goconvey to v1.8.1
f47ca18 chore(deps): update module gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/goptlib to v1.5.0
106da49 chore(deps): update module github.com/pion/webrtc/v3 to v3.2.20
2844ac6 Update CI targets to include only Go 1.20 and 1.21
f4e1ab9 chore(deps): update module golang.org/x/net to v0.15.0
caaff70 Update module golang.org/x/sys to v0.12.0
Changes in version v2.6.1 - 2023-09-11
- a3bfc28 Update module golang.org/x/crypto to v0.12.0
- e37e15a Update golang Docker tag to v1.21
- b632c7d Workaround for shadow in lieu of AF_NETLINK support
- 0cb2975 Update module golang.org/x/net to v0.13.0 [SECURITY]
- f73fe6e Keep the 'v' from the tag on the released .tar.gz
- 8104732 Change DefaultRelayURL back to wss://snowflake.torproject.net/.
- d932cb2 feat: add option to expose the stats by using metrics
- af73ab7 Add renovate config
- aaeab3f Update dependencies
- 58c3121 Close temporary UDPSession in TestQueuePacketConnWriteToKCP.
- 80980a3 Fix a comment left over from turbotunnel-quic.
- 08d1c6d Bump minimum required version of go
Changes in version v2.6.0 - 2023-06-19
- Issue 40243: Implement datachannel flow control at proxy
- Issue 40087: Append Let's Encrypt ISRG Root X1 to cert pool
- Issue 40198: Use IP_BIND_ADDRESS_NO_PORT when dialing the ORPort on linux
- Move from gitweb to gitlab
- Add warning log at broker when proxy does not connect with client
- Fix unit tests after SDP validation
- Soften non-critical log from error to warning
- Issue 40231: Validate SDP offers and answers
- Add scanner error check to ClusterCounter.Count
- Fix server benchmark tests
- Issue 40260: Use a sync.Pool to reuse QueuePacketConn buffers
- Issue 40043: Restore ListenAndServe error in server
- Update pion webrtc library versions
- Issue 40108: Add outbound address config option to proxy
- Issue 40260: Fix a data race in the Snowflake server
- Issue 40216: Add utls-imitate, utls-nosni documentation to the README
- Fix up/down traffic stats in standalone proxy
- Issue 40226: Filter out ICE servers that are not STUN
- Issue 40226: Update README to reflect the type of ICE servers we support
- Issue 40226: Parse ICE servers using the pion/ice library function
- Bring client torrc up to date with Tor Browser
Changes in version v2.5.1 - 2023-01-18
- Issue 40249: Fix issue with Skip Hello Verify patch
Changes in version v2.5.0 - 2023-01-18
- Issue 40249: Apply Skip Hello Verify Migration
Changes in version v2.4.3 - 2023-01-16
- Fix version number in version.go
Changes in version v2.4.2 - 2023-01-13
- Issue 40208: Enhance help info for capacity flag
- Issue 40232: Update README and fix help output
- Issue 40173: Increase clientIDAddrMapCapacity
- Issue 40177: Manually unlock mutex in ClientMap.SendQueue
- Issue 40177: Have SnowflakeClientConn implement io.WriterTo
- Issue 40179: Reduce turbotunnel queueSize from 2048 to 512
- Issue 40187/40199: Take ownership of buffer in QueuePacketConn QueueIncoming/WriteTo
- Add more tests for URL encoded IPs (safelog)
- Fix server flag name
- Issue 40200: Use multiple parallel KCP state machines in the server
- Add a num-turbotunnel server transport option
- Issue: 40241: Switch default proxy STUN server to stun.l.google.com
Changes in version v2.4.1 - 2022-12-01
- Issue 40224: Bug fix in utls roundtripper
Changes in version v2.4.0 - 2022-11-29
- Fix proxy command line help output
- Issue 40123: Reduce multicast DNS candidates
- Add ICE ephemeral ports range setting
- Reformat using Go 1.19
- Update CI tests to include latest and minimum Go versions
- Issue 40184: Use fixed unit for bandwidth logging
- Update gorilla/websocket to v1.5.0
- Issue 40175: Server performance improvements
- Issue 40183: Change snowflake proxy log verbosity
- Issue 40117: Display proxy NAT type in logs
- Issue 40198: Add a `orport-srcaddr` server transport option
- Add gofmt output to CI test
- Issue 40185: Change bandwidth type from int to int64 to prevent overflow
- Add version output support to snowflake
- Issue 40229: Change regexes for ipv6 addresses to catch url-encoded addresses
- Issue 40220: Close stale connections in standalone proxy
Changes in version v2.3.0 - 2022-06-23
- Issue 40146: Avoid performing two NAT probe tests at startup
- Issue 40134: Log messages from client NAT check failures are confusing
- Issue 34075: Implement metrics to measure snowflake churn
- Issue 28651: Prepare all pieces of the snowflake pipeline for a second snowflake bridge
- Issue 40129: Distributed Snowflake Server Support
Changes in version v2.2.0 - 2022-05-25
- Issue 40099: Initialize SnowflakeListener.closed
- Add connection failure events for proxy timeouts
- Issue 40103: Fix proxy logging verb tense
- Fix up and downstream metrics output for proxy
- Issue 40054: uTLS for broker negotiation
- Forward bridge fingerprint from client to broker (WIP, Issue 28651)
- Issue 40104: Make it easier to configure proxy type
- Remove version from ClientPollRequest
- Issue 40124: Move tor-specific code out of library
- Issue 40115: Scrub pt event logs
- Issue 40127: Bump webrtc and dtls library versions
- Bump version of webrtc and dtls to fix dtls CVEs
- Issue 40141: Ensure library calls of events can be scrubbed
Changes in version v2.1.0 - 2022-02-08
- Issue 40098: Remove support for legacy one shot mode
- Issue 40079: Make connection summary at proxy privacy preserving
- Issue 40076: Add snowflake event API for notifications of connection events
- Issue 40084: Increase capacity of client address map at the server
- Issue 40060: Further clean up snowflake server logs
- Issue 40089: Validate proxy and client supplied strings at broker
- Issue 40014: Update version of DTLS library to include fingerprinting fixes
- Issue 40075: Support recurring NAT type check in standalone proxy
Changes in version v2.0.0 - 2021-11-04
- Turn the standalone snowflake proxy code into a library
......
FROM docker.io/library/golang:1.23-bookworm AS build
# Set some labels
# io.containers.autoupdate label will instruct podman to reach out to the corres
# corresponding registry to check if the image has been updated. If an image
# must be updated, Podman pulls it down and restarts the systemd unit executing
# the container. See podman-auto-update(1) for more details, or
# https://docs.podman.io/en/latest/markdown/podman-auto-update.1.html
LABEL io.containers.autoupdate=registry
LABEL org.opencontainers.image.authors="anti-censorship-team@lists.torproject.org"
RUN apt-get update && apt-get install -y tor-geoipdb
ADD . /app
WORKDIR /app/proxy
RUN go get
RUN CGO_ENABLED=0 go build -o proxy -ldflags '-extldflags "-static" -w -s' .
FROM scratch
COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=build /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=build /usr/share/tor/geoip* /usr/share/tor/
COPY --from=build /app/proxy/proxy /bin/proxy
ENTRYPOINT [ "/bin/proxy" ]
# Snowflake
[![Build Status](https://travis-ci.org/keroserene/snowflake.svg?branch=master)](https://travis-ci.org/keroserene/snowflake)
Pluggable Transport using WebRTC, inspired by Flashproxy.
Snowflake is a censorship-evasion pluggable transport using WebRTC, inspired by Flashproxy.
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
......@@ -55,8 +53,7 @@ There is a Docker-based test environment at https://github.com/cohosh/snowbox.
In the Tor use-case:
1. Volunteers visit websites which host the "snowflake" proxy. (just
like flashproxy)
1. Volunteers visit websites that host the 'snowflake' proxy, run a snowflake [web extension](https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake-webext), or use a standalone proxy.
2. Tor clients automatically find available browser proxies via the Broker
(the domain fronted signaling channel).
3. Tor client and browser proxy establish a WebRTC peer connection.
......@@ -97,3 +94,13 @@ runs in GitLab CI. It is also possible to run this setup in a Virtual Machine
using [vagrant](https://www.vagrantup.com/). Just run `vagrant up` and it will
create and provision the VM. `vagrant ssh` to get into the VM to use it as a
development environment.
##### uTLS Settings
Snowflake communicate with broker that serves as signaling server with TLS based domain fronting connection, which may be identified by its usage of Go language TLS stack.
uTLS is a software library designed to initiate the TLS Client Hello fingerprint of browsers or other popular software's TLS stack to evade censorship based on TLS client hello fingerprint with `-utls-imitate` . You can use `-version` to see a list of supported values.
Depending on client and server configuration, it may not always work as expected as not all extensions are correctly implemented.
You can also remove SNI (Server Name Indication) from client hello to evade censorship with `-utls-nosni`, not all servers supports this.
......@@ -4,7 +4,7 @@ require 'yaml'
srvpath = Pathname.new(File.dirname(__FILE__)).realpath
configfile = YAML.load_file(File.join(srvpath, "/.gitlab-ci.yml"))
remote_url = 'https://git.torproject.org/pluggable-transports/snowflake.git'
remote_url = 'https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake'
# set up essential environment variables
env = configfile['variables']
......
......@@ -5,8 +5,9 @@ import (
"net/http"
"strings"
"git.torproject.org/pluggable-transports/snowflake.git/v2/common/amp"
"git.torproject.org/pluggable-transports/snowflake.git/v2/common/messages"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/amp"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/util"
)
// ampClientOffers is the AMP-speaking endpoint for client poll messages,
......@@ -34,8 +35,9 @@ func ampClientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
encPollReq, err = amp.DecodePath(path)
if err == nil {
arg := messages.Arg{
Body: encPollReq,
RemoteAddr: "",
Body: encPollReq,
RemoteAddr: util.GetClientIp(r),
RendezvousMethod: messages.RendezvousAmpCache,
}
err = i.ClientOffers(arg, &response)
} else {
......
/* (*BridgeListHolderFileBased).LoadBridgeInfo loads a Snowflake Server bridge info description file,
its format is as follows:
This file should be in newline-delimited JSON format(https://jsonlines.org/).
For each line, the format of json data should be in the format of:
{"displayName":"default", "webSocketAddress":"wss://snowflake.torproject.net/", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80A72"}
displayName:string is the name of this bridge. This value is not currently used programmatically.
webSocketAddress:string is the WebSocket URL of this bridge.
This will be the address proxy used to connect to this snowflake server.
fingerprint:string is the identifier of the bridge.
This will be used by a client to identify the bridge it wishes to connect to.
The existence of ANY other fields is NOT permitted.
The file will be considered invalid if there is at least one invalid json record.
In this case, an error will be returned, and none of the records will be loaded.
*/
package main
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"io"
"sync"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint"
)
var ErrBridgeNotFound = errors.New("bridge with requested fingerprint is unknown to the broker")
func NewBridgeListHolder() BridgeListHolderFileBased {
return &bridgeListHolder{}
}
type bridgeListHolder struct {
bridgeInfo map[bridgefingerprint.Fingerprint]BridgeInfo
accessBridgeInfo sync.RWMutex
}
type BridgeListHolder interface {
GetBridgeInfo(bridgefingerprint.Fingerprint) (BridgeInfo, error)
}
type BridgeListHolderFileBased interface {
BridgeListHolder
LoadBridgeInfo(reader io.Reader) error
}
type BridgeInfo struct {
DisplayName string `json:"displayName"`
WebSocketAddress string `json:"webSocketAddress"`
Fingerprint string `json:"fingerprint"`
}
func (h *bridgeListHolder) GetBridgeInfo(fingerprint bridgefingerprint.Fingerprint) (BridgeInfo, error) {
h.accessBridgeInfo.RLock()
defer h.accessBridgeInfo.RUnlock()
if bridgeInfo, ok := h.bridgeInfo[fingerprint]; ok {
return bridgeInfo, nil
}
return BridgeInfo{}, ErrBridgeNotFound
}
func (h *bridgeListHolder) LoadBridgeInfo(reader io.Reader) error {
bridgeInfoMap := map[bridgefingerprint.Fingerprint]BridgeInfo{}
inputScanner := bufio.NewScanner(reader)
for inputScanner.Scan() {
inputLine := inputScanner.Bytes()
bridgeInfo := BridgeInfo{}
decoder := json.NewDecoder(bytes.NewReader(inputLine))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&bridgeInfo); err != nil {
return err
}
var bridgeFingerprint bridgefingerprint.Fingerprint
var err error
if bridgeFingerprint, err = bridgefingerprint.FingerprintFromHexString(bridgeInfo.Fingerprint); err != nil {
return err
}
bridgeInfoMap[bridgeFingerprint] = bridgeInfo
}
h.accessBridgeInfo.Lock()
defer h.accessBridgeInfo.Unlock()
h.bridgeInfo = bridgeInfoMap
return nil
}
package main
import (
"bytes"
"encoding/hex"
. "github.com/smartystreets/goconvey/convey"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint"
"testing"
)
const DefaultBridges = `{"displayName":"default", "webSocketAddress":"wss://snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80A72"}
`
const ImaginaryBridges = `{"displayName":"default", "webSocketAddress":"wss://snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80A72"}
{"displayName":"imaginary-1", "webSocketAddress":"wss://imaginary-1-snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80B00"}
{"displayName":"imaginary-2", "webSocketAddress":"wss://imaginary-2-snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80B01"}
{"displayName":"imaginary-3", "webSocketAddress":"wss://imaginary-3-snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80B02"}
{"displayName":"imaginary-4", "webSocketAddress":"wss://imaginary-4-snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80B03"}
{"displayName":"imaginary-5", "webSocketAddress":"wss://imaginary-5-snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80B04"}
{"displayName":"imaginary-6", "webSocketAddress":"wss://imaginary-6-snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80B05"}
{"displayName":"imaginary-7", "webSocketAddress":"wss://imaginary-7-snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80B06"}
{"displayName":"imaginary-8", "webSocketAddress":"wss://imaginary-8-snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80B07"}
{"displayName":"imaginary-9", "webSocketAddress":"wss://imaginary-9-snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80B08"}
{"displayName":"imaginary-10", "webSocketAddress":"wss://imaginary-10-snowflake.torproject.org", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80B09"}
`
func TestBridgeLoad(t *testing.T) {
Convey("load default list", t, func() {
bridgeList := NewBridgeListHolder()
So(bridgeList.LoadBridgeInfo(bytes.NewReader([]byte(DefaultBridges))), ShouldBeNil)
{
bridgeFingerprint := [20]byte{}
{
n, err := hex.Decode(bridgeFingerprint[:], []byte("2B280B23E1107BB62ABFC40DDCC8824814F80A72"))
So(n, ShouldEqual, 20)
So(err, ShouldBeNil)
}
Fingerprint, err := bridgefingerprint.FingerprintFromBytes(bridgeFingerprint[:])
So(err, ShouldBeNil)
bridgeInfo, err := bridgeList.GetBridgeInfo(Fingerprint)
So(err, ShouldBeNil)
So(bridgeInfo.DisplayName, ShouldEqual, "default")
So(bridgeInfo.WebSocketAddress, ShouldEqual, "wss://snowflake.torproject.org")
}
})
Convey("load imaginary list", t, func() {
bridgeList := NewBridgeListHolder()
So(bridgeList.LoadBridgeInfo(bytes.NewReader([]byte(ImaginaryBridges))), ShouldBeNil)
{
bridgeFingerprint := [20]byte{}
{
n, err := hex.Decode(bridgeFingerprint[:], []byte("2B280B23E1107BB62ABFC40DDCC8824814F80B07"))
So(n, ShouldEqual, 20)
So(err, ShouldBeNil)
}
Fingerprint, err := bridgefingerprint.FingerprintFromBytes(bridgeFingerprint[:])
So(err, ShouldBeNil)
bridgeInfo, err := bridgeList.GetBridgeInfo(Fingerprint)
So(err, ShouldBeNil)
So(bridgeInfo.DisplayName, ShouldEqual, "imaginary-8")
So(bridgeInfo.WebSocketAddress, ShouldEqual, "wss://imaginary-8-snowflake.torproject.org")
}
})
}
......@@ -6,7 +6,9 @@ SessionDescriptions in order to negotiate a WebRTC connection.
package main
import (
"bytes"
"container/heap"
"context"
"crypto/tls"
"flag"
"io"
......@@ -19,9 +21,14 @@ import (
"syscall"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/v2/common/safelog"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/ptutil/safelog"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/namematcher"
"golang.org/x/crypto/acme/autocert"
)
......@@ -36,9 +43,21 @@ type BrokerContext struct {
snowflakeLock sync.Mutex
proxyPolls chan *ProxyPoll
metrics *Metrics
bridgeList BridgeListHolderFileBased
allowedRelayPattern string
presumedPatternForLegacyClient string
}
func (ctx *BrokerContext) GetBridgeInfo(fingerprint bridgefingerprint.Fingerprint) (BridgeInfo, error) {
return ctx.bridgeList.GetBridgeInfo(fingerprint)
}
func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
func NewBrokerContext(
metricsLogger *log.Logger,
allowedRelayPattern,
presumedPatternForLegacyClient string,
) *BrokerContext {
snowflakes := new(SnowflakeHeap)
heap.Init(snowflakes)
rSnowflakes := new(SnowflakeHeap)
......@@ -53,12 +72,21 @@ func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
panic("Failed to create metrics")
}
bridgeListHolder := NewBridgeListHolder()
const DefaultBridges = `{"displayName":"default", "webSocketAddress":"wss://snowflake.torproject.net/", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80A72"}
`
bridgeListHolder.LoadBridgeInfo(bytes.NewReader([]byte(DefaultBridges)))
return &BrokerContext{
snowflakes: snowflakes,
restrictedSnowflakes: rSnowflakes,
idToSnowflake: make(map[string]*Snowflake),
proxyPolls: make(chan *ProxyPoll),
metrics: metrics,
snowflakes: snowflakes,
restrictedSnowflakes: rSnowflakes,
idToSnowflake: make(map[string]*Snowflake),
proxyPolls: make(chan *ProxyPoll),
metrics: metrics,
bridgeList: bridgeListHolder,
allowedRelayPattern: allowedRelayPattern,
presumedPatternForLegacyClient: presumedPatternForLegacyClient,
}
}
......@@ -134,15 +162,32 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType stri
heap.Push(ctx.restrictedSnowflakes, snowflake)
}
ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc()
ctx.snowflakeLock.Unlock()
ctx.idToSnowflake[id] = snowflake
ctx.snowflakeLock.Unlock()
return snowflake
}
// Client offer contains an SDP and the NAT type of the client
func (ctx *BrokerContext) InstallBridgeListProfile(reader io.Reader) error {
if err := ctx.bridgeList.LoadBridgeInfo(reader); err != nil {
return err
}
return nil
}
func (ctx *BrokerContext) CheckProxyRelayPattern(pattern string, nonSupported bool) bool {
if nonSupported {
pattern = ctx.presumedPatternForLegacyClient
}
proxyPattern := namematcher.NewNameMatcher(pattern)
brokerPattern := namematcher.NewNameMatcher(ctx.allowedRelayPattern)
return proxyPattern.IsSupersetOf(brokerPattern)
}
// Client offer contains an SDP, bridge fingerprint and the NAT type of the client
type ClientOffer struct {
natType string
sdp []byte
natType string
sdp []byte
fingerprint []byte
}
func main() {
......@@ -152,6 +197,8 @@ func main() {
var addr string
var geoipDatabase string
var geoip6Database string
var bridgeListFilePath, allowedRelayPattern, presumedPatternForLegacyClient string
var brokerSQSQueueName, brokerSQSQueueRegion string
var disableTLS bool
var certFilename, keyFilename string
var disableGeoip bool
......@@ -166,13 +213,17 @@ func main() {
flag.StringVar(&addr, "addr", ":443", "address to listen on")
flag.StringVar(&geoipDatabase, "geoipdb", "/usr/share/tor/geoip", "path to correctly formatted geoip database mapping IPv4 address ranges to country codes")
flag.StringVar(&geoip6Database, "geoip6db", "/usr/share/tor/geoip6", "path to correctly formatted geoip database mapping IPv6 address ranges to country codes")
flag.StringVar(&bridgeListFilePath, "bridge-list-path", "", "file path for bridgeListFile")
flag.StringVar(&allowedRelayPattern, "allowed-relay-pattern", "", "allowed pattern for relay host name. The broker will reject proxies whose AcceptedRelayPattern is more restrictive than this")
flag.StringVar(&presumedPatternForLegacyClient, "default-relay-pattern", "", "presumed pattern for legacy client")
flag.StringVar(&brokerSQSQueueName, "broker-sqs-name", "", "name of broker SQS queue to listen for incoming messages on")
flag.StringVar(&brokerSQSQueueRegion, "broker-sqs-region", "", "name of AWS region of broker SQS queue")
flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS")
flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection")
flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output")
flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed")
flag.Parse()
var err error
var metricsFile io.Writer
var logOutput io.Writer = os.Stderr
if unsafeLogging {
......@@ -185,6 +236,7 @@ func main() {
log.SetFlags(log.LstdFlags | log.LUTC)
if metricsFilename != "" {
var err error
metricsFile, err = os.OpenFile(metricsFilename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
......@@ -196,10 +248,21 @@ func main() {
metricsLogger := log.New(metricsFile, "", 0)
ctx := NewBrokerContext(metricsLogger)
ctx := NewBrokerContext(metricsLogger, allowedRelayPattern, presumedPatternForLegacyClient)
if bridgeListFilePath != "" {
bridgeListFile, err := os.Open(bridgeListFilePath)
if err != nil {
log.Fatal(err.Error())
}
err = ctx.InstallBridgeListProfile(bridgeListFile)
if err != nil {
log.Fatal(err.Error())
}
}
if !disableGeoip {
err = ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
err := ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
if err != nil {
log.Fatal(err.Error())
}
......@@ -224,6 +287,22 @@ func main() {
Addr: addr,
}
// Run SQS Handler to continuously poll and process messages from SQS
if brokerSQSQueueName != "" && brokerSQSQueueRegion != "" {
log.Printf("Loading SQSHandler using SQS Queue %s in region %s\n", brokerSQSQueueName, brokerSQSQueueRegion)
sqsHandlerContext := context.Background()
cfg, err := config.LoadDefaultConfig(sqsHandlerContext, config.WithRegion(brokerSQSQueueRegion))
if err != nil {
log.Fatal(err)
}
client := sqs.NewFromConfig(cfg)
sqsHandler, err := newSQSHandler(sqsHandlerContext, client, brokerSQSQueueName, brokerSQSQueueRegion, i)
if err != nil {
log.Fatal(err)
}
go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
......@@ -234,7 +313,7 @@ func main() {
for {
signal := <-sigChan
log.Printf("Received signal: %s. Reloading geoip databases.", signal)
if err = ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database); err != nil {
if err := ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database); err != nil {
log.Fatalf("reload of Geo IP databases on signal %s returned error: %v", signal, err)
}
}
......@@ -247,12 +326,13 @@ func main() {
// --disable-tls
// The outputs of this block of code are the disableTLS,
// needHTTP01Listener, certManager, and getCertificate variables.
var err error
if acmeHostnamesCommas != "" {
acmeHostnames := strings.Split(acmeHostnamesCommas, ",")
log.Printf("ACME hostnames: %q", acmeHostnames)
var cache autocert.Cache
if err = os.MkdirAll(acmeCertCacheDir, 0700); err != nil {
if err := os.MkdirAll(acmeCertCacheDir, 0700); err != nil {
log.Printf("Warning: Couldn't create cache directory %q (reason: %s) so we're *not* using our certificate cache.", acmeCertCacheDir, err)
} else {
cache = autocert.DirCache(acmeCertCacheDir)
......
package main
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"git.torproject.org/pluggable-transports/snowflake.git/v2/common/messages"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/util"
)
const (
......@@ -92,16 +94,16 @@ func debugHandler(i *IPC, w http.ResponseWriter, r *http.Request) {
For snowflake proxies to request a client from the Broker.
*/
func proxyPolls(i *IPC, w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if err != nil {
log.Println("Invalid data.")
log.Println("Invalid data.", err.Error())
w.WriteHeader(http.StatusBadRequest)
return
}
arg := messages.Arg{
Body: body,
RemoteAddr: r.RemoteAddr,
RemoteAddr: util.GetClientIp(r),
}
var response []byte
......@@ -130,7 +132,7 @@ snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client.
*/
func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if err != nil {
log.Printf("Error reading client request: %s", err.Error())
w.WriteHeader(http.StatusBadRequest)
......@@ -140,7 +142,7 @@ func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
// Handle the legacy version
//
// We support two client message formats. The legacy format is for backwards
// combatability and relies heavily on HTTP headers and status codes to convey
// compatability and relies heavily on HTTP headers and status codes to convey
// information.
isLegacy := false
if len(body) > 0 && body[0] == '{' {
......@@ -149,7 +151,7 @@ func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
Offer: string(body),
NAT: r.Header.Get("Snowflake-NAT-Type"),
}
body, err = req.EncodePollRequest()
body, err = req.EncodeClientPollRequest()
if err != nil {
log.Printf("Error shimming the legacy request: %s", err.Error())
w.WriteHeader(http.StatusInternalServerError)
......@@ -158,8 +160,9 @@ func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
}
arg := messages.Arg{
Body: body,
RemoteAddr: "",
Body: body,
RemoteAddr: util.GetClientIp(r),
RendezvousMethod: messages.RendezvousHttp,
}
var response []byte
......@@ -197,21 +200,28 @@ func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
}
/*
Expects snowflake proxes which have previously successfully received
Expects snowflake proxies which have previously successfully received
an offer from proxyHandler to respond with an answer in an HTTP POST,
which the broker will pass back to the original client.
*/
func proxyAnswers(i *IPC, w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
if err != nil {
log.Println("Invalid data.", err.Error())
w.WriteHeader(http.StatusBadRequest)
return
}
err = validateSDP(body)
if err != nil {
log.Println("Invalid data.")
log.Println("Error proxy SDP: ", err.Error())
w.WriteHeader(http.StatusBadRequest)
return
}
arg := messages.Arg{
Body: body,
RemoteAddr: "",
RemoteAddr: util.GetClientIp(r),
}
var response []byte
......@@ -233,3 +243,12 @@ func proxyAnswers(i *IPC, w http.ResponseWriter, r *http.Request) {
log.Printf("proxyAnswers unable to write answer response with error: %v", err)
}
}
func validateSDP(SDP []byte) error {
// TODO: more validation likely needed
if !bytes.Contains(SDP, []byte("a=candidate")) {
return fmt.Errorf("SDP contains no candidate")
}
return nil
}
package main
import (
"bytes"
"container/heap"
"encoding/hex"
"fmt"
"log"
"net"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/v2/common/messages"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/bridgefingerprint"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/constants"
"github.com/prometheus/client_golang/prometheus"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages"
)
const (
ClientTimeout = 10
ClientTimeout = constants.BrokerClientTimeout
ProxyTimeout = 10
NATUnknown = "unknown"
......@@ -21,29 +23,20 @@ const (
NATUnrestricted = "unrestricted"
)
type clientVersion int
const (
v1 clientVersion = iota
)
type IPC struct {
ctx *BrokerContext
}
func (i *IPC) Debug(_ interface{}, response *string) error {
var webexts, browsers, standalones, unknowns int
var unknowns int
var natRestricted, natUnrestricted, natUnknown int
proxyTypes := make(map[string]int)
i.ctx.snowflakeLock.Lock()
s := fmt.Sprintf("current snowflakes available: %d\n", len(i.ctx.idToSnowflake))
for _, snowflake := range i.ctx.idToSnowflake {
if snowflake.proxyType == "badge" {
browsers++
} else if snowflake.proxyType == "webext" {
webexts++
} else if snowflake.proxyType == "standalone" {
standalones++
if messages.KnownProxyTypes[snowflake.proxyType] {
proxyTypes[snowflake.proxyType]++
} else {
unknowns++
}
......@@ -60,10 +53,10 @@ func (i *IPC) Debug(_ interface{}, response *string) error {
}
i.ctx.snowflakeLock.Unlock()
s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
s += fmt.Sprintf("\n\twebext proxies: %d", webexts)
s += fmt.Sprintf("\n\tunknown proxies: %d", unknowns)
for pType, num := range proxyTypes {
s += fmt.Sprintf("\t%s proxies: %d\n", pType, num)
}
s += fmt.Sprintf("\tunknown proxies: %d", unknowns)
s += fmt.Sprintf("\nNAT Types available:")
s += fmt.Sprintf("\n\trestricted: %d", natRestricted)
......@@ -75,15 +68,42 @@ func (i *IPC) Debug(_ interface{}, response *string) error {
}
func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error {
sid, proxyType, natType, clients, err := messages.DecodePollRequest(arg.Body)
sid, proxyType, natType, clients, relayPattern, relayPatternSupported, err := messages.DecodeProxyPollRequestWithRelayPrefix(arg.Body)
if err != nil {
return messages.ErrBadRequest
}
if !relayPatternSupported {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.proxyPollWithoutRelayURLExtension++
i.ctx.metrics.promMetrics.ProxyPollWithoutRelayURLExtensionTotal.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc()
i.ctx.metrics.lock.Unlock()
} else {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.proxyPollWithRelayURLExtension++
i.ctx.metrics.promMetrics.ProxyPollWithRelayURLExtensionTotal.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc()
i.ctx.metrics.lock.Unlock()
}
if !i.ctx.CheckProxyRelayPattern(relayPattern, !relayPatternSupported) {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.proxyPollRejectedWithRelayURLExtension++
i.ctx.metrics.promMetrics.ProxyPollRejectedForRelayURLExtensionTotal.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc()
i.ctx.metrics.lock.Unlock()
log.Printf("bad request: rejected relay pattern from proxy = %v", messages.ErrBadRequest)
b, err := messages.EncodePollResponseWithRelayURL("", false, "", "", "incorrect relay pattern")
*response = b
if err != nil {
return messages.ErrInternal
}
return nil
}
// Log geoip stats
remoteIP, _, err := net.SplitHostPort(arg.RemoteAddr)
remoteIP := arg.RemoteAddr
if err != nil {
log.Println("Error processing proxy IP: ", err.Error())
log.Println("Warning: cannot process proxy IP: ", err.Error())
} else {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType)
......@@ -111,7 +131,17 @@ func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error {
}
i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "matched"}).Inc()
b, err = messages.EncodePollResponse(string(offer.sdp), true, offer.natType)
var relayURL string
bridgeFingerprint, err := bridgefingerprint.FingerprintFromBytes(offer.fingerprint)
if err != nil {
return messages.ErrBadRequest
}
if info, err := i.ctx.bridgeList.GetBridgeInfo(bridgeFingerprint); err != nil {
return err
} else {
relayURL = info.WebSocketAddress
}
b, err = messages.EncodePollResponseWithRelayURL(string(offer.sdp), true, offer.natType, relayURL, "")
if err != nil {
return messages.ErrInternal
}
......@@ -132,105 +162,67 @@ func sendClientResponse(resp *messages.ClientPollResponse, response *[]byte) err
}
func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
var version clientVersion
startTime := time.Now()
body := arg.Body
parts := bytes.SplitN(body, []byte("\n"), 2)
if len(parts) < 2 {
// no version number found
err := fmt.Errorf("unsupported message version")
req, err := messages.DecodeClientPollRequest(arg.Body)
if err != nil {
return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response)
}
body = parts[1]
if string(parts[0]) == "1.0" {
version = v1
} else {
err := fmt.Errorf("unsupported message version")
offer := &ClientOffer{
natType: req.NAT,
sdp: []byte(req.Offer),
}
fingerprint, err := hex.DecodeString(req.Fingerprint)
if err != nil {
return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response)
}
var offer *ClientOffer
switch version {
case v1:
req, err := messages.DecodeClientPollRequest(body)
if err != nil {
return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response)
}
offer = &ClientOffer{
natType: req.NAT,
sdp: []byte(req.Offer),
}
default:
panic("unknown version")
BridgeFingerprint, err := bridgefingerprint.FingerprintFromBytes(fingerprint)
if err != nil {
return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response)
}
// Only hand out known restricted snowflakes to unrestricted clients
var snowflakeHeap *SnowflakeHeap
if offer.natType == NATUnrestricted {
snowflakeHeap = i.ctx.restrictedSnowflakes
} else {
snowflakeHeap = i.ctx.snowflakes
if _, err := i.ctx.GetBridgeInfo(BridgeFingerprint); err != nil {
return sendClientResponse(
&messages.ClientPollResponse{Error: err.Error()},
response,
)
}
// Immediately fail if there are no snowflakes available.
i.ctx.snowflakeLock.Lock()
numSnowflakes := snowflakeHeap.Len()
i.ctx.snowflakeLock.Unlock()
if numSnowflakes <= 0 {
offer.fingerprint = BridgeFingerprint.ToBytes()
snowflake := i.matchSnowflake(offer.natType)
if snowflake != nil {
snowflake.offerChannel <- offer
} else {
i.ctx.metrics.lock.Lock()
i.ctx.metrics.clientDeniedCount++
i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied"}).Inc()
if offer.natType == NATUnrestricted {
i.ctx.metrics.clientUnrestrictedDeniedCount++
} else {
i.ctx.metrics.clientRestrictedDeniedCount++
}
i.ctx.metrics.UpdateRendezvousStats(arg.RemoteAddr, arg.RendezvousMethod, offer.natType, "denied")
i.ctx.metrics.lock.Unlock()
switch version {
case v1:
resp := &messages.ClientPollResponse{Error: messages.StrNoProxies}
return sendClientResponse(resp, response)
default:
panic("unknown version")
}
resp := &messages.ClientPollResponse{Error: messages.StrNoProxies}
return sendClientResponse(resp, response)
}
// Otherwise, find the most available snowflake proxy, and pass the offer to it.
// Delete must be deferred in order to correctly process answer request later.
i.ctx.snowflakeLock.Lock()
snowflake := heap.Pop(snowflakeHeap).(*Snowflake)
i.ctx.snowflakeLock.Unlock()
snowflake.offerChannel <- offer
var err error
// Wait for the answer to be returned on the channel or timeout.
select {
case answer := <-snowflake.answerChannel:
i.ctx.metrics.lock.Lock()
i.ctx.metrics.clientProxyMatchCount++
i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc()
i.ctx.metrics.UpdateRendezvousStats(arg.RemoteAddr, arg.RendezvousMethod, offer.natType, "matched")
i.ctx.metrics.lock.Unlock()
switch version {
case v1:
resp := &messages.ClientPollResponse{Answer: answer}
err = sendClientResponse(resp, response)
default:
panic("unknown version")
}
resp := &messages.ClientPollResponse{Answer: answer}
err = sendClientResponse(resp, response)
// Initial tracking of elapsed time.
i.ctx.metrics.lock.Lock()
i.ctx.metrics.clientRoundtripEstimate = time.Since(startTime) / time.Millisecond
i.ctx.metrics.lock.Unlock()
case <-time.After(time.Second * ClientTimeout):
log.Println("Client: Timed out.")
switch version {
case v1:
resp := &messages.ClientPollResponse{Error: messages.StrTimedOut}
err = sendClientResponse(resp, response)
default:
panic("unknown version")
}
i.ctx.metrics.lock.Lock()
i.ctx.metrics.UpdateRendezvousStats(arg.RemoteAddr, arg.RendezvousMethod, offer.natType, "timeout")
i.ctx.metrics.lock.Unlock()
resp := &messages.ClientPollResponse{Error: messages.StrTimedOut}
err = sendClientResponse(resp, response)
}
i.ctx.snowflakeLock.Lock()
......@@ -241,6 +233,22 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
return err
}
func (i *IPC) matchSnowflake(natType string) *Snowflake {
i.ctx.snowflakeLock.Lock()
defer i.ctx.snowflakeLock.Unlock()
// Proiritize known restricted snowflakes for unrestricted clients
if natType == NATUnrestricted && i.ctx.restrictedSnowflakes.Len() > 0 {
return heap.Pop(i.ctx.restrictedSnowflakes).(*Snowflake)
}
if i.ctx.snowflakes.Len() > 0 {
return heap.Pop(i.ctx.snowflakes).(*Snowflake)
}
return nil
}
func (i *IPC) ProxyAnswers(arg messages.Arg, response *[]byte) error {
answer, id, err := messages.DecodeAnswerRequest(arg.Body)
if err != nil || answer == "" {
......
......@@ -16,6 +16,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"gitlab.torproject.org/tpo/anti-censorship/geoip"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/ptutil/safeprom"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages"
)
const (
......@@ -23,11 +25,16 @@ const (
metricsResolution = 60 * 60 * 24 * time.Second //86400 seconds
)
var rendezvoudMethodList = [...]messages.RendezvousMethod{
messages.RendezvousHttp,
messages.RendezvousAmpCache,
messages.RendezvousSqs,
}
type CountryStats struct {
standalone map[string]bool
badge map[string]bool
webext map[string]bool
unknown map[string]bool
// map[proxyType][address]bool
proxies map[string]map[string]bool
unknown map[string]bool
natRestricted map[string]bool
natUnrestricted map[string]bool
......@@ -44,10 +51,17 @@ type Metrics struct {
countryStats CountryStats
clientRoundtripEstimate time.Duration
proxyIdleCount uint
clientDeniedCount uint
clientRestrictedDeniedCount uint
clientUnrestrictedDeniedCount uint
clientProxyMatchCount uint
clientDeniedCount map[messages.RendezvousMethod]uint
clientRestrictedDeniedCount map[messages.RendezvousMethod]uint
clientUnrestrictedDeniedCount map[messages.RendezvousMethod]uint
clientProxyMatchCount map[messages.RendezvousMethod]uint
clientProxyTimeoutCount map[messages.RendezvousMethod]uint
rendezvousCountryStats map[messages.RendezvousMethod]map[string]int
proxyPollWithRelayURLExtension uint
proxyPollWithoutRelayURLExtension uint
proxyPollRejectedWithRelayURLExtension uint
// synchronization for access to snowflake metrics
lock sync.Mutex
......@@ -92,26 +106,20 @@ func (s CountryStats) Display() string {
}
func (m *Metrics) UpdateCountryStats(addr string, proxyType string, natType string) {
var country string
var ok bool
if proxyType == "standalone" {
if m.countryStats.standalone[addr] {
return
}
} else if proxyType == "badge" {
if m.countryStats.badge[addr] {
return
}
} else if proxyType == "webext" {
if m.countryStats.webext[addr] {
addresses, ok := m.countryStats.proxies[proxyType]
if !ok {
if m.countryStats.unknown[addr] {
return
}
m.countryStats.unknown[addr] = true
} else {
if m.countryStats.unknown[addr] {
if addresses[addr] {
return
}
addresses[addr] = true
}
ip := net.ParseIP(addr)
......@@ -122,18 +130,7 @@ func (m *Metrics) UpdateCountryStats(addr string, proxyType string, natType stri
if !ok {
country = "??"
}
//update map of unique ips and counts
m.countryStats.counts[country]++
if proxyType == "standalone" {
m.countryStats.standalone[addr] = true
} else if proxyType == "badge" {
m.countryStats.badge[addr] = true
} else if proxyType == "webext" {
m.countryStats.webext[addr] = true
} else {
m.countryStats.unknown[addr] = true
}
m.promMetrics.ProxyTotal.With(prometheus.Labels{
"nat": natType,
......@@ -149,7 +146,61 @@ func (m *Metrics) UpdateCountryStats(addr string, proxyType string, natType stri
default:
m.countryStats.natUnknown[addr] = true
}
}
func (m *Metrics) UpdateRendezvousStats(addr string, rendezvousMethod messages.RendezvousMethod, natType, status string) {
ip := net.ParseIP(addr)
country := "??"
if m.geoipdb != nil {
country_by_addr, ok := m.geoipdb.GetCountryByAddr(ip)
if ok {
country = country_by_addr
}
}
switch status {
case "denied":
m.clientDeniedCount[rendezvousMethod]++
if natType == NATUnrestricted {
m.clientUnrestrictedDeniedCount[rendezvousMethod]++
} else {
m.clientRestrictedDeniedCount[rendezvousMethod]++
}
case "matched":
m.clientProxyMatchCount[rendezvousMethod]++
case "timeout":
m.clientProxyTimeoutCount[rendezvousMethod]++
default:
log.Printf("Unknown rendezvous status: %s", status)
}
m.rendezvousCountryStats[rendezvousMethod][country]++
m.promMetrics.ClientPollTotal.With(prometheus.Labels{
"nat": natType,
"status": status,
"rendezvous_method": string(rendezvousMethod),
"cc": country,
}).Inc()
}
func (m *Metrics) DisplayRendezvousStatsByCountry(rendezvoudMethod messages.RendezvousMethod) string {
output := ""
// Use the records struct to sort our counts map by value.
rs := records{}
for cc, count := range m.rendezvousCountryStats[rendezvoudMethod] {
rs = append(rs, record{cc: cc, count: count})
}
sort.Sort(sort.Reverse(rs))
for _, r := range rs {
output += fmt.Sprintf("%s=%d,", r.cc, binCount(uint(r.count)))
}
// cut off trailing ","
if len(output) > 0 {
return output[:len(output)-1]
}
return output
}
func (m *Metrics) LoadGeoipDatabases(geoipDB string, geoip6DB string) error {
......@@ -164,21 +215,33 @@ func (m *Metrics) LoadGeoipDatabases(geoipDB string, geoip6DB string) error {
func NewMetrics(metricsLogger *log.Logger) (*Metrics, error) {
m := new(Metrics)
m.clientDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientRestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientUnrestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientProxyMatchCount = make(map[messages.RendezvousMethod]uint)
m.clientProxyTimeoutCount = make(map[messages.RendezvousMethod]uint)
m.rendezvousCountryStats = make(map[messages.RendezvousMethod]map[string]int)
for _, rendezvousMethod := range rendezvoudMethodList {
m.rendezvousCountryStats[rendezvousMethod] = make(map[string]int)
}
m.countryStats = CountryStats{
counts: make(map[string]int),
standalone: make(map[string]bool),
badge: make(map[string]bool),
webext: make(map[string]bool),
proxies: make(map[string]map[string]bool),
unknown: make(map[string]bool),
natRestricted: make(map[string]bool),
natUnrestricted: make(map[string]bool),
natUnknown: make(map[string]bool),
}
for pType := range messages.KnownProxyTypes {
m.countryStats.proxies[pType] = make(map[string]bool)
}
m.logger = metricsLogger
m.promMetrics = initPrometheus()
// Write to log file every hour with updated metrics
// Write to log file every day with updated metrics
go m.logMetrics()
return m, nil
......@@ -195,18 +258,36 @@ func (m *Metrics) logMetrics() {
func (m *Metrics) printMetrics() {
m.lock.Lock()
m.logger.Println("snowflake-stats-end", time.Now().UTC().Format("2006-01-02 15:04:05"), fmt.Sprintf("(%d s)", int(metricsResolution.Seconds())))
m.logger.Println(
"snowflake-stats-end",
time.Now().UTC().Format("2006-01-02 15:04:05"),
fmt.Sprintf("(%d s)", int(metricsResolution.Seconds())),
)
m.logger.Println("snowflake-ips", m.countryStats.Display())
m.logger.Println("snowflake-ips-total", len(m.countryStats.standalone)+
len(m.countryStats.badge)+len(m.countryStats.webext)+len(m.countryStats.unknown))
m.logger.Println("snowflake-ips-standalone", len(m.countryStats.standalone))
m.logger.Println("snowflake-ips-badge", len(m.countryStats.badge))
m.logger.Println("snowflake-ips-webext", len(m.countryStats.webext))
total := len(m.countryStats.unknown)
for pType, addresses := range m.countryStats.proxies {
m.logger.Printf("snowflake-ips-%s %d\n", pType, len(addresses))
total += len(addresses)
}
m.logger.Println("snowflake-ips-total", total)
m.logger.Println("snowflake-idle-count", binCount(m.proxyIdleCount))
m.logger.Println("client-denied-count", binCount(m.clientDeniedCount))
m.logger.Println("client-restricted-denied-count", binCount(m.clientRestrictedDeniedCount))
m.logger.Println("client-unrestricted-denied-count", binCount(m.clientUnrestrictedDeniedCount))
m.logger.Println("client-snowflake-match-count", binCount(m.clientProxyMatchCount))
m.logger.Println("snowflake-proxy-poll-with-relay-url-count", binCount(m.proxyPollWithRelayURLExtension))
m.logger.Println("snowflake-proxy-poll-without-relay-url-count", binCount(m.proxyPollWithoutRelayURLExtension))
m.logger.Println("snowflake-proxy-rejected-for-relay-url-count", binCount(m.proxyPollRejectedWithRelayURLExtension))
m.logger.Println("client-denied-count", binCount(sumMapValues(&m.clientDeniedCount)))
m.logger.Println("client-restricted-denied-count", binCount(sumMapValues(&m.clientRestrictedDeniedCount)))
m.logger.Println("client-unrestricted-denied-count", binCount(sumMapValues(&m.clientUnrestrictedDeniedCount)))
m.logger.Println("client-snowflake-match-count", binCount(sumMapValues(&m.clientProxyMatchCount)))
m.logger.Println("client-snowflake-timeout-count", binCount(sumMapValues(&m.clientProxyTimeoutCount)))
for _, rendezvousMethod := range rendezvoudMethodList {
m.logger.Printf("client-%s-count %d\n", rendezvousMethod, binCount(
m.clientDeniedCount[rendezvousMethod]+m.clientProxyMatchCount[rendezvousMethod],
))
m.logger.Printf("client-%s-ips %s\n", rendezvousMethod, m.DisplayRendezvousStatsByCountry(rendezvousMethod))
}
m.logger.Println("snowflake-ips-nat-restricted", len(m.countryStats.natRestricted))
m.logger.Println("snowflake-ips-nat-unrestricted", len(m.countryStats.natUnrestricted))
m.logger.Println("snowflake-ips-nat-unknown", len(m.countryStats.natUnknown))
......@@ -216,14 +297,24 @@ func (m *Metrics) printMetrics() {
// Restores all metrics to original values
func (m *Metrics) zeroMetrics() {
m.proxyIdleCount = 0
m.clientDeniedCount = 0
m.clientRestrictedDeniedCount = 0
m.clientUnrestrictedDeniedCount = 0
m.clientProxyMatchCount = 0
m.clientDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientRestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.clientUnrestrictedDeniedCount = make(map[messages.RendezvousMethod]uint)
m.proxyPollRejectedWithRelayURLExtension = 0
m.proxyPollWithRelayURLExtension = 0
m.proxyPollWithoutRelayURLExtension = 0
m.clientProxyMatchCount = make(map[messages.RendezvousMethod]uint)
m.clientProxyTimeoutCount = make(map[messages.RendezvousMethod]uint)
m.rendezvousCountryStats = make(map[messages.RendezvousMethod]map[string]int)
for _, rendezvousMethod := range rendezvoudMethodList {
m.rendezvousCountryStats[rendezvousMethod] = make(map[string]int)
}
m.countryStats.counts = make(map[string]int)
m.countryStats.standalone = make(map[string]bool)
m.countryStats.badge = make(map[string]bool)
m.countryStats.webext = make(map[string]bool)
for pType := range m.countryStats.proxies {
m.countryStats.proxies[pType] = make(map[string]bool)
}
m.countryStats.unknown = make(map[string]bool)
m.countryStats.natRestricted = make(map[string]bool)
m.countryStats.natUnrestricted = make(map[string]bool)
......@@ -235,12 +326,25 @@ func binCount(count uint) uint {
return uint((math.Ceil(float64(count) / 8)) * 8)
}
func sumMapValues(m *map[messages.RendezvousMethod]uint) uint {
var s uint = 0
for _, v := range *m {
s += v
}
return s
}
type PromMetrics struct {
registry *prometheus.Registry
ProxyTotal *prometheus.CounterVec
ProxyPollTotal *RoundedCounterVec
ClientPollTotal *RoundedCounterVec
ProxyPollTotal *safeprom.CounterVec
ClientPollTotal *safeprom.CounterVec
AvailableProxies *prometheus.GaugeVec
ProxyPollWithRelayURLExtensionTotal *safeprom.CounterVec
ProxyPollWithoutRelayURLExtensionTotal *safeprom.CounterVec
ProxyPollRejectedForRelayURLExtensionTotal *safeprom.CounterVec
}
// Initialize metrics for prometheus exporter
......@@ -267,7 +371,7 @@ func initPrometheus() *PromMetrics {
[]string{"type", "nat"},
)
promMetrics.ProxyPollTotal = NewRoundedCounterVec(
promMetrics.ProxyPollTotal = safeprom.NewCounterVec(
prometheus.CounterOpts{
Namespace: prometheusNamespace,
Name: "rounded_proxy_poll_total",
......@@ -276,19 +380,49 @@ func initPrometheus() *PromMetrics {
[]string{"nat", "status"},
)
promMetrics.ClientPollTotal = NewRoundedCounterVec(
promMetrics.ProxyPollWithRelayURLExtensionTotal = safeprom.NewCounterVec(
prometheus.CounterOpts{
Namespace: prometheusNamespace,
Name: "rounded_proxy_poll_with_relay_url_extension_total",
Help: "The number of snowflake proxy polls with Relay URL Extension, rounded up to a multiple of 8",
},
[]string{"nat", "type"},
)
promMetrics.ProxyPollWithoutRelayURLExtensionTotal = safeprom.NewCounterVec(
prometheus.CounterOpts{
Namespace: prometheusNamespace,
Name: "rounded_proxy_poll_without_relay_url_extension_total",
Help: "The number of snowflake proxy polls without Relay URL Extension, rounded up to a multiple of 8",
},
[]string{"nat", "type"},
)
promMetrics.ProxyPollRejectedForRelayURLExtensionTotal = safeprom.NewCounterVec(
prometheus.CounterOpts{
Namespace: prometheusNamespace,
Name: "rounded_proxy_poll_rejected_relay_url_extension_total",
Help: "The number of snowflake proxy polls rejected by Relay URL Extension, rounded up to a multiple of 8",
},
[]string{"nat", "type"},
)
promMetrics.ClientPollTotal = safeprom.NewCounterVec(
prometheus.CounterOpts{
Namespace: prometheusNamespace,
Name: "rounded_client_poll_total",
Help: "The number of snowflake client polls, rounded up to a multiple of 8",
},
[]string{"nat", "status"},
[]string{"nat", "status", "cc", "rendezvous_method"},
)
// We need to register our metrics so they can be exported.
promMetrics.registry.MustRegister(
promMetrics.ClientPollTotal, promMetrics.ProxyPollTotal,
promMetrics.ProxyTotal, promMetrics.AvailableProxies,
promMetrics.ProxyPollWithRelayURLExtensionTotal,
promMetrics.ProxyPollWithoutRelayURLExtensionTotal,
promMetrics.ProxyPollRejectedForRelayURLExtensionTotal,
)
return promMetrics
......
/*
Implements some additional prometheus metrics that we need for privacy preserving
counts of users and proxies
*/
package main
import (
"sync/atomic"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"google.golang.org/protobuf/proto"
)
// New Prometheus counter type that produces rounded counts of metrics
// for privacy preserving reasons
type RoundedCounter interface {
prometheus.Metric
Inc()
}
type roundedCounter struct {
total uint64 //reflects the true count
value uint64 //reflects the rounded count
desc *prometheus.Desc
labelPairs []*dto.LabelPair
}
// Implements the RoundedCounter interface
func (c *roundedCounter) Inc() {
atomic.AddUint64(&c.total, 1)
if c.total > c.value {
atomic.AddUint64(&c.value, 8)
}
}
// Implements the prometheus.Metric interface
func (c *roundedCounter) Desc() *prometheus.Desc {
return c.desc
}
// Implements the prometheus.Metric interface
func (c *roundedCounter) Write(m *dto.Metric) error {
m.Label = c.labelPairs
m.Counter = &dto.Counter{Value: proto.Float64(float64(c.value))}
return nil
}
// New prometheus vector type that will track RoundedCounter metrics
// accross multiple labels
type RoundedCounterVec struct {
*prometheus.MetricVec
}
func NewRoundedCounterVec(opts prometheus.CounterOpts, labelNames []string) *RoundedCounterVec {
desc := prometheus.NewDesc(
prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
opts.Help,
labelNames,
opts.ConstLabels,
)
return &RoundedCounterVec{
MetricVec: prometheus.NewMetricVec(desc, func(lvs ...string) prometheus.Metric {
if len(lvs) != len(labelNames) {
panic("inconsistent cardinality")
}
return &roundedCounter{desc: desc, labelPairs: prometheus.MakeLabelPairs(desc, lvs)}
}),
}
}
// Helper function to return the underlying RoundedCounter metric from MetricVec
func (v *RoundedCounterVec) With(labels prometheus.Labels) RoundedCounter {
metric, err := v.GetMetricWith(labels)
if err != nil {
panic(err)
}
return metric.(RoundedCounter)
}
This diff is collapsed.
package main
import (
"context"
"log"
"strconv"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/messages"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/sqsclient"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/util"
)
const (
cleanupThreshold = -2 * time.Minute
)
type sqsHandler struct {
SQSClient sqsclient.SQSClient
SQSQueueURL *string
IPC *IPC
cleanupInterval time.Duration
}
func (r *sqsHandler) pollMessages(ctx context.Context, chn chan<- *types.Message) {
for {
select {
case <-ctx.Done():
// if context is cancelled
return
default:
res, err := r.SQSClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: r.SQSQueueURL,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 15,
MessageAttributeNames: []string{
string(types.QueueAttributeNameAll),
},
})
if err != nil {
log.Printf("SQSHandler: encountered error while polling for messages: %v\n", err)
continue
}
for _, message := range res.Messages {
chn <- &message
}
}
}
}
func (r *sqsHandler) cleanupClientQueues(ctx context.Context) {
for range time.NewTicker(r.cleanupInterval).C {
// Runs at fixed intervals to clean up any client queues that were last changed more than 2 minutes ago
select {
case <-ctx.Done():
// if context is cancelled
return
default:
queueURLsList := []string{}
var nextToken *string
for {
res, err := r.SQSClient.ListQueues(ctx, &sqs.ListQueuesInput{
QueueNamePrefix: aws.String("snowflake-client-"),
MaxResults: aws.Int32(1000),
NextToken: nextToken,
})
if err != nil {
log.Printf("SQSHandler: encountered error while retrieving client queues to clean up: %v\n", err)
// client queues will be cleaned up the next time the cleanup operation is triggered automatically
break
}
queueURLsList = append(queueURLsList, res.QueueUrls...)
if res.NextToken == nil {
break
} else {
nextToken = res.NextToken
}
}
numDeleted := 0
cleanupCutoff := time.Now().Add(cleanupThreshold)
for _, queueURL := range queueURLsList {
if !strings.Contains(queueURL, "snowflake-client-") {
continue
}
res, err := r.SQSClient.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
QueueUrl: aws.String(queueURL),
AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
})
if err != nil {
// According to the AWS SQS docs, the deletion process for a queue can take up to 60 seconds. So the queue
// can be in the process of being deleted, but will still be returned by the ListQueues operation, but
// fail when we try to GetQueueAttributes for the queue
log.Printf("SQSHandler: encountered error while getting attribute of client queue %s. queue may already be deleted.\n", queueURL)
continue
}
lastModifiedInt64, err := strconv.ParseInt(res.Attributes[string(types.QueueAttributeNameLastModifiedTimestamp)], 10, 64)
if err != nil {
log.Printf("SQSHandler: encountered invalid lastModifiedTimetamp value from client queue %s: %v\n", queueURL, err)
continue
}
lastModified := time.Unix(lastModifiedInt64, 0)
if lastModified.Before(cleanupCutoff) {
_, err := r.SQSClient.DeleteQueue(ctx, &sqs.DeleteQueueInput{
QueueUrl: aws.String(queueURL),
})
if err != nil {
log.Printf("SQSHandler: encountered error when deleting client queue %s: %v\n", queueURL, err)
continue
} else {
numDeleted += 1
}
}
}
log.Printf("SQSHandler: finished running iteration of client queue cleanup. found and deleted %d client queues.\n", numDeleted)
}
}
}
func (r *sqsHandler) handleMessage(context context.Context, message *types.Message) {
var encPollReq []byte
var response []byte
var err error
clientID := message.MessageAttributes["ClientID"].StringValue
if clientID == nil {
log.Println("SQSHandler: got SDP offer in SQS message with no client ID. ignoring this message.")
return
}
res, err := r.SQSClient.CreateQueue(context, &sqs.CreateQueueInput{
QueueName: aws.String("snowflake-client-" + *clientID),
})
if err != nil {
log.Printf("SQSHandler: error encountered when creating answer queue for client %s: %v\n", *clientID, err)
return
}
answerSQSURL := res.QueueUrl
encPollReq = []byte(*message.Body)
// Get best guess Client IP for geolocating
remoteAddr := ""
req, err := messages.DecodeClientPollRequest(encPollReq)
if err != nil {
log.Printf("SQSHandler: error encounted when decoding client poll request %s: %v\n", *clientID, err)
} else {
sdp, err := util.DeserializeSessionDescription(req.Offer)
if err != nil {
log.Printf("SQSHandler: error encounted when deserializing session desc %s: %v\n", *clientID, err)
} else {
candidateAddrs := util.GetCandidateAddrs(sdp.SDP)
if len(candidateAddrs) > 0 {
remoteAddr = candidateAddrs[0].String()
}
}
}
arg := messages.Arg{
Body: encPollReq,
RemoteAddr: remoteAddr,
RendezvousMethod: messages.RendezvousSqs,
}
err = r.IPC.ClientOffers(arg, &response)
if err != nil {
log.Printf("SQSHandler: error encountered when handling message: %v\n", err)
return
}
r.SQSClient.SendMessage(context, &sqs.SendMessageInput{
QueueUrl: answerSQSURL,
MessageBody: aws.String(string(response)),
})
}
func (r *sqsHandler) deleteMessage(context context.Context, message *types.Message) {
r.SQSClient.DeleteMessage(context, &sqs.DeleteMessageInput{
QueueUrl: r.SQSQueueURL,
ReceiptHandle: message.ReceiptHandle,
})
}
func newSQSHandler(context context.Context, client sqsclient.SQSClient, sqsQueueName string, region string, i *IPC) (*sqsHandler, error) {
// Creates the queue if a queue with the same name doesn't exist. If a queue with the same name and attributes
// already exists, then nothing will happen. If a queue with the same name, but different attributes exists, then
// an error will be returned
res, err := client.CreateQueue(context, &sqs.CreateQueueInput{
QueueName: aws.String(sqsQueueName),
Attributes: map[string]string{
"MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
},
})
if err != nil {
return nil, err
}
return &sqsHandler{
SQSClient: client,
SQSQueueURL: res.QueueUrl,
IPC: i,
cleanupInterval: time.Second * 30,
}, nil
}
func (r *sqsHandler) PollAndHandleMessages(ctx context.Context) {
log.Println("SQSHandler: Starting to poll for messages at: " + *r.SQSQueueURL)
messagesChn := make(chan *types.Message, 20)
go r.pollMessages(ctx, messagesChn)
go r.cleanupClientQueues(ctx)
for message := range messagesChn {
select {
case <-ctx.Done():
// if context is cancelled
return
default:
go func(msg *types.Message) {
r.handleMessage(ctx, msg)
r.deleteMessage(ctx, msg)
}(message)
}
}
}
package main
import (
"bytes"
"context"
"errors"
"log"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/golang/mock/gomock"
. "github.com/smartystreets/goconvey/convey"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/sqsclient"
)
func TestSQS(t *testing.T) {
Convey("Context", t, func() {
buf := new(bytes.Buffer)
ipcCtx := NewBrokerContext(log.New(buf, "", 0), "", "")
i := &IPC{ipcCtx}
Convey("Responds to SQS client offers...", func() {
ctrl := gomock.NewController(t)
mockSQSClient := sqsclient.NewMockSQSClient(ctrl)
brokerSQSQueueName := "example-name"
responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing")
runSQSHandler := func(sqsHandlerContext context.Context) {
mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{
QueueName: aws.String(brokerSQSQueueName),
Attributes: map[string]string{
"MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
},
}).Return(&sqs.CreateQueueOutput{
QueueUrl: responseQueueURL,
}, nil).Times(1)
sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i)
So(err, ShouldBeNil)
go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
}
messageBody := aws.String("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")
receiptHandle := "fake-receipt-handle"
sqsReceiveMessageInput := sqs.ReceiveMessageInput{
QueueUrl: responseQueueURL,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 15,
MessageAttributeNames: []string{
string(types.QueueAttributeNameAll),
},
}
sqsDeleteMessageInput := sqs.DeleteMessageInput{
QueueUrl: responseQueueURL,
ReceiptHandle: &receiptHandle,
}
Convey("by ignoring it if no client id specified", func(c C) {
sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(1).DoAndReturn(
func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
return &sqs.ReceiveMessageOutput{
Messages: []types.Message{
{
Body: messageBody,
ReceiptHandle: &receiptHandle,
},
},
}, nil
},
)
mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).MinTimes(1).Do(
func(ctx context.Context, input *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) {
sqsCancelFunc()
},
)
// We expect no queues to be created
mockSQSClient.EXPECT().CreateQueue(gomock.Any(), gomock.Any()).Times(0)
runSQSHandler(sqsHandlerContext)
<-sqsHandlerContext.Done()
})
Convey("by doing nothing if an error occurs upon receipt of the message", func(c C) {
sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).MinTimes(1).DoAndReturn(
func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
sqsCancelFunc()
return nil, errors.New("error")
},
)
// We expect no queues to be created or deleted
mockSQSClient.EXPECT().CreateQueue(gomock.Any(), gomock.Any()).Times(0)
mockSQSClient.EXPECT().DeleteMessage(gomock.Any(), gomock.Any()).Times(0)
runSQSHandler(sqsHandlerContext)
<-sqsHandlerContext.Done()
})
Convey("by attempting to create a new sqs queue...", func() {
clientId := "fake-id"
sqsCreateQueueInput := sqs.CreateQueueInput{
QueueName: aws.String("snowflake-client-fake-id"),
}
validMessage := &sqs.ReceiveMessageOutput{
Messages: []types.Message{
{
Body: messageBody,
MessageAttributes: map[string]types.MessageAttributeValue{
"ClientID": {StringValue: &clientId},
},
ReceiptHandle: &receiptHandle,
},
},
}
Convey("and does not attempt to send a message via SQS if queue creation fails.", func(c C) {
sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).AnyTimes().DoAndReturn(
func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
sqsCancelFunc()
return validMessage, nil
})
mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqsCreateQueueInput).Return(nil, errors.New("error")).AnyTimes()
mockSQSClient.EXPECT().DeleteMessage(sqsHandlerContext, &sqsDeleteMessageInput).AnyTimes()
runSQSHandler(sqsHandlerContext)
<-sqsHandlerContext.Done()
})
Convey("and responds with a proxy answer if available.", func(c C) {
sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
var numTimes atomic.Uint32
mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, &sqsReceiveMessageInput).AnyTimes().DoAndReturn(
func(ctx context.Context, input *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
n := numTimes.Add(1)
if n == 1 {
snowflake := ipcCtx.AddSnowflake("fake", "", NATUnrestricted, 0)
go func(c C) {
<-snowflake.offerChannel
snowflake.answerChannel <- "fake answer"
}(c)
return validMessage, nil
}
return nil, errors.New("error")
})
mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqsCreateQueueInput).Return(&sqs.CreateQueueOutput{
QueueUrl: responseQueueURL,
}, nil).AnyTimes()
mockSQSClient.EXPECT().DeleteMessage(gomock.Any(), gomock.Any()).AnyTimes()
mockSQSClient.EXPECT().SendMessage(sqsHandlerContext, gomock.Any()).Times(1).DoAndReturn(
func(ctx context.Context, input *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) {
c.So(input.MessageBody, ShouldEqual, aws.String("{\"answer\":\"fake answer\"}"))
// Ensure that match is correctly recorded in metrics
ipcCtx.metrics.printMetrics()
c.So(buf.String(), ShouldContainSubstring, `client-denied-count 0
client-restricted-denied-count 0
client-unrestricted-denied-count 0
client-snowflake-match-count 8
client-snowflake-timeout-count 0
client-http-count 0
client-http-ips
client-ampcache-count 0
client-ampcache-ips
client-sqs-count 8
client-sqs-ips ??=8
`)
sqsCancelFunc()
return &sqs.SendMessageOutput{}, nil
},
)
runSQSHandler(sqsHandlerContext)
<-sqsHandlerContext.Done()
})
})
})
Convey("Cleans up SQS client queues...", func() {
brokerSQSQueueName := "example-name"
responseQueueURL := aws.String("https://sqs.us-east-1.amazonaws.com/testing")
ctrl := gomock.NewController(t)
mockSQSClient := sqsclient.NewMockSQSClient(ctrl)
runSQSHandler := func(sqsHandlerContext context.Context) {
mockSQSClient.EXPECT().CreateQueue(sqsHandlerContext, &sqs.CreateQueueInput{
QueueName: aws.String(brokerSQSQueueName),
Attributes: map[string]string{
"MessageRetentionPeriod": strconv.FormatInt(int64((5 * time.Minute).Seconds()), 10),
},
}).Return(&sqs.CreateQueueOutput{
QueueUrl: responseQueueURL,
}, nil).Times(1)
mockSQSClient.EXPECT().ReceiveMessage(sqsHandlerContext, gomock.Any()).AnyTimes().Return(
&sqs.ReceiveMessageOutput{
Messages: []types.Message{},
}, nil,
)
sqsHandler, err := newSQSHandler(sqsHandlerContext, mockSQSClient, brokerSQSQueueName, "example-region", i)
So(err, ShouldBeNil)
// Set the cleanup interval to 1 ns so we can immediately test the cleanup logic
sqsHandler.cleanupInterval = time.Nanosecond
go sqsHandler.PollAndHandleMessages(sqsHandlerContext)
}
Convey("does nothing if there are no open queues.", func() {
var wg sync.WaitGroup
wg.Add(1)
sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
defer wg.Wait()
mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
QueueNamePrefix: aws.String("snowflake-client-"),
MaxResults: aws.Int32(1000),
NextToken: nil,
}).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) {
wg.Done()
// Cancel the handler context since we are only interested in testing one iteration of the cleanup
sqsCancelFunc()
return &sqs.ListQueuesOutput{
QueueUrls: []string{},
}, nil
})
runSQSHandler(sqsHandlerContext)
})
Convey("deletes open queue when there is one open queue.", func(c C) {
var wg sync.WaitGroup
wg.Add(1)
sqsHandlerContext, sqsCancelFunc := context.WithCancel(context.Background())
clientQueueUrl1 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-1"
clientQueueUrl2 := "https://sqs.us-east-1.amazonaws.com/snowflake-client-2"
gomock.InOrder(
mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
QueueNamePrefix: aws.String("snowflake-client-"),
MaxResults: aws.Int32(1000),
NextToken: nil,
}).Times(1).Return(&sqs.ListQueuesOutput{
QueueUrls: []string{
clientQueueUrl1,
clientQueueUrl2,
},
}, nil),
mockSQSClient.EXPECT().ListQueues(sqsHandlerContext, &sqs.ListQueuesInput{
QueueNamePrefix: aws.String("snowflake-client-"),
MaxResults: aws.Int32(1000),
NextToken: nil,
}).Times(1).DoAndReturn(func(ctx context.Context, input *sqs.ListQueuesInput, optFns ...func(*sqs.Options)) (*sqs.ListQueuesOutput, error) {
// Executed on second iteration of cleanupClientQueues loop. This means that one full iteration has completed and we can verify the results of that iteration
wg.Done()
sqsCancelFunc()
return &sqs.ListQueuesOutput{
QueueUrls: []string{},
}, nil
}),
)
gomock.InOrder(
mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{
QueueUrl: aws.String(clientQueueUrl1),
AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
}).Times(1).Return(&sqs.GetQueueAttributesOutput{
Attributes: map[string]string{
string(types.QueueAttributeNameLastModifiedTimestamp): "0",
}}, nil),
mockSQSClient.EXPECT().GetQueueAttributes(sqsHandlerContext, &sqs.GetQueueAttributesInput{
QueueUrl: aws.String(clientQueueUrl2),
AttributeNames: []types.QueueAttributeName{types.QueueAttributeNameLastModifiedTimestamp},
}).Times(1).Return(&sqs.GetQueueAttributesOutput{
Attributes: map[string]string{
string(types.QueueAttributeNameLastModifiedTimestamp): "0",
}}, nil),
)
gomock.InOrder(
mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{
QueueUrl: aws.String(clientQueueUrl1),
}).Return(&sqs.DeleteQueueOutput{}, nil),
mockSQSClient.EXPECT().DeleteQueue(sqsHandlerContext, &sqs.DeleteQueueInput{
QueueUrl: aws.String(clientQueueUrl2),
}).Return(&sqs.DeleteQueueOutput{}, nil),
)
runSQSHandler(sqsHandlerContext)
wg.Wait()
})
})
})
}
{"displayName":"flakey", "webSocketAddress":"wss://snowflake.torproject.net", "fingerprint":"2B280B23E1107BB62ABFC40DDCC8824814F80A72"}
{"displayName":"second", "webSocketAddress":"wss://02.snowflake.torproject.net", "fingerprint":"8838024498816A039FCBBAB14E6F40A0843051FA"}
......@@ -15,7 +15,7 @@ It is based on the [goptlib](https://gitweb.torproject.org/pluggable-transports/
### Dependencies
- Go 1.13+
- Go 1.15+
- We use the [pion/webrtc](https://github.com/pion/webrtc) library for WebRTC communication with Snowflake proxies. Note: running `go get` will fetch this dependency automatically during the build process.
### Building the Snowflake client
......@@ -29,23 +29,24 @@ go build
### Running the Snowflake client with Tor
The Snowflake client can be configured with either command line options or SOCKS options. We have a few example `torrc` files in this directory. We recommend the following `torrc` options by default:
The Snowflake client can be configured with SOCKS options. We have a few example `torrc` files in this directory. We recommend the following `torrc` options by default:
```
UseBridges 1
ClientTransportPlugin snowflake exec ./client \
-url https://snowflake-broker.torproject.net.global.prod.fastly.net/ \
-front cdn.sstatic.net \
-ice stun:stun.voip.blackberry.com:3478,stun:stun.altar.com.pl:3478,stun:stun.antisip.com:3478,stun:stun.bluesip.net:3478,stun:stun.dus.net:3478,stun:stun.epygi.com:3478,stun:stun.sonetel.com:3478,stun:stun.sonetel.net:3478,stun:stun.stunprotocol.org:3478,stun:stun.uls.co.za:3478,stun:stun.voipgate.com:3478,stun:stun.voys.nl:3478
ClientTransportPlugin snowflake exec ./client -log snowflake.log
Bridge snowflake 192.0.2.3:1
Bridge snowflake 192.0.2.3:80 2B280B23E1107BB62ABFC40DDCC8824814F80A72 fingerprint=2B280B23E1107BB62ABFC40DDCC8824814F80A72 url=https://snowflake-broker.torproject.net.global.prod.fastly.net/ fronts=foursquare.com,github.githubassets.com ice=stun:stun.l.google.com:19302,stun:stun.antisip.com:3478,stun:stun.bluesip.net:3478,stun:stun.dus.net:3478,stun:stun.epygi.com:3478,stun:stun.sonetel.com:3478,stun:stun.uls.co.za:3478,stun:stun.voipgate.com:3478,stun:stun.voys.nl:3478 utls-imitate=hellorandomizedalpn
```
`-url` is the URL of a broker instance. If you would like to try out Snowflake with your own broker, simply provide the URL of your broker instance with this option.
`fingerprint=` is the fingerprint of bridge that the client will ultimately be connecting to.
`url=` is the URL of a broker instance. If you would like to try out Snowflake with your own broker, simply provide the URL of your broker instance with this option.
`fronts=` is an optional, comma-seperated list front domains for the broker request.
`-front` is an optional front domain for the broker request.
`ice=` is a comma-separated list of ICE servers. These must be STUN (over UDP) servers with the form stun:<var>host</var>[:<var>port</var>]. We recommend using servers that have implemented NAT discovery. See our wiki page on [NAT traversal](https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/wikis/NAT-matching) for more information.
`-ice` is a comma-separated list of ICE servers. These can be STUN or TURN servers. We recommend using servers that have implemented NAT discovery. See our wiki page on [NAT traversal](https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/wikis/NAT-matching) for more information.
`utls-imitate=` configuration instructs the client to use fingerprinting resistance when connecting when rendez-vous'ing with the broker.
To bootstrap Tor, run:
```
......
......@@ -7,6 +7,7 @@ import (
"time"
. "github.com/smartystreets/goconvey/convey"
"gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event"
)
type FakeDialer struct {
......@@ -171,7 +172,8 @@ func TestSnowflakeClient(t *testing.T) {
func TestWebRTCPeer(t *testing.T) {
Convey("WebRTCPeer", t, func(c C) {
p := &WebRTCPeer{closed: make(chan struct{})}
p := &WebRTCPeer{closed: make(chan struct{}),
eventsLogger: event.NewSnowflakeEventDispatcher()}
Convey("checks for staleness", func() {
go p.checkForStaleness(time.Second)
<-time.After(2 * time.Second)
......@@ -188,19 +190,15 @@ func TestICEServerParser(t *testing.T) {
length int
}{
{
[]string{"stun:stun.l.google.com:19302"},
[][]string{[]string{"stun:stun.l.google.com:19302"}},
1,
},
{
[]string{"stun:stun.l.google.com:19302", "stun.ekiga.net"},
[][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}},
[]string{"stun:stun.l.google.com:19302", "stun:stun.ekiga.net"},
[][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun:stun.ekiga.net:3478"}},
2,
},
{
[]string{"stun:stun.l.google.com:19302", "stun.ekiga.net"},
[][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}},
2,
[]string{"stun:stun1.l.google.com:19302", "stun.ekiga.net", "stun:stun.example.com:1234/path?query",
"https://example.com", "turn:relay.metered.ca:80?transport=udp"},
[][]string{[]string{"stun:stun1.l.google.com:19302"}},
1,
},
} {
servers := parseIceServers(test.input)
......
......@@ -29,6 +29,7 @@ type Peers struct {
melt chan struct{}
collectLock sync.Mutex
closeOnce sync.Once
}
// NewPeers constructs a fresh container of remote peers.
......@@ -122,17 +123,19 @@ func (p *Peers) purgeClosedPeers() {
// End closes all active connections to Peers contained here, and stops the
// collection of future Peers.
func (p *Peers) End() {
close(p.melt)
p.collectLock.Lock()
defer p.collectLock.Unlock()
close(p.snowflakeChan)
cnt := p.Count()
for e := p.activePeers.Front(); e != nil; {
next := e.Next()
conn := e.Value.(*WebRTCPeer)
conn.Close()
p.activePeers.Remove(e)
e = next
}
log.Printf("WebRTC: melted all %d snowflakes.", cnt)
p.closeOnce.Do(func() {
close(p.melt)
p.collectLock.Lock()
defer p.collectLock.Unlock()
close(p.snowflakeChan)
cnt := p.Count()
for e := p.activePeers.Front(); e != nil; {
next := e.Next()
conn := e.Value.(*WebRTCPeer)
conn.Close()
p.activePeers.Remove(e)
e = next
}
log.Printf("WebRTC: melted all %d snowflakes.", cnt)
})
}