deliver.go

package main

import (
    "log"

    "github.com/Shopify/sarama"
)
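
// Deliver is the interface a delivery backend implements: it exposes the
// Store it drains and can be started and stopped.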
type Deliver interface {
    Store() *Store
    Start() error
    Stop() error
}
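
// KafkaDeliver delivers events from a Store to Kafka using a sarama producer.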
type KafkaDeliver struct {
    store             *Store
    clientId          string
    brokerList        []string
    clientConfig      *sarama.ClientConfig
    client            *sarama.Client
    producerConfig    *sarama.ProducerConfig
    producer          *sarama.Producer
    deliverGoroutines int
    shutdownDeliver   chan bool
    shutdown          chan bool
}
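
// NewKafkaDeliver creates a sarama client and producer for the given broker
// list and wires them to the supplied Store.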
func NewKafkaDeliver(store *Store, clientId string, brokerList []string) (*KafkaDeliver, error) {
    log.Println("go=kafka at=new-kafka-deliver")

    clientConfig := sarama.NewClientConfig()
    producerConfig := sarama.NewProducerConfig()

    client, err := sarama.NewClient(clientId, brokerList, clientConfig)
    if err != nil {
        return nil, err
    }
    log.Println("go=kafka at=created-client")

    producer, err := sarama.NewProducer(client, producerConfig)
    if err != nil {
        return nil, err
    }
    log.Println("go=kafka at=created-producer")

    return &KafkaDeliver{
        clientId:          clientId,
        brokerList:        brokerList,
        store:             store,
        producer:          producer,
        producerConfig:    producerConfig,
        client:            client,
        clientConfig:      clientConfig,
        deliverGoroutines: 8,
        shutdownDeliver:   make(chan bool, 8),
        shutdown:          make(chan bool, 8),
    }, nil
}
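
// Store returns the Store this deliverer drains.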
func (k *KafkaDeliver) Store() *Store {
    return k.store
}
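
// Start launches deliverGoroutines worker goroutines that drain events from
// the store and send them to Kafka.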
func (k *KafkaDeliver) Start() error {
    for i := 0; i < k.deliverGoroutines; i++ {
        go k.deliverEvents(i)
    }
    return nil
}
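
// deliverEvents reads events from the store's eventsOut channel and produces
// each one to Kafka on the event's channel, acking the sequence on success
// and nacking it on a send error, until it is told to shut down.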
func (k *KafkaDeliver) deliverEvents(num int) {
    for {
        select {
        case <-k.shutdownDeliver:
            k.shutdown <- true
            return
        case event, ok := <-k.store.eventsOut:
            if ok {
                err := k.producer.SendMessage(event.event.Channel, nil, sarama.ByteEncoder(event.event.Body))
                if err != nil {
                    log.Printf("go=deliver num=%d at=send-error error=%v", num, err)
                    noAckEvent(k.store, event.sequence)
                } else {
                    ackEvent(k.store, event.sequence)
                }
            }
        }
    }
}

func ackEvent(store *Store, seq int64) {
    // The store owns the ack channel and can close it on shutdown,
    // so wrap the send, which can panic, in a recover.
    defer func() {
        if r := recover(); r != nil {
            log.Println("at=recover-ack-panic")
        }
    }()
    ack(store, seq)
}

func noAckEvent(store *Store, seq int64) {
    // The store owns the noAck channel and can close it on shutdown,
    // so wrap the send, which can panic, in a recover.
    defer func() {
        if r := recover(); r != nil {
            log.Println("at=recover-noack-panic")
        }
    }()
    noAck(store, seq)
}

func ack(store *Store, seq int64) {
    store.eventsDelivered <- seq
}

func noAck(store *Store, seq int64) {
    store.eventsFailed <- seq
}
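
// Stop signals every deliver goroutine to shut down and waits for each one
// to acknowledge before returning.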
func (k *KafkaDeliver) Stop() error {
    for i := 0; i < k.deliverGoroutines; i++ {
        k.shutdownDeliver <- true
    }
    for i := 0; i < k.deliverGoroutines; i++ {
        <-k.shutdown
    }
    return nil
}