
Lost messages in plainPartitionedSource on partition reassignment #382

Closed · edrevo opened this issue Dec 18, 2017 · 12 comments

@edrevo (Contributor) commented Dec 18, 2017

I am currently seeing the following behavior, using reactive-kafka 0.18 (a sketch of this setup follows the list):

  1. I am sending 2 messages to a Kafka topic, which fall in two different partitions (partition 0 and partition 3)
  2. A Kafka plainPartitionedSource is created and joins the consumer group at 22:51:04
  3. A second Kafka plainPartitionedSource is created and joins the consumer group at 22:51:13
  4. A partition rebalance happens between 22:51:28 and 22:51:34 (I see several "RequestMessages from topic/partition X already registered by other stage" warnings)
  5. Partition 0 stays with the first consumer
  6. Partition 3 moves to the second consumer
  7. The message to partition 0 is never emitted by the Source
  8. The message to partition 3 is correctly emitted by the second consumer
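For reference, a minimal sketch of that setup. The topic name, group id, bootstrap servers, and wiring are illustrative assumptions, not details from the report:

    import akka.actor.ActorSystem
    import akka.kafka.{ConsumerSettings, Subscriptions}
    import akka.kafka.scaladsl.Consumer
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Sink
    import org.apache.kafka.common.serialization.StringDeserializer

    implicit val system: ActorSystem = ActorSystem("issue-382")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("issue-382-group")

    // Each call creates one member of the consumer group; the second join
    // triggers the rebalance in step 4.
    def startConsumer(): Consumer.Control =
      Consumer
        .plainPartitionedSource(settings, Subscriptions.topics("test-topic"))
        .flatMapMerge(breadth = 16, { case (_, partitionSource) => partitionSource })
        .to(Sink.foreach(record => println(record.value)))
        .run()

    val first = startConsumer()   // joins at 22:51:04, initially owns all partitions
    val second = startConsumer()  // joins at 22:51:13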

Upon inspecting the reactive-kafka's source code, here is what I think is happening in the first consumer:

  1. All partitions are assigned to the consumer, which triggers partitionAssignedCB (SubSourceLogic:78), which in turn pumps and emits a Source for each partition
  2. The Source for partition 0 receives a pull, which sends a RequestMessages to the Kafka consumer actor (SubSourceLogic:235), but has not yet received the messages associated with that request
  3. The new consumer joins the consumer group, and Kafka first revokes all partitions and then assigns the new ones
  4. Revoking all partitions causes the Source for partition 0 to be cancelled (SubSourceLogic:99)
  5. The Source for partition 0 calls completeStage (SubSourceLogic:222), regardless of whether it was mid-request (requested = true) or not
  6. The Kafka actor performs a poll for partition 0, since it has an outstanding request for it, and sends the resulting messages to the Source that was just closed (KafkaConsumerActor:346)

This causes the Kafka consumer to mark the message in partition 0 as received, but nobody actually received it.

Does this sound like a reasonable explanation? cc @13h3r, @elkozmon, @patriknw, @rgcase since you are authors of the SubSourceLogic file.

I am happy to provide a PR to fix this, but I'm not sure what the best solution is (I'm pretty new to Kafka and Akka Streams). Is it reasonable to emit all pending messages before closing the Source, or is that a big no-no?
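To make the question concrete, here is a self-contained toy stage (a sketch, not the actual SubSourceLogic code) showing what "emit all pending messages before closing" could look like at the Akka Streams level, using GraphStageLogic.emitMultiple to flush a buffer and complete only afterwards:

    import akka.stream.{Attributes, Outlet, SourceShape}
    import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

    // A toy source holding already-received elements. Instead of calling
    // completeStage() immediately (and dropping them), it emits the buffered
    // elements as demand arrives and completes afterwards.
    class DrainBeforeComplete[T](buffered: List[T]) extends GraphStage[SourceShape[T]] {
      val out: Outlet[T] = Outlet("DrainBeforeComplete.out")
      override val shape: SourceShape[T] = SourceShape(out)

      override def createLogic(attrs: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          setHandler(out, new OutHandler {
            override def onPull(): Unit =
              // emitMultiple pushes each buffered element as it is pulled,
              // then runs the callback - here, completing the stage.
              emitMultiple(out, buffered.iterator, () => completeStage())
          })
        }
    }

Running Source.fromGraph(new DrainBeforeComplete(List(1, 2, 3))).runWith(Sink.foreach(println)) prints all three buffered elements before the stream completes, rather than losing them.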

@rafalmag (Contributor) commented Jan 5, 2018

@edrevo Could you please create an automated test that reproduces this issue?

@edrevo (Author) commented Jan 9, 2018

I don't think so, unfortunately. The test case would need very fine-grained control over the execution timing of each actor, and I wouldn't know how to control that in Akka / Akka Streams.

@rafalmag (Contributor) commented
Could you try to make a branch with changes in the "main" code - adding Thread.sleeps or, even better, latches - just to demonstrate the issue? Or maybe just post the debug logs from the client when the issue actually happens?
If the issue is reproducible, it would be a huge problem, as the "at least once" guarantee would be gone.

I was thinking about using plainPartitionedSource and coupling it with custom transactional producers to try to achieve "exactly once" Kafka-to-Kafka processing (something similar to Kafka Streams - https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/).
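For what it's worth, later Alpakka Kafka versions ship a Transactional API for exactly this Kafka-to-Kafka pattern (it did not exist in 0.18). A rough sketch, with topic names, servers, and the transactional id as placeholders:

    import akka.actor.ActorSystem
    import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
    import akka.kafka.scaladsl.Transactional
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

    implicit val system: ActorSystem = ActorSystem("eos")

    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("eos-group")

    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers("localhost:9092")

    Transactional
      .source(consumerSettings, Subscriptions.topics("source-topic"))
      .map { msg =>
        // The consumed offset travels with the record and is committed in the
        // same Kafka transaction as the write below.
        ProducerMessage.single(
          new ProducerRecord("sink-topic", msg.record.key, msg.record.value),
          msg.partitionOffset)
      }
      .runWith(Transactional.sink(producerSettings, "eos-example-transactional-id"))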

edrevo added commits to edrevo/reactive-kafka that referenced this issue Jan 11, 2018
@edrevo (Author) commented Jan 11, 2018

@rafalmag here is a very dirty repro of the problem. To run it:

git clone git@github.com:edrevo/reactive-kafka.git
cd reactive-kafka
git checkout issue-382
sbt "testOnly *MyTest*"

Keep in mind that the issue won't happen in every run of the tests, since it is a timing issue and the test isn't forcing any specific timing.

@asflierl commented
I think I'm encountering the same issue using Consumer.committablePartitionedSource.

@ennru (Member) commented May 23, 2018

@edrevo I've tried the tests in your dirty repo with current code - both go green. Are they flaky or did something improve already?

@edrevo (Author) commented May 23, 2018

@ennru, the timing conditions needed to repro the bug are quite tricky, so the test case is just "probabilistic": it sets up the scenario for the bug to appear, but the bug may or may not show up. I've run it on several machines, and reproducing the bug with that test is hit or miss.

ennru pushed a commit that referenced this issue May 29, 2018
Mitigate the problem of closing partitions with pending requests. Related to #382.
@asflierl commented Jun 8, 2018

FWIW, I am no longer running into lost messages around partition rebalancing in my tests with Consumer.committablePartitionedSource using 0.21.

@ennru (Member) commented Jun 8, 2018

Thanks for reporting, I hope others experience the same!

@GrigorievNick commented Aug 10, 2018

Hi, I have a very similar issue, which can be reproduced simply by closing the downstream of a per-partition source:

    // Assumes a Kafka `producer`, a `topic`, an implicit ActorSystem `system`
    // and a materializer are already in scope (as in the original test).
    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.FiniteDuration
    import akka.kafka.{ConsumerSettings, Subscriptions}
    import akka.kafka.scaladsl.Consumer
    import akka.stream.scaladsl.Sink
    import org.apache.kafka.clients.producer.ProducerRecord

    (0 to 100000).foreach(i => producer.send(new ProducerRecord(topic, i.toString)))
    producer.flush()

    Consumer
      .committablePartitionedSource[String, String](
        ConsumerSettings(system, None, None), // deserializers are taken from config
        Subscriptions.topics(topic))
      .log(topic)
      .delay(FiniteDuration(10, TimeUnit.MILLISECONDS))
      .runForeach {
        case (tp, source) =>
          source
            .map(_.record.offset())
            .log(tp.toString)
            // .map(_ => throw new IllegalArgumentException) // failing shows the same loss
            .take(10) // completes each per-partition source after 10 elements
            .runWith(Sink.ignore)
      }
    Thread.sleep(30000)

In the log you will see that every time a sub-source completes, the next partitioned source starts from offset max.poll.records + 1 when max.poll.records > 10.

If you uncomment the map that throws the exception, the situation is the same, except that the 10 messages are not read first.

I am using version 0.22 of the library.
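(To make the offset jump easier to observe, the poll batch size can be tuned on the consumer settings; the value below is an arbitrary example, not from the report:)

    import org.apache.kafka.clients.consumer.ConsumerConfig

    val settings = ConsumerSettings(system, None, None)
      .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50") // poll batches of 50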

@GrigorievNick commented
Today I checked version 1.0.4, and I can confirm that the issue is fixed.
I am also 99% sure that "Consumer might skip offsets" #336 is fixed as well.

ennru added this to the 1.0-M1 milestone Jul 3, 2019
@ennru (Member) commented Jul 3, 2019

Thank you for reporting this. So the fix from #589 solved your case.
Closing.
