nil handling options #45

Merged
merged 1 commit into from Oct 11, 2017
20 changes: 20 additions & 0 deletions options.go
@@ -116,6 +116,7 @@ type poptions struct {
registry metrics.Registry
partitionChannelSize int
hasher func() hash.Hash32
nilHandling NilHandling

builders struct {
storage StorageBuilder
@@ -226,6 +227,25 @@ func WithHasher(hasher func() hash.Hash32) ProcessorOption {
}
}

type NilHandling int

const (
// NilIgnore drops any message with a nil value.
NilIgnore NilHandling = 0 + iota
// NilProcess passes the nil value to ProcessCallback.
NilProcess
// NilDecode passes the nil value to the decoder before calling ProcessCallback.
NilDecode
)

// WithNilHandling configures how the processor handles messages with a nil
// value. By default, the processor ignores nil messages.
func WithNilHandling(nh NilHandling) ProcessorOption {
return func(o *poptions) {
o.nilHandling = nh
}
}

func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
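
For context, here is how a caller could opt into one of the new modes. A minimal sketch, assuming the github.com/lovoo/goka import path; the broker address, group, and topic names are hypothetical:

package main

import (
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	// Hypothetical broker and topic names; the callback just illustrates
	// what NilProcess delivers.
	g := goka.DefineGroup("example-group",
		goka.Input("example-stream", new(codec.String),
			func(ctx goka.Context, msg interface{}) {
				if msg == nil {
					// With NilProcess, nil values reach the callback undecoded.
					log.Printf("tombstone for key %s", ctx.Key())
					return
				}
				log.Printf("message for key %s: %v", ctx.Key(), msg)
			}),
	)

	p, err := goka.NewProcessor([]string{"localhost:9092"}, g,
		goka.WithNilHandling(goka.NilProcess),
	)
	if err != nil {
		log.Fatal(err)
	}
	if err := p.Start(); err != nil {
		log.Fatal(err)
	}
}
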
37 changes: 23 additions & 14 deletions processor.go
@@ -3,7 +3,6 @@ package goka
import (
"errors"
"fmt"
"log"
"runtime/debug"
"sync"
"time"
@@ -682,24 +681,34 @@ func (g *Processor) process(msg *message, st storage.Storage, wg *sync.WaitGroup
ctx.storage = st
}

// get stream subscription
codec := g.graph.codec(msg.Topic)
if codec == nil {
return fmt.Errorf("cannot handle topic %s", msg.Topic)
}
var (
m interface{}
err error
)

// drop nil messages
if msg.Data == nil {
log.Printf("dropping nil message for key %s from %s/%d", msg.Key, msg.Topic, msg.Partition)
// decide whether to decode or ignore the message
switch {
case msg.Data == nil && g.opts.nilHandling == NilIgnore:
// drop nil messages
return nil
}
case msg.Data == nil && g.opts.nilHandling == NilProcess:
// process nil messages without decoding them
m = nil
default:
// get stream subscription
codec := g.graph.codec(msg.Topic)
if codec == nil {
return fmt.Errorf("cannot handle topic %s", msg.Topic)
}

// decode message
m, err := codec.Decode(msg.Data)
if err != nil {
return fmt.Errorf("error decoding message for key %s from %s/%d: %v", msg.Key, msg.Topic, msg.Partition, err)
// decode message
m, err = codec.Decode(msg.Data)
if err != nil {
return fmt.Errorf("error decoding message for key %s from %s/%d: %v", msg.Key, msg.Topic, msg.Partition, err)
}
}

// start context and call ProcessorCallback
ctx.start()
defer ctx.finish() // execute even in case of panic
cb := g.graph.callback(msg.Topic)
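
With NilDecode, the nil payload is handed to the codec, so a codec can translate Kafka tombstones into a typed sentinel before the callback runs. A minimal sketch of such a codec, mirroring the nilCodec used in the tests below; the tombstone type is illustrative:

package example

import "fmt"

// tombstone is an illustrative sentinel for deleted records.
type tombstone struct{}

// tombstoneCodec maps nil payloads to *tombstone so the ProcessCallback
// can distinguish deletes from regular string messages (NilDecode mode).
type tombstoneCodec struct{}

func (c *tombstoneCodec) Decode(data []byte) (interface{}, error) {
	if data == nil {
		return new(tombstone), nil
	}
	return string(data), nil
}

func (c *tombstoneCodec) Encode(value interface{}) ([]byte, error) {
	if _, ok := value.(*tombstone); ok {
		return nil, nil // deletes encode back to a nil payload
	}
	s, ok := value.(string)
	if !ok {
		return nil, fmt.Errorf("expected string, got %T", value)
	}
	return []byte(s), nil
}
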
217 changes: 213 additions & 4 deletions processor_test.go
@@ -108,7 +108,7 @@ func createProcessor(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consumer
return p
}

func createProcessorWithTable(ctrl *gomock.Controller, consumer kafka.Consumer, npar int, sb StorageBuilder) *Processor {
func createProcessorWithTable(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consumer, npar int, sb StorageBuilder) *Processor {
tm := mock.NewMockTopicManager(ctrl)
producer := mock.NewMockProducer(ctrl)

@@ -124,7 +124,7 @@ func createProcessorWithTable(ctrl *gomock.Controller, consumer kafka.Consumer,
tm.EXPECT().EnsureStreamExists(loopName(group), len(partitions)).Return(nil)
tm.EXPECT().EnsureTableExists(tableName(group), len(partitions)).Return(nil)
tm.EXPECT().Close().Return(nil)
p, _ := NewProcessor(nil,
p, err := NewProcessor(nil,
DefineGroup(group,
Input(topic, rawCodec, cb),
Input(topic2, rawCodec, cb),
Expand All @@ -138,6 +138,7 @@ func createProcessorWithTable(ctrl *gomock.Controller, consumer kafka.Consumer,
WithStorageBuilder(sb),
WithPartitionChannelSize(0),
)
ensure.Nil(t, err)
return p
}

@@ -572,7 +573,6 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
p = createProcessor(t, ctrl, consumer, 3, sb)
value = []byte("value")
)

// -- expectations --
// 1. start
gomock.InOrder(
@@ -638,6 +638,132 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
ensure.Nil(t, err)
}

// starts a processor with a table and receives an error from Kafka in the
// events channel after rebalance.
func TestProcessor_StartWithTableWithErrorAfterRebalance(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

var (
err error
consumer = mock.NewMockConsumer(ctrl)
st = mock.NewMockStorage(ctrl)
sb = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
final = make(chan bool)
ch = make(chan kafka.Event)
p = createProcessorWithTable(t, ctrl, consumer, 3, sb)
value = []byte("value")
blockit = make(chan bool)
unblocked = make(chan bool)
)
p.graph.callbacks[topic] = func(ctx Context, msg interface{}) {
fmt.Println("blocking callback received message", msg)
defer close(unblocked)
<-blockit
fmt.Println("unblocked")
}

// -- expectations --
// 1. start
gomock.InOrder(
consumer.EXPECT().Subscribe(topOff).Return(nil),
consumer.EXPECT().Events().Return(ch),
)
// 2. rebalance
st.EXPECT().Open().Times(6)
st.EXPECT().GetOffset(int64(-2)).Return(int64(123), nil).Times(6)
consumer.EXPECT().AddPartition(tableName(group), int32(0), int64(123))
consumer.EXPECT().AddPartition(tableName(group), int32(1), int64(123))
consumer.EXPECT().AddPartition(tableName(group), int32(2), int64(123))
consumer.EXPECT().AddPartition(table, int32(0), int64(123))
consumer.EXPECT().AddPartition(table, int32(1), int64(123))
consumer.EXPECT().AddPartition(table, int32(2), int64(123))
// 3. EOF messages
st.EXPECT().MarkRecovered().Times(3)
// 4. messages
consumer.EXPECT().Commit(topic, int32(1), int64(2))
// st.EXPECT().SetEncoded("key", value).Return(nil)
// st.EXPECT().SetOffset(int64(1))
st.EXPECT().Sync()
// 5. error
consumer.EXPECT().Close().Do(func() { close(ch) })
consumer.EXPECT().RemovePartition(tableName(group), int32(0))
consumer.EXPECT().RemovePartition(tableName(group), int32(1))
consumer.EXPECT().RemovePartition(tableName(group), int32(2))
consumer.EXPECT().RemovePartition(table, int32(0))
consumer.EXPECT().RemovePartition(table, int32(1))
consumer.EXPECT().RemovePartition(table, int32(2))
st.EXPECT().Sync().Times(6)
st.EXPECT().Close().Times(6)
p.producer.(*mock.MockProducer).EXPECT().Close()

// -- test --
// 1. start
go func() {
err := p.Start()
ensure.NotNil(t, err)
close(final)
}()

// 2. rebalance
ensure.True(t, len(p.partitions) == 0)
ensure.True(t, len(p.partitionViews) == 0)
ch <- (*kafka.Assignment)(&map[int32]int64{0: -1, 1: -1, 2: -1})
err = syncWith(t, ch, -1) // with processor
ensure.Nil(t, err)
ensure.True(t, len(p.partitions) == 3)
ensure.True(t, len(p.partitionViews) == 3)

// 3. message
ch <- &kafka.EOF{
Topic: tableName(group),
Hwm: 0,
Partition: 0,
}
err = syncWith(t, ch, 0) // with partition
ensure.Nil(t, err)
ch <- &kafka.EOF{
Topic: tableName(group),
Hwm: 0,
Partition: 1,
}
err = syncWith(t, ch, 1) // with partition
ensure.Nil(t, err)
ch <- &kafka.EOF{
Topic: tableName(group),
Hwm: 0,
Partition: 2,
}
err = syncWith(t, ch, 2) // with partition
ensure.Nil(t, err)

// 4. heavy message
ch <- &kafka.Message{
Topic: topic,
Partition: 1,
Offset: 2,
Key: "key",
Value: value,
}
// don't wait for that message to be processed

// 5. receive error
ch <- new(kafka.Error)

// sync with partition (should be unblocked)
close(blockit)
<-unblocked

// 6. stop
err = doTimed(t, func() {
<-final
p.Stop()
})
ensure.Nil(t, err)
}

func TestProcessor_Start(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -805,7 +931,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
}
final = make(chan bool)
ch = make(chan kafka.Event)
p = createProcessorWithTable(ctrl, consumer, 3, sb)
p = createProcessorWithTable(t, ctrl, consumer, 3, sb)
producer = p.producer.(*mock.MockProducer)
value = []byte("value")
)
@@ -1268,6 +1394,89 @@ func TestProcessor_consumePanic(t *testing.T) {
ensure.True(t, strings.Contains(processorErrors.Error(), "panicking"))
}

type nilValue struct{}
type nilCodec struct{}

func (nc *nilCodec) Decode(data []byte) (interface{}, error) {
if data == nil {
return new(nilValue), nil
}
return data, nil
}
func (nc *nilCodec) Encode(val interface{}) ([]byte, error) {
return nil, nil
}

func TestProcessor_consumeNil(t *testing.T) {

tests := []struct {
name string
cb ProcessCallback
handling NilHandling
codec Codec
}{
{
"ignore",
func(ctx Context, msg interface{}) {
t.Error("should never call consume")
t.Fail()
},
NilIgnore,
new(codec.String),
},
{
"process",
func(ctx Context, msg interface{}) {
if msg != nil {
t.Errorf("message should be nil:%v", msg)
t.Fail()
}
},
NilProcess,
new(codec.String),
},
{
"decode",
func(ctx Context, msg interface{}) {
if _, ok := msg.(*nilValue); !ok {
t.Errorf("message should be a decoded nil value: %T", msg)
t.Fail()
}
},
NilDecode,
new(nilCodec),
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
km := NewKafkaMock(t, "test")
proc, err := NewProcessor([]string{"broker"},
DefineGroup("test",
Input("topic", tc.codec, tc.cb),
),
append(km.ProcessorOptions(), WithNilHandling(tc.handling))...,
)

ensure.Nil(t, err)
var (
processorErrors error
done = make(chan struct{})
)
go func() {
processorErrors = proc.Start()
close(done)
}()

km.Consume("topic", "key", nil)

proc.Stop()
<-done
ensure.Nil(t, processorErrors)
})
}
}

func TestProcessor_failOnRecover(t *testing.T) {
var (
recovered int