forked from alash3al/go-pubsub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
broker.go
119 lines (101 loc) · 2.41 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package pubsub
import (
"sync"
"time"
)
// Broker The broker related meta data
type Broker struct {
subscribers Subscribers
sLock sync.RWMutex
topics map[string]Subscribers
tLock sync.RWMutex
}
// NewBroker Create new broker
func NewBroker() *Broker {
return &Broker{
subscribers: Subscribers{},
sLock: sync.RWMutex{},
topics: map[string]Subscribers{},
tLock: sync.RWMutex{},
}
}
// Attach Create a new subscriber and register it into our main broker
func (b *Broker) Attach() (*Subscriber, error) {
s, err := NewSubscriber()
if err != nil {
return nil, err
}
b.sLock.Lock()
b.subscribers[s.GetID()] = s
b.sLock.Unlock()
return s, nil
}
// Subscribe subscribes the specified subscriber "s" to the specified list of topic(s)
func (b *Broker) Subscribe(s *Subscriber, topics ...string) {
b.tLock.Lock()
defer b.tLock.Unlock()
for _, topic := range topics {
if nil == b.topics[topic] {
b.topics[topic] = Subscribers{}
}
s.AddTopic(topic)
b.topics[topic][s.id] = s
}
}
// Unsubscribe Unsubscribe the specified subscriber from the specified topic(s)
func (b *Broker) Unsubscribe(s *Subscriber, topics ...string) {
for _, topic := range topics {
b.tLock.Lock()
if nil == b.topics[topic] {
continue
}
delete(b.topics[topic], s.id)
b.tLock.Unlock()
s.RemoveTopic(topic)
}
}
// Detach remove the specified subscriber from the broker
func (b *Broker) Detach(s *Subscriber) {
s.destroy()
b.sLock.Lock()
b.Unsubscribe(s, s.GetTopics()...)
delete(b.subscribers, s.id)
defer b.sLock.Unlock()
}
// Broadcast broadcast the specified payload to all the topic(s) subscribers
func (b *Broker) Broadcast(payload interface{}, topics ...string) {
for _, topic := range topics {
if b.Subscribers(topic) < 1 {
continue
}
b.tLock.RLock()
for _, s := range b.topics[topic] {
m := &Message{
topic: topic,
payload: payload,
createdAt: time.Now().UnixNano(),
}
go (func(s *Subscriber) {
s.Signal(m)
})(s)
}
b.tLock.RUnlock()
}
}
// Subscribers Get the subscribers count
func (b *Broker) Subscribers(topic string) int {
b.tLock.RLock()
defer b.tLock.RUnlock()
return len(b.topics[topic])
}
// GetTopics Returns a slice of topics
func (b *Broker) GetTopics() []string {
b.tLock.RLock()
brokerTopics := b.topics
b.tLock.RUnlock()
topics := []string{}
for topic := range brokerTopics {
topics = append(topics, topic)
}
return topics
}