util.go 2.77 KB
Newer Older
1
package lib
2
3

import (
4
	"encoding/json"
5
6
	"log"
	"time"
7

8
	"github.com/pion/webrtc"
9
10
)

Serene Han's avatar
Serene Han committed
11
12
13
14
const (
	LogTimeInterval = 5
)

15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type BytesLogger interface {
	Log()
	AddOutbound(int)
	AddInbound(int)
}

// Default BytesLogger does nothing.
type BytesNullLogger struct{}

func (b BytesNullLogger) Log()                   {}
func (b BytesNullLogger) AddOutbound(amount int) {}
func (b BytesNullLogger) AddInbound(amount int)  {}

// BytesSyncLogger uses channels to safely log from multiple sources with output
// occuring at reasonable intervals.
type BytesSyncLogger struct {
31
32
33
34
35
36
37
	OutboundChan chan int
	InboundChan  chan int
	Outbound     int
	Inbound      int
	OutEvents    int
	InEvents     int
	IsLogging    bool
38
39
}

40
func (b *BytesSyncLogger) Log() {
41
	b.IsLogging = true
42
43
44
	var amount int
	output := func() {
		log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)",
45
46
47
48
49
			b.Inbound, b.Outbound, b.InEvents, b.OutEvents)
		b.Outbound = 0
		b.OutEvents = 0
		b.Inbound = 0
		b.InEvents = 0
50
51
52
53
	}
	last := time.Now()
	for {
		select {
54
55
56
		case amount = <-b.OutboundChan:
			b.Outbound += amount
			b.OutEvents++
57
			last := time.Now()
Serene Han's avatar
Serene Han committed
58
			if time.Since(last) > time.Second*LogTimeInterval {
59
60
61
				last = time.Now()
				output()
			}
62
63
64
		case amount = <-b.InboundChan:
			b.Inbound += amount
			b.InEvents++
Serene Han's avatar
Serene Han committed
65
			if time.Since(last) > time.Second*LogTimeInterval {
66
67
68
				last = time.Now()
				output()
			}
Serene Han's avatar
Serene Han committed
69
		case <-time.After(time.Second * LogTimeInterval):
70
			if b.InEvents > 0 || b.OutEvents > 0 {
71
72
73
74
75
76
				output()
			}
		}
	}
}

77
func (b *BytesSyncLogger) AddOutbound(amount int) {
78
	if !b.IsLogging {
79
80
		return
	}
81
	b.OutboundChan <- amount
82
83
}

84
func (b *BytesSyncLogger) AddInbound(amount int) {
85
	if !b.IsLogging {
86
87
		return
	}
88
	b.InboundChan <- amount
89
}
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
func deserializeSessionDescription(msg string) *webrtc.SessionDescription {
	var parsed map[string]interface{}
	err := json.Unmarshal([]byte(msg), &parsed)
	if nil != err {
		log.Println(err)
		return nil
	}
	if _, ok := parsed["type"]; !ok {
		log.Println("Cannot deserialize SessionDescription without type field.")
		return nil
	}
	if _, ok := parsed["sdp"]; !ok {
		log.Println("Cannot deserialize SessionDescription without sdp field.")
		return nil
	}

	var stype webrtc.SDPType
	switch parsed["type"].(string) {
	default:
		log.Println("Unknown SDP type")
		return nil
	case "offer":
		stype = webrtc.SDPTypeOffer
	case "pranswer":
		stype = webrtc.SDPTypePranswer
	case "answer":
		stype = webrtc.SDPTypeAnswer
	case "rollback":
		stype = webrtc.SDPTypeRollback
	}

	if err != nil {
		log.Println(err)
		return nil
	}
	return &webrtc.SessionDescription{
		Type: stype,
		SDP:  parsed["sdp"].(string),
	}
}

func serializeSessionDescription(desc *webrtc.SessionDescription) string {
	bytes, err := json.Marshal(*desc)
	if nil != err {
		log.Println(err)
		return ""
	}
	return string(bytes)
}