-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublisher.go
126 lines (96 loc) · 2.83 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package amqp
import (
"fmt"
"sync"
"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
"gopkg.in/gopaws/go.events.v2"
)
type state int
const (
created state = iota
connecting
connected
failed
)
type Publisher struct {
sync.Mutex
*events.Emitter
params PublisherParams
driver *Driver
producer *Producer
state state
err error
buffer map[*bufferedMessage]string
}
type bufferedMessage struct {
options []PublishOption
payload []byte
}
func NewPublisher(driver *Driver, params PublisherParams) *Publisher {
publisher := new(Publisher)
publisher.Emitter = events.NewEmitter(events.WithEventStategy(PublishError, events.ParallelBroadcast))
publisher.params = params
publisher.driver = driver
publisher.buffer = make(map[*bufferedMessage]string)
publisher.On(ProducerConnected, events.Callback(func(event events.Event) {
publisher.Lock()
defer publisher.Unlock()
for message, key := range publisher.buffer {
if err := publisher.producer.Publish(key, message.payload, message.options...); err == nil {
delete(publisher.buffer, message)
} else {
publisher.state = connecting
go publisher.connect()
publisher.Fire(events.New(PublishError, events.WithContext(events.Map{"error": err.Error()})))
break
}
}
}))
return publisher
}
func (publisher *Publisher) Publish(key string, payload []byte, options ...PublishOption) error {
publisher.Lock()
defer publisher.Unlock()
message := &bufferedMessage{options, payload}
switch publisher.state {
case created:
publisher.state = connecting
go publisher.connect()
publisher.buffer[message] = key
case connecting:
publisher.buffer[message] = key
case connected:
if err := publisher.producer.Publish(key, payload, options...); err != nil {
publisher.state = connecting
go publisher.connect()
publisher.Fire(events.New(PublishError, events.WithContext(events.Map{"error": err.Error()})))
publisher.buffer[message] = key
}
case failed:
return publisher.err
}
return nil // for backward with Producer interface
}
func (publisher *Publisher) connect() {
var producer *Producer
connect := func(attempt uint) (err error) {
producer, err = publisher.driver.Producer(publisher.params.Exchange)
return
}
if err := retry.Retry(connect, strategy.Limit(publisher.params.Retries), strategy.Wait(publisher.params.Pause)); err != nil {
publisher.Lock()
publisher.state = failed
publisher.err = err
publisher.Unlock()
publisher.Fire(events.New(PublisherFailed, events.WithContext(events.Map{
"error": fmt.Sprintf("connect to AMQP failed after %d retries: %s", publisher.params.Retries, err),
})))
return
}
publisher.Lock()
publisher.state = connected
publisher.producer = producer
publisher.Unlock()
publisher.Fire(events.New(ProducerConnected, events.WithContext(events.Map{"producer": producer})))
}