-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Repeated records after rebalance #1038
Comments
An idea that I had as a (hopefully) quite quick and easy way to resolve this would be to tag records with a "generation id" of the subscription that they were read based on. This would involve using an The logic to filter out records from revoked partitions in I'd be happy to put together a PR for this, but would first like some feedback on the general approach that I'm proposing here (i.e. does this sound right, and are there any obvious problems with it?) |
Hi @gabrielreid. Yes, this is a recurring issue for users that often comes up. As you mentioned, users can introduce an arbitrary number of asynchronous boundaries in their graph, which makes it very difficult to invalidate inflight messages for revoked partitions without a lot of extra intervention and book keeping. This has been partially tackled in our preliminary transactional support, but that's a more specific use case where we can make more assumptions and assign more caveats to how users construct their streams to achieve EoS guarantees. I think your idea is worth exploring. Just so that I understand: this is only a solution to make invalidation of buffered messages more robust in the |
Yes, correct, I'm just talking about clearing the internally-buffered messages in the |
Ok, I'm looking forward to seeing an implementation. |
hi, is there any known workaround to this? like disable internal buffer? I'm batch processing big chunks of messages and pushing them to HDFS and getting loads of duplicates when re-balance happens. |
The issue is that in-flight messages can't be invalidated once they leave Alpakka Kafka stages in the graph. Due to the asynchronous nature of akka streams, it's possible that there are some messages from revoked partitions downstream of the consumer. In transactional streams we expect a consume, transform, produce workflow, so even though messages are processed by non-Alpakka Kafka stages, they eventually reach the producer where messages can be invalidated. There are ways tackle this issue, but it would require something like what @gabrielreid proposed initially in this issue as well as some user cooperation to filter out in-flight messages in downstream stages. |
Has anyone found any feasible workaround for this? I saw there's an emergency commit introduced in this PR but throwing an exception from |
Fixes #590 "Many records duplicately processed after rebalancing" In this change we introduce a new mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition, have been committed. ### Motivation Here is a common (single partition) scenario around rebalances: 1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100) 1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when... 1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance 1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets 1. _at the same time,_ another consumer on another instance, starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100 Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable. After merging this change, the scenario will unfold as follows: 1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset 1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when... 1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets * inside the onRevoked callback, zio-kafka continues to process commit commands from the user * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition 1. another consumer on another instance, starts consuming from the last committed offset (which is now 100, problem solved!) ### Commit queue Because both the main runloop, and the rebalance listener need to process (and thus receive) commits commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced. ### Complications 1. The chosen solution is not suitable for all consumers. - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one. - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places. 1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete. ### Same thread executor The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happens from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time. ### Collateral This change also: - fixes order of `private` and `final` - removes some completely useless tests ### Related The same issue is present in: - f2s-kafka: fd4s/fs2-kafka#1200 - alpakka-kafka: akka/alpakka-kafka#1038 In fact, every program that does polls and commits asynchronously is likely affected. ### Non-goals This change does not try to solve the following goals. However, these can be addressed in future PRs. - Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087). - Support consumers that want to commit only a portion of the given messages. - Support transactional consumer/producer. - Support external commits. This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073 #1086, #1089 and #1097.
Fixes #590 "Many records duplicately processed after rebalancing" In this change we introduce a new experimental mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition, have been committed. ### Motivation Here is a common (single partition) scenario around rebalances: 1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100) 1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when... 1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance 1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets 1. _at the same time,_ another consumer on another instance, starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100 Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable. After merging this change, the scenario will unfold as follows: 1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset 1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when... 1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets * inside the onRevoked callback, zio-kafka continues to process commit commands from the user * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition 1. another consumer on another instance, starts consuming from the last committed offset (which is now 100, problem solved!) ### Commit queue Because both the main runloop, and the rebalance listener need to process (and thus receive) commits commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced. ### Complications 1. The chosen solution is not suitable for all consumers. - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one. - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places. 1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete. ### Same thread executor The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happens from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time. ### Experimental Because holding up the rebalance may have unforeseen consequences, this feature is marked as experimental. This allows us to collect experiences before we recommend this mode to all users. ### Collateral This change also: - fixes order of `private` and `final` - removes some completely useless tests ### Related The same issue is present in: - f2s-kafka: fd4s/fs2-kafka#1200 - alpakka-kafka: akka/alpakka-kafka#1038 In fact, every program that does polls and commits asynchronously is likely affected. ### Non-goals This change does not try to solve the following goals. However, these can be addressed in future PRs. - Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087). - Support consumers that want to commit only a portion of the given messages. - Support transactional consumer/producer. - Support external commits. This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073 #1086, #1089 and #1097.
Fixes #590 "Many records duplicately processed after rebalancing" In this change we introduce a new experimental mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition, have been committed. ### Motivation Here is a common (single partition) scenario around rebalances: 1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100) 1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when... 1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance 1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets 1. _at the same time,_ another consumer on another instance, starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100 Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable. After merging this change, the scenario will unfold as follows: 1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset 1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when... 1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets * inside the onRevoked callback, zio-kafka continues to process commit commands from the user * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition 1. another consumer on another instance, starts consuming from the last committed offset (which is now 100, problem solved!) ### Commit queue Because both the main runloop, and the rebalance listener need to process (and thus receive) commits commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced. ### Complications 1. The chosen solution is not suitable for all consumers. - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one. - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places. 1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete. ### Same thread executor The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happens from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time. ### Experimental Because holding up the rebalance may have unforeseen consequences, this feature is marked as experimental. This allows us to collect experiences before we recommend this mode to all users. ### Collateral This change also: - fixes order of `private` and `final` - removes some completely useless tests ### Related The same issue is present in: - f2s-kafka: fd4s/fs2-kafka#1200 - alpakka-kafka: akka/alpakka-kafka#1038 In fact, every program that does polls and commits asynchronously is likely affected. ### Non-goals This change does not try to solve the following goals. However, these can be addressed in future PRs. - Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087). - Support consumers that want to commit only a portion of the given messages. - Support transactional consumer/producer. - Support external commits. This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073 #1086, #1089 and #1097.
Fixes #590 "Many records duplicately processed after rebalancing" In this change we introduce a new experimental mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition, have been committed. ### Motivation Here is a common (single partition) scenario around rebalances: 1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100) 1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when... 1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance 1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets 1. _at the same time,_ another consumer on another instance, starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100 Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable. After merging this change, the scenario will unfold as follows: 1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset 1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when... 1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets * inside the onRevoked callback, zio-kafka continues to process commit commands from the user * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition 1. another consumer on another instance, starts consuming from the last committed offset (which is now 100, problem solved!) ### Commit queue Because both the main runloop, and the rebalance listener need to process (and thus receive) commits commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced. ### Complications 1. The chosen solution is not suitable for all consumers. - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one. - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places. 1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete. ### Same thread executor The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happens from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time. ### Experimental Because holding up the rebalance may have unforeseen consequences, this feature is marked as experimental. This allows us to collect experiences before we recommend this mode to all users. ### Collateral This change also: - fixes order of `private` and `final` - removes some completely useless tests ### Related The same issue is present in: - f2s-kafka: fd4s/fs2-kafka#1200 - alpakka-kafka: akka/alpakka-kafka#1038 In fact, every program that does polls and commits asynchronously is likely affected. ### Non-goals This change does not try to solve the following goals. However, these can be addressed in future PRs. - Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087). - Support consumers that want to commit only a portion of the given messages. - Support transactional consumer/producer. - Support external commits. This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073 #1086, #1089 and #1097.
Fixes #590 "Many records duplicately processed after rebalancing" In this change we introduce a new experimental mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition, have been committed. ### Motivation Here is a common (single partition) scenario around rebalances: 1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100) 1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when... 1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance 1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets 1. _at the same time,_ another consumer on another instance, starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100 Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is ready, the stored offsets can go up and down and are therefore unreliable. After merging this change, the scenario will unfold as follows: 1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset 1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when... 1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets * inside the onRevoked callback, zio-kafka continues to process commit commands from the user * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition 1. another consumer on another instance, starts consuming from the last committed offset (which is now 100, problem solved!) ### Commit queue Because both the main runloop, and the rebalance listener need to process (and thus receive) commits commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced. ### Complications 1. The chosen solution is not suitable for all consumers. - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we can let the user tell zio-kafka that a given commit is the last one. - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places. 1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete. ### Same thread executor The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happens from the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be convinced that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations such as `ZIO.timeout` and anything with `Schedules` will still shift work to another thread. We work around this by using blocking time. ### Experimental Because holding up the rebalance may have unforeseen consequences, this feature is marked as experimental. This allows us to collect experiences before we recommend this mode to all users. ### Collateral This change also: - fixes order of `private` and `final` - removes some completely useless tests ### Related The same issue is present in: - f2s-kafka: fd4s/fs2-kafka#1200 - alpakka-kafka: akka/alpakka-kafka#1038 In fact, every program that does polls and commits asynchronously is likely affected. ### Non-goals This change does not try to solve the following goals. However, these can be addressed in future PRs. - Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087). - Support consumers that want to commit only a portion of the given messages. - Support transactional consumer/producer. - Support external commits. This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073 #1086, #1089 and #1097.
This issue was reported to be fixed by using the
This is also mentioned in our docs: https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#settings |
Reopening, since it seems that CooperativeStickyAssignor does not help if a partition gets revoked, but helps the consumer that gets get an partition assigned. |
Versions used
2.0.1
Expected Behavior
Assuming external (or kafka-internal) commits are processed properly and loaded properly, it is expected that we will get exactly-once processing of records based on the committed offsets, even over rebalances.
Actual Behavior
Incoming records are currently buffered within alpakka-kafka, and then distributed to streams based on demand. With automatic kafka-based partitionining, when a rebalance occurs, the buffered records are not discarded, and are instead still distributed downstream. The internal consumer then seeks to the committed offset (which will typically be earlier than the records in the buffer) and starts reading, which results in the same records being passed into the stream a second time.
This is particularly problematic if commits are stored in an external system (i.e. using
Consumer.plainPartitionedManualOffsetSource
), and commits only occur every 100 or 100 records for a given partition, for example. In this case, the following sequence of events can occur:There is currently logic to filter out buffered records from a revoked partition from the buffer, but no such logic for partitions which have been revoked and then directly assigned again as the result of a rebalance.
I realize that it's probably not feasible to fully resolve this issue, as records are asynchronously passed into the stream (as described in the docs in
SourceLogicBuffer.scala
, but I do believe that the current situation (where hundreds or thousands of records could be passed into a stream a second time) can be greatly improved.This can/will also probably be resolved by using the incremental rebalance protocol (as pointed out in #790), although there might also be a less invasive option in the short term.
The text was updated successfully, but these errors were encountered: