From 5e37292c2f152e744a22c50da7b4e5aecbafd9aa Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sat, 16 Nov 2024 11:16:18 +0100 Subject: [PATCH 1/4] Prevent unlimited enqueueing of CommitAvailable commands --- .../zio/kafka/consumer/internal/Runloop.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 8d45f8e55..b017c7a3d 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -33,7 +33,8 @@ private[consumer] final class Runloop private ( maxStreamPullInterval: Duration, maxRebalanceDuration: Duration, currentStateRef: Ref[State], - committedOffsetsRef: Ref[CommitOffsets] + committedOffsetsRef: Ref[CommitOffsets], + commitAvailable: Ref[Boolean] ) { private val commitTimeout = settings.commitTimeout private val commitTimeoutNanos = settings.commitTimeout.toNanos @@ -327,10 +328,11 @@ private[consumer] final class Runloop private ( for { p <- Promise.make[Throwable, Unit] startTime = java.lang.System.nanoTime() - _ <- commitQueue.offer(Runloop.Commit(java.lang.System.nanoTime(), offsets, p)) - _ <- commandQueue.offer(RunloopCommand.CommitAvailable) - _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) - _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) + _ <- commitQueue.offer(Runloop.Commit(java.lang.System.nanoTime(), offsets, p)) + commitAvailable <- commitAvailable.getAndSet(true) + _ <- commandQueue.offer(RunloopCommand.CommitAvailable).unless(commitAvailable) + _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) + _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) endTime = java.lang.System.nanoTime() latency = (endTime - startTime).nanoseconds _ <- consumerMetrics.observeCommit(latency) @@ -819,6 +821,7 @@ private[consumer] final class Runloop private ( .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { + _ <- commitAvailable.set(false) commitCommands <- commitQueue.takeAll _ <- ZIO.logDebug( s"Processing ${commitCommands.size} commits," + @@ -958,6 +961,7 @@ object Runloop { lastRebalanceEvent <- Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) + commitAvailable <- Ref.make(false) committedOffsetsRef <- Ref.make(CommitOffsets.empty) sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) executor <- ZIO.executor @@ -974,7 +978,8 @@ object Runloop { maxStreamPullInterval = maxStreamPullInterval, maxRebalanceDuration = maxRebalanceDuration, currentStateRef = currentStateRef, - committedOffsetsRef = committedOffsetsRef + committedOffsetsRef = committedOffsetsRef, + commitAvailable = commitAvailable ) _ <- ZIO.logDebug("Starting Runloop") From 6afa3efad87d702d10a707d225deb9b238b12a06 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Tue, 26 Nov 2024 21:13:06 +0100 Subject: [PATCH 2/4] Renames + comment --- .../zio/kafka/consumer/internal/Runloop.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 39570b7c7..6d8d9178f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -23,7 +23,7 @@ private[consumer] final class Runloop private ( sameThreadRuntime: Runtime[Any], consumer: ConsumerAccess, commandQueue: Queue[RunloopCommand], - commitAvailableQueue: Queue[Boolean], + commitAvailable: Queue[Boolean], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, maxStreamPullInterval: Duration, @@ -489,7 +489,7 @@ private[consumer] final class Runloop private ( ZStream .fromQueue(commandQueue) - .merge(ZStream.fromQueue(commitAvailableQueue).as(RunloopCommand.CommitAvailable)) + .merge(ZStream.fromQueue(commitAvailable).as(RunloopCommand.CommitAvailable)) .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { @@ -583,10 +583,11 @@ object Runloop { partitionsHub: Hub[Take[Throwable, PartitionAssignment]] ): URIO[Scope, Runloop] = for { - _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) - commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) - commitAvailableQueue <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown) - lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) + _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) + commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) + // A one-element dropping queue used to signal between two fibers that new commits are pending and we should poll + commitAvailable <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown) + lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) @@ -597,7 +598,7 @@ object Runloop { settings.commitTimeout, diagnostics, metrics, - commitAvailableQueue.offer(true).unit, + commitAvailable.offer(true).unit, sameThreadRuntime ) rebalanceCoordinator = new RebalanceCoordinator( @@ -614,7 +615,7 @@ object Runloop { sameThreadRuntime = sameThreadRuntime, consumer = consumer, commandQueue = commandQueue, - commitAvailableQueue = commitAvailableQueue, + commitAvailable = commitAvailable, partitionsHub = partitionsHub, diagnostics = diagnostics, maxStreamPullInterval = maxStreamPullInterval, From df5cd5f8841ff932fef2720c2c9dd5da26bb6f0e Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 19 Jan 2025 10:45:35 +0100 Subject: [PATCH 3/4] Fix merge error --- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 28302dd33..19c0199eb 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -600,8 +600,7 @@ object Runloop { settings.commitTimeout, diagnostics, metrics, - commitAvailable.offer(true).unit, - sameThreadRuntime + commitAvailable.offer(true).unit ) rebalanceCoordinator = new RebalanceCoordinator( lastRebalanceEvent, From a1c279a9859e3142d6540dfe3763814cba4670b5 Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 19 Jan 2025 10:46:29 +0100 Subject: [PATCH 4/4] Rename for consistency --- .../scala/zio/kafka/consumer/internal/Runloop.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 19c0199eb..434816f96 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -23,7 +23,7 @@ private[consumer] final class Runloop private ( sameThreadRuntime: Runtime[Any], consumer: ConsumerAccess, commandQueue: Queue[RunloopCommand], - commitAvailable: Queue[Boolean], + commitAvailableQueue: Queue[Boolean], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], diagnostics: Diagnostics, maxStreamPullInterval: Duration, @@ -492,7 +492,7 @@ private[consumer] final class Runloop private ( ZStream .fromQueue(commandQueue) - .merge(ZStream.fromQueue(commitAvailable).as(RunloopCommand.CommitAvailable)) + .merge(ZStream.fromQueue(commitAvailableQueue).as(RunloopCommand.CommitAvailable)) .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { @@ -588,8 +588,8 @@ object Runloop { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) // A one-element dropping queue used to signal between two fibers that new commits are pending and we should poll - commitAvailable <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown) - lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) + commitAvailableQueue <- ZIO.acquireRelease(Queue.dropping[Boolean](1))(_.shutdown) + lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) @@ -600,7 +600,7 @@ object Runloop { settings.commitTimeout, diagnostics, metrics, - commitAvailable.offer(true).unit + commitAvailableQueue.offer(true).unit ) rebalanceCoordinator = new RebalanceCoordinator( lastRebalanceEvent, @@ -616,7 +616,7 @@ object Runloop { sameThreadRuntime = sameThreadRuntime, consumer = consumer, commandQueue = commandQueue, - commitAvailable = commitAvailable, + commitAvailableQueue = commitAvailableQueue, partitionsHub = partitionsHub, diagnostics = diagnostics, maxStreamPullInterval = maxStreamPullInterval,