Skip to content

Commit

Permalink
Make maxRebalanceDuration configurable (#1118)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten authored Nov 29, 2023
1 parent db2f566 commit 8ab0c96
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
22 changes: 20 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ final case class ConsumerSettings(
rebalanceListener: RebalanceListener = RebalanceListener.noop,
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
maxRebalanceDuration: Option[Duration] = None,
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy()
) {
private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match {
Expand Down Expand Up @@ -156,7 +157,7 @@ final case class ConsumerSettings(
copy(restartStreamOnRebalancing = value)

/**
* WARNING: 'rebalanceSafeCommits' is an EXPERIMENTAL feature. It is not recommended for production use yet.
* NOTE: 'rebalanceSafeCommits' is an EXPERIMENTAL feature. It is not recommended for production use yet.
*
* @param value
* Whether to hold up a rebalance until all offsets of consumed messages have been committed. The default is
Expand All @@ -179,7 +180,7 @@ final case class ConsumerSettings(
* messages until the revoked streams are ready committing.
*
* Rebalances are held up for at most 3/5 of `maxPollInterval` (see [[withMaxPollInterval]]), by default this
* calculates to 3 minutes.
* calculates to 3 minutes. See [[#withMaxRebalanceDuration]] to change the default.
*
* When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any
* offset commits from these streams have a high chance of being delayed (commits are not possible during some phases
Expand All @@ -189,6 +190,23 @@ final case class ConsumerSettings(
def withRebalanceSafeCommits(value: Boolean): ConsumerSettings =
copy(rebalanceSafeCommits = value)

/**
* NOTE: 'rebalanceSafeCommits' is an EXPERIMENTAL feature. It is not recommended for production use yet.
*
* @param value
* Maximum time spent in the rebalance callback when `rebalanceSafeCommits` is enabled.
*
* In this time zio-kafka awaits processing of records and the completion of commits.
*
* By default this value is set to 3/5 of `maxPollInterval` which by default calculates to 3 minutes. Only values
* between `commitTimeout` and `maxPollInterval` are useful. Lower values will make the rebalance callback be done
* immediately, higher values lead to lost partitions.
*
* See [[#withRebalanceSafeCommits]] for more information.
*/
def withMaxRebalanceDuration(value: Duration): ConsumerSettings =
copy(maxRebalanceDuration = Some(value))

def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings =
withProperties(credentialsStore.properties)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ private[consumer] final class Runloop private (
offsetRetrieval: OffsetRetrieval,
userRebalanceListener: RebalanceListener,
restartStreamsOnRebalancing: Boolean,
maxRebalanceDuration: Duration,
rebalanceSafeCommits: Boolean,
currentStateRef: Ref[State],
committedOffsetsRef: Ref[CommitOffsets],
Expand Down Expand Up @@ -82,11 +83,6 @@ private[consumer] final class Runloop private (
// `ZStream.repeat`, `Promise.await` on non-completed promises, and any other ZIO operation that shifts the work to
// another thread cannot be used.

// Maximum time spent in the rebalance callback.
// In this time zio-kafka awaits processing of records and the completion of commits.
// We use 3/5 of `maxPollInterval` which by default calculates to 3 minutes.
val maxEndingStreamsInterval = (maxPollInterval.toNanos / 5L) * 3L

// Time between polling the commit queue from the rebalance listener when `rebalanceSafeCommits` is enabled.
val commitQueuePollInterval = 100.millis

Expand All @@ -107,7 +103,7 @@ private[consumer] final class Runloop private (
state: State,
streamsToEnd: Chunk[PartitionStreamControl]
): Task[Unit] = {
val deadline = java.lang.System.nanoTime() + maxEndingStreamsInterval - commitTimeout.toNanos
val deadline = java.lang.System.nanoTime() + maxRebalanceDuration.toNanos - commitTimeout.toNanos

val endingTps = streamsToEnd.map(_.tp).toSet

Expand Down Expand Up @@ -790,6 +786,7 @@ object Runloop {
userRebalanceListener: RebalanceListener,
restartStreamsOnRebalancing: Boolean,
rebalanceSafeCommits: Boolean,
maxRebalanceDuration: Duration,
partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
fetchStrategy: FetchStrategy
): URIO[Scope, Runloop] =
Expand Down Expand Up @@ -818,6 +815,7 @@ object Runloop {
userRebalanceListener = userRebalanceListener,
restartStreamsOnRebalancing = restartStreamsOnRebalancing,
rebalanceSafeCommits = rebalanceSafeCommits,
maxRebalanceDuration = maxRebalanceDuration,
currentStateRef = currentStateRef,
committedOffsetsRef = committedOffsetsRef,
fetchStrategy = fetchStrategy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ private[consumer] object RunloopAccess {
): ZIO[Scope, Throwable, RunloopAccess] =
for {
maxPollInterval <- maxPollIntervalConfig(settings)
// See scaladoc of [[ConsumerSettings.withMaxRebalanceDuration]]:
maxRebalanceDuration = settings.maxRebalanceDuration.getOrElse(((maxPollInterval.toNanos / 5L) * 3L).nanos)
// This scope allows us to link the lifecycle of the Runloop and of the Hub to the lifecycle of the Consumer
// When the Consumer is shutdown, the Runloop and the Hub will be shutdown too (before the consumer)
consumerScope <- ZIO.scope
Expand All @@ -97,6 +99,7 @@ private[consumer] object RunloopAccess {
userRebalanceListener = settings.rebalanceListener,
restartStreamsOnRebalancing = settings.restartStreamOnRebalancing,
rebalanceSafeCommits = settings.rebalanceSafeCommits,
maxRebalanceDuration = maxRebalanceDuration,
partitionsHub = partitionsHub,
fetchStrategy = settings.fetchStrategy
)
Expand Down

0 comments on commit 8ab0c96

Please sign in to comment.