Skip to content

Commit

Permalink
let producerBuilder set partitioner in sarama.Config
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogo Behrens committed Sep 19, 2017
1 parent 78edea6 commit f26e25e
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 42 deletions.
5 changes: 2 additions & 3 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package kafka
import (
"time"

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
metrics "github.com/rcrowley/go-metrics"
)

Expand Down Expand Up @@ -51,9 +51,8 @@ type saramaConsumer struct {
}

// NewSaramaConsumer creates a new Consumer using sarama
func NewSaramaConsumer(brokers []string, group string, registry metrics.Registry) (Consumer, error) {
func NewSaramaConsumer(brokers []string, group string, config *cluster.Config, registry metrics.Registry) (Consumer, error) {
events := make(chan Event, defaultChannelBufferSize)
config := CreateDefaultKafkaConfig("whatever", sarama.OffsetOldest, registry)

g, err := newGroupConsumer(brokers, group, events, config)
if err != nil {
Expand Down
32 changes: 0 additions & 32 deletions kafka/group_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,8 @@ import (

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
metrics "github.com/rcrowley/go-metrics"
)

// CreateDefaultKafkaConfig creates a (bsm) sarama configuration with default values.
func CreateDefaultKafkaConfig(clientID string, initialOffset int64, registry metrics.Registry) *cluster.Config {
	cfg := cluster.NewConfig()

	// Client-level settings: protocol version, caller identity, and the
	// buffer size shared by the event channels.
	cfg.Version = sarama.V0_10_1_0
	cfg.ClientID = clientID
	cfg.ChannelBufferSize = defaultChannelBufferSize

	// Consumer settings: surface errors to the caller, start from the
	// requested offset, and bound per-message processing time.
	cfg.Consumer.MaxProcessingTime = defaultMaxProcessingTime
	cfg.Consumer.Offsets.Initial = initialOffset
	cfg.Consumer.Return.Errors = true

	// Producer settings: local-ack durability, snappy compression,
	// batched flushing, bounded retries, and both success and error
	// notifications delivered on channels.
	cfg.Producer.Compression = sarama.CompressionSnappy
	cfg.Producer.Flush.Bytes = defaultFlushBytes
	cfg.Producer.Flush.Frequency = defaultFlushFrequency
	cfg.Producer.RequiredAcks = sarama.WaitForLocal
	cfg.Producer.Retry.Max = defaultProducerMaxRetries
	cfg.Producer.Return.Errors = true
	cfg.Producer.Return.Successes = true

	// Consumer-group settings: deliver rebalance notifications.
	cfg.Group.Return.Notifications = true

	// Attach the metrics registry so sarama reports kafka metrics into it.
	cfg.Config.MetricRegistry = registry

	return cfg
}

type groupConsumer struct {
brokers []string
config *cluster.Config
Expand Down
7 changes: 2 additions & 5 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
// Producer abstracts the kafka producer
type Producer interface {
// Emit sends a message to topic.
// TODO (franz): this method should return a promise, instead of getting one.
// Otherwise a callback is sufficient
Emit(topic string, key string, value []byte) *Promise
Close() error
}
Expand All @@ -25,9 +23,8 @@ type producer struct {
}

// NewProducer creates new kafka producer for passed brokers.
func NewProducer(brokers []string, registry metrics.Registry, log logger.Logger) (Producer, error) {
config := CreateDefaultKafkaConfig("whatever", sarama.OffsetOldest, registry)
aprod, err := sarama.NewAsyncProducer(brokers, &config.Config)
func NewProducer(brokers []string, config *sarama.Config, registry metrics.Registry, log logger.Logger) (Producer, error) {
aprod, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
return nil, fmt.Errorf("Failed to start Sarama producer: %v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type producerBuilder func(brokers []string, registry metrics.Registry) (kafka.Pr
type topicmgrBuilder func(brokers []string) (kafka.TopicManager, error)

// defaultConsumerBuilder creates a sarama-based Consumer with goka's
// default configuration for the given brokers and consumer group.
func defaultConsumerBuilder(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) {
	config := kafka.CreateDefaultSaramaConfig("goka", registry)
	// Pass the freshly built config through: NewSaramaConsumer now takes
	// the configuration as a parameter instead of creating its own, and
	// leaving `config` unused is a compile error in Go.
	return kafka.NewSaramaConsumer(brokers, group, config, registry)
}

Expand Down Expand Up @@ -142,7 +143,7 @@ func WithTopicManager(tm kafka.TopicManager) ProcessorOption {
}
}

// WithConsumer replaces goka's default consumer. Mainly for testing.
// WithConsumer replaces goka's default consumer.
func WithConsumer(c kafka.Consumer) ProcessorOption {
return func(o *poptions) {
o.builders.consumer = func(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) {
Expand All @@ -154,7 +155,7 @@ func WithConsumer(c kafka.Consumer) ProcessorOption {
}
}

// WithProducer replaces goka'S default producer. Mainly for testing.
// WithProducer replaces goka's default producer.
func WithProducer(p kafka.Producer) ProcessorOption {
return func(o *poptions) {
o.builders.producer = func(brokers []string, registry metrics.Registry) (kafka.Producer, error) {
Expand Down

0 comments on commit f26e25e

Please sign in to comment.