Skip to content

Commit

Permalink
Accept different order of partitions when expecting messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru committed Aug 19, 2019
1 parent 704c4b8 commit 3825f5f
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside {
"Fetched records" must {

// The `max.poll.records` controls how many records Kafka fetches internally during a poll.
"do actually show even if partition is revoked" in assertAllStagesStopped {
"actually show even if partition is revoked" in assertAllStagesStopped {
val count = 200
// de-coupling consecutive test runs with crossScalaVersions on Travis
val topicSuffix = Random.nextInt()
Expand Down Expand Up @@ -86,13 +86,19 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside {
val messages1p0 = messages1.filter(_.partition() == partition0)
val messages1p1 = messages1.filter(_.partition() == partition1)

// messages1p0 shouldBe Symbol("empty")
messages1p0 should have size (1)
messages1p1 should have size (count.toLong)
messages2 should have size (count.toLong)
if (messages1p0.size == 1) { // this depending on which partition gets issued "first" during poll
// this is the problematic message: even thoug partition 0 is balanced away the enqueued messages are issued
messages1p0 should have size (1)
messages1p1 should have size (count.toLong)

messages1p0 should have size (1)
messages1p0.head.partition() shouldBe partition0
messages2 should have size (count.toLong)
} else {
// this is the problematic message: even thoug partition 0 is balanced away the enqueued messages are issued
messages1p0 should have size (count.toLong)
messages1p1 should have size (1)

messages2 should have size (count.toLong)
}

probe1.cancel()
probe2.cancel()
Expand Down

0 comments on commit 3825f5f

Please sign in to comment.