Skip to content

Commit

Permalink
Merge pull request #92 from lovoo/make-builders-builders
Browse files Browse the repository at this point in the history
Make builders builders
  • Loading branch information
db7 committed Feb 27, 2018
2 parents 6a289b6 + d51a470 commit 879bd60
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 163 deletions.
2 changes: 1 addition & 1 deletion emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterO
return nil, fmt.Errorf(errApplyOptions, err)
}

prod, err := opts.builders.producer(brokers)
prod, err := opts.builders.producer(brokers, opts.clientID, opts.hasher)
if err != nil {
return nil, fmt.Errorf(errBuildProducer, err)
}
Expand Down
72 changes: 72 additions & 0 deletions kafka/builders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package kafka

import (
"hash"

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)

// ConsumerBuilder creates a Kafka consumer for the given brokers, consumer
// group, and client ID. Implementations are passed to goka via options so
// callers can swap in custom or mock consumers.
type ConsumerBuilder func(brokers []string, group, clientID string) (Consumer, error)

// DefaultConsumerBuilder creates a Kafka consumer using the Sarama library.
// It starts from the package's default (bsm) sarama-cluster configuration and
// only overrides the client ID before connecting.
func DefaultConsumerBuilder(brokers []string, group, clientID string) (Consumer, error) {
	cfg := NewConfig()
	cfg.ClientID = clientID
	return NewSaramaConsumer(brokers, group, cfg)
}

// ConsumerBuilderWithConfig returns a ConsumerBuilder that creates a Kafka
// consumer using the Sarama library with the given (bsm) sarama-cluster
// configuration.
//
// NOTE(review): the closure mutates the shared config's ClientID on every
// invocation — if the same builder is invoked concurrently this is a data
// race; confirm builders are only called during setup.
func ConsumerBuilderWithConfig(config *cluster.Config) ConsumerBuilder {
	return func(brokers []string, group, clientID string) (Consumer, error) {
		config.ClientID = clientID
		return NewSaramaConsumer(brokers, group, config)
	}
}

// ProducerBuilder creates a Kafka producer for the given brokers, using the
// client ID and hash function (used for partitioning) supplied by the caller.
type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

// DefaultProducerBuilder creates a Kafka producer using the Sarama library.
// It starts from the package's default configuration, sets the client ID, and
// installs a custom hash partitioner built from the supplied hasher.
func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
	cfg := NewConfig()
	cfg.ClientID = clientID
	cfg.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
	return NewProducer(brokers, &cfg.Config)
}

// ProducerBuilderWithConfig returns a ProducerBuilder that creates a Kafka
// producer using the Sarama library with the given (bsm) sarama-cluster
// configuration.
//
// NOTE(review): the closure mutates the shared config (ClientID and
// Partitioner) on every invocation — concurrent calls race on it; confirm
// builders are only called during setup.
func ProducerBuilderWithConfig(config *cluster.Config) ProducerBuilder {
	return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
		config.ClientID = clientID
		config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
		return NewProducer(brokers, &config.Config)
	}
}

// TopicManagerBuilder creates a TopicManager to check partition counts and
// create tables for the given brokers.
type TopicManagerBuilder func(brokers []string) (TopicManager, error)

// DefaultTopicManagerBuilder creates a TopicManager using the Sarama library.
// Note that this topic manager cannot create topics; use one of the ZooKeeper
// based builders for that.
func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) {
	tm, err := NewSaramaTopicManager(brokers)
	return tm, err
}

// ZKTopicManagerBuilder returns a TopicManagerBuilder whose TopicManager
// connects to ZooKeeper (via the given servers) to check partition counts and
// create tables. The brokers argument passed to the builder is ignored; the
// ZooKeeper servers captured here are used instead.
func ZKTopicManagerBuilder(servers []string) TopicManagerBuilder {
	return func(_ []string) (TopicManager, error) {
		return NewTopicManager(servers, NewTopicManagerConfig())
	}
}

// ZKTopicManagerBuilderWithConfig returns a TopicManagerBuilder whose
// TopicManager connects to ZooKeeper (via the given servers) using the
// supplied topic configuration. The brokers argument passed to the builder is
// ignored; the ZooKeeper servers captured here are used instead.
func ZKTopicManagerBuilderWithConfig(servers []string, config *TopicManagerConfig) TopicManagerBuilder {
	return func(_ []string) (TopicManager, error) {
		return NewTopicManager(servers, config)
	}
}
14 changes: 2 additions & 12 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ package kafka
import (
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
metrics "github.com/rcrowley/go-metrics"
)

// CreateDefaultSaramaConfig creates a (bsm) sarama configuration with default values.
func CreateDefaultSaramaConfig(clientID string, partitioner sarama.PartitionerConstructor, registry metrics.Registry) *cluster.Config {
// NewConfig creates a (bsm) sarama configuration with default values.
func NewConfig() *cluster.Config {
config := cluster.NewConfig()

config.Version = sarama.V0_10_1_0
config.ClientID = clientID

// consumer configuration
config.Consumer.Return.Errors = true
Expand All @@ -30,12 +27,5 @@ func CreateDefaultSaramaConfig(clientID string, partitioner sarama.PartitionerCo
// consumer group configuration
config.Group.Return.Notifications = true

// register registry to get kafka metrics
config.Config.MetricRegistry = registry

// set partitioner
if partitioner != nil {
config.Producer.Partitioner = partitioner
}
return config
}
15 changes: 3 additions & 12 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/logger"
)

// Producer abstracts the kafka producer
Expand All @@ -15,21 +14,19 @@ type Producer interface {
}

type producer struct {
log logger.Logger
producer sarama.AsyncProducer
stop chan bool
done chan bool
}

// NewProducer creates new kafka producer for passed brokers.
func NewProducer(brokers []string, config *sarama.Config, log logger.Logger) (Producer, error) {
func NewProducer(brokers []string, config *sarama.Config) (Producer, error) {
aprod, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
return nil, fmt.Errorf("Failed to start Sarama producer: %v", err)
}

p := producer{
log: log,
producer: aprod,
stop: make(chan bool),
done: make(chan bool),
Expand Down Expand Up @@ -66,17 +63,11 @@ func (p *producer) run() {
return

case err := <-p.producer.Errors():
promise, is := err.Msg.Metadata.(*Promise)
if !is {
p.log.Panicf("invalid metadata type. expected *Promise, got %T", err.Msg.Metadata)
}
promise := err.Msg.Metadata.(*Promise)
promise.Finish(err.Err)

case msg := <-p.producer.Successes():
promise, is := msg.Metadata.(*Promise)
if !is {
p.log.Panicf("invalid metadata type. expected *Promise, got %T", msg.Metadata)
}
promise := msg.Metadata.(*Promise)
promise.Finish(nil)
}
}
Expand Down
13 changes: 11 additions & 2 deletions kafkamock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package goka

import (
"fmt"
"hash"
"sync"

"github.com/facebookgo/ensure"
Expand Down Expand Up @@ -124,12 +125,20 @@ func (km *KafkaMock) ProcessorOptions() []ProcessorOption {
return km.storage, nil
}),
WithConsumerBuilder(km.consumerBuilder),
WithProducer(km.producerMock),
WithTopicManager(km.topicMgrMock),
WithProducerBuilder(km.producerBuilder),
WithTopicManagerBuilder(km.topicManagerBuilder),
WithPartitionChannelSize(0),
}
}

// topicManagerBuilder is a TopicManagerBuilder that ignores the brokers and
// always returns the mock topic manager, so tests never touch Kafka.
func (km *KafkaMock) topicManagerBuilder(brokers []string) (kafka.TopicManager, error) {
	return km.topicMgrMock, nil
}

// producerBuilder is a ProducerBuilder that ignores brokers, client ID, and
// hasher and always returns the mock producer, so tests never touch Kafka.
func (km *KafkaMock) producerBuilder(b []string, cid string, hasher func() hash.Hash32) (kafka.Producer, error) {
	return km.producerMock, nil
}

// consumerBuilder is a ConsumerBuilder that ignores brokers, group, and
// client ID and always returns the mock consumer, so tests never touch Kafka.
func (km *KafkaMock) consumerBuilder(b []string, group, clientID string) (kafka.Consumer, error) {
	return km.consumerMock, nil
}
Expand Down
Loading

0 comments on commit 879bd60

Please sign in to comment.