diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index 5ad68a1f3..63cfc269d 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -346,7 +346,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             // The runner for GitHub Actions is a bit underpowered. The machine is so busy that the logic
             // that detects the timeout doesn't get the chance to execute quickly enough. To compensate we
             // sleep a huge amount of time:
-            .tap(r => ZIO.sleep(10.seconds).when(r.key == "key3"))
+            .tap(r => ZIO.sleep(20.seconds).when(r.key == "key3"))
             // Use `take` to ensure the test ends quickly, even when the interrupt fails to occur.
             // Because of chunking, we need to pull more than 3 records before the interrupt kicks in.
             .take(100)
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
index 554b19952..f1ab25db1 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
@@ -1,6 +1,7 @@
 package zio.kafka.consumer.internal
 
 import org.apache.kafka.common.TopicPartition
+import zio.kafka.consumer.Offset
 import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
 import zio.kafka.consumer.internal.PartitionStreamControl.QueueInfo
 import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
@@ -15,12 +16,30 @@ abstract class PartitionStream {
   def queueSize: UIO[Int]
 }
 
+/**
+ * Provides control over, and information about, a stream that consumes from a partition.
+ *
+ * @param tp
+ *   topic and partition
+ * @param stream
+ *   the stream
+ * @param dataQueue
+ *   the queue the stream reads data from
+ * @param interruptionPromise
+ *   a promise that, when completed, stops the stream
+ * @param completedPromise
+ *   the last pulled offset (if any). The promise completes when the stream has completed.
+ * @param queueInfoRef
+ *   used to track the stream's pull deadline, its queue size, and its last pulled offset
+ * @param maxPollInterval
+ *   see [[zio.kafka.consumer.ConsumerSettings.withMaxPollInterval()]]
+ */
 final class PartitionStreamControl private (
   val tp: TopicPartition,
   stream: ZStream[Any, Throwable, ByteArrayCommittableRecord],
   dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]],
   interruptionPromise: Promise[Throwable, Unit],
-  completedPromise: Promise[Nothing, Unit],
+  val completedPromise: Promise[Nothing, Option[Offset]],
   queueInfoRef: Ref[QueueInfo],
   maxPollInterval: Duration
 ) extends PartitionStream {
@@ -97,20 +116,20 @@ object PartitionStreamControl {
   ): UIO[PartitionStreamControl] = {
     val maxPollIntervalNanos = maxPollInterval.toNanos
 
-    def registerPull(queueInfo: Ref[QueueInfo], recordCount: Int): UIO[Unit] =
+    def registerPull(queueInfo: Ref[QueueInfo], records: Chunk[ByteArrayCommittableRecord]): UIO[Unit] =
       for {
         now <- Clock.nanoTime
         newPullDeadline = now + maxPollIntervalNanos
-        _ <- queueInfo.update(_.withPull(newPullDeadline, recordCount))
+        _ <- queueInfo.update(_.withPull(newPullDeadline, records))
       } yield ()
 
     for {
       _                   <- ZIO.logDebug(s"Creating partition stream ${tp.toString}")
       interruptionPromise <- Promise.make[Throwable, Unit]
-      completedPromise    <- Promise.make[Nothing, Unit]
+      completedPromise    <- Promise.make[Nothing, Option[Offset]]
       dataQueue           <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]]
       now                 <- Clock.nanoTime
-      queueInfo           <- Ref.make(QueueInfo(now, 0))
+      queueInfo           <- Ref.make(QueueInfo(now, 0, None))
       requestAndAwaitData =
         for {
           _ <- commandQueue.offer(RunloopCommand.Request(tp))
@@ -122,16 +141,19 @@ object PartitionStreamControl {
                   LogAnnotation("topic", tp.topic()),
                   LogAnnotation("partition", tp.partition().toString)
                 ) *>
-                  ZStream.finalizer(
-                    completedPromise.succeed(()) <*
-                      ZIO.logDebug(s"Partition stream ${tp.toString} has ended")
-                  ) *>
+                  ZStream.finalizer {
+                    for {
+                      qi <- queueInfo.get
+                      _  <- completedPromise.succeed(qi.lastPulledOffset)
+                      _  <- ZIO.logDebug(s"Partition stream ${tp.toString} has ended")
+                    } yield ()
+                  } *>
                   ZStream.repeatZIOChunk {
                     // First try to take all records that are available right now.
                     // When no data is available, request more data and await its arrival.
                     dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data))
                   }.flattenTake
-                    .chunksWith(_.tap(records => registerPull(queueInfo, records.size)))
+                    .chunksWith(_.tap(records => registerPull(queueInfo, records)))
                     .interruptWhen(interruptionPromise)
     } yield new PartitionStreamControl(
       tp,
@@ -146,12 +168,12 @@ object PartitionStreamControl {
 
   // The `pullDeadline` is only relevant when `size > 0`. We initialize `pullDeadline` as soon as size goes above 0.
   // (Note that theoretically `size` can go below 0 when the update operations are reordered.)
-  private final case class QueueInfo(pullDeadline: NanoTime, size: Int) {
+  private final case class QueueInfo(pullDeadline: NanoTime, size: Int, lastPulledOffset: Option[Offset]) {
     def withOffer(newPullDeadline: NanoTime, recordCount: Int): QueueInfo =
-      QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount)
+      QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount, lastPulledOffset)
 
-    def withPull(newPullDeadline: NanoTime, recordCount: Int): QueueInfo =
-      QueueInfo(newPullDeadline, size - recordCount)
+    def withPull(newPullDeadline: NanoTime, records: Chunk[ByteArrayCommittableRecord]): QueueInfo =
+      QueueInfo(newPullDeadline, size - records.size, records.lastOption.map(_.offset).orElse(lastPulledOffset))
 
     def deadlineExceeded(now: NanoTime): Boolean =
      size > 0 && pullDeadline <= now
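For reference, below is a minimal, self-contained sketch of the lastPulledOffset bookkeeping that the new QueueInfo.withPull performs, using plain Long offsets instead of zio-kafka's Offset and committable records. QueueInfoSketch, pulledOffsets, and QueueInfoSketchExample are illustrative names only, not part of the library:

import zio.Chunk

// Sketch only: mirrors how the new withPull updates the queue bookkeeping after records are pulled.
final case class QueueInfoSketch(pullDeadline: Long, size: Int, lastPulledOffset: Option[Long]) {

  // A pull resets the deadline, shrinks the queue by the number of pulled records, and
  // advances the last pulled offset only when the pulled chunk is non-empty.
  def withPull(newPullDeadline: Long, pulledOffsets: Chunk[Long]): QueueInfoSketch =
    QueueInfoSketch(newPullDeadline, size - pulledOffsets.size, pulledOffsets.lastOption.orElse(lastPulledOffset))
}

object QueueInfoSketchExample {
  def main(args: Array[String]): Unit = {
    val initial    = QueueInfoSketch(pullDeadline = 0L, size = 5, lastPulledOffset = None)
    val afterPull  = initial.withPull(newPullDeadline = 100L, pulledOffsets = Chunk(10L, 11L, 12L))
    val afterEmpty = afterPull.withPull(newPullDeadline = 200L, pulledOffsets = Chunk.empty)

    // Prints (Some(12),Some(12)): an empty pull keeps the previously recorded offset.
    println((afterPull.lastPulledOffset, afterEmpty.lastPulledOffset))
  }
}

Keeping the previous value when a pulled chunk is empty is what lets the stream's finalizer complete completedPromise with a meaningful Option[Offset] when the partition stream ends.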