diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala index 18b8974ba..a7c353f28 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerMetrics.scala @@ -6,14 +6,19 @@ import zio.{ Chunk, UIO, ZIO } final case class ConsumerMetrics(metricsConsumerId: String) { + // Chunk(0,1,3,8,21,55,149,404,1097,2981) private val streamCountBoundaries: Histogram.Boundaries = - MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048)) + MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(1.0, 9)(_ * Math.E).map(Math.ceil)) + + // Chunk(0,100,272,739,2009,5460,14842,40343,109664,298096) + private val streamSizeBoundaries: Histogram.Boundaries = + MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(100.0, 9)(_ * Math.E).map(Math.ceil)) private val pendingRequestsHistogram = Metric .histogram( "consumer_pending_requests", - "The number of streams that are awaiting new data.", + "The number of partition streams that are awaiting new records.", streamCountBoundaries ) .tagged("consumer_id", metricsConsumerId) @@ -31,17 +36,30 @@ final case class ConsumerMetrics(metricsConsumerId: String) { Metric .histogram( "consumer_queue_size", - "The number of records in stream queues.", - MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0, 128, 256, 512, 1024, 2048, 4096, 8192)) + "The number of records queued per partition.", + streamSizeBoundaries + ) + .tagged("consumer_id", metricsConsumerId) + + private val allQueueSizeHistogram = + Metric + .histogram( + "consumer_all_queue_size", + "The number of records queued in the consumer (all partitions).", + streamSizeBoundaries ) .tagged("consumer_id", metricsConsumerId) def observeMetrics(state: Runloop.State): UIO[Unit] = ZIO .when(state.subscriptionState.isSubscribed) { - (ZIO.succeed(state.pendingRequests.size.toDouble) @@ pendingRequestsHistogram) *> - (ZIO.succeed(state.pendingCommits.size.toDouble) @@ pendingCommitsHistogram) *> - ZIO.foreachDiscard(state.assignedStreams)(_.queueSize.map(_.toDouble) @@ queueSizeHistogram) + for { + queueSizes <- ZIO.foreach(state.assignedStreams)(_.queueSize.map(_.toDouble)) + _ <- ZIO.foreachDiscard(queueSizes)(qs => queueSizeHistogram.update(qs)) + _ <- allQueueSizeHistogram.update(queueSizes.sum) + _ <- ZIO.succeed(state.pendingRequests.size.toDouble) @@ pendingRequestsHistogram + _ <- ZIO.succeed(state.pendingCommits.size.toDouble) @@ pendingCommitsHistogram + } yield () } .unit diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 3032e301f..cd23ef678 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -686,7 +686,7 @@ private[consumer] final class Runloop private ( .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { - _ <- consumerMetrics.observeMetrics(state) + _ <- consumerMetrics.observeMetrics(state).fork commitCommands <- commitQueue.takeAll _ <- ZIO.logDebug( s"Processing ${commitCommands.size} commits," +