diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 8f8352591..f785bf86c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -196,7 +196,7 @@ object Consumer { */ override def stopConsumption: UIO[Unit] = ZIO.logDebug("stopConsumption called") *> - runloopAccess.stopConsumption() + runloopAccess.stopConsumption override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 08a3df1fe..ca161a637 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -7,13 +7,25 @@ import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment import zio.kafka.consumer.{ ConsumerSettings, Subscription } import zio.stream.{ Stream, Take, UStream, ZStream } -import zio.{ durationInt, Hub, Ref, Scope, UIO, ZIO, ZLayer } +import zio.{ Hub, Promise, Ref, Scope, UIO, ZIO, ZLayer } private[internal] sealed trait RunloopState private[internal] object RunloopState { - case object NotStarted extends RunloopState - final case class Started(runloop: Runloop) extends RunloopState - case object Stopped extends RunloopState + + /** + * Why do we need a Promise here? + * + * If the user starts a forked consumption session and just after the fork, calls `consumer.stopConsumption`, the + * consumption needs to be stopped even if the runloop is still booting up. + * + * For all the details, see discussion: https://github.com/zio/zio-kafka/pull/857#discussion_r1218434608 + */ + final case class Started(promise: Promise[Nothing, Runloop]) extends RunloopState { + @inline def runloop: UIO[Runloop] = promise.await + } + case object NotStarted extends RunloopState + case object Stopped extends RunloopState + } /** @@ -37,40 +49,28 @@ private[internal] object RunloopState { private[consumer] final class RunloopAccess private ( runloopStateRef: Ref.Synchronized[RunloopState], partitionHub: Hub[Take[Throwable, PartitionAssignment]], - makeRunloop: UIO[RunloopState.Started], + makeRunloop: UIO[Runloop], diagnostics: Diagnostics ) { private def runloop(shouldStartIfNot: Boolean): UIO[RunloopState] = - runloopStateRef.updateSomeAndGetZIO { case RunloopState.NotStarted if shouldStartIfNot => makeRunloop } + runloopStateRef.updateSomeAndGetZIO { + case RunloopState.NotStarted if shouldStartIfNot => + for { + promise <- Promise.make[Nothing, Runloop] + _ <- makeRunloop.map(promise.succeed).fork + } yield RunloopState.Started(promise) + } private def withRunloopZIO[A](shouldStartIfNot: Boolean)(f: Runloop => UIO[A]): UIO[A] = runloop(shouldStartIfNot).flatMap { - case RunloopState.Stopped => ZIO.unit.asInstanceOf[UIO[A]] - case RunloopState.NotStarted => ZIO.unit.asInstanceOf[UIO[A]] - case RunloopState.Started(runloop) => f(runloop) + case RunloopState.NotStarted => ZIO.unit.asInstanceOf[UIO[A]] + case RunloopState.Stopped => ZIO.unit.asInstanceOf[UIO[A]] + case s: RunloopState.Started => s.runloop.flatMap(f) } /** * No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped. - * - * Note: - * 1. We do a 100 retries waiting 10ms between each to roughly take max 1s before to stop to retry. We want to avoid - * an infinite loop. We need this recursion because if the user calls `stopConsumption` before the Runloop is - * started, we need to wait for it to be started. Can happen if the user starts a consuming session in a forked - * fiber and immediately after forking, stops it. The Runloop will potentially not be started yet. */ - // noinspection SimplifyUnlessInspection - def stopConsumption(retry: Int = 100, initialCall: Boolean = true): UIO[Unit] = { - @inline def next: UIO[Unit] = stopConsumption(retry - 1, initialCall = false) - - runloop(shouldStartIfNot = false).flatMap { - case RunloopState.Stopped => ZIO.unit - case RunloopState.Started(runloop) => runloop.stopConsumption - case RunloopState.NotStarted => - if (retry <= 0) ZIO.unit - else if (initialCall) next - else next.delay(10.millis) - } - } + def stopConsumption: UIO[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.stopConsumption) /** * We're doing all of these things in this method so that the interface of this class is as simple as possible and @@ -123,7 +123,6 @@ private[consumer] object RunloopAccess { consumerSettings = consumerSettings ) .withFinalizer(_ => runloopStateRef.set(RunloopState.Stopped)) - .map(RunloopState.Started.apply) .provide(ZLayer.succeed(consumerScope)) } yield new RunloopAccess(runloopStateRef, partitionsHub, makeRunloop, diagnostics) }