Skip to content

Commit c8e5222

Browse files
committed
update.
1 parent 6219c14 commit c8e5222

File tree

6 files changed

+130
-134
lines changed

6 files changed

+130
-134
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
all: linux darwin windows
22

3-
release: all upx tar
3+
release: all tar
44

55
clean:
66
rm -rf bin/*

go.sum

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
55
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
66
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
77
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
8-
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
98
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
109
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
1110
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
@@ -20,9 +19,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
2019
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
2120
github.com/rs/zerolog v1.14.3 h1:4EGfSkR2hJDB0s3oFfrlPqjU1e4WLncergLil3nEKW0=
2221
github.com/rs/zerolog v1.14.3/go.mod h1:3WXPzbXEEliJ+a6UFE4vhIxV8qR1EML6ngzP9ug4eYg=
23-
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
2422
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
25-
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
2623
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
2724
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
2825
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=

pkg/signaler/signaler.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
"strings"
88

99
"github.com/cloudwebrtc/flutter-webrtc-server/pkg/logger"
10-
"github.com/cloudwebrtc/flutter-webrtc-server/pkg/transport"
1110
"github.com/cloudwebrtc/flutter-webrtc-server/pkg/turn"
11+
"github.com/cloudwebrtc/flutter-webrtc-server/pkg/websocket"
1212
)
1313

1414
func Marshal(m map[string]interface{}) string {
@@ -38,8 +38,8 @@ type PeerInfo struct {
3838

3939
// Peer .
4040
type Peer struct {
41-
info PeerInfo
42-
transport *transport.WebSocketTransport
41+
info PeerInfo
42+
conn *websocket.WebSocketConn
4343
}
4444

4545
// Session info.
@@ -70,7 +70,7 @@ func (s Signaler) authHandler(username string, realm string, srcAddr net.Addr) (
7070
return nil, false
7171
}
7272

73-
func (s *Signaler) NotifyPeersUpdate(transport *transport.WebSocketTransport, peers map[string]Peer) {
73+
func (s *Signaler) NotifyPeersUpdate(conn *websocket.WebSocketConn, peers map[string]Peer) {
7474
infos := []PeerInfo{}
7575
for _, peer := range peers {
7676
infos = append(infos, peer.info)
@@ -79,33 +79,33 @@ func (s *Signaler) NotifyPeersUpdate(transport *transport.WebSocketTransport, pe
7979
request["type"] = "peers"
8080
request["data"] = infos
8181
for _, peer := range peers {
82-
peer.transport.Send(Marshal(request))
82+
peer.conn.Send(Marshal(request))
8383
}
8484
}
8585

8686
func (s *Signaler) HandleTurnServerCredentials(writer http.ResponseWriter, request *http.Request) {
8787
// return turn credentials for client.
8888
}
8989

90-
func (s *Signaler) HandleNewWebSocket(transport *transport.WebSocketTransport, request *http.Request) {
90+
func (s *Signaler) HandleNewWebSocket(conn *websocket.WebSocketConn, request *http.Request) {
9191
logger.Infof("On Open %v", request)
92-
transport.On("message", func(message []byte) {
92+
conn.On("message", func(message []byte) {
9393
request := Unmarshal(string(message))
9494
data := request["data"].(map[string]interface{})
9595
switch request["type"] {
9696
case "new":
9797
{
9898

9999
peer := Peer{
100-
transport: transport,
100+
conn: conn,
101101
info: PeerInfo{
102102
ID: data["id"].(string),
103103
Name: data["name"].(string),
104104
UserAgent: data["user_agent"].(string),
105105
},
106106
}
107107
s.peers[peer.info.ID] = peer
108-
s.NotifyPeersUpdate(transport, s.peers)
108+
s.NotifyPeersUpdate(conn, s.peers)
109109
}
110110
break
111111
case "leave":
@@ -128,10 +128,10 @@ func (s *Signaler) HandleNewWebSocket(transport *transport.WebSocketTransport, r
128128
"reason": "Peer [" + To + "] not found ",
129129
},
130130
}
131-
transport.Send(Marshal(msg))
131+
conn.Send(Marshal(msg))
132132
return
133133
} else {
134-
peer.transport.Send(Marshal(request))
134+
peer.conn.Send(Marshal(request))
135135
}
136136
}
137137
break
@@ -147,7 +147,7 @@ func (s *Signaler) HandleNewWebSocket(transport *transport.WebSocketTransport, r
147147
"reason": "Invalid session [" + sessionID + "]",
148148
},
149149
}
150-
transport.Send(Marshal(msg))
150+
conn.Send(Marshal(msg))
151151
return
152152
}
153153
if peer, ok := s.peers[ids[0]]; !ok {
@@ -158,7 +158,7 @@ func (s *Signaler) HandleNewWebSocket(transport *transport.WebSocketTransport, r
158158
"reason": "Peer [" + ids[0] + "] not found.",
159159
},
160160
}
161-
transport.Send(Marshal(msg))
161+
conn.Send(Marshal(msg))
162162
return
163163
} else {
164164
bye := map[string]interface{}{
@@ -168,7 +168,7 @@ func (s *Signaler) HandleNewWebSocket(transport *transport.WebSocketTransport, r
168168
"session_id": sessionID,
169169
},
170170
}
171-
peer.transport.Send(Marshal(bye))
171+
peer.conn.Send(Marshal(bye))
172172
}
173173

174174
if peer, ok := s.peers[ids[1]]; !ok {
@@ -179,7 +179,7 @@ func (s *Signaler) HandleNewWebSocket(transport *transport.WebSocketTransport, r
179179
"reason": "Peer [" + ids[0] + "] not found ",
180180
},
181181
}
182-
transport.Send(Marshal(msg))
182+
conn.Send(Marshal(msg))
183183
return
184184
} else {
185185
bye := map[string]interface{}{
@@ -189,7 +189,7 @@ func (s *Signaler) HandleNewWebSocket(transport *transport.WebSocketTransport, r
189189
"session_id": sessionID,
190190
},
191191
}
192-
peer.transport.Send(Marshal(bye))
192+
peer.conn.Send(Marshal(bye))
193193
}
194194
}
195195
break
@@ -198,7 +198,7 @@ func (s *Signaler) HandleNewWebSocket(transport *transport.WebSocketTransport, r
198198
"type": "keepalive",
199199
"data": map[string]interface{}{},
200200
}
201-
transport.Send(Marshal(keepalive))
201+
conn.Send(Marshal(keepalive))
202202
break
203203
default:
204204
{
@@ -208,15 +208,15 @@ func (s *Signaler) HandleNewWebSocket(transport *transport.WebSocketTransport, r
208208
}
209209
})
210210

211-
transport.On("close", func(code int, text string) {
212-
logger.Infof("On Close %v", transport)
211+
conn.On("close", func(code int, text string) {
212+
logger.Infof("On Close %v", conn)
213213
for _, peer := range s.peers {
214-
if peer.transport == transport {
214+
if peer.conn == conn {
215215
logger.Infof("Remove peer %s", peer.info.ID)
216216
delete(s.peers, peer.info.ID)
217217
break
218218
}
219219
}
220-
s.NotifyPeersUpdate(transport, s.peers)
220+
s.NotifyPeersUpdate(conn, s.peers)
221221
})
222222
}

pkg/transport/transport.go

Lines changed: 0 additions & 104 deletions
This file was deleted.

pkg/websocket/conn.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package websocket
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"time"
7+
8+
"github.com/chuckpreslar/emission"
9+
"github.com/cloudwebrtc/flutter-webrtc-server/pkg/logger"
10+
"github.com/gorilla/websocket"
11+
)
12+
13+
const pingPeriod = 5 * time.Second
14+
15+
type WebSocketConn struct {
16+
emission.Emitter
17+
socket *websocket.Conn
18+
mutex *sync.Mutex
19+
closed bool
20+
}
21+
22+
func NewWebSocketConn(socket *websocket.Conn) *WebSocketConn {
23+
var conn WebSocketConn
24+
conn.Emitter = *emission.NewEmitter()
25+
conn.socket = socket
26+
conn.mutex = new(sync.Mutex)
27+
conn.closed = false
28+
conn.socket.SetCloseHandler(func(code int, text string) error {
29+
logger.Warnf("%s [%d]", text, code)
30+
conn.Emit("close", code, text)
31+
conn.closed = true
32+
return nil
33+
})
34+
return &conn
35+
}
36+
37+
func (conn *WebSocketConn) ReadMessage() {
38+
in := make(chan []byte)
39+
stop := make(chan struct{})
40+
pingTicker := time.NewTicker(pingPeriod)
41+
42+
var c = conn.socket
43+
go func() {
44+
for {
45+
_, message, err := c.ReadMessage()
46+
if err != nil {
47+
logger.Warnf("Got error: %v", err)
48+
if c, k := err.(*websocket.CloseError); k {
49+
conn.Emit("error", c.Code, c.Text)
50+
}
51+
close(stop)
52+
break
53+
}
54+
in <- message
55+
}
56+
}()
57+
58+
for {
59+
select {
60+
case _ = <-pingTicker.C:
61+
logger.Infof("Send keepalive !!!")
62+
if err := conn.Send("{}"); err != nil {
63+
logger.Errorf("Keepalive has failed")
64+
pingTicker.Stop()
65+
return
66+
}
67+
case message := <-in:
68+
{
69+
logger.Infof("Recivied data: %s", message)
70+
conn.Emit("message", []byte(message))
71+
}
72+
case <-stop:
73+
return
74+
}
75+
}
76+
}
77+
78+
/*
79+
* Send |message| to the connection.
80+
*/
81+
func (conn *WebSocketConn) Send(message string) error {
82+
logger.Infof("Send data: %s", message)
83+
conn.mutex.Lock()
84+
defer conn.mutex.Unlock()
85+
if conn.closed {
86+
return errors.New("websocket: write closed")
87+
}
88+
return conn.socket.WriteMessage(websocket.TextMessage, []byte(message))
89+
}
90+
91+
/*
92+
* Close conn.
93+
*/
94+
func (conn *WebSocketConn) Close() {
95+
conn.mutex.Lock()
96+
defer conn.mutex.Unlock()
97+
if conn.closed == false {
98+
logger.Infof("Close ws conn now : ", conn)
99+
conn.socket.Close()
100+
conn.closed = true
101+
} else {
102+
logger.Warnf("Transport already closed :", conn)
103+
}
104+
}

0 commit comments

Comments
 (0)