Skip to content
Snippets Groups Projects
Commit 7092b2cb authored by Arlo Breault's avatar Arlo Breault
Browse files

Revert abstracting copyloop

parent 30b5ef8a
Branches
Tags
No related merge requests found
......@@ -2,8 +2,6 @@ package websocketconn
import (
"io"
"log"
"sync"
"time"
"github.com/gorilla/websocket"
......@@ -70,20 +68,3 @@ func NewWebSocketConn(ws *websocket.Conn) WebSocketConn {
conn.Ws = ws
return conn
}
// Copy from WebSocket to socket and vice versa.
func CopyLoop(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) {
var wg sync.WaitGroup
copyer := func(dst io.ReadWriteCloser, src io.ReadWriteCloser) {
defer wg.Done()
if _, err := io.Copy(dst, src); err != nil {
log.Printf("io.Copy inside CopyLoop generated an error: %v", err)
}
dst.Close()
src.Close()
}
wg.Add(2)
go copyer(c1, c2)
go copyer(c2, c1)
wg.Wait()
}
package websocketconn
import (
"net"
"testing"
. "github.com/smartystreets/goconvey/convey"
)
func TestWebsocketConn(t *testing.T) {
Convey("CopyLoop", t, func() {
c1, s1 := net.Pipe()
c2, s2 := net.Pipe()
go CopyLoop(s1, s2)
go func() {
bytes := []byte("Hello!")
c1.Write(bytes)
}()
bytes := make([]byte, 6)
n, err := c2.Read(bytes)
So(n, ShouldEqual, 6)
So(err, ShouldEqual, nil)
So(bytes, ShouldResemble, []byte("Hello!"))
s1.Close()
// Check that copy loop has closed other connection
_, err = s2.Write(bytes)
So(err, ShouldNotBeNil)
})
}
......@@ -374,4 +374,23 @@ func TestUtilityFuncs(t *testing.T) {
sid2 := genSessionID()
So(sid1, ShouldNotEqual, sid2)
})
Convey("CopyLoop", t, func() {
c1, s1 := net.Pipe()
c2, s2 := net.Pipe()
go CopyLoop(s1, s2)
go func() {
bytes := []byte("Hello!")
c1.Write(bytes)
}()
bytes := make([]byte, 6)
n, err := c2.Read(bytes)
So(n, ShouldEqual, 6)
So(err, ShouldEqual, nil)
So(bytes, ShouldResemble, []byte("Hello!"))
s1.Close()
//Check that copy loop has closed other connection
_, err = s2.Write(bytes)
So(err, ShouldNotBeNil)
})
}
......@@ -240,6 +240,22 @@ func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
return nil
}
func CopyLoop(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) {
var wg sync.WaitGroup
copyer := func(dst io.ReadWriteCloser, src io.ReadWriteCloser) {
defer wg.Done()
if _, err := io.Copy(dst, src); err != nil {
log.Printf("io.Copy inside CopyLoop generated an error: %v", err)
}
dst.Close()
src.Close()
}
wg.Add(2)
go copyer(c1, c2)
go copyer(c2, c1)
wg.Wait()
}
// We pass conn.RemoteAddr() as an additional parameter, rather than calling
// conn.RemoteAddr() inside this function, as a workaround for a hang that
// otherwise occurs inside of conn.pc.RemoteDescription() (called by
......@@ -272,7 +288,7 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
wsConn := websocketconn.NewWebSocketConn(ws)
log.Printf("connected to relay")
defer wsConn.Close()
websocketconn.CopyLoop(conn, &wsConn)
CopyLoop(conn, &wsConn)
log.Printf("datachannelHandler ends")
}
......
......@@ -15,6 +15,7 @@ import (
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
......@@ -50,6 +51,35 @@ additional HTTP listener on port 80 to work with ACME.
flag.PrintDefaults()
}
// Copy from WebSocket to socket and vice versa.
func proxy(local *net.TCPConn, conn *websocketconn.WebSocketConn) {
var wg sync.WaitGroup
wg.Add(2)
go func() {
if _, err := io.Copy(conn, local); err != nil {
log.Printf("error copying ORPort to WebSocket %v", err)
}
if err := local.CloseRead(); err != nil {
log.Printf("error closing read after copying ORPort to WebSocket %v", err)
}
conn.Close()
wg.Done()
}()
go func() {
if _, err := io.Copy(local, conn); err != nil {
log.Printf("error copying WebSocket to ORPort")
}
if err := local.CloseWrite(); err != nil {
log.Printf("error closing write after copying WebSocket to ORPort %v", err)
}
conn.Close()
wg.Done()
}()
wg.Wait()
}
// Return an address string suitable to pass into pt.DialOr.
func clientAddr(clientIPParam string) string {
if clientIPParam == "" {
......@@ -75,8 +105,8 @@ func (handler *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
wsConn := websocketconn.NewWebSocketConn(ws)
defer wsConn.Close()
conn := websocketconn.NewWebSocketConn(ws)
defer conn.Close()
// Pass the address of client as the remote address of incoming connection
clientIPParam := r.URL.Query().Get("client_ip")
......@@ -93,7 +123,7 @@ func (handler *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
defer or.Close()
websocketconn.CopyLoop(or, &wsConn)
proxy(or, &conn)
}
func initServer(addr *net.TCPAddr,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment