-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
181 lines (170 loc) · 5.04 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package rmq
import (
"context"
"fmt"
"sync"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
"github.com/go-pay/limiter"
"github.com/go-pay/xlog"
)
// Consumer bundles a RocketMQ push consumer with the configuration it was
// built from and the set of topics subscribed through this wrapper.
type Consumer struct {
	// Consumer is the underlying push consumer. NewConsumer leaves it nil;
	// Conn assigns it once the client has been created.
	Consumer rocketmq.PushConsumer

	// Identity values copied from the configuration by NewConsumer.
	namespace  string // conf.Namespace
	groupName  string // conf.GroupName
	serverName string // conf.EndPoint; used in the "is nil" error messages

	// messageBatchMaxSize is the per-callback batch size. Conn sets it to 1
	// when it is still zero.
	messageBatchMaxSize int // default 1

	// subscribeTopic is the set of topics registered via SubscribeSingle or
	// SubscribeMulti and not yet removed.
	subscribeTopic map[string]struct{}

	// conf is the configuration handed to NewConsumer.
	conf *RocketMQConfig

	// limiter is nil unless conf.Limit is set with a non-zero Rate.
	limiter *limiter.RateLimiter

	// ops are the options passed to consumer.NewPushConsumer by Conn.
	ops []consumer.Option

	// mu is locked around writes to subscribeTopic by the subscribe and
	// unsubscribe methods.
	mu sync.RWMutex
}
// NewConsumer builds a Consumer from conf without creating the underlying
// push consumer; call Conn afterwards to create it.
//
// The option list is the result of defaultConsumerOps(conf) followed by any
// conf.ConsumerOptions. A rate limiter is attached only when conf.Limit is
// non-nil and its Rate is non-zero.
func NewConsumer(conf *RocketMQConfig) (c *Consumer) {
	// Appending an empty slice is a no-op, so no length guard is needed.
	options := append(defaultConsumerOps(conf), conf.ConsumerOptions...)

	c = &Consumer{
		namespace:      conf.Namespace,
		groupName:      conf.GroupName,
		serverName:     conf.EndPoint,
		subscribeTopic: make(map[string]struct{}),
		conf:           conf,
		ops:            options,
	}

	if limit := conf.Limit; limit != nil && limit.Rate != 0 {
		c.limiter = limiter.NewLimiter(limit)
	}
	return c
}
// Conn creates the underlying push consumer from the accumulated options and
// stores it on the receiver.
//
// If a log level is configured it is applied through rlog first. When no
// batch size has been chosen yet, the batch size defaults to 1 and the
// matching consumer option is appended. On success the receiver itself is
// returned; on failure the receiver's Consumer field is left untouched.
func (c *Consumer) Conn() (conn *Consumer, err error) {
	if level := c.conf.LogLevel; level != "" {
		rlog.SetLogLevel(string(level))
	}

	// Default to single-message consumption.
	if c.messageBatchMaxSize == 0 {
		c.ops = append(c.ops, consumer.WithConsumeMessageBatchMaxSize(1))
		c.messageBatchMaxSize = 1
	}

	pushConsumer, err := consumer.NewPushConsumer(c.ops...)
	if err != nil {
		return nil, err
	}

	c.Consumer = pushConsumer
	return c, nil
}
// Start logs the number of tracked topics, installs the consumer's panic
// handler as the package-level primitive.PanicHandler and starts the
// underlying push consumer.
//
// It returns an error instead of panicking when Conn has not been called
// yet, matching the nil check done by the subscribe/unsubscribe methods.
func (c *Consumer) Start() (err error) {
	if c.Consumer == nil {
		return fmt.Errorf("[%s] is nil", c.serverName)
	}

	// The subscribe methods write the map under c.mu, so read it under the
	// same lock.
	c.mu.RLock()
	count := len(c.subscribeTopic)
	c.mu.RUnlock()

	xlog.Warnf("count [%d] start subscribe", count)
	primitive.PanicHandler = c.defaultPanicHandler
	return c.Consumer.Start()
}
// Close unsubscribes every tracked topic and then shuts the push consumer
// down.
//
// It is a no-op when the push consumer was never created or when no topic is
// currently tracked (in that case Shutdown is not called either, as before).
// Errors from Unsubscribe and Shutdown are deliberately ignored: Close is
// best-effort teardown and has no error return.
func (c *Consumer) Close() {
	if c.Consumer == nil {
		return
	}

	// Drain the topic set under the lock that the subscribe/unsubscribe
	// methods use, then talk to the client outside of it so that no client
	// call runs while c.mu is held.
	c.mu.Lock()
	topics := make([]string, 0, len(c.subscribeTopic))
	for topic := range c.subscribeTopic {
		topics = append(topics, topic)
		delete(c.subscribeTopic, topic)
	}
	c.mu.Unlock()

	if len(topics) == 0 {
		return
	}
	for _, topic := range topics {
		_ = c.Consumer.Unsubscribe(topic) // best-effort
	}
	_ = c.Consumer.Shutdown() // best-effort
}
// TopicList returns the currently tracked topics in unspecified order (map
// iteration order). It returns nil when no topic is tracked.
func (c *Consumer) TopicList() (ts []string) {
	// The map is written under c.mu by the subscribe/unsubscribe methods.
	c.mu.RLock()
	defer c.mu.RUnlock()

	if len(c.subscribeTopic) == 0 {
		return nil
	}
	ts = make([]string, 0, len(c.subscribeTopic))
	for topic := range c.subscribeTopic {
		ts = append(ts, topic)
	}
	return ts
}
// SubscribeSingle subscribes to topic with a TAG selector built from
// expression and delivers messages to callback one at a time. This is the
// default consumption mode.
//
// It returns an error when the push consumer has not been created (Conn not
// called) or when the underlying Subscribe call fails. On success the topic
// is recorded in the tracked topic set.
//
// Per-message handling:
//   - if the configured batch size is not 1, the message is not passed to
//     callback and ConsumeRetryLater is returned;
//   - if a rate limiter is configured and the topic's limiter rejects the
//     message, ConsumeRetryLater is returned with a rate-limit error;
//   - otherwise callback receives the first message of the delivery; a
//     non-nil callback error yields ConsumeRetryLater, nil yields
//     ConsumeSuccess.
func (c *Consumer) SubscribeSingle(topic, expression string, callback func(ctx context.Context, ext *primitive.MessageExt) error) (err error) {
	if c.Consumer == nil {
		return fmt.Errorf("[%s] is nil", c.serverName)
	}

	selector := consumer.MessageSelector{Type: consumer.TAG, Expression: expression}
	handler := func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		// Single-message mode only.
		if c.messageBatchMaxSize != 1 {
			return consumer.ConsumeRetryLater, nil
		}
		// Guard the ext[0] access below; an empty delivery has nothing to retry.
		if len(ext) == 0 {
			return consumer.ConsumeSuccess, nil
		}
		// Rate limiting: over the limit, ask for a retry later.
		if c.limiter != nil && !c.limiter.LimiterGroup.Get(topic).Allow() {
			return consumer.ConsumeRetryLater, fmt.Errorf("[%d] rate limiter, consume retry later", c.conf.Limit.Rate)
		}
		// Use a handler-local error: the handler may run concurrently, so it
		// must not write to SubscribeSingle's own named result.
		if cbErr := callback(ctx, ext[0]); cbErr != nil {
			return consumer.ConsumeRetryLater, cbErr
		}
		return consumer.ConsumeSuccess, nil
	}

	if err = c.Consumer.Subscribe(topic, selector, handler); err != nil {
		return err
	}

	c.mu.Lock()
	c.subscribeTopic[topic] = struct{}{}
	c.mu.Unlock()
	return nil
}
// SubscribeMulti subscribes to topic with a TAG selector built from
// expression and delivers each batch of messages to callback in one call.
//
// Per the original note, the batch size must be configured (see
// MessageBatchMaxSize) to a value other than 1 for deliveries to carry more
// than one message; this method itself does not check that setting.
//
// It returns an error when the push consumer has not been created (Conn not
// called) or when the underlying Subscribe call fails. On success the topic
// is recorded in the tracked topic set.
//
// Per-batch handling: if a rate limiter is configured and the topic's limiter
// rejects the batch, ConsumeRetryLater is returned with a rate-limit error.
// Otherwise the whole batch is passed to callback; a non-nil callback error
// yields ConsumeRetryLater, nil yields ConsumeSuccess.
func (c *Consumer) SubscribeMulti(topic, expression string, callback func(ctx context.Context, ext ...*primitive.MessageExt) error) (err error) {
	if c.Consumer == nil {
		return fmt.Errorf("[%s] is nil", c.serverName)
	}

	selector := consumer.MessageSelector{Type: consumer.TAG, Expression: expression}
	handler := func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		// Rate limiting: over the limit, ask for a retry later.
		if c.limiter != nil && !c.limiter.LimiterGroup.Get(topic).Allow() {
			return consumer.ConsumeRetryLater, fmt.Errorf("[%d] rate limiter, consume retry later", c.conf.Limit.Rate)
		}
		// Always hand over the full batch. Passing only ext[0] would
		// acknowledge the remaining messages without processing them.
		// The error is handler-local because the handler may run
		// concurrently and must not write to SubscribeMulti's named result.
		if cbErr := callback(ctx, ext...); cbErr != nil {
			return consumer.ConsumeRetryLater, cbErr
		}
		return consumer.ConsumeSuccess, nil
	}

	if err = c.Consumer.Subscribe(topic, selector, handler); err != nil {
		return err
	}

	c.mu.Lock()
	c.subscribeTopic[topic] = struct{}{}
	c.mu.Unlock()
	return nil
}
// Unsubscribe removes the subscription for a single topic.
//
// It returns an error when the push consumer has not been created or when
// the underlying Unsubscribe call fails; in both cases the tracked topic set
// is left unchanged. On success the topic is dropped from the tracked set.
func (c *Consumer) Unsubscribe(topic string) (err error) {
	if c.Consumer == nil {
		return fmt.Errorf("[%s] is nil", c.serverName)
	}

	err = c.Consumer.Unsubscribe(topic)
	if err != nil {
		return err
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.subscribeTopic, topic)
	return nil
}