Skip to content

Commit

Permalink
Merge pull request #134 from lovoo/improve-emitter-mock
Browse files Browse the repository at this point in the history
bugfix tester, improve emitter and mock
  • Loading branch information
db7 committed Jun 21, 2018
2 parents 7d1931c + 397e995 commit 1f455a9
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
3 changes: 2 additions & 1 deletion emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (e *Emitter) EmitSync(key string, msg interface{}) error {
}

// Finish waits until the emitter is finished producing all pending messages.
func (e *Emitter) Finish() {
func (e *Emitter) Finish() error {
e.wg.Wait()
return e.producer.Close()
}
20 changes: 18 additions & 2 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,19 @@ func (km *Tester) ConsumeProto(topic string, key string, msg proto.Message) {
km.t.Errorf("Error marshaling message for consume: %v", err)
}
km.ConsumeData(topic, key, data)
km.makeCalls()
}

// ConsumeString simulates a message with a string payload.
func (km *Tester) ConsumeString(topic string, key string, msg string) {
km.ConsumeData(topic, key, []byte(msg))
km.makeCalls()
}

// Consume simulates a message with a byte slice payload.
func (km *Tester) Consume(topic string, key string, msg []byte) {
km.ConsumeData(topic, key, msg)
km.makeCalls()
}

// ConsumeData simulates a message with a byte slice payload. This is the same
Expand All @@ -189,12 +192,16 @@ func (km *Tester) Consume(topic string, key string, msg []byte) {
// used by ConsumeProto by the test case as well as any emit calls of the
// processor being tested.
func (km *Tester) ConsumeData(topic string, key string, data []byte) {
km.consumeData(topic, key, data)
km.makeCalls()
}

func (km *Tester) consumeData(topic string, key string, data []byte) {
defer func() {
if r := recover(); r != nil {
log.Printf("tester: panic ConsumeData: %+v\n", r)
}
}()
defer km.makeCalls()
km.offset++
kafkaMsg := &kafka.Message{
Topic: topic,
Expand Down Expand Up @@ -304,7 +311,7 @@ func (km *Tester) handleEmit(topic string, key string, value []byte) *kafka.Prom
}
if _, hasTopic := km.handledTopics[topic]; hasTopic {
km.newCall(func() {
km.ConsumeData(topic, key, value)
km.consumeData(topic, key, value)
})
} else {
km.offset++
Expand Down Expand Up @@ -339,6 +346,15 @@ func (km *Tester) makeCalls() {
km.wg.Wait()
}

// ClearValues resets everything that might be in the storage by deleting everything
// using the iterator.
func (km *Tester) ClearValues() {
it, _ := km.storage.Iterator()
for it.Next() {
km.storage.Delete(string(it.Key()))
}
}

type consumerMock struct {
tester *Tester
}
Expand Down
41 changes: 41 additions & 0 deletions tester/tester_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package tester

import (
"context"
"log"
"testing"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

func Test_Blubb(t *testing.T) {

kafkaMock := New(t).SetCodec(new(codec.String))

proc, err := goka.NewProcessor([]string{}, goka.DefineGroup("group",
goka.Input("group-testloop", new(codec.String), func(ctx goka.Context, msg interface{}) {
log.Printf("%v", msg)
}),
goka.Input("topic", new(codec.String), func(ctx goka.Context, msg interface{}) {
ctx.Emit("group-testloop", "key", msg)
}),
goka.Output("group-testloop", new(codec.String)),
goka.Persist(new(codec.String)),
),
goka.WithTester(kafkaMock),
)
if err != nil {
log.Fatalf("%v", err)
}

ctx, cancel := context.WithCancel(context.Background())
done := make(chan bool)
go func() {
proc.Run(ctx)
close(done)
}()
kafkaMock.ConsumeString("topic", "sender", "message")
cancel()
<-done
}

0 comments on commit 1f455a9

Please sign in to comment.