-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rebalance: filter messages of revoked partitions (partitioned) #992
Rebalance: filter messages of revoked partitions (partitioned) #992
Conversation
The original This may be another candidate for using the new blocking partition rebalance implementation. 1/10 failures when run on master
|
ab69dbb
to
e707964
Compare
c3b1201
to
34da701
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great to factor out the buffer and the subscription!
Trolling the demand seems dangerous, please make it log a reason when switching.
core/src/main/scala/akka/kafka/internal/PartitionAssignmentHelpers.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/SourceLogicBuffer.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/SourceLogicBuffer.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/SourceLogicBuffer.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/akka/kafka/internal/SourceLogicSubscription.scala
Outdated
Show resolved
Hide resolved
* duplicate messages sent downstream. | ||
*/ | ||
@InternalApi | ||
private trait SourceLogicBuffer[K, V, Msg] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does a private
trait mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. I'm surprised it doesn't introduce a compiler error TBH. I switched it to package protected like your DeferredProducer
trait.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have some more of these BTW. PromiseControl
is private.
It seems this isn't actually necessary. I'll remove it. I originally tried this before introducing the async callback, which appears to negate the message processing concurrency issue as well. |
8771233
to
f221da1
Compare
f221da1
to
f86bc1d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Purpose
A solution to the same issue as described in #872, but for partitioned sources
References
Changes
AlpakkaAssignor
test class to deterministically control which partitions are assigned to which consumer group members inRebalanceSpec
SourceLogicBuffer
to manage source buffers consistently across single and partitioned sourcesSourceLogicSubscription
to manage rebalance handler events consistently across single and partitioned sourcesRebalanceSpec
to demonstrate/resolve issue. Use defaultmax.poll.records
to reproduce failure before fix