diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 4d1cad5d8f..39555c715e 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -28,7 +28,6 @@ import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ -import java.util.concurrent.atomic.AtomicInteger import scala.reflect.ClassTag //noinspection SimplifyAssertInspection @@ -1140,50 +1139,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { ) ) }, - test( - "Calling `Consumer::stopConsumption` just after starting a forked consumption session should stop the consumption" - ) { - val numberOfMessages: Int = 100000 - val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i")) - - def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] = - for { - clientId <- randomClient - topic <- randomTopic - settings <- consumerSettings(clientId = clientId) - consumer <- Consumer.make(settings, diagnostics = diagnostics) - _ <- produceMany(topic, kvs) - ref = new AtomicInteger(0) - // Starting a consumption session to start the Runloop. - fiber <- - consumer - .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) - .mapChunksZIO(chunks => ZIO.logDebug(s"Consumed ${ref.getAndAdd(chunks.size)} messages").as(chunks)) - .take(numberOfMessages.toLong) - .runCount - .fork - _ <- consumer.stopConsumption - consumed_0 <- fiber.join - } yield assert(consumed_0)(isLessThan(numberOfMessages.toLong)) - - for { - diagnostics <- Diagnostics.SlidingQueue.make(1000) - testResult <- ZIO.scoped { - test(diagnostics) - } - finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization])) - } yield testResult && assert(finalizationEvents)( - // The order is very important. - // The subscription must be finalized before the runloop, otherwise it creates a deadlock. - equalTo( - Chunk( - SubscriptionFinalized, - RunloopFinalized, - ConsumerFinalized - ) - ) - ) - } @@ nonFlaky(5), test( "it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously" ) { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 52f5cb005c..6c6a713016 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -198,7 +198,7 @@ object Consumer { */ override def stopConsumption: UIO[Unit] = ZIO.logDebug("stopConsumption called") *> - runloopAccess.stopConsumption() + runloopAccess.stopConsumption override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index e5f8399c15..25873566a8 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -7,7 +7,7 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription } import zio.stream.{ Stream, Take, UStream, ZStream } -import zio.{ durationInt, Hub, IO, Ref, Scope, UIO, ZIO, ZLayer } +import zio.{ Hub, IO, Ref, Scope, UIO, ZIO, ZLayer } private[internal] sealed trait RunloopState private[internal] object RunloopState { @@ -42,26 +42,8 @@ private[consumer] final class RunloopAccess private ( /** * No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped. - * - * Note: - * 1. We do a 100 retries waiting 10ms between each to roughly take max 1s before to stop to retry. We want to avoid - * an infinite loop. We need this recursion because if the user calls `stopConsumption` before the Runloop is - * started, we need to wait for it to be started. Can happen if the user starts a consuming session in a forked - * fiber and immediately after forking, stops it. The Runloop will potentially not be started yet. */ - // noinspection SimplifyUnlessInspection - def stopConsumption(retry: Int = 100, initialCall: Boolean = true): UIO[Unit] = { - @inline def next: UIO[Unit] = stopConsumption(retry - 1, initialCall = false) - - runloop(shouldStartIfNot = false).flatMap { - case RunloopState.Stopped => ZIO.unit - case RunloopState.Started(runloop) => runloop.stopConsumption - case RunloopState.NotStarted => - if (retry <= 0) ZIO.unit - else if (initialCall) next - else next.delay(10.millis) - } - } + def stopConsumption: UIO[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.stopConsumption) /** * We're doing all of these things in this method so that the interface of this class is as simple as possible and