fix waitgroup sync issue #2

Merged
merged 2 commits into from
Apr 12, 2021
Conversation

@rnov rnov commented Apr 12, 2021

No description provided.

@rnov rnov requested review from mitagg and karsov April 12, 2021 09:09

@karsov karsov left a comment

Seems to work as expected, but I think it can be improved.

Comment on lines +468 to +480
if cg.existInstance() {
err := cg.instance.ReleasePartition(topic, partition)
if err != nil {
cg.Logf("%s/%d :: FAILED to release partition: %s\n", topic, partition, err)
_, ok := <-cg.errors
if ok {
cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: -1,
Err: err,
}
}
Nitpicking: Using the inverse "if" condition in all the conditionals here would make the code less indented and easier to read.

Comment on lines 440 to 461
if cg.existInstance() {
if err := cg.instance.ClaimPartition(topic, partition); err == nil {
break partitionClaimLoop
} else if tries+1 < maxRetries {
if err == kazoo.ErrPartitionClaimedByOther {
// Another consumer still owns this partition. We should wait longer for it to release it.
} else {
// An unexpected error occurred. Log it and continue trying until we hit the timeout.
cg.Logf("%s/%d :: FAILED to claim partition on attempt %v of %v; retrying in 1 second. Error: %v", topic, partition, tries+1, maxRetries, err)
}
} else {
// An unexpected error occurred. Log it and continue trying until we hit the timeout.
cg.Logf("%s/%d :: FAILED to claim partition on attempt %v of %v; retrying in 1 second. Error: %v", topic, partition, tries+1, maxRetries, err)
}
} else {
cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: partition,
Err: err,
cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
_, ok := <-cg.errors
if ok {
cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
cg.errors <- &sarama.ConsumerError{
Topic: topic,
Partition: -1,
Err: err,
}
}
return
Nitpicking: Using the inverse "if" condition in all the conditionals here would make the code less indented and easier to read.

@@ -238,13 +247,18 @@ func (cg *ConsumerGroup) Logf(format string, args ...interface{}) {
if cg.instance == nil {
identifier = "(defunct)"
} else {
identifier = cg.instance.ID[len(cg.instance.ID)-12:]
if cg.existInstance() {
This is the branch where cg.instance != nil, so this check is redundant.

@@ -259,6 +273,7 @@ func (cg *ConsumerGroup) FlushOffsets() error {
func (cg *ConsumerGroup) topicListConsumer(topics []string) {
limiter := newDefaultLimiter()
for {
// note whats the purpose of this being here when is called at the bottom of the for
Most probably the purpose is to avoid starting a new consumer if a stop request has already been received.

go cg.topicConsumer(ctx, cancel, topic, cg.messages, cg.errors)
}
//note let's just wait here for test instead:
cg.wg.Wait()
I would suggest removing this wait, as it prevents us from receiving messages on the consumerChanges channel.


// note why does not wait here ! right after ! what if enters cg.stopper ?
A cg.stopper message would cancel the ctx context, which in turn triggers the ctx.Done() case, and that case already calls cg.wg.Wait().

@@ -77,6 +77,7 @@ type ConsumerGroup struct {
instance *kazoo.ConsumergroupInstance

wg sync.WaitGroup
nGoroutines int

suggestion (non-blocking): nGoroutines was added while debugging. Since it adds no value, we can remove it now.

return nil, err
} else {
cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
if cg.existInstance() {

thought: We check cg.existInstance() before using cg.instance, but neither the check nor the use is guarded by a mutex. It seems possible for cg.existInstance() to return true, for another goroutine to then set cg.instance to nil, and for the subsequent cg.instance.Register(topics) call to panic.
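
One way to close this kind of check-then-use gap, sketched under the assumption that every write to the field can be routed through the same lock (which may not hold in this code base): read the pointer once under a mutex and use only the local copy. The `group` and `instance` types and their methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// instance and group are hypothetical stand-ins for the real types.
type instance struct{ ID string }

type group struct {
	mu       sync.RWMutex
	instance *instance
}

// currentInstance returns a snapshot of the pointer taken under the lock.
func (g *group) currentInstance() *instance {
	g.mu.RLock()
	defer g.mu.RUnlock()
	return g.instance
}

// clearInstance is the only writer in this sketch and takes the same lock.
func (g *group) clearInstance() {
	g.mu.Lock()
	g.instance = nil
	g.mu.Unlock()
}

// identifier checks and uses the same local copy, so a concurrent
// clearInstance cannot turn the nil check stale.
func (g *group) identifier() string {
	inst := g.currentInstance()
	if inst == nil {
		return "(defunct)"
	}
	return inst.ID
}

func main() {
	g := &group{instance: &instance{ID: "consumer-1"}}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		g.clearInstance()
	}()
	fmt.Println(g.identifier()) // either value, but never a nil dereference
	wg.Wait()
	fmt.Println(g.identifier())
}
```

The snapshot only helps if all writers take the lock; an unknown writer that bypasses it would still be a data race.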

@rnov (author) replied:

It's a zero-sum trade-off, and it would not work: we would be locking the check but not the place where cg.instance is overwritten (if we knew where that was, we would not need this check at all). So far we still don't know where that happens. This is a last-resort solution that works by luck.

@rnov rnov merged commit eedf486 into master Apr 12, 2021
@rnov rnov deleted the fix/UPPSF-2236-sync-on-wait-group branch April 12, 2021 13:26
karsov pushed a commit that referenced this pull request Apr 27, 2021
…c-on-wait-group"

This reverts commit eedf486, reversing
changes made to 8a52393.