Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Evict all metrics for a cluster on collector stop or failure #154

Merged
merged 2 commits into from
Sep 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import java.time.Clock
import akka.actor.Cancellable
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import com.lightbend.kafkalagexporter.ConsumerGroupCollector.OffsetsSnapshot.MetricKeys
import com.lightbend.kafkalagexporter.KafkaClient.KafkaClientContract
import com.lightbend.kafkalagexporter.LookupTable.Table.{LagIsZero, Prediction, TooFewPoints}
import org.slf4j.Logger

import scala.collection.immutable
import scala.concurrent.duration._
Expand All @@ -27,21 +29,33 @@ object ConsumerGroupCollector {
final case object Stop extends Stop
final case class StopWithError(throwable: Throwable) extends Message
final case class MetaData(pollTime: Long) extends Message
object OffsetsSnapshot {
final case class MetricKeys(
tps: List[Domain.TopicPartition] = Nil,
groups: List[String] = Nil,
gtps: List[Domain.GroupTopicPartition] = Nil
)

}
final case class OffsetsSnapshot(
timestamp: Long,
groups: List[String],
earliestOffsets: PartitionOffsets,
latestOffsets: PartitionOffsets,
lastGroupOffsets: GroupOffsets
) extends Message {
import OffsetsSnapshot._

private val TpoFormat = " %-64s%-11s%s"
private val GtpFormat = " %-64s%-64s%-11s%s"

def diff(other: OffsetsSnapshot): (List[TopicPartition], List[String], List[GroupTopicPartition]) = {
val evictedTps = latestOffsets.keySet.diff(other.latestOffsets.keySet).toList
val evictedGroups = groups.diff(other.groups)
val evictedGtps = lastGroupOffsets.keySet.diff(other.lastGroupOffsets.keySet).toList
(evictedTps, evictedGroups, evictedGtps)
val metricKeys: MetricKeys = MetricKeys(latestOffsets.keys.toList, groups, lastGroupOffsets.keys.toList)

def diff(other: OffsetsSnapshot): MetricKeys = {
val evictedTps = metricKeys.tps.diff(other.metricKeys.tps)
val evictedGroups = metricKeys.groups.diff(other.metricKeys.groups)
val evictedGtps = metricKeys.gtps.diff(other.metricKeys.gtps)
MetricKeys(evictedTps, evictedGroups, evictedGtps)
}

override def toString: String = {
Expand Down Expand Up @@ -156,13 +170,10 @@ object ConsumerGroupCollector {
case (context, snapshot: OffsetsSnapshot) =>
context.log.debug("Received Offsets Snapshot:\n{}", snapshot)

val (evictedTps, evictedGroups, evictedGtps) = state
.lastSnapshot
.map(_.diff(snapshot))
.getOrElse((Nil, Nil, Nil))
val evictedKeys = state.lastSnapshot.map(_.diff(snapshot)).getOrElse(MetricKeys())

context.log.info("Updating lookup tables")
refreshLookupTable(state, snapshot, evictedTps)
refreshLookupTable(state, snapshot, evictedKeys.tps)

reporters.foreach { reporter =>
context.log.info("Reporting offsets")
Expand All @@ -171,7 +182,7 @@ object ConsumerGroupCollector {
reportConsumerGroupMetrics(config, reporter, snapshot, state.topicPartitionTables)

context.log.info("Clearing evicted metrics")
reportEvictedMetrics(config, reporter, evictedTps, evictedGroups, evictedGtps)
evictMetricsFromReporter(config, reporter, evictedKeys)
}

context.log.info("Polling in {}", config.pollInterval)
Expand All @@ -193,14 +204,25 @@ object ConsumerGroupCollector {
state.scheduledCollect.cancel()
Behaviors.stopped { () =>
client.close()
evictAllClusterMetrics(context.log, config, reporters, state)
context.log.info("Gracefully stopped polling and Kafka client for cluster: {}", config.cluster.name)
}
case (_, StopWithError(t)) =>
case (context, StopWithError(t)) =>
state.scheduledCollect.cancel()
client.close()
evictAllClusterMetrics(context.log, config, reporters, state)
throw new Exception("A failure occurred while retrieving offsets. Shutting down.", t)
}

/**
* Evict all metrics from reports before shutdown
*/
private def evictAllClusterMetrics(log: Logger, config: CollectorConfig, reporters: List[ActorRef[MetricsSink.Message]], state: CollectorState) = {
log.info("Clearing all metrics before shutdown")
val metricKeys = state.lastSnapshot.map(_.metricKeys).getOrElse(MetricKeys())
reporters.foreach(reporter => evictMetricsFromReporter(config, reporter, metricKeys))
}

/**
* Refresh Lookup table. Remove topic partitions that are no longer relevant and update tables with new Point's.
*/
Expand Down Expand Up @@ -281,26 +303,27 @@ object ConsumerGroupCollector {
} reporter ! Metrics.TopicPartitionValueMessage(Metrics.LatestOffsetMetric, config.cluster.name, tp, point.offset)
}

private def reportEvictedMetrics(
private def evictMetricsFromReporter(
config: CollectorConfig,
reporter: ActorRef[MetricsSink.Message],
tps: List[Domain.TopicPartition],
groups: List[String],
gtps: List[Domain.GroupTopicPartition]): Unit = {
tps.foreach(tp => reporter ! Metrics.TopicPartitionRemoveMetricMessage(Metrics.LatestOffsetMetric, config.cluster.name, tp))
groups.foreach { group =>
metricKeys: MetricKeys): Unit = {
metricKeys.tps.foreach { tp =>
reporter ! Metrics.TopicPartitionRemoveMetricMessage(Metrics.LatestOffsetMetric, config.cluster.name, tp)
reporter ! Metrics.TopicPartitionRemoveMetricMessage(Metrics.EarliestOffsetMetric, config.cluster.name, tp)
}
metricKeys.groups.foreach { group =>
reporter ! Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupOffsetLagMetric, config.cluster.name, group)
reporter ! Metrics.GroupRemoveMetricMessage(Metrics.MaxGroupTimeLagMetric, config.cluster.name, group)
reporter ! Metrics.GroupRemoveMetricMessage(Metrics.SumGroupOffsetLagMetric, config.cluster.name, group)
}
gtps.foreach { gtp =>
metricKeys.gtps.foreach { gtp =>
reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.LastGroupOffsetMetric, config.cluster.name, gtp)
reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.OffsetLagMetric, config.cluster.name, gtp)
reporter ! Metrics.GroupPartitionRemoveMetricMessage(Metrics.TimeLagMetric, config.cluster.name, gtp)
}

for {
(group, gtps) <- gtps.groupBy(_.id)
(group, gtps) <- metricKeys.gtps.groupBy(_.id)
topic <- gtps.map(_.topic).distinct
} reporter ! Metrics.GroupTopicRemoveMetricMessage(Metrics.SumGroupTopicOffsetLagMetric, config.cluster.name, group, topic)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.lightbend.kafkalagexporter.integration

object ExporterPorts {
val IntegrationSpec = 8000
val MetricsEvictionSpec = 8001
val MetricsEvictionOnFailureSpec = 8002
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.lightbend.kafkalagexporter.Metrics._

import scala.util.Try

class IntegrationSpec extends SpecBase(exporterPort = 8000) {
class IntegrationSpec extends SpecBase(exporterPort = ExporterPorts.IntegrationSpec) {

"kafka lag exporter" should {
val group = createGroupId(1)
Expand All @@ -25,6 +25,7 @@ class IntegrationSpec extends SpecBase(exporterPort = 8000) {

val rules = List(
Rule.create(LatestOffsetMetric, (actual: String) => actual shouldBe (totalOffsets + 1).toDouble.toString, clusterName, topic, partition),
Rule.create(EarliestOffsetMetric, (actual: String) => actual shouldBe 0.toDouble.toString, clusterName, topic, partition),
Rule.create(LastGroupOffsetMetric, (actual: String) => actual shouldBe offsetsToCommit.toDouble.toString, clusterName, group, topic, partition),
Rule.create(OffsetLagMetric, (actual: String) => actual shouldBe (offsetsToCommit + 1).toDouble.toString, clusterName, group, topic, partition),
// TODO: update test so we can assert actual lag in time. keep producer running for more than two polling cycles.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter.integration

import com.lightbend.kafkalagexporter.Metrics._

class MetricsEvictionOnFailureSpec extends SpecBase(exporterPort = ExporterPorts.MetricsEvictionOnFailureSpec) {
"kafka lag exporter" should {
"not report metrics for group members or partitions after a failure" in {
val group = createGroupId(1)
val partition = "0"
val topic = createTopic(1, 1, 1)

val offsetsToCommit = 5
val totalOffsets = 10

val rules = List(
Rule.create(LatestOffsetMetric, (actual: String) => actual shouldBe (totalOffsets + 1).toDouble.toString, clusterName, topic, partition),
Rule.create(EarliestOffsetMetric, (actual: String) => actual shouldBe 0.toDouble.toString, clusterName, topic, partition),
Rule.create(LastGroupOffsetMetric, (actual: String) => actual shouldBe offsetsToCommit.toDouble.toString, clusterName, group, topic, partition),
Rule.create(OffsetLagMetric, (actual: String) => actual shouldBe (offsetsToCommit + 1).toDouble.toString, clusterName, group, topic, partition),
Rule.create(TimeLagMetric, (_: String) => (), clusterName, group, topic, partition),
Rule.create(MaxGroupOffsetLagMetric, (actual: String) => actual shouldBe (offsetsToCommit + 1).toDouble.toString, clusterName, group),
Rule.create(MaxGroupTimeLagMetric, (_: String) => (), clusterName, group)
)

val simulator = new LagSimulator(topic, group)
simulator.produceElements(totalOffsets)
simulator.consumeElements(offsetsToCommit)
simulator.shutdown()

eventually(scrapeAndAssert(exporterPort, "Assert offset-based metrics", rules: _*))

stopKafka()

eventually(scrapeAndAssertDne(exporterPort, "Assert offset-based metrics no longer exist", rules: _*))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

package com.lightbend.kafkalagexporter.integration

import scala.jdk.CollectionConverters._
import com.lightbend.kafkalagexporter.Metrics.{LastGroupOffsetMetric, LatestOffsetMetric, MaxGroupOffsetLagMetric, MaxGroupTimeLagMetric, OffsetLagMetric, TimeLagMetric}
import com.lightbend.kafkalagexporter.Metrics._

class MetricsEvictionSpec extends SpecBase(exporterPort = 8001) {
import scala.jdk.CollectionConverters._

class MetricsEvictionSpec extends SpecBase(exporterPort = ExporterPorts.MetricsEvictionSpec) {
"kafka lag exporter" should {
"not report metrics for group members or partitions that no longer exist" in {
val group = createGroupId(1)
Expand All @@ -20,6 +20,7 @@ class MetricsEvictionSpec extends SpecBase(exporterPort = 8001) {

val rules = List(
Rule.create(LatestOffsetMetric, (actual: String) => actual shouldBe (totalOffsets + 1).toDouble.toString, clusterName, topic, partition),
Rule.create(EarliestOffsetMetric, (actual: String) => actual shouldBe 0.toDouble.toString, clusterName, topic, partition),
Rule.create(LastGroupOffsetMetric, (actual: String) => actual shouldBe offsetsToCommit.toDouble.toString, clusterName, group, topic, partition),
Rule.create(OffsetLagMetric, (actual: String) => actual shouldBe (offsetsToCommit + 1).toDouble.toString, clusterName, group, topic, partition),
Rule.create(TimeLagMetric, (_: String) => (), clusterName, group, topic, partition),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ abstract class SpecBase(val exporterPort: Int)

override def afterEach(): Unit = {
kafkaLagExporter ! KafkaClusterManager.Stop
Await.result(kafkaLagExporter.whenTerminated, 10 seconds)
Await.result(kafkaLagExporter.whenTerminated, 15 seconds)
}
}