diff --git a/options.go b/options.go
index cf5ce7d4..8b2a613d 100644
--- a/options.go
+++ b/options.go
@@ -116,6 +116,7 @@ type poptions struct {
     registry             metrics.Registry
     partitionChannelSize int
     hasher               func() hash.Hash32
+    nilHandling          NilHandling
 
     builders struct {
         storage StorageBuilder
@@ -226,6 +227,26 @@ func WithHasher(hasher func() hash.Hash32) ProcessorOption {
     }
 }
 
+// NilHandling defines how nil messages should be handled by the processor.
+type NilHandling int
+
+const (
+    // NilIgnore drops any message with nil value.
+    NilIgnore NilHandling = 0 + iota
+    // NilProcess passes the nil value to ProcessCallback.
+    NilProcess
+    // NilDecode passes the nil value to the decoder before calling ProcessCallback.
+    NilDecode
+)
+
+// WithNilHandling configures how the processor should handle messages with nil
+// value. By default, the processor ignores nil messages.
+func WithNilHandling(nh NilHandling) ProcessorOption {
+    return func(o *poptions) {
+        o.nilHandling = nh
+    }
+}
+
 func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
     opt.clientID = defaultClientID
     opt.log = logger.Default()
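The new option follows the existing functional-option pattern (compare WithHasher above). A minimal usage sketch, assuming goka's public API as of this change; the broker address, group, topic, and callback body are hypothetical placeholders, not part of this diff:

package main

import (
    "github.com/lovoo/goka"
    "github.com/lovoo/goka/codec"
)

func main() {
    // Pass WithNilHandling alongside the other processor options; with
    // NilProcess, nil-valued messages reach the callback as msg == nil.
    p, err := goka.NewProcessor([]string{"localhost:9092"},
        goka.DefineGroup("example-group",
            goka.Input("example-topic", new(codec.String), func(ctx goka.Context, msg interface{}) {
                // msg is nil for nil-valued (tombstone) messages
            }),
        ),
        goka.WithNilHandling(goka.NilProcess),
    )
    if err != nil {
        panic(err)
    }
    if err := p.Start(); err != nil {
        panic(err)
    }
}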
diff --git a/processor.go b/processor.go
index 0f335361..6b6e7082 100644
--- a/processor.go
+++ b/processor.go
@@ -3,7 +3,6 @@ package goka
 import (
     "errors"
     "fmt"
-    "log"
     "runtime/debug"
     "sync"
     "time"
@@ -682,24 +681,34 @@ func (g *Processor) process(msg *message, st storage.Storage, wg *sync.WaitGroup
         ctx.storage = st
     }
 
-    // get stream subcription
-    codec := g.graph.codec(msg.Topic)
-    if codec == nil {
-        return fmt.Errorf("cannot handle topic %s", msg.Topic)
-    }
+    var (
+        m   interface{}
+        err error
+    )
 
-    // drop nil messages
-    if msg.Data == nil {
-        log.Printf("dropping nil message for key %s from %s/%d", msg.Key, msg.Topic, msg.Partition)
+    // decide whether to decode or to ignore the message
+    switch {
+    case msg.Data == nil && g.opts.nilHandling == NilIgnore:
+        // drop nil messages
         return nil
-    }
+    case msg.Data == nil && g.opts.nilHandling == NilProcess:
+        // process nil messages without decoding them
+        m = nil
+    default:
+        // get stream subscription
+        codec := g.graph.codec(msg.Topic)
+        if codec == nil {
+            return fmt.Errorf("cannot handle topic %s", msg.Topic)
+        }
 
-    // decode message
-    m, err := codec.Decode(msg.Data)
-    if err != nil {
-        return fmt.Errorf("error decoding message for key %s from %s/%d: %v", msg.Key, msg.Topic, msg.Partition, err)
+        // decode message
+        m, err = codec.Decode(msg.Data)
+        if err != nil {
+            return fmt.Errorf("error decoding message for key %s from %s/%d: %v", msg.Key, msg.Topic, msg.Partition, err)
+        }
     }
 
+    // start context and call ProcessCallback
     ctx.start()
     defer ctx.finish() // execute even in case of panic
     cb := g.graph.callback(msg.Topic)
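With NilProcess, the callback itself decides what a nil value means. Since Kafka uses nil-valued messages as tombstones under log compaction, one plausible use is to mirror such deletions into the group table. A sketch, assuming the processor was built with WithNilHandling(NilProcess) and the callback belongs to a group with a persisted table; the function name is invented for illustration:

package example

import "github.com/lovoo/goka"

// handleWithTombstones mirrors Kafka tombstones (nil values) into the
// group table; it assumes WithNilHandling(NilProcess) on the processor.
func handleWithTombstones(ctx goka.Context, msg interface{}) {
    if msg == nil {
        ctx.Delete() // treat the nil value as a deletion of the key
        return
    }
    ctx.SetValue(msg) // regular update
}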
diff --git a/processor_test.go b/processor_test.go
index d7767ab2..31a09f23 100644
--- a/processor_test.go
+++ b/processor_test.go
@@ -108,7 +108,7 @@ func createProcessor(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consu
     return p
 }
 
-func createProcessorWithTable(ctrl *gomock.Controller, consumer kafka.Consumer, npar int, sb StorageBuilder) *Processor {
+func createProcessorWithTable(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consumer, npar int, sb StorageBuilder) *Processor {
     tm := mock.NewMockTopicManager(ctrl)
     producer := mock.NewMockProducer(ctrl)
 
@@ -124,7 +124,7 @@ func createProcessorWithTable(ctrl *gomock.Controller, consumer kafka.Consumer,
     tm.EXPECT().EnsureStreamExists(loopName(group), len(partitions)).Return(nil)
     tm.EXPECT().EnsureTableExists(tableName(group), len(partitions)).Return(nil)
     tm.EXPECT().Close().Return(nil)
-    p, _ := NewProcessor(nil,
+    p, err := NewProcessor(nil,
         DefineGroup(group,
             Input(topic, rawCodec, cb),
             Input(topic2, rawCodec, cb),
@@ -138,6 +138,7 @@ func createProcessorWithTable(ctrl *gomock.Controller, consumer kafka.Consumer,
         WithStorageBuilder(sb),
         WithPartitionChannelSize(0),
     )
+    ensure.Nil(t, err)
     return p
 }
 
@@ -572,7 +573,6 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
         p        = createProcessor(t, ctrl, consumer, 3, sb)
         value    = []byte("value")
     )
-
     // -- expectations --
     // 1. start
     gomock.InOrder(
@@ -638,6 +638,130 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
     ensure.Nil(t, err)
 }
 
+// starts a processor with a table and receives an error from Kafka in the
+// events channel after rebalance.
+func TestProcessor_StartWithTableWithErrorAfterRebalance(t *testing.T) {
+    ctrl := gomock.NewController(t)
+    defer ctrl.Finish()
+
+    var (
+        err      error
+        consumer = mock.NewMockConsumer(ctrl)
+        st       = mock.NewMockStorage(ctrl)
+        sb       = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
+            return st, nil
+        }
+        final     = make(chan bool)
+        ch        = make(chan kafka.Event)
+        p         = createProcessorWithTable(t, ctrl, consumer, 3, sb)
+        value     = []byte("value")
+        blockit   = make(chan bool)
+        unblocked = make(chan bool)
+    )
+    p.graph.callbacks[topic] = func(ctx Context, msg interface{}) {
+        defer close(unblocked)
+        <-blockit
+    }
+
+    // -- expectations --
+    // 1. start
+    gomock.InOrder(
+        consumer.EXPECT().Subscribe(topOff).Return(nil),
+        consumer.EXPECT().Events().Return(ch),
+    )
+    // 2. rebalance
+    st.EXPECT().Open().Times(6)
+    st.EXPECT().GetOffset(int64(-2)).Return(int64(123), nil).Times(6)
+    consumer.EXPECT().AddPartition(tableName(group), int32(0), int64(123))
+    consumer.EXPECT().AddPartition(tableName(group), int32(1), int64(123))
+    consumer.EXPECT().AddPartition(tableName(group), int32(2), int64(123))
+    consumer.EXPECT().AddPartition(table, int32(0), int64(123))
+    consumer.EXPECT().AddPartition(table, int32(1), int64(123))
+    consumer.EXPECT().AddPartition(table, int32(2), int64(123))
+    // 3. EOF messages
+    st.EXPECT().MarkRecovered().Times(3)
+    // 4. messages
+    consumer.EXPECT().Commit(topic, int32(1), int64(2))
+    // st.EXPECT().SetEncoded("key", value).Return(nil)
+    // st.EXPECT().SetOffset(int64(1))
+    st.EXPECT().Sync()
+    // 5. error
+    consumer.EXPECT().Close().Do(func() { close(ch) })
+    consumer.EXPECT().RemovePartition(tableName(group), int32(0))
+    consumer.EXPECT().RemovePartition(tableName(group), int32(1))
+    consumer.EXPECT().RemovePartition(tableName(group), int32(2))
+    consumer.EXPECT().RemovePartition(table, int32(0))
+    consumer.EXPECT().RemovePartition(table, int32(1))
+    consumer.EXPECT().RemovePartition(table, int32(2))
+    st.EXPECT().Sync().Times(6)
+    st.EXPECT().Close().Times(6)
+    p.producer.(*mock.MockProducer).EXPECT().Close()
+
+    // -- test --
+    // 1. start
+    go func() {
+        err := p.Start()
+        ensure.NotNil(t, err)
+        close(final)
+    }()
+
+    // 2. rebalance
+    ensure.True(t, len(p.partitions) == 0)
+    ensure.True(t, len(p.partitionViews) == 0)
+    ch <- (*kafka.Assignment)(&map[int32]int64{0: -1, 1: -1, 2: -1})
+    err = syncWith(t, ch, -1) // with processor
+    ensure.Nil(t, err)
+    ensure.True(t, len(p.partitions) == 3)
+    ensure.True(t, len(p.partitionViews) == 3)
+
+    // 3. EOF messages
+    ch <- &kafka.EOF{
+        Topic:     tableName(group),
+        Hwm:       0,
+        Partition: 0,
+    }
+    err = syncWith(t, ch, 0) // with partition
+    ensure.Nil(t, err)
+    ch <- &kafka.EOF{
+        Topic:     tableName(group),
+        Hwm:       0,
+        Partition: 1,
+    }
+    err = syncWith(t, ch, 1) // with partition
+    ensure.Nil(t, err)
+    ch <- &kafka.EOF{
+        Topic:     tableName(group),
+        Hwm:       0,
+        Partition: 2,
+    }
+    err = syncWith(t, ch, 2) // with partition
+    ensure.Nil(t, err)
+
+    // 4. heavy message
+    ch <- &kafka.Message{
+        Topic:     topic,
+        Partition: 1,
+        Offset:    2,
+        Key:       "key",
+        Value:     value,
+    }
+    // don't wait for that
+
+    // 5. receive error
+    ch <- new(kafka.Error)
+
+    // sync with partition (should be unblocked)
+    close(blockit)
+    <-unblocked
+
+    // 6. stop
+    err = doTimed(t, func() {
+        <-final
+        p.Stop()
+    })
+    ensure.Nil(t, err)
+}
+
 func TestProcessor_Start(t *testing.T) {
     ctrl := gomock.NewController(t)
     defer ctrl.Finish()
@@ -805,7 +931,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
         }
         final    = make(chan bool)
         ch       = make(chan kafka.Event)
-        p        = createProcessorWithTable(ctrl, consumer, 3, sb)
+        p        = createProcessorWithTable(t, ctrl, consumer, 3, sb)
         producer = p.producer.(*mock.MockProducer)
         value    = []byte("value")
     )
@@ -1268,6 +1394,89 @@ func TestProcessor_consumePanic(t *testing.T) {
     ensure.True(t, strings.Contains(processorErrors.Error(), "panicking"))
 }
 
+type nilValue struct{}
+type nilCodec struct{}
+
+func (nc *nilCodec) Decode(data []byte) (interface{}, error) {
+    if data == nil {
+        return new(nilValue), nil
+    }
+    return data, nil
+}
+
+func (nc *nilCodec) Encode(val interface{}) ([]byte, error) {
+    return nil, nil
+}
+
+func TestProcessor_consumeNil(t *testing.T) {
+    tests := []struct {
+        name     string
+        cb       ProcessCallback
+        handling NilHandling
+        codec    Codec
+    }{
+        {
+            "ignore",
+            func(ctx Context, msg interface{}) {
+                t.Error("should never call consume")
+                t.Fail()
+            },
+            NilIgnore,
+            new(codec.String),
+        },
+        {
+            "process",
+            func(ctx Context, msg interface{}) {
+                if msg != nil {
+                    t.Errorf("message should be nil: %v", msg)
+                    t.Fail()
+                }
+            },
+            NilProcess,
+            new(codec.String),
+        },
+        {
+            "decode",
+            func(ctx Context, msg interface{}) {
+                if _, ok := msg.(*nilValue); !ok {
+                    t.Errorf("message should be a decoded nil value: %T", msg)
+                    t.Fail()
+                }
+            },
+            NilDecode,
+            new(nilCodec),
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+            km := NewKafkaMock(t, "test")
+            proc, err := NewProcessor([]string{"broker"},
+                DefineGroup("test",
+                    Input("topic", tc.codec, tc.cb),
+                ),
+                append(km.ProcessorOptions(), WithNilHandling(tc.handling))...,
+            )
+
+            ensure.Nil(t, err)
+            var (
+                processorErrors error
+                done            = make(chan struct{})
+            )
+            go func() {
+                processorErrors = proc.Start()
+                close(done)
+            }()
+
+            km.Consume("topic", "key", nil)
+
+            proc.Stop()
+            <-done
+            ensure.Nil(t, processorErrors)
+        })
+    }
+}
+
 func TestProcessor_failOnRecover(t *testing.T) {
     var (
         recovered       int
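The nilCodec in the test above shows the mechanics of NilDecode in miniature: under that mode the codec, not the callback, interprets the nil payload. A more realistic variant might wrap an existing codec and decode nil into a typed tombstone value. A sketch assuming goka's Codec interface; the tombstone and deleteAwareCodec names are invented for illustration:

package example

import "github.com/lovoo/goka"

// tombstone marks a deleted key.
type tombstone struct{}

// deleteAwareCodec wraps an inner codec and decodes nil payloads into
// *tombstone values; intended for use with WithNilHandling(NilDecode).
type deleteAwareCodec struct {
    inner goka.Codec
}

func (c *deleteAwareCodec) Encode(value interface{}) ([]byte, error) {
    if _, ok := value.(*tombstone); ok {
        return nil, nil // a nil payload is a Kafka tombstone
    }
    return c.inner.Encode(value)
}

func (c *deleteAwareCodec) Decode(data []byte) (interface{}, error) {
    if data == nil {
        return &tombstone{}, nil
    }
    return c.inner.Decode(data)
}

A callback can then type-switch on *tombstone instead of checking for nil bytes, keeping deletion handling in one place.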