From 8ab0c96ba920aa9d1bb98fa0b1c313192d78e8db Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Wed, 29 Nov 2023 10:24:43 +0100 Subject: [PATCH] Make maxRebalanceDuration configurable (#1118) --- .../zio/kafka/consumer/ConsumerSettings.scala | 22 +++++++++++++++++-- .../zio/kafka/consumer/internal/Runloop.scala | 10 ++++----- .../consumer/internal/RunloopAccess.scala | 3 +++ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index bcb5b7479..b42483b83 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -29,6 +29,7 @@ final case class ConsumerSettings( rebalanceListener: RebalanceListener = RebalanceListener.noop, restartStreamOnRebalancing: Boolean = false, rebalanceSafeCommits: Boolean = false, + maxRebalanceDuration: Option[Duration] = None, fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy() ) { private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match { @@ -156,7 +157,7 @@ final case class ConsumerSettings( copy(restartStreamOnRebalancing = value) /** - * WARNING: 'rebalanceSafeCommits' is an EXPERIMENTAL feature. It is not recommended for production use yet. + * NOTE: 'rebalanceSafeCommits' is an EXPERIMENTAL feature. It is not recommended for production use yet. * * @param value * Whether to hold up a rebalance until all offsets of consumed messages have been committed. The default is @@ -179,7 +180,7 @@ final case class ConsumerSettings( * messages until the revoked streams are ready committing. * * Rebalances are held up for at most 3/5 of `maxPollInterval` (see [[withMaxPollInterval]]), by default this - * calculates to 3 minutes. + * calculates to 3 minutes. See [[#withMaxRebalanceDuration]] to change the default. * * When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any * offset commits from these streams have a high chance of being delayed (commits are not possible during some phases @@ -189,6 +190,23 @@ final case class ConsumerSettings( def withRebalanceSafeCommits(value: Boolean): ConsumerSettings = copy(rebalanceSafeCommits = value) + /** + * NOTE: 'rebalanceSafeCommits' is an EXPERIMENTAL feature. It is not recommended for production use yet. + * + * @param value + * Maximum time spent in the rebalance callback when `rebalanceSafeCommits` is enabled. + * + * In this time zio-kafka awaits processing of records and the completion of commits. + * + * By default this value is set to 3/5 of `maxPollInterval` which by default calculates to 3 minutes. Only values + * between `commitTimeout` and `maxPollInterval` are useful. Lower values will make the rebalance callback be done + * immediately, higher values lead to lost partitions. + * + * See [[#withRebalanceSafeCommits]] for more information. + */ + def withMaxRebalanceDuration(value: Duration): ConsumerSettings = + copy(maxRebalanceDuration = Some(value)) + def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings = withProperties(credentialsStore.properties) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 48d1477be..1b4fd79c4 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -36,6 +36,7 @@ private[consumer] final class Runloop private ( offsetRetrieval: OffsetRetrieval, userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, + maxRebalanceDuration: Duration, rebalanceSafeCommits: Boolean, currentStateRef: Ref[State], committedOffsetsRef: Ref[CommitOffsets], @@ -82,11 +83,6 @@ private[consumer] final class Runloop private ( // `ZStream.repeat`, `Promise.await` on non-completed promises, and any other ZIO operation that shifts the work to // another thread cannot be used. - // Maximum time spent in the rebalance callback. - // In this time zio-kafka awaits processing of records and the completion of commits. - // We use 3/5 of `maxPollInterval` which by default calculates to 3 minutes. - val maxEndingStreamsInterval = (maxPollInterval.toNanos / 5L) * 3L - // Time between polling the commit queue from the rebalance listener when `rebalanceSafeCommits` is enabled. val commitQueuePollInterval = 100.millis @@ -107,7 +103,7 @@ private[consumer] final class Runloop private ( state: State, streamsToEnd: Chunk[PartitionStreamControl] ): Task[Unit] = { - val deadline = java.lang.System.nanoTime() + maxEndingStreamsInterval - commitTimeout.toNanos + val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeout.toNanos val endingTps = streamsToEnd.map(_.tp).toSet @@ -790,6 +786,7 @@ object Runloop { userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, rebalanceSafeCommits: Boolean, + maxRebalanceDuration: Duration, partitionsHub: Hub[Take[Throwable, PartitionAssignment]], fetchStrategy: FetchStrategy ): URIO[Scope, Runloop] = @@ -818,6 +815,7 @@ object Runloop { userRebalanceListener = userRebalanceListener, restartStreamsOnRebalancing = restartStreamsOnRebalancing, rebalanceSafeCommits = rebalanceSafeCommits, + maxRebalanceDuration = maxRebalanceDuration, currentStateRef = currentStateRef, committedOffsetsRef = committedOffsetsRef, fetchStrategy = fetchStrategy diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 8abfbb724..3e1d0eb8d 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -78,6 +78,8 @@ private[consumer] object RunloopAccess { ): ZIO[Scope, Throwable, RunloopAccess] = for { maxPollInterval <- maxPollIntervalConfig(settings) + // See scaladoc of [[ConsumerSettings.withMaxRebalanceDuration]]: + maxRebalanceDuration = settings.maxRebalanceDuration.getOrElse(((maxPollInterval.toNanos / 5L) * 3L).nanos) // This scope allows us to link the lifecycle of the Runloop and of the Hub to the lifecycle of the Consumer // When the Consumer is shutdown, the Runloop and the Hub will be shutdown too (before the consumer) consumerScope <- ZIO.scope @@ -97,6 +99,7 @@ private[consumer] object RunloopAccess { userRebalanceListener = settings.rebalanceListener, restartStreamsOnRebalancing = settings.restartStreamOnRebalancing, rebalanceSafeCommits = settings.rebalanceSafeCommits, + maxRebalanceDuration = maxRebalanceDuration, partitionsHub = partitionsHub, fetchStrategy = settings.fetchStrategy )