From d09e7acfe5b07e2e73d59e1bee9246d5ee5b1520 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Tue, 23 May 2023 14:11:28 +0400 Subject: [PATCH] Reproducer for #852 copied from @erikvanoosten Gist: https://gist.github.com/erikvanoosten/5e9f34d8ff43de32b583c021c858e309 --- .github/workflows/ci.yml | 14 +- build.sbt | 3 +- .../bench/ConsumersComparisonBenchmark.scala | 6 +- .../main/scala/zio/kafka/example/Main.scala | 66 +++-- .../test/scala/zio/kafka/ProducerSpec.scala | 2 +- .../zio/kafka/consumer/ConsumerSpec.scala | 189 +++++++++++- .../zio/kafka/testkit/KafkaTestUtils.scala | 2 - .../scala/zio/kafka/consumer/Consumer.scala | 95 ++---- .../zio/kafka/consumer/ConsumerSettings.scala | 15 +- .../diagnostics/DiagnosticEvent.scala | 7 + .../consumer/diagnostics/Diagnostics.scala | 2 +- .../internal/PartitionStreamControl.scala | 10 +- .../zio/kafka/consumer/internal/Runloop.scala | 276 +++++++++--------- .../consumer/internal/RunloopAccess.scala | 128 ++++++++ .../consumer/internal/RunloopCommand.scala | 26 +- .../consumer/internal/SubscriptionState.scala | 15 + 16 files changed, 569 insertions(+), 287 deletions(-) create mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala create mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/internal/SubscriptionState.scala diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7bae583d26..f7755c1920 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,7 +29,7 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v3.10.0 + uses: actions/setup-java@v3.11.0 with: distribution: temurin java-version: '8' @@ -54,7 +54,7 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v3.10.0 + uses: actions/setup-java@v3.11.0 with: distribution: temurin java-version: '8' @@ -80,7 +80,7 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v3.10.0 + uses: actions/setup-java@v3.11.0 with: distribution: temurin java-version: ${{ matrix.java }} @@ -92,7 +92,7 @@ jobs: with: fetch-depth: '0' - name: Test - run: sbt ++test + run: sbt +test update-readme: name: Update README runs-on: ubuntu-latest @@ -106,7 +106,7 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v3.10.0 + uses: actions/setup-java@v3.11.0 with: distribution: temurin java-version: '8' @@ -180,7 +180,7 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v3.10.0 + uses: actions/setup-java@v3.11.0 with: distribution: temurin java-version: '8' @@ -209,7 +209,7 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v3.10.0 + uses: actions/setup-java@v3.11.0 with: distribution: temurin java-version: '8' diff --git a/build.sbt b/build.sbt index 7abde72b9d..10fc5fe290 100644 --- a/build.sbt +++ b/build.sbt @@ -163,8 +163,6 @@ lazy val zioKafkaExample = .settings( libraryDependencies ++= Seq( "dev.zio" %% "zio" % "2.0.13", - "dev.zio" %% "zio-kafka" % "2.3.1", - "dev.zio" %% "zio-kafka-testkit" % "2.3.1" % Test, "dev.zio" %% "zio-test" % "2.0.13" % Test, "ch.qos.logback" % "logback-classic" % "1.4.6", "dev.zio" %% 
"zio-logging-slf4j2" % "2.1.13", @@ -175,6 +173,7 @@ lazy val zioKafkaExample = // [error] org.scala-lang.modules:scala-collection-compat _3, _2.13 crossScalaVersions -= scala3.value ) + .dependsOn(zioKafka, zioKafkaTestkit) addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt") addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck") diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumersComparisonBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumersComparisonBenchmark.scala index bc2bca00cb..5147c6807d 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumersComparisonBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumersComparisonBenchmark.scala @@ -11,7 +11,7 @@ import zio.kafka.producer.Producer import zio.kafka.serde.Serde import zio.kafka.testkit.Kafka import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, minimalConsumer, produceMany, producer } -import zio.{ durationInt, ULayer, ZIO, ZLayer } +import zio.{ ULayer, ZIO, ZLayer } import java.util.concurrent.TimeUnit import scala.jdk.CollectionConverters._ @@ -55,9 +55,7 @@ class ConsumersComparisonBenchmark extends ZioBenchmark[Env] { consumerSettings( clientId = randomThing("client"), groupId = Some(randomThing("client")), - `max.poll.records` = 1000, // A more production worthy value - runloopTimeout = - 1.hour // Absurdly high timeout to avoid the runloop from being interrupted while we're benchmarking other stuff + `max.poll.records` = 1000 // A more production worthy value ) ) diff --git a/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala b/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala index 5e3a9b12b9..c8beeacbe4 100644 --- a/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala +++ b/zio-kafka-example/src/main/scala/zio/kafka/example/Main.scala @@ -1,9 +1,11 @@ package zio.kafka.example import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig } +import org.apache.kafka.clients.producer.ProducerRecord import zio._ -import zio.kafka.consumer.diagnostics.Diagnostics -import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription } +import zio.kafka.consumer.Consumer.AutoOffsetStrategy +import zio.kafka.consumer.{ Consumer, ConsumerSettings, OffsetBatch, Subscription } +import zio.kafka.producer.{ Producer, ProducerSettings } import zio.kafka.serde.Serde import zio.logging.backend.SLF4J @@ -41,32 +43,50 @@ object Main extends ZIOAppDefault { private val topic = "test-topic" - private def consumerLayer(kafka: MyKafka): ZLayer[Any, Throwable, Consumer] = { - val consumerSettings = - ConsumerSettings(kafka.bootstrapServers) - .withPollTimeout(500.millis) - .withGroupId("test") + private val consumerLayer: ZLayer[MyKafka, Throwable, Consumer] = + ZLayer.scoped { + ZIO.serviceWithZIO[MyKafka] { kafka => + val consumerSettings = + ConsumerSettings(kafka.bootstrapServers) + .withGroupId("group1") + .withOffsetRetrieval(Consumer.OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)) + Consumer.make(consumerSettings) + } + } - ZLayer.make[Consumer]( - ZLayer.succeed(consumerSettings), - ZLayer.succeed(Diagnostics.NoOp), - Consumer.live - ) - } + private val producerLayer: ZLayer[MyKafka, Throwable, Producer] = + ZLayer.scoped { + ZIO.serviceWithZIO[MyKafka] { kafka => + val producerSettings = ProducerSettings(kafka.bootstrapServers) + Producer.make(producerSettings) + } + } override def run: ZIO[ZIOAppArgs with Scope, Any, Any] = ZIO.addFinalizer(ZIO.logInfo("Stopping 
app")) *> ( for { - _ <- ZIO.logInfo(s"Starting app") - kafka <- ZIO.service[MyKafka] - stream = Consumer - .plainStream(Subscription.topics(topic), Serde.string, Serde.string) - .provideLayer(consumerLayer(kafka)) - _ <- ZIO.logInfo(s"Consuming messages...") - consumed <- stream.take(1000).tap(r => ZIO.logInfo(s"Consumed record $r")).runCount - _ <- ZIO.logInfo(s"Consumed $consumed records") + _ <- ZIO.logInfo(s"Starting app") + _ <- Producer.produceChunk( + Chunk.fromIterable(1 to 1000).map(n => new ProducerRecord(topic, n, n.toString)), + Serde.int, + Serde.string + ) + _ <- Consumer + .plainStream(Subscription.topics(topic), Serde.int, Serde.string) + .take(100) + .groupedWithin(10, 100.millis) + .mapZIOPar(2)(c => ZIO.debug(c.size) as c.map(_.offset)) + .map(OffsetBatch.apply) + .debug("Offset") + .mapZIO(_.commit) + .debug("Commit") + .runDrain + _ <- ZIO.logInfo("Ready!") } yield () - ).provideSomeLayer[ZIOAppArgs with Scope](MyKafka.embedded) - + ).provide( + MyKafka.embedded, + consumerLayer, + producerLayer + ) } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala index 590d3c8027..ac86e380a7 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala @@ -470,5 +470,5 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { ) .provideSomeShared[Scope]( Kafka.embedded - ) @@ withLiveClock @@ timeout(2.minutes) @@ sequential + ) @@ withLiveClock @@ timeout(3.minutes) @@ sequential } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 84c9bbd765..82c2da14e2 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -12,8 +12,14 @@ import org.apache.kafka.common.TopicPartition import zio._ import zio.kafka.ZIOSpecDefaultSlf4j import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval } +import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization +import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{ + ConsumerFinalized, + RunloopFinalized, + SubscriptionFinalized +} import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } -import zio.kafka.producer.TransactionalProducer +import zio.kafka.producer.{ Producer, TransactionalProducer } import zio.kafka.serde.Serde import zio.kafka.testkit.KafkaTestUtils._ import zio.kafka.testkit.{ Kafka, KafkaRandom } @@ -24,6 +30,7 @@ import zio.test._ import scala.reflect.ClassTag +//noinspection SimplifyAssertInspection object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { override val kafkaPrefix: String = "consumespec" @@ -287,8 +294,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .tap(_ => ZIO.logDebug("Stream completed")) .provideSomeLayer[Kafka]( consumer(client, Some(group)) - ) *> keepProducing - .set(false) + ) + _ <- keepProducing.set(false) } yield assertCompletes }, test("process outstanding commits after a graceful shutdown") { @@ -1052,11 +1059,183 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- produceOne(topic, "key2", "message2") _ <- recordsOut.take } yield assertCompletes - } + }, + suite("issue #856")( + test( + "Booting a Consumer to do something else than consuming should not fail with `RunloopTimeout` exception" + ) { + def test(diagnostics: 
Diagnostics) = + for { + clientId <- randomClient + settings <- consumerSettings(clientId = clientId) + _ <- Consumer.make(settings, diagnostics = diagnostics) + _ <- ZIO.sleep(1.second) + } yield assertCompletes + + for { + diagnostics <- Diagnostics.SlidingQueue.make(1000) + testResult <- ZIO.scoped { + test(diagnostics) + } + finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization])) + } yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized))) + }, + suite( + "Ordering of finalizers matters. If subscriptions are finalized after the runloop, it creates a deadlock" + )( + test("When not consuming, the Runloop is not started so only the Consumer is finalized") { + def test(diagnostics: Diagnostics): ZIO[Scope & Kafka, Throwable, TestResult] = + for { + clientId <- randomClient + settings <- consumerSettings(clientId = clientId) + _ <- Consumer.make(settings, diagnostics = diagnostics) + } yield assertCompletes + + for { + diagnostics <- Diagnostics.SlidingQueue.make(1000) + testResult <- ZIO.scoped { + test(diagnostics) + } + finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization])) + } yield testResult && assert(finalizationEvents)(hasSameElements(Chunk(ConsumerFinalized))) + }, + test("When consuming, the Runloop is started. The finalization orders matters to avoid a deadlock") { + // This test ensures that we're not inadvertently introducing a deadlock by changing the order of finalizers. + + def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] = + for { + clientId <- randomClient + topic <- randomTopic + settings <- consumerSettings(clientId = clientId) + consumer <- Consumer.make(settings, diagnostics = diagnostics) + _ <- produceOne(topic, "key1", "message1") + // Starting a consumption session to start the Runloop. + consumed_0 <- consumer + .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) + .take(1) + .runCount + consumed_1 <- consumer + .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) + .take(1) + .runCount + } yield assert(consumed_0)(equalTo(1L)) && assert(consumed_1)(equalTo(1L)) + + for { + diagnostics <- Diagnostics.SlidingQueue.make(1000) + testResult <- ZIO.scoped { + test(diagnostics) + } + finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization])) + } yield testResult && assert(finalizationEvents)( + // The order is very important. + // The subscription must be finalized before the runloop, otherwise it creates a deadlock. + equalTo( + Chunk( + SubscriptionFinalized, + SubscriptionFinalized, + RunloopFinalized, + ConsumerFinalized + ) + ) + ) + }, + test( + "Calling `Consumer::stopConsumption` just after starting a forked consumption session should stop the consumption" + ) { + val numberOfMessages: Int = 100000 + val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i")) + + def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] = + for { + clientId <- randomClient + topic <- randomTopic + settings <- consumerSettings(clientId = clientId) + consumer <- Consumer.make(settings, diagnostics = diagnostics) + _ <- produceMany(topic, kvs) + // Starting a consumption session to start the Runloop. 
+ fiber <- consumer + .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) + .take(numberOfMessages.toLong) + .runCount + .forkScoped + _ <- consumer.stopConsumption + consumed_0 <- fiber.join + } yield assert(consumed_0)(isLessThan(numberOfMessages.toLong)) + + for { + diagnostics <- Diagnostics.SlidingQueue.make(1000) + testResult <- ZIO.scoped { + test(diagnostics) + } + finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization])) + } yield testResult && assert(finalizationEvents)( + // The order is very important. + // The subscription must be finalized before the runloop, otherwise it creates a deadlock. + equalTo( + Chunk( + SubscriptionFinalized, + RunloopFinalized, + ConsumerFinalized + ) + ) + ) + } @@ nonFlaky(5), + test("it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously") { + val numberOfMessages: Int = 100000 + val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i")) + + def test(diagnostics: Diagnostics): ZIO[Producer & Scope & Kafka, Throwable, TestResult] = + for { + clientId <- randomClient + topic <- randomTopic + settings <- consumerSettings(clientId = clientId) + consumer <- Consumer.make(settings, diagnostics = diagnostics) + _ <- produceMany(topic, kvs) + // Starting a consumption session to start the Runloop. + fiber <- consumer + .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) + .take(numberOfMessages.toLong) + .runCount + .forkScoped + _ <- ZIO.sleep(200.millis) + _ <- consumer.stopConsumption + consumed_0 <- fiber.join + _ <- ZIO.logDebug(s"consumed_0: $consumed_0") + + _ <- ZIO.logDebug("About to sleep 5 seconds") + _ <- ZIO.sleep(5.seconds) + _ <- ZIO.logDebug("Slept 5 seconds") + consumed_1 <- consumer + .plainStream(Subscription.manual(topic -> 0), Serde.string, Serde.string) + .take(numberOfMessages.toLong) + .runCount + } yield assert(consumed_0)(isGreaterThan(0L) && isLessThan(numberOfMessages.toLong)) && + assert(consumed_1)(equalTo(numberOfMessages.toLong)) + + for { + diagnostics <- Diagnostics.SlidingQueue.make(1000) + testResult <- ZIO.scoped { + test(diagnostics) + } + finalizationEvents <- diagnostics.queue.takeAll.map(_.filter(_.isInstanceOf[Finalization])) + } yield testResult && assert(finalizationEvents)( + // The order is very important. + // The subscription must be finalized before the runloop, otherwise it creates a deadlock. 
+ equalTo( + Chunk( + SubscriptionFinalized, + RunloopFinalized, + ConsumerFinalized + ) + ) + ) + } + ) + ) ) .provideSome[Scope & Kafka](producer) .provideSomeShared[Scope]( Kafka.embedded - ) @@ withLiveClock @@ TestAspect.sequential @@ timeout(2.minutes) + ) @@ withLiveClock @@ sequential @@ timeout(2.minutes) } diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala index 73337c857e..33af7324e8 100644 --- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala +++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala @@ -103,7 +103,6 @@ object KafkaTestUtils { offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), restartStreamOnRebalancing: Boolean = false, `max.poll.records`: Int = 100, // settings this higher can cause concurrency bugs to go unnoticed - runloopTimeout: Duration = ConsumerSettings.defaultRunloopTimeout, properties: Map[String, String] = Map.empty ): URIO[Kafka, ConsumerSettings] = ZIO.serviceWith[Kafka] { (kafka: Kafka) => @@ -111,7 +110,6 @@ object KafkaTestUtils { .withClientId(clientId) .withCloseTimeout(5.seconds) .withPollTimeout(100.millis) - .withRunloopTimeout(runloopTimeout) .withProperties( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", ConsumerConfig.METADATA_MAX_AGE_CONFIG -> "100", diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 9795fe1864..6ff66f4b3a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -3,15 +3,14 @@ package zio.kafka.consumer import org.apache.kafka.clients.consumer.{ ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp } import org.apache.kafka.common._ import zio._ +import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics -import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord -import zio.kafka.consumer.internal.{ ConsumerAccess, Runloop } +import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess } import zio.kafka.serde.{ Deserializer, Serde } import zio.kafka.utils.SslHelper import zio.stream._ import scala.jdk.CollectionConverters._ -import scala.util.control.NoStackTrace trait Consumer { @@ -156,16 +155,9 @@ trait Consumer { object Consumer { - case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace - - private final case class Live( - private val consumer: ConsumerAccess, - private val settings: ConsumerSettings, - private val runloop: Runloop, - private val subscriptions: Ref.Synchronized[Set[Subscription]], - private val partitionAssignments: Hub[ - Take[Throwable, Chunk[(TopicPartition, ZStream[Any, Throwable, ByteArrayCommittableRecord])]] - ] + private final class Live private[Consumer] ( + consumer: ConsumerAccess, + runloopAccess: RunloopAccess ) extends Consumer { override def assignment: Task[Set[TopicPartition]] = @@ -202,7 +194,9 @@ object Consumer { * Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit * requests. 
*/ - override def stopConsumption: UIO[Unit] = runloop.stopConsumption + override def stopConsumption: UIO[Unit] = + ZIO.logDebug("stopConsumption called") *> + runloopAccess.stopConsumption() override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap) @@ -223,43 +217,20 @@ object Consumer { keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = { - def extendSubscriptions = subscriptions.updateZIO { existingSubscriptions => - val newSubscriptions = NonEmptyChunk.fromIterable(subscription, existingSubscriptions) - Subscription.unionAll(newSubscriptions) match { - case None => ZIO.fail(InvalidSubscriptionUnion(newSubscriptions)) - case Some(union) => - ZIO.logDebug(s"Changing kafka subscription to $union") *> - subscribe(union).as(newSubscriptions.toSet) - } - }.uninterruptible - - def reduceSubscriptions = subscriptions.updateZIO { existingSubscriptions => - val newSubscriptions = NonEmptyChunk.fromIterableOption(existingSubscriptions - subscription) - val newUnion = newSubscriptions.flatMap(Subscription.unionAll) - - (newUnion match { - case Some(union) => - ZIO.logDebug(s"Reducing kafka subscription to $union") *> subscribe(union) - case None => - ZIO.logDebug(s"Unsubscribing kafka consumer") *> unsubscribe - }).as(newSubscriptions.fold(Set.empty[Subscription])(_.toSet)) - }.uninterruptible - val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray) ZStream.unwrapScoped { for { - stream <- ZStream.fromHubScoped(partitionAssignments) - _ <- extendSubscriptions.withFinalizer(_ => reduceSubscriptions.orDie) + stream <- runloopAccess.subscribe(subscription) } yield stream .map(_.exit) .flattenExitOption - .flattenChunks .map { _.collect { case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) => val stream: ZStream[R, Throwable, CommittableRecord[K, V]] = - if (onlyByteArraySerdes) partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]] + if (onlyByteArraySerdes) + partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]] else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer))) tp -> stream @@ -321,12 +292,6 @@ object Consumer { .runDrain } yield () - private def subscribe(subscription: Subscription): Task[Unit] = - runloop.changeSubscription(Some(subscription)) - - private def unsubscribe: Task[Unit] = - runloop.changeSubscription(None) - override def metrics: Task[Map[MetricName, Metric]] = consumer.withConsumer(_.metrics().asScala.toMap) } @@ -346,39 +311,13 @@ object Consumer { def make( settings: ConsumerSettings, diagnostics: Diagnostics = Diagnostics.NoOp - ): ZIO[Scope, Throwable, Consumer] = { - /* - We must supply a queue size for the partitionAssignments hub below. Under most circumstances, - a value of 1 should be sufficient, as runloop.partitions is already an unbounded queue. But if - there is a large skew in speed of consuming partition assignments (not the speed of consuming kafka messages) - between the subscriptions, there may arise a situation where the faster stream is 'blocked' from - getting new partition assignments by the faster stream. A value of 32 should be more than sufficient to cover - this situation. 
- */ - val hubCapacity = 32 - + ): ZIO[Scope, Throwable, Consumer] = for { - _ <- SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties) - wrapper <- ConsumerAccess.make(settings) - runloop <- Runloop.make( - hasGroupId = settings.hasGroupId, - consumer = wrapper, - pollTimeout = settings.pollTimeout, - diagnostics = diagnostics, - offsetRetrieval = settings.offsetRetrieval, - userRebalanceListener = settings.rebalanceListener, - restartStreamsOnRebalancing = settings.restartStreamOnRebalancing, - runloopTimeout = settings.runloopTimeout - ) - subscriptions <- Ref.Synchronized.make(Set.empty[Subscription]) - - partitionAssignments <- ZStream - .fromQueue(runloop.partitionsQueue) - .map(_.exit) - .flattenExitOption - .toHub(hubCapacity) - } yield Live(wrapper, settings, runloop, subscriptions, partitionAssignments) - } + _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized)) + _ <- SslHelper.validateEndpoint(settings.bootstrapServers, settings.properties) + consumerAccess <- ConsumerAccess.make(settings) + runloopAccess <- RunloopAccess.make(settings, diagnostics, consumerAccess) + } yield new Live(consumerAccess, runloopAccess) /** * Accessor method for [[Consumer.assignment]] diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index 750ca807f7..0f1fe1815d 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -15,11 +15,6 @@ import zio.kafka.security.KafkaCredentialStore * @param restartStreamOnRebalancing * When `true` _all_ streams are restarted during a rebalance, including those streams that are not revoked. The * default is `false`. - * @param runloopTimeout - * Internal timeout for each iteration of the command processing and polling loop, use to detect stalling. This should - * be much larger than the pollTimeout and the time it takes to process chunks of records. If your consumer is not - * subscribed for long periods during its lifetime, this timeout should take that into account as well. When the - * timeout expires, the plainStream/partitionedStream/etc will fail with a [[Consumer.RunloopTimeout]]. 
*/ case class ConsumerSettings( bootstrapServers: List[String], @@ -28,8 +23,7 @@ case class ConsumerSettings( pollTimeout: Duration, offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), rebalanceListener: RebalanceListener = RebalanceListener.noop, - restartStreamOnRebalancing: Boolean = false, - runloopTimeout: Duration = ConsumerSettings.defaultRunloopTimeout + restartStreamOnRebalancing: Boolean = false ) { private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match { case OffsetRetrieval.Auto(reset) => Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> reset.toConfig) @@ -84,20 +78,15 @@ case class ConsumerSettings( def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings = withProperties(credentialsStore.properties) - def withRunloopTimeout(timeout: Duration): ConsumerSettings = - copy(runloopTimeout = timeout) } object ConsumerSettings { - val defaultRunloopTimeout: Duration = 4.minutes - def apply(bootstrapServers: List[String]): ConsumerSettings = new ConsumerSettings( bootstrapServers = bootstrapServers, properties = Map.empty, closeTimeout = 30.seconds, pollTimeout = 50.millis, - offsetRetrieval = OffsetRetrieval.Auto(), - runloopTimeout = defaultRunloopTimeout + offsetRetrieval = OffsetRetrieval.Auto() ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala index 2a2da75e07..f22f2d7d5b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala @@ -27,4 +27,11 @@ object DiagnosticEvent { final case class Lost(partitions: Set[TopicPartition]) extends Rebalance } + sealed trait Finalization extends DiagnosticEvent + object Finalization { + case object SubscriptionFinalized extends Finalization + case object RunloopFinalized extends Finalization + case object ConsumerFinalized extends Finalization + } + } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/Diagnostics.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/Diagnostics.scala index 054bc567f2..3c5bbe7bee 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/Diagnostics.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/Diagnostics.scala @@ -10,7 +10,7 @@ object Diagnostics { override def emit(event: => DiagnosticEvent): UIO[Unit] = ZIO.unit } - final case class SlidingQueue(queue: Queue[DiagnosticEvent]) extends Diagnostics { + final case class SlidingQueue private (queue: Queue[DiagnosticEvent]) extends Diagnostics { override def emit(event: => DiagnosticEvent): UIO[Unit] = queue.offer(event).unit } object SlidingQueue { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 5f0d05811e..47c37f24d4 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -10,7 +10,7 @@ private[internal] final class PartitionStreamControl private ( val tp: TopicPartition, stream: ZStream[Any, Throwable, ByteArrayCommittableRecord], dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]], - interruptPromise: Promise[Throwable, Unit], + interruptionPromise: Promise[Throwable, Unit], completedPromise: Promise[Nothing, Unit] ) { @@ -27,12 +27,12 
@@ private[internal] final class PartitionStreamControl private ( /** To be invoked when the partition was lost. */ def lost(): UIO[Boolean] = - interruptPromise.fail(new RuntimeException(s"Partition ${tp.toString} was lost")) + interruptionPromise.fail(new RuntimeException(s"Partition ${tp.toString} was lost")) /** To be invoked when the partition was revoked or otherwise needs to be ended. */ def end(): ZIO[Any, Nothing, Unit] = logAnnotate { - ZIO.logTrace(s"Partition ${tp.toString} ending") *> + ZIO.logDebug(s"Partition ${tp.toString} ending") *> dataQueue.offer(Take.end).unit } @@ -68,9 +68,9 @@ private[internal] object PartitionStreamControl { tp: TopicPartition, commandQueue: Queue[RunloopCommand], diagnostics: Diagnostics - ): ZIO[Any, Nothing, PartitionStreamControl] = + ): UIO[PartitionStreamControl] = for { - _ <- ZIO.logTrace(s"Creating partition stream ${tp.toString}") + _ <- ZIO.logDebug(s"Creating partition stream ${tp.toString}") interruptionPromise <- Promise.make[Throwable, Unit] completedPromise <- Promise.make[Nothing, Unit] dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]] diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 98ec3236f4..b92375c455 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -4,25 +4,25 @@ import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.RebalanceInProgressException import zio._ -import zio.kafka.consumer.Consumer.{ OffsetRetrieval, RunloopTimeout } +import zio.kafka.consumer.Consumer.OffsetRetrieval +import zio.kafka.consumer._ +import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer -import zio.kafka.consumer.internal.Runloop._ -import zio.kafka.consumer.{ CommittableRecord, RebalanceConsumer, RebalanceListener, Subscription } +import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment import zio.stream._ -import java.util import scala.jdk.CollectionConverters._ +//noinspection SimplifyWhenInspection private[consumer] final class Runloop private ( runtime: Runtime[Any], hasGroupId: Boolean, consumer: ConsumerAccess, pollTimeout: Duration, - runloopTimeout: Duration, commandQueue: Queue[RunloopCommand], lastRebalanceEvent: Ref.Synchronized[Option[Runloop.RebalanceEvent]], - val partitionsQueue: Queue[Take[Throwable, (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord])]], + partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, offsetRetrieval: OffsetRetrieval, userRebalanceListener: RebalanceListener, @@ -34,19 +34,26 @@ private[consumer] final class Runloop private ( PartitionStreamControl.newPartitionStream(tp, commandQueue, diagnostics) def stopConsumption: UIO[Unit] = - commandQueue.offer(RunloopCommand.StopAllStreams).unit - - def changeSubscription( - subscription: Option[Subscription] - ): Task[Unit] = - Promise - .make[Throwable, Unit] - .flatMap { cont => - commandQueue.offer(RunloopCommand.ChangeSubscription(subscription, cont)) *> - cont.await - } - .unit - .uninterruptible + ZIO.logDebug("stopConsumption called") *> + commandQueue.offer(RunloopCommand.StopAllStreams).unit + + private[consumer] def shutdown: UIO[Unit] = + 
ZIO.logDebug(s"Shutting down runloop initiated") *> + commandQueue + .offerAll( + Chunk( + RunloopCommand.StopSubscription, + RunloopCommand.StopAllStreams, + RunloopCommand.StopRunloop + ) + ) + .unit + + private[internal] def addSubscription(subscription: Subscription): UIO[Unit] = + commandQueue.offer(RunloopCommand.AddSubscription(subscription)).unit + + private[internal] def removeSubscription(subscription: Subscription): UIO[Unit] = + commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit private val rebalanceListener: RebalanceListener = { val emitDiagnostics = RebalanceListener( @@ -99,6 +106,7 @@ private[consumer] final class Runloop private ( private val commit: Map[TopicPartition, Long] => Task[Unit] = offsets => for { + _ <- ZIO.debug(s"Committing offsets: $offsets") p <- Promise.make[Throwable, Unit] _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) @@ -106,31 +114,19 @@ private[consumer] final class Runloop private ( } yield () private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = { - val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) } - val cont = (e: Exit[Throwable, Unit]) => cmd.cont.done(e).asInstanceOf[UIO[Unit]] - val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsets)) - val onFailure: Throwable => UIO[Unit] = { - case _: RebalanceInProgressException => - ZIO.logDebug(s"Rebalance in progress, retrying commit for offsets $offsets") *> - commandQueue.offer(cmd).unit - case err => - cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsets, err)) - } - val callback = - new OffsetCommitCallback { - override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = - Unsafe.unsafe { implicit u => - runtime.unsafe.run(if (exception eq null) onSuccess else onFailure(exception)).getOrThrowFiberFailure() - } - } - - // We don't wait for the completion of the commit here, because it - // will only complete once we poll again. 
- consumer.runloopAccess { c => - ZIO - .attempt(c.commitAsync(offsets.asJava, callback)) - .catchAll(onFailure) - } + val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) } + + consumer + .runloopAccess(c => ZIO.attempt(c.commitSync(offsets.asJava))) + .foldZIO( + { + case _: RebalanceInProgressException => + ZIO.logDebug(s"Rebalance in progress, retrying commit for offsets $offsets") *> + commandQueue.offer(cmd).unit + case err => cmd.fail(err) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsets, err)) + }, + _ => cmd.succeed <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsets)) + ) } /** @@ -222,14 +218,13 @@ private[consumer] final class Runloop private ( private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] = offsetRetrieval match { + case OffsetRetrieval.Auto(_) => ZIO.succeed(Set.empty) case OffsetRetrieval.Manual(getOffsets) => - getOffsets(tps) - .tap(offsets => ZIO.foreachDiscard(offsets) { case (tp, offset) => ZIO.attempt(c.seek(tp, offset)) }) - .when(tps.nonEmpty) - .as(tps) - - case OffsetRetrieval.Auto(_) => - ZIO.succeed(Set.empty) + if (tps.isEmpty) ZIO.succeed(Set.empty) + else + getOffsets(tps) + .tap(offsets => ZIO.foreachDiscard(offsets) { case (tp, offset) => ZIO.attempt(c.seek(tp, offset)) }) + .as(tps) } /** @@ -256,7 +251,7 @@ private[consumer] final class Runloop private ( private def handlePoll(state: State): Task[State] = for { _ <- - ZIO.logTrace( + ZIO.logDebug( s"Starting poll with ${state.pendingRequests.size} pending requests and ${state.pendingCommits.size} pending commits" ) _ <- currentStateRef.set(state) @@ -341,14 +336,13 @@ private[consumer] final class Runloop private ( } } startingStreams <- - if (pollResult.startingTps.isEmpty) { - ZIO.succeed(Chunk.empty[PartitionStreamControl]) - } else { + if (pollResult.startingTps.isEmpty) ZIO.succeed(Chunk.empty[PartitionStreamControl]) + else { ZIO .foreach(Chunk.fromIterable(pollResult.startingTps))(newPartitionStream) .tap { newStreams => - ZIO.logTrace(s"Offering partition assignment ${pollResult.startingTps}") *> - partitionsQueue.offer(Take.chunk(Chunk.fromIterable(newStreams.map(_.tpStream)))) + ZIO.logDebug(s"Offering partition assignment ${pollResult.startingTps}") *> + partitionsHub.publish(Take.chunk(Chunk.fromIterable(newStreams.map(_.tpStream)))) } } runningStreams <- ZIO.filter(pollResult.assignedStreams)(_.isRunning) @@ -366,78 +360,107 @@ private[consumer] final class Runloop private ( assignedStreams = updatedStreams ) - private def handleCommand(state: State, cmd: RunloopCommand.StreamControl): Task[State] = + private def handleCommand(state: State, cmd: RunloopCommand.StreamCommand): Task[State] = { + def doChangeSubscription(newSubscriptionState: SubscriptionState): Task[State] = + applyNewSubscriptionState(newSubscriptionState).flatMap { newAssignedStreams => + val newState = state.copy( + assignedStreams = state.assignedStreams ++ newAssignedStreams, + subscriptionState = newSubscriptionState + ) + if (newSubscriptionState.isSubscribed) ZIO.succeed(newState) + else + // End all streams and pending requests + endRevokedPartitions( + newState.pendingRequests, + newState.assignedStreams, + isRevoked = _ => true + ).map { revokeResult => + newState.copy( + pendingRequests = revokeResult.pendingRequests, + assignedStreams = revokeResult.assignedStreams + ) + } + } + cmd match { case req: RunloopCommand.Request => ZIO.succeed(state.addRequest(req)) case cmd: RunloopCommand.Commit 
=> doCommit(cmd).as(state.addCommit(cmd)) - case cmd @ RunloopCommand.ChangeSubscription(subscription, _) => - handleChangeSubscription(subscription).flatMap { newAssignedStreams => - val newState = state.copy( - assignedStreams = state.assignedStreams ++ newAssignedStreams, - subscription = subscription - ) - if (subscription.isDefined) ZIO.succeed(newState) - else { - // End all streams and pending requests - endRevokedPartitions( - newState.pendingRequests, - newState.assignedStreams, - isRevoked = _ => true - ).map { revokeResult => - newState.copy( - pendingRequests = revokeResult.pendingRequests, - assignedStreams = revokeResult.assignedStreams - ) + case RunloopCommand.AddSubscription(newSubscription) => + state.subscriptionState match { + case SubscriptionState.NotSubscribed => + val newSubState = + SubscriptionState.Subscribed(subscriptions = Set(newSubscription), union = newSubscription) + doChangeSubscription(newSubState) + case SubscriptionState.Subscribed(existingSubscriptions, _) => + val subs = NonEmptyChunk.fromIterable(newSubscription, existingSubscriptions) + + Subscription.unionAll(subs) match { + case None => ZIO.fail(InvalidSubscriptionUnion(subs)) + case Some(union) => + val newSubState = + SubscriptionState.Subscribed( + subscriptions = existingSubscriptions + newSubscription, + union = union + ) + doChangeSubscription(newSubState) + } + } + case RunloopCommand.RemoveSubscription(subscription) => + state.subscriptionState match { + case SubscriptionState.NotSubscribed => ZIO.succeed(state) + case SubscriptionState.Subscribed(existingSubscriptions, _) => + val newUnion: Option[(Subscription, NonEmptyChunk[Subscription])] = + NonEmptyChunk + .fromIterableOption(existingSubscriptions - subscription) + .flatMap(subs => Subscription.unionAll(subs).map(_ -> subs)) + + newUnion match { + case Some((union, newSubscriptions)) => + val newSubState = + SubscriptionState.Subscribed(subscriptions = newSubscriptions.toSet, union = union) + doChangeSubscription(newSubState) + case None => + ZIO.logDebug(s"Unsubscribing kafka consumer") *> + doChangeSubscription(SubscriptionState.NotSubscribed) } - } } - .tapBoth(e => cmd.fail(e), _ => cmd.succeed) - .uninterruptible + case RunloopCommand.StopSubscription => doChangeSubscription(SubscriptionState.NotSubscribed) case RunloopCommand.StopAllStreams => for { - _ <- ZIO.logDebug("Graceful shutdown initiated") + _ <- ZIO.logDebug("Stop all streams initiated") _ <- ZIO.foreachDiscard(state.assignedStreams)(_.end()) - _ <- partitionsQueue.offer(Take.end) - _ <- ZIO.logTrace("Graceful shutdown done") + _ <- partitionsHub.publish(Take.end) + _ <- ZIO.logDebug("Stop all streams done") } yield state.copy(pendingRequests = Chunk.empty) } + } - /** - * @return - * any created streams - */ - private def handleChangeSubscription( - newSubscription: Option[Subscription] + private def applyNewSubscriptionState( + newSubscriptionState: SubscriptionState ): Task[Chunk[PartitionStreamControl]] = consumer.runloopAccess { c => - newSubscription match { - case None => + newSubscriptionState match { + case SubscriptionState.NotSubscribed => ZIO .attempt(c.unsubscribe()) .as(Chunk.empty) - case Some(Subscription.Pattern(pattern)) => + case SubscriptionState.Subscribed(_, Subscription.Pattern(pattern)) => val rc = RebalanceConsumer.Live(c) ZIO .attempt(c.subscribe(pattern.pattern, rebalanceListener.toKafka(runtime, rc))) .as(Chunk.empty) - case Some(Subscription.Topics(topics)) => + case SubscriptionState.Subscribed(_, Subscription.Topics(topics)) => val rc = 
RebalanceConsumer.Live(c) ZIO .attempt(c.subscribe(topics.asJava, rebalanceListener.toKafka(runtime, rc))) .as(Chunk.empty) - case Some(Subscription.Manual(topicPartitions)) => + case SubscriptionState.Subscribed(_, Subscription.Manual(topicPartitions)) => // For manual subscriptions we have to do some manual work before starting the run loop for { - _ <- ZIO.attempt(c.assign(topicPartitions.asJava)) - _ <- offsetRetrieval match { - case OffsetRetrieval.Manual(getOffsets) => - getOffsets(topicPartitions).flatMap { offsets => - ZIO.foreachDiscard(offsets) { case (tp, offset) => ZIO.attempt(c.seek(tp, offset)) } - } - case OffsetRetrieval.Auto(_) => ZIO.unit - } + _ <- ZIO.attempt(c.assign(topicPartitions.asJava)) + _ <- doSeekForNewPartitions(c, topicPartitions) partitionStreams <- ZIO.foreach(Chunk.fromIterable(topicPartitions))(newPartitionStream) - _ <- partitionsQueue.offer(Take.chunk(partitionStreams.map(_.tpStream))) + _ <- partitionsHub.publish(Take.chunk(partitionStreams.map(_.tpStream))) } yield partitionStreams } } @@ -459,22 +482,21 @@ private[consumer] final class Runloop private ( ZStream .fromQueue(commandQueue) - .timeoutFail[Throwable](RunloopTimeout)(runloopTimeout) .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(State.initial) { (state, commands) => for { - _ <- ZIO.logTrace(s"Processing ${commands.size} commands: ${commands.mkString(",")}") - streamCommands = commands.collect { case cmd: RunloopCommand.StreamControl => cmd } + _ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}") + streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd } stateAfterCommands <- ZIO.foldLeft(streamCommands)(state)(handleCommand) updatedStateAfterPoll <- if (stateAfterCommands.shouldPoll) handlePoll(stateAfterCommands) else ZIO.succeed(stateAfterCommands) // Immediately poll again, after processing all new queued commands - _ <- commandQueue.offer(RunloopCommand.Poll).when(updatedStateAfterPoll.shouldPoll) + _ <- if (updatedStateAfterPoll.shouldPoll) commandQueue.offer(RunloopCommand.Poll) else ZIO.unit } yield updatedStateAfterPoll } .tapErrorCause(cause => ZIO.logErrorCause("Error in Runloop", cause)) - .onError(cause => partitionsQueue.offer(Take.failCause(cause))) + .onError(cause => partitionsHub.offer(Take.failCause(cause))) } } @@ -501,9 +523,6 @@ private[consumer] object Runloop { type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]] - // Internal parameters, should not be necessary to tune - private val commandQueueSize = 1024 - private final case class PollResult( startingTps: Set[TopicPartition], pendingRequests: Chunk[RunloopCommand.Request], @@ -529,6 +548,9 @@ private[consumer] object Runloop { ) extends RebalanceEvent } + // Internal parameters, should not be necessary to tune + private final val commandQueueSize: Int = 1024 + def make( hasGroupId: Boolean, consumer: ConsumerAccess, @@ -537,28 +559,22 @@ private[consumer] object Runloop { offsetRetrieval: OffsetRetrieval, userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, - runloopTimeout: Duration + partitionsHub: Hub[Take[Throwable, PartitionAssignment]] ): ZIO[Scope, Throwable, Runloop] = for { + _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) commandQueue <- ZIO.acquireRelease(Queue.bounded[RunloopCommand](commandQueueSize))(_.shutdown) lastRebalanceEvent <- Ref.Synchronized.make[Option[Runloop.RebalanceEvent]](None) - partitionsQueue <- ZIO.acquireRelease( - Queue - 
.unbounded[ - Take[Throwable, (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord])] - ] - )(_.shutdown) - currentStateRef <- Ref.make(State.initial) - runtime <- ZIO.runtime[Any] + currentStateRef <- Ref.make(State.initial) + runtime <- ZIO.runtime[Any] runloop = new Runloop( runtime = runtime, hasGroupId = hasGroupId, consumer = consumer, pollTimeout = pollTimeout, - runloopTimeout = runloopTimeout, commandQueue = commandQueue, lastRebalanceEvent = lastRebalanceEvent, - partitionsQueue = partitionsQueue, + partitionsHub = partitionsHub, diagnostics = diagnostics, offsetRetrieval = offsetRetrieval, userRebalanceListener = userRebalanceListener, @@ -569,13 +585,13 @@ private[consumer] object Runloop { // Run the entire loop on the a dedicated thread to avoid executor shifts executor <- RunloopExecutor.newInstance - fib <- ZIO.onExecutor(executor)(runloop.run).forkScoped + fiber <- ZIO.onExecutor(executor)(runloop.run).forkScoped + waitForRunloopStop = fiber.join.orDie _ <- ZIO.addFinalizer( - ZIO.logTrace("Shutting down Runloop") *> - commandQueue.offer(RunloopCommand.StopAllStreams) *> - commandQueue.offer(RunloopCommand.StopRunloop) *> - fib.join.orDie <* + ZIO.logDebug("Shutting down Runloop") *> + runloop.shutdown *> + waitForRunloopStop <* ZIO.logDebug("Shut down Runloop") ) } yield runloop @@ -585,15 +601,13 @@ private[internal] final case class State( pendingRequests: Chunk[RunloopCommand.Request], pendingCommits: Chunk[RunloopCommand.Commit], assignedStreams: Chunk[PartitionStreamControl], - subscription: Option[Subscription] + subscriptionState: SubscriptionState ) { def addCommit(c: RunloopCommand.Commit): State = copy(pendingCommits = pendingCommits :+ c) def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) - def isSubscribed: Boolean = subscription.isDefined - def shouldPoll: Boolean = - isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty) + subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty) } object State { @@ -601,6 +615,6 @@ object State { pendingRequests = Chunk.empty, pendingCommits = Chunk.empty, assignedStreams = Chunk.empty, - subscription = None + subscriptionState = SubscriptionState.NotSubscribed ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala new file mode 100644 index 0000000000..7b2937d829 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -0,0 +1,128 @@ +package zio.kafka.consumer.internal + +import org.apache.kafka.common.TopicPartition +import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization +import zio.kafka.consumer.diagnostics.Diagnostics +import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord +import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment +import zio.kafka.consumer.{ ConsumerSettings, Subscription } +import zio.stream.{ Stream, Take, UStream, ZStream } +import zio.{ durationInt, Hub, RIO, Ref, Scope, Task, UIO, ZIO, ZLayer } + +private[internal] sealed trait RunloopState +private[internal] object RunloopState { + case object NotStarted extends RunloopState + final case class Started(runloop: Runloop) extends RunloopState + case object Stopped extends RunloopState +} + +/** + * This [[RunloopAccess]] is here to make the [[Runloop]] instantiation/boot lazy: we only starts it when the user is + 
 * starting a consuming session.
+ *
+ * This is needed for two reasons:
+ *
+ *   1. A Consumer can be used for something other than consuming (e.g. fetching Kafka topic metadata).
+ *   1. The [[Runloop]] has a timeout which is reached if no commands are processed for a certain amount of time. If
+ *      the Runloop is started eagerly (when we instantiate a Consumer), then the timeout will be reached even if the
+ *      user is still using the Consumer.
+ *
+ * Additional note for the future:
+ *
+ * This is less of an issue now that we have removed the `RunloopTimeout` exception. It might be possible to remove
+ * this `RunloopAccess` and start the `Runloop` eagerly: the timeout would be reached if the user does not consume,
+ * and a new `Runloop` would be booted if the user later decides to consume with its `Consumer`. However, I don't
+ * know the implications, complexity, or feasibility of this change, and it's not what I'm trying to achieve/fix here.
+ */
+private[consumer] final class RunloopAccess private (
+  runloopStateRef: Ref.Synchronized[RunloopState],
+  partitionHub: Hub[Take[Throwable, PartitionAssignment]],
+  makeRunloop: Task[RunloopState.Started],
+  diagnostics: Diagnostics
+) {
+  private def runloop(shouldStartIfNot: Boolean): Task[RunloopState] =
+    runloopStateRef.updateSomeAndGetZIO { case RunloopState.NotStarted if shouldStartIfNot => makeRunloop }
+  private def withRunloopZIO[R, A](shouldStartIfNot: Boolean)(f: Runloop => RIO[R, A]): RIO[R, A] =
+    runloop(shouldStartIfNot).flatMap {
+      case RunloopState.NotStarted       => ZIO.unit.asInstanceOf[RIO[R, A]]
+      case RunloopState.Started(runloop) => f(runloop)
+      case RunloopState.Stopped          => ZIO.unit.asInstanceOf[RIO[R, A]]
+    }
+
+  /**
+   * There is no need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped.
+   *
+   * Note:
+   *   1. The `.orDie` is only there to satisfy the compiler; the failure cannot happen.
+   *   1. We retry up to 100 times, waiting 10ms between attempts, so that we give up after roughly 1 second at most
+   *      and avoid an infinite loop. This recursion is needed because if the user calls `stopConsumption` before the
+   *      Runloop is started, we need to wait for it to be started. This can happen when the user starts a consuming
+   *      session in a forked fiber and stops it immediately after forking: the Runloop will potentially not be
+   *      started yet.
+   */
+  // noinspection SimplifyUnlessInspection
+  def stopConsumption(retry: Int = 100, initialCall: Boolean = true): UIO[Unit] = {
+    @inline def next: UIO[Unit] = stopConsumption(retry - 1, initialCall = false)
+
+    runloop(shouldStartIfNot = false).orDie.flatMap {
+      case RunloopState.Stopped          => ZIO.unit
+      case RunloopState.Started(runloop) => runloop.stopConsumption
+      case RunloopState.NotStarted =>
+        if (retry <= 0) ZIO.unit
+        else if (initialCall) next
+        else next.delay(10.millis)
+    }
+  }
+
+  /**
+   * We do all of this in this one method so that the interface of this class stays as simple as possible and the
+   * caller cannot misuse it.
+   *
+   * The external world (Consumer) doesn't need to know how we "subscribe", "unsubscribe", etc. internally.
+ */ + def subscribe( + subscription: Subscription + ): ZIO[Scope, Throwable, UStream[Take[Throwable, PartitionAssignment]]] = + for { + stream <- ZStream.fromHubScoped(partitionHub) + // starts the Runloop if not already started + _ <- withRunloopZIO(shouldStartIfNot = true)(_.addSubscription(subscription)) + _ <- ZIO.addFinalizer { + withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)).orDie <* + diagnostics.emit(Finalization.SubscriptionFinalized) + } + } yield stream + +} + +private[consumer] object RunloopAccess { + type PartitionAssignment = (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord]) + + def make( + settings: ConsumerSettings, + diagnostics: Diagnostics = Diagnostics.NoOp, + consumerAccess: ConsumerAccess + ): ZIO[Scope, Throwable, RunloopAccess] = + for { + // This scope allows us to link the lifecycle of the Runloop and of the Hub to the lifecycle of the Consumer + // When the Consumer is shutdown, the Runloop and the Hub will be shutdown too (before the consumer) + consumerScope <- ZIO.scope + partitionsHub <- ZIO + .acquireRelease(Hub.unbounded[Take[Throwable, PartitionAssignment]])(_.shutdown) + .provide(ZLayer.succeed(consumerScope)) + runloopStateRef <- Ref.Synchronized.make[RunloopState](RunloopState.NotStarted) + makeRunloop = Runloop + .make( + hasGroupId = settings.hasGroupId, + consumer = consumerAccess, + pollTimeout = settings.pollTimeout, + diagnostics = diagnostics, + offsetRetrieval = settings.offsetRetrieval, + userRebalanceListener = settings.rebalanceListener, + restartStreamsOnRebalancing = settings.restartStreamOnRebalancing, + partitionsHub = partitionsHub + ) + .withFinalizer(_ => runloopStateRef.set(RunloopState.Stopped)) + .map(RunloopState.Started.apply) + .provide(ZLayer.succeed(consumerScope)) + } yield new RunloopAccess(runloopStateRef, partitionsHub, makeRunloop, diagnostics) +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala index 50451ca1aa..dbacfdae8b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala @@ -11,29 +11,25 @@ object RunloopCommand { sealed trait Control extends RunloopCommand /** Used for internal control of the stream of data from Kafka. */ - sealed trait StreamControl extends RunloopCommand + sealed trait StreamCommand extends RunloopCommand /** Used as a signal that another poll is needed. */ case object Poll extends Control case object StopRunloop extends Control - case object StopAllStreams extends StreamControl + case object StopAllStreams extends StreamCommand - final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends StreamControl { - @inline def isDone: UIO[Boolean] = cont.isDone - - @inline def isPending: UIO[Boolean] = isDone.negate + final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends StreamCommand { + @inline def succeed: UIO[Unit] = cont.succeed(()).unit + @inline def fail(e: Throwable): UIO[Unit] = cont.fail(e).unit + @inline def isDone: UIO[Boolean] = cont.isDone + @inline def isPending: UIO[Boolean] = isDone.negate } /** Used by a stream to request more records. 
*/ - final case class Request(tp: TopicPartition) extends StreamControl - - final case class ChangeSubscription( - subscription: Option[Subscription], - cont: Promise[Throwable, Unit] - ) extends StreamControl { - @inline def succeed: UIO[Boolean] = cont.succeed(()) + final case class Request(tp: TopicPartition) extends StreamCommand - @inline def fail(throwable: Throwable): UIO[Boolean] = cont.fail(throwable) - } + final case class AddSubscription(subscription: Subscription) extends StreamCommand + final case class RemoveSubscription(subscription: Subscription) extends StreamCommand + case object StopSubscription extends StreamCommand } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/SubscriptionState.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/SubscriptionState.scala new file mode 100644 index 0000000000..09113dbdda --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/SubscriptionState.scala @@ -0,0 +1,15 @@ +package zio.kafka.consumer.internal + +import zio.kafka.consumer.Subscription + +private[internal] sealed trait SubscriptionState { + def isSubscribed: Boolean = + this match { + case _: SubscriptionState.Subscribed => true + case SubscriptionState.NotSubscribed => false + } +} +private[internal] object SubscriptionState { + case object NotSubscribed extends SubscriptionState + final case class Subscribed(subscriptions: Set[Subscription], union: Subscription) extends SubscriptionState +}
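
For reference, a minimal usage sketch (not part of the diff above), condensed from the new ConsumerSpec test "When not consuming, the Runloop is not started so only the Consumer is finalized" and the example Main.scala: it illustrates that, with this change, a Consumer can be created and released without ever consuming, and that only Finalization.ConsumerFinalized is emitted because the Runloop is now booted lazily on the first consuming session. The bootstrap address and group id below are placeholders, and a reachable broker is assumed.

import zio._
import zio.kafka.consumer.{ Consumer, ConsumerSettings }
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }

object LazyRunloopProbe extends ZIOAppDefault {
  // Placeholder connection details; point these at a real broker.
  private val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092")).withGroupId("probe-group")

  override def run: ZIO[ZIOAppArgs with Scope, Any, Any] =
    for {
      diagnostics <- Diagnostics.SlidingQueue.make(1000)
      // Create and release a Consumer without ever subscribing. The Runloop is
      // only started on the first consuming session, so no RunloopFinalized or
      // SubscriptionFinalized events are expected here.
      _ <- ZIO.scoped(Consumer.make(settings, diagnostics = diagnostics))
      finalizations <- diagnostics.queue.takeAll
                         .map(_.filter(_.isInstanceOf[DiagnosticEvent.Finalization]))
      // Expected: Chunk(ConsumerFinalized)
      _ <- Console.printLine(finalizations)
    } yield ()
}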