Skip to content

Commit

Permalink
Collect poll metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten committed Jan 1, 2024
1 parent f9d9abb commit 9fbaab1
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,50 @@ import zio._

final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {

// -----------------------------------------------------
//
// Poll metrics
//

// Bucket upper bounds for the poll-latency histogram, in seconds:
// Chunk(0.01,0.02,0.04,0.08,0.16,0.32,0.64,1.28,2.56) in seconds
// Chunk(10,20,40,80,160,320,640,1280,2560) in milliseconds
// Doubling buckets from 10ms up to ~2.56s cover typical consumer poll times.
private val pollLatencyBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.exponential(0.01, 2.0, 9)

// Bucket upper bounds for the poll-size histogram (record counts), roughly
// powers of e rounded up: Chunk(1,3,8,21,55,149,404,1097,2981,8104)
private val pollSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk.iterate(1.0, 10)(_ * Math.E).map(Math.ceil))

// Counts every call to the underlying consumer's poll, tagged with the
// configured metric labels.
private val pollCounter: Metric.Counter[Int] =
Metric.counterInt("ziokafka_consumer_polls", "The number of polls.").tagged(metricLabels)

// Records how long a single poll took. The incoming `Duration` is converted
// to fractional seconds before being folded into the histogram.
private val pollLatencyHistogram: Metric.Histogram[Duration] = {
  val secondsHistogram =
    Metric.histogram(
      "ziokafka_consumer_poll_latency",
      "The duration of a single poll in seconds.",
      pollLatencyBoundaries
    )
  secondsHistogram
    .contramap[Duration](duration => duration.toNanos.toDouble / 1e9)
    .tagged(metricLabels)
}

// Records how many records a single poll returned; the count is widened to
// Double because histogram observations are doubles.
private val pollSizeHistogram: Metric.Histogram[Int] = {
  val recordCountHistogram =
    Metric.histogram(
      "ziokafka_consumer_poll_size",
      "The number of records fetched by a single poll.",
      pollSizeBoundaries
    )
  recordCountHistogram
    .contramap[Int](recordCount => recordCount.toDouble)
    .tagged(metricLabels)
}

/**
 * Record the outcome of one poll: bumps the poll counter, then updates the
 * latency and size histograms, in that order.
 */
def observePoll(latency: Duration, pollSize: Int): UIO[Unit] =
  pollCounter.increment *>
    pollLatencyHistogram.update(latency) *>
    pollSizeHistogram.update(pollSize)

// -----------------------------------------------------
//
// Rebalance metrics
Expand All @@ -20,8 +64,8 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
"ziokafka_consumer_partitions_currently_assigned",
"The number of partitions currently assigned to the consumer"
)
.tagged(metricLabels)
.contramap[Int](_.toDouble)
.tagged(metricLabels)

private def partitionsToStateCounter(state: String): Metric.Counter[Int] =
Metric
Expand Down Expand Up @@ -57,7 +101,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
// Bucket upper bounds for per-partition queue sizes: a zero bucket followed by
// powers of e scaled from 100, rounded up.
private val streamSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(100.0, 9)(_ * Math.E).map(Math.ceil))

private val pollSizeBoundaries: Histogram.Boundaries =
// Linear buckets 0..9 counting how many polls records sat in a partition queue.
private val queuePollSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk[Double](0, 1, 2, 3, 4, 5, 6, 7, 8, 9))

private val pendingRequestsHistogram =
Expand All @@ -67,8 +111,8 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
"The number of partition streams that are awaiting new records.",
streamCountBoundaries
)
.tagged(metricLabels)
.contramap[Int](_.toDouble)
.tagged(metricLabels)

private val pendingCommitsHistogram =
Metric
Expand All @@ -77,8 +121,8 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
"The number of commits that are awaiting completion.",
streamCountBoundaries
)
.tagged(metricLabels)
.contramap[Int](_.toDouble)
.tagged(metricLabels)

private val queueSizeHistogram =
Metric
Expand All @@ -87,18 +131,18 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
"The number of records queued per partition.",
streamSizeBoundaries
)
.tagged(metricLabels)
.contramap[Int](_.toDouble)
.tagged(metricLabels)

private val queuePollsHistogram =
Metric
.histogram(
"ziokafka_consumer_queue_polls",
"The number of polls records are idling in the queue for a partition.",
pollSizeBoundaries
queuePollSizeBoundaries
)
.tagged(metricLabels)
.contramap[Int](_.toDouble)
.tagged(metricLabels)

private val allQueueSizeHistogram =
Metric
Expand All @@ -107,8 +151,8 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
"The number of records queued in the consumer (all partitions).",
streamSizeBoundaries
)
.tagged(metricLabels)
.contramap[Int](_.toDouble)
.tagged(metricLabels)

def observeMetrics(state: Runloop.State): UIO[Unit] =
ZIO
Expand Down
36 changes: 22 additions & 14 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,14 @@ private[consumer] final class Runloop private (
if (toPause.nonEmpty) c.pause(toPause.asJava)
}

private def handlePoll(state: State): Task[State] =
private def handlePoll(state: State): Task[State] = {
// Evaluates the by-name `task`, measuring its wall-clock duration with the
// monotonic System.nanoTime clock; returns the elapsed time with the result.
def timed[A](task: => A): (Duration, A) = {
  val startNanos = java.lang.System.nanoTime()
  val result     = task
  val elapsed    = (java.lang.System.nanoTime() - startNanos).nanoseconds
  (elapsed, result)
}

for {
partitionsToFetch <- fetchStrategy.selectPartitionsToFetch(state.assignedStreams)
_ <- ZIO.logDebug(
Expand All @@ -442,21 +449,21 @@ private[consumer] final class Runloop private (

resumeAndPausePartitions(c, prevAssigned, partitionsToFetch)

val polledRecords = {
val records = c.poll(pollTimeout)
if (records eq null) ConsumerRecords.empty[Array[Byte], Array[Byte]]() else records
}
val (pollDuration, recordsOrNull) = timed(c.poll(pollTimeout))
val polledRecords =
if (recordsOrNull eq null) ConsumerRecords.empty[Array[Byte], Array[Byte]]() else recordsOrNull

diagnostics.emit {
val providedTps = polledRecords.partitions().asScala.toSet
val requestedPartitions = state.pendingRequests.map(_.tp).toSet
consumerMetrics.observePoll(pollDuration, polledRecords.count()) *>
diagnostics.emit {
val providedTps = polledRecords.partitions().asScala.toSet
val requestedPartitions = state.pendingRequests.map(_.tp).toSet

DiagnosticEvent.Poll(
tpRequested = requestedPartitions,
tpWithData = providedTps,
tpWithoutData = requestedPartitions -- providedTps
)
} *>
DiagnosticEvent.Poll(
tpRequested = requestedPartitions,
tpWithData = providedTps,
tpWithoutData = requestedPartitions -- providedTps
)
} *>
lastRebalanceEvent.getAndSet(RebalanceEvent.None).flatMap {
case RebalanceEvent(false, _, _, _, _) =>
// The fast track, rebalance listener was not invoked:
Expand Down Expand Up @@ -548,6 +555,7 @@ private[consumer] final class Runloop private (
pendingCommits = updatedPendingCommits,
assignedStreams = pollResult.assignedStreams
)
}

/**
* Check each stream to see if it exceeded its poll interval. If so, halt it. In addition, if any stream has exceeded
Expand Down

0 comments on commit 9fbaab1

Please sign in to comment.