Skip to content
Snippets Groups Projects
Commit 098cc811 authored by Cecylia Bocovich's avatar Cecylia Bocovich
Browse files

Patch

parent 8c875f0b
No related branches found
No related tags found
No related merge requests found
......@@ -2,22 +2,45 @@ package lib
import (
"crypto/rand"
"encoding/csv"
"encoding/hex"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
"git.torproject.org/pluggable-transports/snowflake.git/common/util"
"github.com/pion/sdp/v2"
"github.com/pion/webrtc/v2"
)
var csvWriter *csv.Writer
func init() {
f, err := os.OpenFile("proxytest.csv", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
panic(err)
}
csvWriter = csv.NewWriter(f)
}
func Now() string {
return time.Now().UTC().Format("2006-01-02 15:04:05.000")
}
// Remote WebRTC peer.
//
// Handles preparation of go-webrtc PeerConnection. Only ever has
// one DataChannel.
type WebRTCPeer struct {
id string
attempt int
pc *webrtc.PeerConnection
transport *webrtc.DataChannel
......@@ -29,7 +52,17 @@ type WebRTCPeer struct {
once sync.Once // Synchronization for PeerConnection destruction
BytesLogger BytesLogger
BytesLogger BytesLogger
hasRead, hasWritten bool
}
func (c *WebRTCPeer) Out(varname, value string) {
csvWriter.Write([]string{c.id, strconv.Itoa(c.attempt), varname, value})
csvWriter.Flush()
err := csvWriter.Error()
if err != nil {
panic(err)
}
}
// Construct a WebRTC PeerConnection.
......@@ -67,6 +100,11 @@ func (c *WebRTCPeer) Read(b []byte) (int, error) {
// Writes bytes out to remote WebRTC.
// As part of |io.ReadWriter|
func (c *WebRTCPeer) Write(b []byte) (int, error) {
if !c.hasWritten {
c.Out("ts_first_send", Now())
}
c.hasWritten = true
err := c.transport.Send(b)
if err != nil {
return 0, err
......@@ -94,6 +132,7 @@ func (c *WebRTCPeer) checkForStaleness() {
return
}
if time.Since(c.lastReceive) > SnowflakeTimeout {
c.Out("ts_idle_timeout", Now())
log.Printf("WebRTC: No messages received for %v -- closing stale connection.",
SnowflakeTimeout)
c.Close()
......@@ -108,12 +147,14 @@ func (c *WebRTCPeer) connect(config *webrtc.Configuration, broker *BrokerChannel
// TODO: When go-webrtc is more stable, it's possible that a new
// PeerConnection won't need to be re-prepared each time.
var err error
c.pc, err = preparePeerConnection(config)
c.pc, err = c.preparePeerConnection(config)
if err != nil {
return err
}
answer := exchangeSDP(broker, c.pc.LocalDescription())
answer := c.exchangeSDP(broker, c.pc.LocalDescription())
log.Printf("Received Answer.\n")
c.Out("answer_sdp", answer.SDP)
summarizeSDP(c, "answer", answer.SDP)
err = c.pc.SetRemoteDescription(*answer)
if nil != err {
log.Println("WebRTC: Unable to SetRemoteDescription:", err)
......@@ -131,7 +172,7 @@ func (c *WebRTCPeer) connect(config *webrtc.Configuration, broker *BrokerChannel
// preparePeerConnection creates a new WebRTC PeerConnection and returns it
// after ICE candidate gathering is complete..
func preparePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection, error) {
func (c *WebRTCPeer) preparePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection, error) {
pc, err := webrtc.NewPeerConnection(*config)
if err != nil {
log.Printf("NewPeerConnection ERROR: %s", err)
......@@ -164,6 +205,9 @@ func preparePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection
return nil, err
}
log.Println("WebRTC: Set local description")
offerSDP := util.StripLocalAddresses(offer.SDP)
c.Out("offer_sdp", offerSDP)
summarizeSDP(c, "offer", offerSDP)
<-offerChannel // Wait for ICE candidate gathering to complete.
log.Println("WebRTC: PeerConnection created.")
......@@ -185,6 +229,7 @@ func (c *WebRTCPeer) establishDataChannel() (*webrtc.DataChannel, error) {
openChannel := make(chan struct{})
dc.OnOpen(func() {
log.Println("WebRTC: DataChannel.OnOpen")
c.Out("ts_open", Now())
close(openChannel)
})
dc.OnClose(func() {
......@@ -195,6 +240,10 @@ func (c *WebRTCPeer) establishDataChannel() (*webrtc.DataChannel, error) {
if len(msg.Data) <= 0 {
log.Println("0 length message---")
}
if !c.hasRead {
c.Out("ts_first_recv", Now())
}
c.hasRead = true
n, err := c.writePipe.Write(msg.Data)
c.BytesLogger.AddInbound(n)
if err != nil {
......@@ -219,14 +268,18 @@ func (c *WebRTCPeer) establishDataChannel() (*webrtc.DataChannel, error) {
// exchangeSDP sends the local SDP offer to the Broker, awaits the SDP answer,
// and returns the answer.
func exchangeSDP(broker *BrokerChannel, offer *webrtc.SessionDescription) *webrtc.SessionDescription {
func (c *WebRTCPeer) exchangeSDP(broker *BrokerChannel, offer *webrtc.SessionDescription) *webrtc.SessionDescription {
// Keep trying the same offer until a valid answer arrives.
for {
// Send offer to broker (blocks).
c.Out("ts_broker_start", Now())
answer, err := broker.Negotiate(offer)
c.Out("ts_broker_end", Now())
if err == nil {
return answer
c.attempt += 1
}
c.Out("broker_err", err.Error())
log.Printf("BrokerChannel Error: %s", err)
log.Printf("Failed to retrieve answer. Retrying in %v", ReconnectTimeout)
<-time.After(ReconnectTimeout)
......@@ -251,3 +304,72 @@ func (c *WebRTCPeer) cleanup() {
}
}
}
func summarizeSDP(c *WebRTCPeer, label, text string) {
var desc sdp.SessionDescription
err := desc.Unmarshal([]byte(text))
if err != nil {
panic(err)
}
numTransportTCP := 0
numTransportUDP := 0
numTransportOther := 0
numAddressIPv4 := 0
numAddressIPv6 := 0
numAddressLocalHostname := 0
numAddressOther := 0
for _, m := range desc.MediaDescriptions {
if ci := m.ConnectionInformation; ci != nil {
c.Out(label+"_c_network_type", ci.NetworkType)
c.Out(label+"_c_address_type", ci.AddressType)
if ip := net.ParseIP(ci.Address.Address); ip != nil {
if ip.IsUnspecified() {
c.Out(label+"_c_address_zero", "T")
} else {
c.Out(label+"_c_address_zero", "F")
}
}
}
i := 0
for _, a := range m.Attributes {
candidate, err := a.ToICECandidate()
if err != nil {
continue
}
c.Out(fmt.Sprintf(label+"_candidate_transport.%d", i), candidate.Protocol)
c.Out(fmt.Sprintf(label+"_candidate_priority.%d", i), strconv.FormatUint(uint64(candidate.Priority), 10))
c.Out(fmt.Sprintf(label+"_candidate_typ.%d", i), candidate.Typ)
switch strings.ToLower(candidate.Protocol) {
case "tcp":
numTransportTCP += 1
case "udp":
numTransportUDP += 1
default:
numTransportOther += 1
}
if ip := net.ParseIP(candidate.Address); ip != nil {
if ip.To4() != nil {
numAddressIPv4 += 1
c.Out(fmt.Sprintf(label+"_candidate_address_type.%d", i), "IP4")
} else {
numAddressIPv6 += 1
c.Out(fmt.Sprintf(label+"_candidate_address_type.%d", i), "IP6")
}
} else if strings.HasSuffix(strings.ToLower(candidate.Address), ".local") {
c.Out(fmt.Sprintf(label+"_candidate_address_type.%d", i), "local_hostname")
numAddressLocalHostname += 1
} else {
c.Out(fmt.Sprintf(label+"_candidate_address_type.%d", i), "other")
numAddressOther += 1
}
i++
}
}
c.Out(label+"_num_transport_tcp", strconv.Itoa(numTransportTCP))
c.Out(label+"_num_transport_udp", strconv.Itoa(numTransportUDP))
c.Out(label+"_num_transport_other", strconv.Itoa(numTransportOther))
c.Out(label+"_num_address_ipv4", strconv.Itoa(numAddressIPv4))
c.Out(label+"_num_address_ipv6", strconv.Itoa(numAddressIPv6))
c.Out(label+"_num_address_local_hostname", strconv.Itoa(numAddressLocalHostname))
c.Out(label+"_num_address_other", strconv.Itoa(numAddressOther))
}
......@@ -10,6 +10,7 @@ import (
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
......@@ -34,6 +35,21 @@ func ConnectLoop(snowflakes sf.SnowflakeCollector) {
if err != nil {
log.Printf("WebRTC: %v Retrying in %v...",
err, sf.ReconnectTimeout)
} else {
s := snowflakes.Pop()
n, err := s.Write([]byte("hello"))
s.Out("write_n", strconv.Itoa(n))
if err != nil {
s.Out("write_err", err.Error())
}
var buf [5]byte
n, err = s.Read(buf[:])
s.Out("read_n", strconv.Itoa(n))
if err != nil {
s.Out("read_err", err.Error())
}
s.Out("ts_close", sf.Now())
s.Close()
}
select {
case <-time.After(sf.ReconnectTimeout):
......@@ -166,7 +182,7 @@ func main() {
// Use a real logger to periodically output how much traffic is happening.
snowflakes.BytesLogger = sf.NewBytesSyncLogger()
go ConnectLoop(snowflakes)
ConnectLoop(snowflakes)
// Begin goptlib client process.
ptInfo, err := pt.ClientSetup(nil)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment