Skip to content

Commit

Permalink
async
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Jun 11, 2023
1 parent de5c031 commit 7a18256
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.stream._

import java.util
import scala.jdk.CollectionConverters._

//noinspection SimplifyWhenInspection
Expand Down Expand Up @@ -118,15 +119,36 @@ private[consumer] final class Runloop private (
val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }

consumer
.runloopAccess(c => ZIO.attempt(c.commitSync(offsets.asJava)))
.foldZIO(
{
case _: RebalanceInProgressException =>
ZIO.logDebug(s"Rebalance in progress, retrying commit for offsets $offsets") *>
commandQueue.offer(cmd).unit
case err => cmd.fail(err) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsets, err))
},
_ => cmd.succeed <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsets))
.runloopAccess(c =>
for {
fiberId <- ZIO.fiberId
_ <- ZIO.logDebug(s"fiberId: $fiberId")
_ <- ZIO.async(
register = (cb: UIO[Unit] => Unit) =>
c.commitAsync(
offsets.asJava,
(_: util.Map[TopicPartition, OffsetAndMetadata], e: Exception) =>
if (e == null)
cb(
ZIO.logDebug(s"Done for: $offsets") *> cmd.succeed <*
diagnostics.emit(DiagnosticEvent.Commit.Success(offsets))
)
else
cb(
ZIO.logDebugCause(s"Failed for: $offsets", Cause.fail(e)) *>
(e match {
case _: RebalanceInProgressException =>
ZIO.logDebug(s"Rebalance in progress, retrying commit for offsets $offsets") *>
commandQueue.offer(cmd).unit
case err =>
ZIO.logDebug(s"TATA") *> cmd.fail(err) <*
diagnostics.emit(DiagnosticEvent.Commit.Failure(offsets, err))
})
)
),
blockingOn = fiberId
)
} yield ()
)
}

Expand Down

0 comments on commit 7a18256

Please sign in to comment.