Skip to content

Commit

Permalink
Collect commit metrics (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten committed Jan 1, 2024
1 parent 9fbaab1 commit a3e0bad
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,50 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
_ <- pollSizeHistogram.update(pollSize)
} yield ()

// -----------------------------------------------------
//
// Commit metrics
//

// Chunk(0.01,0.02,0.04,0.08,0.16,0.32,0.64,1.28,2.56) in seconds
// Chunk(10,20,40,80,160,320,640,1280,2560) in milliseconds
private val commitLatencyBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.exponential(0.01, 2.0, 9)

// Chunk(1,3,8,21,55,149,404,1097,2981,8104)
private val commitSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk.iterate(1.0, 10)(_ * Math.E).map(Math.ceil))

private val commitCounter: Metric.Counter[Int] =
Metric.counterInt("ziokafka_consumer_commits", "The number of commits.").tagged(metricLabels)

private val commitLatencyHistogram: Metric.Histogram[Duration] =
Metric
.histogram(
"ziokafka_consumer_commit_latency",
"The duration of a single commit in seconds.",
commitLatencyBoundaries
)
.contramap[Duration](_.toNanos.toDouble / 1e9)
.tagged(metricLabels)

private val commitSizeHistogram: Metric.Histogram[Int] =
Metric
.histogram(
"ziokafka_consumer_commit_size",
"The number of records (offsets) per commit.",
commitSizeBoundaries
)
.contramap[Int](_.toDouble)
.tagged(metricLabels)

def observeCommit(latency: Duration, commitSize: Int): UIO[Unit] =
for {
_ <- commitCounter.increment
_ <- commitLatencyHistogram.update(latency)
_ <- commitSizeHistogram.update(commitSize)
} yield ()

// -----------------------------------------------------
//
// Rebalance metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,14 @@ private[consumer] final class Runloop private (
offsets =>
for {
p <- Promise.make[Throwable, Unit]
start = java.lang.System.nanoTime()
_ <- commitQueue.offer(Runloop.Commit(offsets, p))
_ <- commandQueue.offer(RunloopCommand.CommitAvailable)
_ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets))
_ <- p.await.timeoutFail(CommitTimeout)(commitTimeout)
end = java.lang.System.nanoTime()
latency = (end - start).nanoseconds
_ <- consumerMetrics.observeCommit(latency, 0) // TODO: get commit size
} yield ()

/** Merge commits and prepare parameters for calling `consumer.commitAsync`. */
Expand Down

0 comments on commit a3e0bad

Please sign in to comment.