diff --git a/examples/4-tests/example_test.go b/examples/4-tests/example_test.go index e01c792a..bcc3d455 100644 --- a/examples/4-tests/example_test.go +++ b/examples/4-tests/example_test.go @@ -39,7 +39,6 @@ func Test_1Input(t *testing.T) { } // Scenario (2) -// // One processor with only one input and one output func Test_2InputOutput(t *testing.T) { var ( diff --git a/tester/tester.go b/tester/tester.go index a05da2a6..fc6e9de8 100644 --- a/tester/tester.go +++ b/tester/tester.go @@ -164,6 +164,7 @@ func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph) { for _, output := range gg.OutputStreams() { km.registerCodec(output.Topic(), output.Codec()) + km.getOrCreateQueue(output.Topic()) } for _, join := range gg.JointTables() { km.getOrCreateQueue(join.Topic()).expectSimpleConsumer() diff --git a/tester/tester_test.go b/tester/tester_test.go index ea3ed495..d2588720 100644 --- a/tester/tester_test.go +++ b/tester/tester_test.go @@ -3,7 +3,7 @@ package tester import ( "context" "fmt" - "log" + "reflect" "sync" "testing" "time" @@ -49,7 +49,33 @@ func Test_SimpleConsume(t *testing.T) { } if receivedMessage != "100" { - log.Fatalf("Message did not get through...") + t.Fatalf("Message did not get through...") + } +} + +func Test_InputOutput(t *testing.T) { + gkt := New(t) + + proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("group", + goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) { + ctx.Emit("output", ctx.Key(), msg) + }), + goka.Output("output", new(codec.String)), + ), + goka.WithTester(gkt), + ) + go proc.Run(context.Background()) + + mt := gkt.NewMessageTrackerFromEnd() + + mt.ExpectEmpty("output") + + gkt.ConsumeString("input", "key", "value") + + key, value, ok := mt.NextMessage("output") + + if key != "key" || !reflect.DeepEqual(value, "value") || !ok { + t.Fatalf("Message was not received in the output queue") } }