diff --git a/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala index bcd80fdb0..4979345e3 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala @@ -30,7 +30,7 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside { "Fetched records" must { // The `max.poll.records` controls how many records Kafka fetches internally during a poll. - "do actually show even if partition is revoked" in assertAllStagesStopped { + "actually show even if partition is revoked" in assertAllStagesStopped { val count = 200 // de-coupling consecutive test runs with crossScalaVersions on Travis val topicSuffix = Random.nextInt() @@ -86,13 +86,19 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside { val messages1p0 = messages1.filter(_.partition() == partition0) val messages1p1 = messages1.filter(_.partition() == partition1) -// messages1p0 shouldBe Symbol("empty") - messages1p0 should have size (1) - messages1p1 should have size (count.toLong) - messages2 should have size (count.toLong) + if (messages1p0.size == 1) { // this depending on which partition gets issued "first" during poll + // this is the problematic message: even thoug partition 0 is balanced away the enqueued messages are issued + messages1p0 should have size (1) + messages1p1 should have size (count.toLong) - messages1p0 should have size (1) - messages1p0.head.partition() shouldBe partition0 + messages2 should have size (count.toLong) + } else { + // this is the problematic message: even thoug partition 0 is balanced away the enqueued messages are issued + messages1p0 should have size (count.toLong) + messages1p1 should have size (1) + + messages2 should have size (count.toLong) + } probe1.cancel() probe2.cancel()