Skip to content

Commit

Permalink
bugfix message tracking on output-queues only
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Oct 8, 2018
1 parent b4e1015 commit 2fbdf6d
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
1 change: 0 additions & 1 deletion examples/4-tests/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func Test_1Input(t *testing.T) {
}

// Scenario (2)
//
// One processor with only one input and one output
func Test_2InputOutput(t *testing.T) {
var (
Expand Down
1 change: 1 addition & 0 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph) {

for _, output := range gg.OutputStreams() {
km.registerCodec(output.Topic(), output.Codec())
km.getOrCreateQueue(output.Topic())
}
for _, join := range gg.JointTables() {
km.getOrCreateQueue(join.Topic()).expectSimpleConsumer()
Expand Down
30 changes: 28 additions & 2 deletions tester/tester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package tester
import (
"context"
"fmt"
"log"
"reflect"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -49,7 +49,33 @@ func Test_SimpleConsume(t *testing.T) {
}

if receivedMessage != "100" {
log.Fatalf("Message did not get through...")
t.Fatalf("Message did not get through...")
}
}

func Test_InputOutput(t *testing.T) {
gkt := New(t)

proc, _ := goka.NewProcessor([]string{}, goka.DefineGroup("group",
goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) {
ctx.Emit("output", ctx.Key(), msg)
}),
goka.Output("output", new(codec.String)),
),
goka.WithTester(gkt),
)
go proc.Run(context.Background())

mt := gkt.NewMessageTrackerFromEnd()

mt.ExpectEmpty("output")

gkt.ConsumeString("input", "key", "value")

key, value, ok := mt.NextMessage("output")

if key != "key" || !reflect.DeepEqual(value, "value") || !ok {
t.Fatalf("Message was not received in the output queue")
}
}

Expand Down

0 comments on commit 2fbdf6d

Please sign in to comment.