-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproducer.go
67 lines (55 loc) · 1.51 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package amqp
import (
"github.com/streadway/amqp"
"strconv"
)
// PublishOption is an option passed when publishing a message.
type PublishOption struct {
// apply writes this option's setting into the given publish parameters.
apply func(*publishParams)
}
// publishParams collects the settings produced by applying PublishOption
// values; Publish starts from the zero value.
type publishParams struct {
// messageTTL is the message TTL in milliseconds. Publish sets the message
// expiration only when this value is greater than zero.
messageTTL int64
// retryCount is sent by Publish in the "x-retry-count" header.
retryCount int64
}
// WithMessageTTL returns a PublishOption that sets the TTL of the message.
// The ttl parameter is expressed in milliseconds.
func WithMessageTTL(ttl int64) PublishOption {
	setTTL := func(p *publishParams) {
		p.messageTTL = ttl
	}
	return PublishOption{apply: setTTL}
}
// WithRetryCount returns a PublishOption that sets the retry count carried
// by the message (described by the original author as the max number of
// retries).
func WithRetryCount(count int64) PublishOption {
	var opt PublishOption
	opt.apply = func(p *publishParams) {
		p.retryCount = count
	}
	return opt
}
// Producer publishes messages to an exchange over an AMQP channel.
type Producer struct {
// counter is not referenced in the visible code.
// NOTE(review): confirm its purpose against the rest of the package.
counter uint64
// exchange is the target exchange; Publish publishes to its Name.
exchange Exchange
// channel is the AMQP channel that Publish sends messages through.
channel *amqp.Channel
}
func (producer *Producer) Publish(key string, payload []byte, options ...PublishOption) error {
params := publishParams{}
for _, option := range options {
option.apply(¶ms)
}
publishig := amqp.Publishing{
Headers: amqp.Table{"x-retry-count": params.retryCount},
ContentType: "application/json",
ContentEncoding: "",
Body: payload,
DeliveryMode: amqp.Persistent, // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
}
if params.messageTTL > 0 {
publishig.Expiration = strconv.FormatInt(params.messageTTL, 10)
}
return producer.channel.Publish(
producer.exchange.Name, // publish to an exchange
key, // routing to 0 or more queues
false, // mandatory
false, // immediate
publishig,
)
}