-
Notifications
You must be signed in to change notification settings - Fork 6
/
kafka.go
61 lines (50 loc) · 1.91 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main
import (
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	_ "strings"

	"github.com/Shopify/sarama"
	"github.com/Sirupsen/logrus"
)
// CreateClient builds a Sarama Kafka client for the given broker list.
//
// partitionStrategy selects the producer partitioner placed in the client
// configuration: PARTITION_KEY_HASH uses sarama.NewHashPartitioner and
// PARTITION_ROUND_ROBIN uses sarama.NewRoundRobinPartitioner. Any other value
// yields a nil client and an error naming the rejected strategy. Otherwise the
// client and error produced by sarama.NewClient are returned unchanged.
func CreateClient(brokerList []string, partitionStrategy PartitionStrategy) (sarama.Client, error) {
	conf := sarama.NewConfig()
	switch partitionStrategy {
	case PARTITION_KEY_HASH:
		conf.Producer.Partitioner = sarama.NewHashPartitioner
	case PARTITION_ROUND_ROBIN:
		conf.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	default:
		// fmt.Sprint prints the value itself regardless of the underlying kind
		// of PartitionStrategy. A string(...) conversion of an integer-kinded
		// value would instead produce the rune with that code point, giving an
		// unreadable error message.
		return nil, errors.New("Unknown partition strategy: " + fmt.Sprint(partitionStrategy))
	}
	return sarama.NewClient(brokerList, conf)
}
// CreateProducer creates an asynchronous producer from an existing Sarama
// Kafka client.
//
// client is dereferenced and passed to sarama.NewAsyncProducerFromClient,
// whose producer and error are returned unchanged. If client is nil, or points
// at a nil sarama.Client, an error is returned instead of dereferencing it.
func CreateProducer(client *sarama.Client) (sarama.AsyncProducer, error) {
	// Guard both levels: the pointer itself and the interface value it refers
	// to. Either being nil would otherwise cause a nil dereference panic.
	if client == nil || *client == nil {
		return nil, errors.New("cannot create producer: kafka client is nil")
	}
	return sarama.NewAsyncProducerFromClient(*client)
}
// WriteMessage encodes msg as JSON and sends it on the producer's input
// channel as a message for topic.
//
// The message key is chosen by keyBy from msg, containerId and keyStrategy,
// and the message timestamp is taken from msg.Timestamp. If JSON encoding
// fails, the failure is logged together with the container id and the message,
// and the encoding error is returned; nothing is sent in that case. On the
// success path the function returns nil once the message has been handed to
// the input channel.
func WriteMessage(topic string, msg LogMessage, containerId string, keyStrategy KeyStrategy, producer sarama.AsyncProducer) error {
	payload, marshalErr := json.Marshal(msg)
	if marshalErr != nil {
		entry := logrus.WithField("id", containerId).WithError(marshalErr).WithField("message", msg)
		entry.Error("error converting log message to json")
		return marshalErr
	}

	partitionKey := keyBy(msg, containerId, keyStrategy)
	record := &sarama.ProducerMessage{
		Topic:     topic,
		Key:       sarama.StringEncoder(partitionKey),
		Value:     sarama.StringEncoder(payload),
		Timestamp: msg.Timestamp,
	}
	producer.Input() <- record
	return nil
}
// keyBy returns the Kafka message key for msg under the given strategy.
//
// KEY_BY_CONTAINER_ID returns containerId unchanged. KEY_BY_TIMESTAMP returns
// msg.Timestamp as Unix seconds written in base 10. Any other strategy is
// logged as an error and handled as KEY_BY_CONTAINER_ID.
func keyBy(msg LogMessage, containerId string, strategy KeyStrategy) string {
	switch strategy {
	case KEY_BY_CONTAINER_ID:
		return containerId
	case KEY_BY_TIMESTAMP:
		// Format the seconds as decimal text. A plain string(int64) conversion
		// treats the number as a Unicode code point, which for real timestamps
		// is out of range and collapses every key to "\uFFFD".
		return strconv.FormatInt(msg.Timestamp.Unix(), 10)
	default:
		logrus.WithField("keyStrategy", strategy).Error("Unknown key strategy. Defaulting to KEY_BY_CONTAINER_ID")
		return containerId
	}
}