From 0323644dc1c03c6eb9fd8df25550f5db1015f5e8 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Fri, 23 Feb 2018 09:15:13 +0100 Subject: [PATCH 01/12] replace topicManager and producer builder options --- emitter.go | 2 +- kafka/producer.go | 15 ++------ kafkamock.go | 13 +++++-- options.go | 89 ++++++++++++++++++++--------------------------- processor.go | 2 +- processor_test.go | 62 +++++++++++++++++++++++---------- 6 files changed, 97 insertions(+), 86 deletions(-) diff --git a/emitter.go b/emitter.go index 98abdf9e..72beb0a9 100644 --- a/emitter.go +++ b/emitter.go @@ -33,7 +33,7 @@ func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterO return nil, fmt.Errorf(errApplyOptions, err) } - prod, err := opts.builders.producer(brokers) + prod, err := opts.builders.producer(brokers, opts.clientID, opts.hasher) if err != nil { return nil, fmt.Errorf(errBuildProducer, err) } diff --git a/kafka/producer.go b/kafka/producer.go index f298f361..5a10db5d 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/Shopify/sarama" - "github.com/lovoo/goka/logger" ) // Producer abstracts the kafka producer @@ -15,21 +14,19 @@ type Producer interface { } type producer struct { - log logger.Logger producer sarama.AsyncProducer stop chan bool done chan bool } // NewProducer creates new kafka producer for passed brokers. -func NewProducer(brokers []string, config *sarama.Config, log logger.Logger) (Producer, error) { +func NewProducer(brokers []string, config *sarama.Config) (Producer, error) { aprod, err := sarama.NewAsyncProducer(brokers, config) if err != nil { return nil, fmt.Errorf("Failed to start Sarama producer: %v", err) } p := producer{ - log: log, producer: aprod, stop: make(chan bool), done: make(chan bool), @@ -66,17 +63,11 @@ func (p *producer) run() { return case err := <-p.producer.Errors(): - promise, is := err.Msg.Metadata.(*Promise) - if !is { - p.log.Panicf("invalid metadata type. expected *Promise, got %T", err.Msg.Metadata) - } + promise := err.Msg.Metadata.(*Promise) promise.Finish(err.Err) case msg := <-p.producer.Successes(): - promise, is := msg.Metadata.(*Promise) - if !is { - p.log.Panicf("invalid metadata type. expected *Promise, got %T", msg.Metadata) - } + promise := msg.Metadata.(*Promise) promise.Finish(nil) } } diff --git a/kafkamock.go b/kafkamock.go index 7244b396..cf3bd1eb 100644 --- a/kafkamock.go +++ b/kafkamock.go @@ -2,6 +2,7 @@ package goka import ( "fmt" + "hash" "sync" "github.com/facebookgo/ensure" @@ -124,12 +125,20 @@ func (km *KafkaMock) ProcessorOptions() []ProcessorOption { return km.storage, nil }), WithConsumerBuilder(km.consumerBuilder), - WithProducer(km.producerMock), - WithTopicManager(km.topicMgrMock), + WithProducerBuilder(km.producerBuilder), + WithTopicManagerBuilder(km.topicManagerBuilder), WithPartitionChannelSize(0), } } +func (km *KafkaMock) topicManagerBuilder(brokers []string) (kafka.TopicManager, error) { + return km.topicMgrMock, nil +} + +func (km *KafkaMock) producerBuilder(b []string, cid string, hasher func() hash.Hash32) (kafka.Producer, error) { + return km.producerMock, nil +} + func (km *KafkaMock) consumerBuilder(b []string, group, clientID string) (kafka.Consumer, error) { return km.consumerMock, nil } diff --git a/options.go b/options.go index 0e42883b..a0252bbc 100644 --- a/options.go +++ b/options.go @@ -26,6 +26,13 @@ type StorageBuilder func(topic string, partition int32) (storage.Storage, error) // ConsumerBuilder creates a Kafka consumer. 
type ConsumerBuilder func(brokers []string, group, clientID string) (kafka.Consumer, error) +// ProducerBuilder create a Kafka producer. +type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (kafka.Producer, error) + +// TopicManagerBuilder creates a TopicManager to check partition counts and +// create tables. +type TopicManagerBuilder func(brokers []string) (kafka.TopicManager, error) + /////////////////////////////////////////////////////////////////////////////// // default values /////////////////////////////////////////////////////////////////////////////// @@ -81,24 +88,22 @@ func DefaultHasher() func() hash.Hash32 { } -// DefaultConsumerBuilder creates Kafka Consumer using the sarama library. +// DefaultConsumerBuilder creates a Kafka consumer using the Sarama library. func DefaultConsumerBuilder(brokers []string, group, clientID string) (kafka.Consumer, error) { config := kafka.CreateDefaultSaramaConfig(clientID, nil, metrics.DefaultRegistry) return kafka.NewSaramaConsumer(brokers, group, config) } -type producerBuilder func(brokers []string) (kafka.Producer, error) -type topicmgrBuilder func(brokers []string) (kafka.TopicManager, error) - -func defaultProducerBuilder(clientID string, hasher func() hash.Hash32, log logger.Logger) producerBuilder { - return func(brokers []string) (kafka.Producer, error) { - partitioner := sarama.NewCustomHashPartitioner(hasher) - config := kafka.CreateDefaultSaramaConfig(clientID, partitioner, metrics.DefaultRegistry) - return kafka.NewProducer(brokers, &config.Config, log) - } +// DefaultProducerBuilder creates a Kafka producer using the Sarama library. +func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (kafka.Producer, error) { + partitioner := sarama.NewCustomHashPartitioner(hasher) + config := kafka.CreateDefaultSaramaConfig(clientID, partitioner, metrics.DefaultRegistry) + return kafka.NewProducer(brokers, &config.Config) } -func defaultTopicManagerBuilder(brokers []string) (kafka.TopicManager, error) { +// DefaultTopicManagerBuilder creates TopicManager using the Sarama library. +// This topic manager cannot create topics. +func DefaultTopicManagerBuilder(brokers []string) (kafka.TopicManager, error) { return kafka.NewSaramaTopicManager(brokers) } @@ -122,8 +127,8 @@ type poptions struct { builders struct { storage StorageBuilder consumer ConsumerBuilder - producer producerBuilder - topicmgr topicmgrBuilder + producer ProducerBuilder + topicmgr TopicManagerBuilder } } @@ -149,34 +154,24 @@ func WithStorageBuilder(sb StorageBuilder) ProcessorOption { } } -// WithTopicManager defines a topic manager. -func WithTopicManager(tm kafka.TopicManager) ProcessorOption { +// WithTopicManagerBuilder replaces the default topic manager builder. +func WithTopicManagerBuilder(tmb TopicManagerBuilder) ProcessorOption { return func(o *poptions) { - o.builders.topicmgr = func(brokers []string) (kafka.TopicManager, error) { - if tm == nil { - return nil, fmt.Errorf("TopicManager cannot be nil") - } - return tm, nil - } + o.builders.topicmgr = tmb } } -// WithConsumerBuilder creates a Kafka consumer. +// WithConsumerBuilder replaces the default consumer builder. func WithConsumerBuilder(cb ConsumerBuilder) ProcessorOption { return func(o *poptions) { o.builders.consumer = cb } } -// WithProducer replaces goka's default producer. -func WithProducer(p kafka.Producer) ProcessorOption { +// WithProducerBuilder replaces the default producer builder. 
+func WithProducerBuilder(pb ProducerBuilder) ProcessorOption { return func(o *poptions) { - o.builders.producer = func(brokers []string) (kafka.Producer, error) { - if p == nil { - return nil, fmt.Errorf("producer cannot be nil") - } - return p, nil - } + o.builders.producer = pb } } @@ -240,10 +235,10 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error { opt.builders.consumer = DefaultConsumerBuilder } if opt.builders.producer == nil { - opt.builders.producer = defaultProducerBuilder(opt.clientID, opt.hasher, opt.log) + opt.builders.producer = DefaultProducerBuilder } if opt.builders.topicmgr == nil { - opt.builders.topicmgr = defaultTopicManagerBuilder + opt.builders.topicmgr = DefaultTopicManagerBuilder } return nil @@ -267,7 +262,7 @@ type voptions struct { builders struct { storage StorageBuilder consumer ConsumerBuilder - topicmgr topicmgrBuilder + topicmgr TopicManagerBuilder } } @@ -353,7 +348,7 @@ func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error { opt.builders.consumer = DefaultConsumerBuilder } if opt.builders.topicmgr == nil { - opt.builders.topicmgr = defaultTopicManagerBuilder + opt.builders.topicmgr = DefaultTopicManagerBuilder } return nil @@ -376,8 +371,8 @@ type eoptions struct { hasher func() hash.Hash32 builders struct { - topicmgr topicmgrBuilder - producer producerBuilder + topicmgr TopicManagerBuilder + producer ProducerBuilder } } @@ -397,26 +392,16 @@ func WithEmitterClientID(clientID string) EmitterOption { } // WithEmitterTopicManager defines a topic manager. -func WithEmitterTopicManager(tm kafka.TopicManager) EmitterOption { +func WithEmitterTopicManagerBuilder(tmb TopicManagerBuilder) EmitterOption { return func(o *eoptions) { - o.builders.topicmgr = func(brokers []string) (kafka.TopicManager, error) { - if tm == nil { - return nil, fmt.Errorf("TopicManager cannot be nil") - } - return tm, nil - } + o.builders.topicmgr = tmb } } // WithEmitterProducer replaces goka's default producer. Mainly for testing. 
-func WithEmitterProducer(p kafka.Producer) EmitterOption { +func WithEmitterProducer(pb ProducerBuilder) EmitterOption { return func(o *eoptions) { - o.builders.producer = func(brokers []string) (kafka.Producer, error) { - if p == nil { - return nil, fmt.Errorf("producer cannot be nil") - } - return p, nil - } + o.builders.producer = pb } } @@ -438,10 +423,10 @@ func (opt *eoptions) applyOptions(opts ...EmitterOption) error { // config not set, use default one if opt.builders.producer == nil { - opt.builders.producer = defaultProducerBuilder(opt.clientID, opt.hasher, opt.log) + opt.builders.producer = DefaultProducerBuilder } if opt.builders.topicmgr == nil { - opt.builders.topicmgr = defaultTopicManagerBuilder + opt.builders.topicmgr = DefaultTopicManagerBuilder } return nil diff --git a/processor.go b/processor.go index 26548f46..216d4ba2 100644 --- a/processor.go +++ b/processor.go @@ -90,7 +90,7 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) } // create kafka producer - producer, err := opts.builders.producer(brokers) + producer, err := opts.builders.producer(brokers, opts.clientID, opts.hasher) if err != nil { return nil, fmt.Errorf(errBuildProducer, err) } diff --git a/processor_test.go b/processor_test.go index fc1bb874..29f0124c 100644 --- a/processor_test.go +++ b/processor_test.go @@ -3,6 +3,7 @@ package goka import ( "errors" "fmt" + "hash" "log" "os" "os/signal" @@ -48,6 +49,31 @@ func createConsumerBuilder(c kafka.Consumer) ConsumerBuilder { return c, nil } } + +func createFailedConsumerBuilder() ConsumerBuilder { + return func(b []string, g, id string) (kafka.Consumer, error) { + return nil, errors.New("failed creating consumer") + } +} + +func createProducerBuilder(p kafka.Producer) ProducerBuilder { + return func(b []string, id string, hasher func() hash.Hash32) (kafka.Producer, error) { + return p, nil + } +} + +func createFailedProducerBuilder() ProducerBuilder { + return func(b []string, id string, hasher func() hash.Hash32) (kafka.Producer, error) { + return nil, errors.New("failed creating producer") + } +} + +func createTopicManagerBuilder(tm kafka.TopicManager) TopicManagerBuilder { + return func(b []string) (kafka.TopicManager, error) { + return tm, nil + } +} + func createProcessorStateless(ctrl *gomock.Controller, consumer kafka.Consumer, npar int) *Processor { tm := mock.NewMockTopicManager(ctrl) producer := mock.NewMockProducer(ctrl) @@ -68,9 +94,9 @@ func createProcessorStateless(ctrl *gomock.Controller, consumer kafka.Consumer, Input(topic2, rawCodec, cb), Loop(rawCodec, cb), ), - WithTopicManager(tm), + WithTopicManagerBuilder(createTopicManagerBuilder(tm)), WithConsumerBuilder(createConsumerBuilder(consumer)), - WithProducer(producer), + WithProducerBuilder(createProducerBuilder(producer)), WithPartitionChannelSize(0), ) return p @@ -101,9 +127,9 @@ func createProcessor(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consu Loop(rawCodec, cb), Persist(new(codec.String)), ), - WithTopicManager(tm), + WithTopicManagerBuilder(createTopicManagerBuilder(tm)), WithConsumerBuilder(createConsumerBuilder(consumer)), - WithProducer(producer), + WithProducerBuilder(createProducerBuilder(producer)), WithStorageBuilder(sb), WithPartitionChannelSize(0), ) @@ -135,9 +161,9 @@ func createProcessorWithTable(t *testing.T, ctrl *gomock.Controller, consumer ka Join(table, rawCodec), Persist(rawCodec), ), - WithTopicManager(tm), + WithTopicManagerBuilder(createTopicManagerBuilder(tm)), 
WithConsumerBuilder(createConsumerBuilder(consumer)), - WithProducer(producer), + WithProducerBuilder(createProducerBuilder(producer)), WithStorageBuilder(sb), WithPartitionChannelSize(0), ) @@ -375,7 +401,7 @@ func TestNewProcessor(t *testing.T) { tm.EXPECT().Close().Return(nil) _, err = NewProcessor(nil, DefineGroup(group, Input(topic, rawCodec, cb)), - WithTopicManager(tm), + WithTopicManagerBuilder(createTopicManagerBuilder(tm)), ) ensure.NotNil(t, err) @@ -384,9 +410,9 @@ func TestNewProcessor(t *testing.T) { tm.EXPECT().Close().Return(nil) _, err = NewProcessor(nil, DefineGroup(group, Input(topic, rawCodec, cb)), - WithTopicManager(tm), - WithConsumerBuilder(createConsumerBuilder(nil)), - WithProducer(nil), + WithTopicManagerBuilder(createTopicManagerBuilder(tm)), + WithConsumerBuilder(createFailedConsumerBuilder()), + WithProducerBuilder(createProducerBuilder(nil)), ) ensure.NotNil(t, err) @@ -395,9 +421,9 @@ func TestNewProcessor(t *testing.T) { tm.EXPECT().Close().Return(nil) _, err = NewProcessor(nil, DefineGroup(group, Input(topic, rawCodec, cb)), - WithTopicManager(tm), + WithTopicManagerBuilder(createTopicManagerBuilder(tm)), WithConsumerBuilder(createConsumerBuilder(consumer)), - WithProducer(nil), + WithProducerBuilder(createFailedProducerBuilder()), ) ensure.NotNil(t, err) @@ -414,9 +440,9 @@ func TestNewProcessor(t *testing.T) { Loop(rawCodec, cb), Persist(rawCodec), ), - WithTopicManager(tm), + WithTopicManagerBuilder(createTopicManagerBuilder(tm)), WithConsumerBuilder(createConsumerBuilder(consumer)), - WithProducer(producer), + WithProducerBuilder(createProducerBuilder(producer)), ) ensure.Nil(t, err) ensure.DeepEqual(t, p.graph.GroupTable().Topic(), tableName(group)) @@ -434,9 +460,9 @@ func TestNewProcessor(t *testing.T) { Input(topic, rawCodec, cb), Input(topic2, rawCodec, cb), ), - WithTopicManager(tm), + WithTopicManagerBuilder(createTopicManagerBuilder(tm)), WithConsumerBuilder(createConsumerBuilder(consumer)), - WithProducer(producer), + WithProducerBuilder(createProducerBuilder(producer)), ) ensure.Nil(t, err) ensure.True(t, p.graph.GroupTable() == nil) @@ -454,9 +480,9 @@ func TestNewProcessor(t *testing.T) { Input(topic, rawCodec, cb), Join(table, rawCodec), ), - WithTopicManager(tm), + WithTopicManagerBuilder(createTopicManagerBuilder(tm)), WithConsumerBuilder(createConsumerBuilder(consumer)), - WithProducer(producer), + WithProducerBuilder(createProducerBuilder(producer)), ) ensure.Nil(t, err) ensure.True(t, p.graph.GroupTable() == nil) From ab086edd9c3eb8664bce4bffc685775c088252f3 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Fri, 23 Feb 2018 09:22:29 +0100 Subject: [PATCH 02/12] move kafka builders into kafka package --- kafka/builders.go | 27 +++++++++++++++++++++++++++ options.go | 35 +++++++---------------------------- 2 files changed, 34 insertions(+), 28 deletions(-) create mode 100644 kafka/builders.go diff --git a/kafka/builders.go b/kafka/builders.go new file mode 100644 index 00000000..af8e8d5f --- /dev/null +++ b/kafka/builders.go @@ -0,0 +1,27 @@ +package kafka + +import ( + "hash" + + "github.com/Shopify/sarama" + metrics "github.com/rcrowley/go-metrics" +) + +// DefaultConsumerBuilder creates a Kafka consumer using the Sarama library. +func DefaultConsumerBuilder(brokers []string, group, clientID string) (Consumer, error) { + config := CreateDefaultSaramaConfig(clientID, nil, metrics.DefaultRegistry) + return NewSaramaConsumer(brokers, group, config) +} + +// DefaultProducerBuilder creates a Kafka producer using the Sarama library. 
+func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) { + partitioner := sarama.NewCustomHashPartitioner(hasher) + config := CreateDefaultSaramaConfig(clientID, partitioner, metrics.DefaultRegistry) + return NewProducer(brokers, &config.Config) +} + +// DefaultTopicManagerBuilder creates TopicManager using the Sarama library. +// This topic manager cannot create topics. +func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) { + return NewSaramaTopicManager(brokers) +} diff --git a/options.go b/options.go index a0252bbc..e4226e4b 100644 --- a/options.go +++ b/options.go @@ -6,12 +6,10 @@ import ( "hash/fnv" "path/filepath" - "github.com/Shopify/sarama" "github.com/lovoo/goka/kafka" "github.com/lovoo/goka/logger" "github.com/lovoo/goka/storage" - metrics "github.com/rcrowley/go-metrics" "github.com/syndtr/goleveldb/leveldb" ) @@ -88,25 +86,6 @@ func DefaultHasher() func() hash.Hash32 { } -// DefaultConsumerBuilder creates a Kafka consumer using the Sarama library. -func DefaultConsumerBuilder(brokers []string, group, clientID string) (kafka.Consumer, error) { - config := kafka.CreateDefaultSaramaConfig(clientID, nil, metrics.DefaultRegistry) - return kafka.NewSaramaConsumer(brokers, group, config) -} - -// DefaultProducerBuilder creates a Kafka producer using the Sarama library. -func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (kafka.Producer, error) { - partitioner := sarama.NewCustomHashPartitioner(hasher) - config := kafka.CreateDefaultSaramaConfig(clientID, partitioner, metrics.DefaultRegistry) - return kafka.NewProducer(brokers, &config.Config) -} - -// DefaultTopicManagerBuilder creates TopicManager using the Sarama library. -// This topic manager cannot create topics. 
-func DefaultTopicManagerBuilder(brokers []string) (kafka.TopicManager, error) { - return kafka.NewSaramaTopicManager(brokers) -} - /////////////////////////////////////////////////////////////////////////////// // processor options /////////////////////////////////////////////////////////////////////////////// @@ -232,13 +211,13 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error { return fmt.Errorf("StorageBuilder not set") } if opt.builders.consumer == nil { - opt.builders.consumer = DefaultConsumerBuilder + opt.builders.consumer = kafka.DefaultConsumerBuilder } if opt.builders.producer == nil { - opt.builders.producer = DefaultProducerBuilder + opt.builders.producer = kafka.DefaultProducerBuilder } if opt.builders.topicmgr == nil { - opt.builders.topicmgr = DefaultTopicManagerBuilder + opt.builders.topicmgr = kafka.DefaultTopicManagerBuilder } return nil @@ -345,10 +324,10 @@ func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error { return fmt.Errorf("StorageBuilder not set") } if opt.builders.consumer == nil { - opt.builders.consumer = DefaultConsumerBuilder + opt.builders.consumer = kafka.DefaultConsumerBuilder } if opt.builders.topicmgr == nil { - opt.builders.topicmgr = DefaultTopicManagerBuilder + opt.builders.topicmgr = kafka.DefaultTopicManagerBuilder } return nil @@ -423,10 +402,10 @@ func (opt *eoptions) applyOptions(opts ...EmitterOption) error { // config not set, use default one if opt.builders.producer == nil { - opt.builders.producer = DefaultProducerBuilder + opt.builders.producer = kafka.DefaultProducerBuilder } if opt.builders.topicmgr == nil { - opt.builders.topicmgr = DefaultTopicManagerBuilder + opt.builders.topicmgr = kafka.DefaultTopicManagerBuilder } return nil From 6732c23f7fa1112659649e88cdd119efe3e45e44 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Fri, 23 Feb 2018 09:27:17 +0100 Subject: [PATCH 03/12] move storage builder into storage package --- options.go | 16 ---------------- processor.go | 2 +- storage/builders.go | 35 +++++++++++++++++++++++++++++++++++ view.go | 2 +- 4 files changed, 37 insertions(+), 18 deletions(-) create mode 100644 storage/builders.go diff --git a/options.go b/options.go index e4226e4b..21e5750c 100644 --- a/options.go +++ b/options.go @@ -9,8 +9,6 @@ import ( "github.com/lovoo/goka/kafka" "github.com/lovoo/goka/logger" "github.com/lovoo/goka/storage" - - "github.com/syndtr/goleveldb/leveldb" ) // UpdateCallback is invoked upon arrival of a message for a table partition. @@ -64,20 +62,6 @@ func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) return s.Set(key, value) } -// DefaultStorageBuilder builds a LevelDB storage with default configuration. -// The database will be stored in the given path. -func DefaultStorageBuilder(path string) StorageBuilder { - return func(topic string, partition int32) (storage.Storage, error) { - fp := filepath.Join(path, fmt.Sprintf("%s.%d", topic, partition)) - db, err := leveldb.OpenFile(fp, nil) - if err != nil { - return nil, fmt.Errorf("error opening leveldb: %v", err) - } - - return storage.New(db) - } -} - // DefaultHasher returns an FNV hasher builder to assign keys to partitions. 
func DefaultHasher() func() hash.Hash32 { return func() hash.Hash32 { diff --git a/processor.go b/processor.go index 216d4ba2..9eb9d7de 100644 --- a/processor.go +++ b/processor.go @@ -61,7 +61,7 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) WithLogger(logger.Default()), WithUpdateCallback(DefaultUpdate), WithPartitionChannelSize(defaultPartitionChannelSize), - WithStorageBuilder(DefaultStorageBuilder(DefaultProcessorStoragePath(gg.Group()))), + WithStorageBuilder(storage.DefaultStorageBuilder(DefaultProcessorStoragePath(gg.Group()))), }, // user-defined options (may overwrite default ones) diff --git a/storage/builders.go b/storage/builders.go new file mode 100644 index 00000000..fbda1bae --- /dev/null +++ b/storage/builders.go @@ -0,0 +1,35 @@ +package storage + +import ( + "fmt" + "path/filepath" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +// DefaultStorageBuilder builds a LevelDB storage with default configuration. +// The database will be stored in the given path. +func DefaultStorageBuilder(path string) func(topic string, partition int32) (Storage, error) { + return func(topic string, partition int32) (Storage, error) { + fp := filepath.Join(path, fmt.Sprintf("%s.%d", topic, partition)) + db, err := leveldb.OpenFile(fp, nil) + if err != nil { + return nil, fmt.Errorf("error opening leveldb: %v", err) + } + return New(db) + } +} + +// StorageBuilderWithOptions builds LevelDB storage with the given options and +// in the given path. +func StorageBuilderWithOptions(path string, opts *opt.Options) func(topic string, partition int32) (Storage, error) { + return func(topic string, partition int32) (Storage, error) { + fp := filepath.Join(path, fmt.Sprintf("%s.%d", topic, partition)) + db, err := leveldb.OpenFile(fp, nil) + if err != nil { + return nil, fmt.Errorf("error opening leveldb: %v", err) + } + return New(db) + } +} diff --git a/view.go b/view.go index 039ca1c2..3ba76000 100644 --- a/view.go +++ b/view.go @@ -35,7 +35,7 @@ func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) WithViewLogger(logger.Default()), WithViewCallback(DefaultUpdate), WithViewPartitionChannelSize(defaultPartitionChannelSize), - WithViewStorageBuilder(DefaultStorageBuilder(DefaultViewStoragePath())), + WithViewStorageBuilder(storage.DefaultStorageBuilder(DefaultViewStoragePath())), }, // then the user passed options From f23ff9be04666e8722b7ea3c8e81eaa9ff335488 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Fri, 23 Feb 2018 09:58:19 +0100 Subject: [PATCH 04/12] move kafka builder types into kafka package --- kafka/builders.go | 19 ++++++++++++++++++ options.go | 49 ++++++++++++++++------------------------------- processor_test.go | 10 +++++----- view_test.go | 8 ++++++-- 4 files changed, 47 insertions(+), 39 deletions(-) diff --git a/kafka/builders.go b/kafka/builders.go index af8e8d5f..2f8b0bdf 100644 --- a/kafka/builders.go +++ b/kafka/builders.go @@ -4,15 +4,30 @@ import ( "hash" "github.com/Shopify/sarama" + cluster "github.com/bsm/sarama-cluster" metrics "github.com/rcrowley/go-metrics" ) +// ConsumerBuilder creates a Kafka consumer. +type ConsumerBuilder func(brokers []string, group, clientID string) (Consumer, error) + // DefaultConsumerBuilder creates a Kafka consumer using the Sarama library. 
func DefaultConsumerBuilder(brokers []string, group, clientID string) (Consumer, error) { config := CreateDefaultSaramaConfig(clientID, nil, metrics.DefaultRegistry) return NewSaramaConsumer(brokers, group, config) } +// ConsumerBuilderWithConfig creates a Kafka consumer using the Sarama library. +func ConsumerBuilderWithConfig(config *cluster.Config) ConsumerBuilder { + return func(brokers []string, group, clientID string) (Consumer, error) { + config.ClientID = clientID + return NewSaramaConsumer(brokers, group, config) + } +} + +// ProducerBuilder create a Kafka producer. +type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) + // DefaultProducerBuilder creates a Kafka producer using the Sarama library. func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) { partitioner := sarama.NewCustomHashPartitioner(hasher) @@ -20,6 +35,10 @@ func DefaultProducerBuilder(brokers []string, clientID string, hasher func() has return NewProducer(brokers, &config.Config) } +// TopicManagerBuilder creates a TopicManager to check partition counts and +// create tables. +type TopicManagerBuilder func(brokers []string) (TopicManager, error) + // DefaultTopicManagerBuilder creates TopicManager using the Sarama library. // This topic manager cannot create topics. func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) { diff --git a/options.go b/options.go index 21e5750c..42133f4f 100644 --- a/options.go +++ b/options.go @@ -19,16 +19,6 @@ type UpdateCallback func(s storage.Storage, partition int32, key string, value [ // table. StorageBuilder creates one storage for each partition of the topic. type StorageBuilder func(topic string, partition int32) (storage.Storage, error) -// ConsumerBuilder creates a Kafka consumer. -type ConsumerBuilder func(brokers []string, group, clientID string) (kafka.Consumer, error) - -// ProducerBuilder create a Kafka producer. -type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (kafka.Producer, error) - -// TopicManagerBuilder creates a TopicManager to check partition counts and -// create tables. -type TopicManagerBuilder func(brokers []string) (kafka.TopicManager, error) - /////////////////////////////////////////////////////////////////////////////// // default values /////////////////////////////////////////////////////////////////////////////// @@ -89,9 +79,9 @@ type poptions struct { builders struct { storage StorageBuilder - consumer ConsumerBuilder - producer ProducerBuilder - topicmgr TopicManagerBuilder + consumer kafka.ConsumerBuilder + producer kafka.ProducerBuilder + topicmgr kafka.TopicManagerBuilder } } @@ -118,21 +108,21 @@ func WithStorageBuilder(sb StorageBuilder) ProcessorOption { } // WithTopicManagerBuilder replaces the default topic manager builder. -func WithTopicManagerBuilder(tmb TopicManagerBuilder) ProcessorOption { +func WithTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ProcessorOption { return func(o *poptions) { o.builders.topicmgr = tmb } } // WithConsumerBuilder replaces the default consumer builder. -func WithConsumerBuilder(cb ConsumerBuilder) ProcessorOption { +func WithConsumerBuilder(cb kafka.ConsumerBuilder) ProcessorOption { return func(o *poptions) { o.builders.consumer = cb } } // WithProducerBuilder replaces the default producer builder. 
-func WithProducerBuilder(pb ProducerBuilder) ProcessorOption { +func WithProducerBuilder(pb kafka.ProducerBuilder) ProcessorOption { return func(o *poptions) { o.builders.producer = pb } @@ -224,8 +214,8 @@ type voptions struct { builders struct { storage StorageBuilder - consumer ConsumerBuilder - topicmgr TopicManagerBuilder + consumer kafka.ConsumerBuilder + topicmgr kafka.TopicManagerBuilder } } @@ -252,22 +242,17 @@ func WithViewStorageBuilder(sb StorageBuilder) ViewOption { } } -// WithViewConsumer replaces goka's default view consumer. Mainly for testing. -func WithViewConsumerBuilder(cb ConsumerBuilder) ViewOption { +// WithViewConsumer replaces default view consumer. +func WithViewConsumerBuilder(cb kafka.ConsumerBuilder) ViewOption { return func(o *voptions) { o.builders.consumer = cb } } -// WithViewTopicManager defines a topic manager. -func WithViewTopicManager(tm kafka.TopicManager) ViewOption { +// WithViewTopicManager replaces the default topic manager. +func WithViewTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ViewOption { return func(o *voptions) { - o.builders.topicmgr = func(brokers []string) (kafka.TopicManager, error) { - if tm == nil { - return nil, fmt.Errorf("TopicManager cannot be nil") - } - return tm, nil - } + o.builders.topicmgr = tmb } } @@ -334,8 +319,8 @@ type eoptions struct { hasher func() hash.Hash32 builders struct { - topicmgr TopicManagerBuilder - producer ProducerBuilder + topicmgr kafka.TopicManagerBuilder + producer kafka.ProducerBuilder } } @@ -355,14 +340,14 @@ func WithEmitterClientID(clientID string) EmitterOption { } // WithEmitterTopicManager defines a topic manager. -func WithEmitterTopicManagerBuilder(tmb TopicManagerBuilder) EmitterOption { +func WithEmitterTopicManagerBuilder(tmb kafka.TopicManagerBuilder) EmitterOption { return func(o *eoptions) { o.builders.topicmgr = tmb } } // WithEmitterProducer replaces goka's default producer. Mainly for testing. 
-func WithEmitterProducer(pb ProducerBuilder) EmitterOption { +func WithEmitterProducer(pb kafka.ProducerBuilder) EmitterOption { return func(o *eoptions) { o.builders.producer = pb } diff --git a/processor_test.go b/processor_test.go index 29f0124c..3acce835 100644 --- a/processor_test.go +++ b/processor_test.go @@ -44,31 +44,31 @@ func syncWith(t *testing.T, ch chan kafka.Event, p ...int32) error { ch <- &kafka.NOP{Partition: -1} }) } -func createConsumerBuilder(c kafka.Consumer) ConsumerBuilder { +func createConsumerBuilder(c kafka.Consumer) kafka.ConsumerBuilder { return func(b []string, g, id string) (kafka.Consumer, error) { return c, nil } } -func createFailedConsumerBuilder() ConsumerBuilder { +func createFailedConsumerBuilder() kafka.ConsumerBuilder { return func(b []string, g, id string) (kafka.Consumer, error) { return nil, errors.New("failed creating consumer") } } -func createProducerBuilder(p kafka.Producer) ProducerBuilder { +func createProducerBuilder(p kafka.Producer) kafka.ProducerBuilder { return func(b []string, id string, hasher func() hash.Hash32) (kafka.Producer, error) { return p, nil } } -func createFailedProducerBuilder() ProducerBuilder { +func createFailedProducerBuilder() kafka.ProducerBuilder { return func(b []string, id string, hasher func() hash.Hash32) (kafka.Producer, error) { return nil, errors.New("failed creating producer") } } -func createTopicManagerBuilder(tm kafka.TopicManager) TopicManagerBuilder { +func createTopicManagerBuilder(tm kafka.TopicManager) kafka.TopicManagerBuilder { return func(b []string) (kafka.TopicManager, error) { return tm, nil } diff --git a/view_test.go b/view_test.go index a43806cf..fc4b77f0 100644 --- a/view_test.go +++ b/view_test.go @@ -266,14 +266,18 @@ func TestNewView(t *testing.T) { tm.EXPECT().Partitions(tableName(group)).Return(nil, errors.New("some error")), tm.EXPECT().Close(), ) - _, err = NewView(nil, GroupTable(group), new(codec.Bytes), WithViewConsumerBuilder(createConsumerBuilder(consumer)), WithViewTopicManager(tm)) + _, err = NewView(nil, GroupTable(group), new(codec.Bytes), + WithViewConsumerBuilder(createConsumerBuilder(consumer)), + WithViewTopicManagerBuilder(createTopicManagerBuilder(tm))) ensure.NotNil(t, err) gomock.InOrder( tm.EXPECT().Partitions(tableName(group)).Return([]int32{0, 1, 2}, nil), tm.EXPECT().Close(), ) - v, err := NewView(nil, GroupTable(group), new(codec.Bytes), WithViewConsumerBuilder(createConsumerBuilder(consumer)), WithViewTopicManager(tm)) + v, err := NewView(nil, GroupTable(group), new(codec.Bytes), + WithViewConsumerBuilder(createConsumerBuilder(consumer)), + WithViewTopicManagerBuilder(createTopicManagerBuilder(tm))) ensure.Nil(t, err) ensure.DeepEqual(t, v.topic, tableName(group)) ensure.DeepEqual(t, v.consumer, consumer) From 18de210becb07dcf0e2759bfa9867ac3c92bc7b3 Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Fri, 23 Feb 2018 10:02:30 +0100 Subject: [PATCH 05/12] move storage builder type into storage package --- options.go | 12 ++++-------- processor.go | 2 +- processor_test.go | 6 +++--- storage/builders.go | 12 ++++++++---- view.go | 2 +- view_test.go | 2 +- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/options.go b/options.go index 42133f4f..432044f2 100644 --- a/options.go +++ b/options.go @@ -15,10 +15,6 @@ import ( // The partition storage shall be updated in the callback. 
 type UpdateCallback func(s storage.Storage, partition int32, key string, value []byte) error
 
-// StorageBuilder creates a local storage (a persistent cache) for a topic
-// table. StorageBuilder creates one storage for each partition of the topic.
-type StorageBuilder func(topic string, partition int32) (storage.Storage, error)
-
 ///////////////////////////////////////////////////////////////////////////////
 // default values
 ///////////////////////////////////////////////////////////////////////////////
@@ -78,7 +74,7 @@ type poptions struct {
 	nilHandling NilHandling
 
 	builders struct {
-		storage  StorageBuilder
+		storage  storage.Builder
 		consumer kafka.ConsumerBuilder
 		producer kafka.ProducerBuilder
 		topicmgr kafka.TopicManagerBuilder
@@ -101,7 +97,7 @@ func WithClientID(clientID string) ProcessorOption {
 }
 
 // WithStorageBuilder defines a builder for the storage of each partition.
-func WithStorageBuilder(sb StorageBuilder) ProcessorOption {
+func WithStorageBuilder(sb storage.Builder) ProcessorOption {
 	return func(o *poptions) {
 		o.builders.storage = sb
 	}
@@ -213,7 +209,7 @@ type voptions struct {
 	hasher func() hash.Hash32
 
 	builders struct {
-		storage  StorageBuilder
+		storage  storage.Builder
 		consumer kafka.ConsumerBuilder
 		topicmgr kafka.TopicManagerBuilder
 	}
@@ -236,7 +232,7 @@ func WithViewCallback(cb UpdateCallback) ViewOption {
 }
 
 // WithViewStorageBuilder defines a builder for the storage of each partition.
-func WithViewStorageBuilder(sb StorageBuilder) ViewOption {
+func WithViewStorageBuilder(sb storage.Builder) ViewOption {
 	return func(o *voptions) {
 		o.builders.storage = sb
 	}
diff --git a/processor.go b/processor.go
index 9eb9d7de..f6118ab9 100644
--- a/processor.go
+++ b/processor.go
@@ -61,7 +61,7 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption)
 		WithLogger(logger.Default()),
 		WithUpdateCallback(DefaultUpdate),
 		WithPartitionChannelSize(defaultPartitionChannelSize),
-		WithStorageBuilder(storage.DefaultStorageBuilder(DefaultProcessorStoragePath(gg.Group()))),
+		WithStorageBuilder(storage.DefaultBuilder(DefaultProcessorStoragePath(gg.Group()))),
 	},
 
 	// user-defined options (may overwrite default ones)
diff --git a/processor_test.go b/processor_test.go
index 3acce835..3337202e 100644
--- a/processor_test.go
+++ b/processor_test.go
@@ -27,7 +27,7 @@ var (
 	rawCodec = new(codec.Bytes)
 )
 
-func nullStorageBuilder() StorageBuilder {
+func nullStorageBuilder() storage.Builder {
 	return func(topic string, partition int32) (storage.Storage, error) {
 		return &storage.Null{}, nil
 	}
@@ -102,7 +102,7 @@ func createProcessorStateless(ctrl *gomock.Controller, consumer kafka.Consumer,
 	return p
 }
 
-func createProcessor(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consumer, npar int, sb StorageBuilder) *Processor {
+func createProcessor(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consumer, npar int, sb storage.Builder) *Processor {
 	tm := mock.NewMockTopicManager(ctrl)
 	producer := mock.NewMockProducer(ctrl)
 
@@ -137,7 +137,7 @@ func createProcessor(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consu
 	return p
 }
 
-func createProcessorWithTable(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consumer, npar int, sb StorageBuilder) *Processor {
+func createProcessorWithTable(t *testing.T, ctrl *gomock.Controller, consumer kafka.Consumer, npar int, sb storage.Builder) *Processor {
 	tm := mock.NewMockTopicManager(ctrl)
 	producer := mock.NewMockProducer(ctrl)
 
diff --git a/storage/builders.go b/storage/builders.go
index fbda1bae..4b0e6e95 100644
--- a/storage/builders.go
+++ b/storage/builders.go
@@ -8,9 +8,13 @@ import (
 	"github.com/syndtr/goleveldb/leveldb/opt"
 )
 
-// DefaultStorageBuilder builds a LevelDB storage with default configuration.
+// Builder creates a local storage (a persistent cache) for a topic
+// table. Builder creates one storage for each partition of the topic.
+type Builder func(topic string, partition int32) (Storage, error)
+
+// DefaultBuilder builds a LevelDB storage with default configuration.
 // The database will be stored in the given path.
-func DefaultStorageBuilder(path string) func(topic string, partition int32) (Storage, error) {
+func DefaultBuilder(path string) Builder {
 	return func(topic string, partition int32) (Storage, error) {
 		fp := filepath.Join(path, fmt.Sprintf("%s.%d", topic, partition))
 		db, err := leveldb.OpenFile(fp, nil)
@@ -21,9 +25,9 @@ func DefaultStorageBuilder(path string) func(topic string, partition int32) (Sto
 	}
 }
 
-// StorageBuilderWithOptions builds LevelDB storage with the given options and
+// BuilderWithOptions builds LevelDB storage with the given options and
 // in the given path.
-func StorageBuilderWithOptions(path string, opts *opt.Options) func(topic string, partition int32) (Storage, error) {
+func BuilderWithOptions(path string, opts *opt.Options) Builder {
 	return func(topic string, partition int32) (Storage, error) {
 		fp := filepath.Join(path, fmt.Sprintf("%s.%d", topic, partition))
 		db, err := leveldb.OpenFile(fp, nil)
diff --git a/view.go b/view.go
index 3ba76000..3bf081c2 100644
--- a/view.go
+++ b/view.go
@@ -35,7 +35,7 @@ func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption)
 		WithViewLogger(logger.Default()),
 		WithViewCallback(DefaultUpdate),
 		WithViewPartitionChannelSize(defaultPartitionChannelSize),
-		WithViewStorageBuilder(storage.DefaultStorageBuilder(DefaultViewStoragePath())),
+		WithViewStorageBuilder(storage.DefaultBuilder(DefaultViewStoragePath())),
 	},
 
 	// then the user passed options
diff --git a/view_test.go b/view_test.go
index fc4b77f0..92e57659 100644
--- a/view_test.go
+++ b/view_test.go
@@ -20,7 +20,7 @@ var (
 	recoveredMessages int
 )
 
-func createTestView(t *testing.T, consumer kafka.Consumer, sb StorageBuilder, tm kafka.TopicManager) *View {
+func createTestView(t *testing.T, consumer kafka.Consumer, sb storage.Builder, tm kafka.TopicManager) *View {
 	recoveredMessages = 0
 	opts := &voptions{
 		log: logger.Default(),

From fca7d71d024c0d3f63e2e5d4ed944efdce99cf96 Mon Sep 17 00:00:00 2001
From: Diogo Behrens
Date: Fri, 23 Feb 2018 10:13:37 +0100
Subject: [PATCH 06/12] add ProducerBuilderWithConfig

---
 kafka/builders.go | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/kafka/builders.go b/kafka/builders.go
index 2f8b0bdf..7775b099 100644
--- a/kafka/builders.go
+++ b/kafka/builders.go
@@ -35,6 +35,16 @@ func DefaultProducerBuilder(brokers []string, clientID string, hasher func() has
 	return NewProducer(brokers, &config.Config)
 }
 
+// ProducerBuilderWithConfig creates a Kafka producer using the Sarama library.
+func ProducerBuilderWithConfig(config *cluster.Config) ProducerBuilder {
+	return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
+		partitioner := sarama.NewCustomHashPartitioner(hasher)
+		config.ClientID = clientID
+		config.Producer.Partitioner = partitioner
+		return NewProducer(brokers, &config.Config)
+	}
+}
+
 // TopicManagerBuilder creates a TopicManager to check partition counts and
 // create tables.
 type TopicManagerBuilder func(brokers []string) (TopicManager, error)

From 8def3484b502c75db0e6a1801aefc949192051ad Mon Sep 17 00:00:00 2001
From: Diogo Behrens
Date: Fri, 23 Feb 2018 11:07:20 +0100
Subject: [PATCH 07/12] simplify CreateDefaultConfig

---
 kafka/builders.go | 10 ++++------
 kafka/config.go   | 12 ++----------
 2 files changed, 6 insertions(+), 16 deletions(-)

diff --git a/kafka/builders.go b/kafka/builders.go
index 7775b099..1bd334b3 100644
--- a/kafka/builders.go
+++ b/kafka/builders.go
@@ -5,7 +5,6 @@ import (
 
 	"github.com/Shopify/sarama"
 	cluster "github.com/bsm/sarama-cluster"
-	metrics "github.com/rcrowley/go-metrics"
 )
 
 // ConsumerBuilder creates a Kafka consumer.
@@ -13,7 +12,7 @@ type ConsumerBuilder func(brokers []string, group, clientID string) (Consumer, e
 
 // DefaultConsumerBuilder creates a Kafka consumer using the Sarama library.
 func DefaultConsumerBuilder(brokers []string, group, clientID string) (Consumer, error) {
-	config := CreateDefaultSaramaConfig(clientID, nil, metrics.DefaultRegistry)
+	config := CreateDefaultConfig(clientID)
 	return NewSaramaConsumer(brokers, group, config)
 }
 
@@ -30,17 +29,16 @@ type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.
 
 // DefaultProducerBuilder creates a Kafka producer using the Sarama library.
 func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
-	partitioner := sarama.NewCustomHashPartitioner(hasher)
-	config := CreateDefaultSaramaConfig(clientID, partitioner, metrics.DefaultRegistry)
+	config := CreateDefaultConfig(clientID)
+	config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
 	return NewProducer(brokers, &config.Config)
 }
 
 // ProducerBuilderWithConfig creates a Kafka producer using the Sarama library.
 func ProducerBuilderWithConfig(config *cluster.Config) ProducerBuilder {
 	return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
-		partitioner := sarama.NewCustomHashPartitioner(hasher)
 		config.ClientID = clientID
-		config.Producer.Partitioner = partitioner
+		config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
 		return NewProducer(brokers, &config.Config)
 	}
 }
diff --git a/kafka/config.go b/kafka/config.go
index 99dc5aeb..a5cb6233 100644
--- a/kafka/config.go
+++ b/kafka/config.go
@@ -3,11 +3,10 @@ package kafka
 import (
 	"github.com/Shopify/sarama"
 	cluster "github.com/bsm/sarama-cluster"
-	metrics "github.com/rcrowley/go-metrics"
 )
 
-// CreateDefaultSaramaConfig creates a (bsm) sarama configuration with default values.
-func CreateDefaultSaramaConfig(clientID string, partitioner sarama.PartitionerConstructor, registry metrics.Registry) *cluster.Config {
+// CreateDefaultConfig creates a (bsm) sarama configuration with default values.
+func CreateDefaultConfig(clientID string) *cluster.Config {
 	config := cluster.NewConfig()
 	config.Version = sarama.V0_10_1_0
+	config.ClientID = clientID
 
 	// consumer configuration
 	config.Consumer.Return.Errors = true
@@ -30,12 +29,5 @@ func CreateDefaultSaramaConfig(clientID string, partitioner sarama.PartitionerCo
 	// consumer group configuration
 	config.Group.Return.Notifications = true
 
-	// register registry to get kafka metrics
-	config.Config.MetricRegistry = registry
-
-	// set partitioner
-	if partitioner != nil {
-		config.Producer.Partitioner = partitioner
-	}
 	return config
 }

From 2d57eaebbeb5ed7ff678184eb22530284a6f9c80 Mon Sep 17 00:00:00 2001
From: Diogo Behrens
Date: Fri, 23 Feb 2018 14:04:48 +0100
Subject: [PATCH 08/12] add builders for zookeeper-based topic manager

---
 kafka/builders.go | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/kafka/builders.go b/kafka/builders.go
index 1bd334b3..97943363 100644
--- a/kafka/builders.go
+++ b/kafka/builders.go
@@ -52,3 +52,19 @@ type TopicManagerBuilder func(brokers []string) (TopicManager, error)
 func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) {
 	return NewSaramaTopicManager(brokers)
 }
+
+// ZKTopicManagerBuilder creates a TopicManager that connects with ZooKeeper to
+// check partition counts and create tables.
+func ZKTopicManagerBuilder(servers []string) TopicManagerBuilder {
+	return func([]string) (TopicManager, error) {
+		return NewTopicManager(servers, NewTopicManagerConfig())
+	}
+}
+
+// ZKTopicManagerBuilderWithConfig creates a TopicManager that connects with ZooKeeper to
+// check partition counts and create tables given a topic configuration.
+func ZKTopicManagerBuilderWithConfig(servers []string, config *TopicManagerConfig) TopicManagerBuilder {
+	return func([]string) (TopicManager, error) {
+		return NewTopicManager(servers, config)
+	}
+}

From 02a054ff84bef46a8e59c538efb2e7c793a3297a Mon Sep 17 00:00:00 2001
From: Diogo Behrens
Date: Fri, 23 Feb 2018 15:00:58 +0100
Subject: [PATCH 09/12] rename CreateDefaultConfig as NewConfig

---
 kafka/builders.go | 6 ++++--
 kafka/config.go   | 6 ++----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/kafka/builders.go b/kafka/builders.go
index 97943363..b1228ee1 100644
--- a/kafka/builders.go
+++ b/kafka/builders.go
@@ -12,7 +12,8 @@ type ConsumerBuilder func(brokers []string, group, clientID string) (Consumer, e
 
 // DefaultConsumerBuilder creates a Kafka consumer using the Sarama library.
 func DefaultConsumerBuilder(brokers []string, group, clientID string) (Consumer, error) {
-	config := CreateDefaultConfig(clientID)
+	config := NewConfig()
+	config.ClientID = clientID
 	return NewSaramaConsumer(brokers, group, config)
 }
 
@@ -29,7 +30,8 @@ type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.
 
 // DefaultProducerBuilder creates a Kafka producer using the Sarama library.
 func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
-	config := CreateDefaultConfig(clientID)
+	config := NewConfig()
+	config.ClientID = clientID
 	config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
 	return NewProducer(brokers, &config.Config)
 }
diff --git a/kafka/config.go b/kafka/config.go
index a5cb6233..2bb691a1 100644
--- a/kafka/config.go
+++ b/kafka/config.go
@@ -5,12 +5,10 @@ import (
 	cluster "github.com/bsm/sarama-cluster"
 )
 
-// CreateDefaultConfig creates a (bsm) sarama configuration with default values.
-func CreateDefaultConfig(clientID string) *cluster.Config {
+// NewConfig creates a (bsm) sarama configuration with default values.
+func NewConfig() *cluster.Config {
 	config := cluster.NewConfig()
-	config.Version = sarama.V0_10_1_0
-	config.ClientID = clientID
+	config.Version = sarama.V0_10_1_0
 
 	// consumer configuration
 	config.Consumer.Return.Errors = true

From 0d0b663ac8aa2a1b70fab2822aa83de394f95cda Mon Sep 17 00:00:00 2001
From: Diogo Behrens
Date: Sun, 25 Feb 2018 12:54:28 +0100
Subject: [PATCH 10/12] add memory builder

---
 storage/builders.go | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/storage/builders.go b/storage/builders.go
index 4b0e6e95..3e3a44f2 100644
--- a/storage/builders.go
+++ b/storage/builders.go
@@ -37,3 +37,10 @@ func BuilderWithOptions(path string, opts *opt.Options) Builder {
 		return New(db)
 	}
 }
+
+// MemoryBuilder builds in-memory storage.
+func MemoryBuilder() Builder {
+	return func(topic string, partition int32) (Storage, error) {
+		return NewMemory()
+	}
+}

From 8530c469ce84a8e2cea82d1dc635f38a2ed16c2e Mon Sep 17 00:00:00 2001
From: Diogo Behrens
Date: Sun, 25 Feb 2018 20:20:45 +0100
Subject: [PATCH 11/12] add missing return nil

---
 storage/builders.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/storage/builders.go b/storage/builders.go
index 3e3a44f2..e1db675c 100644
--- a/storage/builders.go
+++ b/storage/builders.go
@@ -41,6 +41,6 @@ func BuilderWithOptions(path string, opts *opt.Options) Builder {
 // MemoryBuilder builds in-memory storage.
 func MemoryBuilder() Builder {
 	return func(topic string, partition int32) (Storage, error) {
-		return NewMemory()
+		return NewMemory(), nil
 	}
 }

From d51a470e4218580a87a13b166c4ce5e62f24c0e8 Mon Sep 17 00:00:00 2001
From: Diogo Behrens
Date: Tue, 27 Feb 2018 14:59:56 +0100
Subject: [PATCH 12/12] fix emitter producer builder option

---
 options.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/options.go b/options.go
index 432044f2..fb5a0c5f 100644
--- a/options.go
+++ b/options.go
@@ -335,15 +335,15 @@ func WithEmitterClientID(clientID string) EmitterOption {
 	}
 }
 
-// WithEmitterTopicManager defines a topic manager.
+// WithEmitterTopicManagerBuilder replaces the default topic manager builder.
 func WithEmitterTopicManagerBuilder(tmb kafka.TopicManagerBuilder) EmitterOption {
 	return func(o *eoptions) {
 		o.builders.topicmgr = tmb
 	}
 }
 
-// WithEmitterProducer replaces goka's default producer. Mainly for testing.
-func WithEmitterProducer(pb kafka.ProducerBuilder) EmitterOption {
+// WithEmitterProducerBuilder replaces the default producer builder.
+func WithEmitterProducerBuilder(pb kafka.ProducerBuilder) EmitterOption {
 	return func(o *eoptions) {
 		o.builders.producer = pb
 	}
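
Usage sketch (not part of the patches above): after this series the default consumer, producer,
topic manager and storage are all replaceable through the new builder options. The Go snippet
below shows one plausible wiring of a processor; the group name, topic, callback body and the
broker/ZooKeeper addresses are invented for illustration, and starting the processor is omitted
because the Start/Run signature depends on the goka version in use.

	package main

	import (
		"github.com/lovoo/goka"
		"github.com/lovoo/goka/codec"
		"github.com/lovoo/goka/kafka"
		"github.com/lovoo/goka/storage"
	)

	func main() {
		brokers := []string{"localhost:9092"}    // placeholder Kafka brokers
		zookeepers := []string{"localhost:2181"} // placeholder ZooKeeper servers

		// One (bsm) sarama-cluster config, handed to the producer builder.
		cfg := kafka.NewConfig()

		g := goka.DefineGroup("example-group", // hypothetical group
			goka.Input("example-stream", new(codec.String), func(ctx goka.Context, msg interface{}) {
				// application callback body omitted
			}),
			goka.Persist(new(codec.String)),
		)

		p, err := goka.NewProcessor(brokers, g,
			// replace the default producer builder with one built from an explicit config
			goka.WithProducerBuilder(kafka.ProducerBuilderWithConfig(cfg)),
			// check and create topics through ZooKeeper instead of the Sarama topic manager
			goka.WithTopicManagerBuilder(kafka.ZKTopicManagerBuilder(zookeepers)),
			// keep the group table in memory, e.g. for tests
			goka.WithStorageBuilder(storage.MemoryBuilder()),
		)
		if err != nil {
			panic(err)
		}
		_ = p // p.Start() or p.Run(ctx) would then run the processor, depending on the goka version
	}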