-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.go
89 lines (80 loc) · 2.41 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package rmq
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
// Producer bundles a RocketMQ producer client with the configuration and
// options used to build it. Instances are created by NewProducer and become
// usable for sending once Conn has succeeded.
type Producer struct {
	// Producer is the underlying client. It is nil until Conn assigns it.
	Producer rocketmq.Producer
	// namespace is copied from RocketMQConfig.Namespace by NewProducer.
	namespace string
	// groupName is copied from RocketMQConfig.GroupName by NewProducer.
	groupName string
	// serverName is copied from RocketMQConfig.EndPoint by NewProducer and is
	// used to label the "is nil" errors returned by the send methods.
	serverName string
	// conf is the configuration handed to NewProducer; Conn reads its LogLevel.
	conf *RocketMQConfig
	// ops are the options Conn passes to producer.NewDefaultProducer.
	ops []producer.Option
}
// NewProducer prepares a Producer from conf without connecting to anything.
//
// The option list starts with whatever defaultProducerOps(conf) yields and is
// followed by conf.ProducerOptions. The returned value has a nil underlying
// client; call Conn before sending. conf must be non-nil.
func NewProducer(conf *RocketMQConfig) (p *Producer) {
	// Appending an empty or nil slice is a no-op, so no length guard is needed.
	options := append(defaultProducerOps(conf), conf.ProducerOptions...)

	return &Producer{
		namespace:  conf.Namespace,
		groupName:  conf.GroupName,
		serverName: conf.EndPoint,
		conf:       conf,
		ops:        options,
	}
}
// Conn builds the underlying RocketMQ producer from the stored options and
// starts it (the original author describes the target as Aliyun RocketMQ).
//
// When the configuration carries a non-empty LogLevel it is applied through
// rlog.SetLogLevel first. The package-level primitive.PanicHandler is replaced
// with this producer's defaultPanicHandler before the client is started.
//
// On success the receiver itself is returned. On failure the result is nil and
// the error from construction or Start is returned unchanged; note that when
// Start fails the freshly built client remains assigned to p.Producer.
func (p *Producer) Conn() (conn *Producer, err error) {
	if level := p.conf.LogLevel; level != "" {
		rlog.SetLogLevel(string(level))
	}

	mq, err := producer.NewDefaultProducer(p.ops...)
	if err != nil {
		return nil, err
	}

	// Assign the client and install the panic handler before starting.
	p.Producer = mq
	primitive.PanicHandler = p.defaultPanicHandler

	err = p.Producer.Start()
	if err != nil {
		return nil, err
	}
	return p, nil
}
// Close shuts down the underlying client if one has been assigned. It is a
// no-op when Conn was never called.
func (p *Producer) Close() {
	if p.Producer == nil {
		return
	}
	// Best-effort shutdown: Close has no error return, so the result is dropped.
	_ = p.Producer.Shutdown()
}
// SendSyncSingle sends one message synchronously. Per the original author it
// is meant for topics whose consumer uses MessageBatchMaxSize = 1.
//
// The send runs on a context derived with context.WithoutCancel, so cancelling
// c does not abort it. An error is returned without sending when the
// underlying client has not been assigned yet.
func (p *Producer) SendSyncSingle(c context.Context, message *primitive.Message) (result *primitive.SendResult, err error) {
	if p.Producer == nil {
		return nil, fmt.Errorf("[%s] is nil", p.serverName)
	}

	detached := context.WithoutCancel(c)
	return p.Producer.SendSync(detached, message)
}
// SendAsyncSingle sends one message asynchronously. Per the original author it
// is meant for topics whose consumer uses MessageBatchMaxSize = 1.
//
// callback is passed to the client's SendAsync; a nil callback is replaced
// with a no-op. The send runs on a context derived with
// context.WithoutCancel, so cancelling c does not abort it. An error is
// returned without sending when the underlying client has not been assigned.
func (p *Producer) SendAsyncSingle(c context.Context, callback func(ctx context.Context, result *primitive.SendResult, err error), message *primitive.Message) (err error) {
	if p.Producer == nil {
		return fmt.Errorf("[%s] is nil", p.serverName)
	}

	onResult := callback
	if onResult == nil {
		// Never hand the client a nil function value.
		onResult = func(context.Context, *primitive.SendResult, error) {}
	}

	return p.Producer.SendAsync(context.WithoutCancel(c), onResult, message)
}
// SendOneWaySingle sends one message through the client's SendOneWay call,
// which reports only an error and no send result.
//
// The send runs on a context derived with context.WithoutCancel, so cancelling
// c does not abort it. An error is returned without sending when the
// underlying client has not been assigned yet.
func (p *Producer) SendOneWaySingle(c context.Context, message *primitive.Message) (err error) {
	if p.Producer == nil {
		return fmt.Errorf("[%s] is nil", p.serverName)
	}

	detached := context.WithoutCancel(c)
	return p.Producer.SendOneWay(detached, message)
}