Add alternative fetch strategy for many partitions (#1281)
When many hundreds of partitions need to be consumed, an excessive
amount of heap can be used for pre-fetching. The
`ManyPartitionsQueueSizeBasedFetchStrategy` works similarly to the
default `QueueSizeBasedFetchStrategy` but limits total memory usage.
erikvanoosten authored and svroonland committed Aug 10, 2024
1 parent 27f033e commit 108b285
Showing 4 changed files with 178 additions and 2 deletions.
2 changes: 1 addition & 1 deletion docs/consumer-tuning.md
@@ -57,7 +57,7 @@ the partition queues. A very rough estimate for the maximum amount of heap needed
The total can be tuned by changing the `partitionPreFetchBufferLimit` and `max.poll.records` settings.

Another option is to write a custom `FetchStrategy`. For example, the `ManyPartitionsQueueSizeBasedFetchStrategy` in
- [draft PR 970](https://github.com/zio/zio-kafka/pull/970) (not yet tested at scale, use at your own risk). Note that the fetch strategy API is marked as
+ [draft PR 970](https://github.com/zio/zio-kafka/pull/970) (merged into zio-kafka since 2.8.1). Note that the fetch strategy API is marked as
experimental and may change without notice in any future zio-kafka version.
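
To see why limiting the total matters when many partitions are consumed, here is a back-of-the-envelope sketch in Scala. All numbers are assumptions chosen for illustration (partition count and average record size are not taken from this change); 1024 and 20480 match the defaults used in the new strategy below:

// Hypothetical sizing sketch; partition count, buffer limit and record size are assumed values.
val partitions     = 800L    // "many hundreds" of assigned partitions
val bufferLimit    = 1024L   // per-partition pre-fetch buffer limit, in records (default in this commit)
val avgRecordBytes = 2048L   // assumed average record size
val perPartitionBound = partitions * bufferLimit * avgRecordBytes // ≈ 1.6 GiB that may be buffered
val totalBound        = 20480L * avgRecordBytes                   // ≈ 40 MiB with the default maxTotalQueueSize

With `ManyPartitionsQueueSizeBasedFetchStrategy`, the buffered total is roughly bounded by `maxTotalQueueSize`, regardless of how many partitions are assigned.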

## Long processing durations
117 changes: 117 additions & 0 deletions ManyPartitionsQueueSizeBasedFetchStrategySpec.scala
@@ -0,0 +1,117 @@
package zio.kafka.consumer.fetch

import org.apache.kafka.common.TopicPartition
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.internal.PartitionStream
import zio.test.{ assertTrue, Spec, TestEnvironment }
import zio.{ Chunk, Scope, UIO, ZIO }

object ManyPartitionsQueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j {

private val maxPartitionQueueSize = 50
private val fetchStrategy = ManyPartitionsQueueSizeBasedFetchStrategy(
maxPartitionQueueSize,
maxTotalQueueSize = 80
)

private val tp10 = new TopicPartition("topic1", 0)
private val tp11 = new TopicPartition("topic1", 1)
private val tp20 = new TopicPartition("topic2", 0)
private val tp21 = new TopicPartition("topic2", 1)
private val tp22 = new TopicPartition("topic2", 2)

override def spec: Spec[TestEnvironment with Scope, Any] =
suite("ManyPartitionsQueueSizeBasedFetchStrategySpec")(
test("stream with queue size above maxSize is paused") {
val streams = Chunk(newStream(tp10, currentQueueSize = 100))
for {
result <- fetchStrategy.selectPartitionsToFetch(streams)
} yield assertTrue(result.isEmpty)
},
test("stream with queue size below maxSize may resume when less-equal global max") {
val streams = Chunk(newStream(tp10, currentQueueSize = 10))
for {
result <- fetchStrategy.selectPartitionsToFetch(streams)
} yield assertTrue(result == Set(tp10))
},
test("all streams with queue size less-equal maxSize may resume when total is less-equal global max") {
val streams = Chunk(
newStream(tp10, currentQueueSize = maxPartitionQueueSize),
newStream(tp11, currentQueueSize = 10),
newStream(tp20, currentQueueSize = 10),
newStream(tp21, currentQueueSize = 10)
)
for {
result <- fetchStrategy.selectPartitionsToFetch(streams)
} yield assertTrue(result == Set(tp10, tp11, tp20, tp21))
},
test("not all streams with queue size less-equal maxSize may resume when total is less-equal global max") {
val streams = Chunk(
newStream(tp10, currentQueueSize = 40),
newStream(tp11, currentQueueSize = 40),
newStream(tp20, currentQueueSize = 40),
newStream(tp21, currentQueueSize = 40)
)
for {
result <- fetchStrategy.selectPartitionsToFetch(streams)
} yield assertTrue(result.size == 2)
},
test("all streams with queue size less-equal maxSize may resume eventually") {
val streams = Chunk(
newStream(tp10, currentQueueSize = 60),
newStream(tp11, currentQueueSize = 60),
newStream(tp20, currentQueueSize = 40),
newStream(tp21, currentQueueSize = 40),
newStream(tp22, currentQueueSize = 40)
)
for {
result1 <- fetchStrategy.selectPartitionsToFetch(streams)
result2 <- fetchStrategy.selectPartitionsToFetch(streams)
result3 <- fetchStrategy.selectPartitionsToFetch(streams)
result4 <- fetchStrategy.selectPartitionsToFetch(streams)
result5 <- fetchStrategy.selectPartitionsToFetch(streams)
results = Chunk(result1, result2, result3, result4, result5)
} yield assertTrue(
// Only partitions from topic 2 are selected (since 40 <= 50):
results.forall(_.forall(_.topic() == "topic2")),
// 2 partitions are selected every time (since 2*40 <= 80):
results.forall(_.size == 2),
// All partitions from topic 2 are selected eventually:
results.flatten.toSet == Set(tp20, tp21, tp22)
)
},
test("different streams may resume every time") {
val streams = Chunk(
newStream(tp10, currentQueueSize = 25),
newStream(tp11, currentQueueSize = 25),
newStream(tp20, currentQueueSize = 25),
newStream(tp21, currentQueueSize = 25),
newStream(tp22, currentQueueSize = 25)
)
for {
result1 <- fetchStrategy.selectPartitionsToFetch(streams)
result2 <- fetchStrategy.selectPartitionsToFetch(streams)
result3 <- fetchStrategy.selectPartitionsToFetch(streams)
result4 <- fetchStrategy.selectPartitionsToFetch(streams)
result5 <- fetchStrategy.selectPartitionsToFetch(streams)
results = Chunk(result1, result2, result3, result4, result5)
} yield assertTrue(
// All partitions are selected eventually (since 25 <= 50):
results.flatten.toSet.size == 5,
// 3 partitions are selected every time (since 3*25 <= 80):
results.forall(_.size == 3),
// In at least 3 different combinations:
results.combinations(2).count {
case Chunk(resultA, resultB) => resultA != resultB
case _ => false // can not happen
} >= 3
)
}
)

private def newStream(topicPartition: TopicPartition, currentQueueSize: Int): PartitionStream =
new PartitionStream {
override def tp: TopicPartition = topicPartition
override def queueSize: UIO[Int] = ZIO.succeed(currentQueueSize)
}
}
2 changes: 1 addition & 1 deletion FetchStrategy.scala
@@ -32,7 +32,7 @@ trait FetchStrategy {
* The queue size at or below which more records are fetched and buffered (per partition). This buffer improves
* throughput and supports varying downstream message processing time, while maintaining some backpressure. Large
* values effectively disable backpressure at the cost of high memory usage, low values will effectively disable
- * prefetching in favor of low memory consumption. The number of records that is fetched on every poll is controlled
+ * prefetching in favor of low memory consumption. The number of records that are fetched on every poll is controlled
* by the `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and
* `max.poll.records`.
*
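
The fetch strategy API referenced above consists of a single method, `selectPartitionsToFetch`, whose signature and the `PartitionStream.tp` / `PartitionStream.queueSize` members are visible in the new file below. As a hedged, hypothetical sketch of a custom implementation (not part of this commit; `FixedThresholdFetchStrategy` and `threshold` are invented names), one could simply resume every partition whose queue holds at most a fixed number of records:

import org.apache.kafka.common.TopicPartition
import zio.{ Chunk, ZIO }
import zio.kafka.consumer.fetch.FetchStrategy
import zio.kafka.consumer.internal.PartitionStream

// Resume fetching for every partition whose queue size is at or below `threshold`.
// No global cap; the simplest possible strategy, for illustration only.
final case class FixedThresholdFetchStrategy(threshold: Int = 256) extends FetchStrategy {
  override def selectPartitionsToFetch(
    streams: Chunk[PartitionStream]
  ): ZIO[Any, Nothing, Set[TopicPartition]] =
    ZIO.foldLeft(streams)(Set.empty[TopicPartition]) { (selected, stream) =>
      stream.queueSize.map(size => if (size <= threshold) selected + stream.tp else selected)
    }
}
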
59 changes: 59 additions & 0 deletions ManyPartitionsQueueSizeBasedFetchStrategy.scala
@@ -0,0 +1,59 @@
package zio.kafka.consumer.fetch

import org.apache.kafka.common.TopicPartition
import zio.{ Chunk, ZIO }
import zio.kafka.consumer.internal.PartitionStream

import scala.collection.mutable

/**
* A fetch strategy that allows a stream to fetch data when its queue size is at or below `maxPartitionQueueSize`, as
* long as the total queue size is at or below `maxTotalQueueSize`. This strategy is suitable when
* [[QueueSizeBasedFetchStrategy]] requires too much heap space, particularly when a lot of partitions are being
* consumed.
*
* @param maxPartitionQueueSize
* Maximum number of records to be buffered per partition. This buffer improves throughput and supports varying
* downstream message processing time, while maintaining some backpressure. Low values effectively disable prefetching
* in favour of low memory consumption. Large values leave it up to the `maxTotalQueueSize` parameter to provide
* backpressure over the buffers of all partitions together.
*
* The number of records that are fetched on every poll is controlled by the `max.poll.records` setting, the number of
* records fetched for every partition is somewhere between 0 and `max.poll.records`.
*
* The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2.
*
* @param maxTotalQueueSize
* Maximum number of records to be buffered over all partitions together. This can be used to limit memory usage when
* consuming a large number of partitions.
*
* When multiple streams are eligible for pre-fetching (because their queue size is at or below
* `maxPartitionQueueSize`), but together they would exceed `maxTotalQueueSize`, then on every call a random subset of
* the eligible streams is selected such that the total stays within `maxTotalQueueSize`. The randomization ensures
* fairness and prevents read-starvation for streams at the end of the list.
*
* The default value is 20 * the default for `maxPartitionQueueSize`, allowing approximately 20 partitions to do
* pre-fetching in each poll.
*/
final case class ManyPartitionsQueueSizeBasedFetchStrategy(
maxPartitionQueueSize: Int = 1024,
maxTotalQueueSize: Int = 20480
) extends FetchStrategy {
override def selectPartitionsToFetch(
streams: Chunk[PartitionStream]
): ZIO[Any, Nothing, Set[TopicPartition]] =
for {
random <- ZIO.random
shuffledStreams <- random.shuffle(streams)
tps <- ZIO
.foldLeft(shuffledStreams)((mutable.ArrayBuilder.make[TopicPartition], maxTotalQueueSize)) {
case (acc @ (partitions, queueBudget), stream) =>
stream.queueSize.map { queueSize =>
if (queueSize <= maxPartitionQueueSize && queueSize <= queueBudget) {
(partitions += stream.tp, queueBudget - queueSize)
} else acc
}
}
.map { case (tps, _) => tps.result().toSet }
} yield tps
}
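
For completeness, a sketch of wiring this strategy into the consumer settings. It assumes the `withFetchStrategy` setter on `ConsumerSettings` (available in recent zio-kafka releases alongside the fetch strategy API); the broker address, group id, and limits are illustrative values only:

import zio.kafka.consumer.ConsumerSettings
import zio.kafka.consumer.fetch.ManyPartitionsQueueSizeBasedFetchStrategy

val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))   // illustrative bootstrap server
    .withGroupId("example-group")            // illustrative consumer group
    .withFetchStrategy(
      ManyPartitionsQueueSizeBasedFetchStrategy(
        maxPartitionQueueSize = 512,  // per-partition pre-fetch cap, in records
        maxTotalQueueSize = 8192      // cap over all partitions together, in records
      )
    )

Lowering both values trades prefetch throughput for a tighter heap bound, as described in the scaladoc above.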
