Lost messages in plainPartitionedSource on partition reassignment #382
Comments
@edrevo Could you please create an automated test that reproduces this issue?
I don't think so, unfortunately. The test case would need very fine-grained control over the execution timing of each actor, and I wouldn't know how to control that in akka / akka-streams.
Could you try to make a branch with changes in the "main" code, adding Thread.sleep calls or, even better, latches, just to demonstrate the issue? Or maybe just share the debug logs from the client when the issue actually happens? I was thinking about using
@rafalmag here you have a very dirty repro of the problem. You should:
Keep in mind that the issue won't happen in every run of the tests, since it is a timing issue and the test isn't forcing any specific timing.
I think I'm encountering the same issue using Consumer.committablePartitionedSource.
@edrevo I've tried the tests in your dirty repo with the current code, and both go green. Are they flaky, or did something improve already?
@enru, the timing conditions needed to repro the bug are quite tricky, so the test case is just "probabilistic", in the sense that it sets up the scenario for the bug to appear, but it might or might not appear. I've run it on several machines, and whether that test reproduces the bug is hit or miss.
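The latch idea raised earlier in the thread is one general way to turn a probabilistic timing test into a deterministic one. The following is a minimal sketch in Python rather than Akka code: the two thread bodies and the event names (`request_sent`, `stage_completed`) are hypothetical, and `threading.Event` stands in for a latch. It only shows how to force the "request, then completion, then reply" ordering that the bug depends on.

```python
import threading


def forced_interleaving():
    """Force the order: request sent -> stage completed -> reply arrives."""
    events = []
    request_sent = threading.Event()     # latch: the request is in flight
    stage_completed = threading.Event()  # latch: the stage has been completed

    def sub_source():
        events.append("request sent")
        request_sent.set()
        # Hold the reply back until the stage has been completed.
        stage_completed.wait(timeout=5)
        events.append("reply arrives")

    def rebalance():
        # Do not complete the stage until a request is known to be in flight.
        request_sent.wait(timeout=5)
        events.append("stage completed")
        stage_completed.set()

    threads = [
        threading.Thread(target=rebalance),
        threading.Thread(target=sub_source),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return events
```

Because each step waits on the latch set by the previous one, the recorded order does not depend on which thread the scheduler happens to run first.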
Mitigate the problem of closing partitions with pending requests. Related to #382.
FWIW, I am no longer running into lost messages around partition rebalancing in my tests with Consumer.committablePartitionedSource using 0.21.
Thanks for reporting, I hope others experience the same!
Hi, I have a very similar issue, which can be reproduced by simply closing the downstream of a topicPartition source.
In the log you will see that every time when if you uncomment
I use version 0.22 of the library.
Today I checked version 1.0.4, and I can confirm that the issue is fixed.
Thank you for reporting this. So the fix from #589 solved your case. |
I am currently seeing the following behavior, using reactive-kafka 0.18:
plainPartitionedSource is created and joins the consumer group at 22:51:04
plainPartitionedSource is created and joins the consumer group at 22:51:13
(RequestMessages from topic/partition X already registered by other stage)
Upon inspecting reactive-kafka's source code, here is what I think is happening in the first consumer:
partitionAssignedCB (SubSourceLogic:78), which in turn pumps and emits a Source for each partition
pull, which sends a RequestMessages to the Kafka consumer actor (SubSourceLogic:235), but still hasn't received the messages associated with that request
completeStage (SubSourceLogic:222), regardless of whether it was mid-request (requested = true) or not.
This causes the Kafka consumer to mark the message in partition 0 as received, but nobody actually received it.
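The sequence described above can be illustrated with a small, self-contained model. This is a sketch in Python, not reactive-kafka code: `ConsumerActorModel` and `SubSourceModel` are hypothetical stand-ins for the Kafka consumer actor and a per-partition sub-source. It assumes that the consumer advances its position as soon as it fetches a record, and that the partition is then handed to a fresh sub-source served by the same consumer.

```python
class ConsumerActorModel:
    """Hypothetical stand-in for the Kafka consumer actor."""

    def __init__(self, records):
        self.records = list(records)
        self.position = 0  # next offset to fetch

    def poll_for(self, requester):
        # Assumption: the position moves forward as soon as a record is
        # fetched, whether or not the requester can still emit it.
        record = self.records[self.position]
        self.position += 1
        requester.on_messages(record)


class SubSourceModel:
    """Hypothetical stand-in for one per-partition sub-source."""

    def __init__(self):
        self.requested = False
        self.completed = False
        self.emitted = []

    def pull(self):
        self.requested = True  # a RequestMessages is now in flight

    def complete_stage(self):
        # Mirrors the reported behavior: completes even when requested is True.
        self.completed = True

    def on_messages(self, record):
        self.requested = False
        if not self.completed:
            self.emitted.append(record)
        # Otherwise the record is silently dropped.


def run_race():
    consumer = ConsumerActorModel(["m0", "m1"])

    first = SubSourceModel()
    first.pull()              # request sent, reply not yet received
    first.complete_stage()    # partition revoked while the request is in flight
    consumer.poll_for(first)  # reply arrives after completion, m0 is dropped

    second = SubSourceModel()  # fresh sub-source for the same partition
    second.pull()
    consumer.poll_for(second)  # continues from the advanced position
    return first.emitted, second.emitted, consumer.position
```

In this model `run_race()` returns `([], ["m1"], 2)`: the consumer has moved past `m0`, but neither sub-source ever emitted it.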
Does this sound like a reasonable explanation? cc @13h3r, @elkozmon, @patriknw, @rgcase since you are authors of the SubSourceLogic file.
I am happy to provide a PR to fix this, but I'm not sure of what the best solution is (I'm pretty new to Kafka and Akka Streams). Is it reasonable to emit all pending messages before closing the Source? Or is that a big no-no?
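To make the question concrete, here is a minimal sketch of the "emit pending messages before closing" idea, again as a Python model rather than Akka Streams code. `DrainingSubSourceModel` and its `close_pending` flag are hypothetical names, and this is only one possible approach, not necessarily what the library should do: completion is deferred while a request is in flight and finishes once the reply has been emitted.

```python
class DrainingSubSourceModel:
    """Hypothetical sub-source that defers completion while a request is pending."""

    def __init__(self):
        self.requested = False
        self.completed = False
        self.close_pending = False
        self.emitted = []

    def pull(self):
        self.requested = True  # a request is now in flight

    def complete_stage(self):
        if self.requested:
            # Do not complete yet: wait for the in-flight reply first.
            self.close_pending = True
        else:
            self.completed = True

    def on_messages(self, record):
        self.requested = False
        if not self.completed:
            self.emitted.append(record)
        if self.close_pending:
            # The pending reply has been emitted, so it is now safe to close.
            self.close_pending = False
            self.completed = True


def run_drain():
    sub = DrainingSubSourceModel()
    sub.pull()
    sub.complete_stage()             # revocation arrives mid-request
    still_open = not sub.completed   # completion was deferred
    sub.on_messages("m0")            # reply is emitted, then the stage closes
    return still_open, sub.emitted, sub.completed
```

One trade-off to keep in mind: if the partition has already moved to another consumer that starts from an earlier offset, emitting the in-flight record here could deliver it twice, so this trades a possible loss for a possible duplicate.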