-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consumer async partitions #437
Conversation
fc7feed
to
d5141e2
Compare
After several network errors and one (valid) race condition bug found, CI is finally green. |
Logger.Printf("consumer/%s/%d abandoned broker %d because %s\n", child.topic, child.partition, child.broker.broker.ID(), err) | ||
child.broker.acks <- child | ||
child.trigger <- none{} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally don't like using fallthrough
like this, but I suppose it has always worked like this so no need to change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The alternative is to duplicate these three lines twice (and the sendError
call one additional time). Using fallthrough
explicitly, (as opposed to the stupid implicit C version) seems much more elegant to me.
I don't want to derail the PR (as you mentioned, this code is just moved, not changed), but you've piqued my curiousity - what bothers you about this pattern?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just way harder to parse in my mind, especially with the default
case not be the last one. I'd prefer a bunch of if
statements.
Thoughts on the third commit? It does make the behaviour less subtle, I think, though perhaps at the cost of another moving part... |
I like this approach better. Using a WaitGroup for waiting for a bunch of goroutines feels much more natural to me. I am not a big fan of the extra bool field on PartitionConsumer. Maybe we should make the channel hold a wrapper type with a (Rant: Go really should support multi-value channels so you can simply feed multi-value return values to a channel without having to wrap them in a shitty type.) |
Which channel? |
the |
There is no acks channel anymore, it's a wait group o.o |
Argh, I am an idiot. |
41464b3
to
9772a2c
Compare
Give each PartitionConsumer a goroutine whose job is to feed the user, and have the brokerConsumer feed each partition with the response, then wait for acks. This makes the consumer much more resilient to weird user consumption ordering, as the old "one partition at a time" logic was very unforgiving.
9772a2c
to
3bd5a52
Compare
OK, this version removes the
|
I think this makes sense, but we should stress test this as well. @shivnagarajan @thegedge, opinions on this? |
I will merge this now to prep master for stress testing, but feel free to continue commenting @shivnagarajan and @thegedge. |
Feed consumer responses to the user asynchronously
Give each PartitionConsumer a goroutine whose job is to feed the user, and have
the brokerConsumer feed each partition with the response, then wait for acks.
This makes the consumer much more resilient to weird user consumption ordering,
as the old "one partition at a time" logic was very unforgiving.
Fixes #325
Replaces #398
@Shopify/kafka careful 👀 on this please