Skip to content

Commit

Permalink
add emitter to tester
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Oct 8, 2018
1 parent 2fbdf6d commit 26671ea
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
2 changes: 1 addition & 1 deletion emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterO

opts := new(eoptions)

err := opts.applyOptions(options...)
err := opts.applyOptions(topic, codec, options...)
if err != nil {
return nil, fmt.Errorf(errApplyOptions, err)
}
Expand Down
25 changes: 17 additions & 8 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ type Tester interface {
ProducerBuilder() kafka.ProducerBuilder
TopicManagerBuilder() kafka.TopicManagerBuilder
RegisterGroupGraph(*GroupGraph)
RegisterEmitter(Stream, Codec)
}

// WithTester configures all external connections of a processor, ie, storage,
Expand Down Expand Up @@ -338,7 +339,7 @@ func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {

// EmitterOption defines a configuration option to be used when creating an
// emitter.
type EmitterOption func(*eoptions)
type EmitterOption func(*eoptions, Stream, Codec)

// emitter options
type eoptions struct {
Expand All @@ -356,46 +357,54 @@ type eoptions struct {
// WithEmitterLogger sets the logger the emitter should use. By default,
// emitters use the standard library logger.
func WithEmitterLogger(log logger.Logger) EmitterOption {
return func(o *eoptions) {
return func(o *eoptions, topic Stream, codec Codec) {
o.log = log
}
}

// WithEmitterClientID defines the client ID used to identify with kafka.
func WithEmitterClientID(clientID string) EmitterOption {
return func(o *eoptions) {
return func(o *eoptions, topic Stream, codec Codec) {
o.clientID = clientID
}
}

// WithEmitterTopicManagerBuilder replaces the default topic manager builder.
func WithEmitterTopicManagerBuilder(tmb kafka.TopicManagerBuilder) EmitterOption {
return func(o *eoptions) {
return func(o *eoptions, topic Stream, codec Codec) {
o.builders.topicmgr = tmb
}
}

// WithEmitterProducerBuilder replaces the default producer builder.
func WithEmitterProducerBuilder(pb kafka.ProducerBuilder) EmitterOption {
return func(o *eoptions) {
return func(o *eoptions, topic Stream, codec Codec) {
o.builders.producer = pb
}
}

// WithEmitterHasher sets the hash function that assigns keys to partitions.
func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption {
return func(o *eoptions) {
return func(o *eoptions, topic Stream, codec Codec) {
o.hasher = hasher
}
}

func (opt *eoptions) applyOptions(opts ...EmitterOption) error {
func WithEmitterTester(t Tester) EmitterOption {
return func(o *eoptions, topic Stream, codec Codec) {
o.builders.producer = t.ProducerBuilder()
o.builders.topicmgr = t.TopicManagerBuilder()
t.RegisterEmitter(topic, codec)
}
}

func (opt *eoptions) applyOptions(topic Stream, codec Codec, opts ...EmitterOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
opt.hasher = DefaultHasher()

for _, o := range opts {
o(opt)
o(opt, topic, codec)
}

// config not set, use default one
Expand Down
6 changes: 6 additions & 0 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ func (km *Tester) RegisterGroupGraph(gg *goka.GroupGraph) {

}

// RegisterEmitter registers an emitter to be working with the tester.
func (km *Tester) RegisterEmitter(topic goka.Stream, codec goka.Codec) {
km.registerCodec(string(topic), codec)
km.getOrCreateQueue(string(topic))
}

// TopicManagerBuilder returns the topicmanager builder when this tester is used as an option
// to a processor
func (km *Tester) TopicManagerBuilder() kafka.TopicManagerBuilder {
Expand Down

0 comments on commit 26671ea

Please sign in to comment.