Decouple stream halt detection timeout from max poll interval #1376

Merged 14 commits on Nov 13, 2024
@@ -114,7 +114,7 @@ object PartitionStreamControlSpec extends ZIOSpecDefault {
for {
control <- createTestControl
now <- Clock.nanoTime
exceeded <- control.maxPollIntervalExceeded(now)
exceeded <- control.maxStreamPullIntervalExceeded(now)
} yield assertTrue(!exceeded)
},
test("maxPollIntervalExceeded returns true after timeout") {
@@ -123,7 +123,7 @@ object PartitionStreamControlSpec extends ZIOSpecDefault {
_ <- control.offerRecords(createTestRecords(1))
now <- Clock.nanoTime
futureTime = now + Duration.fromSeconds(31).toNanos
exceeded <- control.maxPollIntervalExceeded(futureTime)
exceeded <- control.maxStreamPullIntervalExceeded(futureTime)
} yield assertTrue(exceeded)
}
),
@@ -221,7 +221,9 @@ object Consumer {
* - creating `access` as a fair semaphore with a single permit,
* - acquiring a permit from `access` before using the consumer, and releasing it afterwards,
* - not using the following consumer methods: `subscribe`, `unsubscribe`, `assign`, `poll`, `commit*`, `seek`,
* `pause`, `resume`, and `enforceRebalance`.
* `pause`, `resume`, and `enforceRebalance`,
* - keeping the consumer config given to the java consumer in sync with the properties in `settings` (for example
* by constructing `settings` with `ConsumerSettings(bootstrapServers).withProperties(config)`).
*
* Any deviation from these rules is likely to cause hard-to-track errors.
*
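The new bullet above asks for the java consumer's configuration to be kept in sync with `settings`. A minimal sketch of one way to do that, assuming a single shared properties map; the `config` values, deserializers, and object name below are illustrative and not part of this PR:

import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import zio.kafka.consumer.ConsumerSettings

object SharedConsumerConfigSketch {
  val bootstrapServers: List[String] = List("localhost:9092")

  // One properties map drives both clients, so the two configurations cannot drift apart.
  val config: Map[String, AnyRef] = Map(
    "group.id"             -> "my-group",
    "max.poll.records"     -> "500",
    "max.poll.interval.ms" -> "300000"
  )

  // The externally managed java consumer, built from the shared map...
  val javaConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](
    (config + ("bootstrap.servers" -> bootstrapServers.mkString(","))).asJava,
    new ByteArrayDeserializer(),
    new ByteArrayDeserializer()
  )

  // ...and the zio-kafka settings built from the same map, as the scaladoc suggests.
  val settings: ConsumerSettings =
    ConsumerSettings(bootstrapServers).withProperties(config)
}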
@@ -31,7 +31,8 @@ final case class ConsumerSettings(
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(),
metricLabels: Set[MetricLabel] = Set.empty,
runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis),
authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis),
maxStreamPullIntervalOption: Option[Duration] = None
) {
// Parse booleans in a way compatible with how Kafka does this in org.apache.kafka.common.config.ConfigDef.parseType:
require(
@@ -153,21 +154,34 @@ final case class ConsumerSettings(
* Set Kafka's `max.poll.interval.ms` configuration. See
* https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms for more information.
*
* Zio-kafka uses this value also to determine whether a stream stopped processing. If no chunks are pulled from a
* stream for this interval (while data is available) we consider the stream to be halted. When this happens we
* interrupt the stream with a failure. In addition the entire consumer is shutdown. In future versions of zio-kafka
* we may (instead of a shutdown) stop only the affected subscription.
*
* The default is 5 minutes. Make sure that all records from a single poll can be processed in this interval. The
* maximum number of records in a single poll is configured with the `max.poll.records` configuration (see
* https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records and [[withMaxPollRecords]]).
* The default is 5 minutes. Make sure that all records from a single poll can be processed in this interval. See also
* the [[withMaxPollRecords maxPollRecords]] configuration.
*/
def withMaxPollInterval(maxPollInterval: Duration): ConsumerSettings =
withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)

/**
* The maximum time a stream may run without pulling a chunk of records.
*
* Zio-kafka uses this value to determine whether a stream stopped processing. This is to safeguard against alive
* consumers in the consumer group which hold partition assignments but make no progress. If no chunks are pulled by
* user code from a partition stream for this interval (while data is available) we consider the stream to be halted.
* When this happens we interrupt the stream with a failure. In addition, the entire consumer is shutdown. In future
* versions of zio-kafka we may (instead of a shutdown) stop only the affected subscription.
*
* Make sure that all records from a single poll (see [[withMaxPollRecords maxPollRecords]]) can be processed in this
* interval, even when there is no concurrency because the records are all in the same partition.
*
* The default is equal to [[withMaxPollInterval maxPollInterval]].
Collaborator (author): Is this still correct, if so where is it set?

Collaborator: In RunloopAccess:

maxStreamPullInterval = settings.maxStreamPullIntervalOption.getOrElse(maxPollInterval)
*/
def withMaxStreamPullInterval(maxStreamPullInterval: Duration): ConsumerSettings =
copy(maxStreamPullIntervalOption = Some(maxStreamPullInterval))

/**
* Set Kafka's `max.poll.records` configuration. See
* https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records for more information.
*
* The default is 500.
*/
def withMaxPollRecords(maxPollRecords: Int): ConsumerSettings =
withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
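Putting the settings documented above together, a configuration could look like the sketch below. The broker address, group id, and durations are illustrative values only; the point is that the stream halt timeout can now be set independently of Kafka's max poll interval:

import zio._
import zio.kafka.consumer.ConsumerSettings

object StreamPullIntervalSettingsSketch {
  // Illustrative values: the broker may wait up to 10 minutes between polls,
  // while a partition stream that is not pulled from for 5 minutes is treated as halted.
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId("my-group")
      .withMaxPollRecords(500)
      .withMaxPollInterval(10.minutes)
      .withMaxStreamPullInterval(5.minutes)
}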
@@ -31,8 +31,8 @@ abstract class PartitionStream {
* the last pulled offset (if any). The promise completes when the stream completed.
* @param queueInfoRef
* used to track the stream's pull deadline, its queue size, and last pulled offset
* @param maxPollInterval
* see [[zio.kafka.consumer.ConsumerSettings.withMaxPollInterval()]]
* @param maxStreamPullInterval
* see [[zio.kafka.consumer.ConsumerSettings.withMaxStreamPullInterval()]]
*/
final class PartitionStreamControl private (
val tp: TopicPartition,
@@ -41,9 +41,9 @@
interruptionPromise: Promise[Throwable, Nothing],
val completedPromise: Promise[Nothing, Option[Offset]],
queueInfoRef: Ref[QueueInfo],
maxPollInterval: Duration
maxStreamPullInterval: Duration
) extends PartitionStream {
private val maxPollIntervalNanos = maxPollInterval.toNanos
private val maxStreamPullIntervalNanos = maxStreamPullInterval.toNanos

private val logAnnotate = ZIO.logAnnotate(
LogAnnotation("topic", tp.topic()),
@@ -57,7 +57,7 @@
} else {
for {
now <- Clock.nanoTime
newPullDeadline = now + maxPollIntervalNanos
newPullDeadline = now + maxStreamPullIntervalNanos
_ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size))
_ <- dataQueue.offer(Take.chunk(data))
} yield ()
@@ -81,14 +81,12 @@
* `true` when the stream has data available, but none has been pulled for more than `maxStreamPullInterval` (since data
* became available), `false` otherwise
*/
private[internal] def maxPollIntervalExceeded(now: NanoTime): UIO[Boolean] =
private[internal] def maxStreamPullIntervalExceeded(now: NanoTime): UIO[Boolean] =
queueInfoRef.get.map(_.deadlineExceeded(now))

/** To be invoked when the stream is no longer processing. */
private[internal] def halt: UIO[Unit] = {
val timeOutMessage = s"No records were polled for more than $maxPollInterval for topic partition $tp. " +
"Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
"needs more time."
val timeOutMessage = s"No records were pulled for more than $maxStreamPullInterval for topic partition $tp."
val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace
interruptionPromise.fail(consumeTimeout).unit
}
@@ -130,14 +128,14 @@ object PartitionStreamControl {
tp: TopicPartition,
requestData: UIO[Unit],
diagnostics: Diagnostics,
maxPollInterval: Duration
maxStreamPullInterval: Duration
): UIO[PartitionStreamControl] = {
val maxPollIntervalNanos = maxPollInterval.toNanos
val maxStreamPullIntervalNanos = maxStreamPullInterval.toNanos

def registerPull(queueInfo: Ref[QueueInfo], records: Chunk[ByteArrayCommittableRecord]): UIO[Unit] =
for {
now <- Clock.nanoTime
newPullDeadline = now + maxPollIntervalNanos
newPullDeadline = now + maxStreamPullIntervalNanos
_ <- queueInfo.update(_.withPull(newPullDeadline, records))
} yield ()

@@ -184,7 +182,7 @@
interruptionPromise,
completedPromise,
queueInfo,
maxPollInterval
maxStreamPullInterval
)
}
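The renamed fields above feed the pull-deadline bookkeeping kept in QueueInfo: the deadline starts when data becomes available, every pull pushes it forward by maxStreamPullInterval, and the check only fires while data is still queued. The following is a deliberately simplified, assumed model of those semantics for illustration, not the PR's QueueInfo implementation:

// Simplified stand-in for the real QueueInfo, for illustration only.
final case class QueueInfoSketch(pullDeadline: Long, queueSize: Int) {
  // Halted only when data is waiting and the deadline has passed.
  def deadlineExceeded(now: Long): Boolean =
    queueSize > 0 && pullDeadline <= now

  // Offering records starts the clock only if the queue was empty before.
  def withOffer(newPullDeadline: Long, recordCount: Int): QueueInfoSketch =
    QueueInfoSketch(
      pullDeadline = if (queueSize == 0) newPullDeadline else pullDeadline,
      queueSize = queueSize + recordCount
    )

  // Every pull by user code resets the deadline to now + maxStreamPullInterval.
  def withPull(newPullDeadline: Long, pulledCount: Int): QueueInfoSketch =
    QueueInfoSketch(
      pullDeadline = newPullDeadline,
      queueSize = math.max(queueSize - pulledCount, 0)
    )
}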

@@ -25,12 +25,12 @@ private[consumer] final class Runloop private (
topLevelExecutor: Executor,
sameThreadRuntime: Runtime[Any],
consumer: ConsumerAccess,
maxPollInterval: Duration,
commitQueue: Queue[Commit],
commandQueue: Queue[RunloopCommand],
lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent],
partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
diagnostics: Diagnostics,
maxStreamPullInterval: Duration,
maxRebalanceDuration: Duration,
currentStateRef: Ref[State],
committedOffsetsRef: Ref[CommitOffsets]
@@ -48,7 +48,7 @@
tp,
commandQueue.offer(RunloopCommand.Request(tp)).unit,
diagnostics,
maxPollInterval
maxStreamPullInterval
)

def stopConsumption: UIO[Unit] =
@@ -657,7 +657,7 @@
pollResult.records
)
updatedPendingCommits <- ZIO.filter(state.pendingCommits)(_.isPending)
_ <- checkStreamPollInterval(pollResult.assignedStreams)
_ <- checkStreamPullInterval(pollResult.assignedStreams)
} yield state.copy(
pendingRequests = fulfillResult.pendingRequests,
pendingCommits = updatedPendingCommits,
@@ -666,20 +666,29 @@
}

/**
* Check each stream to see if it exceeded its poll interval. If so, halt it. In addition, if any stream has exceeded
* its poll interval, shutdown the consumer.
* Check each stream to see if it exceeded its pull interval. If so, halt it. In addition, if any stream has exceeded
* its pull interval, shutdown the consumer.
*/
private def checkStreamPollInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] =
private def checkStreamPullInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] = {
def logShutdown(stream: PartitionStreamControl): ZIO[Any, Nothing, Unit] =
ZIO.logError(
s"Stream for ${stream.tp} has not pulled chunks for more than $maxStreamPullInterval, shutting down. " +
"Use ConsumerSettings.withMaxPollInterval or .withMaxStreamPullInterval to set a longer interval when " +
"processing a batch of records needs more time."
)

for {
now <- Clock.nanoTime
anyExceeded <- ZIO.foldLeft(streams)(false) { case (acc, stream) =>
stream
.maxPollIntervalExceeded(now)
.maxStreamPullIntervalExceeded(now)
.tap(ZIO.when(_)(logShutdown(stream)))
.tap(exceeded => if (exceeded) stream.halt else ZIO.unit)
.map(acc || _)
}
_ <- shutdown.when(anyExceeded)
} yield ()
}

private def handleCommand(state: State, cmd: RunloopCommand.StreamCommand): Task[State] = {
def doChangeSubscription(newSubscriptionState: SubscriptionState): Task[State] =
@@ -936,7 +945,7 @@ object Runloop {

private[consumer] def make(
settings: ConsumerSettings,
maxPollInterval: Duration,
maxStreamPullInterval: Duration,
maxRebalanceDuration: Duration,
diagnostics: Diagnostics,
consumer: ConsumerAccess,
@@ -957,12 +966,12 @@
topLevelExecutor = executor,
sameThreadRuntime = sameThreadRuntime,
consumer = consumer,
maxPollInterval = maxPollInterval,
commitQueue = commitQueue,
commandQueue = commandQueue,
lastRebalanceEvent = lastRebalanceEvent,
partitionsHub = partitionsHub,
diagnostics = diagnostics,
maxStreamPullInterval = maxStreamPullInterval,
maxRebalanceDuration = maxRebalanceDuration,
currentStateRef = currentStateRef,
committedOffsetsRef = committedOffsetsRef
@@ -78,6 +78,7 @@ private[consumer] object RunloopAccess {
): ZIO[Scope, Throwable, RunloopAccess] =
for {
maxPollInterval <- maxPollIntervalConfig(settings)
maxStreamPullInterval = settings.maxStreamPullIntervalOption.getOrElse(maxPollInterval)
// See scaladoc of [[ConsumerSettings.withMaxRebalanceDuration]]:
maxRebalanceDuration = settings.maxRebalanceDuration.getOrElse(((maxPollInterval.toNanos / 5L) * 3L).nanos)
// This scope allows us to link the lifecycle of the Runloop and of the Hub to the lifecycle of the Consumer
@@ -90,7 +91,7 @@
makeRunloop = Runloop
.make(
settings = settings,
maxPollInterval = maxPollInterval,
maxStreamPullInterval = maxStreamPullInterval,
maxRebalanceDuration = maxRebalanceDuration,
diagnostics = diagnostics,
consumer = consumerAccess,
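To make the defaults resolved in the RunloopAccess lines above concrete: with Kafka's default max.poll.interval.ms of 5 minutes and no withMaxStreamPullInterval call, the computed values work out as in this sketch. The arithmetic mirrors the lines above; the snippet itself is illustrative, not library code:

import zio._

object ResolvedTimeoutDefaultsSketch {
  // Kafka's default max.poll.interval.ms is 300000 ms = 5 minutes.
  val maxPollInterval: Duration = 5.minutes

  // No maxStreamPullIntervalOption was set, so it falls back to maxPollInterval (5 minutes).
  val maxStreamPullInterval: Duration =
    Option.empty[Duration].getOrElse(maxPollInterval)

  // The default maxRebalanceDuration is 3/5 of maxPollInterval, i.e. 3 minutes here.
  val maxRebalanceDuration: Duration =
    ((maxPollInterval.toNanos / 5L) * 3L).nanos
}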