let producerBuilder set partitioner in sarama.Config #36

Merged 2 commits on Sep 20, 2017
3 changes: 3 additions & 0 deletions context_test.go
@@ -405,6 +405,9 @@ func TestContext_Lookup(t *testing.T) {
},
},
},
+	opts: &voptions{
+		hasher: DefaultHasher(),
+	},
},
},
}
42 changes: 42 additions & 0 deletions kafka/config.go
@@ -0,0 +1,42 @@
+package kafka
+
+import (
+	"github.com/Shopify/sarama"
+	cluster "github.com/bsm/sarama-cluster"
+	metrics "github.com/rcrowley/go-metrics"
+)
+
+// CreateDefaultSaramaConfig creates a (bsm) sarama configuration with default values.
+func CreateDefaultSaramaConfig(clientID string, partitioner sarama.PartitionerConstructor, registry metrics.Registry) *cluster.Config {
+	config := cluster.NewConfig()
+
+	config.Version = sarama.V0_10_1_0
+	config.ClientID = clientID
+	config.ChannelBufferSize = defaultChannelBufferSize
+
+	// consumer configuration
+	config.Consumer.Return.Errors = true
+	config.Consumer.Offsets.Initial = sarama.OffsetOldest
+	config.Consumer.MaxProcessingTime = defaultMaxProcessingTime
+
+	// producer configuration
+	config.Producer.RequiredAcks = sarama.WaitForLocal
+	config.Producer.Compression = sarama.CompressionSnappy
+	config.Producer.Flush.Frequency = defaultFlushFrequency
+	config.Producer.Flush.Bytes = defaultFlushBytes
+	config.Producer.Return.Successes = true
+	config.Producer.Return.Errors = true
+	config.Producer.Retry.Max = defaultProducerMaxRetries
+
+	// consumer group configuration
+	config.Group.Return.Notifications = true
+
+	// register registry to get kafka metrics
+	config.Config.MetricRegistry = registry
+
+	// set partitioner
+	if partitioner != nil {
+		config.Producer.Partitioner = partitioner
+	}
+	return config
+}
5 changes: 2 additions & 3 deletions kafka/consumer.go
@@ -3,7 +3,7 @@ package kafka
import (
	"time"

-	"github.com/Shopify/sarama"
+	cluster "github.com/bsm/sarama-cluster"
	metrics "github.com/rcrowley/go-metrics"
)

@@ -51,9 +51,8 @@ type saramaConsumer struct {
}

// NewSaramaConsumer creates a new Consumer using sarama
-func NewSaramaConsumer(brokers []string, group string, registry metrics.Registry) (Consumer, error) {
+func NewSaramaConsumer(brokers []string, group string, config *cluster.Config, registry metrics.Registry) (Consumer, error) {
	events := make(chan Event, defaultChannelBufferSize)
-	config := CreateDefaultKafkaConfig("whatever", sarama.OffsetOldest, registry)

	g, err := newGroupConsumer(brokers, group, events, config)
	if err != nil {
32 changes: 0 additions & 32 deletions kafka/group_consumer.go
@@ -5,40 +5,8 @@ import (

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
metrics "github.com/rcrowley/go-metrics"
)

-// CreateDefaultKafkaConfig creates a (bsm) sarama configuration with default values.
-func CreateDefaultKafkaConfig(clientID string, initialOffset int64, registry metrics.Registry) *cluster.Config {
-	config := cluster.NewConfig()
-
-	config.Version = sarama.V0_10_1_0
-	config.ClientID = clientID
-	config.ChannelBufferSize = defaultChannelBufferSize
-
-	// consumer configuration
-	config.Consumer.Return.Errors = true
-	config.Consumer.Offsets.Initial = initialOffset
-	config.Consumer.MaxProcessingTime = defaultMaxProcessingTime
-
-	// producer configuration
-	config.Producer.RequiredAcks = sarama.WaitForLocal
-	config.Producer.Compression = sarama.CompressionSnappy
-	config.Producer.Flush.Frequency = defaultFlushFrequency
-	config.Producer.Flush.Bytes = defaultFlushBytes
-	config.Producer.Return.Successes = true
-	config.Producer.Return.Errors = true
-	config.Producer.Retry.Max = defaultProducerMaxRetries
-
-	// consumer group configuration
-	config.Group.Return.Notifications = true
-
-	// register registry to get kafka metrics
-	config.Config.MetricRegistry = registry
-
-	return config
-}

type groupConsumer struct {
brokers []string
config *cluster.Config
7 changes: 2 additions & 5 deletions kafka/producer.go
@@ -11,8 +11,6 @@ import (
// Producer abstracts the kafka producer
type Producer interface {
	// Emit sends a message to topic.
-	// TODO (franz): this method should return a promise, instead of getting one.
-	// Otherwise a callback is sufficient
	Emit(topic string, key string, value []byte) *Promise
	Close() error
}
@@ -25,9 +23,8 @@ type producer struct {
}

// NewProducer creates new kafka producer for passed brokers.
-func NewProducer(brokers []string, registry metrics.Registry, log logger.Logger) (Producer, error) {
-	config := CreateDefaultKafkaConfig("whatever", sarama.OffsetOldest, registry)
-	aprod, err := sarama.NewAsyncProducer(brokers, &config.Config)
+func NewProducer(brokers []string, config *sarama.Config, registry metrics.Registry, log logger.Logger) (Producer, error) {
+	aprod, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		return nil, fmt.Errorf("Failed to start Sarama producer: %v", err)
	}
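With the config injected instead of built internally, a caller can wire the helper from kafka/config.go into the new signature. A sketch, assuming placeholder brokers and client ID; cluster.Config embeds sarama.Config, and the embedded struct is what NewProducer receives:

	import (
		"hash/fnv"

		"github.com/Shopify/sarama"
		metrics "github.com/rcrowley/go-metrics"

		"github.com/lovoo/goka/kafka"
		"github.com/lovoo/goka/logger"
	)

	func buildProducer(brokers []string) (kafka.Producer, error) {
		registry := metrics.NewRegistry()

		// hash keys with FNV-1a, matching goka's DefaultHasher
		partitioner := sarama.NewCustomHashPartitioner(fnv.New32a)
		config := kafka.CreateDefaultSaramaConfig("example-client", partitioner, registry)

		// cluster.Config embeds sarama.Config; pass the embedded struct
		return kafka.NewProducer(brokers, &config.Config, registry, logger.Default())
	}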
64 changes: 57 additions & 7 deletions options.go
@@ -2,8 +2,11 @@ package goka

import (
	"fmt"
+	"hash"
+	"hash/fnv"
	"path/filepath"

+	"github.com/Shopify/sarama"
	"github.com/lovoo/goka/kafka"
	"github.com/lovoo/goka/logger"
	"github.com/lovoo/goka/storage"
@@ -64,17 +67,28 @@ func DefaultStorageBuilder(path string) StorageBuilder {
}
}

+// DefaultHasher returns an FNV hasher builder to assign keys to partitions.
+func DefaultHasher() func() hash.Hash32 {
+	return func() hash.Hash32 {
+		return fnv.New32a()
+	}
+
+}

type consumerBuilder func(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error)
type producerBuilder func(brokers []string, registry metrics.Registry) (kafka.Producer, error)
type topicmgrBuilder func(brokers []string) (kafka.TopicManager, error)

func defaultConsumerBuilder(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) {
-	return kafka.NewSaramaConsumer(brokers, group, registry)
+	config := kafka.CreateDefaultSaramaConfig("goka", nil, registry)
+	return kafka.NewSaramaConsumer(brokers, group, config, registry)
}

-func defaultProducerBuilder(log logger.Logger) producerBuilder {
+func defaultProducerBuilder(hasher func() hash.Hash32, log logger.Logger) producerBuilder {
	return func(brokers []string, registry metrics.Registry) (kafka.Producer, error) {
-		return kafka.NewProducer(brokers, registry, log)
+		partitioner := sarama.NewCustomHashPartitioner(hasher)
+		config := kafka.CreateDefaultSaramaConfig("goka", partitioner, registry)
+		return kafka.NewProducer(brokers, &config.Config, registry, log)
	}
}

@@ -97,6 +111,7 @@ type poptions struct {
updateCallback UpdateCallback
registry metrics.Registry
partitionChannelSize int
+	hasher func() hash.Hash32

builders struct {
storage StorageBuilder
@@ -142,7 +157,7 @@ func WithTopicManager(tm kafka.TopicManager) ProcessorOption {
}
}

-// WithConsumer replaces goka's default consumer. Mainly for testing.
+// WithConsumer replaces goka's default consumer.
func WithConsumer(c kafka.Consumer) ProcessorOption {
return func(o *poptions) {
o.builders.consumer = func(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) {
@@ -154,7 +169,7 @@ func WithConsumer(c kafka.Consumer) ProcessorOption {
}
}

-// WithProducer replaces goka'S default producer. Mainly for testing.
+// WithProducer replaces goka's default producer.
func WithProducer(p kafka.Producer) ProcessorOption {
return func(o *poptions) {
o.builders.producer = func(brokers []string, registry metrics.Registry) (kafka.Producer, error) {
@@ -201,13 +216,24 @@ func WithRegistry(r metrics.Registry) ProcessorOption {
}
}

+// WithHasher sets the hash function that assigns keys to partitions.
+func WithHasher(hasher func() hash.Hash32) ProcessorOption {
+	return func(o *poptions) {
+		o.hasher = hasher
+	}
+}

func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
opt.clientID = defaultClientID

for _, o := range opts {
o(opt)
}

+	if opt.hasher == nil {
+		opt.hasher = DefaultHasher()
+	}

// StorageBuilder should always be set as a default option in NewProcessor
if opt.builders.storage == nil {
return fmt.Errorf("StorageBuilder not set")
@@ -216,7 +242,7 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
opt.builders.consumer = defaultConsumerBuilder
}
if opt.builders.producer == nil {
-		opt.builders.producer = defaultProducerBuilder(opt.log)
+		opt.builders.producer = defaultProducerBuilder(opt.hasher, opt.log)
}
if opt.builders.topicmgr == nil {
opt.builders.topicmgr = defaultTopicManagerBuilder
@@ -246,6 +272,7 @@ type voptions struct {
updateCallback UpdateCallback
registry metrics.Registry
partitionChannelSize int
+	hasher func() hash.Hash32

builders struct {
storage StorageBuilder
@@ -330,11 +357,22 @@ func WithViewPartitionChannelSize(size int) ViewOption {
}
}

+// WithViewHasher sets the hash function that assigns keys to partitions.
+func WithViewHasher(hasher func() hash.Hash32) ViewOption {
+	return func(o *voptions) {
+		o.hasher = hasher
+	}
+}

func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
for _, o := range opts {
o(opt)
}

+	if opt.hasher == nil {
+		opt.hasher = DefaultHasher()
+	}

// StorageBuilder should always be set as a default option in NewView
if opt.builders.storage == nil {
return fmt.Errorf("StorageBuilder not set")
@@ -372,6 +410,7 @@ type eoptions struct {

registry metrics.Registry
codec Codec
+	hasher func() hash.Hash32

builders struct {
topicmgr topicmgrBuilder
@@ -428,6 +467,13 @@ func WithEmitterKafkaMetrics(registry metrics.Registry) EmitterOption {
}
}

+// WithEmitterHasher sets the hash function that assigns keys to partitions.
+func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption {
+	return func(o *eoptions) {
+		o.hasher = hasher
+	}
+}

func (opt *eoptions) applyOptions(opts ...EmitterOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
@@ -436,9 +482,13 @@ func (opt *eoptions) applyOptions(opts ...EmitterOption) error {
o(opt)
}

+	if opt.hasher == nil {
+		opt.hasher = DefaultHasher()
+	}

// config not set, use default one
if opt.builders.producer == nil {
-		opt.builders.producer = defaultProducerBuilder(opt.log)
+		opt.builders.producer = defaultProducerBuilder(opt.hasher, opt.log)
}
if opt.builders.topicmgr == nil {
opt.builders.topicmgr = defaultTopicManagerBuilder
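Taken together, the new options make the partitioning hash pluggable for processors, views, and emitters via WithHasher, WithViewHasher, and WithEmitterHasher. A sketch of opting into a non-default hash; murmur3 is only illustrative (any func() hash.Hash32 works), and the constructor shape is assumed from the surrounding code:

	import (
		"hash"

		"github.com/lovoo/goka"
		"github.com/spaolacci/murmur3"
	)

	func newMurmurProcessor(brokers []string, graph *goka.GroupGraph) (*goka.Processor, error) {
		// the same hasher now drives both the processor's local key lookup
		// and the producer's partitioner, keeping the two consistent
		return goka.NewProcessor(brokers, graph,
			goka.WithHasher(func() hash.Hash32 { return murmur3.New32() }),
		)
	}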
3 changes: 1 addition & 2 deletions processor.go
@@ -3,7 +3,6 @@ package goka
import (
	"errors"
	"fmt"
-	"hash/fnv"
	"runtime/debug"
	"sync"
	"time"
@@ -257,7 +256,7 @@ func (g *Processor) hash(key string) (int32, error) {
// create a new hasher every time. Alternative would be to store the hash in
// view and every time reset the hasher (ie, hasher.Reset()). But that would
// also require us to protect the access of the hasher with a mutex.
-	hasher := fnv.New32a()
+	hasher := g.opts.hasher()

_, err := hasher.Write([]byte(key))
if err != nil {
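For reference, the key-to-partition mapping this hasher feeds looks roughly like the following; the function is illustrative, not goka's actual internals:

	// partitionFor sketches how a 32-bit hash picks a partition for a key
	func partitionFor(key string, numPartitions int32, newHasher func() hash.Hash32) (int32, error) {
		h := newHasher() // a fresh hasher per call avoids locking a shared instance
		if _, err := h.Write([]byte(key)); err != nil {
			return -1, err
		}
		// modulo in uint32 space so the result is never negative
		return int32(h.Sum32() % uint32(numPartitions)), nil
	}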
8 changes: 4 additions & 4 deletions processor_test.go
@@ -1010,12 +1010,12 @@ func TestProcessor_HasGet(t *testing.T) {
}

func TestProcessor_HasGetStateless(t *testing.T) {
-	p := &Processor{graph: DefineGroup(group)}
+	p := &Processor{graph: DefineGroup(group), opts: &poptions{hasher: DefaultHasher()}}
_, err := p.Get("item1")
ensure.NotNil(t, err)
ensure.StringContains(t, err.Error(), "stateless processor")

-	p = &Processor{graph: DefineGroup(group, Persist(c))}
+	p = &Processor{graph: DefineGroup(group, Persist(c)), opts: &poptions{hasher: DefaultHasher()}}
p.partitions = map[int32]*partition{
0: new(partition),
}
@@ -1024,7 +1024,7 @@ func TestProcessor_HasGetStateless(t *testing.T) {
ensure.NotNil(t, err)
ensure.StringContains(t, err.Error(), "0 partitions")

-	p = &Processor{graph: DefineGroup(group, Persist(c))}
+	p = &Processor{graph: DefineGroup(group, Persist(c)), opts: &poptions{hasher: DefaultHasher()}}
p.partitions = map[int32]*partition{
0: new(partition),
}
@@ -1037,7 +1037,7 @@ func TestProcessor_HasGetStateless(t *testing.T) {
defer ctrl.Finish()

st := mock.NewMockStorage(ctrl)
-	p = &Processor{graph: DefineGroup(group, Persist(c))}
+	p = &Processor{graph: DefineGroup(group, Persist(c)), opts: &poptions{hasher: DefaultHasher()}}
p.partitions = map[int32]*partition{
0: &partition{log: logger.Default(), st: &storageProxy{Storage: st, partition: 0}},
}
3 changes: 1 addition & 2 deletions view.go
@@ -3,7 +3,6 @@ package goka
import (
	"errors"
	"fmt"
-	"hash/fnv"
	"sync"

"github.com/lovoo/goka/kafka"
@@ -188,7 +187,7 @@ func (v *View) hash(key string) (int32, error) {
// create a new hasher every time. Alternative would be to store the hash in
// view and every time reset the hasher (ie, hasher.Reset()). But that would
// also require us to protect the access of the hasher with a mutex.
-	hasher := fnv.New32a()
+	hasher := v.opts.hasher()

_, err := hasher.Write([]byte(key))
if err != nil {
3 changes: 2 additions & 1 deletion view_test.go
@@ -34,6 +34,7 @@ func createTestView(t *testing.T, consumer kafka.Consumer, sb StorageBuilder, tm
},
registry: metrics.DefaultRegistry,
gokaRegistry: metrics.DefaultRegistry,
+	hasher: DefaultHasher(),
}
opts.builders.storage = sb
opts.builders.topicmgr = func(brokers []string) (kafka.TopicManager, error) {
@@ -222,7 +223,7 @@ func TestView_StartStopWithError(t *testing.T) {
}

func TestView_GetErrors(t *testing.T) {
-	v := &View{}
+	v := &View{opts: &voptions{hasher: DefaultHasher()}}
_, err := v.Get("hey")
ensure.NotNil(t, err)
