-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtopic.go
52 lines (44 loc) · 1.02 KB
/
topic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package event
type topic struct {
id Identifier
queuedEvents chan Event
closed chan bool
handlers *[]Handler
errorHandlers *[]ErrorHandler
}
func newTopic(id Identifier, buffer int, handlers *[]Handler, errorHandlers *[]ErrorHandler) *topic {
tpc := &topic{
id: id,
queuedEvents: make(chan Event, buffer),
closed: make(chan bool),
handlers: handlers,
errorHandlers: errorHandlers,
}
go tpc.worker(tpc.queuedEvents, tpc.closed)
return tpc
}
func (tpc *topic) handle(evt Event) {
tpc.queuedEvents <- evt
}
func (tpc *topic) worker(queuedEvents <-chan Event, closed chan<- bool) {
for evt := range queuedEvents {
if evt == nil {
break
}
for _, hdl := range *tpc.handlers {
if err := hdl.Handle(evt); err != nil {
tpc.error(evt, err)
}
}
}
closed <- true
}
func (tpc *topic) shutdown() {
tpc.queuedEvents <- nil
<-tpc.closed
}
func (tpc *topic) error(evt Event, err error) {
for _, errHdl := range *tpc.errorHandlers {
errHdl.Handle(evt, err)
}
}