diff --git a/.gitignore b/.gitignore index ed0c7ba..8d8f794 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /examples/consumergroup/consumergroup +/examples/kafkaconsumer/kafkaconsumer /consumergroup/consumergroup.test +/kafkaconsumer/kafkaconsumer.test /tools/transferoffsets/transferoffsets diff --git a/.travis.yml b/.travis.yml index b41b753..868edba 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,4 +24,4 @@ before_install: script: - go build ./... - go vet ./... -- GORACE="halt_on_error=1" go test -v -race ./... +- GORACE="halt_on_error=1" DEBUG=1 go test -v -race ./... -timeout 1m diff --git a/examples/kafkaconsumer/main.go b/examples/kafkaconsumer/main.go new file mode 100644 index 0000000..5b06e82 --- /dev/null +++ b/examples/kafkaconsumer/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "flag" + "log" + "os" + "os/signal" + "strings" + "time" + + "github.com/Shopify/sarama" + "github.com/wvanbergen/kafka/kafkaconsumer" +) + +const ( + DefaultKafkaTopics = "test_topic" + DefaultConsumerGroup = "consumer_example.go" +) + +var ( + consumerGroup = flag.String("group", DefaultConsumerGroup, "The name of the consumer group, used for coordination and load balancing") + kafkaTopicsCSV = flag.String("topics", DefaultKafkaTopics, "The comma-separated list of topics to consume") + zookeeper = flag.String("zookeeper", "", "A comma-separated Zookeeper connection string (e.g. `zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181`)") +) + +func init() { + sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Lshortfile) + kafkaconsumer.Logger = log.New(os.Stdout, "[kafkaconsumer] ", log.LstdFlags|log.Lshortfile) +} + +func main() { + flag.Parse() + + if *zookeeper == "" { + flag.PrintDefaults() + os.Exit(1) + } + + config := kafkaconsumer.NewConfig() + config.Offsets.Initial = sarama.OffsetNewest + + subscription := kafkaconsumer.TopicSubscription(strings.Split(*kafkaTopicsCSV, ",")...) + consumer, err := kafkaconsumer.Join(*consumerGroup, subscription, *zookeeper, config) + if err != nil { + log.Fatalln(err) + } + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + go func() { + <-c + consumer.Interrupt() + }() + + go func() { + for err := range consumer.Errors() { + log.Println(err) + } + }() + + var count int64 + for message := range consumer.Messages() { + // Simulate processing that takes some time + time.Sleep(10 * time.Millisecond) + + // Acknowledge that we have processed the message + consumer.Ack(message) + + count++ + } + + log.Printf("Processed %d events.", count) +} diff --git a/kafkaconsumer/config.go b/kafkaconsumer/config.go new file mode 100644 index 0000000..869979c --- /dev/null +++ b/kafkaconsumer/config.go @@ -0,0 +1,60 @@ +package kafkaconsumer + +import ( + "time" + + "github.com/Shopify/sarama" + "github.com/wvanbergen/kazoo-go" +) + +type Config struct { + *sarama.Config + + Zookeeper *kazoo.Config + + MaxProcessingTime time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute. + + Offsets struct { + Initial int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest. 
+	}
+}
+
+func NewConfig() *Config {
+	config := &Config{}
+	config.Config = sarama.NewConfig()
+	config.Zookeeper = kazoo.NewConfig()
+	config.Offsets.Initial = sarama.OffsetOldest
+	config.MaxProcessingTime = 60 * time.Second
+
+	return config
+}
+
+func (cgc *Config) Validate() error {
+	if cgc.Zookeeper.Timeout <= 0 {
+		return sarama.ConfigurationError("Zookeeper.Timeout should have a duration > 0")
+	}
+
+	if cgc.MaxProcessingTime <= 0 {
+		return sarama.ConfigurationError("MaxProcessingTime should have a duration > 0")
+	}
+
+	if cgc.Offsets.Initial != sarama.OffsetOldest && cgc.Offsets.Initial != sarama.OffsetNewest {
+		return sarama.ConfigurationError("Offsets.Initial should be sarama.OffsetOldest or sarama.OffsetNewest.")
+	}
+
+	if cgc.Config != nil {
+		if err := cgc.Config.Validate(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+var Logger sarama.StdLogger
+
+func init() {
+	if Logger == nil {
+		Logger = sarama.Logger
+	}
+}
diff --git a/kafkaconsumer/consumer_manager.go b/kafkaconsumer/consumer_manager.go
new file mode 100644
index 0000000..1a076a4
--- /dev/null
+++ b/kafkaconsumer/consumer_manager.go
@@ -0,0 +1,427 @@
+package kafkaconsumer
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"github.com/samuel/go-zookeeper/zk"
+	"github.com/wvanbergen/kazoo-go"
+	"gopkg.in/tomb.v1"
+)
+
+// Consumer represents a consumer instance and is the main interface to work with as a consumer
+// of this library.
+type Consumer interface {
+	// Interrupt will initiate the shutdown procedure of the consumer, and return immediately.
+	// When you are done using the consumer, you must either call Close or Interrupt to prevent leaking memory.
+	Interrupt()
+
+	// Close will start the shutdown procedure for the consumer and wait for it to complete.
+	// When you are done using the consumer, you must either call Close or Interrupt to prevent leaking memory.
+	Close() error
+
+	// Messages returns a channel that you can read to obtain messages from Kafka to process.
+	// Every message that you receive from this channel should be sent to Ack after it has been processed.
+	Messages() <-chan *sarama.ConsumerMessage
+
+	// Errors returns a channel that you can read to obtain errors that occur.
+	Errors() <-chan error
+
+	// Ack marks a message as processed, indicating that the message offset can be committed
+	// for the message's partition by the offset manager. Note that the offset manager may decide
+	// not to commit every offset immediately for efficiency reasons. Calling Close or Interrupt
+	// will make sure that the last offset provided to this function will be flushed to storage.
+	// You have to provide the messages in the same order as you received them from the Messages
+	// channel.
+	Ack(*sarama.ConsumerMessage)
+}
+
+// Join joins a Kafka consumer group, and returns a Consumer instance.
+// - `group` is the name of the group this consumer instance will join. All instances that form
+//   a consumer group should use the same name. A group name must be unique per Kafka cluster.
+// - `subscription` is an object that describes what partitions the group wants to consume.
+//   A single instance may end up consuming anywhere between zero and all of them. Every running
+//   instance in a group should use the same subscription; the behavior is undefined if that is
+//   not the case.
+// - `zookeeper` is the zookeeper connection string, e.g. "zk1:2181,zk2:2181,zk3:2181/chroot"
+// - `config` specifies the configuration.
If it is nil, a default configuration is used. +func Join(group string, subscription Subscription, zookeeper string, config *Config) (Consumer, error) { + if group == "" { + return nil, sarama.ConfigurationError("a group name cannot be empty") + } + + if config == nil { + config = NewConfig() + } + + var zkNodes []string + zkNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(zookeeper) + + if err := config.Validate(); err != nil { + return nil, err + } + + cm := &consumerManager{ + config: config, + subscription: subscription, + + partitionManagers: make(map[string]*partitionManager), + messages: make(chan *sarama.ConsumerMessage, config.ChannelBufferSize), + errors: make(chan error, config.ChannelBufferSize), + } + + if kz, err := kazoo.NewKazoo(zkNodes, config.Zookeeper); err != nil { + return nil, err + } else { + cm.kz = kz + } + + cm.group = cm.kz.Consumergroup(group) + cm.instance = cm.group.NewInstance() + + // Register the consumer group if it does not exist yet + if exists, err := cm.group.Exists(); err != nil { + cm.shutdown() + return nil, err + + } else if !exists { + if err := cm.group.Create(); err != nil { + cm.shutdown() + return nil, err + } + } + + // Register itself with zookeeper + data, err := subscription.JSON() + if err != nil { + cm.shutdown() + return nil, err + } + if err := cm.instance.RegisterWithSubscription(data); err != nil { + cm.shutdown() + return nil, err + } else { + cm.logf("Consumer instance registered (%s).", cm.instance.ID) + } + + // Discover the Kafka brokers + brokers, err := cm.kz.BrokerList() + if err != nil { + cm.shutdown() + return nil, err + } else { + cm.logf("Discovered Kafka cluster at %s", strings.Join(brokers, ",")) + } + + // Initialize sarama client + if client, err := sarama.NewClient(brokers, config.Config); err != nil { + cm.shutdown() + return nil, err + } else { + cm.client = client + } + + // Initialize sarama offset manager + if offsetManager, err := sarama.NewOffsetManagerFromClient(group, cm.client); err != nil { + cm.shutdown() + return nil, err + } else { + cm.offsetManager = offsetManager + } + + // Initialize sarama consumer + if consumer, err := sarama.NewConsumerFromClient(cm.client); err != nil { + cm.shutdown() + return nil, err + } else { + cm.consumer = consumer + } + + // Start the manager goroutine + go cm.run() + + return cm, nil +} + +// consumerManager implements the Consumer interface, and manages the goroutine that +// is responsible for spawning and terminating partitionManagers. +type consumerManager struct { + config *Config + subscription Subscription + + kz *kazoo.Kazoo + group *kazoo.Consumergroup + instance *kazoo.ConsumergroupInstance + + client sarama.Client + consumer sarama.Consumer + offsetManager sarama.OffsetManager + + t tomb.Tomb + m sync.RWMutex + partitionManagers map[string]*partitionManager + + messages chan *sarama.ConsumerMessage + errors chan error +} + +func (cm *consumerManager) Messages() <-chan *sarama.ConsumerMessage { + return cm.messages +} + +func (cm *consumerManager) Errors() <-chan error { + return cm.errors +} + +func (cm *consumerManager) Interrupt() { + cm.t.Kill(nil) +} + +func (cm *consumerManager) Close() error { + cm.Interrupt() + return cm.t.Wait() +} + +// Ack will dispatch a message to the right partitionManager's ack +// function, so it can be marked as processed. 
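+// If the message's partition is not (or no longer) managed by this instance, for example
+// because a rebalance has reassigned it, the ack is logged as an error and otherwise ignored.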
+func (cm *consumerManager) Ack(msg *sarama.ConsumerMessage) {
+	cm.m.RLock()
+	defer cm.m.RUnlock()
+
+	partitionKey := fmt.Sprintf("%s/%d", msg.Topic, msg.Partition)
+	partitionManager := cm.partitionManagers[partitionKey]
+	if partitionManager == nil {
+		cm.logf("ERROR: acked message %d for %s, but this partition is not managed by this consumer!", msg.Offset, partitionKey)
+	} else {
+		partitionManager.ack(msg.Offset)
+	}
+}
+
+// run implements the main loop of the consumer manager.
+// 1. Get partitions that the group subscribes to
+// 2. Get the currently running instances
+// 3. Distribute partitions over instances
+// 4. Run partition consumers for the partitions that are assigned to this instance
+// 5. Watch zookeeper for changes in 1 & 2; start over when that happens.
+func (cm *consumerManager) run() {
+	defer cm.shutdown()
+
+	for {
+		partitions, partitionsChanged, err := cm.watchSubscription()
+		if err != nil {
+			cm.t.Kill(err)
+			return
+		}
+
+		instances, instancesChanged, err := cm.watchConsumerInstances()
+		if err != nil {
+			cm.t.Kill(err)
+			return
+		}
+
+		cm.logf("Currently, %d instances are registered, to consume %d partitions in total.", len(instances), len(partitions))
+
+		var (
+			partitionDistribution = distributePartitionsBetweenConsumers(instances, retrievePartitionLeaders(partitions))
+			assignedPartitions    = make(map[string]*kazoo.Partition)
+		)
+
+		for _, partition := range partitionDistribution[cm.instance.ID] {
+			assignedPartitions[partition.Key()] = partition
+		}
+
+		cm.managePartitionManagers(assignedPartitions)
+
+		select {
+		case <-cm.t.Dying():
+			cm.logf("Interrupted, shutting down...")
+			return
+
+		case <-partitionsChanged:
+			cm.logf("Woke up because the subscription reported a change in partitions.")
+
+		case <-instancesChanged:
+			cm.logf("Woke up because the list of running instances changed.")
+		}
+	}
+}
+
+// watchSubscription retrieves the list of partitions covered by the subscription from Zookeeper,
+// and sets a watch to be notified of changes to this list. It will retry for any error that may
+// occur. If the consumer manager is interrupted, the error return value will be tomb.ErrDying. Any
+// other error is non-recoverable.
+func (cm *consumerManager) watchSubscription() (kazoo.PartitionList, <-chan zk.Event, error) {
+	var (
+		partitions        kazoo.PartitionList
+		partitionsChanged <-chan zk.Event
+		err               error
+	)
+
+	for {
+		partitions, partitionsChanged, err = cm.subscription.WatchPartitions(cm.kz)
+		if err != nil {
+			cm.logf("Failed to watch subscription: %s. Trying again in 1 second...", err)
+			select {
+			case <-cm.t.Dying():
+				return nil, nil, tomb.ErrDying
+			case <-time.After(1 * time.Second):
+				continue
+			}
+		}
+
+		return partitions, partitionsChanged, nil
+	}
+}
+
+// watchConsumerInstances retrieves the list of currently running consumer instances from Zookeeper,
+// and sets a watch to be notified of changes to this list. It will retry for any error that may
+// occur. If the consumer manager is interrupted, the error return value will be tomb.ErrDying. Any
+// other error is non-recoverable.
+func (cm *consumerManager) watchConsumerInstances() (kazoo.ConsumergroupInstanceList, <-chan zk.Event, error) {
+	var (
+		instances        kazoo.ConsumergroupInstanceList
+		instancesChanged <-chan zk.Event
+		err              error
+	)
+
+	for {
+		instances, instancesChanged, err = cm.group.WatchInstances()
+		if err != nil {
+			cm.logf("Failed to watch consumer group instances: %s. Trying again in 1 second...", err)
+			select {
+			case <-cm.t.Dying():
+				return nil, nil, tomb.ErrDying
+			case <-time.After(1 * time.Second):
+				continue
+			}
+		}
+
+		return instances, instancesChanged, err
+	}
+}
+
+// startPartitionManager starts a new partition manager in a goroutine, and adds
+// it to the partitionManagers map.
+func (cm *consumerManager) startPartitionManager(partition *kazoo.Partition) {
+	pm := &partitionManager{
+		parent:             cm,
+		partition:          partition,
+		lastConsumedOffset: -1,
+		processingDone:     make(chan struct{}),
+	}
+
+	cm.m.Lock()
+	cm.partitionManagers[pm.partition.Key()] = pm
+	cm.m.Unlock()
+
+	go pm.run()
+}
+
+// stopPartitionManager stops a running partition manager, and removes it
+// from the partitionManagers map.
+func (cm *consumerManager) stopPartitionManager(pm *partitionManager) {
+	if err := pm.close(); err != nil {
+		pm.logf("Failed to cleanly shut down consumer: %s", err)
+	}
+
+	cm.m.Lock()
+	delete(cm.partitionManagers, pm.partition.Key())
+	cm.m.Unlock()
+}
+
+// managePartitionManagers will compare the currently running partition managers to the list
+// of partitions that is assigned to this consumer instance, and will stop and start partition
+// managers as appropriate.
+func (cm *consumerManager) managePartitionManagers(assignedPartitions map[string]*kazoo.Partition) {
+	var wg sync.WaitGroup
+
+	cm.m.RLock()
+	cm.logf("This instance is assigned to consume %d partitions, and is currently consuming %d partitions.", len(assignedPartitions), len(cm.partitionManagers))
+
+	// Stop consumers for partitions that are no longer assigned to this instance
+	for partitionKey, pm := range cm.partitionManagers {
+		if _, ok := assignedPartitions[partitionKey]; !ok {
+			wg.Add(1)
+			go func(pm *partitionManager) {
+				defer wg.Done()
+				cm.stopPartitionManager(pm)
+			}(pm)
+		}
+	}
+
+	// Start consumers for partitions that we were not already consuming
+	for partitionKey, partition := range assignedPartitions {
+		if _, ok := cm.partitionManagers[partitionKey]; !ok {
+			wg.Add(1)
+			go func(partition *kazoo.Partition) {
+				defer wg.Done()
+				cm.startPartitionManager(partition)
+			}(partition)
+		}
+	}
+
+	cm.m.RUnlock()
+
+	// Wait until all the interrupted partitionManagers have shut down completely.
+	wg.Wait()
+}
+
+// shutdown cleanly shuts down the consumer manager:
+// 1. stop all partition managers
+// 2. close connection to Kafka cluster
+// 3. deregister this running instance in zookeeper
+// 4. close connection to zookeeper.
+// 5. close messages and errors channels.
+func (cm *consumerManager) shutdown() {
+	defer cm.t.Done()
+
+	cm.managePartitionManagers(nil)
+
+	if cm.consumer != nil {
+		if err := cm.consumer.Close(); err != nil {
+			cm.logf("Failed to close Kafka consumer: %s", err)
+		}
+	}
+
+	if cm.offsetManager != nil {
+		if err := cm.offsetManager.Close(); err != nil {
+			cm.logf("Failed to close offset manager: %s", err)
+		}
+	}
+
+	if cm.client != nil {
+		if err := cm.client.Close(); err != nil {
+			cm.logf("Failed to close Kafka client: %s", err)
+		}
+	}
+
+	if cm.instance != nil {
+		if err := cm.instance.Deregister(); err != nil {
+			cm.logf("Failed to deregister consumer instance: %s", err)
+		}
+	}
+
+	if cm.kz != nil {
+		if err := cm.kz.Close(); err != nil {
+			cm.logf("Failed to close Zookeeper connection: %s", err)
+		}
+	}
+
+	close(cm.messages)
+	close(cm.errors)
+}
+
+func (cm *consumerManager) shortID() string {
+	if cm.instance == nil {
+		return "(defunct)"
+	} else {
+		return cm.instance.ID[len(cm.instance.ID)-12:]
+	}
+}
+
+func (cm *consumerManager) logf(format string, arguments ...interface{}) {
+	Logger.Printf(fmt.Sprintf("[instance=%s] %s", cm.shortID(), format), arguments...)
+}
diff --git a/kafkaconsumer/distribution.go b/kafkaconsumer/distribution.go
new file mode 100644
index 0000000..3b988a9
--- /dev/null
+++ b/kafkaconsumer/distribution.go
@@ -0,0 +1,81 @@
+package kafkaconsumer
+
+import (
+	"sort"
+
+	"github.com/wvanbergen/kazoo-go"
+)
+
+// retrievePartitionLeaders annotates a list of partitions with the leading broker.
+// This list can be sorted and serve as input for distributePartitionsBetweenConsumers().
+// By sorting them based on the leading broker, consumer instances will end up consuming
+// partitions that are led by the same broker, which means fewer connections are necessary.
+//
+// Note: this implementation uses the preferred replica, not the actual leader. Normally,
+// the preferred leader is also the actual leader, especially if auto.leader.rebalance.enable
+// is set to true on the Kafka cluster. Using the actual leaders would require setting a
+// zookeeper watch on every partition state node, and I am not sure that's worth it.
+func retrievePartitionLeaders(partitions kazoo.PartitionList) partitionLeaderList {
+	pls := make(partitionLeaderList, 0, len(partitions))
+	for _, partition := range partitions {
+		pl := partitionLeader{id: partition.ID, leader: partition.PreferredReplica(), partition: partition}
+		pls = append(pls, pl)
+	}
+	return pls
+}
+
+// distributePartitionsBetweenConsumers divides a set of partitions between a set of consumers.
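+// The division is as even as possible: with P partitions and C consumers, every consumer is
+// assigned either P/C or P/C+1 partitions, and the first P%C consumers (in sorted order) get
+// the extra one. Both lists are sorted first, so every running instance independently computes
+// the same distribution without any coordination beyond the shared Zookeeper state.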
+func distributePartitionsBetweenConsumers(consumers kazoo.ConsumergroupInstanceList, partitions partitionLeaderList) map[string][]*kazoo.Partition { + result := make(map[string][]*kazoo.Partition) + + plen := len(partitions) + clen := len(consumers) + if clen == 0 { + return result + } + + sort.Sort(partitions) + sort.Sort(consumers) + + n := plen / clen + m := plen % clen + p := 0 + for i, consumer := range consumers { + first := p + last := first + n + if m > 0 && i < m { + last++ + } + if last > plen { + last = plen + } + + for _, pl := range partitions[first:last] { + result[consumer.ID] = append(result[consumer.ID], pl.partition) + } + p = last + } + + return result +} + +type partitionLeader struct { + id int32 + leader int32 + partition *kazoo.Partition +} + +// A sortable slice of PartitionLeader structs +type partitionLeaderList []partitionLeader + +func (pls partitionLeaderList) Len() int { + return len(pls) +} + +func (pls partitionLeaderList) Less(i, j int) bool { + return pls[i].leader < pls[j].leader || (pls[i].leader == pls[j].leader && pls[i].partition.Key() < pls[j].partition.Key()) +} + +func (s partitionLeaderList) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/kafkaconsumer/distribution_test.go b/kafkaconsumer/distribution_test.go new file mode 100644 index 0000000..b2e9ecd --- /dev/null +++ b/kafkaconsumer/distribution_test.go @@ -0,0 +1,108 @@ +package kafkaconsumer + +import ( + "fmt" + "github.com/wvanbergen/kazoo-go" + "sort" + "testing" +) + +func TestPartitionLeaderList(t *testing.T) { + var ( + topicFoo = &kazoo.Topic{Name: "foo"} + topicBar = &kazoo.Topic{Name: "bar"} + ) + + var ( + partitionFoo_1 = topicFoo.Partition(1, []int32{3, 2, 1}) + partitionFoo_2 = topicFoo.Partition(2, []int32{1, 3, 1}) + partitionBar_1 = topicBar.Partition(1, []int32{3, 1, 2}) + partitionBar_2 = topicBar.Partition(2, []int32{3, 2, 1}) + ) + + partitions := kazoo.PartitionList{partitionFoo_1, partitionFoo_2, partitionBar_1, partitionBar_2} + partitionLeaders := retrievePartitionLeaders(partitions) + sort.Sort(partitionLeaders) + + if partitionLeaders[0].partition != partitionFoo_2 || partitionLeaders[1].partition != partitionBar_1 || partitionLeaders[2].partition != partitionBar_2 || partitionLeaders[3].partition != partitionFoo_1 { + t.Error("Unexpected order after sorting partition leader list", partitionLeaders) + } +} + +func TestPartitionDistribution(t *testing.T) { + type testCase struct{ consumers, partitions int } + + consumerPartitionTestCases := []testCase{ + testCase{consumers: 0, partitions: 1}, + testCase{consumers: 1, partitions: 0}, + testCase{consumers: 2, partitions: 5}, + testCase{consumers: 5, partitions: 2}, + testCase{consumers: 9, partitions: 32}, + testCase{consumers: 10, partitions: 50}, + testCase{consumers: 232, partitions: 592}, + } + + for _, tc := range consumerPartitionTestCases { + var ( + consumers = createTestConsumerGroupInstanceList(tc.consumers) + partitions = createTestPartitions(tc.partitions) + ) + + distribution := distributePartitionsBetweenConsumers(consumers, partitions) + + var ( + grouping = make(map[int32]struct{}) + maxConsumed = 0 + minConsumed = len(partitions) + 1 + ) + + for _, v := range distribution { + if len(v) > maxConsumed { + maxConsumed = len(v) + } + if len(v) < minConsumed { + minConsumed = len(v) + } + for _, partition := range v { + if _, ok := grouping[partition.ID]; ok { + t.Errorf("PartitionDivision: Partition %v was assigned more than once!", partition.ID) + } else { + grouping[partition.ID] = struct{}{} 
+ } + } + } + + if len(consumers) > 0 { + if len(grouping) != len(partitions) { + t.Errorf("PartitionDivision: Expected to divide %d partitions among consumers, but only %d partitions were consumed.", len(partitions), len(grouping)) + } + if (maxConsumed - minConsumed) > 1 { + t.Errorf("PartitionDivision: Partitions weren't divided evenly, consumers shouldn't have a difference of more than 1 in the number of partitions consumed (was %d).", maxConsumed-minConsumed) + } + if minConsumed > 1 && len(consumers) != len(distribution) { + t.Errorf("PartitionDivision: Partitions weren't divided evenly, some consumers didn't get any paritions even though there were %d partitions and %d consumers.", len(partitions), len(consumers)) + } + } else { + if len(grouping) != 0 { + t.Error("PartitionDivision: Expected to not distribute partitions without running instances") + } + } + } +} + +func createTestConsumerGroupInstanceList(size int) kazoo.ConsumergroupInstanceList { + k := make(kazoo.ConsumergroupInstanceList, size) + for i := range k { + k[i] = &kazoo.ConsumergroupInstance{ID: fmt.Sprintf("consumer%d", i)} + } + return k +} + +func createTestPartitions(count int) []partitionLeader { + topic := &kazoo.Topic{Name: "foo"} + p := make([]partitionLeader, count) + for i := range p { + p[i] = partitionLeader{id: int32(i), leader: 1, partition: topic.Partition(int32(i), []int32{1, 2, 3})} + } + return p +} diff --git a/kafkaconsumer/kafkaconsumer_integration_test.go b/kafkaconsumer/kafkaconsumer_integration_test.go new file mode 100644 index 0000000..347b4b6 --- /dev/null +++ b/kafkaconsumer/kafkaconsumer_integration_test.go @@ -0,0 +1,458 @@ +package kafkaconsumer + +import ( + "fmt" + "io" + "log" + "os" + "os/signal" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/wvanbergen/kazoo-go" +) + +const ( + TopicWithSinglePartition = "test.1" + TopicWithMultiplePartitions = "test.4" +) + +var ( + // By default, assume we're using Sarama's vagrant cluster when running tests + zookeeper = kazoo.BuildConnectionString([]string{"192.168.100.67:2181", "192.168.100.67:2182", "192.168.100.67:2183", "192.168.100.67:2184", "192.168.100.67:2185"}) + kafkaBrokers []string +) + +func init() { + if os.Getenv("ZOOKEEPER_PEERS") != "" { + zookeeper = os.Getenv("ZOOKEEPER_PEERS") + } + + if os.Getenv("DEBUG") != "" { + Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) + } + + fmt.Printf("Using Zookeeper cluster at %s\n", zookeeper) + + zkNodes, _ := kazoo.ParseConnectionString(zookeeper) + kz, err := kazoo.NewKazoo(zkNodes, nil) + if err != nil { + log.Fatal("Failed to connect to Zookeeper:", err) + } + defer kz.Close() + + brokers, err := kz.BrokerList() + if err != nil { + log.Fatal("Failed to retrieve broker list from Zookeeper:", err) + } + + kafkaBrokers = brokers +} + +//////////////////////////////////////////////////////////////////// +// Examples +//////////////////////////////////////////////////////////////////// + +// This example sets up a consumer instance that consumes two topics, +// processes and commits the messages that are consumed, and properly +// shuts down the consumer when the process is interrupted. 
+func ExampleConsumer() { + consumer, err := Join( + "ExampleConsumerGroup", // name of the consumer group + TopicSubscription("access_log", "audit_log"), // topics to subscribe to + "zk1:2181,zk2:2181,zk3:2181/chroot", // zookeeper connection string + nil) // Set this to a *Config instance to override defaults + + if err != nil { + log.Fatalln(err) + } + + // Trap the interrupt signal to cleanly shut down the consumer + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + go func() { + <-c + consumer.Interrupt() + }() + + eventCount := 0 + for message := range consumer.Messages() { + // Process message + log.Println(string(message.Value)) + eventCount += 1 + + // Acknowledge that the message has been processed + consumer.Ack(message) + } + + log.Printf("Processed %d events.", eventCount) +} + +//////////////////////////////////////////////////////////////////// +// Integration tests +//////////////////////////////////////////////////////////////////// + +func TestFunctionalSingleConsumerSingleTopic(t *testing.T) { + ts := newTestState(t, "kafkaconsumer.TestSingleConsumerSingleTopic") + ts.prepareConsumer([]string{"test.4"}) + ts.produceMessages("test.4", 100) + + consumer, err := Join("kafkaconsumer.TestSingleConsumerSingleTopic", TopicSubscription("test.4"), zookeeper, nil) + if err != nil { + t.Fatal(t) + } + + ts.assertMessages(consumer, 100) + + safeClose(t, consumer) + ts.assertDrained(consumer) + + ts.assertOffsets() +} + +func TestFunctionalSingleConsumerMultipleTopics(t *testing.T) { + ts := newTestState(t, "kafkaconsumer.TestSingleConsumerMultipleTopics") + ts.prepareConsumer([]string{"test.4", "test.1"}) + + ts.produceMessages("test.4", 100) + ts.produceMessages("test.1", 200) + + consumer, err := Join("kafkaconsumer.TestSingleConsumerMultipleTopics", TopicSubscription("test.4", "test.1"), zookeeper, nil) + if err != nil { + t.Fatal(t) + } + + ts.assertMessages(consumer, 300) + + safeClose(t, consumer) + ts.assertDrained(consumer) + + ts.assertOffsets() +} + +// For this test, we produce 100 messages, and then consume the first 50 messages with +// the first consumer, and the last 50 messages with a second consumer +func TestFunctionalSerialConsumersSingleTopic(t *testing.T) { + ts := newTestState(t, "kafkaconsumer.TestSerialConsumersSingleTopic") + ts.prepareConsumer([]string{"test.4"}) + ts.produceMessages("test.4", 100) + + consumer1, err := Join("kafkaconsumer.TestSerialConsumersSingleTopic", TopicSubscription("test.4"), zookeeper, nil) + if err != nil { + t.Fatal(t) + } + + ts.assertMessages(consumer1, 50) + + safeClose(t, consumer1) + // Consumer 1 may not be fully drained, but the messages that are in the pipeline won't + // be committed, and will eventually be consumed by consumer2 instead + + consumer2, err := Join("kafkaconsumer.TestSerialConsumersSingleTopic", TopicSubscription("test.4"), zookeeper, nil) + if err != nil { + t.Fatal(t) + } + + ts.assertMessages(consumer2, 50) + + safeClose(t, consumer2) + // Consumer 2 should be fully drained though + ts.assertDrained(consumer2) + + ts.assertOffsets() +} + +func TestFunctionalParallelConsumers(t *testing.T) { + ts := newTestState(t, "kafkaconsumer.TestFunctionalParallelConsumers") + ts.prepareConsumer([]string{"test.64"}) + ts.produceMessages("test.64", 1000) + + var wg sync.WaitGroup + + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + consumer, err := Join("kafkaconsumer.TestFunctionalParallelConsumers", TopicSubscription("test.64"), zookeeper, nil) + if err != nil { + t.Fatal(t) + 
} + + go func() { + <-ts.done + consumer.Interrupt() + }() + + ts.assertAllMessages(consumer) + }() + } + + wg.Wait() + ts.assertOffsets() +} + +func TestFunctionalParallelConsumersWithInterruption(t *testing.T) { + ts := newTestState(t, "kafkaconsumer.TestFunctionalParallelConsumersWithInterruption") + ts.prepareConsumer([]string{"test.4"}) + ts.produceMessages("test.4", 100) + + config := NewConfig() + config.MaxProcessingTime = 500 * time.Millisecond + + consumer1, err := Join("kafkaconsumer.TestFunctionalParallelConsumersWithInterruption", TopicSubscription("test.4"), zookeeper, config) + if err != nil { + t.Fatal(t) + } + + consumer2, err := Join("kafkaconsumer.TestFunctionalParallelConsumersWithInterruption", TopicSubscription("test.4"), zookeeper, config) + if err != nil { + t.Fatal(t) + } + + // We start consuming messages from both consumers in parallel. + // Both should acknowledge 25 messages. + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + ts.assertMessages(consumer1, 25) + }() + + wg.Add(1) + go func() { + defer wg.Done() + ts.assertMessages(consumer2, 25) + }() + + wg.Wait() + + // Now, we close one consumer, which means that the second consumer will consume all other messages. + safeClose(t, consumer1) + + ts.assertMessages(consumer2, 50) + + // Now, close the second consumer + safeClose(t, consumer2) + ts.assertDrained(consumer2) + + ts.assertOffsets() +} + +//////////////////////////////////////////////////////////////////// +// Helper functions +//////////////////////////////////////////////////////////////////// + +func safeClose(t *testing.T, c io.Closer) { + if err := c.Close(); err != nil { + t.Error(err) + } +} + +func newTestState(t *testing.T, group string) *testState { + return &testState{t: t, group: group, done: make(chan struct{})} +} + +type testState struct { + t *testing.T + group string + c sarama.Client + coord *sarama.Broker + offsetTotal int64 + topics []string + produced int64 + consumed int64 + done chan struct{} +} + +func (ts *testState) close() { + if ts.c != nil { + safeClose(ts.t, ts.c) + } +} + +func (ts *testState) client() sarama.Client { + if ts.c == nil { + config := sarama.NewConfig() + config.Producer.Partitioner = sarama.NewRoundRobinPartitioner + + client, err := sarama.NewClient(kafkaBrokers, config) + if err != nil { + ts.t.Fatal("Failed to connect to Kafka:", err) + } + + ts.c = client + } + + return ts.c +} + +func (ts *testState) coordinator() *sarama.Broker { + if ts.coord == nil { + coordinator, err := ts.client().Coordinator(ts.group) + if err != nil { + ts.t.Fatal(err) + } + ts.coord = coordinator + } + + return ts.coord +} + +func (ts *testState) prepareConsumer(topics []string) { + client := ts.client() + coordinator := ts.coordinator() + + request := &sarama.OffsetCommitRequest{ + Version: 1, + ConsumerGroup: ts.group, + } + + ts.topics = topics + for _, topic := range topics { + partitions, err := client.Partitions(topic) + if err != nil { + ts.t.Fatal("Failed to retrieve list of partitions:", err) + } + + for _, partition := range partitions { + offset, err := client.GetOffset(topic, partition, sarama.OffsetNewest) + if err != nil { + ts.t.Fatal("Failed to get latest offset for partition:", err) + } + ts.offsetTotal += offset - 1 + + request.AddBlock(topic, partition, offset-1, -1, "") + ts.t.Logf("Setting initial offset for %s/%d: %d.", topic, partition, offset) + } + } + + response, err := coordinator.CommitOffset(request) + if err != nil { + ts.t.Fatal("Failed to commit offsets for 
consumergroup:", err) + } + + for topic, errors := range response.Errors { + for partition, err := range errors { + if err != sarama.ErrNoError { + ts.t.Fatalf("Failed to commit offsets for %s/%d: %s", topic, partition, err) + } + } + } +} + +func (ts *testState) produceMessages(topic string, count int64) { + producer, err := sarama.NewAsyncProducerFromClient(ts.client()) + if err != nil { + ts.t.Fatal("Failed to open Kafka producer:", err) + } + defer producer.Close() + + go func() { + for err := range producer.Errors() { + ts.t.Error("Failed to produce message:", err) + } + }() + + for i := int64(0); i < count; i++ { + producer.Input() <- &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.StringEncoder(fmt.Sprintf("%d", count)), + } + } + + atomic.AddInt64(&ts.produced, count) + ts.t.Logf("Produced %d messages to %s", count, topic) +} + +func (ts *testState) assertAllMessages(consumer Consumer) { + var count int64 + for msg := range consumer.Messages() { + count++ + atomic.AddInt64(&ts.consumed, 1) + consumer.Ack(msg) + + if atomic.LoadInt64(&ts.produced) == atomic.LoadInt64(&ts.consumed) { + close(ts.done) + } + } + + ts.t.Logf("Consumed %d out of %d messages.", count, atomic.LoadInt64(&ts.produced)) +} + +func (ts *testState) assertMessages(consumer Consumer, total int) { + timeout := time.After(10 * time.Second) + count := 0 + for { + select { + case <-timeout: + ts.t.Errorf("TIMEOUT: only consumed %d/%d messages", count, total) + return + + case msg := <-consumer.Messages(): + count++ + atomic.AddInt64(&ts.consumed, 1) + consumer.Ack(msg) + + if count == total { + ts.t.Logf("Consumed %d out of %d messages.", total, atomic.LoadInt64(&ts.produced)) + return + } + } + } +} + +func (ts *testState) assertDrained(consumer Consumer) { + if _, ok := <-consumer.Messages(); ok { + ts.t.Error("Expected messages channel of consumer to be drained") + } +} + +func (ts *testState) assertOffsets() { + defer ts.close() + + request := &sarama.OffsetFetchRequest{ + Version: 1, + ConsumerGroup: ts.group, + } + + for _, topic := range ts.topics { + partitions, err := ts.client().Partitions(topic) + if err != nil { + ts.t.Fatal("Failed to retrieve list of partitions:", err) + } + + for _, partition := range partitions { + request.AddPartition(topic, partition) + } + } + + response, err := ts.coordinator().FetchOffset(request) + if err != nil { + ts.t.Fatal("Failed to fetch committed offsets", err) + } + + var newOffsetTotal int64 + for topic, partitions := range response.Blocks { + for partition, block := range partitions { + if block.Err != sarama.ErrNoError { + ts.t.Fatalf("%s/%d: %s", topic, partition, block.Err) + } + + ts.t.Logf("Committed offset for %s/%d: %d", topic, partition, block.Offset) + newOffsetTotal += block.Offset + } + } + + produced := atomic.LoadInt64(&ts.produced) + if newOffsetTotal-ts.offsetTotal != produced { + ts.t.Errorf("Expected the offsets to have increased by %d, but increment was %d", produced, newOffsetTotal-ts.offsetTotal) + } else { + ts.t.Logf("The offsets stored in Kafka have increased by %d", produced) + } +} diff --git a/kafkaconsumer/partition_manager.go b/kafkaconsumer/partition_manager.go new file mode 100644 index 0000000..53817d4 --- /dev/null +++ b/kafkaconsumer/partition_manager.go @@ -0,0 +1,286 @@ +package kafkaconsumer + +import ( + "fmt" + "sync/atomic" + "time" + + "github.com/Shopify/sarama" + "github.com/wvanbergen/kazoo-go" + "gopkg.in/tomb.v1" +) + +// partitionManager manages the consumption of a single partition, and committing +// the 
processed messages to offset storage. +type partitionManager struct { + parent *consumerManager + t tomb.Tomb + partition *kazoo.Partition + + offsetManager sarama.PartitionOffsetManager + lastConsumedOffset int64 + processingDone chan struct{} +} + +// run implements the main partition manager loop. +// 1. Claim the partition in Zookeeper +// 2. Determine at what offset to start consuming +// 3. Start a consumer for the partition at the inital offset +// 4. Transfer messages and errors from the partition consumer to the consumer manager. +func (pm *partitionManager) run() { + defer pm.t.Done() + + if err := pm.claimPartition(); err != nil { + pm.t.Kill(err) + return + } + defer pm.releasePartition() + + offsetManager, err := pm.startPartitionOffsetManager() + if err != nil { + pm.t.Kill(err) + return + } else { + pm.offsetManager = offsetManager + defer pm.closePartitionOffsetManager() + } + + // We are ignoring metadata for now. + initialOffset, _ := offsetManager.NextOffset() + defer pm.waitForProcessing() + + pc, err := pm.startPartitionConsumer(initialOffset) + if err != nil { + pm.t.Kill(err) + return + } + defer pm.closePartitionConsumer(pc) + + for { + select { + case <-pm.t.Dying(): + return + + case msg := <-pc.Messages(): + select { + case pm.parent.messages <- msg: + atomic.StoreInt64(&pm.lastConsumedOffset, msg.Offset) + + case <-pm.t.Dying(): + return + } + + case err := <-offsetManager.Errors(): + select { + case pm.parent.errors <- err: + // Noop? + case <-pm.t.Dying(): + return + } + + case err := <-pc.Errors(): + select { + case pm.parent.errors <- err: + // Noop? + case <-pm.t.Dying(): + return + } + } + } +} + +// interrupt initiates the shutdown procedure for the partition manager, and returns immediately. +func (pm *partitionManager) interrupt() { + pm.t.Kill(nil) +} + +// close starts the shutdown proecure, and waits for it to complete. +func (pm *partitionManager) close() error { + pm.interrupt() + return pm.t.Wait() +} + +// ack sets the offset on the partition's offset manager, and signals that +// processing done if the offset is equal to the last consumed offset during shutdown. +func (pm *partitionManager) ack(offset int64) { + pm.offsetManager.MarkOffset(offset, "") + + if pm.t.Err() != tomb.ErrStillAlive && offset == atomic.LoadInt64(&pm.lastConsumedOffset) { + close(pm.processingDone) + } +} + +// claimPartition claims a partition in Zookeeper for this instance. +// If the partition is already claimed by someone else, it will wait for the +// partition to become available. It will retry errors if they occur. +// This method should therefore only return with a nil error value, or +// tomb.ErrDying if the partitionManager was interrupted. Any other errors +// are not recoverable. +func (pm *partitionManager) claimPartition() error { + pm.logf("Trying to claim partition...") + + for { + owner, changed, err := pm.parent.group.WatchPartitionOwner(pm.partition.Topic().Name, pm.partition.ID) + if err != nil { + pm.logf("Failed to get partition owner from Zookeeper: %s. Trying again in 1 second...", err) + select { + case <-time.After(1 * time.Second): + continue + case <-pm.t.Dying(): + return tomb.ErrDying + } + } + + if owner != nil { + if owner.ID == pm.parent.instance.ID { + return fmt.Errorf("The current instance is already the owner of %s. This should not happen.", pm.partition.Key()) + } + + pm.logf("Partition is currently claimed by instance %s. 
Waiting for it to be released...", owner.ID) + select { + case <-changed: + continue + case <-pm.t.Dying(): + return tomb.ErrDying + } + + } else { + + err := pm.parent.instance.ClaimPartition(pm.partition.Topic().Name, pm.partition.ID) + if err != nil { + pm.logf("Fail to claim partition ownership: %s. Trying again...", err) + continue + } + + pm.logf("Claimed partition ownership") + return nil + } + } +} + +// releasePartition releases this instance's claim on this partition in Zookeeper. +func (pm *partitionManager) releasePartition() { + if err := pm.parent.instance.ReleasePartition(pm.partition.Topic().Name, pm.partition.ID); err != nil { + pm.logf("FAILED to release partition: %s", err) + } else { + pm.logf("Released partition.") + } +} + +// startPartitionOffsetManager starts a PartitionOffsetManager for the partition, and will +// retry any errors. The only error value that can be returned is tomb.ErrDying, which is +// returned when the partition manager is interrupted. Any other error should be considered +// non-recoverable. +func (pm *partitionManager) startPartitionOffsetManager() (sarama.PartitionOffsetManager, error) { + for { + offsetManager, err := pm.parent.offsetManager.ManagePartition(pm.partition.Topic().Name, pm.partition.ID) + if err != nil { + pm.logf("FAILED to start partition offset manager: %s. Trying again in 1 second...\n", err) + + select { + case <-pm.t.Dying(): + return nil, tomb.ErrDying + case <-time.After(1 * time.Second): + continue + } + } + + return offsetManager, nil + } +} + +// closePartitionOffsetManager stops the partition offset manager for this partitions, and will write +// any error that may happen to the logger. +func (pm *partitionManager) closePartitionOffsetManager() { + if err := pm.offsetManager.Close(); err != nil { + pm.logf("Failed to close partition offset manager: %s\n", err) + } +} + +// startPartitionConsumer starts a sarama consumer for the partition under management. +// This function will retry any error that may occur. The error return value is nil once +// it successfully has started the partition consumer, or tomb.ErrDying if the partition +// manager was interrupted. Any other error is not recoverable. +func (pm *partitionManager) startPartitionConsumer(initialOffset int64) (sarama.PartitionConsumer, error) { + var ( + pc sarama.PartitionConsumer + err error + ) + + for { + pc, err = pm.parent.consumer.ConsumePartition(pm.partition.Topic().Name, pm.partition.ID, initialOffset) + switch err { + case nil: + switch initialOffset { + case sarama.OffsetNewest: + pm.logf("Started consumer for new messages only.") + case sarama.OffsetOldest: + pm.logf("Started consumer at the oldest available offset.") + default: + pm.logf("Started consumer at offset %d.", initialOffset) + } + + // We have a valid partition consumer so we can return + return pc, nil + + case sarama.ErrOffsetOutOfRange: + // The offset we had on file is too old. Restart with initial offset + if pm.parent.config.Offsets.Initial == sarama.OffsetNewest { + pm.logf("Offset %d is no longer available. Trying again with new messages only...", initialOffset) + } else if pm.parent.config.Offsets.Initial == sarama.OffsetOldest { + pm.logf("Offset %d is no longer available. Trying again with he oldest available offset...", initialOffset) + } + initialOffset = pm.parent.config.Offsets.Initial + + continue + + default: + // Assume the problem is temporary; just try again. + pm.logf("Failed to start consuming partition: %s. 
Trying again in 1 second...\n", err)
+			select {
+			case <-pm.t.Dying():
+				return nil, tomb.ErrDying
+			case <-time.After(1 * time.Second):
+				continue
+			}
+		}
+
+	}
+}
+
+// closePartitionConsumer closes the sarama consumer for the partition under management.
+func (pm *partitionManager) closePartitionConsumer(pc sarama.PartitionConsumer) {
+	if err := pc.Close(); err != nil {
+		pm.logf("Failed to close partition consumer: %s\n", err)
+	}
+}
+
+// waitForProcessing waits for all the messages that were consumed for this partition to be processed.
+// The processing can take at most MaxProcessingTime. After that, those messages are considered
+// lost and will not be committed. Note that this may cause messages to be processed twice if another
+// partition consumer resumes consuming from this partition later.
+func (pm *partitionManager) waitForProcessing() {
+	nextOffset, _ := pm.offsetManager.NextOffset()
+	lastProcessedOffset := nextOffset - 1
+	lastConsumedOffset := atomic.LoadInt64(&pm.lastConsumedOffset)
+
+	if lastConsumedOffset >= 0 {
+		if lastConsumedOffset > lastProcessedOffset {
+			pm.logf("Waiting for offset %d to be processed before shutting down...", lastConsumedOffset)
+
+			select {
+			case <-pm.processingDone:
+				pm.logf("Offset %d has been processed, continuing shutdown.", lastConsumedOffset)
+			case <-time.After(pm.parent.config.MaxProcessingTime):
+				pm.logf("TIMEOUT: offset %d still has not been processed. The last processed offset was %d.", lastConsumedOffset, lastProcessedOffset)
+			}
+		} else {
+			pm.logf("Offset %d has been processed. Continuing shutdown...", lastConsumedOffset)
+		}
+	}
+}
+
+// logf writes a formatted log message to kafkaconsumer.Logger.
+func (pm *partitionManager) logf(format string, arguments ...interface{}) {
+	Logger.Printf(fmt.Sprintf("[instance=%s partition=%s] %s", pm.parent.shortID(), pm.partition.Key(), format), arguments...)
+}
diff --git a/kafkaconsumer/subscription.go b/kafkaconsumer/subscription.go
new file mode 100644
index 0000000..a8783c3
--- /dev/null
+++ b/kafkaconsumer/subscription.go
@@ -0,0 +1,226 @@
+package kafkaconsumer
+
+import (
+	"encoding/json"
+	"regexp"
+	"sync"
+	"time"
+
+	"github.com/samuel/go-zookeeper/zk"
+	"github.com/wvanbergen/kazoo-go"
+)
+
+type SubscriptionPattern string
+
+const (
+	SubscriptionPatternStatic    SubscriptionPattern = "static"
+	SubscriptionPatternWhiteList SubscriptionPattern = "white_list"
+	SubscriptionPatternBlackList SubscriptionPattern = "black_list"
+)
+
+// Subscription describes what topics/partitions a consumer instance is
+// subscribed to. This can be a static list of topics, or a regular
+// expression that acts as a whitelist or blacklist of topics.
+//
+// The subscription is responsible for watching Zookeeper for changes in
+// the list of topics or partitions, and for notifying the consumer so it
+// can trigger a rebalance.
+type Subscription interface {
+
+	// WatchPartitions returns a list of partitions that the consumer group should
+	// consume, and a channel that will be fired if this list has changed.
+	WatchPartitions(kazoo *kazoo.Kazoo) (kazoo.PartitionList, <-chan zk.Event, error)
+
+	// JSON returns a JSON-encoded representation of the subscription, which will be
+	// stored in Zookeeper alongside every running instance registration.
+ JSON() ([]byte, error) +} + +type staticSubscription struct { + Pattern SubscriptionPattern `json:"pattern"` + Subscription map[string]int `json:"subscription"` + Timestamp int64 `json:"timestamp,string"` + Version int `json:"version"` +} + +// TopicSubscription creates a static subscription for a list of topics. +func TopicSubscription(topics ...string) Subscription { + sub := make(map[string]int) + for _, topic := range topics { + sub[topic] = 1 + } + return StaticSubscription(sub) +} + +// StaticSubscription creates a static subscription for a map of topics, +// and the number of streams that will be used to consume it. +func StaticSubscription(subscription map[string]int) Subscription { + return &staticSubscription{ + Version: 1, + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + Pattern: SubscriptionPatternStatic, + Subscription: subscription, + } +} + +func (ss *staticSubscription) WatchPartitions(kz *kazoo.Kazoo) (kazoo.PartitionList, <-chan zk.Event, error) { + var ( + allPartitions = make([]*kazoo.Partition, 0) + ca = newChangeAggregator() + ) + + for topicName, _ := range ss.Subscription { + topic := kz.Topic(topicName) + + if exists, err := topic.Exists(); err != nil { + return nil, nil, err + } else if !exists { + // This deals with topics that are deleted while the consumer is running. Don't do this. + Logger.Printf("Attempted to subscribe to %s, but this topic does not appear to exist. Ignoring...", topic.Name) + continue + } + + partitions, partitionsChanged, err := topic.WatchPartitions() + if err != nil { + return nil, nil, err + } + + ca.handleChange(partitionsChanged) + + for _, partition := range partitions { + allPartitions = append(allPartitions, partition) + } + } + + return allPartitions, ca.channel(), nil +} + +// JSON returns the json representation of the static subscription +func (ss *staticSubscription) JSON() ([]byte, error) { + return json.Marshal(ss) +} + +type regexpSubscription struct { + Pattern SubscriptionPattern `json:"pattern"` + Subscription map[string]int `json:"subscription"` + Timestamp int64 `json:"timestamp,string"` + Version int `json:"version"` + + regexp *regexp.Regexp +} + +// WhitelistSubscription creates a subscription on topics that match a given regular expression. +// It will automatically subscribe to new topics that match the pattern when they are created. +func WhitelistSubscription(pattern *regexp.Regexp) Subscription { + subscription := make(map[string]int) + subscription[pattern.String()] = 1 + + return ®expSubscription{ + Version: 1, + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + Pattern: SubscriptionPatternWhiteList, + Subscription: subscription, + + regexp: pattern, + } +} + +// BlacklistSubscription creates a subscription on topics that do not match a given regular expression. +// It will automatically subscribe to new topics that don't match the pattern when they are created. 
+func BlacklistSubscription(pattern *regexp.Regexp) Subscription { + subscription := make(map[string]int) + subscription[pattern.String()] = 1 + + return ®expSubscription{ + Version: 1, + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + Pattern: SubscriptionPatternBlackList, + Subscription: subscription, + + regexp: pattern, + } +} + +func (rs *regexpSubscription) topicMatchesPattern(topicName string) bool { + switch rs.Pattern { + case SubscriptionPatternWhiteList: + return rs.regexp.MatchString(topicName) + case SubscriptionPatternBlackList: + return !rs.regexp.MatchString(topicName) + default: + panic("Unexpected pattern for regexpSubscription") + } +} + +func (rs *regexpSubscription) WatchPartitions(kz *kazoo.Kazoo) (kazoo.PartitionList, <-chan zk.Event, error) { + var ( + allPartitions = make([]*kazoo.Partition, 0) + ca = newChangeAggregator() + ) + + topics, topicsChanged, err := kz.WatchTopics() + if err != nil { + return nil, nil, err + } + + ca.handleChange(topicsChanged) + + for _, topic := range topics { + if rs.topicMatchesPattern(topic.Name) { + partitions, partitionsChanged, err := topic.WatchPartitions() + if err != nil { + return nil, nil, err + } + + ca.handleChange(partitionsChanged) + + for _, partition := range partitions { + allPartitions = append(allPartitions, partition) + } + } + } + + return allPartitions, ca.channel(), nil +} + +// JSON returns the json representation of the static subscription +func (rs *regexpSubscription) JSON() ([]byte, error) { + return json.Marshal(rs) +} + +func newChangeAggregator() *changeAggregator { + return &changeAggregator{ + dying: make(chan struct{}, 0), + c: make(chan zk.Event, 1), + } +} + +// changeAggregator will emit only the first change event to the +// output channel. All other goroutines waiting for zookeeper watches +// will be stopped once this happens. 
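+// A single event is sufficient because the consumer manager rebalances on any change and then
+// sets up fresh watches; closing the dying channel releases the remaining watch goroutines so
+// they do not leak.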
+type changeAggregator struct { + dying chan struct{} + c chan zk.Event + once sync.Once +} + +func (ca *changeAggregator) handleChange(change <-chan zk.Event) { + go ca.waitForChange(change) +} + +func (ca *changeAggregator) waitForChange(change <-chan zk.Event) { + select { + case <-ca.dying: + return + case event := <-change: + ca.once.Do(func() { + close(ca.dying) + ca.c <- event + close(ca.c) + }) + } +} + +func (ca *changeAggregator) channel() <-chan zk.Event { + return ca.c +} diff --git a/kafkaconsumer/subscription_test.go b/kafkaconsumer/subscription_test.go new file mode 100644 index 0000000..082f371 --- /dev/null +++ b/kafkaconsumer/subscription_test.go @@ -0,0 +1,158 @@ +package kafkaconsumer + +import ( + "encoding/json" + "regexp" + "sync" + "sync/atomic" + "testing" + + "github.com/samuel/go-zookeeper/zk" +) + +func TestStaticSubscriptionJSON(t *testing.T) { + subscription := TopicSubscription("test1", "test2") + bytes, err := subscription.JSON() + if err != nil { + t.Fatal(err) + } + + data := make(map[string]interface{}) + if err := json.Unmarshal(bytes, &data); err != nil { + t.Fatal(err) + } + + if data["pattern"] != "static" { + t.Errorf("pattern should be set to static, but was %+v", data["pattern"]) + } + + if data["version"] != float64(1) { + t.Errorf("version should be set to 1, but was %+v", data["version"]) + } + + if data["timestamp"] == nil || data["timestamp"] == "" { + t.Errorf("timestamp should be set, but was %+v", data["timestamp"]) + } + + if s, ok := data["subscription"].(map[string]interface{}); ok { + if len(s) != 2 { + t.Error("Subscription should have 2 entries") + } + + if s["test1"] != float64(1) { + t.Error("Subscription for test1 was not properly set") + } + + if s["test2"] != float64(1) { + t.Error("Subscription for test2 was not properly set") + } + } else { + t.Error("subscription should be a map") + } +} + +func TestRegexpSubscription(t *testing.T) { + whitelist := WhitelistSubscription(regexp.MustCompile("^test\\..*")).(*regexpSubscription) + + if whitelist.Pattern != SubscriptionPatternWhiteList { + t.Error("Subscription should have white_list pattern, but has:", whitelist.Pattern) + } + + if !whitelist.topicMatchesPattern("test.1") { + t.Error("Subscription should match topic test.1") + } + + if whitelist.topicMatchesPattern("foo") { + t.Error("Subscription should not match topic foo") + } + + blacklist := BlacklistSubscription(regexp.MustCompile("^test\\..*")).(*regexpSubscription) + + if blacklist.Pattern != SubscriptionPatternBlackList { + t.Error("Subscription should have black_list pattern, but has:", blacklist.Pattern) + } + + if blacklist.topicMatchesPattern("test.1") { + t.Error("Subscription should not match topic test.1") + } + + if !blacklist.topicMatchesPattern("foo") { + t.Error("Subscription should match topic foo") + } +} + +func TestRegexpSubscriptionJSON(t *testing.T) { + subscription := WhitelistSubscription(regexp.MustCompile("^test\\..*")) + bytes, err := subscription.JSON() + if err != nil { + t.Fatal(err) + } + + data := make(map[string]interface{}) + if err := json.Unmarshal(bytes, &data); err != nil { + t.Fatal(err) + } + + if data["pattern"] != "white_list" { + t.Errorf("pattern should be set to static, but was %+v", data["pattern"]) + } + + if data["version"] != float64(1) { + t.Errorf("version should be set to 1, but was %+v", data["version"]) + } + + if data["timestamp"] == nil || data["timestamp"] == "" { + t.Errorf("timestamp should be set, but was %+v", data["timestamp"]) + } + + if s, ok := 
data["subscription"].(map[string]interface{}); ok {
+		if len(s) != 1 || s["^test\\..*"] != float64(1) {
+			t.Error("Pattern was not set properly in the Subscription field")
+		}
+	} else {
+		t.Error("subscription should be a map")
+	}
+}
+
+func TestChangeAggregator(t *testing.T) {
+
+	var (
+		changesHandled, changesSubmitted int32
+
+		wg sync.WaitGroup
+		ca = newChangeAggregator()
+	)
+
+	for i := 0; i < 100; i++ {
+		change := make(chan zk.Event, 1)
+
+		wg.Add(1)
+		go func(change chan<- zk.Event) {
+			defer wg.Done()
+			change <- zk.Event{}
+			close(change)
+			atomic.AddInt32(&changesSubmitted, 1)
+		}(change)
+
+		wg.Add(1)
+		go func(change <-chan zk.Event) {
+			defer wg.Done()
+			ca.waitForChange(change)
+			atomic.AddInt32(&changesHandled, 1)
+		}(change)
+	}
+
+	wg.Wait()
+
+	if changesSubmitted != 100 || changesHandled != 100 {
+		t.Errorf("Expected 100 changes to be submitted and handled, but found %d and %d", changesSubmitted, changesHandled)
+	}
+
+	if _, ok := <-ca.channel(); !ok {
+		t.Errorf("Expected to read a single event from the output channel, but the channel is closed already")
+	}
+
+	if _, ok := <-ca.channel(); ok {
+		t.Errorf("Expected the output channel to be closed after reading the first event")
+	}
+}
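
Not part of the diff above: a minimal sketch of how the whitelist subscription introduced in kafkaconsumer/subscription.go could be used, analogous to examples/kafkaconsumer/main.go. The consumer group name, Zookeeper connection string, and topic pattern below are illustrative placeholders, not values taken from this change.

package main

import (
	"log"
	"os"
	"os/signal"
	"regexp"

	"github.com/wvanbergen/kafka/kafkaconsumer"
)

func main() {
	// Subscribe to every topic whose name starts with "events."; new matching
	// topics are picked up automatically when they are created.
	subscription := kafkaconsumer.WhitelistSubscription(regexp.MustCompile(`^events\..*`))

	consumer, err := kafkaconsumer.Join("whitelist_example", subscription, "zk1:2181,zk2:2181,zk3:2181/chroot", nil)
	if err != nil {
		log.Fatalln(err)
	}

	// Shut down cleanly on interrupt so the last acked offsets get committed.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		<-c
		consumer.Interrupt()
	}()

	go func() {
		for err := range consumer.Errors() {
			log.Println(err)
		}
	}()

	for message := range consumer.Messages() {
		log.Printf("%s/%d: %s", message.Topic, message.Partition, message.Value)
		consumer.Ack(message)
	}
}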