From 108b285199c0f3b16f2b28f54d6906dcadc0f8b2 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Tue, 16 Jul 2024 19:15:59 +0200
Subject: [PATCH] Add alternative fetch strategy for many partitions (#1281)

When many hundreds of partitions need to be consumed, an excessive
amount of heap can be used for pre-fetching. The
`ManyPartitionsQueueSizeBasedFetchStrategy` works similarly to the
default `QueueSizeBasedFetchStrategy` but limits total memory usage.
---
 docs/consumer-tuning.md                       |   2 +-
 ...tionsQueueSizeBasedFetchStrategySpec.scala | 117 ++++++++++++++++++
 .../kafka/consumer/fetch/FetchStrategy.scala  |   2 +-
 ...artitionsQueueSizeBasedFetchStrategy.scala |  59 +++++++++
 4 files changed, 178 insertions(+), 2 deletions(-)
 create mode 100644 zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala
 create mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala

diff --git a/docs/consumer-tuning.md b/docs/consumer-tuning.md
index 5282ee543..625e5c336 100644
--- a/docs/consumer-tuning.md
+++ b/docs/consumer-tuning.md
@@ -57,7 +57,7 @@ the partition queues. A very rough estimate for the maximum amount of heap neede
 The total can be tuned by changing the `partitionPreFetchBufferLimit`, `max.poll.records` settings.
 
 Another option is to write a custom `FetchStrategy`. For example the `ManyPartitionsQueueSizeBasedFetchStrategy` in
-[draft PR 970](https://github.com/zio/zio-kafka/pull/970) (not yet tested at scale, use at your own risk). Note that the fetch strategy API is marked as
+[draft PR 970](https://github.com/zio/zio-kafka/pull/970) (part of zio-kafka since 2.8.1). Note that the fetch strategy API is marked as
 experimental and may change without notice in any future zio-kafka version.
 
 ## Long processing durations
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala
new file mode 100644
index 000000000..e18a736c4
--- /dev/null
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala
@@ -0,0 +1,117 @@
+package zio.kafka.consumer.fetch
+
+import org.apache.kafka.common.TopicPartition
+import zio.kafka.ZIOSpecDefaultSlf4j
+import zio.kafka.consumer.internal.PartitionStream
+import zio.test.{ assertTrue, Spec, TestEnvironment }
+import zio.{ Chunk, Scope, UIO, ZIO }
+
+object ManyPartitionsQueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j {
+
+  private val maxPartitionQueueSize = 50
+  private val fetchStrategy = ManyPartitionsQueueSizeBasedFetchStrategy(
+    maxPartitionQueueSize,
+    maxTotalQueueSize = 80
+  )
+
+  private val tp10 = new TopicPartition("topic1", 0)
+  private val tp11 = new TopicPartition("topic1", 1)
+  private val tp20 = new TopicPartition("topic2", 0)
+  private val tp21 = new TopicPartition("topic2", 1)
+  private val tp22 = new TopicPartition("topic2", 2)
+
+  override def spec: Spec[TestEnvironment with Scope, Any] =
+    suite("ManyPartitionsQueueSizeBasedFetchStrategySpec")(
+      test("stream with queue size above maxSize is paused") {
+        val streams = Chunk(newStream(tp10, currentQueueSize = 100))
+        for {
+          result <- fetchStrategy.selectPartitionsToFetch(streams)
+        } yield assertTrue(result.isEmpty)
+      },
+      test("stream with queue size below maxSize may resume when less-equal global max") {
+        val streams = Chunk(newStream(tp10, currentQueueSize = 10))
+        for {
+          result <- fetchStrategy.selectPartitionsToFetch(streams)
+        } yield assertTrue(result == Set(tp10))
+      },
+      test("all streams with queue size less-equal maxSize may resume when total is less-equal global max") {
+        val streams = Chunk(
+          newStream(tp10, currentQueueSize = maxPartitionQueueSize),
+          newStream(tp11, currentQueueSize = 10),
+          newStream(tp20, currentQueueSize = 10),
+          newStream(tp21, currentQueueSize = 10)
+        )
+        for {
+          result <- fetchStrategy.selectPartitionsToFetch(streams)
+        } yield assertTrue(result == Set(tp10, tp11, tp20, tp21))
+      },
+      test("not all streams with queue size less-equal maxSize may resume when total is less-equal global max") {
+        val streams = Chunk(
+          newStream(tp10, currentQueueSize = 40),
+          newStream(tp11, currentQueueSize = 40),
+          newStream(tp20, currentQueueSize = 40),
+          newStream(tp21, currentQueueSize = 40)
+        )
+        for {
+          result <- fetchStrategy.selectPartitionsToFetch(streams)
+        } yield assertTrue(result.size == 2)
+      },
+      test("all streams with queue size less-equal maxSize may resume eventually") {
+        val streams = Chunk(
+          newStream(tp10, currentQueueSize = 60),
+          newStream(tp11, currentQueueSize = 60),
+          newStream(tp20, currentQueueSize = 40),
+          newStream(tp21, currentQueueSize = 40),
+          newStream(tp22, currentQueueSize = 40)
+        )
+        for {
+          result1 <- fetchStrategy.selectPartitionsToFetch(streams)
+          result2 <- fetchStrategy.selectPartitionsToFetch(streams)
+          result3 <- fetchStrategy.selectPartitionsToFetch(streams)
+          result4 <- fetchStrategy.selectPartitionsToFetch(streams)
+          result5 <- fetchStrategy.selectPartitionsToFetch(streams)
+          results = Chunk(result1, result2, result3, result4, result5)
+        } yield assertTrue(
+          // Only partitions from topic 2 are selected (since 40 <= 50):
+          results.forall(_.forall(_.topic() == "topic2")),
+          // 2 partitions are selected every time (since 2*40 <= 80):
+          results.forall(_.size == 2),
+          // All partitions from topic 2 are selected eventually:
+          results.flatten.toSet == Set(tp20, tp21, tp22)
+        )
+      },
+      test("different streams may resume every time") {
+        val streams = Chunk(
+          newStream(tp10, currentQueueSize = 25),
+          newStream(tp11, currentQueueSize = 25),
+          newStream(tp20, currentQueueSize = 25),
+          newStream(tp21, currentQueueSize = 25),
+          newStream(tp22, currentQueueSize = 25)
+        )
+        for {
+          result1 <- fetchStrategy.selectPartitionsToFetch(streams)
+          result2 <- fetchStrategy.selectPartitionsToFetch(streams)
+          result3 <- fetchStrategy.selectPartitionsToFetch(streams)
+          result4 <- fetchStrategy.selectPartitionsToFetch(streams)
+          result5 <- fetchStrategy.selectPartitionsToFetch(streams)
+          results = Chunk(result1, result2, result3, result4, result5)
+        } yield assertTrue(
+          // All partitions are selected eventually (since 25 <= 50):
+          results.flatten.toSet.size == 5,
+          // 3 partitions are selected every time (since 3*25 <= 80):
+          results.forall(_.size == 3),
+          // In at least 3 different combinations:
+          results.combinations(2).count {
+            case Chunk(resultA, resultB) => resultA != resultB
+            case _                       => false // can not happen
+          } >= 3
+        )
+      }
+    )
+
+  private def newStream(topicPartition: TopicPartition, currentQueueSize: Int): PartitionStream =
+    new PartitionStream {
+      override def tp: TopicPartition   = topicPartition
+      override def queueSize: UIO[Int]  = ZIO.succeed(currentQueueSize)
+    }
+}
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala
index 1b0f98d82..e8098fac0 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala
@@ -32,7 +32,7 @@ trait FetchStrategy {
  * The queue size at or below which more records are fetched and buffered (per partition). This buffer improves
  * throughput and supports varying downstream message processing time, while maintaining some backpressure. Large
  * values effectively disable backpressure at the cost of high memory usage, low values will effectively disable
- * prefetching in favor of low memory consumption. The number of records that is fetched on every poll is controlled
+ * prefetching in favor of low memory consumption. The number of records that are fetched on every poll is controlled
  * by the `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and
  * `max.poll.records`.
  *
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala
new file mode 100644
index 000000000..2d2de3a2c
--- /dev/null
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala
@@ -0,0 +1,59 @@
+package zio.kafka.consumer.fetch
+
+import org.apache.kafka.common.TopicPartition
+import zio.{ Chunk, ZIO }
+import zio.kafka.consumer.internal.PartitionStream
+
+import scala.collection.mutable
+
+/**
+ * A fetch strategy that allows a stream to fetch data when its queue size is at or below `maxPartitionQueueSize`, as
+ * long as the total queue size is at or below `maxTotalQueueSize`. This strategy is suitable when
+ * [[QueueSizeBasedFetchStrategy]] requires too much heap space, particularly when a lot of partitions are being
+ * consumed.
+ *
+ * @param maxPartitionQueueSize
+ *   Maximum number of records to be buffered per partition. This buffer improves throughput and supports varying
+ *   downstream message processing time, while maintaining some backpressure. Low values effectively disable
+ *   prefetching in favour of low memory consumption. Large values leave backpressure entirely to the
+ *   `maxTotalQueueSize` parameter, which only limits the combined buffers of all partitions together.
+ *
+ *   The number of records that are fetched on every poll is controlled by the `max.poll.records` setting, the number
+ *   of records fetched for every partition is somewhere between 0 and `max.poll.records`.
+ *
+ *   The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power
+ *   of 2.
+ *
+ * @param maxTotalQueueSize
+ *   Maximum number of records to be buffered over all partitions together. This can be used to limit memory usage
+ *   when consuming a large number of partitions.
+ *
+ *   When multiple streams are eligible for pre-fetching (because their queue size is at or below
+ *   `maxPartitionQueueSize`), but together they exceed `maxTotalQueueSize`, then on every call a random set of
+ *   eligible streams is selected such that their total queue size stays at or below `maxTotalQueueSize`. The
+ *   randomization ensures fairness and prevents read-starvation of streams at the end of the list.
+ *
+ *   The default value is 20 * the default for `maxPartitionQueueSize`, allowing approximately 20 partitions to do
+ *   pre-fetching in each poll.
+ */
+final case class ManyPartitionsQueueSizeBasedFetchStrategy(
+  maxPartitionQueueSize: Int = 1024,
+  maxTotalQueueSize: Int = 20480
+) extends FetchStrategy {
+  override def selectPartitionsToFetch(
+    streams: Chunk[PartitionStream]
+  ): ZIO[Any, Nothing, Set[TopicPartition]] =
+    for {
+      random          <- ZIO.random
+      shuffledStreams <- random.shuffle(streams)
+      // Pick streams in random order while a budget of `maxTotalQueueSize` records remains.
+      tps <- ZIO
+               .foldLeft(shuffledStreams)((mutable.ArrayBuilder.make[TopicPartition], maxTotalQueueSize)) {
+                 case (acc @ (partitions, queueBudget), stream) =>
+                   stream.queueSize.map { queueSize =>
+                     if (queueSize <= maxPartitionQueueSize && queueSize <= queueBudget) {
+                       (partitions += stream.tp, queueBudget - queueSize)
+                     } else acc
+                   }
+               }
+               .map { case (tps, _) => tps.result().toSet }
    } yield tps
+}
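
A minimal usage sketch may help put the new strategy in context. It assumes the `ConsumerSettings#withFetchStrategy` hook of the experimental fetch strategy API; the broker address, group id, topic name, and object name are illustrative placeholders only.

import zio._
import zio.stream.ZStream
import zio.kafka.consumer.{ Consumer, ConsumerSettings, Subscription }
import zio.kafka.consumer.fetch.ManyPartitionsQueueSizeBasedFetchStrategy
import zio.kafka.serde.Serde

object ManyPartitionsExample {
  // Placeholder broker address and consumer group.
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId("example-group")
      // Pre-fetch at most 1024 records per partition, and at most 20480 records over all partitions together.
      .withFetchStrategy(
        ManyPartitionsQueueSizeBasedFetchStrategy(maxPartitionQueueSize = 1024, maxTotalQueueSize = 20480)
      )

  val consumerLayer: ZLayer[Any, Throwable, Consumer] =
    ZLayer.scoped(Consumer.make(settings))

  // Consumption itself is unchanged; only the fetch strategy differs from a default setup.
  val values: ZStream[Any, Throwable, String] =
    Consumer
      .plainStream(Subscription.topics("example-topic"), Serde.string, Serde.string)
      .map(_.record.value)
      .provideLayer(consumerLayer)
}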
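
To see why the total cap matters, a rough back-of-the-envelope comparison; the 1000 assigned partitions and the ~1 KiB average record size are assumptions chosen purely for illustration, and records still in flight from the current poll are ignored.

object RoughHeapEstimate {
  val assignedPartitions = 1000L // assumption: partitions consumed by this instance
  val avgRecordBytes     = 1024L // assumption: ~1 KiB per record

  // Default QueueSizeBasedFetchStrategy: every partition may buffer up to roughly
  // `partitionPreFetchBufferLimit` (default 1024) records, so the worst case grows
  // with the partition count: about 1 GiB for these numbers.
  val perPartitionWorstCase: Long = assignedPartitions * 1024L * avgRecordBytes

  // ManyPartitionsQueueSizeBasedFetchStrategy: the combined buffers are capped at
  // roughly `maxTotalQueueSize` (default 20480) records, independent of the
  // partition count: about 20 MiB for these numbers.
  val totalCapWorstCase: Long = 20480L * avgRecordBytes
}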