From 3dedcafb3d0eb3ac4c083ed8c2255415a5999908 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 9 Sep 2023 19:01:46 +0400 Subject: [PATCH] Raw implementation of https://github.com/zio/zio-kafka/pull/1022#issuecomment-1712502924 --- .../scala/zio/kafka/consumer/Consumer.scala | 9 +++++ .../zio/kafka/consumer/internal/Runloop.scala | 36 +++++++++++++++++-- .../consumer/internal/RunloopAccess.scala | 20 ++++++++--- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 01123113e..41a14e141 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -49,6 +49,10 @@ trait Consumer { def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] + def commitAccumBatch[R]( + commitschedule: Schedule[R, Any, Any] + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] + def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] /** @@ -459,6 +463,11 @@ private[consumer] final class ConsumerLive private[consumer] ( override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = runloopAccess.commitOrRetry(policy)(record) + override def commitAccumBatch[R]( + commitSchedule: Schedule[R, Any, Any] + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] = + runloopAccess.commitAccumBatch(commitSchedule) + override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 1366e60bf..078b75971 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -32,7 +32,8 @@ private[consumer] final class Runloop private ( userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, currentStateRef: Ref[State], - fetchStrategy: FetchStrategy + fetchStrategy: FetchStrategy, + runloopScope: Scope ) { private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = @@ -128,6 +129,35 @@ private[consumer] final class Runloop private ( } && policy ) + // noinspection YieldingZIOEffectInspection + private[internal] def commitAccumBatch[R]( + commitSchedule: Schedule[R, Any, Any] + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] = + for { + acc <- Ref.Synchronized.make(Map.empty[TopicPartition, Long] -> List.empty[Promise[Throwable, Unit]]) + _ <- acc.updateZIO { case data @ (offsets, promises) => + if (offsets.isEmpty) ZIO.succeed(data) + else + commit(offsets) + .foldZIO( + e => ZIO.foreachDiscard(promises)(_.fail(e)), + _ => ZIO.foreachDiscard(promises)(_.succeed(())) + ) + .as((Map.empty[TopicPartition, Long], List.empty[Promise[Throwable, Unit]])) + } + .schedule(commitSchedule) + .forkIn(runloopScope) + } yield { (records: Chunk[CommittableRecord[_, _]]) => + for { + p <- Promise.make[Throwable, Unit] + _ <- acc.update { case (offsets, promises) => + val newOffsets = offsets ++ records.map(record => record.topicPartition -> record.record.offset()) + val newPromises = promises :+ p + (newOffsets, newPromises) + } + } yield p.await + } + private val commit: Map[TopicPartition, Long] => Task[Unit] = offsets => for { @@ -587,6 +617,7 @@ private[consumer] object Runloop { initialState = State.initial currentStateRef <- Ref.make(initialState) runtime <- ZIO.runtime[Any] + scope <- ZIO.scope runloop = new Runloop( runtime = runtime, hasGroupId = hasGroupId, @@ -602,7 +633,8 @@ private[consumer] object Runloop { userRebalanceListener = userRebalanceListener, restartStreamsOnRebalancing = restartStreamsOnRebalancing, currentStateRef = currentStateRef, - fetchStrategy = fetchStrategy + fetchStrategy = fetchStrategy, + runloopScope = scope ) _ <- ZIO.logDebug("Starting Runloop") diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index a80c9b6e7..fc0272329 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -30,17 +30,22 @@ private[consumer] final class RunloopAccess private ( diagnostics: Diagnostics ) { - private def withRunloopZIO[R, E]( + private def withRunloopZIO__[R, E, A]( shouldStartIfNot: Boolean - )(whenRunning: Runloop => ZIO[R, E, Unit]): ZIO[R, E, Unit] = + )(whenRunning: Runloop => ZIO[R, E, A])(orElse: ZIO[R, E, A]): ZIO[R, E, A] = runloopStateRef.updateSomeAndGetZIO { case RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply) }.flatMap { - case RunloopState.NotStarted => ZIO.unit + case RunloopState.NotStarted => orElse case RunloopState.Started(runloop) => whenRunning(runloop) - case RunloopState.Finalized => ZIO.unit + case RunloopState.Finalized => orElse } + private def withRunloopZIO[R, E](shouldStartIfNot: Boolean)( + whenRunning: Runloop => ZIO[R, E, Unit] + ): ZIO[R, E, Unit] = + withRunloopZIO__(shouldStartIfNot)(whenRunning)(ZIO.unit) + /** * No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped. */ @@ -71,6 +76,13 @@ private[consumer] final class RunloopAccess private ( def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(record)) + def commitAccumBatch[R]( + commitschedule: Schedule[R, Any, Any] + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] = + withRunloopZIO__(shouldStartIfNot = false)(_.commitAccumBatch(commitschedule))( + ZIO.succeed((_: Chunk[CommittableRecord[_, _]]) => ZIO.succeed(ZIO.unit)) + ) + } private[consumer] object RunloopAccess {