Loading server/snowflake.go +99 −97 Original line number Diff line number Diff line Loading @@ -81,6 +81,8 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { } func datachannelHandler(conn *webRTCConn) { defer conn.Close() handlerChan <- 1 defer func() { handlerChan <- -1 Loading @@ -91,12 +93,34 @@ func datachannelHandler(conn *webRTCConn) { log.Printf("Failed to connect to ORPort: " + err.Error()) return } //defer or.Close() defer or.Close() copyLoop(conn, or) } // Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE // candidates is complete and and answer is available in LocalDescription. // Installs an OnDataChannel callback that creates a webRTCConn and passes it to // datachannelHandler. func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration) (*webrtc.PeerConnection, error) { errChan := make(chan error) answerChan := make(chan *webrtc.SessionDescription) pc, err := webrtc.NewPeerConnection(config) if err != nil { return nil, fmt.Errorf("accept: NewPeerConnection: %s", err) } pc.OnNegotiationNeeded = func() { panic("OnNegotiationNeeded") } pc.OnIceComplete = func() { answerChan <- pc.LocalDescription() } pc.OnDataChannel = func(dc *data.Channel) { log.Println("OnDataChannel") pr, pw := io.Pipe() conn.pr = pr dc := conn.dc dc.OnOpen = func() { log.Println("OnOpen channel") } Loading @@ -105,7 +129,6 @@ func datachannelHandler(conn *webRTCConn) { pw.Close() } dc.OnMessage = func(msg []byte) { // log.Printf("OnMessage channel %d %+q", len(msg), msg) log.Printf("OnMessage <--- %d bytes", len(msg)) n, err := pw.Write(msg) if err != nil { Loading @@ -116,96 +139,77 @@ func datachannelHandler(conn *webRTCConn) { } } go copyLoop(conn, or) conn := &webRTCConn{pc: pc, dc: dc, pr: pr} go datachannelHandler(conn) } func makePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection, error) { pc, err := webrtc.NewPeerConnection(config) err = pc.SetRemoteDescription(sdp) if err != nil { log.Printf("NewPeerConnection: %s", err) return nil, err } pc.OnNegotiationNeeded = func() { panic("OnNegotiationNeeded") } pc.OnDataChannel = func(dc *data.Channel) { log.Println("OnDataChannel") datachannelHandler(&webRTCConn{pc: pc, dc: dc}) } pc.OnIceComplete = func() { log.Printf("----------------") fmt.Fprintln(logFile, pc.LocalDescription().Serialize()) log.Printf("----------------") } return pc, nil } func readSignalingMessages(signalChan chan *webrtc.SessionDescription, f *os.File) { s := bufio.NewScanner(f) for s.Scan() { msg := s.Text() sdp := webrtc.DeserializeSessionDescription(msg) if sdp == nil { log.Printf("ignoring invalid signal message %+q", msg) continue } signalChan <- sdp continue } if err := s.Err(); err != nil { log.Printf("signal FIFO: %s", err) } pc.Close() return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err) } log.Println("sdp offer successfully received.") func generateAnswer(pc *webrtc.PeerConnection) { fmt.Println("Generating answer...") go func() { log.Println("Generating answer...") answer, err := pc.CreateAnswer() // blocking if err != nil { fmt.Println(err) errChan <- err return } pc.SetLocalDescription(answer) err = pc.SetLocalDescription(answer) if err != nil { errChan <- err return } }() func listenWebRTC(config *webrtc.Configuration, signal string) (*os.File, error) { err := syscall.Mkfifo(signal, 0600) // Wait until answer is ready. select { case err = <-errChan: pc.Close() return nil, err case <-answerChan: } return pc, nil } // Create a signaling named pipe and feed offers from it into // makePeerConnectionFromOffer. func receiveSignalsFIFO(filename string, config *webrtc.Configuration) error { err := syscall.Mkfifo(filename, 0600) if err != nil { if err.(syscall.Errno) != syscall.EEXIST { return nil, err return err } } signalFile, err := os.OpenFile(signal, os.O_RDONLY, 0600) signalFile, err := os.OpenFile(filename, os.O_RDONLY, 0600) if err != nil { return nil, err return err } //defer signalFile.Close() var signalChan = make(chan *webrtc.SessionDescription) defer signalFile.Close() go func() { for { select { case sdp := <-signalChan: pc, err := makePeerConnection(config) if err != nil { log.Printf("makePeerConnection: %s", err) break s := bufio.NewScanner(signalFile) for s.Scan() { msg := s.Text() sdp := webrtc.DeserializeSessionDescription(msg) if sdp == nil { log.Printf("ignoring invalid signal message %+q", msg) continue } err = pc.SetRemoteDescription(sdp) pc, err := makePeerConnectionFromOffer(sdp, config) if err != nil { fmt.Println("ERROR", err) break log.Printf("makePeerConnectionFromOffer: %s", err) continue } fmt.Println("sdp offer successfully received.") go generateAnswer(pc) // Write offer to log for manual signaling. log.Printf("----------------") fmt.Fprintln(logFile, pc.LocalDescription().Serialize()) log.Printf("----------------") } } }() go readSignalingMessages(signalChan, signalFile) log.Printf("waiting for offer") return signalFile, nil return s.Err() } func main() { Loading @@ -228,18 +232,19 @@ func main() { webRTCConfig := webrtc.NewConfiguration(webrtc.OptionIceServer("stun:stun.l.google.com:19302")) listeners := make([]*os.File, 0) // Start FIFO-based signaling receiver. go func() { err := receiveSignalsFIFO("signal", webRTCConfig) if err != nil { log.Printf("receiveSignalsFIFO: %s", err) } }() for _, bindaddr := range ptInfo.Bindaddrs { switch bindaddr.MethodName { case ptMethodName: ln, err := listenWebRTC(webRTCConfig, "signal") // meh if err != nil { pt.SmethodError(bindaddr.MethodName, err.Error()) break } bindaddr.Addr.Port = 12345 // lies!!! pt.Smethod(bindaddr.MethodName, bindaddr.Addr) listeners = append(listeners, ln) default: pt.SmethodError(bindaddr.MethodName, "no such method") } Loading @@ -260,9 +265,6 @@ func main() { case sig = <-sigChan: } } for _, ln := range listeners { ln.Close() } if sig == syscall.SIGTERM { return Loading Loading
server/snowflake.go +99 −97 Original line number Diff line number Diff line Loading @@ -81,6 +81,8 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { } func datachannelHandler(conn *webRTCConn) { defer conn.Close() handlerChan <- 1 defer func() { handlerChan <- -1 Loading @@ -91,12 +93,34 @@ func datachannelHandler(conn *webRTCConn) { log.Printf("Failed to connect to ORPort: " + err.Error()) return } //defer or.Close() defer or.Close() copyLoop(conn, or) } // Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE // candidates is complete and and answer is available in LocalDescription. // Installs an OnDataChannel callback that creates a webRTCConn and passes it to // datachannelHandler. func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration) (*webrtc.PeerConnection, error) { errChan := make(chan error) answerChan := make(chan *webrtc.SessionDescription) pc, err := webrtc.NewPeerConnection(config) if err != nil { return nil, fmt.Errorf("accept: NewPeerConnection: %s", err) } pc.OnNegotiationNeeded = func() { panic("OnNegotiationNeeded") } pc.OnIceComplete = func() { answerChan <- pc.LocalDescription() } pc.OnDataChannel = func(dc *data.Channel) { log.Println("OnDataChannel") pr, pw := io.Pipe() conn.pr = pr dc := conn.dc dc.OnOpen = func() { log.Println("OnOpen channel") } Loading @@ -105,7 +129,6 @@ func datachannelHandler(conn *webRTCConn) { pw.Close() } dc.OnMessage = func(msg []byte) { // log.Printf("OnMessage channel %d %+q", len(msg), msg) log.Printf("OnMessage <--- %d bytes", len(msg)) n, err := pw.Write(msg) if err != nil { Loading @@ -116,96 +139,77 @@ func datachannelHandler(conn *webRTCConn) { } } go copyLoop(conn, or) conn := &webRTCConn{pc: pc, dc: dc, pr: pr} go datachannelHandler(conn) } func makePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection, error) { pc, err := webrtc.NewPeerConnection(config) err = pc.SetRemoteDescription(sdp) if err != nil { log.Printf("NewPeerConnection: %s", err) return nil, err } pc.OnNegotiationNeeded = func() { panic("OnNegotiationNeeded") } pc.OnDataChannel = func(dc *data.Channel) { log.Println("OnDataChannel") datachannelHandler(&webRTCConn{pc: pc, dc: dc}) } pc.OnIceComplete = func() { log.Printf("----------------") fmt.Fprintln(logFile, pc.LocalDescription().Serialize()) log.Printf("----------------") } return pc, nil } func readSignalingMessages(signalChan chan *webrtc.SessionDescription, f *os.File) { s := bufio.NewScanner(f) for s.Scan() { msg := s.Text() sdp := webrtc.DeserializeSessionDescription(msg) if sdp == nil { log.Printf("ignoring invalid signal message %+q", msg) continue } signalChan <- sdp continue } if err := s.Err(); err != nil { log.Printf("signal FIFO: %s", err) } pc.Close() return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err) } log.Println("sdp offer successfully received.") func generateAnswer(pc *webrtc.PeerConnection) { fmt.Println("Generating answer...") go func() { log.Println("Generating answer...") answer, err := pc.CreateAnswer() // blocking if err != nil { fmt.Println(err) errChan <- err return } pc.SetLocalDescription(answer) err = pc.SetLocalDescription(answer) if err != nil { errChan <- err return } }() func listenWebRTC(config *webrtc.Configuration, signal string) (*os.File, error) { err := syscall.Mkfifo(signal, 0600) // Wait until answer is ready. select { case err = <-errChan: pc.Close() return nil, err case <-answerChan: } return pc, nil } // Create a signaling named pipe and feed offers from it into // makePeerConnectionFromOffer. func receiveSignalsFIFO(filename string, config *webrtc.Configuration) error { err := syscall.Mkfifo(filename, 0600) if err != nil { if err.(syscall.Errno) != syscall.EEXIST { return nil, err return err } } signalFile, err := os.OpenFile(signal, os.O_RDONLY, 0600) signalFile, err := os.OpenFile(filename, os.O_RDONLY, 0600) if err != nil { return nil, err return err } //defer signalFile.Close() var signalChan = make(chan *webrtc.SessionDescription) defer signalFile.Close() go func() { for { select { case sdp := <-signalChan: pc, err := makePeerConnection(config) if err != nil { log.Printf("makePeerConnection: %s", err) break s := bufio.NewScanner(signalFile) for s.Scan() { msg := s.Text() sdp := webrtc.DeserializeSessionDescription(msg) if sdp == nil { log.Printf("ignoring invalid signal message %+q", msg) continue } err = pc.SetRemoteDescription(sdp) pc, err := makePeerConnectionFromOffer(sdp, config) if err != nil { fmt.Println("ERROR", err) break log.Printf("makePeerConnectionFromOffer: %s", err) continue } fmt.Println("sdp offer successfully received.") go generateAnswer(pc) // Write offer to log for manual signaling. log.Printf("----------------") fmt.Fprintln(logFile, pc.LocalDescription().Serialize()) log.Printf("----------------") } } }() go readSignalingMessages(signalChan, signalFile) log.Printf("waiting for offer") return signalFile, nil return s.Err() } func main() { Loading @@ -228,18 +232,19 @@ func main() { webRTCConfig := webrtc.NewConfiguration(webrtc.OptionIceServer("stun:stun.l.google.com:19302")) listeners := make([]*os.File, 0) // Start FIFO-based signaling receiver. go func() { err := receiveSignalsFIFO("signal", webRTCConfig) if err != nil { log.Printf("receiveSignalsFIFO: %s", err) } }() for _, bindaddr := range ptInfo.Bindaddrs { switch bindaddr.MethodName { case ptMethodName: ln, err := listenWebRTC(webRTCConfig, "signal") // meh if err != nil { pt.SmethodError(bindaddr.MethodName, err.Error()) break } bindaddr.Addr.Port = 12345 // lies!!! pt.Smethod(bindaddr.MethodName, bindaddr.Addr) listeners = append(listeners, ln) default: pt.SmethodError(bindaddr.MethodName, "no such method") } Loading @@ -260,9 +265,6 @@ func main() { case sig = <-sigChan: } } for _, ln := range listeners { ln.Close() } if sig == syscall.SIGTERM { return Loading