GitLab is used only for code review, issue tracking and project management. Canonical locations for source code are still https://gitweb.torproject.org/ https://git.torproject.org/ and git-rw.torproject.org.

Commit 2d43dd26 authored by Cecylia Bocovich's avatar Cecylia Bocovich

Merge branch 'issue/21314'

parents 3c331750 cc55481f
Pipeline #963 passed with stage
in 17 minutes and 4 seconds
......@@ -7,6 +7,9 @@ import (
// Interface for catching Snowflakes. (aka the remote dialer)
type Tongue interface {
Catch() (*WebRTCPeer, error)
// Get the maximum number of snowflakes
GetMax() int
}
// Interface for collecting some number of Snowflakes, for passing along
......
......@@ -27,13 +27,19 @@ func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
return r, nil
}
type FakeDialer struct{}
type FakeDialer struct {
max int
}
func (w FakeDialer) Catch() (*WebRTCPeer, error) {
fmt.Println("Caught a dummy snowflake.")
return &WebRTCPeer{}, nil
}
func (w FakeDialer) GetMax() int {
return w.max
}
type FakeSocksConn struct {
net.Conn
rejected bool
......@@ -55,19 +61,19 @@ func TestSnowflakeClient(t *testing.T) {
Convey("Peers", t, func() {
Convey("Can construct", func() {
p := NewPeers(1)
So(p.capacity, ShouldEqual, 1)
d := &FakeDialer{max: 1}
p, _ := NewPeers(d)
So(p.Tongue.GetMax(), ShouldEqual, 1)
So(p.snowflakeChan, ShouldNotBeNil)
So(cap(p.snowflakeChan), ShouldEqual, 1)
})
Convey("Collecting a Snowflake requires a Tongue.", func() {
p := NewPeers(1)
_, err := p.Collect()
p, err := NewPeers(nil)
So(err, ShouldNotBeNil)
So(p.Count(), ShouldEqual, 0)
// Set the dialer so that collection is possible.
p.Tongue = FakeDialer{}
d := &FakeDialer{max: 1}
p, err = NewPeers(d)
_, err = p.Collect()
So(err, ShouldBeNil)
So(p.Count(), ShouldEqual, 1)
......@@ -77,8 +83,7 @@ func TestSnowflakeClient(t *testing.T) {
Convey("Collection continues until capacity.", func() {
c := 5
p := NewPeers(c)
p.Tongue = FakeDialer{}
p, _ := NewPeers(FakeDialer{max: c})
// Fill up to capacity.
for i := 0; i < c; i++ {
fmt.Println("Adding snowflake ", i)
......@@ -104,8 +109,7 @@ func TestSnowflakeClient(t *testing.T) {
})
Convey("Count correctly purges peers marked for deletion.", func() {
p := NewPeers(4)
p.Tongue = FakeDialer{}
p, _ := NewPeers(FakeDialer{max: 5})
p.Collect()
p.Collect()
p.Collect()
......@@ -121,7 +125,7 @@ func TestSnowflakeClient(t *testing.T) {
Convey("End Closes all peers.", func() {
cnt := 5
p := NewPeers(cnt)
p, _ := NewPeers(FakeDialer{max: cnt})
for i := 0; i < cnt; i++ {
p.activePeers.PushBack(&WebRTCPeer{})
}
......@@ -132,8 +136,7 @@ func TestSnowflakeClient(t *testing.T) {
})
Convey("Pop skips over closed peers.", func() {
p := NewPeers(4)
p.Tongue = FakeDialer{}
p, _ := NewPeers(FakeDialer{max: 4})
wc1, _ := p.Collect()
wc2, _ := p.Collect()
wc3, _ := p.Collect()
......@@ -157,11 +160,11 @@ func TestSnowflakeClient(t *testing.T) {
SkipConvey("Handler Grants correctly", func() {
socks := &FakeSocksConn{}
snowflakes := &FakePeers{}
broker := &BrokerChannel{Host: "test"}
d := NewWebRTCDialer(broker, nil, 1)
So(socks.rejected, ShouldEqual, false)
snowflakes.toRelease = nil
Handler(socks, snowflakes)
Handler(socks, d)
So(socks.rejected, ShouldEqual, true)
})
})
......@@ -169,14 +172,14 @@ func TestSnowflakeClient(t *testing.T) {
Convey("Dialers", t, func() {
Convey("Can construct WebRTCDialer.", func() {
broker := &BrokerChannel{Host: "test"}
d := NewWebRTCDialer(broker, nil)
d := NewWebRTCDialer(broker, nil, 1)
So(d, ShouldNotBeNil)
So(d.BrokerChannel, ShouldNotBeNil)
So(d.BrokerChannel.Host, ShouldEqual, "test")
})
SkipConvey("WebRTCDialer can Catch a snowflake.", func() {
broker := &BrokerChannel{Host: "test"}
d := NewWebRTCDialer(broker, nil)
d := NewWebRTCDialer(broker, nil, 1)
conn, err := d.Catch()
So(conn, ShouldBeNil)
So(err, ShouldNotBeNil)
......
......@@ -24,33 +24,37 @@ type Peers struct {
snowflakeChan chan *WebRTCPeer
activePeers *list.List
capacity int
melt chan struct{}
}
// Construct a fresh container of remote peers.
func NewPeers(max int) *Peers {
p := &Peers{capacity: max}
func NewPeers(tongue Tongue) (*Peers, error) {
p := &Peers{}
// Use buffered go channel to pass snowflakes onwards to the SOCKS handler.
p.snowflakeChan = make(chan *WebRTCPeer, max)
if tongue == nil {
return nil, errors.New("missing Tongue to catch Snowflakes with")
}
p.snowflakeChan = make(chan *WebRTCPeer, tongue.GetMax())
p.activePeers = list.New()
p.melt = make(chan struct{})
return p
p.Tongue = tongue
return p, nil
}
// As part of |SnowflakeCollector| interface.
func (p *Peers) Collect() (*WebRTCPeer, error) {
cnt := p.Count()
s := fmt.Sprintf("Currently at [%d/%d]", cnt, p.capacity)
if cnt >= p.capacity {
return nil, fmt.Errorf("At capacity [%d/%d]", cnt, p.capacity)
}
log.Println("WebRTC: Collecting a new Snowflake.", s)
// Engage the Snowflake Catching interface, which must be available.
if nil == p.Tongue {
return nil, errors.New("missing Tongue to catch Snowflakes with")
}
cnt := p.Count()
capacity := p.Tongue.GetMax()
s := fmt.Sprintf("Currently at [%d/%d]", cnt, capacity)
if cnt >= capacity {
return nil, fmt.Errorf("At capacity [%d/%d]", cnt, capacity)
}
log.Println("WebRTC: Collecting a new Snowflake.", s)
// BUG: some broker conflict here.
connection, err := p.Tongue.Catch()
if nil != err {
......
......@@ -155,15 +155,18 @@ func (bc *BrokerChannel) SetNATType(NATType string) {
type WebRTCDialer struct {
*BrokerChannel
webrtcConfig *webrtc.Configuration
max int
}
func NewWebRTCDialer(broker *BrokerChannel, iceServers []webrtc.ICEServer) *WebRTCDialer {
func NewWebRTCDialer(broker *BrokerChannel, iceServers []webrtc.ICEServer, max int) *WebRTCDialer {
config := webrtc.Configuration{
ICEServers: iceServers,
}
return &WebRTCDialer{
BrokerChannel: broker,
webrtcConfig: &config,
max: max,
}
}
......@@ -173,3 +176,8 @@ func (w WebRTCDialer) Catch() (*WebRTCPeer, error) {
// TODO: [#25596] Consider TURN servers here too.
return NewWebRTCPeer(w.webrtcConfig, w.BrokerChannel)
}
// Returns the maximum number of snowflakes to collect
func (w WebRTCDialer) GetMax() int {
return w.max
}
......@@ -142,7 +142,19 @@ var sessionManager = sessionManager_{}
// Given an accepted SOCKS connection, establish a WebRTC connection to the
// remote peer and exchange traffic.
func Handler(socks net.Conn, snowflakes SnowflakeCollector) error {
func Handler(socks net.Conn, tongue Tongue) error {
// Prepare to collect remote WebRTC peers.
snowflakes, err := NewPeers(tongue)
if err != nil {
return err
}
// Use a real logger to periodically output how much traffic is happening.
snowflakes.BytesLogger = NewBytesSyncLogger()
log.Printf("---- Handler: begin collecting snowflakes ---")
go connectLoop(snowflakes)
// Return the global smux.Session.
sess, err := sessionManager.Get(snowflakes)
if err != nil {
......@@ -160,9 +172,31 @@ func Handler(socks net.Conn, snowflakes SnowflakeCollector) error {
log.Printf("---- Handler: begin stream %v ---", stream.ID())
copyLoop(socks, stream)
log.Printf("---- Handler: closed stream %v ---", stream.ID())
snowflakes.End()
log.Printf("---- Handler: end collecting snowflakes ---")
return nil
}
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to the Tor SOCKS handler when needed.
func connectLoop(snowflakes SnowflakeCollector) {
for {
// Check if ending is necessary.
_, err := snowflakes.Collect()
if err != nil {
log.Printf("WebRTC: %v Retrying in %v...",
err, ReconnectTimeout)
}
select {
case <-time.After(ReconnectTimeout):
continue
case <-snowflakes.Melted():
log.Println("ConnectLoop: stopped.")
return
}
}
}
// Exchanges bytes between two ReadWriters.
// (In this case, between a SOCKS connection and smux stream.)
func copyLoop(socks, stream io.ReadWriter) {
......
......@@ -26,28 +26,8 @@ const (
DefaultSnowflakeCapacity = 1
)
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to the Tor SOCKS handler when needed.
func ConnectLoop(snowflakes sf.SnowflakeCollector) {
for {
// Check if ending is necessary.
_, err := snowflakes.Collect()
if err != nil {
log.Printf("WebRTC: %v Retrying in %v...",
err, sf.ReconnectTimeout)
}
select {
case <-time.After(sf.ReconnectTimeout):
continue
case <-snowflakes.Melted():
log.Println("ConnectLoop: stopped.")
return
}
}
}
// Accept local SOCKS connections and pass them to the handler.
func socksAcceptLoop(ln *pt.SocksListener, snowflakes sf.SnowflakeCollector) {
func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue) {
defer ln.Close()
for {
conn, err := ln.AcceptSocks()
......@@ -68,7 +48,7 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes sf.SnowflakeCollector) {
return
}
err = sf.Handler(conn, snowflakes)
err = sf.Handler(conn, tongue)
if err != nil {
log.Printf("handler error: %s", err)
return
......@@ -158,9 +138,6 @@ func main() {
log.Printf("url: %v", strings.Join(server.URLs, " "))
}
// Prepare to collect remote WebRTC peers.
snowflakes := sf.NewPeers(*max)
// Use potentially domain-fronting broker to rendezvous.
broker, err := sf.NewBrokerChannel(
*brokerURL, *frontDomain, sf.CreateBrokerTransport(),
......@@ -170,12 +147,8 @@ func main() {
}
go updateNATType(iceServers, broker)
snowflakes.Tongue = sf.NewWebRTCDialer(broker, iceServers)
// Use a real logger to periodically output how much traffic is happening.
snowflakes.BytesLogger = sf.NewBytesSyncLogger()
go ConnectLoop(snowflakes)
// Create a new WebRTCDialer to use as the |Tongue| to catch snowflakes
dialer := sf.NewWebRTCDialer(broker, iceServers, *max)
// Begin goptlib client process.
ptInfo, err := pt.ClientSetup(nil)
......@@ -197,7 +170,7 @@ func main() {
break
}
log.Printf("Started SOCKS listener at %v.", ln.Addr())
go socksAcceptLoop(ln, snowflakes)
go socksAcceptLoop(ln, dialer)
pt.Cmethod(methodName, ln.Version(), ln.Addr())
listeners = append(listeners, ln)
default:
......@@ -228,7 +201,6 @@ func main() {
for _, ln := range listeners {
ln.Close()
}
snowflakes.End()
log.Println("snowflake is done.")
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment