Skip to content

Commit

Permalink
Merge pull request #45 from lovoo/consume-nil-options
Browse files Browse the repository at this point in the history
nil handling options
  • Loading branch information
db7 committed Oct 11, 2017
2 parents 5ab3dcc + 769cdda commit fc342d9
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 18 deletions.
20 changes: 20 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type poptions struct {
registry metrics.Registry
partitionChannelSize int
hasher func() hash.Hash32
nilHandling NilHandling

builders struct {
storage StorageBuilder
Expand Down Expand Up @@ -226,6 +227,25 @@ func WithHasher(hasher func() hash.Hash32) ProcessorOption {
}
}

type NilHandling int

const (
// NilIgnore drops any message with nil value.
NilIgnore NilHandling = 0 + iota
// NilProcess passes the nil value to ProcessCallback.
NilProcess
// NilDecode passes the nil value to decoder before calling ProcessCallback.
NilDecode
)

// WithNilHandling configures how the processor should handle messages with nil
// value. By default the processor ignores nil messages.
func WithNilHandling(nh NilHandling) ProcessorOption {
return func(o *poptions) {
o.nilHandling = nh
}
}

func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
Expand Down
37 changes: 23 additions & 14 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package goka
import (
"errors"
"fmt"
"log"
"runtime/debug"
"sync"
"time"
Expand Down Expand Up @@ -682,24 +681,34 @@ func (g *Processor) process(msg *message, st storage.Storage, wg *sync.WaitGroup
ctx.storage = st
}

// get stream subcription
codec := g.graph.codec(msg.Topic)
if codec == nil {
return fmt.Errorf("cannot handle topic %s", msg.Topic)
}
var (
m interface{}
err error
)

// drop nil messages
if msg.Data == nil {
log.Printf("dropping nil message for key %s from %s/%d", msg.Key, msg.Topic, msg.Partition)
// decide whether to decode or ignore message
switch {
case msg.Data == nil && g.opts.nilHandling == NilIgnore:
// drop nil messages
return nil
}
case msg.Data == nil && g.opts.nilHandling == NilProcess:
// process nil messages without decoding them
m = nil
default:
// get stream subcription
codec := g.graph.codec(msg.Topic)
if codec == nil {
return fmt.Errorf("cannot handle topic %s", msg.Topic)
}

// decode message
m, err := codec.Decode(msg.Data)
if err != nil {
return fmt.Errorf("error decoding message for key %s from %s/%d: %v", msg.Key, msg.Topic, msg.Partition, err)
// decode message
m, err = codec.Decode(msg.Data)
if err != nil {
return fmt.Errorf("error decoding message for key %s from %s/%d: %v", msg.Key, msg.Topic, msg.Partition, err)
}
}

// start context and call ProcessorCallback
ctx.start()
defer ctx.finish() // execute even in case of panic
cb := g.graph.callback(msg.Topic)
Expand Down
217 changes: 213 additions & 4 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func createProcessor(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consu
return p
}

func createProcessorWithTable(ctrl *gomock.Controller, consumer kafka.Consumer, npar int, sb StorageBuilder) *Processor {
func createProcessorWithTable(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consumer, npar int, sb StorageBuilder) *Processor {
tm := mock.NewMockTopicManager(ctrl)
producer := mock.NewMockProducer(ctrl)

Expand All @@ -124,7 +124,7 @@ func createProcessorWithTable(ctrl *gomock.Controller, consumer kafka.Consumer,
tm.EXPECT().EnsureStreamExists(loopName(group), len(partitions)).Return(nil)
tm.EXPECT().EnsureTableExists(tableName(group), len(partitions)).Return(nil)
tm.EXPECT().Close().Return(nil)
p, _ := NewProcessor(nil,
p, err := NewProcessor(nil,
DefineGroup(group,
Input(topic, rawCodec, cb),
Input(topic2, rawCodec, cb),
Expand All @@ -138,6 +138,7 @@ func createProcessorWithTable(ctrl *gomock.Controller, consumer kafka.Consumer,
WithStorageBuilder(sb),
WithPartitionChannelSize(0),
)
ensure.Nil(t, err)
return p
}

Expand Down Expand Up @@ -572,7 +573,6 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
p = createProcessor(t, ctrl, consumer, 3, sb)
value = []byte("value")
)

// -- expectations --
// 1. start
gomock.InOrder(
Expand Down Expand Up @@ -638,6 +638,132 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
ensure.Nil(t, err)
}

// start processor with table and receives an error from Kafka in the events
// channel after rebalance.
func TestProcessor_StartWithTableWithErrorAfterRebalance(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

var (
err error
consumer = mock.NewMockConsumer(ctrl)
st = mock.NewMockStorage(ctrl)
sb = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
final = make(chan bool)
ch = make(chan kafka.Event)
p = createProcessorWithTable(t, ctrl, consumer, 3, sb)
value = []byte("value")
blockit = make(chan bool)
unblocked = make(chan bool)
)
p.graph.callbacks[topic] = func(ctx Context, msg interface{}) {
fmt.Println("hallodfads", msg)
defer close(unblocked)
<-blockit
fmt.Println("unblocked")
}

// -- expectations --
// 1. start
gomock.InOrder(
consumer.EXPECT().Subscribe(topOff).Return(nil),
consumer.EXPECT().Events().Return(ch),
)
// 2. rebalance
st.EXPECT().Open().Times(6)
st.EXPECT().GetOffset(int64(-2)).Return(int64(123), nil).Times(6)
consumer.EXPECT().AddPartition(tableName(group), int32(0), int64(123))
consumer.EXPECT().AddPartition(tableName(group), int32(1), int64(123))
consumer.EXPECT().AddPartition(tableName(group), int32(2), int64(123))
consumer.EXPECT().AddPartition(table, int32(0), int64(123))
consumer.EXPECT().AddPartition(table, int32(1), int64(123))
consumer.EXPECT().AddPartition(table, int32(2), int64(123))
// 3. EOF messages
st.EXPECT().MarkRecovered().Times(3)
// 4. messages
consumer.EXPECT().Commit(topic, int32(1), int64(2))
// st.EXPECT().SetEncoded("key", value).Return(nil)
// st.EXPECT().SetOffset(int64(1))
st.EXPECT().Sync()
// 5. error
consumer.EXPECT().Close().Do(func() { close(ch) })
consumer.EXPECT().RemovePartition(tableName(group), int32(0))
consumer.EXPECT().RemovePartition(tableName(group), int32(1))
consumer.EXPECT().RemovePartition(tableName(group), int32(2))
consumer.EXPECT().RemovePartition(table, int32(0))
consumer.EXPECT().RemovePartition(table, int32(1))
consumer.EXPECT().RemovePartition(table, int32(2))
st.EXPECT().Sync().Times(6)
st.EXPECT().Close().Times(6)
p.producer.(*mock.MockProducer).EXPECT().Close()

// -- test --
// 1. start
go func() {
err := p.Start()
ensure.NotNil(t, err)
close(final)
}()

// 2. rebalance
ensure.True(t, len(p.partitions) == 0)
ensure.True(t, len(p.partitionViews) == 0)
ch <- (*kafka.Assignment)(&map[int32]int64{0: -1, 1: -1, 2: -1})
err = syncWith(t, ch, -1) // with processor
ensure.Nil(t, err)
ensure.True(t, len(p.partitions) == 3)
ensure.True(t, len(p.partitionViews) == 3)

// 3. message
ch <- &kafka.EOF{
Topic: tableName(group),
Hwm: 0,
Partition: 0,
}
err = syncWith(t, ch, 0) // with partition
ensure.Nil(t, err)
ch <- &kafka.EOF{
Topic: tableName(group),
Hwm: 0,
Partition: 1,
}
err = syncWith(t, ch, 1) // with partition
ensure.Nil(t, err)
ch <- &kafka.EOF{
Topic: tableName(group),
Hwm: 0,
Partition: 2,
}
err = syncWith(t, ch, 2) // with partition
ensure.Nil(t, err)

// 4. heavy message
ch <- &kafka.Message{
Topic: topic,
Partition: 1,
Offset: 2,
Key: "key",
Value: value,
}
// dont wait for that

// 4. receive error
ch <- new(kafka.Error)

// sync with partition (should be unblocked)
close(blockit)
<-unblocked

// 5. stop
err = doTimed(t, func() {
<-final
p.Stop()
})
ensure.Nil(t, err)
}

func TestProcessor_Start(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -805,7 +931,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
}
final = make(chan bool)
ch = make(chan kafka.Event)
p = createProcessorWithTable(ctrl, consumer, 3, sb)
p = createProcessorWithTable(t, ctrl, consumer, 3, sb)
producer = p.producer.(*mock.MockProducer)
value = []byte("value")
)
Expand Down Expand Up @@ -1268,6 +1394,89 @@ func TestProcessor_consumePanic(t *testing.T) {
ensure.True(t, strings.Contains(processorErrors.Error(), "panicking"))
}

type nilValue struct{}
type nilCodec struct{}

func (nc *nilCodec) Decode(data []byte) (interface{}, error) {
if data == nil {
return new(nilValue), nil
}
return data, nil
}
func (nc *nilCodec) Encode(val interface{}) ([]byte, error) {
return nil, nil
}

func TestProcessor_consumeNil(t *testing.T) {

tests := []struct {
name string
cb ProcessCallback
handling NilHandling
codec Codec
}{
{
"ignore",
func(ctx Context, msg interface{}) {
t.Error("should never call consume")
t.Fail()
},
NilIgnore,
new(codec.String),
},
{
"process",
func(ctx Context, msg interface{}) {
if msg != nil {
t.Errorf("message should be nil:%v", msg)
t.Fail()
}
},
NilProcess,
new(codec.String),
},
{
"decode",
func(ctx Context, msg interface{}) {
if _, ok := msg.(*nilValue); !ok {
t.Errorf("message should be a decoded nil value: %T", msg)
t.Fail()
}
},
NilDecode,
new(nilCodec),
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
km := NewKafkaMock(t, "test")
proc, err := NewProcessor([]string{"broker"},
DefineGroup("test",
Input("topic", tc.codec, tc.cb),
),
append(km.ProcessorOptions(), WithNilHandling(tc.handling))...,
)

ensure.Nil(t, err)
var (
processorErrors error
done = make(chan struct{})
)
go func() {
processorErrors = proc.Start()
close(done)
}()

km.Consume("topic", "key", nil)

proc.Stop()
<-done
ensure.Nil(t, processorErrors)
})
}
}

func TestProcessor_failOnRecover(t *testing.T) {
var (
recovered int
Expand Down

0 comments on commit fc342d9

Please sign in to comment.