diff --git a/consumergroup/consumer_group.go b/consumergroup/consumer_group.go index 4829818..422a744 100644 --- a/consumergroup/consumer_group.go +++ b/consumergroup/consumer_group.go @@ -77,6 +77,7 @@ type ConsumerGroup struct { instance *kazoo.ConsumergroupInstance wg sync.WaitGroup + nGoroutines int singleShutdown sync.Once messages chan *sarama.ConsumerMessage @@ -88,6 +89,10 @@ type ConsumerGroup struct { offsetManager OffsetManager } +func (cg *ConsumerGroup) existInstance() bool { + return cg.instance != nil +} + // Connects to a consumer group, using Zookeeper for auto-discovery func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) { @@ -172,11 +177,13 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config } // Register itself with zookeeper - if err := cg.instance.Register(topics); err != nil { - cg.Logf("FAILED to register consumer instance: %s!\n", err) - return nil, err - } else { - cg.Logf("Consumer instance registered (%s).", cg.instance.ID) + if cg.existInstance() { + if err := cg.instance.Register(topics); err != nil { + cg.Logf("FAILED to register consumer instance: %s!\n", err) + return nil, err + } else { + cg.Logf("Consumer instance registered (%s).", cg.instance.ID) + } } offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval} @@ -215,16 +222,18 @@ func (cg *ConsumerGroup) Close() error { cg.Logf("FAILED closing the offset manager: %s!\n", err) } - if shutdownError = cg.instance.Deregister(); shutdownError != nil { - cg.Logf("FAILED deregistering consumer instance: %s!\n", shutdownError) - } else { - cg.Logf("Deregistered consumer instance %s.\n", cg.instance.ID) + if cg.existInstance() { + if shutdownError = cg.instance.Deregister(); shutdownError != nil { + cg.Logf("FAILED deregistering consumer instance: %s!\n", shutdownError) + } else { + cg.Logf("Deregistered consumer instance %s.\n", cg.instance.ID) + } } - if shutdownError = cg.consumer.Close(); shutdownError != nil { cg.Logf("FAILED closing the Sarama client: %s\n", shutdownError) } + cg.Logf("closing cg channels and instance, remaining goroutines: %d\n", cg.nGoroutines) close(cg.messages) close(cg.errors) cg.instance = nil @@ -238,13 +247,16 @@ func (cg *ConsumerGroup) Logf(format string, args ...interface{}) { if cg.instance == nil { identifier = "(defunct)" } else { - identifier = cg.instance.ID[len(cg.instance.ID)-12:] + identifier = cg.instance.ID[len(cg.instance.ID)-12:] } sarama.Logger.Printf("[%s/%s] %s", cg.group.Name, identifier, fmt.Sprintf(format, args...)) } func (cg *ConsumerGroup) InstanceRegistered() (bool, error) { - return cg.instance.Registered() + if cg.existInstance() { + return cg.instance.Registered() + } + return false, errors.New("instance has nil value") } func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error { @@ -259,6 +271,7 @@ func (cg *ConsumerGroup) FlushOffsets() error { func (cg *ConsumerGroup) topicListConsumer(topics []string) { limiter := newDefaultLimiter() for { + // note whats the purpose of this being here when is called at the bottom of the for select { case <-cg.stopper: return @@ -280,9 +293,13 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) { for _, topic := range topics { cg.wg.Add(1) + cg.nGoroutines += 1 go cg.topicConsumer(ctx, cancel, topic, cg.messages, cg.errors) } + //note let's just wait here for test instead: + cg.wg.Wait() + // note why does not wait here ! right after ! what if enters cg.stopper ? select { case <-ctx.Done(): cg.wg.Wait() @@ -291,15 +308,17 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) { return case <-consumerChanges: - registered, err := cg.instance.Registered() - if err != nil { - cg.Logf("FAILED to get register status: %s\n", err) - } else if !registered { - err = cg.instance.Register(topics) + if cg.existInstance() { + registered, err := cg.instance.Registered() if err != nil { - cg.Logf("FAILED to register consumer instance: %s!\n", err) - } else { - cg.Logf("Consumer instance registered (%s).", cg.instance.ID) + cg.Logf("FAILED to get register status: %s\n", err) + } else if !registered { + err = cg.instance.Register(topics) + if err != nil { + cg.Logf("FAILED to register consumer instance: %s!\n", err) + } else { + cg.Logf("Consumer instance registered (%s).", cg.instance.ID) + } } } @@ -311,7 +330,12 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) { } func (cg *ConsumerGroup) topicConsumer(ctx context.Context, cancel context.CancelFunc, topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- error) { - defer cg.wg.Done() + defer func() { + // todo lock + cg.nGoroutines -= 1 + // todo unlock + cg.wg.Done() + }() select { case <-ctx.Done(): @@ -325,10 +349,14 @@ func (cg *ConsumerGroup) topicConsumer(ctx context.Context, cancel context.Cance partitions, err := cg.kazoo.Topic(topic).Partitions() if err != nil { cg.Logf("%s :: FAILED to get list of partitions: %s\n", topic, err) - cg.errors <- &sarama.ConsumerError{ - Topic: topic, - Partition: -1, - Err: err, + + _, ok := <-cg.errors + if ok { + cg.errors <- &sarama.ConsumerError{ + Topic: topic, + Partition: -1, + Err: err, + } } cancel() return @@ -336,18 +364,24 @@ func (cg *ConsumerGroup) topicConsumer(ctx context.Context, cancel context.Cance partitionLeaders, err := retrievePartitionLeaders(partitions) if err != nil { - cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err) - cg.errors <- &sarama.ConsumerError{ - Topic: topic, - Partition: -1, - Err: err, + _, ok := <-cg.errors + if ok { + cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err) + cg.errors <- &sarama.ConsumerError{ + Topic: topic, + Partition: -1, + Err: err, + } } cancel() return } dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitionLeaders) - myPartitions := dividedPartitions[cg.instance.ID] + var myPartitions []*kazoo.Partition + if cg.existInstance() { + myPartitions = dividedPartitions[cg.instance.ID] + } cg.Logf("%s :: Claiming %d of %d partitions", topic, len(myPartitions), len(partitionLeaders)) // Consume all the assigned partitions @@ -401,35 +435,47 @@ partitionClaimLoop: case <-ctx.Done(): return case <-time.After(1 * time.Second): - if err := cg.instance.ClaimPartition(topic, partition); err == nil { - break partitionClaimLoop - } else if tries+1 < maxRetries { - if err == kazoo.ErrPartitionClaimedByOther { - // Another consumer still owns this partition. We should wait longer for it to release it. + if cg.existInstance() { + if err := cg.instance.ClaimPartition(topic, partition); err == nil { + break partitionClaimLoop + } else if tries+1 < maxRetries { + if err == kazoo.ErrPartitionClaimedByOther { + // Another consumer still owns this partition. We should wait longer for it to release it. + } else { + // An unexpected error occurred. Log it and continue trying until we hit the timeout. + cg.Logf("%s/%d :: FAILED to claim partition on attempt %v of %v; retrying in 1 second. Error: %v", topic, partition, tries+1, maxRetries, err) + } } else { - // An unexpected error occurred. Log it and continue trying until we hit the timeout. - cg.Logf("%s/%d :: FAILED to claim partition on attempt %v of %v; retrying in 1 second. Error: %v", topic, partition, tries+1, maxRetries, err) + cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err) + _, ok := <-cg.errors + if ok { + cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err) + cg.errors <- &sarama.ConsumerError{ + Topic: topic, + Partition: -1, + Err: err, + } + } + return } - } else { - cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err) - cg.errors <- &sarama.ConsumerError{ - Topic: topic, - Partition: partition, - Err: err, - } - return } } } defer func() { - err := cg.instance.ReleasePartition(topic, partition) - if err != nil { - cg.Logf("%s/%d :: FAILED to release partition: %s\n", topic, partition, err) - cg.errors <- &sarama.ConsumerError{ - Topic: topic, - Partition: partition, - Err: err, + if cg.existInstance() { + err := cg.instance.ReleasePartition(topic, partition) + if err != nil { + cg.Logf("%s/%d :: FAILED to release partition: %s\n", topic, partition, err) + _, ok := <-cg.errors + if ok { + cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err) + cg.errors <- &sarama.ConsumerError{ + Topic: topic, + Partition: -1, + Err: err, + } + } } } }()