-
Notifications
You must be signed in to change notification settings - Fork 5
/
input.go
107 lines (93 loc) · 2.52 KB
/
input.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package heka_redis
import (
"errors"
"fmt"
"github.com/garyburd/redigo/redis"
"github.com/mozilla-services/heka/pipeline"
"time"
)
type RedisPubSubInputConfig struct {
Address string `toml:"address"`
Channel string `toml:"channel"`
DecoderName string `toml:"decoder"`
}
type RedisPubSubInput struct {
conf *RedisPubSubInputConfig
conn redis.Conn
}
func (rpsi *RedisPubSubInput) ConfigStruct() interface{} {
return &RedisPubSubInputConfig{":6379", "*", ""}
}
func (rpsi *RedisPubSubInput) Init(config interface{}) error {
rpsi.conf = config.(*RedisPubSubInputConfig)
var err error
rpsi.conn, err = redis.Dial("tcp", rpsi.conf.Address)
if err != nil {
return fmt.Errorf("connecting to - %s", err.Error())
}
return nil
}
func (rpsi *RedisPubSubInput) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) error {
var (
dRunner pipeline.DecoderRunner
decoder pipeline.Decoder
pack *pipeline.PipelinePack
e error
ok bool
)
// Get the InputRunner's chan to receive empty PipelinePacks
packSupply := ir.InChan()
if rpsi.conf.DecoderName != "" {
if dRunner, ok = h.DecoderRunner(rpsi.conf.DecoderName, fmt.Sprintf("%s-%s", ir.Name(), rpsi.conf.DecoderName)); !ok {
return fmt.Errorf("Decoder not found: %s", rpsi.conf.DecoderName)
}
decoder = dRunner.Decoder()
}
//Connect to the channel
psc := redis.PubSubConn{Conn: rpsi.conn}
psc.PSubscribe(rpsi.conf.Channel)
for {
switch n := psc.Receive().(type) {
case redis.PMessage:
// Grab an empty PipelinePack from the InputRunner
pack = <-packSupply
pack.Message.SetType("redis_pub_sub")
pack.Message.SetLogger(n.Channel)
pack.Message.SetPayload(string(n.Data))
pack.Message.SetTimestamp(time.Now().UnixNano())
var packs []*pipeline.PipelinePack
if decoder == nil {
packs = []*pipeline.PipelinePack{pack}
} else {
packs, e = decoder.Decode(pack)
}
if packs != nil {
for _, p := range packs {
ir.Inject(p)
}
} else {
if e != nil {
ir.LogError(fmt.Errorf("Couldn't parse Redis message: %s", n.Data))
}
pack.Recycle(nil)
}
case redis.Subscription:
ir.LogMessage(fmt.Sprintf("Subscription: %s %s %d\n", n.Kind, n.Channel, n.Count))
if n.Count == 0 {
return errors.New("No channel to subscribe")
}
case error:
ir.LogError(fmt.Errorf("error: %v\n", n))
return n
}
}
return nil
}
func (rpsi *RedisPubSubInput) Stop() {
rpsi.conn.Close()
}
func init() {
pipeline.RegisterPlugin("RedisPubSubInput", func() interface{} {
return new(RedisPubSubInput)
})
}