-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.go
104 lines (90 loc) · 2.58 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main
import (
"encoding/json"
"flag"
"fmt"
"time"
log "github.com/Sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/jmespath/go-jmespath"
"github.com/robbles/kinesiscat/worker"
)
var (
debug bool
region string
streamName string
position string
outputFormat string
separator string
nullSeparator bool
batchSize int64
sleepTime int64
jsonFilter string
)
func main() {
flag.BoolVar(&debug, "debug", false, "Enable debug logging")
flag.StringVar(®ion, "region", "us-west-1", "AWS region")
flag.StringVar(&streamName, "stream-name", "events", "Kinesis stream name")
flag.StringVar(&position, "position", "LATEST", "Position in stream")
flag.StringVar(&outputFormat, "format", "data", "What to output for each record: sequence, partition-key, or data")
flag.StringVar(&separator, "separator", "\n", "Separator to output between records")
flag.BoolVar(&nullSeparator, "0", false, "Use NULL character as the separator")
flag.Int64Var(&batchSize, "batch-size", 1, "How many records to fetch in each call")
flag.Int64Var(&sleepTime, "sleep-time", 1000, "How long to sleep between calls (ms)")
flag.StringVar(&jsonFilter, "filter", "", "A JMESPath filter to apply to each message")
flag.Parse()
if debug {
log.SetLevel(log.DebugLevel)
kinesis_worker.Logger.Level = log.DebugLevel
}
if nullSeparator {
separator = "\x00"
}
worker := kinesis_worker.StreamWorker{
AwsConfig: &aws.Config{Region: aws.String(region)},
StreamName: streamName,
IteratorType: position,
BatchSize: batchSize,
SleepTime: time.Duration(sleepTime) * time.Millisecond,
}
if err := worker.Start(); err != nil {
log.Panicln(err)
}
for record := range worker.Output {
outputRecord(record, outputFormat)
}
}
const (
SEQUENCE = "sequence"
PARTITION_KEY = "partition-key"
DATA = "data"
)
func outputRecord(record *kinesis.Record, format string) {
switch format {
case DATA:
outputData(record.Data)
case PARTITION_KEY:
fmt.Println(record.PartitionKey)
case SEQUENCE:
fmt.Println(record.SequenceNumber)
}
}
func outputData(data []byte) {
var output []byte = data
if jsonFilter != "" {
var obj interface{}
json.Unmarshal(data, &obj)
result, err := jmespath.Search(jsonFilter, obj)
if err != nil {
log.Errorf("Error executing expression: %s", err)
}
toJSON, err := json.MarshalIndent(result, "", " ")
if err != nil {
log.Errorf("Error serializing result to JSON: %s", err)
}
output = toJSON
return
}
fmt.Printf("%s%s", output, separator)
}