committablePartitionedManualOffsetSource does not use getOffsetsOnAssign on rebalance #1086
Comments
Thanks for raising this issue @francisdb. Are you saying that …
No, for existing assignments that get revoked and directly assigned again there is this seek to the last commit.
For partitions that were already assigned to the same consumer group member?
Yes. Logs indicate, for example: partitions 4 & 5 start properly with a new stream after the call to getOffsetsOnAssign(1 2 3 4 5).
Got it. Is this something you would like to take on? If not, I can add it to my backlog.
To be honest I have a hard time understanding all those callbacks and internal state handling.
I investigated a bit more and I see that on an assign the Kafka driver automatically seeks to the last committed offset for all assigned partitions: if partitions 1 2 3 4 5 are assigned we get this logged for all partitions, even before getOffsetsOnAssign([1,2,3,4,5]) is called.
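For context, this is roughly how such a source is wired up, as I understand the API being discussed; the offset lookup and topic name below are placeholders, not code from this project, so treat it as a sketch:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("example")

val consumerSettings: ConsumerSettings[String, String] = ??? // as shown later in this thread

// Placeholder: read the offsets stored next to the snapshots in the object store.
def loadOffsetsFromSnapshotStore(assigned: Set[TopicPartition]): Future[Map[TopicPartition, Long]] = ???

// getOffsetsOnAssign is expected to decide the start offset for every assigned
// partition; the issue is that after a rebalance the stream may instead continue
// from the offset committed to Kafka.
val partitionedSource = Consumer.committablePartitionedManualOffsetSource(
  consumerSettings,
  Subscriptions.topics("my-topic"),
  loadOffsetsFromSnapshotStore // getOffsetsOnAssign
)
```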
Yes, we do some optimizations here as well for the committable partitioned sources when partitions are reassigned back to the same group member, so that they don't replay too many messages that may be "in flight" within the stream. For …
@francisdb I did some digging into this and I'm thinking about how to proceed. I've discovered that the … I have a few questions for you:
We have a hosted Kafka solution in the same cloud datacenter (Confluent Cloud) and these consumer settings:
ConsumerSettings(actorSystem, ..., ...)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
We store snapshots (external offset store) every 1000 messages to an object store.
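For illustration only, a minimal sketch of such periodic snapshotting as an Akka Streams flow; `Snapshot` and `saveSnapshot` are hypothetical placeholders for the object-store logic, not code from this project:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical: persist application state plus the last processed offset to the object store.
final case class Snapshot(partition: Int, lastOffset: Long /* , ... application state ... */)
def saveSnapshot(snapshot: Snapshot)(implicit ec: ExecutionContext): Future[Unit] = ???

// Write a snapshot after every 1000 records of a partition's sub-stream, then keep processing.
def snapshotEvery1000(partition: Int)(implicit ec: ExecutionContext)
    : Flow[ConsumerRecord[String, String], ConsumerRecord[String, String], NotUsed] =
  Flow[ConsumerRecord[String, String]]
    .grouped(1000)
    .mapAsync(parallelism = 1) { batch =>
      saveSnapshot(Snapshot(partition, batch.last.offset())).map(_ => batch)
    }
    .mapConcat(identity)
```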
I inherited this project and am not sure why we chose these committer settings:
lazy val committerSettings: CommitterSettings = CommitterSettings(actorSystem).withMaxInterval(2.seconds)
val operationResultMessageSink = Producer
  .committableSink(runtimeOperationFeedbackProducerSettings, committerSettings)
  .withAttributes(ActorAttributes.supervisionStrategy {
    case e: RetriableCommitFailedException =>
      logger.warn(s"Ignoring RetriableCommitFailedException for RuntimeOperationTopic: ${e.getMessage}")
      Supervision.Resume
    case e: CommitFailedException if e.getMessage.contains("already rebalanced") =>
      // Commit cannot be completed since the group has already rebalanced and assigned the partitions to another
      // member. This means that the time between subsequent calls to poll() was longer than the
      // configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time
      // message processing. You can address this either by increasing max.poll.interval.ms or by reducing
      // the maximum size of batches returned in poll() with max.poll.records.
      // TODO should normally be fixed but maybe not for the setup we use
      // https://github.com/akka/alpakka-kafka/issues/755
      logger.warn(s"Ignoring CommitFailedException when rebalanced for RuntimeOperationTopic: ${e.getMessage}")
      Supervision.Resume
    case _ => Supervision.Stop
  })
Thanks for the information @francisdb.
The … I acknowledge that …
We need both offsets: storing snapshots is rather expensive, and we need to know how far we have processed messages past the snapshot (to avoid duplicated side-effects). This is a typical event-sourcing solution for when state is kept for a long time (months) and a lot of messages have to be handled. (We are not that Lightbend client.)
Got it. Is there any reason why you chose not to store/commit offsets in external storage too, apart from your occasional snapshot?
I missed this earlier question. With the plain sources, offsets aren't committed back to Kafka. The user stores them somewhere else and either uses the … https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#offset-storage-external-to-kafka
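For readers who don't want to follow the link, a minimal sketch of the external-offset-storage pattern that page describes, under the assumption that the previously stored offset is already at hand (the topic name and `storedOffset` are placeholders):

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.TopicPartition

implicit val system: ActorSystem = ActorSystem("example")

val consumerSettings: ConsumerSettings[String, String] = ???

// Placeholder: the offset previously written to external storage for this partition.
val partition = new TopicPartition("my-topic", 0)
val storedOffset: Long = ???

// The plain source starts exactly where the external store says we left off;
// nothing is committed back to Kafka.
val control = Consumer
  .plainSource(consumerSettings, Subscriptions.assignmentWithOffset(partition -> storedOffset))
  .map { record =>
    // process the record, then persist record.offset() + 1 back to the external store
    record
  }
  .to(Sink.ignore)
  .run()
```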
Those snapshots are not occasional. We handle a lot of messages, and 1000 messages on a partition are not a lot, so snapshots are being written frequently. Regarding the storing, I guess there is no reason not to store them in Kafka? Storing them externally would require an extra stateful datastore, which we try to avoid (snapshots are stored on S3/Google Cloud Storage). To be honest we have not yet reached a stable point where we can start tweaking partitions/snapshot/commit intervals. We even decided to put our Kafka efforts aside for a while because of this issue. As long as we have a single consumer everything works perfectly, but once we get rebalances we end up in a mess.
@francisdb Is it possible for you to produce a snapshot and try out the fix in #1100?
Will try to have our test system running with this.
@francisdb We're planning an Alpakka Kafka release this week, so it would be a good opportunity to get this solved soon. Have you had a chance to see if this works for you?
Not yet, we have a fork with some changes regarding #1066 that I need to update first. But it's next on my todo list so it should not take long.
Ok, thanks!
… and we still have newer messages arriving after re-assign (could be related to #1038).
@francisdb Thanks for the update. I'll dig into this some more later, but it may not be before the next release. One question regarding your log view from partition 2:
Is this ^ the logging from Kafka's …?
These are the details of that log line: …
I was thinking of putting together a small project that reproduces this issue, as I understand it's not easy to debug like this; however, that might take quite some of my time.
Thanks. Does this log line always occur after the similar Alpakka Kafka logging …?
That probably isn't necessary. I thought I had constructed a test that reproduced this reliably, but maybe I misread the ordering of seeks between the Kafka Consumer internals and the Alpakka Kafka logic. I thought this was caused by a timing issue of Alpakka Kafka seeking to external offsets after a rebalance was complete. I'm not sure if there's any way to know when the Consumer is doing this internally, since it happens after the rebalance. I'll have to dig more to see if it's truly a background thread or triggered in the first poll after a rebalance. If it's the latter, maybe we check after an empty poll whether the seek position was reset, and then seek to external offsets before polling again.
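To illustrate the "check after an empty poll" idea in isolation (my own sketch against the plain Kafka consumer API, not Alpakka Kafka internals; `externalOffsets` stands in for the externally stored offsets):

```scala
import java.time.Duration

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._

// Sketch: after an empty poll following a rebalance, verify that the consumer's
// position still matches the externally stored offset; if the consumer reset it
// to the broker-committed offset, seek back before polling again.
def pollWithPositionCheck[K, V](
    consumer: KafkaConsumer[K, V],
    externalOffsets: Map[TopicPartition, Long]
): Unit = {
  val records = consumer.poll(Duration.ofMillis(100))
  if (records.isEmpty) {
    consumer.assignment().asScala.foreach { tp =>
      externalOffsets.get(tp).foreach { expected =>
        if (consumer.position(tp) != expected) consumer.seek(tp, expected)
      }
    }
  }
}
```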
We don't have debug logging enabled on our staging system, so I can't tell for now.
Do you need me to do another run with debug logging enabled?
@francisdb Sure, that would be helpful for me to assert that I recreated your use case.
Closing as nobody else seems to have this issue and we are no longer using alpakka-kafka for this use case.
@francisdb Question: how did you do …?
We are now using the native blocking client, which allows loading snapshots, and we store our snapshots together with their offset on a cloud object store. This is what currently works for us. We have a snapshot-loading ConsumerRebalanceListener (see the sketch below) that:
The consumer:
We were blocked with alpakka-kafka on:
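A rough sketch of what such a setup can look like with the plain Kafka consumer, under my own assumptions (the `Snapshot` model and `loadSnapshot` lookup are hypothetical, and the stored offset is assumed to be the last processed record):

```scala
import java.util.{Collection => JCollection, Collections}

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._

// Hypothetical snapshot model and loader backed by the cloud object store.
final case class Snapshot(offset: Long /* , ... application state ... */)
def loadSnapshot(tp: TopicPartition): Option[Snapshot] = ???

// On assignment, restore state from the latest snapshot and seek to the offset
// stored with it, instead of relying on the offset committed to Kafka.
final class SnapshotLoadingListener[K, V](consumer: KafkaConsumer[K, V]) extends ConsumerRebalanceListener {

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    partitions.asScala.foreach { tp =>
      loadSnapshot(tp) match {
        case Some(snapshot) => consumer.seek(tp, snapshot.offset + 1) // resume after the last processed record
        case None           => consumer.seekToBeginning(Collections.singleton(tp))
      }
    }

  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = {
    // A snapshot for the revoked partitions could be written here before they move away.
  }
}

// Usage sketch: subscribe with the listener so the seek happens on every (re)assignment.
// consumer.subscribe(Collections.singleton("my-topic"), new SnapshotLoadingListener(consumer))
```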
Thank you @francisdb
Versions used
Expected Behavior
We do manual offset management where we start from the last snapshot offset and replay without side-effects up to the committed offset, then we enable side-effects. We expect the stream to make monotonic progress where offsets increment one by one. If the offset goes back we ignore that; if the offset takes a step of more than 1 we throw an error, as this indicates we are skipping messages.
phases:
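To make that expectation concrete, a minimal sketch of the per-partition offset guard described above; `snapshotOffset`, `committedOffset` and the side-effect switch are hypothetical names, and the exact offset semantics (last processed vs. next to read) are assumptions:

```scala
// Guard for monotonic progress: offsets must advance one by one. A step back is
// ignored (expected during replay); a gap of more than 1 means messages were
// skipped and is treated as a fatal error.
final class OffsetGuard(snapshotOffset: Long, committedOffset: Long) {
  // Assumes the snapshot stores the offset of the last record included in it.
  private var expectedNext: Long = snapshotOffset + 1
  private var sideEffectsEnabled: Boolean = false

  def onRecord(offset: Long): Unit =
    if (offset < expectedNext) {
      // offset went back (e.g. on replay): ignore
    } else if (offset > expectedNext) {
      throw new IllegalStateException(s"Offset gap: expected $expectedNext but got $offset, messages were skipped")
    } else {
      // Assumes committedOffset is the last record already processed with side-effects.
      if (!sideEffectsEnabled && offset > committedOffset) sideEffectsEnabled = true
      // process the record, with or without side-effects depending on sideEffectsEnabled
      expectedNext = offset + 1
    }
}
```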
Actual Behavior
On a rebalance during the replay phase (4), when the partition is revoked and reassigned, we see that alpakka-kafka keeps the stream open and seeks to the last committed offset (y), which skips a lot of messages and messes up the replay.
This is a common case on redeploy where we have some successive rebalances as nodes are replaced.
Relevant logs
How to solve this?
Does the above behavior make sense for manual offset handling?
Is this configurable?
Without optimizations we could just go for the simple solution of not keeping the stream open on quick revoke/assign.
Any other suggestions?