Skip to content

Commit

Permalink
Collect rebalance metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten committed Jan 1, 2024
1 parent 12ac387 commit f9d9abb
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,49 @@ import zio._

final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {

// -----------------------------------------------------
//
// Rebalance metrics
//

private val rebalanceCounter: Metric.Counter[Int] =
Metric.counterInt("ziokafka_consumer_rebalances", "The number of rebalances").tagged(metricLabels)

private val partitionsCurrentlyAssignedGauge: Metric.Gauge[Int] =
Metric
.gauge(
"ziokafka_consumer_partitions_currently_assigned",
"The number of partitions currently assigned to the consumer"
)
.tagged(metricLabels)
.contramap[Int](_.toDouble)

private def partitionsToStateCounter(state: String): Metric.Counter[Int] =
Metric
.counterInt(
s"ziokafka_consumer_partitions_$state",
s"The number of partitions $state to the consumer"
)
.tagged(metricLabels)

private val partitionsAssignedCounter = partitionsToStateCounter("assigned")
private val partitionsRevokedCounter = partitionsToStateCounter("revoked")
private val partitionsLostCounter = partitionsToStateCounter("lost")

def observeRebalance(currentlyAssignedCount: Int, assignedCount: Int, revokedCount: Int, lostCount: Int): UIO[Unit] =
for {
_ <- rebalanceCounter.increment
_ <- partitionsCurrentlyAssignedGauge.update(currentlyAssignedCount)
_ <- partitionsAssignedCounter.incrementBy(assignedCount)
_ <- partitionsRevokedCounter.incrementBy(revokedCount)
_ <- partitionsLostCounter.incrementBy(lostCount)
} yield ()

// -----------------------------------------------------
//
// Partition stream metrics
//

// Chunk(0,1,3,8,21,55,149,404,1097,2981)
private val streamCountBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(1.0, 9)(_ * Math.E).map(Math.ceil))
Expand All @@ -25,7 +68,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
streamCountBoundaries
)
.tagged(metricLabels)
.contramap((_: Int).toDouble)
.contramap[Int](_.toDouble)

private val pendingCommitsHistogram =
Metric
Expand All @@ -35,7 +78,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
streamCountBoundaries
)
.tagged(metricLabels)
.contramap((_: Int).toDouble)
.contramap[Int](_.toDouble)

private val queueSizeHistogram =
Metric
Expand All @@ -45,7 +88,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
streamSizeBoundaries
)
.tagged(metricLabels)
.contramap((_: Int).toDouble)
.contramap[Int](_.toDouble)

private val queuePollsHistogram =
Metric
Expand All @@ -55,7 +98,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
pollSizeBoundaries
)
.tagged(metricLabels)
.contramap((_: Int).toDouble)
.contramap[Int](_.toDouble)

private val allQueueSizeHistogram =
Metric
Expand All @@ -65,7 +108,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
streamSizeBoundaries
)
.tagged(metricLabels)
.contramap((_: Int).toDouble)
.contramap[Int](_.toDouble)

def observeMetrics(state: Runloop.State): UIO[Unit] =
ZIO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,12 @@ private[consumer] final class Runloop private (
_ <-
committedOffsetsRef.update(_.keepPartitions(updatedAssignedStreams.map(_.tp).toSet)): Task[Unit]

_ <- consumerMetrics.observeRebalance(
currentAssigned.size,
assignedTps.size,
revokedTps.size,
lostTps.size
)
_ <- diagnostics.emit(
Rebalance(
revoked = revokedTps,
Expand Down

0 comments on commit f9d9abb

Please sign in to comment.