From f3b5efa48ecf6bd7e2873469886de1f9ccee892c Mon Sep 17 00:00:00 2001
From: Steven Vroonland
Date: Sun, 10 Nov 2024 20:21:29 +0100
Subject: [PATCH 01/11] Decouple stream halt detection timeout from max poll interval

Fixes #1262
---
 .../kafka/consumer/internal/RunloopSpec.scala |  1 -
 .../zio/kafka/consumer/ConsumerSettings.scala | 30 ++++++++++++-------
 .../internal/PartitionStreamControl.scala     | 20 ++++++-------
 .../zio/kafka/consumer/internal/Runloop.scala | 12 ++++----
 .../consumer/internal/RunloopAccess.scala     |  1 -
 5 files changed, 35 insertions(+), 29 deletions(-)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala
index e5a4ac71d..93759c133 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala
@@ -203,7 +203,6 @@ object RunloopSpec extends ZIOSpecDefaultSlf4j {
         runloop <- Runloop.make(
                      consumerSettings,
                      100.millis,
-                     100.millis,
                      diagnostics,
                      consumerAccess,
                      partitionsHub
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
index 7e5fc01b5..a1eb8d6f3 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
@@ -31,7 +31,8 @@ final case class ConsumerSettings(
   fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(),
   metricLabels: Set[MetricLabel] = Set.empty,
   runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis),
-  authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
+  authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis),
+  streamHaltDetectionTimeout: Duration = 5.minutes
 ) {
   // Parse booleans in a way compatible with how Kafka does this in org.apache.kafka.common.config.ConfigDef.parseType:
   require(
@@ -152,19 +153,28 @@ final case class ConsumerSettings(
   /**
    * Set Kafka's `max.poll.interval.ms` configuration. See
    * https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms for more information.
-   *
-   * Zio-kafka uses this value also to determine whether a stream stopped processing. If no chunks are pulled from a
-   * stream for this interval (while data is available) we consider the stream to be halted. When this happens we
-   * interrupt the stream with a failure. In addition the entire consumer is shutdown. In future versions of zio-kafka
-   * we may (instead of a shutdown) stop only the affected subscription.
-   *
-   * The default is 5 minutes. Make sure that all records from a single poll can be processed in this interval. The
-   * maximum number of records in a single poll is configured with the `max.poll.records` configuration (see
-   * https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records and [[withMaxPollRecords]]).
    */
   def withMaxPollInterval(maxPollInterval: Duration): ConsumerSettings =
     withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)
 
+  /**
+   * Zio-kafka uses this value to determine whether a stream stopped processing, to safeguard against alive consumers in
+   * the consumer group which hold partition assignments but make no progress. If no chunks are pulled by user code from
+   * a partition stream for this interval (while data is available) we consider the stream to be halted. When this
+   * happens we interrupt the stream with a failure. In addition the entire consumer is shutdown. In future versions of
+   * zio-kafka we may (instead of a shutdown) stop only the affected subscription.
+   *
+   * The default is 5 minutes. Make sure that all records from a single poll can be processed in this interval and take
+   * into account that this is divided over the partitions. Also prefetching will impact the
+   *
+   * The maximum number of records in a single poll is configured with the `max.poll.records` configuration (see
+   * https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records and [[withMaxPollRecords]]). The records
+   * are divided over the partition streams into chunks. The stream halt detection timeout needs to be larger than the
+   * time it takes your code to process one of these chunks.
+   */
+  def withStreamHaltDetectionTimeout(streamHaltDetectionTimeout: Duration): ConsumerSettings =
+    copy(streamHaltDetectionTimeout = streamHaltDetectionTimeout)
+
   /**
    * Set Kafka's `max.poll.records` configuration. See
    * https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records for more information.
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
index 85a83eb19..ef0e398b0 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
@@ -41,9 +41,9 @@ final class PartitionStreamControl private (
   interruptionPromise: Promise[Throwable, Nothing],
   val completedPromise: Promise[Nothing, Option[Offset]],
   queueInfoRef: Ref[QueueInfo],
-  maxPollInterval: Duration
+  maxPullInterval: Duration
 ) extends PartitionStream {
-  private val maxPollIntervalNanos = maxPollInterval.toNanos
+  private val maxPullIntervalNanos = maxPullInterval.toNanos
 
   private val logAnnotate = ZIO.logAnnotate(
     LogAnnotation("topic", tp.topic()),
@@ -57,7 +57,7 @@ final class PartitionStreamControl private (
     } else {
       for {
         now <- Clock.nanoTime
-        newPullDeadline = now + maxPollIntervalNanos
+        newPullDeadline = now + maxPullIntervalNanos
         _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size))
         _ <- dataQueue.offer(Take.chunk(data))
       } yield ()
@@ -81,16 +81,16 @@ final class PartitionStreamControl private (
    * `true` when the stream has data available, but none has been pulled for more than `maxPollInterval` (since data
    * became available), `false` otherwise
    */
-  private[internal] def maxPollIntervalExceeded(now: NanoTime): UIO[Boolean] =
+  private[internal] def maxPullIntervalExceeded(now: NanoTime): UIO[Boolean] =
     queueInfoRef.get.map(_.deadlineExceeded(now))
 
   /** To be invoked when the stream is no longer processing. */
   private[internal] def halt: UIO[Boolean] = {
-    val timeOutMessage = s"No records were polled for more than $maxPollInterval for topic partition $tp. " +
+    val timeOutMessage = s"No records were polled for more than $maxPullInterval for topic partition $tp. " +
       "Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
       "needs more time."
val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace - interruptionPromise.fail(consumeTimeout) + ZIO.logWarning(timeOutMessage) *> interruptionPromise.fail(consumeTimeout) } /** To be invoked when the partition was lost. It clears the queue and ends the stream. */ @@ -129,14 +129,14 @@ object PartitionStreamControl { tp: TopicPartition, commandQueue: Queue[RunloopCommand], diagnostics: Diagnostics, - maxPollInterval: Duration + streamHaltDetectionTimeout: Duration ): UIO[PartitionStreamControl] = { - val maxPollIntervalNanos = maxPollInterval.toNanos + val streamHaltDetectionTimeoutNanos = streamHaltDetectionTimeout.toNanos def registerPull(queueInfo: Ref[QueueInfo], records: Chunk[ByteArrayCommittableRecord]): UIO[Unit] = for { now <- Clock.nanoTime - newPullDeadline = now + maxPollIntervalNanos + newPullDeadline = now + streamHaltDetectionTimeoutNanos _ <- queueInfo.update(_.withPull(newPullDeadline, records)) } yield () @@ -183,7 +183,7 @@ object PartitionStreamControl { interruptionPromise, completedPromise, queueInfo, - maxPollInterval + streamHaltDetectionTimeout ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 35cf60fa5..3b766d74a 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -25,7 +25,6 @@ private[consumer] final class Runloop private ( topLevelExecutor: Executor, sameThreadRuntime: Runtime[Any], consumer: ConsumerAccess, - maxPollInterval: Duration, commitQueue: Queue[Commit], commandQueue: Queue[RunloopCommand], lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent], @@ -44,7 +43,7 @@ private[consumer] final class Runloop private ( private val consumerMetrics = new ZioConsumerMetrics(settings.metricLabels) private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = - PartitionStreamControl.newPartitionStream(tp, commandQueue, diagnostics, maxPollInterval) + PartitionStreamControl.newPartitionStream(tp, commandQueue, diagnostics, settings.streamHaltDetectionTimeout) def stopConsumption: UIO[Unit] = ZIO.logDebug("stopConsumption called") *> @@ -597,15 +596,16 @@ private[consumer] final class Runloop private ( } /** - * Check each stream to see if it exceeded its poll interval. If so, halt it. In addition, if any stream has exceeded - * its poll interval, shutdown the consumer. + * Check each stream to see if it exceeded its pull interval. If so, halt it. In addition, if any stream has exceeded + * its pull interval, shutdown the consumer. 
*/ private def checkStreamPollInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] = for { now <- Clock.nanoTime anyExceeded <- ZIO.foldLeft(streams)(false) { case (acc, stream) => stream - .maxPollIntervalExceeded(now) + .maxPullIntervalExceeded(now) + .tap(ZIO.when(_)(ZIO.logWarning(s"Stream for ${stream.tp} has exceeded "))) .tap(exceeded => if (exceeded) stream.halt else ZIO.unit) .map(acc || _) } @@ -867,7 +867,6 @@ object Runloop { private[consumer] def make( settings: ConsumerSettings, - maxPollInterval: Duration, maxRebalanceDuration: Duration, diagnostics: Diagnostics, consumer: ConsumerAccess, @@ -888,7 +887,6 @@ object Runloop { topLevelExecutor = executor, sameThreadRuntime = sameThreadRuntime, consumer = consumer, - maxPollInterval = maxPollInterval, commitQueue = commitQueue, commandQueue = commandQueue, lastRebalanceEvent = lastRebalanceEvent, diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 6c90d2d1c..e50fa2645 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -90,7 +90,6 @@ private[consumer] object RunloopAccess { makeRunloop = Runloop .make( settings = settings, - maxPollInterval = maxPollInterval, maxRebalanceDuration = maxRebalanceDuration, diagnostics = diagnostics, consumer = consumerAccess, From 2bf131b197831e2a18fba8f9a6db7c1ddb8adbdc Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 10 Nov 2024 20:25:48 +0100 Subject: [PATCH 02/11] Naming consistency --- .../internal/PartitionStreamControlSpec.scala | 4 ++-- .../consumer/internal/PartitionStreamControl.scala | 12 ++++++------ .../scala/zio/kafka/consumer/internal/Runloop.scala | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala index 7941e137d..a565dd9ea 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala @@ -114,7 +114,7 @@ object PartitionStreamControlSpec extends ZIOSpecDefault { for { control <- createTestControl now <- Clock.nanoTime - exceeded <- control.maxPullIntervalExceeded(now) + exceeded <- control.streamHaltDetectionTimeoutExceeded(now) } yield assertTrue(!exceeded) }, test("maxPollIntervalExceeded returns true after timeout") { @@ -123,7 +123,7 @@ object PartitionStreamControlSpec extends ZIOSpecDefault { _ <- control.offerRecords(createTestRecords(1)) now <- Clock.nanoTime futureTime = now + Duration.fromSeconds(31).toNanos - exceeded <- control.maxPullIntervalExceeded(futureTime) + exceeded <- control.streamHaltDetectionTimeoutExceeded(futureTime) } yield assertTrue(exceeded) } ), diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 2bd8298ba..7cd8755b4 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -31,8 +31,8 @@ abstract class PartitionStream { * the last pulled offset (if any). The promise completes when the stream completed. 
* @param queueInfoRef * used to track the stream's pull deadline, its queue size, and last pulled offset - * @param maxPollInterval - * see [[zio.kafka.consumer.ConsumerSettings.withMaxPollInterval()]] + * @param streamHaltDetectionTimeout + * see [[zio.kafka.consumer.ConsumerSettings.withStreamHaltDetectionTimeout()]] */ final class PartitionStreamControl private ( val tp: TopicPartition, @@ -41,9 +41,9 @@ final class PartitionStreamControl private ( interruptionPromise: Promise[Throwable, Nothing], val completedPromise: Promise[Nothing, Option[Offset]], queueInfoRef: Ref[QueueInfo], - maxPullInterval: Duration + streamHaltDetectionTimeout: Duration ) extends PartitionStream { - private val maxPullIntervalNanos = maxPullInterval.toNanos + private val maxPullIntervalNanos = streamHaltDetectionTimeout.toNanos private val logAnnotate = ZIO.logAnnotate( LogAnnotation("topic", tp.topic()), @@ -81,12 +81,12 @@ final class PartitionStreamControl private ( * `true` when the stream has data available, but none has been pulled for more than `maxPollInterval` (since data * became available), `false` otherwise */ - private[internal] def maxPullIntervalExceeded(now: NanoTime): UIO[Boolean] = + private[internal] def streamHaltDetectionTimeoutExceeded(now: NanoTime): UIO[Boolean] = queueInfoRef.get.map(_.deadlineExceeded(now)) /** To be invoked when the stream is no longer processing. */ private[internal] def halt: UIO[Unit] = { - val timeOutMessage = s"No records were polled for more than $maxPullInterval for topic partition $tp. " + + val timeOutMessage = s"No records were polled for more than $streamHaltDetectionTimeout for topic partition $tp. " + "Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " + "needs more time." 
val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index cc4697a12..b86d7a35d 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -609,7 +609,7 @@ private[consumer] final class Runloop private ( now <- Clock.nanoTime anyExceeded <- ZIO.foldLeft(streams)(false) { case (acc, stream) => stream - .maxPullIntervalExceeded(now) + .streamHaltDetectionTimeoutExceeded(now) .tap(ZIO.when(_)(ZIO.logWarning(s"Stream for ${stream.tp} has exceeded "))) .tap(exceeded => if (exceeded) stream.halt else ZIO.unit) .map(acc || _) From 89ec03b45364ff33641763e7b8a5241f46d331eb Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 10 Nov 2024 20:30:07 +0100 Subject: [PATCH 03/11] Naming --- .../zio/kafka/consumer/internal/PartitionStreamControl.scala | 4 ++-- .../src/main/scala/zio/kafka/consumer/internal/Runloop.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 7cd8755b4..377d9d5c7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -43,7 +43,7 @@ final class PartitionStreamControl private ( queueInfoRef: Ref[QueueInfo], streamHaltDetectionTimeout: Duration ) extends PartitionStream { - private val maxPullIntervalNanos = streamHaltDetectionTimeout.toNanos + private val streamHaltDetectionTimeoutNanos = streamHaltDetectionTimeout.toNanos private val logAnnotate = ZIO.logAnnotate( LogAnnotation("topic", tp.topic()), @@ -57,7 +57,7 @@ final class PartitionStreamControl private ( } else { for { now <- Clock.nanoTime - newPullDeadline = now + maxPullIntervalNanos + newPullDeadline = now + streamHaltDetectionTimeoutNanos _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size)) _ <- dataQueue.offer(Take.chunk(data)) } yield () diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index b86d7a35d..745dc4ec7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -592,7 +592,7 @@ private[consumer] final class Runloop private ( pollResult.records ) updatedPendingCommits <- ZIO.filter(state.pendingCommits)(_.isPending) - _ <- checkStreamPollInterval(pollResult.assignedStreams) + _ <- checkStreamsHaveHalted(pollResult.assignedStreams) } yield state.copy( pendingRequests = fulfillResult.pendingRequests, pendingCommits = updatedPendingCommits, @@ -604,7 +604,7 @@ private[consumer] final class Runloop private ( * Check each stream to see if it exceeded its pull interval. If so, halt it. In addition, if any stream has exceeded * its pull interval, shutdown the consumer. 
    */
-  private def checkStreamPollInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] =
+  private def checkStreamsHaveHalted(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] =
     for {
       now <- Clock.nanoTime
       anyExceeded <- ZIO.foldLeft(streams)(false) { case (acc, stream) =>

From 8a0222ed1be5b195322f5576c394e3e7ae4250a8 Mon Sep 17 00:00:00 2001
From: Steven Vroonland
Date: Sun, 10 Nov 2024 20:42:06 +0100
Subject: [PATCH 04/11] Fix test

---
 .../scala/zio/kafka/consumer/ConsumerSpec.scala | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index 4fae127a0..700b7901a 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -369,13 +369,14 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             clientId <- randomClient
             _        <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic1, partitions = 1))
             _        <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic2))
-            settings <- consumerSettings(
-                          clientId = clientId,
-                          groupId = Some(group),
-                          maxPollInterval = 2.seconds,
-                          `max.poll.records` = 2
-                        )
-                          .map(_.withoutPartitionPreFetching.withPollTimeout(100.millis))
+            settings <-
+              consumerSettings(
+                clientId = clientId,
+                groupId = Some(group),
+                maxPollInterval = 2.seconds,
+                `max.poll.records` = 2
+              )
+                .map(_.withoutPartitionPreFetching.withPollTimeout(100.millis).withStreamHaltDetectionTimeout(2.seconds))
             consumer <- Consumer.make(settings)
             _        <- scheduledProduce(topic1, Schedule.fixed(500.millis).jittered).runDrain.forkScoped
             _        <- scheduledProduce(topic2, Schedule.fixed(500.millis).jittered).runDrain.forkScoped

From 8300485dffd11b9a45695407d53d63c4c113f463 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Tue, 12 Nov 2024 20:26:22 +0100
Subject: [PATCH 05/11] Rename new config to maxStreamPullInterval

Also:
- restore some scaladoc
- simpler way to get `maxPollInterval`
- more warnings for providing java consumer

---
 .../zio/kafka/consumer/ConsumerSpec.scala     |  7 ++--
 .../internal/PartitionStreamControlSpec.scala |  4 +--
 .../kafka/consumer/internal/RunloopSpec.scala |  1 +
 .../scala/zio/kafka/consumer/Consumer.scala   |  4 ++-
 .../zio/kafka/consumer/ConsumerSettings.scala | 36 ++++++++++---------
 .../internal/PartitionStreamControl.scala     | 22 ++++++------
 .../zio/kafka/consumer/internal/Runloop.scala |  7 ++--
 .../consumer/internal/RunloopAccess.scala     | 18 ++++------
 8 files changed, 54 insertions(+), 45 deletions(-)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index 315ffa8d9..c9a646236 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -375,8 +375,11 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 groupId = Some(group),
                 maxPollInterval = 2.seconds,
                 `max.poll.records` = 2
-              )
-                .map(_.withoutPartitionPreFetching.withPollTimeout(100.millis).withStreamHaltDetectionTimeout(2.seconds))
+              ).map {
+                _.withoutPartitionPreFetching
+                  .withPollTimeout(100.millis)
+                  .withMaxStreamPullInterval(2.seconds)
+              }
             consumer <- Consumer.make(settings)
             _        <- scheduledProduce(topic1, Schedule.fixed(500.millis).jittered).runDrain.forkScoped
             _        <- scheduledProduce(topic2, Schedule.fixed(500.millis).jittered).runDrain.forkScoped
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala
index a565dd9ea..9fcc3a979 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala
@@ -114,7 +114,7 @@ object PartitionStreamControlSpec extends ZIOSpecDefault {
           for {
             control  <- createTestControl
             now      <- Clock.nanoTime
-            exceeded <- control.streamHaltDetectionTimeoutExceeded(now)
+            exceeded <- control.maxStreamPullIntervalExceeded(now)
           } yield assertTrue(!exceeded)
         },
         test("maxPollIntervalExceeded returns true after timeout") {
@@ -123,7 +123,7 @@ object PartitionStreamControlSpec extends ZIOSpecDefault {
             _         <- control.offerRecords(createTestRecords(1))
             now       <- Clock.nanoTime
             futureTime = now + Duration.fromSeconds(31).toNanos
-            exceeded  <- control.streamHaltDetectionTimeoutExceeded(futureTime)
+            exceeded  <- control.maxStreamPullIntervalExceeded(futureTime)
           } yield assertTrue(exceeded)
         }
       ),
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala
index 21a1b4f87..e30c4051e 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala
@@ -204,6 +204,7 @@ object RunloopSpec extends ZIOSpecDefaultSlf4j {
         runloop <- Runloop.make(
                      consumerSettings,
                      100.millis,
+                     100.millis,
                      diagnostics,
                      consumerAccess,
                      partitionsHub
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index def5d8c63..9d7ab0a22 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -221,7 +221,9 @@ object Consumer {
    *   - creating `access` as a fair semaphore with a single permit,
    *   - acquire a permit from `access` before using the consumer, and release it afterwards,
    *   - not using the following consumer methods: `subscribe`, `unsubscribe`, `assign`, `poll`, `commit*`, `seek`,
-   *     `pause`, `resume`, and `enforceRebalance`.
+   *     `pause`, `resume`, and `enforceRebalance`,
+   *   - keeping the consumer config given to the java consumer in sync with the properties in `settings` (for example
+   *     by constructing `settings` with `ConsumerSettings(bootstrapServers).withProperties(config)`).
    *
    * Any deviation of these rules is likely to cause hard to track errors.
    *
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
index a1eb8d6f3..e5e63771f 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
@@ -32,7 +32,7 @@ final case class ConsumerSettings(
   metricLabels: Set[MetricLabel] = Set.empty,
   runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis),
   authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis),
-  streamHaltDetectionTimeout: Duration = 5.minutes
+  maxStreamPullIntervalOption: Option[Duration] = None
 ) {
   // Parse booleans in a way compatible with how Kafka does this in org.apache.kafka.common.config.ConfigDef.parseType:
   require(
@@ -153,31 +153,35 @@ final case class ConsumerSettings(
   /**
    * Set Kafka's `max.poll.interval.ms` configuration. See
    * https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms for more information.
+   *
+   * The default is 5 minutes. Make sure that all records from a single poll can be processed in this interval. See also
+   * the [[withMaxPollRecords maxPollRecords]] configuration.
    */
   def withMaxPollInterval(maxPollInterval: Duration): ConsumerSettings =
     withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)
 
   /**
-   * Zio-kafka uses this value to determine whether a stream stopped processing, to safeguard against alive consumers in
-   * the consumer group which hold partition assignments but make no progress. If no chunks are pulled by user code from
-   * a partition stream for this interval (while data is available) we consider the stream to be halted. When this
-   * happens we interrupt the stream with a failure. In addition the entire consumer is shutdown. In future versions of
-   * zio-kafka we may (instead of a shutdown) stop only the affected subscription.
-   *
-   * The default is 5 minutes. Make sure that all records from a single poll can be processed in this interval and take
-   * into account that this is divided over the partitions. Also prefetching will impact the
-   *
-   * The maximum number of records in a single poll is configured with the `max.poll.records` configuration (see
-   * https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records and [[withMaxPollRecords]]). The records
-   * are divided over the partition streams into chunks. The stream halt detection timeout needs to be larger than the
-   * time it takes your code to process one of these chunks.
+   * The maximum time a stream may run without pulling a chunk of records.
+   *
+   * Zio-kafka uses this value to determine whether a stream stopped processing. This is to safeguard against alive
+   * consumers in the consumer group which hold partition assignments but make no progress. If no chunks are pulled by
+   * user code from a partition stream for this interval (while data is available) we consider the stream to be halted.
+   * When this happens we interrupt the stream with a failure. In addition, the entire consumer is shutdown. In future
+   * versions of zio-kafka we may (instead of a shutdown) stop only the affected subscription.
+   *
+   * Make sure that all records from a single poll (see [[withMaxPollRecords maxPollRecords]]) can be processed in this
+   * interval, even when there is no concurrency because the records are all in the same partition.
+   *
+   * The default is equal to [[withMaxPollInterval maxPollInterval]].
*/ - def withStreamHaltDetectionTimeout(streamHaltDetectionTimeout: Duration): ConsumerSettings = - copy(streamHaltDetectionTimeout = streamHaltDetectionTimeout) + def withMaxStreamPullInterval(maxStreamPullInterval: Duration): ConsumerSettings = + copy(maxStreamPullIntervalOption = Some(maxStreamPullInterval)) /** * Set Kafka's `max.poll.records` configuration. See * https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records for more information. + * + * The default is 500. */ def withMaxPollRecords(maxPollRecords: Int): ConsumerSettings = withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 377d9d5c7..84bf7bb78 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -31,8 +31,8 @@ abstract class PartitionStream { * the last pulled offset (if any). The promise completes when the stream completed. * @param queueInfoRef * used to track the stream's pull deadline, its queue size, and last pulled offset - * @param streamHaltDetectionTimeout - * see [[zio.kafka.consumer.ConsumerSettings.withStreamHaltDetectionTimeout()]] + * @param maxStreamPullInterval + * see [[zio.kafka.consumer.ConsumerSettings.withMaxStreamPullInterval()]] */ final class PartitionStreamControl private ( val tp: TopicPartition, @@ -41,9 +41,9 @@ final class PartitionStreamControl private ( interruptionPromise: Promise[Throwable, Nothing], val completedPromise: Promise[Nothing, Option[Offset]], queueInfoRef: Ref[QueueInfo], - streamHaltDetectionTimeout: Duration + maxStreamPullInterval: Duration ) extends PartitionStream { - private val streamHaltDetectionTimeoutNanos = streamHaltDetectionTimeout.toNanos + private val maxStreamPullIntervalNanos = maxStreamPullInterval.toNanos private val logAnnotate = ZIO.logAnnotate( LogAnnotation("topic", tp.topic()), @@ -57,7 +57,7 @@ final class PartitionStreamControl private ( } else { for { now <- Clock.nanoTime - newPullDeadline = now + streamHaltDetectionTimeoutNanos + newPullDeadline = now + maxStreamPullIntervalNanos _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size)) _ <- dataQueue.offer(Take.chunk(data)) } yield () @@ -81,12 +81,12 @@ final class PartitionStreamControl private ( * `true` when the stream has data available, but none has been pulled for more than `maxPollInterval` (since data * became available), `false` otherwise */ - private[internal] def streamHaltDetectionTimeoutExceeded(now: NanoTime): UIO[Boolean] = + private[internal] def maxStreamPullIntervalExceeded(now: NanoTime): UIO[Boolean] = queueInfoRef.get.map(_.deadlineExceeded(now)) /** To be invoked when the stream is no longer processing. */ private[internal] def halt: UIO[Unit] = { - val timeOutMessage = s"No records were polled for more than $streamHaltDetectionTimeout for topic partition $tp. " + + val timeOutMessage = s"No records were polled for more than $maxStreamPullInterval for topic partition $tp. " + "Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " + "needs more time." 
     val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace
@@ -130,14 +130,14 @@ object PartitionStreamControl {
     tp: TopicPartition,
     requestData: UIO[Unit],
     diagnostics: Diagnostics,
-    streamHaltDetectionTimeout: Duration
+    maxStreamPullInterval: Duration
   ): UIO[PartitionStreamControl] = {
-    val streamHaltDetectionTimeoutNanos = streamHaltDetectionTimeout.toNanos
+    val maxStreamPullIntervalNanos = maxStreamPullInterval.toNanos
 
     def registerPull(queueInfo: Ref[QueueInfo], records: Chunk[ByteArrayCommittableRecord]): UIO[Unit] =
       for {
         now <- Clock.nanoTime
-        newPullDeadline = now + streamHaltDetectionTimeoutNanos
+        newPullDeadline = now + maxStreamPullIntervalNanos
         _ <- queueInfo.update(_.withPull(newPullDeadline, records))
       } yield ()
 
@@ -184,7 +184,7 @@ object PartitionStreamControl {
       interruptionPromise,
       completedPromise,
       queueInfo,
-      streamHaltDetectionTimeout
+      maxStreamPullInterval
     )
   }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 804f6fad6..db71ef59c 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -30,6 +30,7 @@ private[consumer] final class Runloop private (
   lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent],
   partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
   diagnostics: Diagnostics,
+  maxStreamPullInterval: Duration,
   maxRebalanceDuration: Duration,
   currentStateRef: Ref[State],
   committedOffsetsRef: Ref[CommitOffsets]
@@ -47,7 +48,7 @@ private[consumer] final class Runloop private (
       tp,
       commandQueue.offer(RunloopCommand.Request(tp)).unit,
       diagnostics,
-      settings.streamHaltDetectionTimeout
+      maxStreamPullInterval
     )
 
   def stopConsumption: UIO[Unit] =
@@ -673,7 +674,7 @@ private[consumer] final class Runloop private (
       now <- Clock.nanoTime
       anyExceeded <- ZIO.foldLeft(streams)(false) { case (acc, stream) =>
                        stream
-                         .streamHaltDetectionTimeoutExceeded(now)
+                         .maxStreamPullIntervalExceeded(now)
                          .tap(ZIO.when(_)(ZIO.logWarning(s"Stream for ${stream.tp} has exceeded ")))
                          .tap(exceeded => if (exceeded) stream.halt else ZIO.unit)
                          .map(acc || _)
@@ -936,6 +937,7 @@ object Runloop {
 
   private[consumer] def make(
     settings: ConsumerSettings,
+    maxStreamPullInterval: Duration,
     maxRebalanceDuration: Duration,
     diagnostics: Diagnostics,
     consumer: ConsumerAccess,
@@ -961,6 +963,7 @@ object Runloop {
       lastRebalanceEvent = lastRebalanceEvent,
       partitionsHub = partitionsHub,
       diagnostics = diagnostics,
+      maxStreamPullInterval = maxStreamPullInterval,
      maxRebalanceDuration = maxRebalanceDuration,
       currentStateRef = currentStateRef,
       committedOffsetsRef = committedOffsetsRef
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
index e50fa2645..c7d2e132c 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
@@ -10,6 +10,8 @@ import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscrip
 import zio.stream.{ Stream, Take, UStream, ZStream }
 import zio._
 
+import scala.jdk.CollectionConverters._
+
 private[internal] sealed trait RunloopState
 private[internal] object RunloopState {
   case object NotStarted extends RunloopState
@@ -78,6 +80,7 @@ private[consumer] object RunloopAccess {
   ): ZIO[Scope, Throwable, RunloopAccess] =
     for {
       maxPollInterval       <- maxPollIntervalConfig(settings)
+      maxStreamPullInterval = settings.maxRebalanceDuration.getOrElse(maxPollInterval)
       // See scaladoc of [[ConsumerSettings.withMaxRebalanceDuration]]:
       maxRebalanceDuration = settings.maxRebalanceDuration.getOrElse(((maxPollInterval.toNanos / 5L) * 3L).nanos)
       // This scope allows us to link the lifecycle of the Runloop and of the Hub to the lifecycle of the Consumer
@@ -90,6 +93,7 @@ private[consumer] object RunloopAccess {
       makeRunloop = Runloop
                       .make(
                         settings = settings,
+                        maxStreamPullInterval = maxStreamPullInterval,
                         maxRebalanceDuration = maxRebalanceDuration,
                         diagnostics = diagnostics,
                         consumer = consumerAccess,
@@ -101,16 +105,8 @@ private[consumer] object RunloopAccess {
   } yield new RunloopAccess(runloopStateRef, partitionsHub, makeRunloop, diagnostics)
 
   private def maxPollIntervalConfig(settings: ConsumerSettings): Task[Duration] = ZIO.attempt {
-    def defaultMaxPollInterval: Int = ConsumerConfig
-      .configDef()
-      .defaultValues()
-      .get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
-      .asInstanceOf[Integer]
-
-    settings.properties
-      .get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
-      .flatMap(_.toString.toIntOption) // Ignore invalid
-      .getOrElse(defaultMaxPollInterval)
-      .millis
+    val consumerConfig = new ConsumerConfig(settings.properties.asJava)
+    Long.unbox(consumerConfig.getLong(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)).millis
   }
+
 }

From cc403ec8c0fc009bc140d61b5ae01feeb505b52d Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Tue, 12 Nov 2024 20:36:47 +0100
Subject: [PATCH 06/11] Revert simpler way to get `maxPollInterval`

---
 .../kafka/consumer/internal/RunloopAccess.scala  | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
index c7d2e132c..78f6e23ff 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
@@ -10,8 +10,6 @@ import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscrip
 import zio.stream.{ Stream, Take, UStream, ZStream }
 import zio._
 
-import scala.jdk.CollectionConverters._
-
 private[internal] sealed trait RunloopState
 private[internal] object RunloopState {
   case object NotStarted extends RunloopState
@@ -105,8 +103,16 @@ private[consumer] object RunloopAccess {
   } yield new RunloopAccess(runloopStateRef, partitionsHub, makeRunloop, diagnostics)
 
   private def maxPollIntervalConfig(settings: ConsumerSettings): Task[Duration] = ZIO.attempt {
-    val consumerConfig = new ConsumerConfig(settings.properties.asJava)
-    Long.unbox(consumerConfig.getLong(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)).millis
-  }
+    def defaultMaxPollInterval: Int = ConsumerConfig
+      .configDef()
+      .defaultValues()
+      .get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
+      .asInstanceOf[Integer]
 
+    settings.properties
+      .get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)
+      .flatMap(_.toString.toIntOption) // Ignore invalid
+      .getOrElse(defaultMaxPollInterval)
+      .millis
+  }
 }

From e6735c04f144f5e84e69580b15cc676b7b566492 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Tue, 12 Nov 2024 20:49:10 +0100
Subject: [PATCH 07/11] Fix getting maxStreamPullInterval, Finish log statement

---
 .../scala/zio/kafka/consumer/internal/Runloop.scala | 12 +++++++++---
 .../zio/kafka/consumer/internal/RunloopAccess.scala |  2 +-
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index db71ef59c..ef740dab0 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -657,7 +657,7 @@ private[consumer] final class Runloop private (
                                  pollResult.records
                                )
         updatedPendingCommits <- ZIO.filter(state.pendingCommits)(_.isPending)
-        _                     <- checkStreamsHaveHalted(pollResult.assignedStreams)
+        _                     <- checkStreamPollInterval(pollResult.assignedStreams)
       } yield state.copy(
         pendingRequests = fulfillResult.pendingRequests,
         pendingCommits = updatedPendingCommits,
@@ -669,18 +669,24 @@ private[consumer] final class Runloop private (
    * Check each stream to see if it exceeded its pull interval. If so, halt it. In addition, if any stream has exceeded
    * its pull interval, shutdown the consumer.
    */
-  private def checkStreamsHaveHalted(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] =
+  private def checkStreamPollInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] = {
+    def logShutdown(stream: PartitionStreamControl): ZIO[Any, Nothing, Unit] =
+      ZIO.logWarning(
+        s"Stream for ${stream.tp} has not pulled chunks for more than $maxStreamPullInterval, shutting down"
+      )
+
     for {
       now <- Clock.nanoTime
       anyExceeded <- ZIO.foldLeft(streams)(false) { case (acc, stream) =>
                        stream
                          .maxStreamPullIntervalExceeded(now)
-                         .tap(ZIO.when(_)(ZIO.logWarning(s"Stream for ${stream.tp} has exceeded ")))
+                         .tap(ZIO.when(_)(logShutdown(stream)))
                          .tap(exceeded => if (exceeded) stream.halt else ZIO.unit)
                          .map(acc || _)
                      }
       _ <- shutdown.when(anyExceeded)
     } yield ()
+  }
 
   private def handleCommand(state: State, cmd: RunloopCommand.StreamCommand): Task[State] = {
     def doChangeSubscription(newSubscriptionState: SubscriptionState): Task[State] =
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
index 78f6e23ff..90c787f84 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
@@ -78,7 +78,7 @@ private[consumer] object RunloopAccess {
   ): ZIO[Scope, Throwable, RunloopAccess] =
     for {
       maxPollInterval       <- maxPollIntervalConfig(settings)
-      maxStreamPullInterval = settings.maxRebalanceDuration.getOrElse(maxPollInterval)
+      maxStreamPullInterval = settings.maxStreamPullIntervalOption.getOrElse(maxPollInterval)
       // See scaladoc of [[ConsumerSettings.withMaxRebalanceDuration]]:
       maxRebalanceDuration = settings.maxRebalanceDuration.getOrElse(((maxPollInterval.toNanos / 5L) * 3L).nanos)
       // This scope allows us to link the lifecycle of the Runloop and of the Hub to the lifecycle of the Consumer

From dcb6dc0f00da7bbfdf4a79b9a8edf2341a30b1eb Mon Sep 17 00:00:00 2001
From: svroonland
Date: Tue, 12 Nov 2024 20:53:38 +0100
Subject: [PATCH 08/11] Update
 zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala

Co-authored-by: Erik van Oosten
---
 .../zio/kafka/consumer/internal/PartitionStreamControl.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
index 84bf7bb78..cde72d9a6 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
@@ -86,7 +86,7 @@ final class PartitionStreamControl private (
 
   /** To be invoked when the stream is no longer processing. */
   private[internal] def halt: UIO[Unit] = {
-    val timeOutMessage = s"No records were polled for more than $maxStreamPullInterval for topic partition $tp. " +
+    val timeOutMessage = s"No records were pulled for more than $maxStreamPullInterval for topic partition $tp. " +
       "Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
       "needs more time."
     val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace

From fae6d50f0a10ed702f91e5dec1f171793b6ada5f Mon Sep 17 00:00:00 2001
From: Steven Vroonland
Date: Tue, 12 Nov 2024 20:58:36 +0100
Subject: [PATCH 09/11] Remove superfluous logging

---
 .../zio/kafka/consumer/internal/PartitionStreamControl.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
index cde72d9a6..7c18448bb 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
@@ -90,7 +90,7 @@ final class PartitionStreamControl private (
       "Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
       "needs more time."
     val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace
-    ZIO.logWarning(timeOutMessage) *> interruptionPromise.fail(consumeTimeout).unit
+    interruptionPromise.fail(consumeTimeout).unit
   }
 
   /** To be invoked when the partition was lost. It clears the queue and ends the stream. */
From 0bc1aeb88c15078de5953f7ea655e4e79cfc2041 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Wed, 13 Nov 2024 09:41:34 +0100
Subject: [PATCH 10/11] Restore unnecessary change

---
 .../zio/kafka/consumer/ConsumerSpec.scala | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index c9a646236..679a0d582 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -369,17 +369,13 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             clientId <- randomClient
             _        <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic1, partitions = 1))
             _        <- ZIO.fromTry(EmbeddedKafka.createCustomTopic(topic2))
-            settings <-
-              consumerSettings(
-                clientId = clientId,
-                groupId = Some(group),
-                maxPollInterval = 2.seconds,
-                `max.poll.records` = 2
-              ).map {
-                _.withoutPartitionPreFetching
-                  .withPollTimeout(100.millis)
-                  .withMaxStreamPullInterval(2.seconds)
-              }
+            settings <- consumerSettings(
+                          clientId = clientId,
+                          groupId = Some(group),
+                          maxPollInterval = 2.seconds,
+                          `max.poll.records` = 2
+                        )
+                          .map(_.withoutPartitionPreFetching.withPollTimeout(100.millis))
             consumer <- Consumer.make(settings)
             _        <- scheduledProduce(topic1, Schedule.fixed(500.millis).jittered).runDrain.forkScoped
             _        <- scheduledProduce(topic2, Schedule.fixed(500.millis).jittered).runDrain.forkScoped

From dee9aaf82eac09e38bef50f774f44cc342c1bcdc Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Wed, 13 Nov 2024 09:48:25 +0100
Subject: [PATCH 11/11] Better logging and exception messages

---
 .../kafka/consumer/internal/PartitionStreamControl.scala  | 4 +---
 .../main/scala/zio/kafka/consumer/internal/Runloop.scala  | 6 ++++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
index 7c18448bb..53af64a5f 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
@@ -86,9 +86,7 @@ final class PartitionStreamControl private (
 
   /** To be invoked when the stream is no longer processing. */
   private[internal] def halt: UIO[Unit] = {
-    val timeOutMessage = s"No records were pulled for more than $maxStreamPullInterval for topic partition $tp. " +
-      "Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
-      "needs more time."
+    val timeOutMessage = s"No records were pulled for more than $maxStreamPullInterval for topic partition $tp."
val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace interruptionPromise.fail(consumeTimeout).unit } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index ef740dab0..8d45f8e55 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -671,8 +671,10 @@ private[consumer] final class Runloop private ( */ private def checkStreamPullInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] = { def logShutdown(stream: PartitionStreamControl): ZIO[Any, Nothing, Unit] = - ZIO.logWarning( - s"Stream for ${stream.tp} has not pulled chunks for more than $maxStreamPullInterval, shutting down" + ZIO.logError( + s"Stream for ${stream.tp} has not pulled chunks for more than $maxStreamPullInterval, shutting down. " + + "Use ConsumerSettings.withMaxPollInterval or .withMaxStreamPullInterval to set a longer interval when " + + "processing a batch of records needs more time." ) for {
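
Usage note: after the last patch in this series, the stream halt detection threshold
(maxStreamPullInterval) is configured independently of Kafka's max.poll.interval.ms,
and falls back to maxPollInterval when not set (see the RunloopAccess change in patch
07). A minimal sketch of the resulting configuration; the broker address, group id,
and durations below are illustrative assumptions, not values taken from these patches:

    import zio._
    import zio.kafka.consumer.ConsumerSettings

    // Placeholder broker and group values; durations chosen only for illustration.
    val settings: ConsumerSettings =
      ConsumerSettings(List("localhost:9092"))
        .withGroupId("example-group")
        // Kafka's max.poll.interval.ms: bounds the time between polls.
        .withMaxPollInterval(5.minutes)
        // Halt detection for partition streams; when omitted it defaults
        // to maxPollInterval.
        .withMaxStreamPullInterval(15.minutes)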