From 0e9ba2f748bb8b3ddc381bf40e4da9dfaa571f82 Mon Sep 17 00:00:00 2001 From: franz Date: Wed, 26 Sep 2018 16:21:10 +0200 Subject: [PATCH 1/6] tester implementation refactored --- examples/4-tests/README.md | 6 + examples/4-tests/example_test.go | 206 ++++++++++++ examples/testing/README.md | 17 - examples/testing/context_mock.go | 151 --------- examples/testing/doc.go | 3 - examples/testing/main.go | 58 ---- examples/testing/main_test.go | 87 ----- graph.go | 12 + kafka/consumer.go | 2 + options.go | 32 +- options_test.go | 4 +- processor.go | 6 +- processor_integration_test.go | 388 ++++++++++++++++++++++ processor_test.go | 359 +-------------------- tester/doc.go | 47 +++ tester/messagetracker.go | 95 ++++++ tester/queue.go | 124 ++++++++ tester/queueconsumer.go | 334 +++++++++++++++++++ tester/signal.go | 76 +++++ tester/tester.go | 531 ++++++++++++++----------------- tester/tester_test.go | 440 ++++++++++++++++++++++++- view.go | 7 + 22 files changed, 1987 insertions(+), 998 deletions(-) create mode 100644 examples/4-tests/README.md create mode 100644 examples/4-tests/example_test.go delete mode 100644 examples/testing/README.md delete mode 100644 examples/testing/context_mock.go delete mode 100644 examples/testing/doc.go delete mode 100644 examples/testing/main.go delete mode 100644 examples/testing/main_test.go create mode 100644 processor_integration_test.go create mode 100644 tester/doc.go create mode 100644 tester/messagetracker.go create mode 100644 tester/queue.go create mode 100644 tester/queueconsumer.go create mode 100644 tester/signal.go diff --git a/examples/4-tests/README.md b/examples/4-tests/README.md new file mode 100644 index 00000000..1a627438 --- /dev/null +++ b/examples/4-tests/README.md @@ -0,0 +1,6 @@ +# Test Example + +This Example demonstrates the testing capabilities of the tester package. 
+ +Check the source file to get the examples +https://github.com/lovoo/goka/blob/master/examples/4-tests/example_test.go diff --git a/examples/4-tests/example_test.go b/examples/4-tests/example_test.go new file mode 100644 index 00000000..e01c792a --- /dev/null +++ b/examples/4-tests/example_test.go @@ -0,0 +1,206 @@ +package tests + +import ( + "context" + "fmt" + "testing" + + "github.com/facebookgo/ensure" + "github.com/lovoo/goka" + "github.com/lovoo/goka/codec" + "github.com/lovoo/goka/tester" +) + +// Scenario (1) +// One processor with only one input +func Test_1Input(t *testing.T) { + var ( + gkt = tester.New(t) + receivedMessage string + ) + + // create a new processor, registering the tester + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("group", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + receivedMessage = msg.(string) + }), + ), + goka.WithTester(gkt), + ) + + // start it + go proc.Run(context.Background()) + + // consume a message + gkt.ConsumeString("input", "key", "some message") + + // ensure the message was received + ensure.DeepEqual(t, receivedMessage, "some message") +} + +// Scenario (2) +// +// One processor with only one input and one output +func Test_2InputOutput(t *testing.T) { + var ( + gkt = tester.New(t) + ) + + // create a new processor, registering the tester + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("group", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.Emit("output", ctx.Key(), fmt.Sprintf("forwarded: %v", msg)) + }), + goka.Output("output", new(codec.String)), + ), + goka.WithTester(gkt), + ) + + // start it + go proc.Run(context.Background()) + + // create a new message tracker so we can check that the message was being emitted. + // If we created the message tracker after the ConsumeString, there wouldn't be a message. + mt := gkt.NewMessageTrackerFromEnd() + + // send some message + gkt.ConsumeString("input", "key", "some-message") + + // make sure received the message in the output + key, value, valid := mt.NextMessage("output") + ensure.True(t, valid) + ensure.DeepEqual(t, key, "key") + ensure.DeepEqual(t, value, "forwarded: some-message") +} + +// Scenario (3) +// Instead of an output we will persist the message +func Test_3Persist(t *testing.T) { + var ( + gkt = tester.New(t) + ) + + // create a new processor, registering the tester + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("group", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.SetValue(fmt.Sprintf("state: %v", msg)) + }), + goka.Persist(new(codec.String)), + ), + goka.WithTester(gkt), + ) + + // start it + go proc.Run(context.Background()) + + // send some message + gkt.ConsumeString("input", "key", "some-message") + + // make sure it's correctly persisted in the state + value := gkt.TableValue("group-table", "key") + ensure.DeepEqual(t, value, "state: some-message") +} + +// Scenario (4) +// Often setting up a processor requires quite some boiler plate. 
This example +// shows how to reuse it using subtests +func Test_Subtest(t *testing.T) { + var ( + gkt = tester.New(t) + ) + + // create a new processor, registering the tester + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("group", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.SetValue(fmt.Sprintf("state: %v", msg)) + ctx.Emit("output", "output-key", fmt.Sprintf("forwarded: %v", msg)) + }), + goka.Persist(new(codec.String)), + goka.Output("output", new(codec.String)), + ), + goka.WithTester(gkt), + ) + go proc.Run(context.Background()) + + t.Run("test-1", func(t *testing.T) { + // clear all values so we can start with an empty state + gkt.ClearValues() + // in a subtest we can't know what messages already exists in the topics left + // by other tests, so let's start a message tracker from here. + mt := gkt.NewMessageTrackerFromEnd() + + // send a message + gkt.ConsumeString("input", "bob", "hello") + + // check it was emitted + key, value, ok := mt.NextMessage("output") + ensure.True(t, ok) + ensure.DeepEqual(t, key, "output-key") + ensure.DeepEqual(t, value, "forwarded: hello") + + // we should be at the end + mt.ExpectEmpty("output") + + // this is equivalent + _, _, ok = mt.NextMessage("output") + ensure.False(t, ok) + }) + t.Run("test-2", func(t *testing.T) { + // clear all values so we can start with an empty state + gkt.ClearValues() + + // send a message + gkt.ConsumeString("input", "bob", "hello") + + // do some state checks + value := gkt.TableValue("group-table", "bob") + ensure.DeepEqual(t, value, "state: hello") + }) +} + +// Scenario (5) +// It's perfectly fine to have loops and use multiple processors in one tester. +func Test_Chain(t *testing.T) { + var ( + gkt = tester.New(t) + ) + + // First processor: + // input -> loop -> output1 + proc1, _ := goka.NewProcessor([]string{}, goka.DefineGroup("proc1", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.Loopback(ctx.Key(), fmt.Sprintf("loop: %v", msg)) + }), + goka.Loop(new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.Emit("proc1-out", ctx.Key(), fmt.Sprintf("proc1-out: %v", msg)) + }), + goka.Output("proc1-out", new(codec.String)), + ), + goka.WithTester(gkt), + ) + + // Second processor: + // input -> persist + // create a new processor, registering the tester + proc2, _ := goka.NewProcessor([]string{}, goka.DefineGroup("proc2", + goka.Input("proc1-out", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.SetValue(fmt.Sprintf("persist: %v", msg)) + }), + goka.Persist(new(codec.String)), + ), + goka.WithTester(gkt), + ) + + go proc1.Run(context.Background()) + go proc2.Run(context.Background()) + + // Now send a message to input + // when this method terminates, we know that the message and all subsequent + // messages that + gkt.ConsumeString("input", "bob", "hello world") + + // the value should be persisted in the second processor's table + value := gkt.TableValue("proc2-table", "bob") + + ensure.DeepEqual(t, value, "persist: proc1-out: loop: hello world") +} diff --git a/examples/testing/README.md b/examples/testing/README.md deleted file mode 100644 index b81914af..00000000 --- a/examples/testing/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Test Example - -This example shos how to test application code with goka. -It can be executed with `go run main.go` but it does not do much, because no -events are produced by default. It is more about the test code. 
- -# How to test with goka - -Testing application code is supported in two ways: - -* write unit tests mocking goka.Context -* write integration tests mocking the communication with Kafka. - - -# Mocking Context - -TODO diff --git a/examples/testing/context_mock.go b/examples/testing/context_mock.go deleted file mode 100644 index ebf098bd..00000000 --- a/examples/testing/context_mock.go +++ /dev/null @@ -1,151 +0,0 @@ -// Automatically generated by MockGen. DO NOT EDIT! -// Source: github.com/lovoo/goka (interfaces: Context) - -package main - -import ( - gomock "github.com/golang/mock/gomock" - goka "github.com/lovoo/goka" - time "time" -) - -// Mock of Context interface -type MockContext struct { - ctrl *gomock.Controller - recorder *_MockContextRecorder -} - -// Recorder for MockContext (not exported) -type _MockContextRecorder struct { - mock *MockContext -} - -func NewMockContext(ctrl *gomock.Controller) *MockContext { - mock := &MockContext{ctrl: ctrl} - mock.recorder = &_MockContextRecorder{mock} - return mock -} - -func (_m *MockContext) EXPECT() *_MockContextRecorder { - return _m.recorder -} - -func (_m *MockContext) Delete() { - _m.ctrl.Call(_m, "Delete") -} - -func (_mr *_MockContextRecorder) Delete() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Delete") -} - -func (_m *MockContext) Emit(_param0 goka.Stream, _param1 string, _param2 interface{}) { - _m.ctrl.Call(_m, "Emit", _param0, _param1, _param2) -} - -func (_mr *_MockContextRecorder) Emit(arg0, arg1, arg2 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Emit", arg0, arg1, arg2) -} - -func (_m *MockContext) Fail(_param0 error) { - _m.ctrl.Call(_m, "Fail", _param0) -} - -func (_mr *_MockContextRecorder) Fail(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Fail", arg0) -} - -func (_m *MockContext) Join(_param0 goka.Table) interface{} { - ret := _m.ctrl.Call(_m, "Join", _param0) - ret0, _ := ret[0].(interface{}) - return ret0 -} - -func (_mr *_MockContextRecorder) Join(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Join", arg0) -} - -func (_m *MockContext) Key() string { - ret := _m.ctrl.Call(_m, "Key") - ret0, _ := ret[0].(string) - return ret0 -} - -func (_mr *_MockContextRecorder) Key() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Key") -} - -func (_m *MockContext) Lookup(_param0 goka.Table, _param1 string) interface{} { - ret := _m.ctrl.Call(_m, "Lookup", _param0, _param1) - ret0, _ := ret[0].(interface{}) - return ret0 -} - -func (_mr *_MockContextRecorder) Lookup(arg0, arg1 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Lookup", arg0, arg1) -} - -func (_m *MockContext) Loopback(_param0 string, _param1 interface{}) { - _m.ctrl.Call(_m, "Loopback", _param0, _param1) -} - -func (_mr *_MockContextRecorder) Loopback(arg0, arg1 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Loopback", arg0, arg1) -} - -func (_m *MockContext) Offset() int64 { - ret := _m.ctrl.Call(_m, "Offset") - ret0, _ := ret[0].(int64) - return ret0 -} - -func (_mr *_MockContextRecorder) Offset() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Offset") -} - -func (_m *MockContext) Partition() int32 { - ret := _m.ctrl.Call(_m, "Partition") - ret0, _ := ret[0].(int32) - return ret0 -} - -func (_mr *_MockContextRecorder) Partition() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Partition") -} - -func (_m *MockContext) SetValue(_param0 interface{}) { - _m.ctrl.Call(_m, 
"SetValue", _param0) -} - -func (_mr *_MockContextRecorder) SetValue(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "SetValue", arg0) -} - -func (_m *MockContext) Timestamp() time.Time { - ret := _m.ctrl.Call(_m, "Timestamp") - ret0, _ := ret[0].(time.Time) - return ret0 -} - -func (_mr *_MockContextRecorder) Timestamp() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Timestamp") -} - -func (_m *MockContext) Topic() goka.Stream { - ret := _m.ctrl.Call(_m, "Topic") - ret0, _ := ret[0].(goka.Stream) - return ret0 -} - -func (_mr *_MockContextRecorder) Topic() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Topic") -} - -func (_m *MockContext) Value() interface{} { - ret := _m.ctrl.Call(_m, "Value") - ret0, _ := ret[0].(interface{}) - return ret0 -} - -func (_mr *_MockContextRecorder) Value() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Value") -} diff --git a/examples/testing/doc.go b/examples/testing/doc.go deleted file mode 100644 index f502f203..00000000 --- a/examples/testing/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -package main - -//go:generate mockgen -package main -destination context_mock.go github.com/lovoo/goka Context diff --git a/examples/testing/main.go b/examples/testing/main.go deleted file mode 100644 index c78a20da..00000000 --- a/examples/testing/main.go +++ /dev/null @@ -1,58 +0,0 @@ -package main - -import ( - "context" - "fmt" - - "github.com/lovoo/goka" - "github.com/lovoo/goka/codec" -) - -func ConsumeScalar(ctx goka.Context, msg interface{}) { - scalar, is := msg.(int64) - if !is { - ctx.Fail(fmt.Errorf("Invalid message type. expected int64, was %T", msg)) - } else { - ctx.Emit("sink", "outgoing", int64(scalar+1)) - } -} - -func ConsumeScalarState(ctx goka.Context, msg interface{}) { - scalar, is := msg.(int64) - - if !is { - ctx.Fail(fmt.Errorf("Invalid message type. 
expected int64, was %T", msg)) - } else { - var value int64 - rawValue := ctx.Value() - if rawValue != nil { - value = rawValue.(int64) - } - value += scalar - ctx.SetValue(value) - } -} - -func createProcessor(brokers []string, extraopts ...goka.ProcessorOption) (*goka.Processor, error) { - return goka.NewProcessor(brokers, - goka.DefineGroup( - goka.Group("consume-scalar"), - goka.Persist(new(codec.Int64)), - goka.Input(goka.Stream("scalar-state"), new(codec.Int64), ConsumeScalarState), - goka.Input(goka.Stream("scalar"), new(codec.Int64), ConsumeScalar), - goka.Output(goka.Stream("sink"), new(codec.Int64)), - ), - extraopts..., - ) -} - -func main() { - proc, err := createProcessor([]string{"localhost:9092"}) - if err != nil { - panic(err) - } - - if errs := proc.Run(context.Background()); errs != nil { - fmt.Printf("Error executing processor: %v", errs) - } -} diff --git a/examples/testing/main_test.go b/examples/testing/main_test.go deleted file mode 100644 index 8b090590..00000000 --- a/examples/testing/main_test.go +++ /dev/null @@ -1,87 +0,0 @@ -package main - -import ( - "context" - "fmt" - "strconv" - "testing" - - "github.com/golang/mock/gomock" - "github.com/lovoo/goka" - "github.com/lovoo/goka/tester" -) - -func Test_ConsumeScalar(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - ctxMock := NewMockContext(ctrl) - - // test passing a wrong type (simulating wrong codec or unmarshalling errors) - ctxMock.EXPECT().Fail(gomock.Any()) - ConsumeScalar(ctxMock, "invalid-type") - - // correct type, expects emitted value - ctxMock.EXPECT().Emit(goka.Stream("sink"), "outgoing", int64(124)) - ConsumeScalar(ctxMock, int64(123)) -} - -func Test_ConsumeScalar_Integration(t *testing.T) { - // ctrl := goka.NewMockController(t) - // defer ctrl.Finish() - tester := tester.New(t) - proc, err := createProcessor(nil, goka.WithTester(tester)) - - if err != nil { - t.Fatalf("Error creating processor: %v", err) - } - done := make(chan int, 0) - ctx, cancel := context.WithCancel(context.Background()) - go func() { - errs := proc.Run(ctx) - if errs != nil { - t.Errorf("Error executing processor: %v", err) - } - close(done) - }() - - // we are creating a "1" as valid encoded message here for convenience - msg := []byte(strconv.FormatInt(1, 10)) - - // there is no initial value for key "foo" - if val := tester.ValueForKey("foo"); val != nil { - t.Errorf("state was not initially empty: %v", val) - } - - // send the message twice - tester.Consume("scalar-state", "foo", msg) - tester.Consume("scalar-state", "foo", msg) - - fooByte, isByte := tester.ValueForKey("foo").([]byte) - if !isByte { - t.Errorf("state does not exist or is not []byte") - } - value := string(fooByte) - fmt.Printf("%v\n", value) - - if value != "2" { - t.Errorf("Expected value %s, got %s", "2", value) - } - - tester.Consume("scalar", "somekey", msg) - var ( - parsed int64 - parseErr error - ) - // expect that a value was emitted - tester.ExpectEmit("sink", "outgoing", func(value []byte) { - parsed, parseErr = strconv.ParseInt(string(value), 10, 64) - }) - if parseErr != nil || parsed != 2 { - panic(fmt.Errorf("parsing emitted message failed or had a wrong value (%d): %v", parsed, parseErr)) - } - tester.Finish(true) - - cancel() - <-done -} diff --git a/graph.go b/graph.go index 0dd2bc54..1ec5ad49 100644 --- a/graph.go +++ b/graph.go @@ -124,12 +124,14 @@ func DefineGroup(group Group, edges ...Edge) *GroupGraph { switch e := e.(type) { case inputStreams: for _, input := range e { + 
gg.validateInputTopic(input.Topic()) inputStr := input.(*inputStream) gg.codecs[input.Topic()] = input.Codec() gg.callbacks[input.Topic()] = inputStr.cb gg.inputStreams = append(gg.inputStreams, inputStr) } case *inputStream: + gg.validateInputTopic(e.Topic()) gg.codecs[e.Topic()] = e.Codec() gg.callbacks[e.Topic()] = e.cb gg.inputStreams = append(gg.inputStreams, e) @@ -157,6 +159,16 @@ func DefineGroup(group Group, edges ...Edge) *GroupGraph { return &gg } +func (gg *GroupGraph) validateInputTopic(topic string) { + if topic == "" { + panic("Input topic cannot be empty. This will not work.") + } + + if _, exists := gg.callbacks[topic]; exists { + panic(fmt.Errorf("Callback for topic %s already exists. It is illegal to consume a topic twice", topic)) + } +} + // Validate validates the group graph and returns an error if invalid. // Main validation checks are: // - at most one loopback stream edge is allowed diff --git a/kafka/consumer.go b/kafka/consumer.go index b0410d10..a634b109 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -33,7 +33,9 @@ type Consumer interface { Events() <-chan Event // group consume assumes co-partioned topics + // define input topics to consume Subscribe(topics map[string]int64) error + // marks the consumer ready to start consuming the messages AddGroupPartition(partition int32) Commit(topic string, partition int32, offset int64) error diff --git a/options.go b/options.go index c6667cd5..dee19e0d 100644 --- a/options.go +++ b/options.go @@ -61,7 +61,7 @@ func DefaultHasher() func() hash.Hash32 { /////////////////////////////////////////////////////////////////////////////// // ProcessorOption defines a configuration option to be used when creating a processor. -type ProcessorOption func(*poptions) +type ProcessorOption func(*poptions, *GroupGraph) // processor options type poptions struct { @@ -84,42 +84,42 @@ type poptions struct { // WithUpdateCallback defines the callback called upon recovering a message // from the log. func WithUpdateCallback(cb UpdateCallback) ProcessorOption { - return func(o *poptions) { + return func(o *poptions, gg *GroupGraph) { o.updateCallback = cb } } // WithClientID defines the client ID used to identify with Kafka. func WithClientID(clientID string) ProcessorOption { - return func(o *poptions) { + return func(o *poptions, gg *GroupGraph) { o.clientID = clientID } } // WithStorageBuilder defines a builder for the storage of each partition. func WithStorageBuilder(sb storage.Builder) ProcessorOption { - return func(o *poptions) { + return func(o *poptions, gg *GroupGraph) { o.builders.storage = sb } } // WithTopicManagerBuilder replaces the default topic manager builder. func WithTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ProcessorOption { - return func(o *poptions) { + return func(o *poptions, gg *GroupGraph) { o.builders.topicmgr = tmb } } // WithConsumerBuilder replaces the default consumer builder. func WithConsumerBuilder(cb kafka.ConsumerBuilder) ProcessorOption { - return func(o *poptions) { + return func(o *poptions, gg *GroupGraph) { o.builders.consumer = cb } } // WithProducerBuilder replaces the default producer builder. func WithProducerBuilder(pb kafka.ProducerBuilder) ProcessorOption { - return func(o *poptions) { + return func(o *poptions, gg *GroupGraph) { o.builders.producer = pb } } @@ -128,7 +128,7 @@ func WithProducerBuilder(pb kafka.ProducerBuilder) ProcessorOption { // This is mostly used for testing by setting it to 0 to have synchronous behavior // of goka. 
func WithPartitionChannelSize(size int) ProcessorOption { - return func(o *poptions) { + return func(o *poptions, gg *GroupGraph) { o.partitionChannelSize = size } } @@ -136,14 +136,14 @@ func WithPartitionChannelSize(size int) ProcessorOption { // WithLogger sets the logger the processor should use. By default, processors // use the standard library logger. func WithLogger(log logger.Logger) ProcessorOption { - return func(o *poptions) { + return func(o *poptions, gg *GroupGraph) { o.log = log } } // WithHasher sets the hash function that assigns keys to partitions. func WithHasher(hasher func() hash.Hash32) ProcessorOption { - return func(o *poptions) { + return func(o *poptions, gg *GroupGraph) { o.hasher = hasher } } @@ -163,37 +163,41 @@ const ( // WithNilHandling configures how the processor should handle messages with nil // value. By default the processor ignores nil messages. func WithNilHandling(nh NilHandling) ProcessorOption { - return func(o *poptions) { + return func(o *poptions, gg *GroupGraph) { o.nilHandling = nh } } +// Tester interface to avoid import cycles when a processor needs to register to +// the tester. type Tester interface { StorageBuilder() storage.Builder ConsumerBuilder() kafka.ConsumerBuilder ProducerBuilder() kafka.ProducerBuilder TopicManagerBuilder() kafka.TopicManagerBuilder + RegisterGroupGraph(*GroupGraph) } // WithTester configures all external connections of a processor, ie, storage, // consumer and producer func WithTester(t Tester) ProcessorOption { - return func(o *poptions) { + return func(o *poptions, gg *GroupGraph) { o.builders.storage = t.StorageBuilder() o.builders.consumer = t.ConsumerBuilder() o.builders.producer = t.ProducerBuilder() o.builders.topicmgr = t.TopicManagerBuilder() o.partitionChannelSize = 0 + t.RegisterGroupGraph(gg) } } -func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error { +func (opt *poptions) applyOptions(gg *GroupGraph, opts ...ProcessorOption) error { opt.clientID = defaultClientID opt.log = logger.Default() opt.hasher = DefaultHasher() for _, o := range opts { - o(opt) + o(opt, gg) } // StorageBuilder should always be set as a default option in NewProcessor diff --git a/options_test.go b/options_test.go index fdc90e7d..04faf401 100644 --- a/options_test.go +++ b/options_test.go @@ -10,11 +10,11 @@ import ( func newMockOptions(t *testing.T) *poptions { opts := new(poptions) - err := opts.applyOptions("") + err := opts.applyOptions(new(GroupGraph)) ensure.Err(t, err, regexp.MustCompile("StorageBuilder not set$")) opts.builders.storage = nullStorageBuilder() - err = opts.applyOptions("") + err = opts.applyOptions(new(GroupGraph)) ensure.Nil(t, err) fmt.Printf("%+v\n", opts) diff --git a/processor.go b/processor.go index c0ad2d08..227b4caa 100644 --- a/processor.go +++ b/processor.go @@ -74,7 +74,7 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) } opts := new(poptions) - err := opts.applyOptions(string(gg.Group()), options...) + err := opts.applyOptions(gg, options...) 
if err != nil { return nil, fmt.Errorf(errApplyOptions, err) } @@ -286,14 +286,14 @@ func (g *Processor) Run(ctx context.Context) (rerr error) { }() // create kafka consumer - g.opts.log.Printf("Processor: creating consumer") + g.opts.log.Printf("Processor: creating consumer [%s]", g.graph.Group()) consumer, err := g.opts.builders.consumer(g.brokers, string(g.graph.Group()), g.opts.clientID) if err != nil { return fmt.Errorf(errBuildConsumer, err) } g.consumer = consumer defer func() { - g.opts.log.Printf("Processor: closing consumer") + g.opts.log.Printf("Processor: closing consumer [%s]", g.graph.Group()) if err = g.consumer.Close(); err != nil { _ = g.errors.Collect(fmt.Errorf("error closing consumer: %v", err)) } diff --git a/processor_integration_test.go b/processor_integration_test.go new file mode 100644 index 00000000..8035d17a --- /dev/null +++ b/processor_integration_test.go @@ -0,0 +1,388 @@ +package goka_test + +import ( + "context" + "errors" + "fmt" + "log" + "strings" + "testing" + "time" + + "github.com/facebookgo/ensure" + "github.com/lovoo/goka" + "github.com/lovoo/goka/codec" + "github.com/lovoo/goka/kafka" + "github.com/lovoo/goka/mock" + "github.com/lovoo/goka/storage" + "github.com/lovoo/goka/tester" +) + +func doTimed(t *testing.T, do func()) error { + ch := make(chan bool) + go func() { + do() + close(ch) + }() + + select { + case <-time.After(2 * time.Second): + t.Fail() + return errors.New("function took too long to complete") + case <-ch: + } + + return nil +} + +func TestProcessor_StatelessContext(t *testing.T) { + ctrl := mock.NewMockController(t) + defer ctrl.Finish() + var ( + tester = tester.New(t) + ) + + callPersist := func(ctx goka.Context, message interface{}) { + log.Println("processing") + // call a random setvalue, this is expected to fail + ctx.SetValue("value") + t.Errorf("SetValue should panic. 
We should not have come to that point.") + } + + proc, err := goka.NewProcessor(nil, + goka.DefineGroup( + "stateless-ctx", + goka.Input("input-topic", new(codec.Bytes), callPersist), + ), + goka.WithTester(tester), + ) + ensure.Nil(t, err) + done := make(chan bool) + go func() { + err = proc.Run(context.Background()) + ensure.NotNil(t, err) + close(done) + }() + err = doTimed(t, func() { + // consume a random key/message, the content doesn't matter as this should fail + tester.ConsumeString("input-topic", "key", "msg") + <-done + }) + ensure.Nil(t, err) +} + +func TestProcessor_ProducerError(t *testing.T) { + + t.Run("SetValue", func(t *testing.T) { + tester := tester.New(t) + tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { + return kafka.NewPromise().Finish(errors.New("producer error")) + }) + + consume := func(ctx goka.Context, msg interface{}) { + ctx.SetValue(msg) + } + + proc, err := goka.NewProcessor([]string{"broker"}, + goka.DefineGroup("test", + goka.Input("topic", new(codec.String), consume), + goka.Persist(new(codec.String)), + ), + goka.WithTester(tester), + ) + + ensure.Nil(t, err) + var ( + procErrors error + done = make(chan struct{}) + ctx, cancel = context.WithCancel(context.Background()) + ) + go func() { + procErrors = proc.Run(ctx) + close(done) + }() + + tester.ConsumeString("topic", "key", "world") + cancel() + <-done + ensure.NotNil(t, procErrors) + }) + + t.Run("Emit", func(t *testing.T) { + tester := tester.New(t) + tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { + return kafka.NewPromise().Finish(errors.New("producer error")) + }) + + consume := func(ctx goka.Context, msg interface{}) { + ctx.Emit("blubbb", "key", []byte("some message is emitted")) + } + + proc, err := goka.NewProcessor([]string{"broker"}, + goka.DefineGroup("test", + goka.Input("topic", new(codec.String), consume), + goka.Persist(new(codec.String)), + ), + goka.WithTester(tester), + ) + + ensure.Nil(t, err) + var ( + processorErrors error + done = make(chan struct{}) + ctx, cancel = context.WithCancel(context.Background()) + ) + go func() { + processorErrors = proc.Run(ctx) + close(done) + }() + + tester.ConsumeString("topic", "key", "world") + + cancel() + <-done + ensure.True(t, processorErrors != nil) + }) + + t.Run("Value-stateless", func(t *testing.T) { + tester := tester.New(t) + tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { + return kafka.NewPromise().Finish(errors.New("producer error")) + }) + + consume := func(ctx goka.Context, msg interface{}) { + func() { + defer goka.PanicStringContains(t, "stateless") + _ = ctx.Value() + }() + } + + proc, err := goka.NewProcessor([]string{"broker"}, + goka.DefineGroup("test", + goka.Input("topic", new(codec.String), consume), + ), + goka.WithTester(tester), + ) + + ensure.Nil(t, err) + var ( + processorErrors error + done = make(chan struct{}) + ctx, cancel = context.WithCancel(context.Background()) + ) + go func() { + processorErrors = proc.Run(ctx) + close(done) + }() + + tester.ConsumeString("topic", "key", "world") + + // stopping the processor. 
It should actually not produce results + cancel() + <-done + ensure.Nil(t, processorErrors) + }) + +} + +func TestProcessor_consumeFail(t *testing.T) { + tester := tester.New(t) + + consume := func(ctx goka.Context, msg interface{}) { + ctx.Fail(errors.New("consume-failed")) + } + + proc, err := goka.NewProcessor([]string{"broker"}, + goka.DefineGroup("test", + goka.Input("topic", new(codec.String), consume), + ), + goka.WithTester(tester), + ) + + ensure.Nil(t, err) + var ( + processorErrors error + done = make(chan struct{}) + ctx, cancel = context.WithCancel(context.Background()) + ) + go func() { + processorErrors = proc.Run(ctx) + close(done) + }() + + tester.ConsumeString("topic", "key", "world") + + cancel() + <-done + ensure.True(t, strings.Contains(processorErrors.Error(), "consume-failed")) +} + +func TestProcessor_consumePanic(t *testing.T) { + tester := tester.New(t) + + consume := func(ctx goka.Context, msg interface{}) { + panic("panicking") + } + + proc, err := goka.NewProcessor([]string{"broker"}, + goka.DefineGroup("test", + goka.Input("topic", new(codec.String), consume), + ), + goka.WithTester(tester), + ) + + ensure.Nil(t, err) + var ( + processorErrors error + done = make(chan struct{}) + ctx, cancel = context.WithCancel(context.Background()) + ) + go func() { + processorErrors = proc.Run(ctx) + close(done) + }() + + tester.ConsumeString("topic", "key", "world") + + cancel() + <-done + ensure.NotNil(t, processorErrors) + ensure.True(t, strings.Contains(processorErrors.Error(), "panicking")) +} + +type nilValue struct{} +type nilCodec struct{} + +func (nc *nilCodec) Decode(data []byte) (interface{}, error) { + if data == nil { + return new(nilValue), nil + } + return data, nil +} +func (nc *nilCodec) Encode(val interface{}) ([]byte, error) { + return nil, nil +} + +func TestProcessor_consumeNil(t *testing.T) { + + tests := []struct { + name string + cb goka.ProcessCallback + handling goka.NilHandling + codec goka.Codec + }{ + { + "ignore", + func(ctx goka.Context, msg interface{}) { + t.Error("should never call consume") + t.Fail() + }, + goka.NilIgnore, + new(codec.String), + }, + { + "process", + func(ctx goka.Context, msg interface{}) { + if msg != nil { + t.Errorf("message should be nil:%v", msg) + t.Fail() + } + }, + goka.NilProcess, + new(codec.String), + }, + { + "decode", + func(ctx goka.Context, msg interface{}) { + if _, ok := msg.(*nilValue); !ok { + t.Errorf("message should be a decoded nil value: %T", msg) + t.Fail() + } + }, + goka.NilDecode, + new(nilCodec), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tester := tester.New(t) + proc, err := goka.NewProcessor([]string{"broker"}, + goka.DefineGroup("test", + goka.Input("topic", tc.codec, tc.cb), + ), + goka.WithTester(tester), + goka.WithNilHandling(tc.handling), + ) + + ensure.Nil(t, err) + var ( + processorErrors error + done = make(chan struct{}) + ctx, cancel = context.WithCancel(context.Background()) + ) + go func() { + processorErrors = proc.Run(ctx) + close(done) + }() + + tester.Consume("topic", "key", nil) + + cancel() + <-done + ensure.Nil(t, processorErrors) + }) + } +} + +// tests shutting down the processor during recovery +func TestProcessor_failOnRecover(t *testing.T) { + var ( + recovered int + processorErrors error + _ = processorErrors // make linter happy + done = make(chan struct{}) + msgToRecover = 100 + ) + + tester := tester.New(t) + + consume := func(ctx goka.Context, msg interface{}) { + log.Println("consuming message..", ctx.Key()) + } + + proc, 
err := goka.NewProcessor([]string{"broker"}, + goka.DefineGroup("test", + goka.Input("topic", new(codec.String), consume), + goka.Persist(new(codec.Bytes)), + ), + goka.WithTester(tester), + goka.WithUpdateCallback(func(s storage.Storage, partition int32, key string, value []byte) error { + log.Printf("recovered state: %s: %s", key, string(value)) + recovered++ + time.Sleep(1 * time.Millisecond) + return nil + }), + ) + ensure.Nil(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + processorErrors = proc.Run(ctx) + close(done) + }() + + for i := 0; i < msgToRecover; i++ { + tester.Consume("test-table", "key", []byte(fmt.Sprintf("state-%d", i))) + } + + // let's wait until half of them are roughly recovered + time.Sleep(50 * time.Millisecond) + // stop the processor and wait for it + cancel() + <-done + // make sure the recovery was aborted, so we have recovered something but not all + + // we can't test that anymore since there is no recovery-functionality in the tester implemented + //ensure.True(t, recovered > 0 && recovered < msgToRecover) +} diff --git a/processor_test.go b/processor_test.go index 8e549145..25640425 100644 --- a/processor_test.go +++ b/processor_test.go @@ -8,7 +8,6 @@ import ( "log" "os" "os/signal" - "strings" "sync" "syscall" "testing" @@ -20,7 +19,6 @@ import ( "github.com/lovoo/goka/mock" "github.com/lovoo/goka/multierr" "github.com/lovoo/goka/storage" - "github.com/lovoo/goka/tester" "github.com/facebookgo/ensure" "github.com/golang/mock/gomock" @@ -463,7 +461,7 @@ func TestNewProcessor(t *testing.T) { _, err = NewProcessor(nil, DefineGroup(group, Input(topic, rawCodec, cb)), WithTopicManagerBuilder(createTopicManagerBuilder(tm)), - func(o *poptions) { + func(o *poptions, gg *GroupGraph) { o.builders.storage = nil }, ) @@ -1426,361 +1424,6 @@ func TestProcessor_HasGetStateless(t *testing.T) { ensure.True(t, value == nil) } -func TestProcessor_StatelessContext(t *testing.T) { - ctrl := mock.NewMockController(t) - defer ctrl.Finish() - var ( - tester = tester.New(t).SetCodec(new(codec.Bytes)) - //count int64 - //wait = make(chan bool) - ) - - callPersist := func(ctx Context, message interface{}) { - log.Println("processing") - // call a random setvalue, this is expected to fail - ctx.SetValue("value") - t.Errorf("SetValue should panic. 
We should not have come to that point.") - } - - proc, err := NewProcessor(nil, - DefineGroup( - "stateless-ctx", - Input("input-topic", new(codec.Bytes), callPersist), - ), - WithTester(tester), - ) - ensure.Nil(t, err) - done := make(chan bool) - go func() { - err = proc.Run(context.Background()) - ensure.NotNil(t, err) - close(done) - }() - err = doTimed(t, func() { - // consume a random key/message, the content doesn't matter as this should fail - tester.ConsumeString("input-topic", "key", "msg") - <-done - }) - ensure.Nil(t, err) -} - -func TestProcessor_ProducerError(t *testing.T) { - - t.Run("SetValue", func(t *testing.T) { - tester := tester.New(t) - tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { - return kafka.NewPromise().Finish(errors.New("producer error")) - }) - - consume := func(ctx Context, msg interface{}) { - ctx.SetValue(msg) - } - - proc, err := NewProcessor([]string{"broker"}, - DefineGroup("test", - Input("topic", new(codec.String), consume), - Persist(new(codec.String)), - ), - WithTester(tester), - ) - - ensure.Nil(t, err) - var ( - processorErrors error - done = make(chan struct{}) - ctx, cancel = context.WithCancel(context.Background()) - ) - go func() { - processorErrors = proc.Run(ctx) - close(done) - }() - - tester.ConsumeString("topic", "key", "world") - cancel() - <-done - ensure.True(t, processorErrors != nil) - }) - - t.Run("Emit", func(t *testing.T) { - tester := tester.New(t) - tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { - return kafka.NewPromise().Finish(errors.New("producer error")) - }) - - consume := func(ctx Context, msg interface{}) { - ctx.Emit("blubbb", "key", []byte("some message is emitted")) - } - - proc, err := NewProcessor([]string{"broker"}, - DefineGroup("test", - Input("topic", new(codec.String), consume), - Persist(new(codec.String)), - ), - WithTester(tester), - ) - - ensure.Nil(t, err) - var ( - processorErrors error - done = make(chan struct{}) - ctx, cancel = context.WithCancel(context.Background()) - ) - go func() { - processorErrors = proc.Run(ctx) - close(done) - }() - - tester.ConsumeString("topic", "key", "world") - - cancel() - <-done - ensure.True(t, processorErrors != nil) - }) - - t.Run("Value-stateless", func(t *testing.T) { - tester := tester.New(t) - tester.ReplaceEmitHandler(func(topic, key string, value []byte) *kafka.Promise { - return kafka.NewPromise().Finish(errors.New("producer error")) - }) - - consume := func(ctx Context, msg interface{}) { - func() { - defer PanicStringContains(t, "stateless") - _ = ctx.Value() - }() - } - - proc, err := NewProcessor([]string{"broker"}, - DefineGroup("test", - Input("topic", new(codec.String), consume), - ), - WithTester(tester), - ) - - ensure.Nil(t, err) - var ( - processorErrors error - done = make(chan struct{}) - ctx, cancel = context.WithCancel(context.Background()) - ) - go func() { - processorErrors = proc.Run(ctx) - close(done) - }() - - tester.ConsumeString("topic", "key", "world") - - // stopping the processor. 
It should actually not produce results - cancel() - <-done - ensure.Nil(t, processorErrors) - }) - -} - -func TestProcessor_consumeFail(t *testing.T) { - tester := tester.New(t) - - consume := func(ctx Context, msg interface{}) { - ctx.Fail(errors.New("consume-failed")) - } - - proc, err := NewProcessor([]string{"broker"}, - DefineGroup("test", - Input("topic", new(codec.String), consume), - ), - WithTester(tester), - ) - - ensure.Nil(t, err) - var ( - processorErrors error - done = make(chan struct{}) - ctx, cancel = context.WithCancel(context.Background()) - ) - go func() { - processorErrors = proc.Run(ctx) - close(done) - }() - - tester.ConsumeString("topic", "key", "world") - - cancel() - <-done - ensure.True(t, strings.Contains(processorErrors.Error(), "consume-failed")) -} - -func TestProcessor_consumePanic(t *testing.T) { - tester := tester.New(t) - - consume := func(ctx Context, msg interface{}) { - panic("panicking") - } - - proc, err := NewProcessor([]string{"broker"}, - DefineGroup("test", - Input("topic", new(codec.String), consume), - ), - WithTester(tester), - ) - - ensure.Nil(t, err) - var ( - processorErrors error - done = make(chan struct{}) - ctx, cancel = context.WithCancel(context.Background()) - ) - go func() { - processorErrors = proc.Run(ctx) - close(done) - }() - - tester.ConsumeString("topic", "key", "world") - - cancel() - <-done - ensure.NotNil(t, processorErrors) - ensure.True(t, strings.Contains(processorErrors.Error(), "panicking")) -} - -type nilValue struct{} -type nilCodec struct{} - -func (nc *nilCodec) Decode(data []byte) (interface{}, error) { - if data == nil { - return new(nilValue), nil - } - return data, nil -} -func (nc *nilCodec) Encode(val interface{}) ([]byte, error) { - return nil, nil -} - -func TestProcessor_consumeNil(t *testing.T) { - - tests := []struct { - name string - cb ProcessCallback - handling NilHandling - codec Codec - }{ - { - "ignore", - func(ctx Context, msg interface{}) { - t.Error("should never call consume") - t.Fail() - }, - NilIgnore, - new(codec.String), - }, - { - "process", - func(ctx Context, msg interface{}) { - if msg != nil { - t.Errorf("message should be nil:%v", msg) - t.Fail() - } - }, - NilProcess, - new(codec.String), - }, - { - "decode", - func(ctx Context, msg interface{}) { - if _, ok := msg.(*nilValue); !ok { - t.Errorf("message should be a decoded nil value: %T", msg) - t.Fail() - } - }, - NilDecode, - new(nilCodec), - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - tester := tester.New(t) - proc, err := NewProcessor([]string{"broker"}, - DefineGroup("test", - Input("topic", tc.codec, tc.cb), - ), - WithTester(tester), - WithNilHandling(tc.handling), - ) - - ensure.Nil(t, err) - var ( - processorErrors error - done = make(chan struct{}) - ctx, cancel = context.WithCancel(context.Background()) - ) - go func() { - processorErrors = proc.Run(ctx) - close(done) - }() - - tester.Consume("topic", "key", nil) - - cancel() - <-done - ensure.Nil(t, processorErrors) - }) - } -} - -func TestProcessor_failOnRecover(t *testing.T) { - var ( - recovered int - processorErrors error - _ = processorErrors // make linter happy - done = make(chan struct{}) - msgToRecover = 100 - ) - - tester := tester.New(t) - - consume := func(ctx Context, msg interface{}) { - log.Println("consuming message..", ctx.Key()) - } - - tester.SetGroupTableCreator(func() (string, []byte) { - time.Sleep(10 * time.Millisecond) - recovered++ - if recovered > msgToRecover { - return "", nil - } - return "key", 
[]byte(fmt.Sprintf("state-%d", recovered)) - }) - - proc, err := NewProcessor([]string{"broker"}, - DefineGroup("test", - Input("topic", new(codec.String), consume), - Persist(rawCodec), - ), - WithTester(tester), - WithUpdateCallback(func(s storage.Storage, partition int32, key string, value []byte) error { - log.Printf("recovered state: %s: %s", key, string(value)) - return nil - }), - ) - - ensure.Nil(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - go func() { - processorErrors = proc.Run(ctx) - close(done) - }() - - time.Sleep(100 * time.Millisecond) - log.Println("stopping") - cancel() - <-done - log.Println("stopped") - // make sure the recovery was aborted - ensure.True(t, recovered < msgToRecover) -} - // Example shows how to use a callback. For each partition of the topics, a new // goroutine will be created. Topics should be co-partitioned (they should have // the same number of partitions and be partitioned by the same key). diff --git a/tester/doc.go b/tester/doc.go new file mode 100644 index 00000000..028585aa --- /dev/null +++ b/tester/doc.go @@ -0,0 +1,47 @@ +/* + +This package provides a kafka mock that allows integration testing of goka processors. + +Usage + +Simply append a tester option when creating the processor for testing. +Usually it makes sense to move the processor creation to a function that accepts +extra options. That way the test can use exactly the same processor setup. + + // creates the processor defining its group graph + func createProcessor(brokers []string, options ...goka.ProcessorOption) *goka.Processor{ + return goka.NewProcessor(brokers, goka.DefineGroup("group", + // some group definitions + options..., + ), + ) + } + +In the main function we would run the processor like this: + func main(){ + proc := createProcessor([]string{"broker1:9092"}) + proc.Run(ctx.Background()) + } +And in the unit test something like: + func TestProcessor(t *testing.T){ + // create tester + tester := tester.New(t) + // create the processor + proc := createProcessor(nil, goka.WithTester(tester)) + + // .. 
do extra initialization if necessary + + go proc.Run(ctx.Background()) + + // execute the actual test + tester.Consume("input-topic", "key", "value") + + value := tester.TableValue("group-table", "key") + if value != expected{ + t.Fatalf("got unexpected table value") + } + } + +See https://github.com/lovoo/goka/tree/master/examples/testing for a full example +*/ +package tester diff --git a/tester/messagetracker.go b/tester/messagetracker.go new file mode 100644 index 00000000..f8883d91 --- /dev/null +++ b/tester/messagetracker.go @@ -0,0 +1,95 @@ +package tester + +// MessageTracker tracks message offsets for each topic for convenient +// 'expect message x to be in topic y' in unit tests +type MessageTracker struct { + t T + nextMessageIndex map[string]int + tester *Tester +} + +func newMessageTracker(tester *Tester, t T) *MessageTracker { + return &MessageTracker{ + t: t, + nextMessageIndex: make(map[string]int), + tester: tester, + } +} + +// NextMessage returns the next message from the topic since the last time this +// function was called (or MoveToEnd) +func (mt *MessageTracker) NextMessage(topic string) (string, interface{}, bool) { + + key, msgRaw, hasNext := mt.NextMessageRaw(topic) + + if !hasNext { + return key, msgRaw, hasNext + } + + decoded, err := mt.tester.codecForTopic(topic).Decode(msgRaw) + if err != nil { + mt.t.Fatalf("Error decoding message: %v", err) + } + return key, decoded, true +} + +// NextMessageRaw returns the next message in passed topic +func (mt *MessageTracker) NextMessageRaw(topic string) (string, []byte, bool) { + q := mt.tester.queueForTopic(topic) + if mt.nextMessageIndex[topic] >= q.size() { + return "", nil, false + } + msg := q.message(mt.nextMessageIndex[topic]) + + mt.nextMessageIndex[topic]++ + return msg.key, msg.value, true +} + +// ExpectEmpty ensures the topic does not contain more messages +func (mt *MessageTracker) ExpectEmpty(topic string) { + if mt.nextMessageIndex[topic] == mt.tester.queueForTopic(topic).size() { + return + } + + codec := mt.tester.codecForTopic(topic) + var remaining []interface{} + for _, msg := range mt.tester.queueForTopic(topic).messagesFrom(mt.nextMessageIndex[topic]) { + decoded, _ := codec.Decode(msg.value) + remaining = append(remaining, decoded) + } + mt.t.Fatalf("Expected topic %s to be empty, but was not (%#v)", topic, remaining) +} + +// MoveToEnd marks the topic to be read regardless of its content +func (mt *MessageTracker) MoveToEnd(topic string) { + mt.nextMessageIndex[topic] = int(mt.tester.queueForTopic(topic).hwm) +} + +// MoveAllToEnd marks all topics to be read +func (mt *MessageTracker) MoveAllToEnd() { + for topic := range mt.tester.topicQueues { + mt.nextMessageIndex[topic] = int(mt.tester.queueForTopic(topic).hwm) + } +} + +// ExpectEmit ensures a message exists in passed topic and key. The message may be +// inspected/unmarshalled by a passed expecter function. 
+// DEPRECATED: This function is only to get some compatibility and should be removed in future +func (mt *MessageTracker) ExpectEmit(topic string, key string, expecter func(value []byte)) { + + for { + nextKey, value, hasNext := mt.NextMessageRaw(topic) + if !hasNext { + break + } + + if key != nextKey { + continue + } + + // found one, stop here + expecter(value) + return + } + mt.t.Errorf("Did not find expected message in %s for key %s", topic, key) +} diff --git a/tester/queue.go b/tester/queue.go new file mode 100644 index 00000000..4dc6b07f --- /dev/null +++ b/tester/queue.go @@ -0,0 +1,124 @@ +package tester + +import ( + "fmt" + "sync" +) + +type message struct { + offset int64 + key string + value []byte +} + +type queue struct { + sync.Mutex + topic string + messages []*message + hwm int64 + waitConsumerInit sync.WaitGroup + simpleConsumers map[*queueConsumer]int64 + groupConsumers map[*queueConsumer]int64 +} + +func newQueue(topic string) *queue { + + return &queue{ + topic: topic, + simpleConsumers: make(map[*queueConsumer]int64), + groupConsumers: make(map[*queueConsumer]int64), + } +} + +func (q *queue) size() int { + return len(q.messages) +} + +func (q *queue) message(offset int) *message { + return q.messages[offset] +} + +func (q *queue) messagesFrom(from int) []*message { + return q.messages[from:] +} + +func (q *queue) expectGroupConsumer() { + q.Lock() + defer q.Unlock() + q.groupConsumers[newQueueConsumer(q.topic, q)] = 0 +} + +func (q *queue) expectSimpleConsumer() { + q.Lock() + defer q.Unlock() + q.simpleConsumers[newQueueConsumer(q.topic, q)] = 0 +} + +func (q *queue) bindConsumer(cons *consumer, groupConsumer bool) *queueConsumer { + q.Lock() + defer q.Unlock() + + consumers := q.simpleConsumers + if groupConsumer { + consumers = q.groupConsumers + } + for qCons := range consumers { + if !qCons.isBound() { + qCons.bindToConsumer(cons) + return qCons + } + } + panic(fmt.Errorf("did not find an unbound consumer for %s. 
The group graph was not parsed correctly", q.topic)) +} + +func (q *queue) messagesFromOffset(offset int64) []*message { + q.Lock() + defer q.Unlock() + return q.messages[offset:] +} + +// wait until all consumers are ready to consume (only for startup) +func (q *queue) waitConsumersInit() { + logger.Printf("Consumers in Queue %s", q.topic) + for cons := range q.groupConsumers { + logger.Printf("waiting for group consumer %s to be running", cons.queue.topic) + <-cons.state.WaitForState(running) + logger.Printf(" --> %s is running", cons.queue.topic) + } + + for cons := range q.simpleConsumers { + logger.Printf("waiting for simple consumer %s to be ready", cons.queue.topic) + select { + case <-cons.state.WaitForState(running): + case <-cons.state.WaitForState(stopped): + } + logger.Printf(" --> %s is ready", cons.queue.topic) + } +} + +func (q *queue) waitForConsumers() int { + // wait until all consumers for the queue have processed all the messages + var numMessagesConsumed int + for sub := range q.simpleConsumers { + logger.Printf("waiting for simple consumer %s to finish up", q.topic) + numMessagesConsumed += sub.catchupAndSync() + logger.Printf(">> done waiting for simple consumer %s to finish up", q.topic) + } + for sub := range q.groupConsumers { + logger.Printf("waiting for simple consumer %s to finish up", q.topic) + numMessagesConsumed += sub.catchupAndSync() + logger.Printf(">> done waiting for simple consumer %s to finish up", q.topic) + } + return numMessagesConsumed +} + +func (q *queue) push(key string, value []byte) { + q.Lock() + defer q.Unlock() + q.messages = append(q.messages, &message{ + offset: q.hwm, + key: key, + value: value, + }) + q.hwm++ +} diff --git a/tester/queueconsumer.go b/tester/queueconsumer.go new file mode 100644 index 00000000..47a45cae --- /dev/null +++ b/tester/queueconsumer.go @@ -0,0 +1,334 @@ +package tester + +import ( + "fmt" + "sync" + "time" + + "github.com/lovoo/goka/kafka" +) + +type consumer struct { + tester *Tester + events chan kafka.Event + subscribedTopics map[string]*queueConsumer + simpleConsumers map[string]*queueConsumer + closeOnce sync.Once +} + +const ( + unbound State = iota + bound + running + stopping + stopped + killed +) + +type queueConsumer struct { + queue *queue + nextOffset int64 + waitEventBuffer sync.WaitGroup + state *Signal + eventBuffer chan kafka.Event + events chan kafka.Event + consumer *consumer +} + +func newQueueConsumer(topic string, queue *queue) *queueConsumer { + qc := &queueConsumer{ + queue: queue, + eventBuffer: make(chan kafka.Event, 100000), + state: NewSignal(unbound, bound, stopped, stopping, running, killed).SetState(unbound), + } + return qc +} + +func (qc *queueConsumer) bindToConsumer(cons *consumer) { + logger.Printf("binding consumer to topic %s", qc.queue.topic) + if !qc.state.IsState(unbound) { + panic(fmt.Errorf("error binding %s to consumer. 
Already bound", qc.queue.topic)) + } + qc.state.SetState(bound) + qc.consumer = cons + qc.events = cons.events +} + +func (qc *queueConsumer) isBound() bool { + return !qc.state.IsState(unbound) +} + +func (qc *queueConsumer) isRunning() bool { + return qc.state.IsState(running) +} + +func (qc *queueConsumer) setRunning() { + qc.state.SetState(running) +} + +func (qc *queueConsumer) stop() { + logger.Printf("closing the queueConsumer for topic %s", qc.queue.topic) + if !qc.state.IsState(running) { + panic(fmt.Sprintf("trying to stop consumer %s which is not running (state=%d)", qc.queue.topic, qc.state.State())) + } + qc.state.SetState(stopping) + logger.Printf("[consumer %s]waiting for stopped", qc.queue.topic) + <-qc.state.WaitForState(stopped) + logger.Printf("[consumer %s] stopped", qc.queue.topic) +} + +func (qc *queueConsumer) kill() { + qc.stop() + qc.state.SetState(killed) +} + +func (qc *queueConsumer) startLoop(setRunning bool) { + logger.Printf("starting queue consumer %s", qc.queue.topic) + // not bound or already running + if qc.state.IsState(unbound) || qc.state.IsState(running) || qc.state.IsState(stopping) { + panic(fmt.Errorf("the queue consumer %s is in state %v. Cannot start", qc.queue.topic, qc.state.State())) + } + if setRunning { + qc.state.SetState(running) + } + go qc.consumeBuffer() +} + +func (qc *queueConsumer) consumeBuffer() { + defer func() { + err := recover() + if err != nil { + logger.Printf("Error consuming the buffer: %v", err) + } + qc.state.SetState(stopped) + }() + + for { + select { + case event, ok := <-qc.eventBuffer: + if !ok { + return + } + logger.Printf("[consumer %s]: From Buffer %#v", qc.queue.topic, event) + + select { + case qc.events <- event: + qc.waitEventBuffer.Done() + + logger.Printf("[consumer %s]: Buffer->Events %#v", qc.queue.topic, event) + case <-qc.state.WaitForState(stopping): + logger.Printf("[consumer %s] received stopping signal", qc.queue.topic) + + logger.Printf("[consumer %s] DROPPING MESSAGE (%#v) because the consumer is closed", qc.queue.topic, event) + qc.waitEventBuffer.Done() + return + } + + case <-qc.state.WaitForState(stopping): + logger.Printf("[consumer %s] received stopping signal", qc.queue.topic) + return + } + } +} + +func (qc *queueConsumer) catchupAndSync() int { + logger.Printf("[consumer %s] catching up", qc.queue.topic) + numMessages := qc.catchupQueue(-1) + logger.Printf("[consumer %s] catching up DONE (%d messages)", qc.queue.topic, numMessages) + + eventsProcessed := make(chan struct{}) + go func() { + logger.Printf("[consumer %s] wait for all events to be processed", qc.queue.topic) + qc.waitEventBuffer.Wait() + logger.Printf("[consumer %s] done processing events", qc.queue.topic) + close(eventsProcessed) + }() + + select { + case <-eventsProcessed: + case <-qc.state.WaitForState(killed): + // The consumer was killed, so we assume the test is done already. 
+ return 0 + case <-qc.state.WaitForState(stopped): + } + return numMessages +} + +func (qc *queueConsumer) startGroupConsumer() { + logger.Printf("[consumer %s] starting group consumer", qc.queue.topic) + qc.catchupQueue(-1) +} + +func (qc *queueConsumer) addToBuffer(event kafka.Event) { + qc.waitEventBuffer.Add(1) + + qc.eventBuffer <- event +} + +func (qc *queueConsumer) startSimpleConsumer(offset int64, firstStart bool) { + logger.Printf("[consumer %s] starting simple consumer (offset=%d)", qc.queue.topic, offset) + if firstStart { + qc.addToBuffer(&kafka.BOF{ + Hwm: qc.queue.hwm, + Offset: 0, + Partition: 0, + Topic: qc.queue.topic, + }) + qc.catchupQueue(offset) + qc.addToBuffer(&kafka.EOF{ + Hwm: qc.queue.hwm, + Partition: 0, + Topic: qc.queue.topic, + }) + } + qc.startLoop(true) +} + +func (qc *queueConsumer) catchupQueue(fromOffset int64) int { + // we'll always get from the beginning when the consumer + // requests -1 or -2 (for end or beginning resp) + if fromOffset < 0 { + fromOffset = qc.nextOffset + } + + // count how many messages we had to catch up on + var forwardedMessages int + for _, msg := range qc.queue.messagesFromOffset(fromOffset) { + qc.addToBuffer(&kafka.Message{ + Key: string(msg.key), + Offset: msg.offset, + Partition: 0, + Timestamp: time.Unix(msg.offset, 0), + Topic: qc.queue.topic, + Value: msg.value, + }) + forwardedMessages++ + // mark the next offset to consume in case we stop here + qc.nextOffset = msg.offset + 1 + } + + qc.addToBuffer(&kafka.EOF{ + Hwm: qc.queue.hwm, + Partition: 0, + Topic: qc.queue.topic, + }) + + // push some more NOPs + for i := 0; i < 2; i++ { + qc.addToBuffer(&kafka.NOP{ + Partition: 0, + Topic: qc.queue.topic, + }) + } + return forwardedMessages +} + +func (qc *queueConsumer) rebalance() { + qc.addToBuffer(&kafka.Assignment{ + 0: -1, + }) +} + +func newConsumer(tester *Tester) *consumer { + return &consumer{ + tester: tester, + events: make(chan kafka.Event, 0), + simpleConsumers: make(map[string]*queueConsumer), + subscribedTopics: make(map[string]*queueConsumer), + } +} + +// Events returns the event channel of the consumer mock +func (tc *consumer) Events() <-chan kafka.Event { + return tc.events +} + +// Subscribe marks the consumer to subscribe to passed topics. +// The consumerMock simply marks the topics as handled to make sure to +// pass emitted messages back to the processor. +func (tc *consumer) Subscribe(topics map[string]int64) error { + for topic := range topics { + if _, exists := tc.subscribedTopics[topic]; exists { + logger.Printf("consumer for %s already exists. This is strange", topic) + } + logger.Printf("Subscribe %s", topic) + tc.subscribedTopics[topic] = tc.tester.getOrCreateQueue(topic).bindConsumer(tc, true) + tc.subscribedTopics[topic].rebalance() + tc.subscribedTopics[topic].startLoop(false) + } + return nil +} + +// AddGroupPartition adds a partition for group consumption. +// No action required in the mock. +func (tc *consumer) AddGroupPartition(partition int32) { + for _, consumer := range tc.subscribedTopics { + logger.Printf("AddGroupPartition %s", consumer.queue.topic) + consumer.startGroupConsumer() + consumer.setRunning() + } +} + +// Commit commits an offest. +// No action required in the mock. +func (tc *consumer) Commit(topic string, partition int32, offset int64) error { + return nil +} + +// AddPartition marks the topic as a table topic. 
+// The mock has to know the group table topic to ignore emit calls (which would never be consumed) +func (tc *consumer) AddPartition(topic string, partition int32, initialOffset int64) error { + logger.Printf("AddPartition %s", topic) + var firstStart bool + if _, exists := tc.simpleConsumers[topic]; !exists { + firstStart = true + tc.simpleConsumers[topic] = tc.tester.getOrCreateQueue(topic).bindConsumer(tc, false) + } else { + logger.Printf("AddPartition %s: consumer already existed. Will reuse the one", topic) + } + if tc.simpleConsumers[topic].isRunning() { + panic(fmt.Errorf("simple consumer for %s already running. RemovePartition not called or race condition", topic)) + } + tc.simpleConsumers[topic].startSimpleConsumer(initialOffset, firstStart) + + return nil +} + +// RemovePartition removes a partition from a topic. +// No action required in the mock. +func (tc *consumer) RemovePartition(topic string, partition int32) error { + logger.Printf("consumer RemovePartition %s", topic) + if cons, exists := tc.simpleConsumers[topic]; exists { + cons.stop() + } else { + logger.Printf("consumer for topic %s did not exist. Cannot Remove partition", topic) + } + return nil +} + +// Close closes the consumer. +func (tc *consumer) Close() error { + tc.closeOnce.Do(func() { + logger.Printf("closing tester consumer. Will close all subscribed topics") + for _, cons := range tc.subscribedTopics { + if cons.isRunning() { + logger.Printf("closing queue consumer for %s", cons.queue.topic) + cons.kill() + } else { + logger.Printf("queue consumer for %s is not running", cons.queue.topic) + } + } + + for _, cons := range tc.simpleConsumers { + if cons.isRunning() { + logger.Printf("closing simple consumer for %s", cons.queue.topic) + cons.kill() + } else { + logger.Printf("queue consumer for %s is not running", cons.queue.topic) + } + } + + close(tc.events) + }) + return nil +} diff --git a/tester/signal.go b/tester/signal.go new file mode 100644 index 00000000..040c5a09 --- /dev/null +++ b/tester/signal.go @@ -0,0 +1,76 @@ +package tester + +import ( + "fmt" + "sync" +) + +// State types a state of the Signal +type State int + +// Signal allows synchronization on a state, waiting for that state and checking +// the current state +type Signal struct { + sync.Mutex + state State + waitChans map[State][]chan struct{} + allowedStates map[State]bool +} + +// NewSignal creates a new Signal based on the states +func NewSignal(states ...State) *Signal { + s := &Signal{ + waitChans: make(map[State][]chan struct{}), + allowedStates: make(map[State]bool), + } + for _, state := range states { + s.allowedStates[state] = true + } + + return s +} + +// SetState changes the state of the signal +// and notifies all goroutines waiting for the new state +func (s *Signal) SetState(state State) *Signal { + s.Lock() + defer s.Unlock() + if !s.allowedStates[state] { + panic(fmt.Errorf("trying to set illegal state %v", state)) + } + + // set the state and notify all channels waiting for it. + s.state = state + for _, waitChan := range s.waitChans[state] { + close(waitChan) + } + delete(s.waitChans, state) + + return s +} + +// IsState returns if the signal is in the requested state +func (s *Signal) IsState(state State) bool { + return s.state == state +} + +// State returns the current state +func (s *Signal) State() State { + return s.state +} + +// WaitForState returns a channel that closes when the signal reaches passed +// state. 
+func (s *Signal) WaitForState(state State) chan struct{} { + s.Lock() + defer s.Unlock() + cb := make(chan struct{}) + + if s.IsState(state) { + close(cb) + } else { + s.waitChans[state] = append(s.waitChans[state], cb) + } + + return cb +} diff --git a/tester/tester.go b/tester/tester.go index f150f483..69f3e440 100644 --- a/tester/tester.go +++ b/tester/tester.go @@ -1,15 +1,19 @@ package tester import ( + "flag" "fmt" "hash" + "io/ioutil" "log" + "os" + "reflect" "sync" "github.com/facebookgo/ensure" "github.com/golang/protobuf/proto" - "github.com/lovoo/goka/codec" + "github.com/lovoo/goka" "github.com/lovoo/goka/kafka" "github.com/lovoo/goka/storage" ) @@ -20,35 +24,76 @@ type Codec interface { Decode(data []byte) (value interface{}, err error) } +var ( + debug = flag.Bool("tester-debug", false, "show debug prints of the tester.") + logger = log.New(ioutil.Discard, " ", 0) +) + // EmitHandler abstracts a function that allows to overwrite kafkamock's Emit function to // simulate producer errors type EmitHandler func(topic string, key string, value []byte) *kafka.Promise +type queuedMessage struct { + topic string + key string + value []byte +} + // Tester allows interacting with a test processor type Tester struct { t T - consumerMock *consumerMock - producerMock *producerMock - topicMgrMock *topicMgrMock - emitHandler EmitHandler - storage storage.Storage - codec Codec - offset int64 - tableOffset int64 - incomingEvents chan kafka.Event - consumerEvents chan kafka.Event - // Stores a map of all topics that are handled by the processor. - // Every time an emit is called, those messages for handled topics are relayed - // after the consume-function has finished. - // All other messages are stored in the emitted-slice for further inspection - handledTopics map[string]bool - groupTopic string - emitted []*kafka.Message - - groupTableCreator func() (string, []byte) - callQueue []func() - wg sync.WaitGroup + producerMock *producerMock + topicMgrMock *topicMgrMock + emitHandler EmitHandler + storages map[string][]storage.Storage + + codecs map[string]goka.Codec + topicQueues map[string]*queue + mQueues sync.RWMutex + + queuedMessages []*queuedMessage +} + +func (km *Tester) queueForTopic(topic string) *queue { + km.mQueues.RLock() + defer km.mQueues.RUnlock() + q, exists := km.topicQueues[topic] + if !exists { + panic(fmt.Errorf("No queue for topic %s", topic)) + } + return q +} + +// CreateMessageTracker creates a message tracker that starts tracking +// the messages from the end of the current queues +func (km *Tester) NewMessageTrackerFromEnd() *MessageTracker { + km.waitStartup() + + mt := newMessageTracker(km, km.t) + km.mQueues.RLock() + defer km.mQueues.RUnlock() + for topic := range km.topicQueues { + mt.MoveToEnd(topic) + } + return mt +} + +func (km *Tester) getOrCreateQueue(topic string) *queue { + km.mQueues.RLock() + _, exists := km.topicQueues[topic] + km.mQueues.RUnlock() + if !exists { + km.mQueues.Lock() + if _, exists = km.topicQueues[topic]; !exists { + km.topicQueues[topic] = newQueue(topic) + } + km.mQueues.Unlock() + } + + km.mQueues.RLock() + defer km.mQueues.RUnlock() + return km.topicQueues[topic] } // T abstracts the interface we assume from the test case. @@ -59,108 +104,108 @@ type T interface { Fatal(a ...interface{}) } -// New returns a new testprocessor mocking every external service -// It should be passed as goka.WithTester to goka.NewProcessor. It essentially -// replaces the storage/consumer/producer/topicmanager with a mock. 
-// For example, a normal call to NewProcessor like this -// goka.NewProcessor(brokers, group, subscriptions, -// option_a, -// option_b, -// option_c, -// ) -// would become in the unit test: -// tester := tester.New(t) -// NewProcessor(brokers, group, subscriptions, -// option_a, -// option_b, -// option_c, -// WithTester(tester), -// ) +// New returns a new Tester. +// It should be passed as goka.WithTester to goka.NewProcessor. func New(t T) *Tester { + + // activate the logger if debug is turned on + if *debug { + logger.SetOutput(os.Stdout) + } + tester := &Tester{ - storage: storage.NewMemory(), - t: t, - incomingEvents: make(chan kafka.Event), - consumerEvents: make(chan kafka.Event), - handledTopics: make(map[string]bool), - codec: new(codec.Bytes), + t: t, + codecs: make(map[string]goka.Codec), + topicQueues: make(map[string]*queue), + storages: make(map[string][]storage.Storage), } - tester.consumerMock = newConsumerMock(tester) tester.producerMock = newProducerMock(tester.handleEmit) tester.topicMgrMock = newTopicMgrMock(tester) - return tester } -// SetCodec sets the codec for the group table. -func (km *Tester) SetCodec(codec Codec) *Tester { - km.codec = codec - return km +func (km *Tester) registerCodec(topic string, codec goka.Codec) { + if existingCodec, exists := km.codecs[topic]; exists { + if reflect.TypeOf(codec) != reflect.TypeOf(existingCodec) { + panic(fmt.Errorf("There are different codecs for the same topic. This is messed up (%#v, %#v)", codec, existingCodec)) + } + } + km.codecs[topic] = codec +} + +func (km *Tester) codecForTopic(topic string) goka.Codec { + codec, exists := km.codecs[topic] + if !exists { + panic(fmt.Errorf("No codec for topic %s registered.", topic)) + } + return codec } -// SetGroupTableCreator sets a creator for the group table. -func (km *Tester) SetGroupTableCreator(creator func() (string, []byte)) { - km.groupTableCreator = creator +// RegisterGroupGraph is called by a processor when the tester is passed via +// `WithTester(..)`. 
+// This will set up the tester with the necessary consumer structure
+func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph) {
+	if gg.GroupTable() != nil {
+		km.getOrCreateQueue(gg.GroupTable().Topic()).expectSimpleConsumer()
+		km.registerCodec(gg.GroupTable().Topic(), gg.GroupTable().Codec())
+	}
+
+	for _, input := range gg.InputStreams() {
+		km.getOrCreateQueue(input.Topic()).expectGroupConsumer()
+		km.registerCodec(input.Topic(), input.Codec())
+	}
+
+	for _, output := range gg.OutputStreams() {
+		km.registerCodec(output.Topic(), output.Codec())
+	}
+	for _, join := range gg.JointTables() {
+		km.getOrCreateQueue(join.Topic()).expectSimpleConsumer()
+		km.registerCodec(join.Topic(), join.Codec())
+	}
+
+	if loop := gg.LoopStream(); loop != nil {
+		km.getOrCreateQueue(loop.Topic()).expectGroupConsumer()
+		km.registerCodec(loop.Topic(), loop.Codec())
+	}
+
+	for _, lookup := range gg.LookupTables() {
+		km.getOrCreateQueue(lookup.Topic()).expectSimpleConsumer()
+		km.registerCodec(lookup.Topic(), lookup.Codec())
+	}
+
 }
 
+// TopicManagerBuilder returns the topicmanager builder when this tester is used as an option
+// to a processor
 func (km *Tester) TopicManagerBuilder() kafka.TopicManagerBuilder {
 	return func(brokers []string) (kafka.TopicManager, error) {
 		return km.topicMgrMock, nil
 	}
 }
 
+// ConsumerBuilder returns the consumer builder when this tester is used as an option
+// to a processor
 func (km *Tester) ConsumerBuilder() kafka.ConsumerBuilder {
 	return func(b []string, group, clientID string) (kafka.Consumer, error) {
-		if km.groupTopic == "" {
-			km.groupTopic = fmt.Sprintf("%s-table", group)
-		}
-		return km.consumerMock, nil
+		return newConsumer(km), nil
 	}
 }
 
+// ProducerBuilder returns the producer builder when this tester is used as an option
+// to a processor
 func (km *Tester) ProducerBuilder() kafka.ProducerBuilder {
 	return func(b []string, cid string, hasher func() hash.Hash32) (kafka.Producer, error) {
 		return km.producerMock, nil
 	}
 }
 
+// StorageBuilder returns the storage builder when this tester is used as an option
+// to a processor
 func (km *Tester) StorageBuilder() storage.Builder {
 	return func(topic string, partition int32) (storage.Storage, error) {
-		return km.storage, nil
-	}
-}
-
-// initProtocol initiates the protocol with the client basically making the KafkaMock
-// usable.
-func (km *Tester) initProtocol() {
-	defer func() {
-		if r := recover(); r != nil {
-			log.Printf("tester: panic initProtocol: %+v", r)
-		}
-	}()
-	km.consumerEvents <- &kafka.Assignment{
-		0: -1,
-	}
-
-	for km.groupTableCreator != nil {
-		key, value := km.groupTableCreator()
-		if key == "" || value == nil {
-			break
-		}
-		km.consumerEvents <- &kafka.Message{
-			Topic:     km.groupTopic,
-			Partition: 0,
-			Offset:    km.tableOffset,
-			Key:       key,
-			Value:     value,
-		}
-	}
-
-	km.consumerEvents <- &kafka.EOF{Partition: 0}
-	km.consumerEvents <- &kafka.NOP{Partition: -1}
-
-	for ev := range km.incomingEvents {
-		km.consumerEvents <- ev
+		st := storage.NewMemory()
+		km.storages[topic] = append(km.storages[topic], st)
+		return st, nil
 	}
 }
 
@@ -170,135 +215,85 @@ func (km *Tester) ConsumeProto(topic string, key string, msg proto.Message) {
 	if err != nil && km.t != nil {
 		km.t.Errorf("Error marshaling message for consume: %v", err)
 	}
-	km.consumeData(topic, key, data)
-	km.makeCalls()
+	km.waitStartup()
+	km.pushMessage(topic, key, data)
+	km.waitForConsumers()
 }
 
 // ConsumeString simulates a message with a string payload.
func (km *Tester) ConsumeString(topic string, key string, msg string) { - km.consumeData(topic, key, []byte(msg)) - km.makeCalls() + km.waitStartup() + km.pushMessage(topic, key, []byte(msg)) + km.waitForConsumers() } -// Consume simulates a message with a byte slice payload. -func (km *Tester) Consume(topic string, key string, msg []byte) { - km.consumeData(topic, key, msg) - km.makeCalls() -} +func (km *Tester) waitForConsumers() { -// ConsumeData simulates a message with a byte slice payload. This is the same -// as Consume. -// ConsumeData is a helper function consuming marshalled data. This function is -// used by ConsumeProto by the test case as well as any emit calls of the -// processor being tested. -func (km *Tester) ConsumeData(topic string, key string, data []byte) { - km.consumeData(topic, key, data) - km.makeCalls() -} + logger.Printf("waiting for consumers") + for { + if len(km.queuedMessages) == 0 { + break + } + next := km.queuedMessages[0] + km.queuedMessages = km.queuedMessages[1:] -func (km *Tester) consumeData(topic string, key string, data []byte) { - defer func() { - if r := recover(); r != nil { - log.Printf("tester: panic ConsumeData: %+v\n", r) + km.getOrCreateQueue(next.topic).push(next.key, next.value) + + km.mQueues.RLock() + for { + var messagesConsumed int + for _, queue := range km.topicQueues { + messagesConsumed += queue.waitForConsumers() + } + if messagesConsumed == 0 { + break + } } - }() - km.offset++ - kafkaMsg := &kafka.Message{ - Topic: topic, - Partition: 0, - Offset: km.offset, - - Key: key, - Value: data, + km.mQueues.RUnlock() } - // send message to processing goroutine - km.incomingEvents <- kafkaMsg - // wait until partition processing goroutine processes message by requiring it to read - // the following NOP. - km.incomingEvents <- &kafka.NOP{Partition: 0} - km.incomingEvents <- &kafka.NOP{Partition: 0} - - // wait util processor goroutine is ready - km.incomingEvents <- &kafka.NOP{Partition: -1} - km.incomingEvents <- &kafka.NOP{Partition: -1} -} -func (km *Tester) consumeError(err error) { - km.incomingEvents <- &kafka.Error{Err: err} - // no need to send NOP (actuallly we can't, otherwise we might panic - // as the channels are already closed due to the error first). + logger.Printf("waiting for consumers done") } -// ValueForKey attempts to get a value from KafkaMock's storage. -func (km *Tester) ValueForKey(key string) interface{} { - item, err := km.storage.Get(key) - ensure.Nil(km.t, err) - if item == nil { - return nil +func (km *Tester) waitStartup() { + logger.Printf("Tester: Waiting for startup") + km.mQueues.RLock() + defer km.mQueues.RUnlock() + for _, queue := range km.topicQueues { + queue.waitConsumersInit() } - value, err := km.codec.Decode(item) - ensure.Nil(km.t, err) - return value -} - -// SetValue sets a value in the storage. -func (km *Tester) SetValue(key string, value interface{}) { - data, err := km.codec.Encode(value) - ensure.Nil(km.t, err) - err = km.storage.Set(key, data) - ensure.Nil(km.t, err) + logger.Printf("Tester: Waiting for startup done") } -// ReplaceEmitHandler replaces the emitter. -func (km *Tester) ReplaceEmitHandler(emitter EmitHandler) { - km.producerMock.emitter = emitter -} +// Consume a message using the topic's configured codec +func (km *Tester) Consume(topic string, key string, msg interface{}) { + km.waitStartup() -// ExpectEmit ensures a message exists in passed topic and key. The message may be -// inspected/unmarshalled by a passed expecter function. 
-func (km *Tester) ExpectEmit(topic string, key string, expecter func(value []byte)) { - for i := 0; i < len(km.emitted); i++ { - msg := km.emitted[i] - if msg.Topic != topic || msg.Key != key { - continue - } - if expecter != nil { - expecter(msg.Value) + // if the user wants to send a nil for some reason, + // just let her. Goka should handle it accordingly :) + value := reflect.ValueOf(msg) + if msg == nil || (value.Kind() == reflect.Ptr && value.IsNil()) { + km.pushMessage(topic, key, nil) + } else { + data, err := km.codecForTopic(topic).Encode(msg) + if err != nil { + panic(fmt.Errorf("Error encoding value %v: %v", msg, err)) } - // remove element from slice - // https://github.com/golang/go/wiki/SliceTricks - km.emitted = append(km.emitted[:i], km.emitted[i+1:]...) - return + km.pushMessage(topic, key, data) } - km.t.Errorf("Expected emit for key %s in topic %s was not present.", key, topic) + km.waitForConsumers() } -// ExpectAllEmitted calls passed expected-emit-handler function for all emitted values and clears the -// emitted values -func (km *Tester) ExpectAllEmitted(handler func(topic string, key string, value []byte)) { - for _, emitted := range km.emitted { - handler(emitted.Topic, emitted.Key, emitted.Value) - } - km.emitted = make([]*kafka.Message, 0) -} - -// Finish marks the kafkamock that there is no emit to be expected. -// Set @param fail to true, if kafkamock is supposed to fail the test case in case -// of remaining emits. -// Clears the list of emits either case. -// This should always be called at the end of a test case to make sure -// no emits of prior test cases are stuck in the list and mess with the test results. -func (km *Tester) Finish(fail bool) { - if len(km.emitted) > 0 { - if fail { - km.t.Errorf("The following emits are still in the list, although it's supposed to be empty:") - for _, emitted := range km.emitted { - km.t.Errorf(" topic: %s key: %s", emitted.Topic, emitted.Key) - } - } - } - km.emitted = make([]*kafka.Message, 0) +// ConsumeData pushes a marshalled byte slice to a topic and a key +func (km *Tester) ConsumeData(topic string, key string, data []byte) { + km.waitStartup() + km.pushMessage(topic, key, data) + km.waitForConsumers() +} + +func (km *Tester) pushMessage(topic string, key string, data []byte) { + km.queuedMessages = append(km.queuedMessages, &queuedMessage{topic: topic, key: key, value: data}) } // handleEmit handles an Emit-call on the producerMock. @@ -306,111 +301,68 @@ func (km *Tester) Finish(fail bool) { // to handled topics or putting the emitted messages in the emitted-messages-list func (km *Tester) handleEmit(topic string, key string, value []byte) *kafka.Promise { promise := kafka.NewPromise() - if topic == km.groupTopic { - return promise.Finish(nil) - } - if _, hasTopic := km.handledTopics[topic]; hasTopic { - km.newCall(func() { - km.consumeData(topic, key, value) - }) - } else { - km.offset++ - km.emitted = append(km.emitted, &kafka.Message{ - Topic: topic, - Key: key, - Value: value, - Offset: km.offset, - }) - } + km.pushMessage(topic, key, value) return promise.Finish(nil) } -// creates a new call being executed after the consume function has run. -func (km *Tester) newCall(call func()) { - km.wg.Add(1) - km.callQueue = append(km.callQueue, call) -} - -// executes all calls on the call queue. -// Executing calls may put new calls on the queue (if they emit something), -// so this function executes until no further calls are being made. 
-func (km *Tester) makeCalls() { - go func() { - for len(km.callQueue) > 0 { - call := km.callQueue[0] - call() - km.callQueue = km.callQueue[1:] - km.wg.Done() - } - }() - km.wg.Wait() -} +// TableValue attempts to get a value from any table that is used in the kafka mock. +func (km *Tester) TableValue(table goka.Table, key string) interface{} { + km.waitStartup() -// ClearValues resets everything that might be in the storage by deleting everything -// using the iterator. -func (km *Tester) ClearValues() { - it, _ := km.storage.Iterator() - for it.Next() { - km.storage.Delete(string(it.Key())) + topic := string(table) + sts := km.storages[topic] + if len(sts) == 0 { + panic(fmt.Errorf("topic %s does not exist", topic)) } -} -type consumerMock struct { - tester *Tester -} - -func newConsumerMock(tester *Tester) *consumerMock { - return &consumerMock{ - tester: tester, + item, err := sts[0].Get(key) + ensure.Nil(km.t, err) + if item == nil { + return nil } + value, err := km.codecForTopic(topic).Decode(item) + ensure.Nil(km.t, err) + return value } -// Events returns the event channel of the consumer mock -func (km *consumerMock) Events() <-chan kafka.Event { - return km.tester.consumerEvents -} - -// Subscribe marks the consumer to subscribe to passed topics. -// The consumerMock simply marks the topics as handled to make sure to -// pass emitted messages back to the processor. -func (km *consumerMock) Subscribe(topics map[string]int64) error { - for topic := range topics { - km.tester.handledTopics[topic] = true - } - go km.tester.initProtocol() - return nil -} +// SetTableValue sets a value in a processor's or view's table direcly via storage +func (km *Tester) SetTableValue(table goka.Table, key string, value interface{}) { + km.waitStartup() -// AddGroupPartition adds a partition for group consumption. -// No action required in the mock. -func (km *consumerMock) AddGroupPartition(partition int32) { -} + logger.Printf("setting value is not implemented yet.") -// Commit commits an offest. -// No action required in the mock. -func (km *consumerMock) Commit(topic string, partition int32, offset int64) error { - return nil -} + topic := string(table) + sts := km.storages[topic] + if len(sts) == 0 { + panic(fmt.Errorf("storage for topic %s does not exist", topic)) + } + data, err := km.codecForTopic(topic).Encode(value) + ensure.Nil(km.t, err) -// AddPartition marks the topic as a table topic. -// The mock has to know the group table topic to ignore emit calls (which would never be consumed) -func (km *consumerMock) AddPartition(topic string, partition int32, initialOffset int64) error { - return nil + for _, st := range sts { + err = st.Set(key, data) + if err != nil { + panic(fmt.Errorf("Error setting key %s in storage %s: %v", key, table, err)) + } + } } -// RemovePartition removes a partition from a topic. -// No action required in the mock. -func (km *consumerMock) RemovePartition(topic string, partition int32) error { - return nil +// ReplaceEmitHandler replaces the emitter. +func (km *Tester) ReplaceEmitHandler(emitter EmitHandler) { + km.producerMock.emitter = emitter } -// Close closes the consumer. -// No action required in the mock. 
-func (km *consumerMock) Close() error { - close(km.tester.incomingEvents) - close(km.tester.consumerEvents) - fmt.Println("closed consumer mock") - return nil +// ClearValues resets all table values +func (km *Tester) ClearValues() { + for topic, sts := range km.storages { + for _, st := range sts { + logger.Printf("clearing all values from storage for topic %s", topic) + it, _ := st.Iterator() + for it.Next() { + st.Delete(string(it.Key())) + } + } + } } type topicMgrMock struct { @@ -430,7 +382,6 @@ func (tm *topicMgrMock) EnsureStreamExists(topic string, npar int) error { // Partitions returns the number of partitions of a topic, that are assigned to the running // instance, i.e. it doesn't represent all partitions of a topic. func (tm *topicMgrMock) Partitions(topic string) ([]int32, error) { - tm.tester.handledTopics[topic] = true return []int32{0}, nil } diff --git a/tester/tester_test.go b/tester/tester_test.go index f22a1ad1..ea3ed495 100644 --- a/tester/tester_test.go +++ b/tester/tester_test.go @@ -2,40 +2,450 @@ package tester import ( "context" + "fmt" "log" + "sync" "testing" + "time" + "github.com/facebookgo/ensure" "github.com/lovoo/goka" "github.com/lovoo/goka/codec" ) -func Test_Blubb(t *testing.T) { +// simple consume function that is used in different tests +func increment(ctx goka.Context, msg interface{}) { + value := ctx.Value() + var state int64 + if value != nil { + state = value.(int64) + } + state++ + + ctx.SetValue(state) +} + +func runProcOrFail(proc *goka.Processor) { + go func() { + err := proc.Run(context.Background()) + panic(fmt.Errorf("Processor run errors: %v", err)) + }() +} + +func Test_SimpleConsume(t *testing.T) { + gkt := New(t) + + var receivedMessage string + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("group", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + receivedMessage = msg.(string) + }), + ), + goka.WithTester(gkt), + ) + go proc.Run(context.Background()) + for i := 0; i < 101; i++ { + gkt.ConsumeString("input", "key", fmt.Sprintf("%d", i)) + } + + if receivedMessage != "100" { + log.Fatalf("Message did not get through...") + } +} + +func Test_SimplePersist(t *testing.T) { + + gkt := New(t) + + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("group", + goka.Input("input", new(codec.String), increment), + goka.Persist(new(codec.Int64)), + ), + goka.WithTester(gkt), + ) + go func() { + err := proc.Run(context.Background()) + t.Fatalf("Processor run errors: %v", err) + }() + + for i := 0; i < 100; i++ { + gkt.ConsumeString("input", "key", fmt.Sprintf("message - %d", i)) + } + + value := gkt.TableValue("group-table", "key") + if value.(int64) != 100 { + t.Fatalf("Message did not get through. was %d", value.(int64)) + } +} + +func Test_Persist_InitialState(t *testing.T) { + + gkt := New(t) + + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("group", + goka.Input("input", new(codec.String), increment), + goka.Persist(new(codec.Int64)), + ), + goka.WithTester(gkt), + ) + + go func() { + err := proc.Run(context.Background()) + t.Fatalf("Processor run errors: %v", err) + }() + + gkt.SetTableValue("group-table", "existing", int64(150)) + gkt.ConsumeString("input", "existing", "") + + if gkt.TableValue("group-table", "existing").(int64) != 151 { + t.Fatalf("initial state was not loaded. 
Expected 151, got %v", gkt.TableValue("group-table", "existing")) + } +} + +// Tests multiple processors in a single mock +func Test_MultiProcessor(t *testing.T) { + + gkt := New(t) + + // first processor gets input and emits to increment topic + input, _ := goka.NewProcessor([]string{}, goka.DefineGroup("numbers", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + time.Sleep(10 * time.Millisecond) + ctx.Emit("forward1", ctx.Key(), "") + }), + goka.Output("forward1", new(codec.String)), + ), + goka.WithTester(gkt), + ) + + forward, _ := goka.NewProcessor([]string{}, goka.DefineGroup("forward1", + goka.Input("forward1", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.Emit("forward2", ctx.Key(), "") + }), + goka.Output("forward2", new(codec.String)), + ), + goka.WithTester(gkt), + ) + forward2, _ := goka.NewProcessor([]string{}, goka.DefineGroup("forward2", + goka.Input("forward2", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.Emit("forward3", ctx.Key(), "") + }), + goka.Output("forward3", new(codec.String)), + ), + goka.WithTester(gkt), + ) + forward3, _ := goka.NewProcessor([]string{}, goka.DefineGroup("forward3", + goka.Input("forward3", new(codec.String), func(ctx goka.Context, msg interface{}) { + // sleep in between so we know for sure when the waiting implementation is somehow buggy + time.Sleep(10 * time.Millisecond) + ctx.Emit("increment", ctx.Key(), "") + }), + goka.Output("increment", new(codec.String)), + ), + goka.WithTester(gkt), + ) + // second processor increments its state + incrementer, _ := goka.NewProcessor([]string{}, goka.DefineGroup("accu", + goka.Input("increment", new(codec.String), increment), + goka.Persist(new(codec.Int64)), + ), + goka.WithTester(gkt), + ) + + runProcOrFail(input) + runProcOrFail(forward) + runProcOrFail(forward2) + runProcOrFail(forward3) + runProcOrFail(incrementer) + + gkt.ConsumeString("input", "test", "") + + if gkt.TableValue("accu-table", "test").(int64) != 1 { + t.Fatalf("the message did not reached the end") + } +} + +func Test_Loop(t *testing.T) { + gkt := New(t) + + // first processor gets input and emits to increment topic + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("looptest", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.Loopback("loop-key", "loopvalue") + }), + goka.Persist(new(codec.Int64)), + goka.Loop(new(codec.String), increment), + ), + goka.WithTester(gkt), + ) + runProcOrFail(proc) + + gkt.ConsumeString("input", "test", "") + if gkt.TableValue("looptest-table", "loop-key").(int64) != 1 { + t.Fatalf("loop failed") + } + time.Sleep(10 * time.Millisecond) +} - kafkaMock := New(t).SetCodec(new(codec.String)) +func Test_Lookup(t *testing.T) { - proc, err := goka.NewProcessor([]string{}, goka.DefineGroup("group", - goka.Input("group-testloop", new(codec.String), func(ctx goka.Context, msg interface{}) { - log.Printf("%v", msg) + gkt := New(t) + + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("lookup", + goka.Input("set", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.SetValue(msg) }), - goka.Input("topic", new(codec.String), func(ctx goka.Context, msg interface{}) { - ctx.Emit("group-testloop", "key", msg) + goka.Persist(new(codec.String)), + ), + goka.WithTester(gkt), + ) + + // add a lookup table + lookupProc, err := goka.NewProcessor([]string{}, goka.DefineGroup("group", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + val := 
ctx.Lookup("lookup-table", "somekey").(string) + if val != "42" { + ctx.Fail(fmt.Errorf("lookup value was unexpected")) + } + }), + goka.Lookup("lookup-table", new(codec.String)), + ), + goka.WithTester(gkt), + ) + + if err != nil { + t.Fatalf("Error creating processor: %v", err) + } + runProcOrFail(proc) + runProcOrFail(lookupProc) + gkt.Consume("set", "somekey", "42") + gkt.Consume("input", "sender", "message") +} + +func Test_Join(t *testing.T) { + + gkt := New(t) + + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("join", + goka.Input("set", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.SetValue(msg) }), - goka.Output("group-testloop", new(codec.String)), goka.Persist(new(codec.String)), ), - goka.WithTester(kafkaMock), + goka.WithTester(gkt), + ) + + // add a lookup table + lookupProc, err := goka.NewProcessor([]string{}, goka.DefineGroup("group", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + val := ctx.Lookup("join-table", "somekey").(string) + if val != "42" { + ctx.Fail(fmt.Errorf("join value was unexpected")) + } + }), + goka.Lookup("join-table", new(codec.String)), + ), + goka.WithTester(gkt), ) + if err != nil { - log.Fatalf("%v", err) + t.Fatalf("Error creating processor: %v", err) + } + runProcOrFail(proc) + runProcOrFail(lookupProc) + gkt.Consume("set", "somekey", "42") + gkt.Consume("input", "sender", "message") +} + +func Test_MessageTracker_Default(t *testing.T) { + + gkt := New(t) + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("lookup", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.Emit("output", ctx.Key(), msg) + }), + goka.Output("output", new(codec.String)), + ), + goka.WithTester(gkt), + ) + runProcOrFail(proc) + mt := gkt.NewMessageTrackerFromEnd() + gkt.Consume("input", "somekey", "123") + + mt.ExpectEmit("output", "somekey", func(value []byte) { + if string(value) != "123" { + t.Fatalf("unexpected output. 
expected '123', got %s", string(value)) + } + }) + + gkt.Consume("input", "somekey", "124") + key, value, hasNext := mt.NextMessage("output") + if key != "somekey" || value.(string) != "124" || !hasNext { + t.Fatalf("next emitted was something unexpected (key=%s, value=%s, hasNext=%t)", key, value.(string), hasNext) + } + _, _, hasNext = mt.NextMessage("output") + if hasNext { + t.Fatalf("got another emitted message which shouldn't be there") } + mt.ExpectEmpty("output") + +} + +func Test_MessageTracker_Extra(t *testing.T) { + + gkt := New(t) + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("lookup", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.Emit("output", ctx.Key(), msg) + }), + goka.Output("output", new(codec.String)), + ), + goka.WithTester(gkt), + ) + runProcOrFail(proc) + gkt.Consume("input", "somekey", "123") + + tracker := gkt.NewMessageTrackerFromEnd() + + // the new message tracker should start at the end, so the already emitted message + // shouldn't appear + tracker.ExpectEmpty("output") + + gkt.Consume("input", "somekey", "124") + key, value, hasNext := tracker.NextMessage("output") + if key != "somekey" || value.(string) != "124" || !hasNext { + t.Fatalf("next emitted was something unexpected (key=%s, value=%s, hasNext=%t)", key, value.(string), hasNext) + } +} + +func Test_Shutdown(t *testing.T) { + gkt := New(t) + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("lookup", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + }), + ), + goka.WithTester(gkt), + ) + ctx, cancel := context.WithCancel(context.Background()) - done := make(chan bool) + var ( + wg sync.WaitGroup + procErr error + ) + wg.Add(1) go func() { - proc.Run(ctx) - close(done) + defer wg.Done() + procErr = proc.Run(ctx) }() - kafkaMock.ConsumeString("topic", "sender", "message") + + gkt.Consume("input", "test", "test") + + time.Sleep(10 * time.Millisecond) cancel() - <-done + + gkt.Consume("input", "test", "test") + + wg.Wait() + ensure.Nil(t, procErr, "no error, we cancelled the processor") +} + +func Test_LookupWithInitialData(t *testing.T) { + gkt := New(t) + + proc, _ := goka.NewProcessor([]string{}, + goka.DefineGroup("group", + goka.Inputs(goka.Streams{"input-a", "input-b"}, + new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.Loopback(ctx.Key(), "first-loop") + }), + goka.Loop(new(codec.String), func(ctx goka.Context, msg interface{}) { + if msg.(string) == "first-loop" { + ctx.Loopback(ctx.Key(), "second-loop") + } else { + lookupValue := ctx.Lookup("lookup-table", "somekey") + if lookupValue != nil { + ctx.SetValue(fmt.Sprintf("%d", lookupValue)) + } + ctx.Emit("output", ctx.Key(), msg) + } + }), + goka.Output("output", new(codec.String)), + goka.Lookup("lookup-table", new(codec.Int64)), + goka.Persist(new(codec.String)), + ), + goka.WithTester(gkt), + ) + + go proc.Run(context.Background()) + gkt.Consume("lookup-table", "somekey", int64(123)) + + // regression test: this used to block + gkt.Consume("input-a", "key", "value") +} + +func Test_MultiLookup(t *testing.T) { + gkt := New(t) + proc, _ := goka.NewProcessor([]string{}, + goka.DefineGroup("group", + goka.Inputs(goka.Streams{"input"}, + new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.SetValue(msg) + }), + goka.Persist(new(codec.String)), + ), + goka.WithTester(gkt), + ) + + var foundValue int + + lookup1, _ := goka.NewProcessor([]string{}, + goka.DefineGroup("lookup1", + goka.Inputs(goka.Streams{"trigger"}, + 
new(codec.String), func(ctx goka.Context, msg interface{}) { + lookupValue := ctx.Lookup("group-table", ctx.Key()) + if lookupValue.(string) != msg.(string) { + t.Fatalf("expected %s, got %s", msg, lookupValue) + } else { + foundValue++ + } + }), + goka.Lookup("group-table", new(codec.String)), + ), + goka.WithTester(gkt), + ) + lookup2, _ := goka.NewProcessor([]string{}, + goka.DefineGroup("lookup2", + goka.Inputs(goka.Streams{"trigger"}, + new(codec.String), func(ctx goka.Context, msg interface{}) { + lookupValue := ctx.Lookup("group-table", ctx.Key()) + if lookupValue.(string) != msg.(string) { + t.Fatalf("expected %s, got %s", msg, lookupValue) + } else { + foundValue++ + } + }), + goka.Lookup("group-table", new(codec.String)), + ), + goka.WithTester(gkt), + ) + + go proc.Run(context.Background()) + go lookup1.Run(context.Background()) + go lookup2.Run(context.Background()) + + // set the lookup table value + gkt.Consume("input", "value-from-input", "43") + gkt.Consume("trigger", "value-from-input", "43") + if foundValue != 2 { + t.Fatalf("did not find value in lookup table") + } + + foundValue = 0 + + gkt.SetTableValue("group-table", "set-in-table", "44") + gkt.Consume("trigger", "set-in-table", "44") + if foundValue != 2 { + t.Fatalf("did not find value in lookup table") + } } diff --git a/view.go b/view.go index c6e787ca..61d1ed44 100644 --- a/view.go +++ b/view.go @@ -353,6 +353,13 @@ func (v *View) run(ctx context.Context) error { case <-ctx.Done(): return nil } + case *kafka.NOP: + partition := v.partitions[int(ev.Partition)] + select { + case partition.ch <- ev: + case <-ctx.Done(): + return nil + } case *kafka.Error: return fmt.Errorf("view: error from kafka consumer: %v", ev) default: From d5810800fac985ecc01c39e13aaa6e5962afebe7 Mon Sep 17 00:00:00 2001 From: franz Date: Fri, 5 Oct 2018 11:45:37 +0200 Subject: [PATCH 2/6] gain some performance by mocking away the logger if not used --- tester/tester.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/tester/tester.go b/tester/tester.go index 69f3e440..a05da2a6 100644 --- a/tester/tester.go +++ b/tester/tester.go @@ -4,7 +4,6 @@ import ( "flag" "fmt" "hash" - "io/ioutil" "log" "os" "reflect" @@ -24,9 +23,17 @@ type Codec interface { Decode(data []byte) (value interface{}, err error) } +type debugLogger interface { + Printf(s string, args ...interface{}) +} + +type nilLogger int + +func (*nilLogger) Printf(s string, args ...interface{}) {} + var ( - debug = flag.Bool("tester-debug", false, "show debug prints of the tester.") - logger = log.New(ioutil.Discard, " ", 0) + debug = flag.Bool("tester-debug", false, "show debug prints of the tester.") + logger debugLogger = new(nilLogger) ) // EmitHandler abstracts a function that allows to overwrite kafkamock's Emit function to @@ -110,7 +117,7 @@ func New(t T) *Tester { // activate the logger if debug is turned on if *debug { - logger.SetOutput(os.Stdout) + logger = log.New(os.Stderr, " ", 0) } tester := &Tester{ @@ -355,6 +362,7 @@ func (km *Tester) ReplaceEmitHandler(emitter EmitHandler) { // ClearValues resets all table values func (km *Tester) ClearValues() { for topic, sts := range km.storages { + _ = topic for _, st := range sts { logger.Printf("clearing all values from storage for topic %s", topic) it, _ := st.Iterator() @@ -417,6 +425,6 @@ func (p *producerMock) Emit(topic string, key string, value []byte) *kafka.Promi // Close closes the producer mock // No action required in the mock. 
func (p *producerMock) Close() error { - fmt.Println("Closing producer mock") + logger.Printf("Closing producer mock") return nil } From 56c8fb22188d1de67ffd640d7c08926c2b950b4e Mon Sep 17 00:00:00 2001 From: franz Date: Fri, 5 Oct 2018 13:43:19 +0200 Subject: [PATCH 3/6] bugfix message tracking on output-queues only --- examples/4-tests/example_test.go | 1 - tester/tester.go | 1 + tester/tester_test.go | 30 ++++++++++++++++++++++++++++-- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/examples/4-tests/example_test.go b/examples/4-tests/example_test.go index e01c792a..bcc3d455 100644 --- a/examples/4-tests/example_test.go +++ b/examples/4-tests/example_test.go @@ -39,7 +39,6 @@ func Test_1Input(t *testing.T) { } // Scenario (2) -// // One processor with only one input and one output func Test_2InputOutput(t *testing.T) { var ( diff --git a/tester/tester.go b/tester/tester.go index a05da2a6..fc6e9de8 100644 --- a/tester/tester.go +++ b/tester/tester.go @@ -164,6 +164,7 @@ func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph) { for _, output := range gg.OutputStreams() { km.registerCodec(output.Topic(), output.Codec()) + km.getOrCreateQueue(output.Topic()) } for _, join := range gg.JointTables() { km.getOrCreateQueue(join.Topic()).expectSimpleConsumer() diff --git a/tester/tester_test.go b/tester/tester_test.go index ea3ed495..d2588720 100644 --- a/tester/tester_test.go +++ b/tester/tester_test.go @@ -3,7 +3,7 @@ package tester import ( "context" "fmt" - "log" + "reflect" "sync" "testing" "time" @@ -49,7 +49,33 @@ func Test_SimpleConsume(t *testing.T) { } if receivedMessage != "100" { - log.Fatalf("Message did not get through...") + t.Fatalf("Message did not get through...") + } +} + +func Test_InputOutput(t *testing.T) { + gkt := New(t) + + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("group", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.Emit("output", ctx.Key(), msg) + }), + goka.Output("output", new(codec.String)), + ), + goka.WithTester(gkt), + ) + go proc.Run(context.Background()) + + mt := gkt.NewMessageTrackerFromEnd() + + mt.ExpectEmpty("output") + + gkt.ConsumeString("input", "key", "value") + + key, value, ok := mt.NextMessage("output") + + if key != "key" || !reflect.DeepEqual(value, "value") || !ok { + t.Fatalf("Message was not received in the output queue") } } From b6a218f7b2c4c4a02721ecc7291bfdc1feaf493c Mon Sep 17 00:00:00 2001 From: franz Date: Mon, 8 Oct 2018 13:35:25 +0200 Subject: [PATCH 4/6] add emitter to tester --- emitter.go | 2 +- options.go | 25 +++++++++++++++++-------- tester/tester.go | 6 ++++++ 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/emitter.go b/emitter.go index 0455f170..4485b53f 100644 --- a/emitter.go +++ b/emitter.go @@ -29,7 +29,7 @@ func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterO opts := new(eoptions) - err := opts.applyOptions(options...) + err := opts.applyOptions(topic, codec, options...) 
if err != nil { return nil, fmt.Errorf(errApplyOptions, err) } diff --git a/options.go b/options.go index dee19e0d..868bd229 100644 --- a/options.go +++ b/options.go @@ -176,6 +176,7 @@ type Tester interface { ProducerBuilder() kafka.ProducerBuilder TopicManagerBuilder() kafka.TopicManagerBuilder RegisterGroupGraph(*GroupGraph) + RegisterEmitter(Stream, Codec) } // WithTester configures all external connections of a processor, ie, storage, @@ -338,7 +339,7 @@ func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error { // EmitterOption defines a configuration option to be used when creating an // emitter. -type EmitterOption func(*eoptions) +type EmitterOption func(*eoptions, Stream, Codec) // emitter options type eoptions struct { @@ -356,46 +357,54 @@ type eoptions struct { // WithEmitterLogger sets the logger the emitter should use. By default, // emitters use the standard library logger. func WithEmitterLogger(log logger.Logger) EmitterOption { - return func(o *eoptions) { + return func(o *eoptions, topic Stream, codec Codec) { o.log = log } } // WithEmitterClientID defines the client ID used to identify with kafka. func WithEmitterClientID(clientID string) EmitterOption { - return func(o *eoptions) { + return func(o *eoptions, topic Stream, codec Codec) { o.clientID = clientID } } // WithEmitterTopicManagerBuilder replaces the default topic manager builder. func WithEmitterTopicManagerBuilder(tmb kafka.TopicManagerBuilder) EmitterOption { - return func(o *eoptions) { + return func(o *eoptions, topic Stream, codec Codec) { o.builders.topicmgr = tmb } } // WithEmitterProducerBuilder replaces the default producer builder. func WithEmitterProducerBuilder(pb kafka.ProducerBuilder) EmitterOption { - return func(o *eoptions) { + return func(o *eoptions, topic Stream, codec Codec) { o.builders.producer = pb } } // WithEmitterHasher sets the hash function that assigns keys to partitions. func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption { - return func(o *eoptions) { + return func(o *eoptions, topic Stream, codec Codec) { o.hasher = hasher } } -func (opt *eoptions) applyOptions(opts ...EmitterOption) error { +func WithEmitterTester(t Tester) EmitterOption { + return func(o *eoptions, topic Stream, codec Codec) { + o.builders.producer = t.ProducerBuilder() + o.builders.topicmgr = t.TopicManagerBuilder() + t.RegisterEmitter(topic, codec) + } +} + +func (opt *eoptions) applyOptions(topic Stream, codec Codec, opts ...EmitterOption) error { opt.clientID = defaultClientID opt.log = logger.Default() opt.hasher = DefaultHasher() for _, o := range opts { - o(opt) + o(opt, topic, codec) } // config not set, use default one diff --git a/tester/tester.go b/tester/tester.go index fc6e9de8..faba330f 100644 --- a/tester/tester.go +++ b/tester/tester.go @@ -183,6 +183,12 @@ func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph) { } +// RegisterEmitter registers an emitter to be working with the tester. 
+func (km *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec) { + km.registerCodec(string(topic), codec) + km.getOrCreateQueue(string(topic)) +} + // TopicManagerBuilder returns the topicmanager builder when this tester is used as an option // to a processor func (km *Tester) TopicManagerBuilder() kafka.TopicManagerBuilder { From b3a5703e75279ae43b4457b3091dd0ba48849336 Mon Sep 17 00:00:00 2001 From: franz Date: Fri, 26 Oct 2018 11:31:02 +0200 Subject: [PATCH 5/6] tester: use same storage for a topic for different views/processors --- tester/tester.go | 43 ++++++++++++++++++++----------------------- tester/tester_test.go | 29 +++++++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/tester/tester.go b/tester/tester.go index faba330f..62ea4c44 100644 --- a/tester/tester.go +++ b/tester/tester.go @@ -53,7 +53,7 @@ type Tester struct { producerMock *producerMock topicMgrMock *topicMgrMock emitHandler EmitHandler - storages map[string][]storage.Storage + storages map[string]storage.Storage codecs map[string]goka.Codec topicQueues map[string]*queue @@ -72,7 +72,7 @@ func (km *Tester) queueForTopic(topic string) *queue { return q } -// CreateMessageTracker creates a message tracker that starts tracking +// NewMessageTrackerFromEnd creates a message tracker that starts tracking // the messages from the end of the current queues func (km *Tester) NewMessageTrackerFromEnd() *MessageTracker { km.waitStartup() @@ -124,7 +124,7 @@ func New(t T) *Tester { t: t, codecs: make(map[string]goka.Codec), topicQueues: make(map[string]*queue), - storages: make(map[string][]storage.Storage), + storages: make(map[string]storage.Storage), } tester.producerMock = newProducerMock(tester.handleEmit) tester.topicMgrMock = newTopicMgrMock(tester) @@ -217,8 +217,11 @@ func (km *Tester) ProducerBuilder() kafka.ProducerBuilder { // to a processor func (km *Tester) StorageBuilder() storage.Builder { return func(topic string, partition int32) (storage.Storage, error) { + if st, exists := km.storages[topic]; exists { + return st, nil + } st := storage.NewMemory() - km.storages[topic] = append(km.storages[topic], st) + km.storages[topic] = st return st, nil } } @@ -324,12 +327,11 @@ func (km *Tester) TableValue(table goka.Table, key string) interface{} { km.waitStartup() topic := string(table) - sts := km.storages[topic] - if len(sts) == 0 { + st, exists := km.storages[topic] + if !exists { panic(fmt.Errorf("topic %s does not exist", topic)) } - - item, err := sts[0].Get(key) + item, err := st.Get(key) ensure.Nil(km.t, err) if item == nil { return nil @@ -346,18 +348,16 @@ func (km *Tester) SetTableValue(table goka.Table, key string, value interface{}) logger.Printf("setting value is not implemented yet.") topic := string(table) - sts := km.storages[topic] - if len(sts) == 0 { + st, exists := km.storages[topic] + if !exists { panic(fmt.Errorf("storage for topic %s does not exist", topic)) } data, err := km.codecForTopic(topic).Encode(value) ensure.Nil(km.t, err) - for _, st := range sts { - err = st.Set(key, data) - if err != nil { - panic(fmt.Errorf("Error setting key %s in storage %s: %v", key, table, err)) - } + err = st.Set(key, data) + if err != nil { + panic(fmt.Errorf("Error setting key %s in storage %s: %v", key, table, err)) } } @@ -368,14 +368,11 @@ func (km *Tester) ReplaceEmitHandler(emitter EmitHandler) { // ClearValues resets all table values func (km *Tester) ClearValues() { - for topic, sts := range km.storages { - _ = topic - for _, st := range sts { - 
logger.Printf("clearing all values from storage for topic %s", topic) - it, _ := st.Iterator() - for it.Next() { - st.Delete(string(it.Key())) - } + for topic, st := range km.storages { + logger.Printf("clearing all values from storage for topic %s", topic) + it, _ := st.Iterator() + for it.Next() { + st.Delete(string(it.Key())) } } } diff --git a/tester/tester_test.go b/tester/tester_test.go index d2588720..edca4dc9 100644 --- a/tester/tester_test.go +++ b/tester/tester_test.go @@ -3,6 +3,7 @@ package tester import ( "context" "fmt" + "log" "reflect" "sync" "testing" @@ -21,7 +22,6 @@ func increment(ctx goka.Context, msg interface{}) { state = value.(int64) } state++ - ctx.SetValue(state) } @@ -210,7 +210,6 @@ func Test_Loop(t *testing.T) { if gkt.TableValue("looptest-table", "loop-key").(int64) != 1 { t.Fatalf("loop failed") } - time.Sleep(10 * time.Millisecond) } func Test_Lookup(t *testing.T) { @@ -475,3 +474,29 @@ func Test_MultiLookup(t *testing.T) { t.Fatalf("did not find value in lookup table") } } + +func Test_ManyConsume(t *testing.T) { + var inputs goka.Streams + for i := 0; i < 100; i++ { + inputs = append(inputs, goka.Stream(fmt.Sprintf("input-%d", i))) + } + + received := map[string]bool{} + + gkt := New(t) + proc, _ := goka.NewProcessor([]string{}, + goka.DefineGroup("group", + goka.Inputs(inputs, new(codec.String), func(ctx goka.Context, msg interface{}) { received[string(ctx.Topic())] = true }), + ), + goka.WithTester(gkt), + ) + go proc.Run(context.Background()) + + // we'll just try to get something consumed + for i := 0; i < 100; i++ { + gkt.Consume(fmt.Sprintf("input-%d", i), "something", "something") + } + if len(received) != 100 { + t.Fatalf("did not receive all messages") + } +} From 50abe071ed5f7315d9d314fd00f623e281798c42 Mon Sep 17 00:00:00 2001 From: franz Date: Fri, 26 Oct 2018 11:38:18 +0200 Subject: [PATCH 6/6] refactor message tracker implementation --- examples/4-tests/example_test.go | 12 ++--- tester/messagetracker.go | 75 ++++++++++---------------------- tester/tester.go | 17 +++++--- tester/tester_test.go | 34 ++++++--------- 4 files changed, 54 insertions(+), 84 deletions(-) diff --git a/examples/4-tests/example_test.go b/examples/4-tests/example_test.go index bcc3d455..7d5c99f3 100644 --- a/examples/4-tests/example_test.go +++ b/examples/4-tests/example_test.go @@ -60,13 +60,13 @@ func Test_2InputOutput(t *testing.T) { // create a new message tracker so we can check that the message was being emitted. // If we created the message tracker after the ConsumeString, there wouldn't be a message. - mt := gkt.NewMessageTrackerFromEnd() + mt := gkt.NewMessageTracker("output") // send some message gkt.ConsumeString("input", "key", "some-message") // make sure received the message in the output - key, value, valid := mt.NextMessage("output") + key, value, valid := mt.Next() ensure.True(t, valid) ensure.DeepEqual(t, key, "key") ensure.DeepEqual(t, value, "forwarded: some-message") @@ -126,22 +126,22 @@ func Test_Subtest(t *testing.T) { gkt.ClearValues() // in a subtest we can't know what messages already exists in the topics left // by other tests, so let's start a message tracker from here. 
- mt := gkt.NewMessageTrackerFromEnd() + mt := gkt.NewMessageTracker("output") // send a message gkt.ConsumeString("input", "bob", "hello") // check it was emitted - key, value, ok := mt.NextMessage("output") + key, value, ok := mt.Next() ensure.True(t, ok) ensure.DeepEqual(t, key, "output-key") ensure.DeepEqual(t, value, "forwarded: hello") // we should be at the end - mt.ExpectEmpty("output") + mt.ExpectAtEnd() // this is equivalent - _, _, ok = mt.NextMessage("output") + _, _, ok = mt.Next() ensure.False(t, ok) }) t.Run("test-2", func(t *testing.T) { diff --git a/tester/messagetracker.go b/tester/messagetracker.go index f8883d91..eb76325a 100644 --- a/tester/messagetracker.go +++ b/tester/messagetracker.go @@ -4,92 +4,65 @@ package tester // 'expect message x to be in topic y' in unit tests type MessageTracker struct { t T - nextMessageIndex map[string]int + topic string + nextMessageIndex int tester *Tester } -func newMessageTracker(tester *Tester, t T) *MessageTracker { +func newMessageTracker(tester *Tester, t T, topic string) *MessageTracker { return &MessageTracker{ - t: t, - nextMessageIndex: make(map[string]int), - tester: tester, + t: t, + topic: topic, + tester: tester, } } -// NextMessage returns the next message from the topic since the last time this +// Next returns the next message since the last time this // function was called (or MoveToEnd) -func (mt *MessageTracker) NextMessage(topic string) (string, interface{}, bool) { +// It uses the known codec for the topic to decode the message +func (mt *MessageTracker) Next() (string, interface{}, bool) { - key, msgRaw, hasNext := mt.NextMessageRaw(topic) + key, msgRaw, hasNext := mt.NextRaw() if !hasNext { return key, msgRaw, hasNext } - decoded, err := mt.tester.codecForTopic(topic).Decode(msgRaw) + decoded, err := mt.tester.codecForTopic(mt.topic).Decode(msgRaw) if err != nil { mt.t.Fatalf("Error decoding message: %v", err) } return key, decoded, true } -// NextMessageRaw returns the next message in passed topic -func (mt *MessageTracker) NextMessageRaw(topic string) (string, []byte, bool) { - q := mt.tester.queueForTopic(topic) - if mt.nextMessageIndex[topic] >= q.size() { +// NextRaw returns the next message similar to Next(), but without the decoding +func (mt *MessageTracker) NextRaw() (string, []byte, bool) { + q := mt.tester.queueForTopic(mt.topic) + if mt.nextMessageIndex >= q.size() { return "", nil, false } - msg := q.message(mt.nextMessageIndex[topic]) + msg := q.message(mt.nextMessageIndex) - mt.nextMessageIndex[topic]++ + mt.nextMessageIndex++ return msg.key, msg.value, true } -// ExpectEmpty ensures the topic does not contain more messages -func (mt *MessageTracker) ExpectEmpty(topic string) { - if mt.nextMessageIndex[topic] == mt.tester.queueForTopic(topic).size() { +// ExpectLastMessage ensures the message tracker is at the end of the topic +func (mt *MessageTracker) ExpectAtEnd() { + if mt.nextMessageIndex == mt.tester.queueForTopic(mt.topic).size() { return } - codec := mt.tester.codecForTopic(topic) + codec := mt.tester.codecForTopic(mt.topic) var remaining []interface{} - for _, msg := range mt.tester.queueForTopic(topic).messagesFrom(mt.nextMessageIndex[topic]) { + for _, msg := range mt.tester.queueForTopic(mt.topic).messagesFrom(mt.nextMessageIndex) { decoded, _ := codec.Decode(msg.value) remaining = append(remaining, decoded) } - mt.t.Fatalf("Expected topic %s to be empty, but was not (%#v)", topic, remaining) + mt.t.Fatalf("Expected topic %s to be empty, but was not (%#v)", mt.topic, remaining) } // 
MoveToEnd marks the topic to be read regardless of its content func (mt *MessageTracker) MoveToEnd(topic string) { - mt.nextMessageIndex[topic] = int(mt.tester.queueForTopic(topic).hwm) -} - -// MoveAllToEnd marks all topics to be read -func (mt *MessageTracker) MoveAllToEnd() { - for topic := range mt.tester.topicQueues { - mt.nextMessageIndex[topic] = int(mt.tester.queueForTopic(topic).hwm) - } -} - -// ExpectEmit ensures a message exists in passed topic and key. The message may be -// inspected/unmarshalled by a passed expecter function. -// DEPRECATED: This function is only to get some compatibility and should be removed in future -func (mt *MessageTracker) ExpectEmit(topic string, key string, expecter func(value []byte)) { - - for { - nextKey, value, hasNext := mt.NextMessageRaw(topic) - if !hasNext { - break - } - - if key != nextKey { - continue - } - - // found one, stop here - expecter(value) - return - } - mt.t.Errorf("Did not find expected message in %s for key %s", topic, key) + mt.nextMessageIndex = int(mt.tester.queueForTopic(mt.topic).hwm) } diff --git a/tester/tester.go b/tester/tester.go index 62ea4c44..0ebb6aca 100644 --- a/tester/tester.go +++ b/tester/tester.go @@ -9,7 +9,6 @@ import ( "reflect" "sync" - "github.com/facebookgo/ensure" "github.com/golang/protobuf/proto" "github.com/lovoo/goka" @@ -74,10 +73,10 @@ func (km *Tester) queueForTopic(topic string) *queue { // NewMessageTrackerFromEnd creates a message tracker that starts tracking // the messages from the end of the current queues -func (km *Tester) NewMessageTrackerFromEnd() *MessageTracker { +func (km *Tester) NewMessageTracker(topic string) *MessageTracker { km.waitStartup() - mt := newMessageTracker(km, km.t) + mt := newMessageTracker(km, km.t, topic) km.mQueues.RLock() defer km.mQueues.RUnlock() for topic := range km.topicQueues { @@ -332,12 +331,16 @@ func (km *Tester) TableValue(table goka.Table, key string) interface{} { panic(fmt.Errorf("topic %s does not exist", topic)) } item, err := st.Get(key) - ensure.Nil(km.t, err) + if err != nil { + km.t.Fatalf("Error getting table value from storage (table=%s, key=%s): %v", table, key, err) + } if item == nil { return nil } value, err := km.codecForTopic(topic).Decode(item) - ensure.Nil(km.t, err) + if err != nil { + km.t.Fatalf("error decoding value from storage (table=%s, key=%s, value=%v): %v", table, key, item, err) + } return value } @@ -353,7 +356,9 @@ func (km *Tester) SetTableValue(table goka.Table, key string, value interface{}) panic(fmt.Errorf("storage for topic %s does not exist", topic)) } data, err := km.codecForTopic(topic).Encode(value) - ensure.Nil(km.t, err) + if err != nil { + km.t.Fatalf("error decoding value from storage (table=%s, key=%s, value=%v): %v", table, key, value, err) + } err = st.Set(key, data) if err != nil { diff --git a/tester/tester_test.go b/tester/tester_test.go index edca4dc9..40c93969 100644 --- a/tester/tester_test.go +++ b/tester/tester_test.go @@ -3,13 +3,11 @@ package tester import ( "context" "fmt" - "log" "reflect" "sync" "testing" "time" - "github.com/facebookgo/ensure" "github.com/lovoo/goka" "github.com/lovoo/goka/codec" ) @@ -66,13 +64,13 @@ func Test_InputOutput(t *testing.T) { ) go proc.Run(context.Background()) - mt := gkt.NewMessageTrackerFromEnd() + mt := gkt.NewMessageTracker("output") - mt.ExpectEmpty("output") + mt.ExpectAtEnd() gkt.ConsumeString("input", "key", "value") - key, value, ok := mt.NextMessage("output") + key, value, ok := mt.Next() if key != "key" || !reflect.DeepEqual(value, "value") || 
!ok { t.Fatalf("Message was not received in the output queue") @@ -294,26 +292,18 @@ func Test_MessageTracker_Default(t *testing.T) { goka.WithTester(gkt), ) runProcOrFail(proc) - mt := gkt.NewMessageTrackerFromEnd() - gkt.Consume("input", "somekey", "123") - - mt.ExpectEmit("output", "somekey", func(value []byte) { - if string(value) != "123" { - t.Fatalf("unexpected output. expected '123', got %s", string(value)) - } - }) - + mt := gkt.NewMessageTracker("output") gkt.Consume("input", "somekey", "124") - key, value, hasNext := mt.NextMessage("output") + key, value, hasNext := mt.Next() if key != "somekey" || value.(string) != "124" || !hasNext { t.Fatalf("next emitted was something unexpected (key=%s, value=%s, hasNext=%t)", key, value.(string), hasNext) } - _, _, hasNext = mt.NextMessage("output") + _, _, hasNext = mt.Next() if hasNext { t.Fatalf("got another emitted message which shouldn't be there") } - mt.ExpectEmpty("output") + mt.ExpectAtEnd() } @@ -331,14 +321,14 @@ func Test_MessageTracker_Extra(t *testing.T) { runProcOrFail(proc) gkt.Consume("input", "somekey", "123") - tracker := gkt.NewMessageTrackerFromEnd() + tracker := gkt.NewMessageTracker("output") // the new message tracker should start at the end, so the already emitted message // shouldn't appear - tracker.ExpectEmpty("output") + tracker.ExpectAtEnd() gkt.Consume("input", "somekey", "124") - key, value, hasNext := tracker.NextMessage("output") + key, value, hasNext := tracker.Next() if key != "somekey" || value.(string) != "124" || !hasNext { t.Fatalf("next emitted was something unexpected (key=%s, value=%s, hasNext=%t)", key, value.(string), hasNext) } @@ -372,7 +362,9 @@ func Test_Shutdown(t *testing.T) { gkt.Consume("input", "test", "test") wg.Wait() - ensure.Nil(t, procErr, "no error, we cancelled the processor") + if procErr != nil { + t.Fatalf("got error for shutting down processor: %v", procErr) + } } func Test_LookupWithInitialData(t *testing.T) {