From 78cfa03673b0bd4b8fe36487de9c06f915f9f2e0 Mon Sep 17 00:00:00 2001 From: Adriel Casellas Date: Tue, 19 Nov 2024 04:12:29 -0600 Subject: [PATCH] Fix timing issue in ConsumerSpec test by adding delay in consumer stream (#1388) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR addresses a timing issue in the ConsumerSpec test: `“it’s possible to start a new consumption session from a Consumer that had a consumption session stopped previously”` # Issue: When running the entire test suite, this test occasionally fails with the following assertion error: ``` Assertion failed: ✗ 100000 was not less than 100000 consumed0 did not satisfy isGreaterThan(0L) && isLessThan(numberOfMessages.toLong) consumed0 = 100000 ``` # Cause: - The failure occurs because the first consumer sometimes consumes all messages before consumer.stopConsumption is called. - This happens due to timing variations when the test suite is run in full, possibly because of system performance or resource contention. - As a result, consumed0 equals numberOfMessages, causing the assertion consumed0 < numberOfMessages.toLong to fail. # Solution: - Introduce a slight delay in the consumer stream to prevent it from consuming all messages too quickly. - This ensures that consumer.stopConsumption is called before all messages are consumed. - The test can now reliably check that the consumer can be stopped and restarted. # Testing: - Ran the full test suite multiple times to confirm that the issue is resolved. - Verified that consumed0 is greater than 0 and less than numberOfMessages, satisfying the original assertions. --- .../scala/zio/kafka/consumer/ConsumerSpec.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 679a0d582..defc9484e 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -1480,9 +1480,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { test( "it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously" ) { - // NOTE: - // When this test fails with the message `100000 was not less than 100000`, it's because - // your computer is so fast that the first consumer already consumed all 100000 messages. val numberOfMessages: Int = 100000 val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i")) @@ -1494,11 +1491,13 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { consumer <- Consumer.make(settings, diagnostics = diagnostics) _ <- produceMany(topic, kvs) // Starting a consumption session to start the Runloop. - fiber <- consumer - .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) - .take(numberOfMessages.toLong) - .runCount - .forkScoped + fiber <- + consumer + .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) + .tap(_ => ZIO.sleep(1.millisecond)) // sleep to avoid consuming all messages in under 200 millis + .take(numberOfMessages.toLong) + .runCount + .forkScoped _ <- ZIO.sleep(200.millis) _ <- consumer.stopConsumption consumed0 <- fiber.join