From fd0c49df371e8011f1ef621a5b2cc8840b41251f Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 3 Nov 2024 10:35:00 +0100 Subject: [PATCH 01/14] Refactor stream completion status in preparation of additional logging --- .../zio/kafka/consumer/internal/Runloop.scala | 56 ++++++++++++------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 35cf60fa5..6caed5076 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -125,35 +125,49 @@ private[consumer] final class Runloop private ( ZIO.attempt(consumer.commitAsync(java.util.Collections.emptyMap(), null)).orDie } - def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): Task[Boolean] = + sealed trait EndOffsetCommitStatus + case object EndOffsetNotCommitted extends EndOffsetCommitStatus + case object EndOffsetCommitPending extends EndOffsetCommitStatus + case object EndOffsetCommitted extends EndOffsetCommitStatus + + case class StreamCompletionStatus( + tp: TopicPartition, + isDone: Boolean, + lastPulledOffset: Option[Long], + endOffsetCommitStatus: EndOffsetCommitStatus + ) + + def getStreamCompletionStatuses(newCommits: Chunk[Commit]): ZIO[Any, Nothing, Chunk[StreamCompletionStatus]] = for { + committedOffsets <- committedOffsetsRef.get + allPendingCommitOffsets = (previousPendingCommits ++ commitsOfEndingStreams(newCommits)).flatMap(_.offsets) streamResults <- ZIO.foreach(streamsToEnd) { stream => for { isDone <- stream.completedPromise.isDone lastPulledOffset <- stream.lastPulledOffset endOffset <- if (isDone) stream.completedPromise.await else ZIO.none - } yield (isDone || lastPulledOffset.isEmpty, endOffset) - } - committedOffsets <- committedOffsetsRef.get - } yield { - val allStreamsCompleted = streamResults.forall(_._1) - allStreamsCompleted && { - val endOffsets: Chunk[Offset] = streamResults.flatMap(_._2) - val allPendingCommits = previousPendingCommits ++ commitsOfEndingStreams(newCommits) - endOffsets.forall { endOffset => - val tp = endOffset.topicPartition - val offset = endOffset.offset - def endOffsetWasCommitted = committedOffsets.contains(tp, offset) - def endOffsetCommitIsPending = allPendingCommits.exists { pendingCommit => - pendingCommit.offsets.get(tp).exists { pendingOffset => - pendingOffset.offset() >= offset - } - } - endOffsetWasCommitted || endOffsetCommitIsPending + + endOffsetCommitStatus = endOffset match { + case Some(endOffset) + if committedOffsets.contains(stream.tp, endOffset.offset) => + EndOffsetCommitted + case Some(endOffset) if allPendingCommitOffsets.exists { case (tp, offset) => + tp == stream.tp && offset.offset() >= endOffset.offset + } => + EndOffsetCommitPending + case _ => EndOffsetNotCommitted + } + } yield StreamCompletionStatus(stream.tp, isDone, lastPulledOffset.map(_.offset), endOffsetCommitStatus) } - } - } + } yield streamResults + + def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): Task[Boolean] = + for { + streamResults <- getStreamCompletionStatuses(newCommits) + } yield streamResults.forall(status => + (status.isDone || status.lastPulledOffset.isEmpty) && (status.endOffsetCommitStatus != EndOffsetNotCommitted) + ) def commitSync: Task[Unit] = ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) From 0d6258be16af517072e6a2e63071fd46fe28ac8e Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 3 Nov 2024 11:01:24 +0100 Subject: [PATCH 02/14] Add logging + fix condition --- .../zio/kafka/consumer/ConsumerSpec.scala | 5 ++-- .../zio/kafka/consumer/internal/Runloop.scala | 29 ++++++++++++++++--- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 4fae127a0..e491a2dfd 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -769,7 +769,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { clientId = clientId, groupId = Some(groupId), `max.poll.records` = 1, - rebalanceSafeCommits = rebalanceSafeCommits + rebalanceSafeCommits = rebalanceSafeCommits, + maxRebalanceDuration = 30.seconds ) consumer <- Consumer.make(settings) } yield consumer @@ -863,7 +864,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { testForPartitionAssignmentStrategy[RangeAssignor], testForPartitionAssignmentStrategy[CooperativeStickyAssignor] ) - }: _*), + }: _*) @@ TestAspect.sequential, test("partitions for topic doesn't fail if doesn't exist") { for { topic <- randomTopic diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 6caed5076..a7378de73 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -135,7 +135,10 @@ private[consumer] final class Runloop private ( isDone: Boolean, lastPulledOffset: Option[Long], endOffsetCommitStatus: EndOffsetCommitStatus - ) + ) { + override def toString: String = + s"${tp}: isDone=${isDone}, lastPulledOffset=${lastPulledOffset.getOrElse("none")}, endOffsetCommitStatus: ${endOffsetCommitStatus}" + } def getStreamCompletionStatuses(newCommits: Chunk[Commit]): ZIO[Any, Nothing, Chunk[StreamCompletionStatus]] = for { @@ -162,11 +165,27 @@ private[consumer] final class Runloop private ( } } yield streamResults + def logInitialStreamCompletionStatuses: ZIO[Any, Nothing, Unit] = + getStreamCompletionStatuses(newCommits = Chunk.empty).flatMap { completionStatuses => + val statusStrings = completionStatuses.map(_.toString) + ZIO.logInfo(s"Waiting for ${streamsToEnd.size} streams to end: ${statusStrings.mkString("; ")}") + } + + def logFinalStreamCompletionStatuses(newCommits: Chunk[Commit]): ZIO[Any, Nothing, Unit] = + getStreamCompletionStatuses(newCommits).flatMap { completionStatuses => + val statusStrings = completionStatuses.map(_.toString) + ZIO + .logWarning( + s"Exceeded deadline waiting for streams to end, will continue with rebalance: ${statusStrings.mkString("; ")}" + ) + .when(java.lang.System.nanoTime() >= deadline) + }.unit + def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): Task[Boolean] = for { streamResults <- getStreamCompletionStatuses(newCommits) } yield streamResults.forall(status => - (status.isDone || status.lastPulledOffset.isEmpty) && (status.endOffsetCommitStatus != EndOffsetNotCommitted) + status.lastPulledOffset.isEmpty || (status.isDone && status.endOffsetCommitStatus != EndOffsetNotCommitted) ) def commitSync: Task[Unit] = @@ -188,7 +207,7 @@ private[consumer] final class Runloop private ( // // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. // Instead, we poll the queue in a loop. - ZIO.logDebug(s"Waiting for ${streamsToEnd.size} streams to end") *> + logInitialStreamCompletionStatuses *> ZStream .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll) .tap(commitAsync) @@ -196,7 +215,9 @@ private[consumer] final class Runloop private ( .takeWhile(_ => java.lang.System.nanoTime() <= deadline) .scan(Chunk.empty[Runloop.Commit])(_ ++ _) .takeUntilZIO(endingStreamsCompletedAndCommitsExist) - .runDrain *> + .runCollect + .map(_.flatten) + .flatMap(logFinalStreamCompletionStatuses) *> commitSync *> ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") } From 399dcac6ce1bb4c6ce28ac47d85ac4b056329275 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 3 Nov 2024 11:05:48 +0100 Subject: [PATCH 03/14] Also some debug logging while waiting --- .../main/scala/zio/kafka/consumer/internal/Runloop.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index a7378de73..00108220c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -183,8 +183,10 @@ private[consumer] final class Runloop private ( def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): Task[Boolean] = for { - streamResults <- getStreamCompletionStatuses(newCommits) - } yield streamResults.forall(status => + completionStatuses <- getStreamCompletionStatuses(newCommits) + statusStrings = completionStatuses.map(_.toString) + _ <- ZIO.logDebug(s"Waiting for ${streamsToEnd.size} streams to end: ${statusStrings.mkString("; ")}") + } yield completionStatuses.forall(status => status.lastPulledOffset.isEmpty || (status.isDone && status.endOffsetCommitStatus != EndOffsetNotCommitted) ) From c56dc69228284cb20a8d0353fb78f7cbdb64e60b Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 3 Nov 2024 11:16:50 +0100 Subject: [PATCH 04/14] Fix lint --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 00108220c..a9fae0f48 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -130,7 +130,7 @@ private[consumer] final class Runloop private ( case object EndOffsetCommitPending extends EndOffsetCommitStatus case object EndOffsetCommitted extends EndOffsetCommitStatus - case class StreamCompletionStatus( + final case class StreamCompletionStatus( tp: TopicPartition, isDone: Boolean, lastPulledOffset: Option[Long], From 11b3238556124d2b5e0654e6b7e1f4f8071a8171 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 3 Nov 2024 14:37:04 +0100 Subject: [PATCH 05/14] Increase timeout --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index e491a2dfd..fcc2d6102 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -770,7 +770,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { groupId = Some(groupId), `max.poll.records` = 1, rebalanceSafeCommits = rebalanceSafeCommits, - maxRebalanceDuration = 30.seconds + maxRebalanceDuration = 60.seconds ) consumer <- Consumer.make(settings) } yield consumer From dc38739810615ff8d2a52633e7ebe30117620780 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 5 Nov 2024 20:50:27 +0100 Subject: [PATCH 06/14] Correct race condition --- .../zio/kafka/consumer/internal/Runloop.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index a9fae0f48..25ce14b49 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -171,17 +171,17 @@ private[consumer] final class Runloop private ( ZIO.logInfo(s"Waiting for ${streamsToEnd.size} streams to end: ${statusStrings.mkString("; ")}") } - def logFinalStreamCompletionStatuses(newCommits: Chunk[Commit]): ZIO[Any, Nothing, Unit] = + def logFinalStreamCompletionStatuses(completed: Boolean, newCommits: Chunk[Commit]): ZIO[Any, Nothing, Unit] = getStreamCompletionStatuses(newCommits).flatMap { completionStatuses => - val statusStrings = completionStatuses.map(_.toString) ZIO .logWarning( - s"Exceeded deadline waiting for streams to end, will continue with rebalance: ${statusStrings.mkString("; ")}" + s"Exceeded deadline waiting for streams to end, will continue with rebalance: ${completionStatuses.map(_.toString).mkString("; ")}" ) - .when(java.lang.System.nanoTime() >= deadline) - }.unit + } + .unless(completed) + .unit - def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): Task[Boolean] = + def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): UIO[Boolean] = for { completionStatuses <- getStreamCompletionStatuses(newCommits) statusStrings = completionStatuses.map(_.toString) @@ -216,10 +216,11 @@ private[consumer] final class Runloop private ( .forever .takeWhile(_ => java.lang.System.nanoTime() <= deadline) .scan(Chunk.empty[Runloop.Commit])(_ ++ _) - .takeUntilZIO(endingStreamsCompletedAndCommitsExist) - .runCollect - .map(_.flatten) - .flatMap(logFinalStreamCompletionStatuses) *> + .mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits))) + .takeUntil { case (completed, commits @ _) => completed } + .runLast + .map(_.getOrElse((false, Chunk.empty))) + .flatMap { case (completed, commits) => logFinalStreamCompletionStatuses(completed, commits) } *> commitSync *> ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") } From 2ee212ccbd48cc3b5c41b0a4ffcc84e48b0da89a Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 5 Nov 2024 20:50:33 +0100 Subject: [PATCH 07/14] Remove sequential --- .../src/test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index fcc2d6102..679a0d582 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -864,7 +864,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { testForPartitionAssignmentStrategy[RangeAssignor], testForPartitionAssignmentStrategy[CooperativeStickyAssignor] ) - }: _*) @@ TestAspect.sequential, + }: _*), test("partitions for topic doesn't fail if doesn't exist") { for { topic <- randomTopic From 9caee508e8f4dcce90df2bfb7e9843822f4d3b5f Mon Sep 17 00:00:00 2001 From: svroonland Date: Wed, 6 Nov 2024 09:32:55 +0100 Subject: [PATCH 08/14] Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala Co-authored-by: Erik van Oosten --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 25ce14b49..dae5331b3 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -217,7 +217,7 @@ private[consumer] final class Runloop private ( .takeWhile(_ => java.lang.System.nanoTime() <= deadline) .scan(Chunk.empty[Runloop.Commit])(_ ++ _) .mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits))) - .takeUntil { case (completed, commits @ _) => completed } + .takeUntil { case (completed, _) => completed } .runLast .map(_.getOrElse((false, Chunk.empty))) .flatMap { case (completed, commits) => logFinalStreamCompletionStatuses(completed, commits) } *> From e034b778d2438dc1ae3c43d4e0f50289182d8915 Mon Sep 17 00:00:00 2001 From: svroonland Date: Wed, 6 Nov 2024 09:33:26 +0100 Subject: [PATCH 09/14] Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala Co-authored-by: Erik van Oosten --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index dae5331b3..f5cea5d45 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -175,7 +175,7 @@ private[consumer] final class Runloop private ( getStreamCompletionStatuses(newCommits).flatMap { completionStatuses => ZIO .logWarning( - s"Exceeded deadline waiting for streams to end, will continue with rebalance: ${completionStatuses.map(_.toString).mkString("; ")}" + s"Exceeded deadline waiting for streams to commit the offsets of the records they consumed; the rebalance will continue. This might cause another consumer to process some records again. ${completionStatuses.map(_.toString).mkString("; ")}" ) } .unless(completed) From 221b2839e43ea86a2e667e89495a67a269574326 Mon Sep 17 00:00:00 2001 From: svroonland Date: Wed, 6 Nov 2024 09:43:07 +0100 Subject: [PATCH 10/14] Update zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala Co-authored-by: Erik van Oosten --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index f5cea5d45..e6267439f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -186,9 +186,10 @@ private[consumer] final class Runloop private ( completionStatuses <- getStreamCompletionStatuses(newCommits) statusStrings = completionStatuses.map(_.toString) _ <- ZIO.logDebug(s"Waiting for ${streamsToEnd.size} streams to end: ${statusStrings.mkString("; ")}") - } yield completionStatuses.forall(status => + } yield completionStatuses.forall { status => + // A stream is complete when it never got any records, or when it committed the offset of the last consumed record status.lastPulledOffset.isEmpty || (status.isDone && status.endOffsetCommitStatus != EndOffsetNotCommitted) - ) + } def commitSync: Task[Unit] = ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) From bad1defee846331dd374e2e9fc5a7cc940fc44a3 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sat, 9 Nov 2024 09:55:33 +0100 Subject: [PATCH 11/14] Stricter comparison of pending commit offset to last pulled offset --- .../scala/zio/kafka/consumer/internal/Runloop.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index e6267439f..f1b7984b4 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -143,7 +143,10 @@ private[consumer] final class Runloop private ( def getStreamCompletionStatuses(newCommits: Chunk[Commit]): ZIO[Any, Nothing, Chunk[StreamCompletionStatus]] = for { committedOffsets <- committedOffsetsRef.get - allPendingCommitOffsets = (previousPendingCommits ++ commitsOfEndingStreams(newCommits)).flatMap(_.offsets) + allPendingCommitOffsets = + (previousPendingCommits ++ commitsOfEndingStreams(newCommits)).flatMap(_.offsets).map { + case (tp, offsetAndMetadata) => (tp, offsetAndMetadata.offset()) + } streamResults <- ZIO.foreach(streamsToEnd) { stream => for { @@ -155,9 +158,8 @@ private[consumer] final class Runloop private ( case Some(endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) => EndOffsetCommitted - case Some(endOffset) if allPendingCommitOffsets.exists { case (tp, offset) => - tp == stream.tp && offset.offset() >= endOffset.offset - } => + case Some(endOffset) + if allPendingCommitOffsets.contains((stream.tp, endOffset.offset)) => EndOffsetCommitPending case _ => EndOffsetNotCommitted } From cc377dd838b1070814786a8056b3403f9d4fe2fd Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 10 Nov 2024 09:10:23 +0100 Subject: [PATCH 12/14] More rebalance safe commits logging alt (#1367) --- .../zio/kafka/consumer/internal/Runloop.scala | 115 +++++++++++------- 1 file changed, 69 insertions(+), 46 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index f1b7984b4..ff31673a7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -106,6 +106,8 @@ private[consumer] final class Runloop private ( ): Task[Unit] = { val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos + def timeToDeadlineMillis(): Long = (java.lang.System.nanoTime() - deadline) / 1000000L + val endingTps = streamsToEnd.map(_.tp).toSet def commitsOfEndingStreams(commits: Chunk[Runloop.Commit]): Chunk[Runloop.Commit] = @@ -126,21 +128,27 @@ private[consumer] final class Runloop private ( } sealed trait EndOffsetCommitStatus - case object EndOffsetNotCommitted extends EndOffsetCommitStatus - case object EndOffsetCommitPending extends EndOffsetCommitStatus - case object EndOffsetCommitted extends EndOffsetCommitStatus + case object EndOffsetNotCommitted extends EndOffsetCommitStatus { override def toString = "not committed" } + case object EndOffsetCommitPending extends EndOffsetCommitStatus { override def toString = "commit pending" } + case object EndOffsetCommitted extends EndOffsetCommitStatus { override def toString = "committed" } final case class StreamCompletionStatus( tp: TopicPartition, - isDone: Boolean, + streamEnded: Boolean, lastPulledOffset: Option[Long], endOffsetCommitStatus: EndOffsetCommitStatus ) { override def toString: String = - s"${tp}: isDone=${isDone}, lastPulledOffset=${lastPulledOffset.getOrElse("none")}, endOffsetCommitStatus: ${endOffsetCommitStatus}" + s"${tp}: " + + s"${if (streamEnded) "stream ended" else "stream is running"}, " + + s"last pulled offset=${lastPulledOffset.getOrElse("none")}, " + + endOffsetCommitStatus } - def getStreamCompletionStatuses(newCommits: Chunk[Commit]): ZIO[Any, Nothing, Chunk[StreamCompletionStatus]] = + def completionStatusesAsString(completionStatuses: Chunk[StreamCompletionStatus]): String = + "Revoked partitions: " + completionStatuses.map(_.toString).mkString("; ") + + def getStreamCompletionStatuses(newCommits: Chunk[Commit]): UIO[Chunk[StreamCompletionStatus]] = for { committedOffsets <- committedOffsetsRef.get allPendingCommitOffsets = @@ -154,45 +162,57 @@ private[consumer] final class Runloop private ( lastPulledOffset <- stream.lastPulledOffset endOffset <- if (isDone) stream.completedPromise.await else ZIO.none - endOffsetCommitStatus = endOffset match { - case Some(endOffset) - if committedOffsets.contains(stream.tp, endOffset.offset) => - EndOffsetCommitted - case Some(endOffset) - if allPendingCommitOffsets.contains((stream.tp, endOffset.offset)) => - EndOffsetCommitPending - case _ => EndOffsetNotCommitted - } + endOffsetCommitStatus = + endOffset match { + case Some(endOffset) if committedOffsets.contains(stream.tp, endOffset.offset) => + EndOffsetCommitted + case Some(endOffset) if allPendingCommitOffsets.contains((stream.tp, endOffset.offset)) => + EndOffsetCommitPending + case _ => EndOffsetNotCommitted + } } yield StreamCompletionStatus(stream.tp, isDone, lastPulledOffset.map(_.offset), endOffsetCommitStatus) } } yield streamResults - def logInitialStreamCompletionStatuses: ZIO[Any, Nothing, Unit] = - getStreamCompletionStatuses(newCommits = Chunk.empty).flatMap { completionStatuses => - val statusStrings = completionStatuses.map(_.toString) - ZIO.logInfo(s"Waiting for ${streamsToEnd.size} streams to end: ${statusStrings.mkString("; ")}") - } + @inline + def logStreamCompletionStatuses(completionStatuses: Chunk[StreamCompletionStatus]): UIO[Unit] = { + val statusStrings = completionStatusesAsString(completionStatuses) + ZIO.logInfo( + s"Delaying rebalance until ${streamsToEnd.size} streams (of revoked partitions) have committed " + + s"the offsets of the records they consumed. Deadline in ${timeToDeadlineMillis()}ms. $statusStrings" + ) + } - def logFinalStreamCompletionStatuses(completed: Boolean, newCommits: Chunk[Commit]): ZIO[Any, Nothing, Unit] = - getStreamCompletionStatuses(newCommits).flatMap { completionStatuses => - ZIO - .logWarning( - s"Exceeded deadline waiting for streams to commit the offsets of the records they consumed; the rebalance will continue. This might cause another consumer to process some records again. ${completionStatuses.map(_.toString).mkString("; ")}" - ) - } - .unless(completed) - .unit + def logInitialStreamCompletionStatuses: UIO[Unit] = + for { + completionStatuses <- getStreamCompletionStatuses(newCommits = Chunk.empty) + _ <- logStreamCompletionStatuses(completionStatuses) + } yield () def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): UIO[Boolean] = for { completionStatuses <- getStreamCompletionStatuses(newCommits) - statusStrings = completionStatuses.map(_.toString) - _ <- ZIO.logDebug(s"Waiting for ${streamsToEnd.size} streams to end: ${statusStrings.mkString("; ")}") + _ <- logStreamCompletionStatuses(completionStatuses) } yield completionStatuses.forall { status => // A stream is complete when it never got any records, or when it committed the offset of the last consumed record - status.lastPulledOffset.isEmpty || (status.isDone && status.endOffsetCommitStatus != EndOffsetNotCommitted) + status.lastPulledOffset.isEmpty || (status.streamEnded && status.endOffsetCommitStatus != EndOffsetNotCommitted) } + def logFinalStreamCompletionStatuses(completed: Boolean, newCommits: Chunk[Commit]): UIO[Unit] = + if (completed) + ZIO.logInfo("Continuing rebalance, all offsets of consumed records in the revoked partitions were committed.") + else + for { + completionStatuses <- getStreamCompletionStatuses(newCommits) + statusStrings = completionStatusesAsString(completionStatuses) + _ <- + ZIO.logWarning( + s"Exceeded deadline waiting for streams (of revoked partitions) to commit the offsets of " + + s"the records they consumed; the rebalance will continue. " + + s"This might cause another consumer to process some records again. $statusStrings" + ) + } yield () + def commitSync: Task[Unit] = ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) @@ -212,20 +232,23 @@ private[consumer] final class Runloop private ( // // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. // Instead, we poll the queue in a loop. - logInitialStreamCompletionStatuses *> - ZStream - .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll) - .tap(commitAsync) - .forever - .takeWhile(_ => java.lang.System.nanoTime() <= deadline) - .scan(Chunk.empty[Runloop.Commit])(_ ++ _) - .mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits))) - .takeUntil { case (completed, _) => completed } - .runLast - .map(_.getOrElse((false, Chunk.empty))) - .flatMap { case (completed, commits) => logFinalStreamCompletionStatuses(completed, commits) } *> - commitSync *> - ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") + for { + _ <- logInitialStreamCompletionStatuses + (completed, commits) <- + ZStream + .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll) + .tap(commitAsync) + .forever + .takeWhile(_ => java.lang.System.nanoTime() <= deadline) + .scan(Chunk.empty[Runloop.Commit])(_ ++ _) + .mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits))) + .takeUntil { case (completed, _) => completed } + .runLast + .map(_.getOrElse((false, Chunk.empty))) + _ <- logFinalStreamCompletionStatuses(completed, commits) + _ <- commitSync + _ <- ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") + } yield () } // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times. From 7994954a9291b74899292ed812560252e1fae82f Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 10 Nov 2024 09:21:04 +0100 Subject: [PATCH 13/14] Remove withFilter usage --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index ff31673a7..686ef768f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -234,7 +234,7 @@ private[consumer] final class Runloop private ( // Instead, we poll the queue in a loop. for { _ <- logInitialStreamCompletionStatuses - (completed, commits) <- + completedAndCommits <- ZStream .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll) .tap(commitAsync) @@ -245,7 +245,7 @@ private[consumer] final class Runloop private ( .takeUntil { case (completed, _) => completed } .runLast .map(_.getOrElse((false, Chunk.empty))) - _ <- logFinalStreamCompletionStatuses(completed, commits) + _ <- logFinalStreamCompletionStatuses(completedAndCommits._1, completedAndCommits._2) _ <- commitSync _ <- ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") } yield () From d3e0270f67f19fed7374c77474028ec6bae5fa5a Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 10 Nov 2024 17:37:01 +0100 Subject: [PATCH 14/14] Fix timeToDeadlineMillis --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 686ef768f..83948be52 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -106,7 +106,7 @@ private[consumer] final class Runloop private ( ): Task[Unit] = { val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeoutNanos - def timeToDeadlineMillis(): Long = (java.lang.System.nanoTime() - deadline) / 1000000L + def timeToDeadlineMillis(): Long = (deadline - java.lang.System.nanoTime()) / 1000000L val endingTps = streamsToEnd.map(_.tp).toSet