diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 4fae127a0..679a0d582 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -769,7 +769,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { clientId = clientId, groupId = Some(groupId), `max.poll.records` = 1, - rebalanceSafeCommits = rebalanceSafeCommits + rebalanceSafeCommits = rebalanceSafeCommits, + maxRebalanceDuration = 60.seconds ) consumer <- Consumer.make(settings) } yield consumer diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 35cf60fa5..83948be52 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -106,6 +106,8 @@ private[consumer] final class Runloop private ( ): Task[Unit] = { val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos + def timeToDeadlineMillis(): Long = (deadline - java.lang.System.nanoTime()) / 1000000L + val endingTps = streamsToEnd.map(_.tp).toSet def commitsOfEndingStreams(commits: Chunk[Runloop.Commit]): Chunk[Runloop.Commit] = @@ -125,36 +127,92 @@ private[consumer] final class Runloop private ( ZIO.attempt(consumer.commitAsync(java.util.Collections.emptyMap(), null)).orDie } - def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): Task[Boolean] = + sealed trait EndOffsetCommitStatus + case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = "not committed" } + case object EndOffsetCommitPending extends EndOffsetCommitStatus { override def toString = "commit pending" } + case object EndOffsetCommitted extends EndOffsetCommitStatus { override def toString = "committed" } + + final case class StreamCompletionStatus( + tp: TopicPartition, + streamEnded: Boolean, + lastPulledOffset: Option[Long], + endOffsetCommitStatus: EndOffsetCommitStatus + ) { + override def toString: String = + s"${tp}: " + + s"${if (streamEnded) "stream ended" else "stream is running"}, " + + s"last pulled offset=${lastPulledOffset.getOrElse("none")}, " + + endOffsetCommitStatus + } + + def completionStatusesAsString(completionStatuses: Chunk[StreamCompletionStatus]): String = + "Revoked partitions: " + completionStatuses.map(_.toString).mkString("; ") + + def getStreamCompletionStatuses(newCommits: Chunk[Commit]): UIO[Chunk[StreamCompletionStatus]] = for { + committedOffsets <- committedOffsetsRef.get + allPendingCommitOffsets = + (previousPendingCommits ++ commitsOfEndingStreams(newCommits)).flatMap(_.offsets).map { + case (tp, offsetAndMetadata) => (tp, offsetAndMetadata.offset()) + } streamResults <- ZIO.foreach(streamsToEnd) { stream => for { isDone <- stream.completedPromise.isDone lastPulledOffset <- stream.lastPulledOffset endOffset <- if (isDone) stream.completedPromise.await else ZIO.none - } yield (isDone || lastPulledOffset.isEmpty, endOffset) - } - committedOffsets <- committedOffsetsRef.get - } yield { - val allStreamsCompleted = streamResults.forall(_._1) - allStreamsCompleted && { - val endOffsets: Chunk[Offset] = streamResults.flatMap(_._2) - val allPendingCommits = previousPendingCommits ++ commitsOfEndingStreams(newCommits) - endOffsets.forall { endOffset => - val tp = endOffset.topicPartition - val offset = endOffset.offset - def endOffsetWasCommitted = committedOffsets.contains(tp, offset) - def endOffsetCommitIsPending = allPendingCommits.exists { pendingCommit => - pendingCommit.offsets.get(tp).exists { pendingOffset => - pendingOffset.offset() >= offset - } - } - endOffsetWasCommitted || endOffsetCommitIsPending + + endOffsetCommitStatus = + endOffset match { + case Some(endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) => + EndOffsetCommitted + case Some(endOffset) if allPendingCommitOffsets.contains((stream.tp, endOffset.offset)) => + EndOffsetCommitPending + case _ => EndOffsetNotCommitted + } + } yield StreamCompletionStatus(stream.tp, isDone, lastPulledOffset.map(_.offset), endOffsetCommitStatus) } - } + } yield streamResults + + @inline + def logStreamCompletionStatuses(completionStatuses: Chunk[StreamCompletionStatus]): UIO[Unit] = { + val statusStrings = completionStatusesAsString(completionStatuses) + ZIO.logInfo( + s"Delaying rebalance until ${streamsToEnd.size} streams (of revoked partitions) have committed " + + s"the offsets of the records they consumed. Deadline in ${timeToDeadlineMillis()}ms. $statusStrings" + ) + } + + def logInitialStreamCompletionStatuses: UIO[Unit] = + for { + completionStatuses <- getStreamCompletionStatuses(newCommits = Chunk.empty) + _ <- logStreamCompletionStatuses(completionStatuses) + } yield () + + def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): UIO[Boolean] = + for { + completionStatuses <- getStreamCompletionStatuses(newCommits) + _ <- logStreamCompletionStatuses(completionStatuses) + } yield completionStatuses.forall { status => + // A stream is complete when it never got any records, or when it committed the offset of the last consumed record + status.lastPulledOffset.isEmpty || (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted) } + def logFinalStreamCompletionStatuses(completed: Boolean, newCommits: Chunk[Commit]): UIO[Unit] = + if (completed) + ZIO.logInfo("Continuing rebalance, all offsets of consumed records in the revoked partitions were committed.") + else + for { + completionStatuses <- getStreamCompletionStatuses(newCommits) + statusStrings = completionStatusesAsString(completionStatuses) + _ <- + ZIO.logWarning( + s"Exceeded deadline waiting for streams (of revoked partitions) to commit the offsets of " + + s"the records they consumed; the rebalance will continue. " + + s"This might cause another consumer to process some records again. $statusStrings" + ) + } yield () + def commitSync: Task[Unit] = ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) @@ -174,17 +232,23 @@ private[consumer] final class Runloop private ( // // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. // Instead, we poll the queue in a loop. - ZIO.logDebug(s"Waiting for ${streamsToEnd.size} streams to end") *> - ZStream - .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll) - .tap(commitAsync) - .forever - .takeWhile(_ => java.lang.System.nanoTime() <= deadline) - .scan(Chunk.empty[Runloop.Commit])(_ ++ _) - .takeUntilZIO(endingStreamsCompletedAndCommitsExist) - .runDrain *> - commitSync *> - ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") + for { + _ <- logInitialStreamCompletionStatuses + completedAndCommits <- + ZStream + .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll) + .tap(commitAsync) + .forever + .takeWhile(_ => java.lang.System.nanoTime() <= deadline) + .scan(Chunk.empty[Runloop.Commit])(_ ++ _) + .mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits))) + .takeUntil { case (completed, _) => completed } + .runLast + .map(_.getOrElse((false, Chunk.empty))) + _ <- logFinalStreamCompletionStatuses(completedAndCommits._1, completedAndCommits._2) + _ <- commitSync + _ <- ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") + } yield () } // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times.