Skip to content

Commit

Permalink
Prepare for user customization
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvanoosten committed Jan 14, 2024
1 parent 0737fe3 commit 3f3e2c7
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,33 @@ import zio.metrics.MetricKeyType.Histogram
import zio.metrics._
import zio._

final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
/**
* Implementations of this trait are responsible for measuring all consumer metrics. The different methods are invoked
* from different places in the consumer.
*
* WARNING: This is an INTERNAL API. If may change in an incompatible way, or disappear, without notice, in any
* zio-kafka version.
*/
private[internal] trait ConsumerMetrics {
def observePoll(resumedCount: Int, pausedCount: Int, latency: Duration, pollSize: Int): UIO[Unit]
def observeCommit(latency: Duration, commitSize: Long): UIO[Unit]
def observeRebalance(currentlyAssignedCount: Int, assignedCount: Int, revokedCount: Int, lostCount: Int): UIO[Unit]
def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit]
}

/**
* A `ConsumerMetrics` that uses zio-metrics for measuring.
*
* Sub-classes are allowed to override the Histogram boundaries.
*
* WARNING: This is an INTERNAL API. If may change in an incompatible way, or disappear, without notice, in any
* zio-kafka version.
*
* @param metricLabels
* the metric labels that are added to each metric
*/
//noinspection ScalaWeakerAccess
private[internal] class ZioConsumerMetrics(metricLabels: Set[MetricLabel]) extends ConsumerMetrics {

// -----------------------------------------------------
//
Expand All @@ -13,13 +39,13 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {

// 0.01,0.03,0.08,0.21,0.55,1.49,4.04,10.97,29.81,81.04 in seconds
// 10,30,80,210,550,1490,4040,10970,29810,81040 in milliseconds
private val pollLatencyBoundaries: Histogram.Boundaries =
protected val pollLatencyBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(
Chunk.iterate(0.01, 10)(_ * Math.E).map(d => Math.ceil(d * 100.0) / 100.0)
)

// Chunk(1,3,8,21,55,149,404,1097,2981,8104)
private val pollSizeBoundaries: Histogram.Boundaries =
// 1,3,8,21,55,149,404,1097,2981,8104
protected val pollSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk.iterate(1.0, 10)(_ * Math.E).map(Math.ceil))

private val pollCounter: Metric.Counter[Int] =
Expand Down Expand Up @@ -65,7 +91,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
.contramap[Int](_.toDouble)
.tagged(metricLabels)

def observePoll(resumedCount: Int, pausedCount: Int, latency: Duration, pollSize: Int): UIO[Unit] =
override def observePoll(resumedCount: Int, pausedCount: Int, latency: Duration, pollSize: Int): UIO[Unit] =
for {
_ <- pollCounter.increment
_ <- partitionsResumedInLatestPollGauge.update(resumedCount)
Expand All @@ -81,13 +107,13 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {

// 0.01,0.03,0.08,0.21,0.55,1.49,4.04,10.97,29.81,81.04 in seconds
// 10,30,80,210,550,1490,4040,10970,29810,81040 in milliseconds
private val commitLatencyBoundaries: Histogram.Boundaries =
protected val commitLatencyBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(
Chunk.iterate(0.01, 10)(_ * Math.E).map(d => Math.ceil(d * 100.0) / 100.0)
)

// Chunk(1,3,8,21,55,149,404,1097,2981,8104)
private val commitSizeBoundaries: Histogram.Boundaries =
// 1,3,8,21,55,149,404,1097,2981,8104
protected val commitSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk.iterate(1.0, 10)(_ * Math.E).map(Math.ceil))

private val commitCounter: Metric.Counter[Int] =
Expand Down Expand Up @@ -116,7 +142,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
.contramap[Long](_.toDouble)
.tagged(metricLabels)

def observeCommit(latency: Duration, commitSize: Long): UIO[Unit] =
override def observeCommit(latency: Duration, commitSize: Long): UIO[Unit] =
for {
_ <- commitCounter.increment
_ <- commitLatencyHistogram.update(latency)
Expand Down Expand Up @@ -154,7 +180,12 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
private val partitionsRevokedCounter = partitionsToStateCounter("revoked")
private val partitionsLostCounter = partitionsToStateCounter("lost")

def observeRebalance(currentlyAssignedCount: Int, assignedCount: Int, revokedCount: Int, lostCount: Int): UIO[Unit] =
override def observeRebalance(
currentlyAssignedCount: Int,
assignedCount: Int,
revokedCount: Int,
lostCount: Int
): UIO[Unit] =
for {
_ <- rebalanceCounter.increment
_ <- partitionsCurrentlyAssignedGauge.update(currentlyAssignedCount)
Expand All @@ -168,18 +199,18 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
// Runloop metrics
//

// Chunk(0,1,3,8,21,55,149,404,1097,2981)
private val streamCountBoundaries: Histogram.Boundaries =
// 0,1,3,8,21,55,149,404,1097,2981
protected val streamCountBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(1.0, 9)(_ * Math.E).map(Math.ceil))

// Chunk(0,100,272,739,2009,5460,14842,40343,109664,298096)
private val streamSizeBoundaries: Histogram.Boundaries =
// 0,100,272,739,2009,5460,14842,40343,109664,298096
protected val streamSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(100.0, 9)(_ * Math.E).map(Math.ceil))

private val queuePollSizeBoundaries: Histogram.Boundaries =
protected val queuePollSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk[Double](0, 1, 2, 3, 4, 5, 6, 7, 8, 9))

private val pendingRequestsHistogram =
private val pendingRequestsHistogram: Metric.Histogram[Int] =
Metric
.histogram(
"ziokafka_consumer_pending_requests",
Expand All @@ -189,7 +220,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
.contramap[Int](_.toDouble)
.tagged(metricLabels)

private val pendingCommitsHistogram =
private val pendingCommitsHistogram: Metric.Histogram[Int] =
Metric
.histogram(
"ziokafka_consumer_pending_commits",
Expand All @@ -199,7 +230,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
.contramap[Int](_.toDouble)
.tagged(metricLabels)

private val queueSizeHistogram =
private val queueSizeHistogram: Metric.Histogram[Int] =
Metric
.histogram(
"ziokafka_consumer_queue_size",
Expand All @@ -209,7 +240,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
.contramap[Int](_.toDouble)
.tagged(metricLabels)

private val queuePollsHistogram =
private val queuePollsHistogram: Metric.Histogram[Int] =
Metric
.histogram(
"ziokafka_consumer_queue_polls",
Expand All @@ -219,7 +250,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
.contramap[Int](_.toDouble)
.tagged(metricLabels)

private val allQueueSizeHistogram =
private val allQueueSizeHistogram: Metric.Histogram[Int] =
Metric
.histogram(
"ziokafka_consumer_all_queue_size",
Expand All @@ -238,8 +269,8 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
.contramap[SubscriptionState](s => if (s.isSubscribed) 1 else 0)
.tagged(metricLabels)

// Chunk(0,1,3,8,21,55,149,404,1097,2981)
private val commandAndCommitQueueSizeBoundaries: Histogram.Boundaries =
// 0,1,3,8,21,55,149,404,1097,2981
protected val commandAndCommitQueueSizeBoundaries: Histogram.Boundaries =
MetricKeyType.Histogram.Boundaries.fromChunk(Chunk(0.0) ++ Chunk.iterate(1.0, 9)(_ * Math.E).map(Math.ceil))

private val commandQueueSizeHistogram: Metric.Histogram[Int] =
Expand All @@ -262,7 +293,7 @@ final case class ConsumerMetrics(metricLabels: Set[MetricLabel]) {
.contramap[Int](_.toDouble)
.tagged(metricLabels)

def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] =
override def observeRunloopMetrics(state: Runloop.State, commandQueueSize: Int, commitQueueSize: Int): UIO[Unit] =
for {
_ <- ZIO.foreachDiscard(state.assignedStreams)(_.outstandingPolls @@ queuePollsHistogram)
queueSizes <- ZIO.foreach(state.assignedStreams)(_.queueSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private[consumer] final class Runloop private (
private val restartStreamsOnRebalancing = settings.restartStreamOnRebalancing
private val rebalanceSafeCommits = settings.rebalanceSafeCommits

private val consumerMetrics = ConsumerMetrics(settings.metricLabels)
private val consumerMetrics = new ZioConsumerMetrics(settings.metricLabels)

private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] =
PartitionStreamControl.newPartitionStream(tp, commandQueue, diagnostics, maxPollInterval)
Expand Down

0 comments on commit 3f3e2c7

Please sign in to comment.