Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make builders builders #92

Merged
merged 12 commits into from
Feb 27, 2018
2 changes: 1 addition & 1 deletion emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterO
return nil, fmt.Errorf(errApplyOptions, err)
}

prod, err := opts.builders.producer(brokers)
prod, err := opts.builders.producer(brokers, opts.clientID, opts.hasher)
if err != nil {
return nil, fmt.Errorf(errBuildProducer, err)
}
Expand Down
72 changes: 72 additions & 0 deletions kafka/builders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package kafka

import (
"hash"

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)

// ConsumerBuilder creates a Kafka consumer.
type ConsumerBuilder func(brokers []string, group, clientID string) (Consumer, error)

// DefaultConsumerBuilder creates a Kafka consumer using the Sarama library.
func DefaultConsumerBuilder(brokers []string, group, clientID string) (Consumer, error) {
config := NewConfig()
config.ClientID = clientID
return NewSaramaConsumer(brokers, group, config)
}

// ConsumerBuilderWithConfig creates a Kafka consumer using the Sarama library.
func ConsumerBuilderWithConfig(config *cluster.Config) ConsumerBuilder {
return func(brokers []string, group, clientID string) (Consumer, error) {
config.ClientID = clientID
return NewSaramaConsumer(brokers, group, config)
}
}

// ProducerBuilder create a Kafka producer.
type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

// DefaultProducerBuilder creates a Kafka producer using the Sarama library.
func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config := NewConfig()
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, &config.Config)
}

// ProducerBuilderWithConfig creates a Kafka consumer using the Sarama library.
func ProducerBuilderWithConfig(config *cluster.Config) ProducerBuilder {
return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, &config.Config)
}
}

// TopicManagerBuilder creates a TopicManager to check partition counts and
// create tables.
type TopicManagerBuilder func(brokers []string) (TopicManager, error)

// DefaultTopicManagerBuilder creates TopicManager using the Sarama library.
// This topic manager cannot create topics.
func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) {
return NewSaramaTopicManager(brokers)
}

// ZKTopicManagerBuilder creates a TopicManager that connects with ZooKeeper to
// check partition counts and create tables.
func ZKTopicManagerBuilder(servers []string) TopicManagerBuilder {
return func([]string) (TopicManager, error) {
return NewTopicManager(servers, NewTopicManagerConfig())
}
}

// ZKTopicManagerBuilderWithConfig creates a TopicManager that connects with ZooKeeper to
// check partition counts and create tables given a topic configuration.
func ZKTopicManagerBuilderWithConfig(servers []string, config *TopicManagerConfig) TopicManagerBuilder {
return func([]string) (TopicManager, error) {
return NewTopicManager(servers, config)
}
}
14 changes: 2 additions & 12 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ package kafka
import (
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
metrics "github.com/rcrowley/go-metrics"
)

// CreateDefaultSaramaConfig creates a (bsm) sarama configuration with default values.
func CreateDefaultSaramaConfig(clientID string, partitioner sarama.PartitionerConstructor, registry metrics.Registry) *cluster.Config {
// NewConfig creates a (bsm) sarama configuration with default values.
func NewConfig() *cluster.Config {
config := cluster.NewConfig()

config.Version = sarama.V0_10_1_0
config.ClientID = clientID

// consumer configuration
config.Consumer.Return.Errors = true
Expand All @@ -30,12 +27,5 @@ func CreateDefaultSaramaConfig(clientID string, partitioner sarama.PartitionerCo
// consumer group configuration
config.Group.Return.Notifications = true

// register registry to get kafka metrics
config.Config.MetricRegistry = registry

// set partitioner
if partitioner != nil {
config.Producer.Partitioner = partitioner
}
return config
}
15 changes: 3 additions & 12 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/logger"
)

// Producer abstracts the kafka producer
Expand All @@ -15,21 +14,19 @@ type Producer interface {
}

type producer struct {
log logger.Logger
producer sarama.AsyncProducer
stop chan bool
done chan bool
}

// NewProducer creates new kafka producer for passed brokers.
func NewProducer(brokers []string, config *sarama.Config, log logger.Logger) (Producer, error) {
func NewProducer(brokers []string, config *sarama.Config) (Producer, error) {
aprod, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
return nil, fmt.Errorf("Failed to start Sarama producer: %v", err)
}

p := producer{
log: log,
producer: aprod,
stop: make(chan bool),
done: make(chan bool),
Expand Down Expand Up @@ -66,17 +63,11 @@ func (p *producer) run() {
return

case err := <-p.producer.Errors():
promise, is := err.Msg.Metadata.(*Promise)
if !is {
p.log.Panicf("invalid metadata type. expected *Promise, got %T", err.Msg.Metadata)
}
promise := err.Msg.Metadata.(*Promise)
promise.Finish(err.Err)

case msg := <-p.producer.Successes():
promise, is := msg.Metadata.(*Promise)
if !is {
p.log.Panicf("invalid metadata type. expected *Promise, got %T", msg.Metadata)
}
promise := msg.Metadata.(*Promise)
promise.Finish(nil)
}
}
Expand Down
13 changes: 11 additions & 2 deletions kafkamock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package goka

import (
"fmt"
"hash"
"sync"

"github.com/facebookgo/ensure"
Expand Down Expand Up @@ -124,12 +125,20 @@ func (km *KafkaMock) ProcessorOptions() []ProcessorOption {
return km.storage, nil
}),
WithConsumerBuilder(km.consumerBuilder),
WithProducer(km.producerMock),
WithTopicManager(km.topicMgrMock),
WithProducerBuilder(km.producerBuilder),
WithTopicManagerBuilder(km.topicManagerBuilder),
WithPartitionChannelSize(0),
}
}

func (km *KafkaMock) topicManagerBuilder(brokers []string) (kafka.TopicManager, error) {
return km.topicMgrMock, nil
}

func (km *KafkaMock) producerBuilder(b []string, cid string, hasher func() hash.Hash32) (kafka.Producer, error) {
return km.producerMock, nil
}

func (km *KafkaMock) consumerBuilder(b []string, group, clientID string) (kafka.Consumer, error) {
return km.consumerMock, nil
}
Expand Down
Loading