Skip to content

Commit

Permalink
Merge pull request #2 from Financial-Times/fix/UPPSF-2236-sync-on-wai…
Browse files Browse the repository at this point in the history
…t-group

fix waitgroup sync issue
  • Loading branch information
rnov authored Apr 12, 2021
2 parents 8a52393 + 24eeac1 commit eedf486
Showing 1 changed file with 99 additions and 53 deletions.
152 changes: 99 additions & 53 deletions consumergroup/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type ConsumerGroup struct {
instance *kazoo.ConsumergroupInstance

wg sync.WaitGroup
nGoroutines int
singleShutdown sync.Once

messages chan *sarama.ConsumerMessage
Expand All @@ -88,6 +89,10 @@ type ConsumerGroup struct {
offsetManager OffsetManager
}

func (cg *ConsumerGroup) existInstance() bool {
return cg.instance != nil
}

// Connects to a consumer group, using Zookeeper for auto-discovery
func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error) {

Expand Down Expand Up @@ -172,11 +177,13 @@ func JoinConsumerGroup(name string, topics []string, zookeeper []string, config
}

// Register itself with zookeeper
if err := cg.instance.Register(topics); err != nil {
cg.Logf("FAILED to register consumer instance: %s!\n", err)
return nil, err
} else {
cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
if cg.existInstance() {
if err := cg.instance.Register(topics); err != nil {
cg.Logf("FAILED to register consumer instance: %s!\n", err)
return nil, err
} else {
cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
}
}

offsetConfig := OffsetManagerConfig{CommitInterval: config.Offsets.CommitInterval}
Expand Down Expand Up @@ -215,16 +222,18 @@ func (cg *ConsumerGroup) Close() error {
cg.Logf("FAILED closing the offset manager: %s!\n", err)
}

if shutdownError = cg.instance.Deregister(); shutdownError != nil {
cg.Logf("FAILED deregistering consumer instance: %s!\n", shutdownError)
} else {
cg.Logf("Deregistered consumer instance %s.\n", cg.instance.ID)
if cg.existInstance() {
if shutdownError = cg.instance.Deregister(); shutdownError != nil {
cg.Logf("FAILED deregistering consumer instance: %s!\n", shutdownError)
} else {
cg.Logf("Deregistered consumer instance %s.\n", cg.instance.ID)
}
}

if shutdownError = cg.consumer.Close(); shutdownError != nil {
cg.Logf("FAILED closing the Sarama client: %s\n", shutdownError)
}

cg.Logf("closing cg channels and instance, remaining goroutines: %d\n", cg.nGoroutines)
close(cg.messages)
close(cg.errors)
cg.instance = nil
Expand All @@ -238,13 +247,16 @@ func (cg *ConsumerGroup) Logf(format string, args ...interface{}) {
if cg.instance == nil {
identifier = "(defunct)"
} else {
identifier = cg.instance.ID[len(cg.instance.ID)-12:]
identifier = cg.instance.ID[len(cg.instance.ID)-12:]
}
sarama.Logger.Printf("[%s/%s] %s", cg.group.Name, identifier, fmt.Sprintf(format, args...))
}

func (cg *ConsumerGroup) InstanceRegistered() (bool, error) {
return cg.instance.Registered()
if cg.existInstance() {
return cg.instance.Registered()
}
return false, errors.New("instance has nil value")
}

func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error {
Expand All @@ -259,6 +271,7 @@ func (cg *ConsumerGroup) FlushOffsets() error {
func (cg *ConsumerGroup) topicListConsumer(topics []string) {
limiter := newDefaultLimiter()
for {
// note whats the purpose of this being here when is called at the bottom of the for
select {
case <-cg.stopper:
return
Expand All @@ -280,9 +293,13 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {

for _, topic := range topics {
cg.wg.Add(1)
cg.nGoroutines += 1
go cg.topicConsumer(ctx, cancel, topic, cg.messages, cg.errors)
}
//note let's just wait here for test instead:
cg.wg.Wait()

// note why does not wait here ! right after ! what if enters cg.stopper ?
select {
case <-ctx.Done():
cg.wg.Wait()
Expand All @@ -291,15 +308,17 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {
return

case <-consumerChanges:
registered, err := cg.instance.Registered()
if err != nil {
cg.Logf("FAILED to get register status: %s\n", err)
} else if !registered {
err = cg.instance.Register(topics)
if cg.existInstance() {
registered, err := cg.instance.Registered()
if err != nil {
cg.Logf("FAILED to register consumer instance: %s!\n", err)
} else {
cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
cg.Logf("FAILED to get register status: %s\n", err)
} else if !registered {
err = cg.instance.Register(topics)
if err != nil {
cg.Logf("FAILED to register consumer instance: %s!\n", err)
} else {
cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
}
}
}

Expand All @@ -311,7 +330,12 @@ func (cg *ConsumerGroup) topicListConsumer(topics []string) {
}

func (cg *ConsumerGroup) topicConsumer(ctx context.Context, cancel context.CancelFunc, topic string, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
defer cg.wg.Done()
defer func() {
// todo lock
cg.nGoroutines -= 1
// todo unlock
cg.wg.Done()
}()

select {
case <-ctx.Done():
Expand All @@ -325,29 +349,39 @@ func (cg *ConsumerGroup) topicConsumer(ctx context.Context, cancel context.Cance
partitions, err := cg.kazoo.Topic(topic).Partitions()
if err != nil {
cg.Logf("%s :: FAILED to get list of partitions: %s\n", topic, err)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: -1,
Err: err,

_, ok := <-cg.errors
if ok {
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: -1,
Err: err,
}
}
cancel()
return
}

partitionLeaders, err := retrievePartitionLeaders(partitions)
if err != nil {
cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: -1,
Err: err,
_, ok := <-cg.errors
if ok {
cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: -1,
Err: err,
}
}
cancel()
return
}

dividedPartitions := dividePartitionsBetweenConsumers(cg.consumers, partitionLeaders)
myPartitions := dividedPartitions[cg.instance.ID]
var myPartitions []*kazoo.Partition
if cg.existInstance() {
myPartitions = dividedPartitions[cg.instance.ID]
}
cg.Logf("%s :: Claiming %d of %d partitions", topic, len(myPartitions), len(partitionLeaders))

// Consume all the assigned partitions
Expand Down Expand Up @@ -401,35 +435,47 @@ partitionClaimLoop:
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
if err := cg.instance.ClaimPartition(topic, partition); err == nil {
break partitionClaimLoop
} else if tries+1 < maxRetries {
if err == kazoo.ErrPartitionClaimedByOther {
// Another consumer still owns this partition. We should wait longer for it to release it.
if cg.existInstance() {
if err := cg.instance.ClaimPartition(topic, partition); err == nil {
break partitionClaimLoop
} else if tries+1 < maxRetries {
if err == kazoo.ErrPartitionClaimedByOther {
// Another consumer still owns this partition. We should wait longer for it to release it.
} else {
// An unexpected error occurred. Log it and continue trying until we hit the timeout.
cg.Logf("%s/%d :: FAILED to claim partition on attempt %v of %v; retrying in 1 second. Error: %v", topic, partition, tries+1, maxRetries, err)
}
} else {
// An unexpected error occurred. Log it and continue trying until we hit the timeout.
cg.Logf("%s/%d :: FAILED to claim partition on attempt %v of %v; retrying in 1 second. Error: %v", topic, partition, tries+1, maxRetries, err)
cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
_, ok := <-cg.errors
if ok {
cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: -1,
Err: err,
}
}
return
}
} else {
cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: partition,
Err: err,
}
return
}
}
}

defer func() {
err := cg.instance.ReleasePartition(topic, partition)
if err != nil {
cg.Logf("%s/%d :: FAILED to release partition: %s\n", topic, partition, err)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: partition,
Err: err,
if cg.existInstance() {
err := cg.instance.ReleasePartition(topic, partition)
if err != nil {
cg.Logf("%s/%d :: FAILED to release partition: %s\n", topic, partition, err)
_, ok := <-cg.errors
if ok {
cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: -1,
Err: err,
}
}
}
}
}()
Expand Down

0 comments on commit eedf486

Please sign in to comment.