fix waitgroup sync issue #2
Conversation
Seems to work as expected, but I think it can be improved.
if cg.existInstance() {
	err := cg.instance.ReleasePartition(topic, partition)
	if err != nil {
		cg.Logf("%s/%d :: FAILED to release partition: %s\n", topic, partition, err)
		_, ok := <-cg.errors
		if ok {
			cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
			cg.errors <- &sarama.ConsumerError{
				Topic:     topic,
				Partition: -1,
				Err:       err,
			}
		}
Nitpicking: Using the inverse "if" condition in all the conditionals here would make the code less indented and easier to read.
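For illustration, a minimal sketch of what that inverted-condition (guard clause) style might look like for the excerpt above; the identifiers are taken from the snippet, but the surrounding function shape is assumed rather than copied from the branch:

	// Sketch only: early returns flatten the nesting, keeping the happy path unindented.
	if !cg.existInstance() {
		return
	}
	if err := cg.instance.ReleasePartition(topic, partition); err != nil {
		cg.Logf("%s/%d :: FAILED to release partition: %s\n", topic, partition, err)
		if _, ok := <-cg.errors; ok {
			cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
			cg.errors <- &sarama.ConsumerError{Topic: topic, Partition: -1, Err: err}
		}
	}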
consumergroup/consumer_group.go (Outdated)
if cg.existInstance() {
	if err := cg.instance.ClaimPartition(topic, partition); err == nil {
		break partitionClaimLoop
	} else if tries+1 < maxRetries {
		if err == kazoo.ErrPartitionClaimedByOther {
			// Another consumer still owns this partition. We should wait longer for it to release it.
		} else {
			// An unexpected error occurred. Log it and continue trying until we hit the timeout.
			cg.Logf("%s/%d :: FAILED to claim partition on attempt %v of %v; retrying in 1 second. Error: %v", topic, partition, tries+1, maxRetries, err)
		}
	} else {
		// An unexpected error occurred. Log it and continue trying until we hit the timeout.
		cg.Logf("%s/%d :: FAILED to claim partition on attempt %v of %v; retrying in 1 second. Error: %v", topic, partition, tries+1, maxRetries, err)
	}
} else {
	cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
	cg.errors <- &sarama.ConsumerError{
		Topic:     topic,
		Partition: partition,
		Err:       err,
	cg.Logf("%s/%d :: FAILED to claim the partition: %s\n", topic, partition, err)
	_, ok := <-cg.errors
	if ok {
		cg.Logf("%s :: FAILED to get leaders of partitions: %s\n", topic, err)
		cg.errors <- &sarama.ConsumerError{
			Topic:     topic,
			Partition: -1,
			Err:       err,
		}
	}
	return
Nitpicking: Using the inverse "if" condition in all the conditionals here would make the code less indented and easier to read.
consumergroup/consumer_group.go (Outdated)
@@ -238,13 +247,18 @@ func (cg *ConsumerGroup) Logf(format string, args ...interface{}) {
	if cg.instance == nil {
		identifier = "(defunct)"
	} else {
		identifier = cg.instance.ID[len(cg.instance.ID)-12:]
		if cg.existInstance() {
This is the case where cg.instance != nil, so this check is redundant.
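A small sketch of the point, using the lines from the hunk itself: inside the else branch cg.instance is already known to be non-nil, so the added guard cannot change the outcome.

	if cg.instance == nil {
		identifier = "(defunct)"
	} else {
		// cg.instance is guaranteed non-nil in this branch, so an extra
		// cg.existInstance() check here adds nothing.
		identifier = cg.instance.ID[len(cg.instance.ID)-12:]
	}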
@@ -259,6 +273,7 @@ func (cg *ConsumerGroup) FlushOffsets() error {
func (cg *ConsumerGroup) topicListConsumer(topics []string) {
	limiter := newDefaultLimiter()
	for {
		// note: what is the purpose of this being here when it is also called at the bottom of the for loop?
Most probably the purpose is to not start a new consumer if a stop request has already been received.
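Roughly the pattern being described, as a hedged sketch (assuming cg.stopper is a channel closed on shutdown; this reflects the reading above, not the exact code on the branch):

	for {
		select {
		case <-cg.stopper:
			// A stop request already arrived: exit before starting a new
			// round of topic consumers for this rebalance cycle.
			return
		default:
		}

		// ... start the topic consumers for this generation ...
	}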
			go cg.topicConsumer(ctx, cancel, topic, cg.messages, cg.errors)
		}
		// note: let's just wait here for the test instead:
		cg.wg.Wait()
I would suggest we remove this wait here, as it prevents us from receiving messages on the consumerChanges channel.
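One conventional way around that, sketched under assumptions (the consumerChanges channel and the surrounding select are inferred from this comment, not copied from the code): wait on the WaitGroup in a helper goroutine and turn completion into a channel case, so the loop can keep receiving consumerChanges.

	done := make(chan struct{})
	go func() {
		cg.wg.Wait() // wait for the topic consumers without blocking the select below
		close(done)
	}()

	select {
	case <-done:
		// all topic consumers have returned
	case <-consumerChanges:
		// group membership changed; proceed to rebalance instead of blocking on Wait
	}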
		// note: why does it not wait here, right after? What if it enters cg.stopper?
A cg.stopper message causes the ctx context to be cancelled, which in turn produces a ctx.Done() signal, and that path already does a cg.wg.Wait().
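A rough sketch of that flow (channel, context, and variable names assumed from the discussion):

	go func() {
		<-cg.stopper // a stop request...
		cancel()     // ...cancels the context
	}()

	go func() {
		<-ctx.Done() // cancellation is observed here...
		cg.wg.Wait() // ...and this path already waits for the topic consumers
	}()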
@@ -77,6 +77,7 @@ type ConsumerGroup struct {
	instance *kazoo.ConsumergroupInstance

	wg          sync.WaitGroup
	nGoroutines int
suggestion (non-blocking): nGoroutines was added while debugging; since it does not bring any additional value, we can remove it now.
		return nil, err
	} else {
		cg.Logf("Consumer instance registered (%s).", cg.instance.ID)
		if cg.existInstance() {
thought: We use cg.existInstance() as a check before we use cg.instance without guarding with mutexes, so it seems possible for cg.existInstance() to return true, for another goroutine to then set cg.instance to nil, and for the subsequent cg.instance.Register(topics) call to panic.
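An illustrative interleaving of the check-then-act race being described (the identifiers come from the discussion; the scheduling itself is hypothetical):

	// Goroutine A:
	if cg.existInstance() { // observes cg.instance != nil
		// Goroutine B runs concurrently here: cg.instance = nil
		cg.instance.Register(topics) // dereferences nil and panics
	}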
It's a zero-sum change, and it would not work: we would be locking the check but not the place where cg.instance is being overwritten (and if we knew where that happens, we would not need this check at all). Up to this point we still don't know where it happens, so this is a last-resort workaround that only works by luck.
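To illustrate that point (the mutex cg.mu is hypothetical): guarding only the check does not help, because an unguarded writer can still clear cg.instance between the check and the use; both the read and every write would need to hold the same lock.

	cg.mu.Lock() // hypothetical mutex guarding only the read
	ok := cg.instance != nil
	cg.mu.Unlock()
	// an unguarded writer elsewhere can still run at this point: cg.instance = nil
	if ok {
		cg.instance.Register(topics) // can still panic despite the locked check
	}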