forked from potato2003/actioncable-client-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscription.go
99 lines (84 loc) · 1.92 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package actioncable
import (
"encoding/json"
"sync"
)
// Subscription ties a channel Identifier to the Consumer it communicates
// through, and owns the channels used to deliver events to a handler.
type Subscription struct {
	// consumer is used to send outgoing commands; its Subscriptions
	// collection is what Unsubscribe removes this subscription from.
	consumer *Consumer
	// Identifier is sent as the "identifier" field of outgoing messages.
	Identifier *ChannelIdentifier
	// NotifyCh carries the events that the goroutine started by SetHandler
	// dispatches to the registered handler.
	NotifyCh chan *SubscriptionEvent
	// stopHandleCh carries stop requests to the handler goroutine. Each
	// request is itself a channel on which the goroutine acknowledges.
	stopHandleCh chan chan struct{}
	// handler is the currently registered event handler, or nil if none.
	handler SubscriptionEventHandler
	// lockForHandler is held by SetHandler and Unsubscribe for their whole
	// duration.
	lockForHandler *sync.Mutex
}
// newSubscription builds a Subscription bound to consumer and identifier.
//
// NotifyCh is created with room for 32 events and stopHandleCh with room for
// a single stop request. No handler is installed.
func newSubscription(consumer *Consumer, identifier *ChannelIdentifier) *Subscription {
	sub := new(Subscription)

	sub.consumer = consumer
	sub.Identifier = identifier

	sub.NotifyCh = make(chan *SubscriptionEvent, 32)
	sub.stopHandleCh = make(chan chan struct{}, 1)
	sub.lockForHandler = new(sync.Mutex)

	return sub
}
// Perform sends data with an added "action" entry set to action.
//
// The entries of data are copied into a fresh map first, so the caller's map
// is never modified. An "action" key already present in data is overridden.
func (s *Subscription) Perform(action string, data map[string]interface{}) {
	payload := make(map[string]interface{}, len(data)+1)
	for key := range data {
		payload[key] = data[key]
	}
	payload["action"] = action

	s.Send(payload)
}
// Send JSON-encodes data and passes it to the consumer as a "message" command
// carrying this subscription's Identifier. The encoded JSON travels as a
// string in the "data" field.
//
// If data cannot be encoded (for example because it contains a channel or a
// function value), the failure is logged and nothing is sent.
func (s *Subscription) Send(data map[string]interface{}) {
	encodedData, err := json.Marshal(data)
	if err != nil {
		// Without this check a command with an empty "data" string would
		// go out in place of the caller's payload.
		logger.Warnf("failed to encode subscription message data: %v", err)
		return
	}

	s.consumer.send(map[string]interface{}{
		"command":    "message",
		"identifier": s.Identifier,
		"data":       string(encodedData),
	})
}
// SetHandler installs h as the receiver of events arriving on NotifyCh,
// replacing whatever handler was installed before.
//
// Any previous handler goroutine is stopped first. When h is nil no new
// goroutine is started, so events simply stay queued on NotifyCh. Otherwise a
// goroutine is started that calls the method of h matching each event's Type,
// and logs a warning for any Type it does not recognise.
//
// NOTE: the callbacks run on that goroutine, and stopping it waits for its
// acknowledgement, so calling SetHandler or Unsubscribe from inside a
// callback blocks forever.
func (s *Subscription) SetHandler(h SubscriptionEventHandler) {
	s.lockForHandler.Lock()
	defer s.lockForHandler.Unlock()

	s.stopHandle()
	s.handler = h
	if h == nil {
		return
	}

	// dispatch routes a single event to the matching callback of h.
	dispatch := func(event *SubscriptionEvent) {
		switch event.Type {
		case Connected:
			h.OnConnected(event)
		case Disconnected:
			h.OnDisconnected(event)
		case Rejected:
			h.OnRejected(event)
		case Received:
			h.OnReceived(event)
		default:
			logger.Warnf("unknown subscription event: %v", event)
		}
	}

	go func() {
		for {
			select {
			case event := <-s.NotifyCh:
				dispatch(event)
			case ack := <-s.stopHandleCh:
				// Acknowledge the stop request, then exit for good.
				ack <- struct{}{}
				return
			}
		}
	}()
}
// stopHandle stops the handler goroutine, if one is running, and waits until
// it has acknowledged the request. Both callers in this file hold
// lockForHandler while calling it.
//
// A non-nil handler is what marks the goroutine as running, so handler is
// cleared once the goroutine has stopped. Without that, a second stop (for
// example Unsubscribe called twice, or SetHandler called after Unsubscribe)
// would queue a request that no goroutine is left to answer and block forever.
func (s *Subscription) stopHandle() {
	if s.handler == nil {
		return
	}

	doneCh := make(chan struct{}, 1)
	defer close(doneCh)

	// Hand the goroutine a private channel and wait for its acknowledgement.
	s.stopHandleCh <- doneCh
	<-doneCh

	s.handler = nil
}
// Unsubscribe stops the handler goroutine and then removes this subscription
// from the consumer's Subscriptions. The handler lock is held throughout.
func (s *Subscription) Unsubscribe() {
	mu := s.lockForHandler
	mu.Lock()
	defer mu.Unlock()

	s.stopHandle()

	s.consumer.Subscriptions.remove(s)
}