From d0678a61873e2af9fc850309372dde20839bdaab Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Wed, 5 Jul 2023 20:16:50 +0200
Subject: [PATCH 1/5] Add alternative fetch strategy for many partitions

When many hundreds of partitions need to be consumed, an excessive
amount of heap can be used for pre-fetching. The
`ManyPartitionsQueueSizeBasedFetchStrategy` works similarly to the
default `QueueSizeBasedFetchStrategy` but limits total memory usage.
---
 ...tionsQueueSizeBasedFetchStrategySpec.scala | 87 +++++++++++++++++++
 ...artitionsQueueSizeBasedFetchStrategy.scala | 51 +++++++++++
 2 files changed, 138 insertions(+)
 create mode 100644 zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala
 create mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala
new file mode 100644
index 000000000..5d612de26
--- /dev/null
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala
@@ -0,0 +1,87 @@
+package zio.kafka.consumer.fetch
+
+import org.apache.kafka.common.TopicPartition
+import zio.kafka.ZIOSpecDefaultSlf4j
+import zio.kafka.consumer.internal.PartitionStream
+import zio.test.{ assertTrue, Spec, TestEnvironment }
+import zio.{ Chunk, Scope, UIO, ZIO }
+
+object ManyPartitionsQueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j {
+
+  private val maxPartitionQueueSize = 50
+  private val fetchStrategy = ManyPartitionsQueueSizeBasedFetchStrategy(
+    maxPartitionQueueSize,
+    maxTotalQueueSize = 80
+  )
+
+  private val tp10 = new TopicPartition("topic1", 0)
+  private val tp11 = new TopicPartition("topic1", 1)
+  private val tp20 = new TopicPartition("topic2", 0)
+  private val tp21 = new TopicPartition("topic2", 1)
+  private val tp22 = new TopicPartition("topic2", 2)
+
+  override def spec: Spec[TestEnvironment with Scope, Any] =
+    suite("ManyPartitionsQueueSizeBasedFetchStrategySpec")(
+      test("stream with queue size above maxSize is paused") {
+        val streams = Chunk(newStream(tp10, currentQueueSize = 100))
+        for {
+          result <- fetchStrategy.selectPartitionsToFetch(streams)
+        } yield assertTrue(result.isEmpty)
+      },
+      test("stream with queue size below maxSize may resume when less-equal global max") {
+        val streams = Chunk(newStream(tp10, currentQueueSize = 10))
+        for {
+          result <- fetchStrategy.selectPartitionsToFetch(streams)
+        } yield assertTrue(result == Set(tp10))
+      },
+      test("all streams with queue size less-equal maxSize may resume when total is less-equal global max") {
+        val streams = Chunk(
+          newStream(tp10, currentQueueSize = maxPartitionQueueSize),
+          newStream(tp11, currentQueueSize = 10),
+          newStream(tp20, currentQueueSize = 10),
+          newStream(tp21, currentQueueSize = 10)
+        )
+        for {
+          result <- fetchStrategy.selectPartitionsToFetch(streams)
+        } yield assertTrue(result == Set(tp10, tp11, tp20, tp21))
+      },
+      test("not all streams with queue size less-equal maxSize may resume when total is above global max") {
+        val streams = Chunk(
+          newStream(tp10, currentQueueSize = 40),
+          newStream(tp11, currentQueueSize = 40),
+          newStream(tp20, currentQueueSize = 40),
+          newStream(tp21, currentQueueSize = 40)
+        )
+        for {
+          result <- fetchStrategy.selectPartitionsToFetch(streams)
+        } yield assertTrue(result.size == 
2) + }, + test("all streams with queue size less-equal maxSize may resume eventually") { + val streams = Chunk( + newStream(tp10, currentQueueSize = 60), + newStream(tp11, currentQueueSize = 60), + newStream(tp20, currentQueueSize = 40), + newStream(tp21, currentQueueSize = 40), + newStream(tp22, currentQueueSize = 40) + ) + for { + result1 <- fetchStrategy.selectPartitionsToFetch(streams) + result2 <- fetchStrategy.selectPartitionsToFetch(streams) + result3 <- fetchStrategy.selectPartitionsToFetch(streams) + result4 <- fetchStrategy.selectPartitionsToFetch(streams) + result5 <- fetchStrategy.selectPartitionsToFetch(streams) + results = Chunk(result1, result2, result3, result4, result5) + } yield assertTrue( + results.forall(_.size == 2), + results.forall(_.forall(_.topic() == "topic2")), + results.flatten.toSet.size == 3 + ) + } + ) + + private def newStream(topicPartition: TopicPartition, currentQueueSize: Int): PartitionStream = + new PartitionStream { + override def tp: TopicPartition = topicPartition + override def queueSize: UIO[Int] = ZIO.succeed(currentQueueSize) + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala new file mode 100644 index 000000000..fc07653d1 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala @@ -0,0 +1,51 @@ +package zio.kafka.consumer.fetch + +import org.apache.kafka.common.TopicPartition +import zio.{ Chunk, ZIO } +import zio.kafka.consumer.internal.PartitionStream + +import scala.collection.mutable + +/** + * A fetch strategy that allows a stream to fetch data when its queue size is at or below `maxPartitionQueueSize`, as + * long as the total queue size is at or below `maxTotalQueueSize`. This strategy is suitable when + * [[QueueSizeBasedFetchStrategy]] requires too much heap space, particularly when a lot of partitions are being + * consumed. + * + * @param maxPartitionQueueSize + * Maximum number of records to be buffered per partition. This buffer improves throughput and supports varying + * downstream message processing time, while maintaining some backpressure. Large values effectively disable + * backpressure at the cost of high memory usage, low values will effectively disable prefetching in favour of low + * memory consumption. The number of records that is fetched on every poll is controlled by the `max.poll.records` + * setting, the number of records fetched for every partition is somewhere between 0 and `max.poll.records`. + * + * The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2. + * + * @param maxTotalQueueSize + * Maximum number of records to be buffered over all partitions together. This can be used to limit memory usage when + * consuming a large number of partitions. + * + * The default value is 20 * the default for `maxTotalQueueSize`, allowing approximately 20 partitions to do + * pre-fetching in each poll. + */ +final case class ManyPartitionsQueueSizeBasedFetchStrategy( + maxPartitionQueueSize: Int = 1024, + maxTotalQueueSize: Int = 20480 +) extends FetchStrategy { + override def selectPartitionsToFetch( + streams: Chunk[PartitionStream] + ): ZIO[Any, Nothing, Set[TopicPartition]] = { + // By shuffling the streams we prevent read-starvation for streams at the end of the list. 
+ val shuffledStreams = scala.util.Random.shuffle(streams) + ZIO + .foldLeft(shuffledStreams)((mutable.ArrayBuilder.make[TopicPartition], maxTotalQueueSize)) { + case (acc @ (partitions, queueBudget), stream) => + stream.queueSize.map { queueSize => + if (queueSize <= maxPartitionQueueSize && queueSize <= queueBudget) { + (partitions += stream.tp, queueBudget - queueSize) + } else acc + } + } + .map { case (tps, _) => tps.result().toSet } + } +} From acbd49c32493dd86c708be93177dc8183d733f7f Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 14 Jul 2024 11:57:42 +0200 Subject: [PATCH 2/5] Update docs --- docs/consumer-tuning.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/consumer-tuning.md b/docs/consumer-tuning.md index 3e34c521d..62c828835 100644 --- a/docs/consumer-tuning.md +++ b/docs/consumer-tuning.md @@ -57,7 +57,7 @@ the partition queues. A very rough estimate for the maximum amount of heap neede The total can be tuned by changing the `partitionPreFetchBufferLimit`, `max.poll.records` settings. Another option is to write a custom `FetchStrategy`. For example the `ManyPartitionsQueueSizeBasedFetchStrategy` in -[draft PR 970](https://github.com/zio/zio-kafka/pull/970) (not yet tested at scale, use at your own risk). Note that the fetch strategy API is marked as +[draft PR 970](https://github.com/zio/zio-kafka/pull/970) (merged into zio-kafka since 2.8.1). Note that the fetch strategy API is marked as experimental and may change without notice in any future zio-kafka version. ## Long processing durations From 064c68834f7663f548a6d8b06f97198c8f7f9d83 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Mon, 15 Jul 2024 21:20:37 +0200 Subject: [PATCH 3/5] Use zio.random, more tests Also: fix typo --- ...tionsQueueSizeBasedFetchStrategySpec.scala | 34 ++++++++++++++- .../kafka/consumer/fetch/FetchStrategy.scala | 2 +- ...artitionsQueueSizeBasedFetchStrategy.scala | 41 ++++++++++--------- 3 files changed, 55 insertions(+), 22 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala index 5d612de26..e18a736c4 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategySpec.scala @@ -72,9 +72,39 @@ object ManyPartitionsQueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j result5 <- fetchStrategy.selectPartitionsToFetch(streams) results = Chunk(result1, result2, result3, result4, result5) } yield assertTrue( - results.forall(_.size == 2), + // Only partitions from topic 2 are selected (since 40 <= 50): results.forall(_.forall(_.topic() == "topic2")), - results.flatten.toSet.size == 3 + // 2 partitions are selected every time (since 2*40 <= 80): + results.forall(_.size == 2), + // All partitions from topic 2 are selected eventually: + results.flatten.toSet == Set(tp20, tp21, tp22) + ) + }, + test("different streams may resume every time") { + val streams = Chunk( + newStream(tp10, currentQueueSize = 25), + newStream(tp11, currentQueueSize = 25), + newStream(tp20, currentQueueSize = 25), + newStream(tp21, currentQueueSize = 25), + newStream(tp22, currentQueueSize = 25) + ) + for { + result1 <- fetchStrategy.selectPartitionsToFetch(streams) + result2 <- fetchStrategy.selectPartitionsToFetch(streams) + result3 <- 
fetchStrategy.selectPartitionsToFetch(streams) + result4 <- fetchStrategy.selectPartitionsToFetch(streams) + result5 <- fetchStrategy.selectPartitionsToFetch(streams) + results = Chunk(result1, result2, result3, result4, result5) + } yield assertTrue( + // All partitions are selected eventually (since 25 <= 50): + results.flatten.toSet.size == 5, + // 3 partitions are selected every time (since 3*25 <= 80): + results.forall(_.size == 3), + // In at least 3 different combinations: + results.combinations(2).count { + case Chunk(resultA, resultB) => resultA != resultB + case _ => false // can not happen + } >= 3 ) } ) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala index 1b0f98d82..e8098fac0 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/FetchStrategy.scala @@ -32,7 +32,7 @@ trait FetchStrategy { * The queue size at or below which more records are fetched and buffered (per partition). This buffer improves * throughput and supports varying downstream message processing time, while maintaining some backpressure. Large * values effectively disable backpressure at the cost of high memory usage, low values will effectively disable - * prefetching in favor of low memory consumption. The number of records that is fetched on every poll is controlled + * prefetching in favor of low memory consumption. The number of records that are fetched on every poll is controlled * by the `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and * `max.poll.records`. * diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala index fc07653d1..cd8e64955 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala @@ -14,13 +14,14 @@ import scala.collection.mutable * * @param maxPartitionQueueSize * Maximum number of records to be buffered per partition. This buffer improves throughput and supports varying - * downstream message processing time, while maintaining some backpressure. Large values effectively disable - * backpressure at the cost of high memory usage, low values will effectively disable prefetching in favour of low - * memory consumption. The number of records that is fetched on every poll is controlled by the `max.poll.records` - * setting, the number of records fetched for every partition is somewhere between 0 and `max.poll.records`. + * downstream message processing time, while maintaining some backpressure. Low values effectively disable prefetching + * in favour of low memory consumption. Large values leave it up to `maxTotalQueueSize` parameter to backpressure only + * on total memory usage. * - * The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2. + * The number of records that are fetched on every poll is controlled by the `max.poll.records` setting, the number of + * records fetched for every partition is somewhere between 0 and `max.poll.records`. * + * The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2. 
* @param maxTotalQueueSize * Maximum number of records to be buffered over all partitions together. This can be used to limit memory usage when * consuming a large number of partitions. @@ -34,18 +35,20 @@ final case class ManyPartitionsQueueSizeBasedFetchStrategy( ) extends FetchStrategy { override def selectPartitionsToFetch( streams: Chunk[PartitionStream] - ): ZIO[Any, Nothing, Set[TopicPartition]] = { - // By shuffling the streams we prevent read-starvation for streams at the end of the list. - val shuffledStreams = scala.util.Random.shuffle(streams) - ZIO - .foldLeft(shuffledStreams)((mutable.ArrayBuilder.make[TopicPartition], maxTotalQueueSize)) { - case (acc @ (partitions, queueBudget), stream) => - stream.queueSize.map { queueSize => - if (queueSize <= maxPartitionQueueSize && queueSize <= queueBudget) { - (partitions += stream.tp, queueBudget - queueSize) - } else acc - } - } - .map { case (tps, _) => tps.result().toSet } - } + ): ZIO[Any, Nothing, Set[TopicPartition]] = + for { + // By shuffling the streams we prevent read-starvation for streams at the end of the list. + random <- ZIO.random + shuffledStreams <- random.shuffle(streams) + tps <- ZIO + .foldLeft(shuffledStreams)((mutable.ArrayBuilder.make[TopicPartition], maxTotalQueueSize)) { + case (acc @ (partitions, queueBudget), stream) => + stream.queueSize.map { queueSize => + if (queueSize <= maxPartitionQueueSize && queueSize <= queueBudget) { + (partitions += stream.tp, queueBudget - queueSize) + } else acc + } + } + .map { case (tps, _) => tps.result().toSet } + } yield tps } From a5a157fa460f39995cbfb63eab26f69aead6daef Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Mon, 15 Jul 2024 21:40:00 +0200 Subject: [PATCH 4/5] Completed scaladoc --- .../ManyPartitionsQueueSizeBasedFetchStrategy.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala index cd8e64955..6bbd7572f 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala @@ -16,16 +16,22 @@ import scala.collection.mutable * Maximum number of records to be buffered per partition. This buffer improves throughput and supports varying * downstream message processing time, while maintaining some backpressure. Low values effectively disable prefetching * in favour of low memory consumption. Large values leave it up to `maxTotalQueueSize` parameter to backpressure only - * on total memory usage. + * over the buffers of all partitions together. * * The number of records that are fetched on every poll is controlled by the `max.poll.records` setting, the number of * records fetched for every partition is somewhere between 0 and `max.poll.records`. * * The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2. + * * @param maxTotalQueueSize * Maximum number of records to be buffered over all partitions together. This can be used to limit memory usage when * consuming a large number of partitions. 
 *
+ * When multiple streams are eligible for pre-fetching (because their queue size is at or below `maxPartitionQueueSize`), but
+ * together they exceed `maxTotalQueueSize`, then on every call a random set of eligible streams is selected that stays
+ * below `maxTotalQueueSize`. The randomization ensures fairness and prevents read-starvation for streams at the end of
+ * the list.
+ *
  * The default value is 20 * the default for `maxTotalQueueSize`, allowing approximately 20 partitions to do
  * pre-fetching in each poll.
  */
 final case class ManyPartitionsQueueSizeBasedFetchStrategy(
@@ -37,7 +43,6 @@ final case class ManyPartitionsQueueSizeBasedFetchStrategy(
     streams: Chunk[PartitionStream]
   ): ZIO[Any, Nothing, Set[TopicPartition]] =
     for {
-      // By shuffling the streams we prevent read-starvation for streams at the end of the list.
       random          <- ZIO.random
       shuffledStreams <- random.shuffle(streams)
       tps <- ZIO

From bdbffd57498deafc81564bc57ae32f4dc1746280 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Tue, 16 Jul 2024 18:54:54 +0200
Subject: [PATCH 5/5] Fix scaladoc

---
 .../fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala
index 6bbd7572f..2d2de3a2c 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/fetch/ManyPartitionsQueueSizeBasedFetchStrategy.scala
@@ -32,7 +32,7 @@ import scala.collection.mutable
  * below `maxTotalQueueSize`. The randomization ensures fairness and prevents read-starvation for streams at the end of
  * the list.
  *
- * The default value is 20 * the default for `maxTotalQueueSize`, allowing approximately 20 partitions to do
+ * The default value is 20 * the default for `maxPartitionQueueSize`, allowing approximately 20 partitions to do
  * pre-fetching in each poll.
  */
 final case class ManyPartitionsQueueSizeBasedFetchStrategy(
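
Usage sketch (editor's note, not part of the patch series): the snippet below shows how the new
strategy could be wired into a consumer configuration. It assumes the `ConsumerSettings#withFetchStrategy`
setter of the experimental fetch strategy API referenced in consumer-tuning.md; the bootstrap server,
group id, and object name are illustrative only.

    import zio.kafka.consumer.ConsumerSettings
    import zio.kafka.consumer.fetch.ManyPartitionsQueueSizeBasedFetchStrategy

    object ManyPartitionsExample {
      // Defaults made explicit:
      //   maxPartitionQueueSize = 1024  (2 * the default `max.poll.records` of 500, rounded to a power of 2)
      //   maxTotalQueueSize     = 20480 (20 * 1024, so roughly 20 partitions can pre-fetch per poll)
      val settings: ConsumerSettings =
        ConsumerSettings(List("localhost:9092")) // assumed broker address
          .withGroupId("example-group")          // assumed group id
          .withFetchStrategy(
            ManyPartitionsQueueSizeBasedFetchStrategy(
              maxPartitionQueueSize = 1024, // a partition is eligible only while its queue holds at most this many records
              maxTotalQueueSize = 20480     // eligible partitions are selected until this shared budget is spent
            )
          )
    }

With the values used in the spec above (maxPartitionQueueSize = 50, maxTotalQueueSize = 80), a stream
holding 100 buffered records is never selected, and two eligible streams holding 40 records each already
exhaust the shared budget, which is why each call selects exactly two of the eligible streams.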