diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index ea27358246..35aacf760d 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -52,16 +52,20 @@ final class PartitionStreamControl private ( /** Offer new data for the stream to process. Should be called on every poll, also when `data.isEmpty` */ private[internal] def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] = - if (data.isEmpty) { - queueInfoRef.update(_.withEmptyPoll) - } else { - for { - now <- Clock.nanoTime - newPullDeadline = now + maxPollIntervalNanos - _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size)) - _ <- dataQueue.offer(Take.chunk(data)) - } yield () - } + ZIO + .whenZIO(isRunning) { + if (data.isEmpty) { + queueInfoRef.update(_.withEmptyPoll) + } else { + for { + now <- Clock.nanoTime + newPullDeadline = now + maxPollIntervalNanos + _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size)) + _ <- dataQueue.offer(Take.chunk(data)) + } yield () + } + } + .unit def queueSize: UIO[Int] = queueInfoRef.get.map(_.size) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index bbce02fb26..12dd87c500 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -460,11 +460,12 @@ private[consumer] final class Runloop private ( private def handlePoll(state: State): Task[State] = { for { - partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(state.assignedStreams) + runningStreamsBeforePoll <- ZIO.filter(state.assignedStreams)(_.isRunning) + partitionsToFetch <- settings.fetchStrategy.selectPartitionsToFetch(runningStreamsBeforePoll) _ <- ZIO.logDebug( s"Starting poll with ${state.pendingRequests.size} pending requests and" + s" ${state.pendingCommits.size} pending commits," + - s" resuming $partitionsToFetch partitions" + s" resuming ${partitionsToFetch.size} out of ${state.assignedStreams.size} partitions" ) _ <- currentStateRef.set(state) pollResult <-