Skip to content

Commit

Permalink
Improvements:
Browse files Browse the repository at this point in the history
- add metric `allQueueSizeHistogram`
- better metrics descriptions
- better bucket boundaries
- observe metrics in the background
  • Loading branch information
erikvanoosten committed Dec 25, 2023
1 parent cb595dc commit 8ea6721
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ import zio.{ Chunk, UIO, ZIO }

final case class ConsumerMetrics(metricsConsumerId: String) {

// Chunk(0,1,3,8,21,55,149,404,1097,2981)
private val streamCountBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048))
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(1.0, 9)(_ * Math.E).map(Math.ceil))

// Chunk(0,100,272,739,2009,5460,14842,40343,109664,298096)
private val streamSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(100.0, 9)(_ * Math.E).map(Math.ceil))

private val pendingRequestsHistogram =
Metric
.histogram(
"consumer_pending_requests",
"The number of streams that are awaiting new data.",
"The number of partition streams that are awaiting new records.",
streamCountBoundaries
)
.tagged("consumer_id", metricsConsumerId)
Expand All @@ -31,17 +36,30 @@ final case class ConsumerMetrics(metricsConsumerId: String) {
Metric
.histogram(
"consumer_queue_size",
"The number of records in stream queues.",
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0, 128, 256, 512, 1024, 2048, 4096, 8192))
"The number of records queued per partition.",
streamSizeBoundaries
)
.tagged("consumer_id", metricsConsumerId)

private val allQueueSizeHistogram =
Metric
.histogram(
"consumer_all_queue_size",
"The number of records queued in the consumer (all partitions).",
streamSizeBoundaries
)
.tagged("consumer_id", metricsConsumerId)

def observeMetrics(state: Runloop.State): UIO[Unit] =
ZIO
.when(state.subscriptionState.isSubscribed) {
(ZIO.succeed(state.pendingRequests.size.toDouble) @@ pendingRequestsHistogram) *>
(ZIO.succeed(state.pendingCommits.size.toDouble) @@ pendingCommitsHistogram) *>
ZIO.foreachDiscard(state.assignedStreams)(_.queueSize.map(_.toDouble) @@ queueSizeHistogram)
for {
queueSizes <- ZIO.foreach(state.assignedStreams)(_.queueSize.map(_.toDouble))
_ <- ZIO.foreachDiscard(queueSizes)(qs => queueSizeHistogram.update(qs))
_ <- allQueueSizeHistogram.update(queueSizes.sum)
_ <- ZIO.succeed(state.pendingRequests.size.toDouble) @@ pendingRequestsHistogram
_ <- ZIO.succeed(state.pendingCommits.size.toDouble) @@ pendingCommitsHistogram
} yield ()
}
.unit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ private[consumer] final class Runloop private (
.takeWhile(_ != RunloopCommand.StopRunloop)
.runFoldChunksDiscardZIO(initialState) { (state, commands) =>
for {
_ <- consumerMetrics.observeMetrics(state)
_ <- consumerMetrics.observeMetrics(state).fork
commitCommands <- commitQueue.takeAll
_ <- ZIO.logDebug(
s"Processing ${commitCommands.size} commits," +
Expand Down

0 comments on commit 8ea6721

Please sign in to comment.