-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener.go
131 lines (106 loc) · 3.32 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package amqp
import (
"sync"
"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
"github.com/pkg/errors"
"gopkg.in/gopaws/go.events.v2"
)
type Listener struct {
*events.Emitter
params ListenerParams
driver *Driver
guard sync.Mutex
start chan events.Event
stop chan events.Event
errors chan events.Event
data chan events.Event
}
func NewListener(driver *Driver, params ListenerParams) *Listener {
listener := new(Listener)
listener.Emitter = events.NewEmitter()
listener.params = params
listener.driver = driver
listener.start = make(chan events.Event, 1)
listener.stop = make(chan events.Event, 1)
listener.data = make(chan events.Event)
listener.errors = make(chan events.Event)
return listener
}
func (listener *Listener) Listen() error {
if listener.params.Queue.Name == "" {
return errors.Errorf("empty queue name")
}
listener.
On(ListenStart, events.Stream(listener.start)).
On(ListenStop, events.Stream(listener.stop)).
On(ListenError, events.Stream(listener.errors))
defer listener.RemoveEventListener(events.Stream(listener.start))
defer listener.RemoveEventListener(events.Stream(listener.stop))
defer listener.RemoveEventListener(events.Stream(listener.errors))
consumer, err := listener.restart(nil)
if err != nil {
return err
}
for {
select {
case event := <-listener.data:
if listener.params.Event != "" {
listener.Fire(events.New(listener.params.Event, events.WithContext(event.Context)))
} else {
listener.Fire(events.New(event.Context["key"], events.WithContext(event.Context)))
}
case <-listener.errors:
consumer, err = listener.restart(consumer)
if err != nil {
return err
}
case <-listener.start:
go listener.listen(consumer)
case <-listener.stop:
return listener.shutdown(consumer)
}
}
}
func (listener *Listener) Stop() {
listener.Fire(ListenStop)
}
func (listener *Listener) listen(consumer *Consumer) {
if err := consumer.Start(); err != nil {
listener.Fire(events.New(ListenError, events.WithContext(events.Map{"error": errors.Wrap(err, "consume failed")})))
}
}
func (listener *Listener) shutdown(consumer *Consumer) error {
if consumer != nil {
consumer.RemoveEventListener(events.Stream(listener.start))
consumer.RemoveEventListener(events.Stream(listener.errors))
consumer.RemoveEventListener(events.Stream(listener.data))
if err := consumer.Stop(); err != nil {
return err
}
return consumer.Close()
}
return nil
}
func (listener *Listener) restart(consumer *Consumer) (*Consumer, error) {
listener.shutdown(consumer)
connect := func(attempt uint) (err error) {
consumer, err = listener.driver.Consumer(listener.params.Queue)
return
}
if err := retry.Retry(connect, strategy.Limit(listener.params.Retries), strategy.Wait(listener.params.Pause)); err != nil {
err = errors.Wrapf(err, "connect to AMQP queue '%s' failed after %d retries", listener.params.Queue.Name, listener.params.Retries)
if listener.params.FailFast {
return nil, err
}
listener.Fire(events.New(ListenError, events.WithContext(events.Map{"error": err})))
return nil, nil
}
consumer.
On(ConsumerCanceled, events.Stream(listener.start)).
On(ConsumerClosed, events.Stream(listener.errors)).
On(ConsumerData, events.Stream(listener.data))
consumer.QoS = listener.params.QoS
listener.Fire(ListenStart)
return consumer, nil
}