diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala
index 887b8bf1c..9fcc3a979 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/PartitionStreamControlSpec.scala
@@ -114,7 +114,7 @@ object PartitionStreamControlSpec extends ZIOSpecDefault {
         for {
           control  <- createTestControl
           now      <- Clock.nanoTime
-          exceeded <- control.maxPollIntervalExceeded(now)
+          exceeded <- control.maxStreamPullIntervalExceeded(now)
         } yield assertTrue(!exceeded)
       },
       test("maxPollIntervalExceeded returns true after timeout") {
@@ -123,7 +123,7 @@ object PartitionStreamControlSpec extends ZIOSpecDefault {
           _         <- control.offerRecords(createTestRecords(1))
           now       <- Clock.nanoTime
           futureTime = now + Duration.fromSeconds(31).toNanos
-          exceeded  <- control.maxPollIntervalExceeded(futureTime)
+          exceeded  <- control.maxStreamPullIntervalExceeded(futureTime)
         } yield assertTrue(exceeded)
       }
     ),
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala
index 43c42a3ce..add83ecd5 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RebalanceCoordinatorSpec.scala
@@ -9,7 +9,7 @@ import zio.kafka.consumer.internal.RebalanceCoordinator.RebalanceEvent
 import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
 import zio.kafka.consumer.{ CommittableRecord, ConsumerSettings }
 import zio.test._
-import zio.{ durationInt, Chunk, Promise, Ref, Scope, Task, UIO, ZIO }
+import zio.{ durationInt, Chunk, Promise, Ref, Scope, Semaphore, Task, UIO, ZIO }

 import java.util

@@ -185,7 +185,7 @@ object RebalanceCoordinatorSpec extends ZIOSpecDefaultSlf4j {
     settings: ConsumerSettings = ConsumerSettings(List("")).withCommitTimeout(1.second),
     rebalanceSafeCommits: Boolean = false
   ): ZIO[Scope, Throwable, RebalanceCoordinator] =
-    ConsumerAccess.make(mockConsumer).map { consumerAccess =>
+    Semaphore.make(1).map(new ConsumerAccess(mockConsumer, _)).map { consumerAccess =>
       new RebalanceCoordinator(
         lastEvent,
         settings.withRebalanceSafeCommits(rebalanceSafeCommits),
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index def5d8c63..9d7ab0a22 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -221,7 +221,9 @@ object Consumer {
   * - creating `access` as a fair semaphore with a single permit,
   * - acquiring a permit from `access` before using the consumer, and releasing it afterwards,
   * - not using the following consumer methods: `subscribe`, `unsubscribe`, `assign`, `poll`, `commit*`, `seek`,
-  *   `pause`, `resume`, and `enforceRebalance`.
+  *   `pause`, `resume`, and `enforceRebalance`,
+  * - keeping the consumer config given to the java consumer in sync with the properties in `settings` (for example
+  *   by constructing `settings` with `ConsumerSettings(bootstrapServers).withProperties(config)`).
   *
   * Any deviation of these rules is likely to cause hard-to-track errors.
   *
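For context, a minimal sketch of the config-sync rule this hunk adds, for an application that builds its own java consumer and shares it with zio-kafka. `SharedConsumerConfig`, the group id, and the broker address are illustrative; only `ConsumerSettings` and `withProperties` come from zio-kafka itself:

```scala
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import zio.kafka.consumer.ConsumerSettings

import scala.jdk.CollectionConverters._

object SharedConsumerConfig {
  val bootstrapServers: List[String] = List("localhost:9092")

  // Single source of truth for all consumer properties.
  val config: Map[String, AnyRef] = Map(
    "group.id"             -> "shared-group",
    "max.poll.interval.ms" -> "300000"
  )

  // The java consumer is constructed from `config`...
  def makeJavaConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] =
    new KafkaConsumer(
      (config + ("bootstrap.servers" -> bootstrapServers.mkString(","))).asJava,
      new ByteArrayDeserializer(),
      new ByteArrayDeserializer()
    )

  // ...and the zio-kafka settings mirror the exact same properties, so the
  // two views of the configuration cannot drift apart.
  val settings: ConsumerSettings =
    ConsumerSettings(bootstrapServers).withProperties(config)
}
```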
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
index 7e5fc01b5..e5e63771f 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
@@ -31,7 +31,8 @@ final case class ConsumerSettings(
   fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy(),
   metricLabels: Set[MetricLabel] = Set.empty,
   runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis),
-  authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
+  authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis),
+  maxStreamPullIntervalOption: Option[Duration] = None
 ) {
   // Parse booleans in a way compatible with how Kafka does this in org.apache.kafka.common.config.ConfigDef.parseType:
   require(
@@ -153,21 +154,34 @@ final case class ConsumerSettings(
    * Set Kafka's `max.poll.interval.ms` configuration. See
    * https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms for more information.
    *
-   * Zio-kafka uses this value also to determine whether a stream stopped processing. If no chunks are pulled from a
-   * stream for this interval (while data is available) we consider the stream to be halted. When this happens we
-   * interrupt the stream with a failure. In addition the entire consumer is shutdown. In future versions of zio-kafka
-   * we may (instead of a shutdown) stop only the affected subscription.
-   *
-   * The default is 5 minutes. Make sure that all records from a single poll can be processed in this interval. The
-   * maximum number of records in a single poll is configured with the `max.poll.records` configuration (see
-   * https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records and [[withMaxPollRecords]]).
+   * The default is 5 minutes. Make sure that all records from a single poll can be processed in this interval. See also
+   * the [[withMaxPollRecords maxPollRecords]] configuration.
    */
   def withMaxPollInterval(maxPollInterval: Duration): ConsumerSettings =
     withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval.toMillis.toString)

+  /**
+   * The maximum time a stream may run without pulling a chunk of records.
+   *
+   * Zio-kafka uses this value to determine whether a stream stopped processing. This is to safeguard against alive
+   * consumers in the consumer group that hold partition assignments but make no progress. If no chunks are pulled by
+   * user code from a partition stream for this interval (while data is available), we consider the stream to be halted.
+   * When this happens we interrupt the stream with a failure. In addition, the entire consumer is shut down. In future
+   * versions of zio-kafka we may (instead of a shutdown) stop only the affected subscription.
+   *
+   * Make sure that all records from a single poll (see [[withMaxPollRecords maxPollRecords]]) can be processed in this
+   * interval, even when there is no concurrency because the records are all in the same partition.
+   *
+   * The default is equal to [[withMaxPollInterval maxPollInterval]].
+   */
+  def withMaxStreamPullInterval(maxStreamPullInterval: Duration): ConsumerSettings =
+    copy(maxStreamPullIntervalOption = Some(maxStreamPullInterval))
+
   /**
    * Set Kafka's `max.poll.records` configuration. See
    * https://kafka.apache.org/documentation/#consumerconfigs_max.poll.records for more information.
+   *
+   * The default is 500.
    */
   def withMaxPollRecords(maxPollRecords: Int): ConsumerSettings =
     withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
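For illustration, here is how the new `withMaxStreamPullInterval` combines with the existing knobs; the group id, broker address, and durations are hypothetical:

```scala
import zio._
import zio.kafka.consumer.ConsumerSettings

object SlowProcessingSettings {
  // At most 100 records per poll, and a partition stream gets up to
  // 10 minutes to pull the next chunk before zio-kafka deems it halted.
  // Kafka's own max.poll.interval.ms is left at its 5 minute default; only
  // zio-kafka's stream-liveness check is relaxed.
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId("slow-processing-group")
      .withMaxPollRecords(100)
      .withMaxStreamPullInterval(10.minutes)
}
```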
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
index bc1d68454..53af64a5f 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala
@@ -31,8 +31,8 @@ abstract class PartitionStream {
  *   the last pulled offset (if any). The promise completes when the stream completed.
  * @param queueInfoRef
  *   used to track the stream's pull deadline, its queue size, and last pulled offset
- * @param maxPollInterval
- *   see [[zio.kafka.consumer.ConsumerSettings.withMaxPollInterval()]]
+ * @param maxStreamPullInterval
+ *   see [[zio.kafka.consumer.ConsumerSettings.withMaxStreamPullInterval()]]
  */
 final class PartitionStreamControl private (
   val tp: TopicPartition,
@@ -41,9 +41,9 @@ final class PartitionStreamControl private (
   interruptionPromise: Promise[Throwable, Nothing],
   val completedPromise: Promise[Nothing, Option[Offset]],
   queueInfoRef: Ref[QueueInfo],
-  maxPollInterval: Duration
+  maxStreamPullInterval: Duration
 ) extends PartitionStream {
-  private val maxPollIntervalNanos = maxPollInterval.toNanos
+  private val maxStreamPullIntervalNanos = maxStreamPullInterval.toNanos

   private val logAnnotate = ZIO.logAnnotate(
     LogAnnotation("topic", tp.topic()),
@@ -57,7 +57,7 @@
     } else {
       for {
         now <- Clock.nanoTime
-        newPullDeadline = now + maxPollIntervalNanos
+        newPullDeadline = now + maxStreamPullIntervalNanos
         _ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size))
         _ <- dataQueue.offer(Take.chunk(data))
       } yield ()
@@ -81,14 +81,12 @@
-   * `true` when the stream has data available, but none has been pulled for more than `maxPollInterval` (since data
-   * became available), `false` otherwise
+   * `true` when the stream has data available, but none has been pulled for more than `maxStreamPullInterval` (since
+   * data became available), `false` otherwise
    */
-  private[internal] def maxPollIntervalExceeded(now: NanoTime): UIO[Boolean] =
+  private[internal] def maxStreamPullIntervalExceeded(now: NanoTime): UIO[Boolean] =
     queueInfoRef.get.map(_.deadlineExceeded(now))

   /** To be invoked when the stream is no longer processing. */
   private[internal] def halt: UIO[Unit] = {
-    val timeOutMessage = s"No records were polled for more than $maxPollInterval for topic partition $tp. " +
-      "Use ConsumerSettings.withMaxPollInterval to set a longer interval if processing a batch of records " +
-      "needs more time."
+    val timeOutMessage = s"No records were pulled for more than $maxStreamPullInterval for topic partition $tp."
     val consumeTimeout = new TimeoutException(timeOutMessage) with NoStackTrace
     interruptionPromise.fail(consumeTimeout).unit
   }
@@ -130,14 +128,14 @@ object PartitionStreamControl {
     tp: TopicPartition,
     requestData: UIO[Unit],
     diagnostics: Diagnostics,
-    maxPollInterval: Duration
+    maxStreamPullInterval: Duration
   ): UIO[PartitionStreamControl] = {
-    val maxPollIntervalNanos = maxPollInterval.toNanos
+    val maxStreamPullIntervalNanos = maxStreamPullInterval.toNanos

     def registerPull(queueInfo: Ref[QueueInfo], records: Chunk[ByteArrayCommittableRecord]): UIO[Unit] =
       for {
         now <- Clock.nanoTime
-        newPullDeadline = now + maxPollIntervalNanos
+        newPullDeadline = now + maxStreamPullIntervalNanos
         _ <- queueInfo.update(_.withPull(newPullDeadline, records))
       } yield ()

@@ -184,7 +182,7 @@ object PartitionStreamControl {
         interruptionPromise,
         completedPromise,
         queueInfo,
-        maxPollInterval
+        maxStreamPullInterval
       )
   }
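The bookkeeping these hunks rely on can be summarized as follows: offering records arms a pull deadline of `now + maxStreamPullIntervalNanos` (judging by the "since data became available" wording, only when the queue was empty), every pull re-arms it, and `deadlineExceeded` compares the deadline against the clock. A simplified, hypothetical model of that state machine — not the actual zio-kafka `QueueInfo`, which also tracks the last pulled offset:

```scala
object PullDeadlineModel {
  final case class PullDeadlineInfo(pullDeadline: Long, queuedRecords: Int) {
    // The deadline starts counting when data first becomes available; offers
    // to a non-empty queue do not push it out.
    def withOffer(newDeadline: Long, offered: Int): PullDeadlineInfo =
      PullDeadlineInfo(
        if (queuedRecords == 0) newDeadline else pullDeadline,
        queuedRecords + offered
      )

    // A pull by user code re-arms the deadline and shrinks the queue.
    def withPull(newDeadline: Long, pulled: Int): PullDeadlineInfo =
      PullDeadlineInfo(newDeadline, math.max(0, queuedRecords - pulled))

    // Only a stream with data waiting can exceed its deadline; an idle
    // stream (empty queue) is never considered halted.
    def deadlineExceeded(now: Long): Boolean =
      queuedRecords > 0 && pullDeadline <= now
  }

  // Records offered at t=0 with a 30s deadline and never pulled; checking at
  // t=31s reports the stream as halted, mirroring the spec's 31-second test.
  val armed    = PullDeadlineInfo(0L, 0).withOffer(newDeadline = 30_000_000_000L, offered = 10)
  val exceeded = armed.deadlineExceeded(now = 31_000_000_000L) // true
}
```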
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 2c4ecab17..192d1b651 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -22,10 +22,10 @@ private[consumer] final class Runloop private (
   topLevelExecutor: Executor,
   sameThreadRuntime: Runtime[Any],
   consumer: ConsumerAccess,
-  maxPollInterval: Duration,
   commandQueue: Queue[RunloopCommand],
   partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
   diagnostics: Diagnostics,
+  maxStreamPullInterval: Duration,
   currentStateRef: Ref[State],
   rebalanceCoordinator: RebalanceCoordinator,
   consumerMetrics: ConsumerMetrics,
@@ -36,7 +36,7 @@
       tp,
       commandQueue.offer(RunloopCommand.Request(tp)).unit,
       diagnostics,
-      maxPollInterval
+      maxStreamPullInterval
     )

   def stopConsumption: UIO[Unit] =
@@ -329,7 +329,7 @@
                             pollResult.records
                           )
       _                <- committer.updatePendingCommitsAfterPoll
-      _                <- checkStreamPollInterval(pollResult.assignedStreams)
+      _                <- checkStreamPullInterval(pollResult.assignedStreams)
     } yield state.copy(
       pendingRequests = fulfillResult.pendingRequests,
       assignedStreams = pollResult.assignedStreams
@@ -337,20 +337,29 @@
   }

   /**
-   * Check each stream to see if it exceeded its poll interval. If so, halt it. In addition, if any stream has exceeded
-   * its poll interval, shutdown the consumer.
+   * Check each stream to see if it exceeded its pull interval. If so, halt it. In addition, if any stream has exceeded
+   * its pull interval, shut down the consumer.
    */
-  private def checkStreamPollInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] =
+  private def checkStreamPullInterval(streams: Chunk[PartitionStreamControl]): ZIO[Any, Nothing, Unit] = {
+    def logShutdown(stream: PartitionStreamControl): ZIO[Any, Nothing, Unit] =
+      ZIO.logError(
+        s"Stream for ${stream.tp} has not pulled chunks for more than $maxStreamPullInterval, shutting down. " +
+          "Use ConsumerSettings.withMaxPollInterval or .withMaxStreamPullInterval to set a longer interval when " +
+          "processing a batch of records needs more time."
+      )
+
     for {
       now <- Clock.nanoTime
       anyExceeded <- ZIO.foldLeft(streams)(false) { case (acc, stream) =>
                        stream
-                         .maxPollIntervalExceeded(now)
+                         .maxStreamPullIntervalExceeded(now)
+                         .tap(ZIO.when(_)(logShutdown(stream)))
                          .tap(exceeded => if (exceeded) stream.halt else ZIO.unit)
                          .map(acc || _)
                      }
       _ <- shutdown.when(anyExceeded)
     } yield ()
+  }

   private def handleCommand(state: State, cmd: RunloopCommand.StreamCommand): Task[State] = {
     def doChangeSubscription(newSubscriptionState: SubscriptionState): Task[State] =
@@ -564,7 +573,7 @@ object Runloop {

   private[consumer] def make(
     settings: ConsumerSettings,
-    maxPollInterval: Duration,
+    maxStreamPullInterval: Duration,
     maxRebalanceDuration: Duration,
     diagnostics: Diagnostics,
     consumer: ConsumerAccess,
@@ -600,10 +609,10 @@
             topLevelExecutor = executor,
             sameThreadRuntime = sameThreadRuntime,
             consumer = consumer,
-            maxPollInterval = maxPollInterval,
             commandQueue = commandQueue,
             partitionsHub = partitionsHub,
             diagnostics = diagnostics,
+            maxStreamPullInterval = maxStreamPullInterval,
             currentStateRef = currentStateRef,
             consumerMetrics = metrics,
             rebalanceCoordinator = rebalanceCoordinator,
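The shape of `checkStreamPullInterval` is worth noting: a sequential `ZIO.foldLeft` that never short-circuits, so every stream is checked, offenders are logged and halted as they are found, and the ORed accumulator triggers at most one shutdown afterwards. A self-contained sketch of the same pattern with hypothetical names:

```scala
import zio._

object LivenessCheckExample {
  // Illustrative stand-in for a partition stream with a liveness check and a
  // halt effect; names and types are hypothetical.
  final case class Item(name: String, deadlineExceeded: Boolean) {
    val check: UIO[Boolean] = ZIO.succeed(deadlineExceeded)
    val halt: UIO[Unit]     = ZIO.logWarning(s"halting $name")
  }

  // Every item is checked (no early exit), offenders are halted as a side
  // effect, and the ORed result decides a single shutdown at the end.
  def checkAll(items: Chunk[Item]): UIO[Unit] =
    for {
      anyExceeded <- ZIO.foldLeft(items)(false) { case (acc, item) =>
                       item.check
                         .tap(exceeded => item.halt.when(exceeded))
                         .map(acc || _)
                     }
      _ <- ZIO.logError("shutting down consumer").when(anyExceeded)
    } yield ()
}
```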
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
index 6c90d2d1c..90c787f84 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
@@ -78,6 +78,7 @@ private[consumer] object RunloopAccess {
   ): ZIO[Scope, Throwable, RunloopAccess] =
     for {
       maxPollInterval <- maxPollIntervalConfig(settings)
+      maxStreamPullInterval = settings.maxStreamPullIntervalOption.getOrElse(maxPollInterval)
       // See scaladoc of [[ConsumerSettings.withMaxRebalanceDuration]]:
       maxRebalanceDuration = settings.maxRebalanceDuration.getOrElse(((maxPollInterval.toNanos / 5L) * 3L).nanos)
       // This scope allows us to link the lifecycle of the Runloop and of the Hub to the lifecycle of the Consumer
@@ -90,7 +91,7 @@
       makeRunloop = Runloop
                       .make(
                         settings = settings,
-                        maxPollInterval = maxPollInterval,
+                        maxStreamPullInterval = maxStreamPullInterval,
                         maxRebalanceDuration = maxRebalanceDuration,
                         diagnostics = diagnostics,
                         consumer = consumerAccess,
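The two derived values above are easiest to follow with concrete numbers; a hypothetical worked example with the stock 5 minute `max.poll.interval.ms` and no overrides:

```scala
import zio._

object DerivedIntervals {
  // Hypothetical worked example of the defaulting in RunloopAccess above.
  val maxPollInterval: Duration = 5.minutes

  // maxStreamPullIntervalOption is None unless withMaxStreamPullInterval was
  // called, so the pull deadline defaults to the poll interval and the
  // pre-existing behaviour is preserved.
  val maxStreamPullIntervalOption: Option[Duration] = None
  val maxStreamPullInterval: Duration =
    maxStreamPullIntervalOption.getOrElse(maxPollInterval) // 5 minutes

  // maxRebalanceDuration defaults to 3/5 of the poll interval:
  // 300s / 5 * 3 = 180s.
  val maxRebalanceDuration: Duration =
    ((maxPollInterval.toNanos / 5L) * 3L).nanos // 3 minutes
}
```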