Skip to content
This repository has been archived by the owner on May 13, 2019. It is now read-only.

[WIP] Reimplementation #72

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
56986f5
[WIP] Reimplementation
wvanbergen Aug 16, 2015
9e3ea5c
Implement retry logic for setting up watches for the subscription and…
wvanbergen Aug 16, 2015
b3d680e
Refactor initialize and shutdown logic of consumerManager
wvanbergen Aug 16, 2015
892b28c
Remove CommitInterval setting - sarama already has this.
wvanbergen Aug 16, 2015
9b0844e
Implement offset management using the kafka offset manager in sarama.
wvanbergen Aug 16, 2015
d381f35
Register subscription properly in Zookeeper.
wvanbergen Aug 16, 2015
d8150b2
Add filename to logging done by the example application.
wvanbergen Aug 16, 2015
de7f1f1
Improve documentation.
wvanbergen Aug 16, 2015
030e249
Add distribution_test to test the proper distribution of partitions o…
wvanbergen Aug 16, 2015
d4a1150
Add an example and some integration tests.
wvanbergen Aug 17, 2015
1c4c4f0
Small tweaks.
wvanbergen Aug 17, 2015
9ce1554
Improve retry logic for partition manager.
wvanbergen Aug 17, 2015
67d7ea4
Add whitelist and blacklist subscriptions
wvanbergen Aug 17, 2015
c56bb38
Use our own logger, which defaults to sarama.Logger.
wvanbergen Aug 17, 2015
80ad22f
Improve logging
wvanbergen Aug 17, 2015
c99d64c
Refactor integration tests.
wvanbergen Aug 18, 2015
d8ff276
Address code review comments
wvanbergen Aug 18, 2015
ad2a6d7
Clean up example program.
wvanbergen Aug 18, 2015
b18b031
Close OffsetManager
wvanbergen Aug 31, 2015
98e6727
Update partition_consumer to reflect changes in sarama’s PartitionOff…
wvanbergen Aug 31, 2015
9abf9ee
Address go vet issues
wvanbergen Aug 31, 2015
e255101
Add 1m timeout to tests
wvanbergen Sep 1, 2015
8e92066
Use ReceiveTime(-1) when committing offsets to Kafka
wvanbergen Oct 6, 2015
3b5fa72
Enable debug logging during CI runs.
wvanbergen Oct 7, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/examples/consumergroup/consumergroup
/examples/kafkaconsumer/kafkaconsumer
/consumergroup/consumergroup.test
/kafkaconsumer/kafkaconsumer.test
/tools/transferoffsets/transferoffsets
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ before_install:
script:
- go build ./...
- go vet ./...
- GORACE="halt_on_error=1" go test -v -race ./...
- GORACE="halt_on_error=1" DEBUG=1 go test -v -race ./... -timeout 1m
73 changes: 73 additions & 0 deletions examples/kafkaconsumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package main

import (
"flag"
"log"
"os"
"os/signal"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/kafkaconsumer"
)

const (
DefaultKafkaTopics = "test_topic"
DefaultConsumerGroup = "consumer_example.go"
)

var (
consumerGroup = flag.String("group", DefaultConsumerGroup, "The name of the consumer group, used for coordination and load balancing")
kafkaTopicsCSV = flag.String("topics", DefaultKafkaTopics, "The comma-separated list of topics to consume")
zookeeper = flag.String("zookeeper", "", "A comma-separated Zookeeper connection string (e.g. `zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181`)")
)

func init() {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Lshortfile)
kafkaconsumer.Logger = log.New(os.Stdout, "[kafkaconsumer] ", log.LstdFlags|log.Lshortfile)
}

func main() {
flag.Parse()

if *zookeeper == "" {
flag.PrintDefaults()
os.Exit(1)
}

config := kafkaconsumer.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest

subscription := kafkaconsumer.TopicSubscription(strings.Split(*kafkaTopicsCSV, ",")...)
consumer, err := kafkaconsumer.Join(*consumerGroup, subscription, *zookeeper, config)
if err != nil {
log.Fatalln(err)
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
consumer.Interrupt()
}()

go func() {
for err := range consumer.Errors() {
log.Println(err)
}
}()

var count int64
for message := range consumer.Messages() {
// Simulate processing that takes some time
time.Sleep(10 * time.Millisecond)

// Acknowledge that we have processed the message
consumer.Ack(message)

count++
}

log.Printf("Processed %d events.", count)
}
60 changes: 60 additions & 0 deletions kafkaconsumer/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package kafkaconsumer

import (
"time"

"github.com/Shopify/sarama"
"github.com/wvanbergen/kazoo-go"
)

type Config struct {
*sarama.Config

Zookeeper *kazoo.Config

MaxProcessingTime time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.

Offsets struct {
Initial int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
}
}

func NewConfig() *Config {
config := &Config{}
config.Config = sarama.NewConfig()
config.Zookeeper = kazoo.NewConfig()
config.Offsets.Initial = sarama.OffsetOldest
config.MaxProcessingTime = 60 * time.Second

return config
}

func (cgc *Config) Validate() error {
if cgc.Zookeeper.Timeout <= 0 {
return sarama.ConfigurationError("Zookeeper.Timeout should have a duration > 0")
}

if cgc.MaxProcessingTime <= 0 {
return sarama.ConfigurationError("MaxProcessingTime should have a duration > 0")
}

if cgc.Offsets.Initial != sarama.OffsetOldest && cgc.Offsets.Initial != sarama.OffsetNewest {
return sarama.ConfigurationError("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest.")
}

if cgc.Config != nil {
if err := cgc.Config.Validate(); err != nil {
return err
}
}

return nil
}

var Logger sarama.StdLogger

func init() {
if Logger == nil {
Logger = sarama.Logger
}
}
Loading