Await commits during a rebalance #1098
Conversation
@svroonland @guizmaii Perhaps we should offer this as an experimental feature at first. WDYT?
Nice! I don't see any major issues, apart from the need to carefully consider the same-thread behavior changes. A few comments/questions for now.
```scala
  currentStateRef: Ref[State],
  committedOffsetsRef: Ref[CommitOffsets],
  fetchStrategy: FetchStrategy
) {

  /**
```
Are there any other Kafka consumer or Kafka broker settings that limit the acceptable values here?
I don't know. I am just guessing here. I will go through the Kafka configurations and see if this is configurable in the first place. It might be a server configuration.
Found it: `scheduled.rebalance.max.delay.ms`
"The maximum delay that is scheduled in order to wait for the return of one or more departed workers before rebalancing and reassigning their connectors and tasks to the group. During this period the connectors and tasks of the departed workers remain unassigned."
Default: 300000 (5 minutes)
Hmm, maybe not. That is a Kafka Connect config 😞
I cannot find a setting that could configure the timeout.
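For reference, a rough sketch of how a user would opt in and bound the wait on the zio-kafka side. The setting names (`withRebalanceSafeCommits`, `withMaxRebalanceDuration`) and the values shown are assumptions for illustration, not a definitive API reference:

```scala
import zio._
import zio.kafka.consumer.ConsumerSettings

// Sketch only: enable the experimental "await commits during rebalance" mode
// and put an upper bound on how long a rebalance may be held up.
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092")) // assumed broker address
    .withGroupId("my-group")               // assumed group id
    .withRebalanceSafeCommits(true)        // hold up rebalances until commits are done (assumed name)
    .withMaxRebalanceDuration(3.minutes)   // upper bound on the hold-up (assumed name)
```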
Diagnostics is a feature of zio-kafka that allows users to listen to key events. Since zio-kafka calls out to the user's implementation of the `Diagnostics` trait, there are no guarantees on how well it behaves. This is even more important inside the rebalance listener where we (soon, with #1098) run on the same-thread-runtime and cannot afford to be switched to another thread by ZIO operations that are normally safe to use. To protect against these issues:
- the rebalance events are replaced by a single event which is emitted from outside the rebalance listener,
- all invocations of the diagnostics trait are forked (unless they are run from a finalizer).
It seems we have a new flaky test:
Update: this is resolved.
One thing I want to carefully review is off-by-one errors in the offsets. At some point consumed offsets are translated into committable offsets (meaning we do +1). In the rebalance listener these all come together, and an off-by-one error is easy to make.
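For context, the +1 exists because Kafka's committed offset is the offset of the next record to read, not of the last record processed. A minimal illustration with the plain Kafka client types (topic name and offsets are made up):

```scala
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// The last record handed to the stream had offset 100...
val tp                 = new TopicPartition("my-topic", 0)
val lastConsumedOffset = 100L

// ...but the committed offset must point at the *next* record to read: 101.
val toCommit: Map[TopicPartition, OffsetAndMetadata] =
  Map(tp -> new OffsetAndMetadata(lastConsumedOffset + 1))
```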
Diagnostics is a feature of zio-kafka that allows users to listen to key events. Since zio-kafka calls out to the user's implementation of the Diagnostics trait, there are no guarantees on how well it behaves. This is even more important inside the rebalance listener where we (soon, with #1098) run on the same-thread-runtime and cannot afford to be switched to another thread by ZIO operations that are normally safe to use. To protect against these issues the user's diagnostics implementation is run on a separate fiber, feeding from a queue of events. In addition, the rebalance events are replaced by a single event which is emitted from outside the rebalance listener. The new event gives the full picture of a rebalance, including which streams were ended. Previously it was not clear which rebalance events belonged to the same rebalance.

**Breaking change** Since the rebalance events are changed, this is a breaking change.
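A rough sketch of the decoupling pattern described here; the names are illustrative stand-ins, not zio-kafka's actual internals. Callers only ever enqueue events, and a separate fiber drains the queue and invokes the user's implementation:

```scala
import zio._

// Stand-ins for zio-kafka's Diagnostics trait and its event type.
sealed trait DiagnosticEvent
trait Diagnostics {
  def emit(event: DiagnosticEvent): UIO[Unit]
}

object Diagnostics {
  // Wrap a user-supplied Diagnostics so the run loop (and the rebalance
  // listener) only enqueue; a forked fiber feeds the user's code.
  def decoupled(user: Diagnostics): ZIO[Scope, Nothing, Diagnostics] =
    for {
      queue <- Queue.unbounded[DiagnosticEvent]
      _     <- queue.take.flatMap(user.emit).forever.forkScoped
    } yield new Diagnostics {
      def emit(event: DiagnosticEvent): UIO[Unit] = queue.offer(event).unit
    }
}
```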
Force-pushed from cb08b83 to 90d3613.
This is now done and indeed there was a bug, which is now fixed. I have marked the new mode as experimental. @guizmaii @svroonland When the tests complete (in ~20 minutes), this PR is ready for merging.
Force-pushed from 90d3613 to a0c4c72.
Haha, I cheered too early. Working on it... 😄
Fixes #590 "Many records duplicately processed after rebalancing"

In this change we introduce a new experimental mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition have been committed.

### Motivation

Here is a common (single partition) scenario around rebalances:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100)
2. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
3. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
4. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
5. _at the same time,_ another consumer on another instance starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100

Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable.

After merging this change, the scenario will unfold as follows:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset
2. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
3. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
   * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
   * inside the onRevoked callback, zio-kafka continues to process commit commands from the user
   * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes
   * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition
4. another consumer on another instance starts consuming from the last committed offset (which is now 100, problem solved!)

### Commit queue

Because both the main runloop and the rebalance listener need to process (and thus receive) commit commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced.

### Complications

1. The chosen solution is not suitable for all consumers.
   - There are use cases where not all messages are read from the stream. For example, some users want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one.
   - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores, though. As a workaround, these users can commit to both places.
2. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete.
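To make the mechanism above concrete, here is a minimal sketch of the end-of-rebalance wait, with assumed and heavily simplified names (the actual Runloop code differs): inside onRevoked, keep handling commit commands until the highest offset that was handed to the revoked partition's stream has been committed.

```scala
import zio._

// Hypothetical, simplified commit command: the offset it commits and the
// effect that performs the commit.
final case class Commit(offset: Long, run: UIO[Unit])

// Sketch of the end-of-rebalance wait: drain the commit queue until the
// highest offset provided to the revoked partition's stream is committed.
// A real implementation also needs a deadline and must only use operations
// that stay on the calling thread (see "Same thread executor" below).
def awaitStreamCommits(
  commitQueue: Queue[Commit],
  committedOffset: Ref[Long],
  lastProvidedOffset: Long
): UIO[Unit] = {
  def loop: UIO[Unit] =
    committedOffset.get.flatMap { committed =>
      if (committed >= lastProvidedOffset) ZIO.unit
      else
        commitQueue.take.flatMap { commit =>
          commit.run *> committedOffset.update(_ max commit.offset) *> loop
        }
    }
  loop
}
```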
### Same thread executor

The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) of the java consumer happen from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time.

### Experimental

Because holding up the rebalance may have unforeseen consequences, this feature is marked as experimental. This allows us to collect experiences before we recommend this mode to all users.

### Collateral

This change also:
- fixes the order of `private` and `final`
- removes some completely useless tests

### Related

The same issue is present in:
- fs2-kafka: fd4s/fs2-kafka#1200
- alpakka-kafka: akka/alpakka-kafka#1038

In fact, every program that polls and commits asynchronously is likely affected.

### Non-goals

This change does not try to solve the following goals. However, these can be addressed in future PRs.
- Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087).
- Support consumers that want to commit only a portion of the given messages.
- Support transactional consumer/producer.
- Support external commits.

This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073, #1086, #1089 and #1097.
Force-pushed from a0c4c72 to 1312456.
This has been fixed.
Okay, now this PR is ready for merging: the tests have run several times without issues, and the PR description has been groomed and polished again. There is one outstanding comment (about configuration parameters) but I cannot answer that question (I did try). By marking this feature as experimental we can collect feedback about this (and other things) from adventurous users, without disappointing the users that need more stability. @svroonland @guizmaii Please review again.
```scala
@@ -73,6 +75,120 @@ private[consumer] final class Runloop private (
    commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit

  private val rebalanceListener: RebalanceListener = {
```
Maybe we should extract a class to a separate file here
I am okay with that. We do need to accept that the diff becomes harder to read though. Or we do it in a follow-up PR.
Yeah, separate PR is fine
@svroonland I tried to move the rebalance listener to a separate file. Unfortunately, the two are so intertwined that it would require a lot of refactoring. I don't think it is feasible.
Fixes #590 "Many records duplicately processed after rebalancing"

In this change we introduce a new mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition have been committed.

Motivation

Here is a common (single partition) scenario around rebalances:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100)
2. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
3. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
4. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
5. at the same time, another consumer on another instance starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100

Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable.

After merging this change, the scenario will unfold as follows:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset
2. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
3. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
   - the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
   - inside the onRevoked callback, zio-kafka continues to process commit commands from the user
   - zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes
   - the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition
4. another consumer on another instance starts consuming from the last committed offset (which is now 100, problem solved!)
Commit queue

Because both the main runloop and the rebalance listener need to process (and thus receive) commit commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced.

Complications

1. The chosen solution is not suitable for all consumers.
   - There are use cases where not all messages are read from the stream. For example, some users want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one.
   - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores, though. As a workaround, these users can commit to both places.
2. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete.
Same thread executor

The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) of the java consumer happen from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement (with KIP-944) failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time and checking whether a `Promise` is done before awaiting it.
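As an illustration of that workaround, here is a sketch with assumed names (not the actual Runloop code): instead of `ZIO.timeout` or a `Schedule`, check `Promise#isDone` and sleep on the current thread so the fiber never shifts.

```scala
import zio._

// Sketch of awaiting a promise without ZIO.timeout/Schedule, which could shift
// the fiber to another thread: poll `isDone` and block the current thread with
// wall-clock sleeps. Only acceptable here because we must stay on the thread
// that invoked the rebalance listener.
def awaitWithBlockingTime[E, A](
  promise: Promise[E, A],
  pollInterval: Duration,
  maxWait: Duration
): UIO[Boolean] =
  ZIO.succeed(java.lang.System.nanoTime() + maxWait.toNanos).flatMap { deadline =>
    def loop: UIO[Boolean] =
      promise.isDone.flatMap {
        case true                                             => ZIO.succeed(true)
        case false if java.lang.System.nanoTime() >= deadline => ZIO.succeed(false)
        case false                                            =>
          ZIO.succeed(Thread.sleep(pollInterval.toMillis)) *> loop
      }
    loop
  }
```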
Collateral

This change also:
- fixes the order of `private` and `final`
- removes some completely useless tests

Future
In the future KIP-983 might become available. Once it is, we can dramatically simplify the implementation of this feature.
Related

The same issue is present in:
- fs2-kafka: fd4s/fs2-kafka#1200
- alpakka-kafka: akka/alpakka-kafka#1038

In fact, every program that polls and commits asynchronously is likely affected.
Non-goals

This change does not try to solve the following goals. However, these can be addressed in future PRs.
- Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087).
- Support consumers that want to commit only a portion of the given messages.
- Support transactional consumer/producer.
- Support external commits.

This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073, #1086, #1089, #1097 and #1102.