From 407c75c067868fc4ef44ecc57050cc0f3b7eb5a7 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Fri, 27 Oct 2023 17:09:32 +0200 Subject: [PATCH] Fix test In test `restartStreamsOnRebalancing mode closes all partition streams` consumer 1 is expected to receive at least 1 message. However, consumer 2 might grab all them through pre-fetching. Fix this by disabling pre-fetching for consumer 2. --- .../scala/zio/kafka/consumer/ConsumerSpec.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index bb584fc29..682b0c0a6 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -811,12 +811,16 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(20) .runDrain .provideSomeLayer[Kafka]( - consumer( - client2, - Some(group), - clientInstanceId = Some("consumer2"), - properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "10") - ) + // Reduce `max.poll.records` and disable pre-fetch so that we are sure that consumer 2 does + // not pre-fetch more than it will process. + ZLayer { + consumerSettings( + client2, + Some(group), + clientInstanceId = Some("consumer2"), + `max.poll.records` = 10 + ).map(_.withFetchStrategy(_ => ZIO.succeed(Set.empty))) + } >>> minimalConsumer() ) } .fork