From 584b806e4b2251fff528388c103ef96f2f7bece9 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sun, 11 Jun 2023 14:27:11 +0400 Subject: [PATCH] async --- .../zio/kafka/consumer/internal/Runloop.scala | 42 +++++++++++++++---- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index e1dfc5ea1..1d7942d71 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -12,6 +12,7 @@ import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment import zio.stream._ +import java.util import scala.jdk.CollectionConverters._ //noinspection SimplifyWhenInspection @@ -117,16 +118,39 @@ private[consumer] final class Runloop private ( private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = { val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) } + //noinspection ConvertExpressionToSAM + def callback(cb: UIO[Unit] => Unit): OffsetCommitCallback = + new OffsetCommitCallback { + override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = + if (e == null) + cb( + ZIO.logDebug(s"Done for: $offsets") *> cmd.succeed <* + diagnostics.emit(DiagnosticEvent.Commit.Success(offsets)) + ) + else + cb( + ZIO.logDebugCause(s"Failed for: $offsets", Cause.fail(e)) *> + (e match { + case _: RebalanceInProgressException => + ZIO.logDebug(s"Rebalance in progress, retrying commit for offsets $offsets") *> + commandQueue.offer(cmd).unit + case err => + ZIO.logDebug(s"TATA") *> cmd.fail(err) <* + diagnostics.emit(DiagnosticEvent.Commit.Failure(offsets, err)) + }) + ) + } + consumer - .runloopAccess(c => ZIO.attempt(c.commitSync(offsets.asJava))) - .foldZIO( - { - case _: RebalanceInProgressException => - ZIO.logDebug(s"Rebalance in progress, retrying commit for offsets $offsets") *> - commandQueue.offer(cmd).unit - case err => cmd.fail(err) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsets, err)) - }, - _ => cmd.succeed <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsets)) + .runloopAccess(c => + for { + fiberId <- ZIO.fiberId + _ <- ZIO.logDebug(s"fiberId: $fiberId") + _ <- ZIO.async( + register = (cb: UIO[Unit] => Unit) => c.commitAsync(offsets.asJava, callback(cb)), + blockingOn = fiberId + ) + } yield () ) }