-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
95 lines (78 loc) · 2.24 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package amqp
import (
"github.com/pkg/errors"
"github.com/streadway/amqp"
"gopkg.in/gopaws/go.events.v2"
)
// Consumer consumes messages from a message broker queue and re-emits every
// delivery, as well as cancel/close notifications, as events through the
// embedded Emitter.
type Consumer struct {
	// Emitter is embedded so the consumer can Fire events itself.
	*events.Emitter
	// QoS is handed to channel.Qos as the prefetch count when Start runs.
	QoS int
	// queue is the queue to consume from; its Name is passed to channel.Consume.
	queue Queue
	// tag is the consumer tag used for both channel.Consume and channel.Cancel.
	tag string
	// channel is the AMQP channel all broker calls go through.
	channel *amqp.Channel
	// fail carries the single result that Start waits for and returns.
	// It is (re)created by Start and closed when Start returns.
	fail chan error
	// done is (re)created by Start and closed by listenDeliveries once the
	// deliveries channel has been drained.
	done chan struct{}
}
// Start configures the prefetch count, subscribes to the queue and then
// blocks until consumption ends.
//
// It returns the error from channel.Qos or channel.Consume when the setup
// fails. Otherwise it returns whatever listenChannel reports on the fail
// channel: an error when the channel is canceled or closed, or nil once the
// deliveries channel has been drained (for example after Stop).
func (consumer *Consumer) Start() error {
	// Fresh signalling channels for this run; fail is closed when Start returns.
	consumer.fail = make(chan error)
	defer close(consumer.fail)
	consumer.done = make(chan struct{})

	qosErr := consumer.channel.Qos(consumer.QoS, 0, false)
	if qosErr != nil {
		return qosErr
	}

	// Named flags for the positional arguments of channel.Consume.
	const (
		noAck     = false
		exclusive = false
		noLocal   = false
		noWait    = false
	)
	deliveries, consumeErr := consumer.channel.Consume(consumer.queue.Name, consumer.tag, noAck, exclusive, noLocal, noWait, nil)
	if consumeErr != nil {
		return consumeErr
	}

	// One goroutine watches the channel state, the other pumps deliveries.
	go consumer.listenChannel()
	go consumer.listenDeliveries(deliveries)

	// Block until listenChannel reports the outcome.
	outcome := <-consumer.fail
	return outcome
}
// Stop stops consuming messages by cancelling the consumer on the channel.
// It is a no-op returning nil when the deliveries have already been drained
// (done is closed). A failed cancel is returned wrapped with
// "channel cancel failed".
func (consumer *Consumer) Stop() error {
	// Non-blocking check: nothing to cancel once delivery processing finished.
	select {
	case <-consumer.done:
		return nil
	default:
	}

	// A manual channel.Cancel does not fire NotifyCancel; it only closes the
	// deliveries channel.
	cancelErr := consumer.channel.Cancel(consumer.tag, false)
	if cancelErr != nil {
		return errors.Wrap(cancelErr, "channel cancel failed")
	}
	return nil
}
// Close closes the underlying AMQP channel and returns the error, if any,
// reported by channel.Close.
func (consumer *Consumer) Close() error {
	return consumer.channel.Close()
}
// listenDeliveries fires a ConsumerData event for every delivery received
// until the deliveries channel is closed, then closes consumer.done to signal
// that consumption has finished.
func (consumer *Consumer) listenDeliveries(deliveries <-chan amqp.Delivery) {
	defer close(consumer.done)

	for {
		delivery, open := <-deliveries
		if !open {
			// Deliveries channel closed: nothing more to consume.
			return
		}

		payload := events.Map{
			"key":   delivery.RoutingKey,
			"data":  Message{delivery},
			"queue": consumer.queue,
		}
		consumer.Fire(events.New(ConsumerData, events.WithContext(payload)))
	}
}
// listenChannel waits for the first of three things and reports exactly one
// result on consumer.fail, which Start is blocked on:
//
//   - the broker cancels the consumer: fires ConsumerCanceled and reports a
//     "channel canceled" error;
//   - the channel is closed: fires ConsumerClosed and reports the close error,
//     or nil when the close was graceful;
//   - the deliveries have been drained (consumer.done is closed): reports nil.
func (consumer *Consumer) listenChannel() {
	cancels := consumer.channel.NotifyCancel(make(chan string))
	closes := consumer.channel.NotifyClose(make(chan *amqp.Error))

	// The amqp library delivers notifications synchronously on these
	// unbuffered channels. Keep receiving after this function returns so a
	// later cancel or abnormal close cannot block inside the library forever.
	// Both goroutines end when the library closes the notification channels
	// on channel shutdown.
	defer func() {
		go func() {
			for range cancels {
			}
		}()
		go func() {
			for range closes {
			}
		}()
	}()

	// reportClosed fires ConsumerClosed and forwards the close reason to Start.
	reportClosed := func(err *amqp.Error) {
		consumer.Fire(events.New(ConsumerClosed, events.WithContext(events.Map{"consumer": consumer, "error": err})))
		if err == nil {
			// Graceful close: the notification channel was closed without an
			// error. Send an untyped nil; a nil *amqp.Error stored in the
			// error interface would compare non-nil and panic on Error().
			consumer.fail <- nil
			return
		}
		consumer.fail <- err
	}

	select {
	case reason, ok := <-cancels:
		if !ok {
			// The cancel channel was closed rather than signalled, which the
			// library does on channel shutdown; this is a close, not a cancel.
			// NOTE(review): relies on the library also closing the NotifyClose
			// channel during shutdown, so this receive does not block.
			reportClosed(<-closes)
			return
		}
		consumer.Fire(events.New(ConsumerCanceled, events.WithContext(events.Map{"consumer": consumer})))
		consumer.fail <- errors.Errorf("channel canceled: %s", reason)
	case err := <-closes:
		reportClosed(err)
	case <-consumer.done:
		consumer.fail <- nil
	}
}