Skip to content

Commit

Permalink
fix flaky KafkaSourceProviderImplSpec (#1127)
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam authored Feb 21, 2024
1 parent 96b8577 commit d91efe2
Showing 1 changed file with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ class KafkaSourceProviderImplSpec extends ScalaTestWithActorTestKit with LogCapt

"The KafkaSourceProviderImpl" must {

// FIXME disabled because failing a lot #641
"successfully verify offsets from assigned partitions" ignore {
"successfully verify offsets from assigned partitions" in {
val topic = "topic"
val partitions = 2
val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
Expand All @@ -62,9 +61,10 @@ class KafkaSourceProviderImplSpec extends ScalaTestWithActorTestKit with LogCapt
val metadataClient = new TestMetadataClientAdapter(partitions)
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
val totalPerPartition = 10

val consumerRecords =
for (n <- 0 to 10; tp <- List(tp0, tp1))
for (n <- 0 to totalPerPartition; tp <- List(tp0, tp1))
yield new ConsumerRecord(tp.topic(), tp.partition(), n, n.toString, n.toString)

val consumerSource = Source(consumerRecords)
Expand Down Expand Up @@ -101,6 +101,12 @@ class KafkaSourceProviderImplSpec extends ScalaTestWithActorTestKit with LogCapt
records.count(_.partition() == tp1.partition()) shouldBe 5
}

// because source push to handle(probe) before sinkProbe request pull, it made probe cache random one record
val eagerMessage = probe.receiveMessage()
records = records ++ Set(eagerMessage)
val tp0Received = records.count(_.partition() == tp0.partition())
val tp0Expect = totalPerPartition - tp0Received

// assign only tp0 to this projection
provider.partitionHandler.onAssign(Set(tp0), null)
provider.partitionHandler.onRevoke(Set(tp1), null)
Expand All @@ -112,10 +118,10 @@ class KafkaSourceProviderImplSpec extends ScalaTestWithActorTestKit with LogCapt
// only records from partition 0 should remain, because the rest were filtered
sinkProbe.request(5)
sinkProbe.expectNextN(5)
records = probe.receiveMessages(5)
records = probe.receiveMessages(tp0Expect)

withClue("checking: after rebalance processed records should only have records from partition 0") {
records.count(_.partition() == tp0.partition()) shouldBe 5
records.count(_.partition() == tp0.partition()) shouldBe tp0Expect
records.count(_.partition() == tp1.partition()) shouldBe 0
}
}
Expand Down

0 comments on commit d91efe2

Please sign in to comment.