Skip to content

Commit

Permalink
proto parser support (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
anbarasantr authored and devdinu committed Nov 13, 2019
1 parent 9154d21 commit 925409f
Show file tree
Hide file tree
Showing 26 changed files with 661 additions and 95 deletions.
53 changes: 30 additions & 23 deletions callback/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package callback
import (
"time"

"github.com/gojekfarm/kafqa/creator"
"github.com/gojekfarm/kafqa/serde"

"github.com/gojekfarm/kafqa/logger"
"github.com/gojekfarm/kafqa/reporter"
"github.com/gojekfarm/kafqa/reporter/metrics"
Expand All @@ -13,20 +14,15 @@ import (

type Callback func(*kafka.Message)

func Display(msg *kafka.Message) {
message, _ := creator.FromBytes(msg.Value)
logger.Debugf("received message on %s: message: %s\n", msg.TopicPartition, message)
}

type acknowledger interface {
Acknowledge(store.Trace) error
}

func Acker(ack acknowledger) Callback {
func Acker(ack acknowledger, decoder serde.Decoder) Callback {
return func(msg *kafka.Message) {
message, err := creator.FromBytes(msg.Value)
message, err := decoder.FromBytes(msg.Value)
if err != nil {
logger.Debugf("Unable to decode message during consumer ack")
logger.Errorf("Unable to decode message during consumer ack %s", err.Error())
} else {
err := ack.Acknowledge(store.Trace{Message: message, TopicPartition: msg.TopicPartition})
if err != nil {
Expand All @@ -38,22 +34,33 @@ func Acker(ack acknowledger) Callback {
}
}

func MessageSent(msg *kafka.Message) {
message, err := creator.FromBytes(msg.Value)
if err != nil {
logger.Debugf("Unable to decode message during message sent callback")
} else {
metrics.SentMessage(message)
metrics.ProduceLatency(time.Since(message.CreatedTime))
func Reporter(decoder serde.Decoder) Callback {
return func(msg *kafka.Message) {
message, err := decoder.FromBytes(msg.Value)
if err != nil {
logger.Debugf("Unable to decode message during message sent callback")
} else {
metrics.SentMessage(message)
metrics.ProduceLatency(time.Since(message.CreatedTime))
}
}
}

func LatencyTracker(decoder serde.Decoder) Callback {
return func(msg *kafka.Message) {
message, err := decoder.FromBytes(msg.Value)
if err != nil {
logger.Debugf("Unable to decode message during consumer ack")
return
}
latency := time.Since(message.CreatedTime)
reporter.ConsumptionDelay(latency)
}
}

func LatencyTracker(msg *kafka.Message) {
message, err := creator.FromBytes(msg.Value)
if err != nil {
logger.Debugf("Unable to decode message during consumer ack")
return
func Display(decoder serde.Decoder) Callback {
return func(msg *kafka.Message) {
message, _ := decoder.FromBytes(msg.Value)
logger.Debugf("received message on %s: message: %s\n", msg.TopicPartition, message.String())
}
latency := time.Since(message.CreatedTime)
reporter.ConsumptionDelay(latency)
}
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Application struct {
Store
LibrdConfigs
Jaeger
ProtoParser
}

type Config struct {
Expand Down Expand Up @@ -99,6 +100,13 @@ type Jaeger struct {
SamplerParam float64 `split_words:"true" default:"1"`
}

type ProtoParser struct {
Enabled bool `default:"false"`
FilePath string `split_words:"true"`
MessageName string `split_words:"true"`
TimestampIndex int `split_words:"true"`
}

func (p Prometheus) BindPort() string {
return fmt.Sprintf("0.0.0.0:%d", p.Port)
}
Expand Down
17 changes: 9 additions & 8 deletions config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ func Load() error {
}

configs := map[string]interface{}{
"STORE": &application.Store,
"PRODUCER": &application.Producer,
"CONSUMER": &application.Consumer,
"LIBRD": &librdConfigs,
"APP": &application.Config,
"PROMETHEUS": &application.Reporter.Prometheus,
"STATSD": &application.Reporter.Statsd,
"JAEGER": &application.Jaeger,
"STORE": &application.Store,
"PRODUCER": &application.Producer,
"CONSUMER": &application.Consumer,
"LIBRD": &librdConfigs,
"APP": &application.Config,
"PROMETHEUS": &application.Reporter.Prometheus,
"STATSD": &application.Reporter.Statsd,
"JAEGER": &application.Jaeger,
"PROTO_PARSER": &application.ProtoParser,
}
if err := loadConfigs(configs); err != nil {
return err
Expand Down
13 changes: 12 additions & 1 deletion creator/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Creator struct {
index uint64
}

func (c *Creator) NewMessage() Message {
func (c *Creator) NewMessageWithFakeData() Message {
c.index++
id := uuid.NewV4()
return Message{
Expand All @@ -22,6 +22,17 @@ func (c *Creator) NewMessage() Message {
}
}

func (c *Creator) NewMessage(data []byte, createdTime time.Time) Message {
c.index++
id := uuid.NewV4()
return Message{
Sequence: c.index,
ID: id.String(),
CreatedTime: createdTime,
Data: data,
}
}

func New() *Creator {
return &Creator{}
}
38 changes: 28 additions & 10 deletions creator/creator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,39 @@ import (
"time"

"github.com/gojekfarm/kafqa/creator"
"github.com/gojekfarm/kafqa/serde"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/assert"
)

func TestNewBytesCreatesMessagesInSequence(t *testing.T) {
messageCreator := creator.New()
messageBytes, _ := messageCreator.NewMessage().Bytes()
message, err := creator.FromBytes(messageBytes)
parser := serde.KafqaParser{}
messageBytes, _ := parser.Bytes(messageCreator.NewMessageWithFakeData())
message, err := parser.FromBytes(messageBytes)
assert.NoError(t, err)
assert.Equal(t, uint64(1), message.Sequence)
messageBytes, _ = messageCreator.NewMessage().Bytes()
message, err = creator.FromBytes(messageBytes)
messageBytes, _ = parser.Bytes(messageCreator.NewMessageWithFakeData())
message, err = parser.FromBytes(messageBytes)
assert.NoError(t, err)
assert.Equal(t, uint64(2), message.Sequence)
}

func TestAddsUUIDV4AsID(t *testing.T) {
messageCreator := creator.New()
messageBytes, _ := messageCreator.NewMessage().Bytes()
message, err := creator.FromBytes(messageBytes)
parser := serde.KafqaParser{}
messageBytes, _ := parser.Bytes(messageCreator.NewMessageWithFakeData())
message, err := parser.FromBytes(messageBytes)
uid, err := uuid.FromString(message.ID)
assert.NoError(t, err)
assert.Equal(t, uuid.V4, uid.Version())
}

func TestAddsCreationTimeStamp(t *testing.T) {
messageCreator := creator.New()
messageBytes, _ := messageCreator.NewMessage().Bytes()
message, err := creator.FromBytes(messageBytes)
parser := serde.KafqaParser{}
messageBytes, _ := parser.Bytes(messageCreator.NewMessageWithFakeData())
message, err := parser.FromBytes(messageBytes)
assert.NoError(t, err)

createdSince := time.Since(message.CreatedTime)
Expand All @@ -43,8 +47,22 @@ func TestAddsCreationTimeStamp(t *testing.T) {

func TestAdds10ParasOfText(t *testing.T) {
messageCreator := creator.New()
messageBytes, _ := messageCreator.NewMessage().Bytes()
message, err := creator.FromBytes(messageBytes)
parser := serde.KafqaParser{}
messageBytes, _ := parser.Bytes(messageCreator.NewMessageWithFakeData())
message, err := parser.FromBytes(messageBytes)
assert.NoError(t, err)
assert.Equal(t, 10, len(strings.Split(string(message.Data), "\t")))
}

func TestAddsDataWhenTheDataIsPassed(t *testing.T) {
messageCreator := creator.New()
parser := serde.KafqaParser{}
testData := []byte("test")
testTime := time.Now()
messageBytes, _ := parser.Bytes(messageCreator.NewMessage(testData, testTime))
message, err := parser.FromBytes(messageBytes)
assert.NoError(t, err)
assert.Equal(t, true, testTime.Equal(message.CreatedTime))
assert.Equal(t, testData, message.Data)

}
23 changes: 0 additions & 23 deletions creator/message.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package creator

import (
"bytes"
"encoding/gob"
"fmt"
"time"
)
Expand All @@ -15,27 +13,6 @@ type Message struct {
Data []byte
}

// Bytes Serializes the message via gob encoder
func (m Message) Bytes() ([]byte, error) {
var messageBuffer bytes.Buffer
enc := gob.NewEncoder(&messageBuffer)
err := enc.Encode(m)
if err != nil {
return nil, err
}
return messageBuffer.Bytes(), nil
}

// FromBytes DeSerializes the bytes data to a message via gob decoder
func FromBytes(data []byte) (Message, error) {
var messageBuffer bytes.Buffer
messageBuffer.Write(data)
enc := gob.NewDecoder(&messageBuffer)
var m Message
err := enc.Decode(&m)
return m, err
}

func (m Message) String() string {
return fmt.Sprintf("ID: %s sequence: %d time: %s", m.ID, m.Sequence, m.CreatedTime.Format(time.RFC3339))
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ require (
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/corpix/uarand v0.1.0 // indirect
github.com/go-redis/redis v6.15.2+incompatible
github.com/gogo/protobuf v1.1.1
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/golang/protobuf v1.3.1
github.com/gomodule/redigo v2.0.0+incompatible // indirect
github.com/hashicorp/go-multierror v1.0.0
github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428
github.com/jhump/protoreflect v1.5.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/kr/pretty v0.1.0 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand All @@ -44,6 +47,8 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428 h1:Mo9W14pwbO9VfRe+ygqZ8dFbPpoIK1HFrG/zjTuQ+nc=
github.com/icrowley/fake v0.0.0-20180203215853-4178557ae428/go.mod h1:uhpZMVGznybq1itEKXj6RYw9I71qK4kH+OGMjRC4KEo=
github.com/jhump/protoreflect v1.5.0 h1:NgpVT+dX71c8hZnxHof2M7QDK7QtohIJ7DYycjnkyfc=
github.com/jhump/protoreflect v1.5.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
Expand Down Expand Up @@ -117,6 +122,7 @@ go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190522155817-f3200d17e092 h1:4QSRKanuywn15aTZvI/mIDEgPQpswuFndXpOj3rKEco=
Expand All @@ -132,6 +138,10 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
google.golang.org/genproto v0.0.0-20170818010345-ee236bd376b0 h1:ZvI3lsq5AIkr7axxmT3tfwFlJVRFLqe6Fp0W03+MJ38=
google.golang.org/genproto v0.0.0-20170818010345-ee236bd376b0/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.8.0 h1:HN69LlNA/SpyBIRxTfuU0QOntYfdeEeBWlVhRHRCOyw=
google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
Expand Down
10 changes: 8 additions & 2 deletions kafqa.env
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ export PRODUCER_ENABLED="true"
export APP_ENVIRONMENT="development"
export APP_DURATION_MS="500000"

export KAFKA_TOPIC="kafqa_test"
export KAFKA_TOPIC="test-kafqa"

export CONSUMER_GROUP_ID="kafqa_consumer_test_001"
export CONSUMER_ENABLED="true"
export CONSUMER_GROUP_ID="kafqa_consumer_test_1"
export CONSUMER_CONCURRENCY=1
export CONSUMER_POLL_TIMEOUT_MS="300"
export CONSUMER_KAFKA_BROKERS="localhost:9092"
Expand All @@ -21,6 +22,11 @@ export CONSUMER_ENABLE_AUTO_COMMIT="true"
export PROMETHEUS_ENABLED="true"
export PROMETHEUS_PORT="9090"

export PROTO_PARSER_ENABLED="false"
export PROTO_PARSER_MESSAGE_NAME="com.esb.userLocation.UserLocationLogMessage"
export PROTO_PARSER_FILE_PATH=scripts/proto/test.proto
export PROTO_PARSER_TIMESTAMP_INDEX=4

# Use redis store
# export STORE_TYPE="redis"
# export STORE_REDIS_HOST="127.0.0.1:6379"
Expand Down
Loading

0 comments on commit 925409f

Please sign in to comment.