Add alternative fetch strategy for many partitions #1281

Merged · 7 commits · Jul 16, 2024
docs/consumer-tuning.md (2 changes: 1 addition & 1 deletion)
@@ -57,7 +57,7 @@ the partition queues. A very rough estimate for the maximum amount of heap needed
The total can be tuned by changing the `partitionPreFetchBufferLimit` and `max.poll.records` settings.

Another option is to write a custom `FetchStrategy`. For example the `ManyPartitionsQueueSizeBasedFetchStrategy` in
- [draft PR 970](https://github.com/zio/zio-kafka/pull/970) (not yet tested at scale, use at your own risk). Note that the fetch strategy API is marked as
+ [PR 970](https://github.com/zio/zio-kafka/pull/970) (part of zio-kafka since 2.8.1). Note that the fetch strategy API is marked as
experimental and may change without notice in any future zio-kafka version.
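
For illustration, here is a minimal sketch of plugging the strategy into a consumer. It assumes `ConsumerSettings.withFetchStrategy`, the hook exposed by the experimental fetch strategy API; the broker address and group id are placeholders:

```scala
import zio.ZLayer
import zio.kafka.consumer.{ Consumer, ConsumerSettings }
import zio.kafka.consumer.fetch.ManyPartitionsQueueSizeBasedFetchStrategy

val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092")) // placeholder broker address
    .withGroupId("my-group")               // placeholder group id
    .withFetchStrategy(
      ManyPartitionsQueueSizeBasedFetchStrategy(
        maxPartitionQueueSize = 1024, // per-partition buffer (the default)
        maxTotalQueueSize = 20480     // budget over all partitions together (the default)
      )
    )

// A scoped layer that runs the consumer with the alternative fetch strategy.
val consumerLayer: ZLayer[Any, Throwable, Consumer] =
  ZLayer.scoped(Consumer.make(settings))
```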

## Long processing durations
@@ -0,0 +1,117 @@
package zio.kafka.consumer.fetch

import org.apache.kafka.common.TopicPartition
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.internal.PartitionStream
import zio.test.{ assertTrue, Spec, TestEnvironment }
import zio.{ Chunk, Scope, UIO, ZIO }

object ManyPartitionsQueueSizeBasedFetchStrategySpec extends ZIOSpecDefaultSlf4j {

  private val maxPartitionQueueSize = 50
  private val fetchStrategy = ManyPartitionsQueueSizeBasedFetchStrategy(
    maxPartitionQueueSize,
    maxTotalQueueSize = 80
  )

  private val tp10 = new TopicPartition("topic1", 0)
  private val tp11 = new TopicPartition("topic1", 1)
  private val tp20 = new TopicPartition("topic2", 0)
  private val tp21 = new TopicPartition("topic2", 1)
  private val tp22 = new TopicPartition("topic2", 2)

  override def spec: Spec[TestEnvironment with Scope, Any] =
    suite("ManyPartitionsQueueSizeBasedFetchStrategySpec")(
      test("stream with queue size above maxSize is paused") {
        val streams = Chunk(newStream(tp10, currentQueueSize = 100))
        for {
          result <- fetchStrategy.selectPartitionsToFetch(streams)
        } yield assertTrue(result.isEmpty)
      },
      test("stream with queue size below maxSize may resume when less-equal global max") {
        val streams = Chunk(newStream(tp10, currentQueueSize = 10))
        for {
          result <- fetchStrategy.selectPartitionsToFetch(streams)
        } yield assertTrue(result == Set(tp10))
      },
      test("all streams with queue size less-equal maxSize may resume when total is less-equal global max") {
        val streams = Chunk(
          newStream(tp10, currentQueueSize = maxPartitionQueueSize),
          newStream(tp11, currentQueueSize = 10),
          newStream(tp20, currentQueueSize = 10),
          newStream(tp21, currentQueueSize = 10)
        )
        for {
          result <- fetchStrategy.selectPartitionsToFetch(streams)
        } yield assertTrue(result == Set(tp10, tp11, tp20, tp21))
      },
      test("not all streams with queue size less-equal maxSize may resume when total is above global max") {
        val streams = Chunk(
          newStream(tp10, currentQueueSize = 40),
          newStream(tp11, currentQueueSize = 40),
          newStream(tp20, currentQueueSize = 40),
          newStream(tp21, currentQueueSize = 40)
        )
        for {
          result <- fetchStrategy.selectPartitionsToFetch(streams)
        } yield assertTrue(result.size == 2)
      },
      test("all streams with queue size less-equal maxSize may resume eventually") {
        val streams = Chunk(
          newStream(tp10, currentQueueSize = 60),
          newStream(tp11, currentQueueSize = 60),
          newStream(tp20, currentQueueSize = 40),
          newStream(tp21, currentQueueSize = 40),
          newStream(tp22, currentQueueSize = 40)
        )
        for {
          result1 <- fetchStrategy.selectPartitionsToFetch(streams)
          result2 <- fetchStrategy.selectPartitionsToFetch(streams)
          result3 <- fetchStrategy.selectPartitionsToFetch(streams)
          result4 <- fetchStrategy.selectPartitionsToFetch(streams)
          result5 <- fetchStrategy.selectPartitionsToFetch(streams)
          results = Chunk(result1, result2, result3, result4, result5)
        } yield assertTrue(
          // Only partitions from topic 2 are selected (since 40 <= 50):
          results.forall(_.forall(_.topic() == "topic2")),
          // 2 partitions are selected every time (since 2*40 <= 80):
          results.forall(_.size == 2),
          // All partitions from topic 2 are selected eventually:
          results.flatten.toSet == Set(tp20, tp21, tp22)
        )
      },
      test("different streams may resume every time") {
        val streams = Chunk(
          newStream(tp10, currentQueueSize = 25),
          newStream(tp11, currentQueueSize = 25),
          newStream(tp20, currentQueueSize = 25),
          newStream(tp21, currentQueueSize = 25),
          newStream(tp22, currentQueueSize = 25)
        )
        for {
          result1 <- fetchStrategy.selectPartitionsToFetch(streams)
          result2 <- fetchStrategy.selectPartitionsToFetch(streams)
          result3 <- fetchStrategy.selectPartitionsToFetch(streams)
          result4 <- fetchStrategy.selectPartitionsToFetch(streams)
          result5 <- fetchStrategy.selectPartitionsToFetch(streams)
          results = Chunk(result1, result2, result3, result4, result5)
        } yield assertTrue(
          // All partitions are selected eventually (since 25 <= 50):
          results.flatten.toSet.size == 5,
          // 3 partitions are selected every time (since 3*25 <= 80):
          results.forall(_.size == 3),
          // In at least 3 different combinations:
          results.combinations(2).count {
            case Chunk(resultA, resultB) => resultA != resultB
            case _                       => false // cannot happen
          } >= 3
        )
      }
    )

  private def newStream(topicPartition: TopicPartition, currentQueueSize: Int): PartitionStream =
    new PartitionStream {
      override def tp: TopicPartition  = topicPartition
      override def queueSize: UIO[Int] = ZIO.succeed(currentQueueSize)
    }
}
@@ -32,7 +32,7 @@ trait FetchStrategy {
 * The queue size at or below which more records are fetched and buffered (per partition). This buffer improves
 * throughput and supports varying downstream message processing time, while maintaining some backpressure. Large
 * values effectively disable backpressure at the cost of high memory usage, low values will effectively disable
- * prefetching in favor of low memory consumption. The number of records that is fetched on every poll is controlled
+ * prefetching in favor of low memory consumption. The number of records that are fetched on every poll is controlled
 * by the `max.poll.records` setting, the number of records fetched for every partition is somewhere between 0 and
 * `max.poll.records`.
 *
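
As a rough worked example (numbers assumed from the defaults used in these docs): with `partitionPreFetchBufferLimit = 1024` and `max.poll.records = 500`, a partition keeps being fetched for as long as its queue holds at most 1024 records, and the poll that tips it over the limit can add at most 500 more, so a single partition buffers no more than about 1024 + 500 = 1524 records.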
@@ -0,0 +1,59 @@
package zio.kafka.consumer.fetch

import org.apache.kafka.common.TopicPartition
import zio.{ Chunk, ZIO }
import zio.kafka.consumer.internal.PartitionStream

import scala.collection.mutable

/**
 * A fetch strategy that allows a stream to fetch data when its queue size is at or below `maxPartitionQueueSize`, as
 * long as the total queue size is at or below `maxTotalQueueSize`. This strategy is suitable when
 * [[QueueSizeBasedFetchStrategy]] requires too much heap space, particularly when a lot of partitions are being
 * consumed.
 *
 * @param maxPartitionQueueSize
 *   Maximum number of records to be buffered per partition. This buffer improves throughput and supports varying
 *   downstream message processing time, while maintaining some backpressure. Low values effectively disable
 *   prefetching in favour of low memory consumption. Large values leave it to the `maxTotalQueueSize` parameter to
 *   apply backpressure over the buffers of all partitions together.
 *
 *   The number of records that are fetched on every poll is controlled by the `max.poll.records` setting; the number
 *   of records fetched for every partition is somewhere between 0 and `max.poll.records`.
 *
 *   The default value for this parameter is 2 * the default `max.poll.records` of 500, rounded to the nearest power
 *   of 2.
 *
 * @param maxTotalQueueSize
 *   Maximum number of records to be buffered over all partitions together. This can be used to limit memory usage
 *   when consuming a large number of partitions.
 *
 *   When multiple streams are eligible for pre-fetching (because their queue size is below `maxPartitionQueueSize`),
 *   but together they exceed `maxTotalQueueSize`, then on every call a random set of eligible streams is selected
 *   that stays within `maxTotalQueueSize`. The randomization ensures fairness and prevents read-starvation for
 *   streams at the end of the list.
 *
 *   The default value is 20 * the default for `maxPartitionQueueSize`, allowing approximately 20 partitions to do
 *   pre-fetching in each poll.
 */
final case class ManyPartitionsQueueSizeBasedFetchStrategy(
  maxPartitionQueueSize: Int = 1024,
  maxTotalQueueSize: Int = 20480
) extends FetchStrategy {
  override def selectPartitionsToFetch(
    streams: Chunk[PartitionStream]
  ): ZIO[Any, Nothing, Set[TopicPartition]] =
    for {
      random          <- ZIO.random
      shuffledStreams <- random.shuffle(streams)
      tps <- ZIO
               .foldLeft(shuffledStreams)((mutable.ArrayBuilder.make[TopicPartition], maxTotalQueueSize)) {
                 case (acc @ (partitions, queueBudget), stream) =>
                   stream.queueSize.map { queueSize =>
                     if (queueSize <= maxPartitionQueueSize && queueSize <= queueBudget) {
                       (partitions += stream.tp, queueBudget - queueSize)
                     } else acc
                   }
               }
               .map { case (tps, _) => tps.result().toSet }
    } yield tps
}
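
To make the selection arithmetic concrete, here is a minimal sketch (the object name `ManyPartitionsFetchStrategyDemo` and helper `stream` are illustrative; it sits in the spec's package so that the internal `PartitionStream` trait is reachable, as in the spec above). With a total budget of 80 and three streams each holding 40 records, every call admits a random pair (40 + 40 = 80) and rejects the third:

```scala
package zio.kafka.consumer.fetch

import org.apache.kafka.common.TopicPartition
import zio.kafka.consumer.internal.PartitionStream
import zio.{ Chunk, UIO, ZIO, ZIOAppDefault }

object ManyPartitionsFetchStrategyDemo extends ZIOAppDefault {

  // Stub stream with a fixed queue size, mirroring the spec's `newStream` helper.
  private def stream(tp0: TopicPartition, size: Int): PartitionStream =
    new PartitionStream {
      override def tp: TopicPartition  = tp0
      override def queueSize: UIO[Int] = ZIO.succeed(size)
    }

  private val strategy = ManyPartitionsQueueSizeBasedFetchStrategy(
    maxPartitionQueueSize = 50,
    maxTotalQueueSize = 80
  )

  private val streams = Chunk(
    stream(new TopicPartition("demo", 0), 40),
    stream(new TopicPartition("demo", 1), 40),
    stream(new TopicPartition("demo", 2), 40)
  )

  // Each call selects a random pair of partitions: 40 + 40 exactly fills the
  // budget of 80, so the remaining stream's 40 no longer fits.
  override def run =
    strategy.selectPartitionsToFetch(streams).debug("selected")
}
```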