Add metric for the number of polls records are idling in the queue
erikvanoosten committed Dec 31, 2023
1 parent 3ad1fda commit 15a0326
Showing 3 changed files with 56 additions and 13 deletions.
@@ -14,6 +14,9 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
private val streamSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(100.0, 9)(_ * Math.E).map(Math.ceil))

private val pollSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk[Double](0, 1, 2, 3, 4, 5, 6, 7, 8, 9))

private val pendingRequestsHistogram =
Metric
.histogram(
@@ -41,6 +44,15 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
)
.tagged(metricLabels)

private val queuePollsHistogram =
Metric
.histogram(
"ziokafka_consumer_queue_polls",
"The number of polls records are idling in the queue for a partition.",
pollSizeBoundaries
)
.tagged(metricLabels)

private val allQueueSizeHistogram =
Metric
.histogram(
@@ -54,6 +66,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
ZIO
.when(state.subscriptionState.isSubscribed) {
for {
_ <- ZIO.foreachDiscard(state.assignedStreams)(_.outstandingPolls @@ queuePollsHistogram)
queueSizes <- ZIO.foreach(state.assignedStreams)(_.queueSize.map(_.toDouble))
_ <- ZIO.foreachDiscard(queueSizes)(qs => queueSizeHistogram.update(qs))
_ <- allQueueSizeHistogram.update(queueSizes.sum)
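The new histogram uses the explicit bucket boundaries 0 through 9, so each observation is simply the number of polls a partition's records have been waiting in the queue. The following is a minimal sketch (not part of this commit) of recording and reading such a histogram with ZIO's metric API; the object name, the sample values, and the re-created metric handle are illustrative assumptions, and the real `ConsumerMetrics` additionally tags the metric with its labels.

```scala
import zio._
import zio.metrics._

// Minimal sketch: a histogram with the same 0..9 poll-count boundaries,
// a few recorded observations, and a read-back of the resulting state.
object QueuePollsHistogramSketch extends ZIOAppDefault {

  private val pollSizeBoundaries: MetricKeyType.Histogram.Boundaries =
    MetricKeyType.Histogram.Boundaries.fromChunk(Chunk[Double](0, 1, 2, 3, 4, 5, 6, 7, 8, 9))

  // Hypothetical handle for illustration only.
  private val queuePolls =
    Metric.histogram("ziokafka_consumer_queue_polls_sketch", pollSizeBoundaries)

  val run: ZIO[Any, java.io.IOException, Unit] =
    for {
      // Each value is "number of polls the records idled before the stream pulled them".
      _     <- ZIO.foreachDiscard(Chunk(0, 0, 1, 3))(polls => queuePolls.update(polls.toDouble))
      state <- queuePolls.value
      _     <- Console.printLine(s"observations=${state.count}, buckets=${state.buckets}")
    } yield ()
}
```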
@@ -50,17 +50,28 @@ final class PartitionStreamControl private (
LogAnnotation("partition", tp.partition().toString)
)

/** Offer new data for the stream to process. */
/** Offer new data for the stream to process. Should be called on every poll, even when `data.isEmpty`. */
private[internal] def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] =
for {
now <- Clock.nanoTime
newPullDeadline = now + maxPollIntervalNanos
_ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size))
_ <- dataQueue.offer(Take.chunk(data))
} yield ()
if (data.isEmpty) {
queueInfoRef.update(_.withEmptyPoll)
} else {
for {
now <- Clock.nanoTime
newPullDeadline = now + maxPollIntervalNanos
_ <- queueInfoRef.update(_.withOffer(newPullDeadline, data.size))
_ <- dataQueue.offer(Take.chunk(data))
} yield ()
}

def queueSize: UIO[Int] = queueInfoRef.get.map(_.size)

/**
* @return
* the number of polls for which records have been idling in the queue. It is increased on every poll (when the
* queue is nonEmpty) and reset to 0 when the stream pulls the records.
*/
def outstandingPolls: UIO[Int] = queueInfoRef.get.map(_.outstandingPolls)

/**
* @param now
* the time as given by `Clock.nanoTime`
@@ -130,7 +141,7 @@ object PartitionStreamControl {
completedPromise <- Promise.make[Nothing, Option[Offset]]
dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]]
now <- Clock.nanoTime
queueInfo <- Ref.make(QueueInfo(now, 0, None))
queueInfo <- Ref.make(QueueInfo(now, 0, None, 0))
requestAndAwaitData =
for {
_ <- commandQueue.offer(RunloopCommand.Request(tp))
@@ -171,12 +182,30 @@ object PartitionStreamControl {

// The `pullDeadline` is only relevant when `size > 0`. We initialize `pullDeadline` as soon as size goes above 0.
// (Note that theoretically `size` can go below 0 when the update operations are reordered.)
private final case class QueueInfo(pullDeadline: NanoTime, size: Int, lastPulledOffset: Option[Offset]) {
private final case class QueueInfo(
pullDeadline: NanoTime,
size: Int,
lastPulledOffset: Option[Offset],
outstandingPolls: Int
) {
def withEmptyPoll: QueueInfo =
copy(outstandingPolls = outstandingPolls + 1)

def withOffer(newPullDeadline: NanoTime, recordCount: Int): QueueInfo =
QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount, lastPulledOffset)
QueueInfo(
pullDeadline = if (size <= 0) newPullDeadline else pullDeadline,
size = size + recordCount,
lastPulledOffset = lastPulledOffset,
outstandingPolls = outstandingPolls + 1
)

def withPull(newPullDeadline: NanoTime, records: Chunk[ByteArrayCommittableRecord]): QueueInfo =
QueueInfo(newPullDeadline, size - records.size, records.lastOption.map(_.offset).orElse(lastPulledOffset))
QueueInfo(
pullDeadline = newPullDeadline,
size = size - records.size,
lastPulledOffset = records.lastOption.map(_.offset).orElse(lastPulledOffset),
outstandingPolls = 0
)

def deadlineExceeded(now: NanoTime): Boolean =
size > 0 && pullDeadline <= now
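To see how the new counter behaves, here is a small standalone sketch that mirrors the `QueueInfo` accounting shown above: `withOffer` and `withEmptyPoll` bump `outstandingPolls` once per poll, and `withPull` resets it when the stream finally takes the records. The names and numbers below are illustrative; the real `QueueInfo` is private and also tracks pull deadlines and the last pulled offset.

```scala
// Standalone illustration only, not zio-kafka code.
final case class QueueInfoSketch(size: Int, outstandingPolls: Int) {
  def withEmptyPoll: QueueInfoSketch =
    copy(outstandingPolls = outstandingPolls + 1)

  def withOffer(recordCount: Int): QueueInfoSketch =
    QueueInfoSketch(size = size + recordCount, outstandingPolls = outstandingPolls + 1)

  def withPull(pulledCount: Int): QueueInfoSketch =
    QueueInfoSketch(size = size - pulledCount, outstandingPolls = 0)
}

object OutstandingPollsTrace {
  def main(args: Array[String]): Unit = {
    val t0 = QueueInfoSketch(size = 0, outstandingPolls = 0)
    val t1 = t0.withOffer(10)  // poll 1 delivers 10 records -> outstandingPolls = 1
    val t2 = t1.withEmptyPoll  // poll 2 delivers nothing    -> outstandingPolls = 2
    val t3 = t2.withEmptyPoll  // poll 3 delivers nothing    -> outstandingPolls = 3
    val t4 = t3.withPull(10)   // stream pulls the records   -> outstandingPolls = 0
    println(List(t1, t2, t3, t4).map(_.outstandingPolls)) // List(1, 2, 3, 0)
  }
}
```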
@@ -372,8 +372,9 @@ private[consumer] final class Runloop private (
_ <- ZIO.foreachParDiscard(streams) { streamControl =>
val tp = streamControl.tp
val records = polledRecords.records(tp)
if (records.isEmpty) ZIO.unit
else {
if (records.isEmpty) {
streamControl.offerRecords(Chunk.empty)
} else {
val builder = ChunkBuilder.make[Record](records.size())
val iterator = records.iterator()
while (iterator.hasNext) {
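Because empty polls are now reported as well, the histogram's mean (`sum / count`) approximates how many polls records spend in partition queues before they are pulled. The watchdog below is only a usage sketch: the object name, the `3.0` threshold, and the one-minute schedule are assumptions, and a real application would read the metric that `ConsumerMetrics` registers (including its labels) rather than re-creating a handle as done here.

```scala
import zio._
import zio.metrics._

// Usage sketch (not part of the commit): warn when records idle for too many polls.
object IdlePollsWatchdog extends ZIOAppDefault {

  private val queuePolls =
    Metric.histogram(
      "ziokafka_consumer_queue_polls",
      MetricKeyType.Histogram.Boundaries.fromChunk(Chunk[Double](0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
    )

  private val warnOnSlowPulls: UIO[Unit] =
    for {
      state  <- queuePolls.value
      average = if (state.count == 0L) 0.0 else state.sum / state.count
      _      <- ZIO.when(average > 3.0)(
                  ZIO.logWarning(f"records idle for $average%.1f polls on average; partition streams may be too slow")
                )
    } yield ()

  // Sample once a minute; pick an interval and threshold that match your poll frequency.
  val run: ZIO[Any, Nothing, Unit] =
    warnOnSlowPulls.repeat(Schedule.fixed(1.minute)).unit
}
```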
