From b06f0b65f748f08492d256b670af6cd2cd57ce80 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Wed, 29 Nov 2023 09:21:01 +0100 Subject: [PATCH] Fix test for stopConsumption with aggregateAsync --- .../zio/kafka/consumer/ConsumerSpec.scala | 41 +++++++++++-------- .../zio/kafka/testkit/KafkaTestUtils.scala | 4 ++ 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index ed3ac4ed5..c9ca30cda 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -322,7 +322,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSomeLayer[Kafka](consumer(client, Some(group))) } yield assert(offset.map(_.offset))(isSome(equalTo(9L))) }, - test("process outstanding commits after a graceful shutdown with aggregateAsync") { + test("process outstanding commits after a graceful shutdown with aggregateAsync using `maxRebalanceDuration`") { val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i")) val topic = "test-outstanding-commits" for { @@ -330,21 +330,30 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { client <- randomClient _ <- produceMany(topic, kvs) messagesReceived <- Ref.make[Int](0) - offset <- (Consumer - .plainStream(Subscription.topics(topic), Serde.string, Serde.string) - .mapConcatZIO { record => - for { - nr <- messagesReceived.updateAndGet(_ + 1) - _ <- Consumer.stopConsumption.when(nr == 10) - } yield if (nr < 10) Seq(record.offset) else Seq.empty - } - .aggregateAsync(Consumer.offsetBatches) - .mapZIO(_.commit) - .runDrain *> - Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head)) - .provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 5.seconds)) - } yield assert(offset.map(_.offset))(isSome(equalTo(9L))) - } @@ TestAspect.nonFlaky(10), + offset <- ( + Consumer + .plainStream(Subscription.topics(topic), Serde.string, Serde.string) + .mapConcatZIO { record => + for { + nr <- messagesReceived.updateAndGet(_ + 1) + _ <- Consumer.stopConsumption.when(nr == 10) + } yield if (nr < 10) Seq(record.offset) else Seq.empty + } + .aggregateAsync(Consumer.offsetBatches) + .mapZIO(_.commit) + .runDrain *> + Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head) + ) + .provideSomeLayer[Kafka]( + consumer( + client, Some(group), commitTimeout = 4.seconds, + rebalanceSafeCommits = true, maxRebalanceDuration = 6.seconds + ) + ) + } yield { + assertTrue(offset.map(_.offset).contains(9L)) + } + } @@ TestAspect.nonFlaky(5), test("a consumer timeout interrupts the stream and shuts down the consumer") { // Setup of this test: // - Set the max poll interval very low: a couple of seconds. diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala index 81312116e..7ba3c28dd 100644 --- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala +++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala @@ -117,6 +117,7 @@ object KafkaTestUtils { offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), restartStreamOnRebalancing: Boolean = false, rebalanceSafeCommits: Boolean = false, + maxRebalanceDuration: Duration = 3.minutes, maxPollInterval: Duration = 5.minutes, `max.poll.records`: Int = 100, // settings this higher can cause concurrency bugs to go unnoticed commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, @@ -140,6 +141,7 @@ object KafkaTestUtils { .withOffsetRetrieval(offsetRetrieval) .withRestartStreamOnRebalancing(restartStreamOnRebalancing) .withRebalanceSafeCommits(rebalanceSafeCommits) + .withMaxRebalanceDuration(maxRebalanceDuration) .withProperties(properties) val withClientInstanceId = clientInstanceId.fold(settings)(settings.withGroupInstanceId) @@ -207,6 +209,7 @@ object KafkaTestUtils { diagnostics: Diagnostics = Diagnostics.NoOp, restartStreamOnRebalancing: Boolean = false, rebalanceSafeCommits: Boolean = false, + maxRebalanceDuration: Duration = 3.minutes, commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, properties: Map[String, String] = Map.empty ): ZLayer[Kafka, Throwable, Consumer] = @@ -219,6 +222,7 @@ object KafkaTestUtils { offsetRetrieval = offsetRetrieval, restartStreamOnRebalancing = restartStreamOnRebalancing, rebalanceSafeCommits = rebalanceSafeCommits, + maxRebalanceDuration = maxRebalanceDuration, properties = properties, commitTimeout = commitTimeout )