Skip to content

Commit

Permalink
Merge pull request #100 from lovoo/bugfix/start-streams-from-newest
Browse files Browse the repository at this point in the history
consume streams from newest
  • Loading branch information
db7 committed Mar 7, 2018
2 parents e931b46 + b62892b commit b7b94ac
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
4 changes: 3 additions & 1 deletion kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ func NewConfig() *cluster.Config {

// consumer configuration
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.MaxProcessingTime = defaultMaxProcessingTime
// this configures the initial offset for streams. Tables are always
// consumed from OffsetOldest.
config.Consumer.Offsets.Initial = sarama.OffsetNewest

// producer configuration
config.Producer.RequiredAcks = sarama.WaitForLocal
Expand Down
7 changes: 6 additions & 1 deletion kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"time"

"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"github.com/lovoo/goka/multierr"
)
Expand Down Expand Up @@ -62,7 +63,11 @@ func NewSaramaConsumer(brokers []string, group string, config *cluster.Config) (
if err != nil {
return nil, err
}
c, err := newSimpleConsumer(brokers, events, &config.Config)

// since simple consumer only handle tables, be sure to start from oldest
simpleConfig := config.Config // copy config
simpleConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
c, err := newSimpleConsumer(brokers, events, &simpleConfig)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit b7b94ac

Please sign in to comment.