From e99d6b79e0554e02ad4f7d1867d604bc7f06df0a Mon Sep 17 00:00:00 2001 From: anbarasan Date: Sat, 14 Sep 2019 14:20:06 +0800 Subject: [PATCH 1/2] Adds custom labels for every cluster --- README.md | 7 +++- .../templates/030-ConfigMap.yaml | 5 +++ charts/kafka-lag-exporter/values.yaml | 3 ++ examples/standalone/application.conf | 4 ++ src/main/resources/reference.conf | 13 +++++- .../kafkalagexporter/AppConfig.scala | 21 +++++++++- .../lightbend/kafkalagexporter/MainApp.scala | 2 +- .../kafkalagexporter/MetricsSink.scala | 8 +++- .../PrometheusEndpointSink.scala | 42 ++++++++++++------- .../kafkalagexporter/AppConfigSpec.scala | 21 +++++++++- 10 files changed, 102 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 9c3062d1..fd675dfd 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,7 @@ The latest offset available for topic partition. Kafka Lag Exporter will calcul ### Labels -Each metric may include the following labels when reported. +Each metric will include all the labels defined in the clusters and may include the following labels when reported. * `cluster_name` - Either the statically defined Kafka cluster name, or the metadata.name of the Strimzi Kafka cluster that was discovered with the Strimzi auto discovery feature. * `topic` - The Kafka topic. @@ -209,6 +209,7 @@ Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`) | `bootstrap-brokers` | `""` | Yes | Kafka bootstrap brokers. Comma delimited list of broker hostnames | | `consumer-properties` | `{}` | No | A map of key value pairs used to configure the `KafkaConsumer`. See the [Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) section of the Kafka documentation for options. | | `admin-client-properties` | `{}` | No | A map of key value pairs used to configure the `AdminClient`. See the [Admin Config](https://kafka.apache.org/documentation/#adminclientconfigs) section of the Kafka documentation for options. | +| `labels` | `{}` | No | A map of key value pairs will be set as additional custom labels per cluster for all the metrics in prometheus. | Watchers (`kafka-lag-exporters.watchers{}`) @@ -234,6 +235,10 @@ kafka-lag-exporter { admin-client-properties = { client.id = "admin-client-id" } + labels = { + location = "ny" + zone = "us-east" + } } ] } diff --git a/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml b/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml index 05f1de66..35f49282 100644 --- a/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml +++ b/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml @@ -30,6 +30,11 @@ data: {{ $key }} = {{ quote $val }} {{- end }} } + labels = { + {{- range $key, $val := $cluster.labels }} + {{ $key }} = {{ quote $val }} + {{- end }} + } } {{- end }} ] diff --git a/charts/kafka-lag-exporter/values.yaml b/charts/kafka-lag-exporter/values.yaml index 6bc87efc..ad399f12 100644 --- a/charts/kafka-lag-exporter/values.yaml +++ b/charts/kafka-lag-exporter/values.yaml @@ -16,6 +16,9 @@ clusters: {} # security.protocol: SSL # ssl.truststore.location: /path/to/my.truststore.jks # ssl.trustore.password: mypwd +# labels: +# location: ny +# zone: "us-east" ## The interval between refreshing metrics pollIntervalSeconds: 30 diff --git a/examples/standalone/application.conf b/examples/standalone/application.conf index 112c57fc..01d6a4cc 100644 --- a/examples/standalone/application.conf +++ b/examples/standalone/application.conf @@ -4,6 +4,10 @@ kafka-lag-exporter { { name = "a-cluster" bootstrap-brokers = "a-1.cluster-a.xyzcorp.com:9092,a-2.cluster-a.xyzcorp.com:9092,a-3.cluster-a.xyzcorp.com:9092" + labels = { + location = "ny" + zone = "us-east" + } } ] } \ No newline at end of file diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index a8832007..f9a56bde 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -8,8 +8,17 @@ kafka-lag-exporter { client-group-id = "kafkalagexporter" client-group-id = ${?KAFKA_LAG_EXPORTER_CLIENT_GROUP_ID} kafka-client-timeout = 10 seconds - kafka-client-timeout = ${?KAFKA_LAG_EXPORTER_KAFKA_CLIENT_TIMEOUT_SECONDS} - clusters = [] + kafka-client-timeout = ${?KAFKA_LAG_EXPORTER_KAFKA_CLIENT_TIMEOUT_SECONDS} + clusters = [ + { + name = "a-cluster" + bootstrap-brokers = "localhost:9094" + labels = { + location = "ny" + zone = "us-east" + } + } + ] clusters = ${?KAFKA_LAG_EXPORTER_CLUSTERS} watchers = { strimzi = "false" diff --git a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala index 5fd5827b..851288a0 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala @@ -12,6 +12,7 @@ import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.compat.java8.DurationConverters._ import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.util.Try object AppConfig { def apply(config: Config): AppConfig = { @@ -32,12 +33,21 @@ object AppConfig { parseKafkaClientsProperties(clusterConfig.getConfig("admin-client-properties")) else Map.empty[String, String] + val labels = + Try { + val labels = clusterConfig.getConfig("labels") + labels.entrySet().asScala.map( + entry => (entry.getKey, entry.getValue.unwrapped().toString) + ).toMap + }.getOrElse(Map.empty[String, String]) + KafkaCluster( clusterConfig.getString("name"), clusterConfig.getString("bootstrap-brokers"), consumerProperties, - adminClientProperties + adminClientProperties, + labels ) } val strimziWatcher = c.getString("watchers.strimzi").toBoolean @@ -73,7 +83,8 @@ object AppConfig { final case class KafkaCluster(name: String, bootstrapBrokers: String, consumerProperties: Map[String, String] = Map.empty, - adminClientProperties: Map[String, String] = Map.empty) { + adminClientProperties: Map[String, String] = Map.empty, + labels: Map[String, String] = Map.empty) { override def toString(): String = { s""" | Cluster name: $name @@ -100,5 +111,11 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p | Strimzi: $strimziWatcher """.stripMargin } + + def globalLabelsForCluster(clusterName: String): Map[String, String] = { + clusters.find(_.name == clusterName).map { + cluster => cluster.labels + }.getOrElse(Map.empty[String, String]) + } } diff --git a/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala b/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala index f94919af..39667e3d 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala @@ -29,7 +29,7 @@ object MainApp extends App { val clientCreator = (cluster: KafkaCluster) => KafkaClient(cluster, appConfig.clientGroupId, appConfig.clientTimeout)(kafkaClientEc) - val endpointCreator = () => PrometheusEndpointSink(appConfig.port, Metrics.definitions) + val endpointCreator = () => PrometheusEndpointSink(appConfig, Metrics.definitions) ActorSystem( KafkaClusterManager.init(appConfig, endpointCreator, clientCreator), "kafka-lag-exporter") diff --git a/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala index f4274ad2..64fbe4ce 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala @@ -14,16 +14,20 @@ object MetricsSink { final case class GaugeDefinition(name: String, help: String, labels: List[String]) type MetricDefinitions = List[GaugeDefinition] + trait clusterMetric extends Metric{ + def clusterName: String + } + trait Metric { def labels: List[String] def definition: GaugeDefinition } - trait MetricValue extends Metric { + trait MetricValue extends clusterMetric { def value: Double } - trait RemoveMetric extends Metric + trait RemoveMetric extends clusterMetric } trait MetricsSink { diff --git a/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala index adec176a..dd26d467 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala @@ -12,34 +12,41 @@ import io.prometheus.client.{CollectorRegistry, Gauge} import scala.util.Try object PrometheusEndpointSink { - def apply(httpPort: Int, definitions: MetricDefinitions): MetricsSink = - Try(new PrometheusEndpointSink(httpPort, definitions)) + def apply(appConfig: AppConfig, definitions: MetricDefinitions): MetricsSink = + Try(new PrometheusEndpointSink(appConfig, definitions)) .fold(t => throw new Exception("Could not create Prometheus Endpoint", t), sink => sink) } -class PrometheusEndpointSink private(httpPort: Int, definitions: MetricDefinitions) extends MetricsSink { +class PrometheusEndpointSink private(appConfig: AppConfig, definitions: MetricDefinitions) extends MetricsSink { - private val server = new HTTPServer(httpPort) + private val server = new HTTPServer(appConfig.port) private val registry = CollectorRegistry.defaultRegistry DefaultExports.initialize() - private val metrics: Map[GaugeDefinition, Gauge] = definitions.map(definition => - definition -> Gauge.build() - .name(definition.name) - .help(definition.help) - .labelNames(definition.labels: _*) - .register(registry) - ).toMap + private val metrics: Map[String, Map[GaugeDefinition, Gauge]] = { + appConfig.clusters.map { cluster => + val globalLabelNamesForCluster = appConfig.globalLabelsForCluster(cluster.name).map(_._1).toSeq + cluster.name -> definitions.map(definition => + definition -> Gauge.build() + .name(definition.name) + .help(definition.help) + .labelNames(globalLabelNamesForCluster ++ definition.labels: _*) + .register(registry) + ).toMap + }.toMap + } override def report(m: MetricValue): Unit = { - val metric = metrics.getOrElse(m.definition, throw new IllegalArgumentException(s"No metric with definition ${m.definition.name} registered")) - metric.labels(m.labels: _*).set(m.value) + val metric = getMetricsForClusterName(m.definition, m.clusterName) + val globalLabelValuesForCluster = appConfig.globalLabelsForCluster(m.clusterName).map(_._2).toSeq + metric.labels(globalLabelValuesForCluster ++ m.labels: _*).set(m.value) } - override def remove(m: RemoveMetric): Unit = - metrics.get(m.definition).foreach(_.remove(m.labels: _*)) + override def remove(m: RemoveMetric): Unit = { + metrics.foreach(_._2.foreach(_._2.remove(m.labels: _*))) + } override def stop(): Unit = { /* @@ -49,4 +56,9 @@ class PrometheusEndpointSink private(httpPort: Int, definitions: MetricDefinitio registry.clear() server.stop() } + + private def getMetricsForClusterName(gaugeDefinition: GaugeDefinition, clusterName: String): Gauge = { + val metricsForCluster = metrics.getOrElse(clusterName, throw new IllegalArgumentException(s"No metric for the ${clusterName} registered")) + metricsForCluster.getOrElse(gaugeDefinition, throw new IllegalArgumentException(s"No metric with definition ${gaugeDefinition.name} registered")) + } } diff --git a/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala index c5bf142a..071029ed 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala @@ -22,25 +22,44 @@ class AppConfigSpec extends FreeSpec with Matchers { | admin-client-properties = { | client.id = "admin-client-id" | } + | labels = { + | environment= "integration" + | location = "ny" + | } | } | { | name = "clusterB" | bootstrap-brokers = "b-1.cluster-b.xyzcorp.com:9092,b-2.cluster-b.xyzcorp.com:9092" + | labels = { + | environment= "production" + | } + | } + | { + | name = "clusterC" + | bootstrap-brokers = "c-1.cluster-b.xyzcorp.com:9092,c-2.cluster-b.xyzcorp.com:9092" | } | ] |}""".stripMargin) val appConfig = AppConfig(config) - appConfig.clusters.length shouldBe 2 + appConfig.clusters.length shouldBe 3 appConfig.clusters(0).name shouldBe "clusterA" appConfig.clusters(0).bootstrapBrokers shouldBe "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092" appConfig.clusters(0).consumerProperties("client.id") shouldBe "consumer-client-id" appConfig.clusters(0).adminClientProperties("client.id") shouldBe "admin-client-id" + appConfig.clusters(0).labels("environment") shouldBe "integration" + appConfig.clusters(0).labels("location") shouldBe "ny" appConfig.clusters(1).name shouldBe "clusterB" appConfig.clusters(1).bootstrapBrokers shouldBe "b-1.cluster-b.xyzcorp.com:9092,b-2.cluster-b.xyzcorp.com:9092" appConfig.clusters(1).consumerProperties shouldBe Map.empty appConfig.clusters(1).adminClientProperties shouldBe Map.empty + appConfig.clusters(1).labels("environment") shouldBe "production" + appConfig.clusters(2).name shouldBe "clusterC" + appConfig.clusters(2).bootstrapBrokers shouldBe "c-1.cluster-b.xyzcorp.com:9092,c-2.cluster-b.xyzcorp.com:9092" + appConfig.clusters(2).consumerProperties shouldBe Map.empty + appConfig.clusters(2).adminClientProperties shouldBe Map.empty + appConfig.clusters(2).labels shouldBe Map.empty } } From 37d457dc184b97c2aef501f1a7d47ac245623195 Mon Sep 17 00:00:00 2001 From: anbarasan Date: Sat, 14 Sep 2019 23:56:36 +0800 Subject: [PATCH 2/2] Fixes as per the review suggestions --- README.md | 2 +- src/main/resources/reference.conf | 11 +---------- .../com/lightbend/kafkalagexporter/MetricsSink.scala | 6 +++--- .../kafkalagexporter/PrometheusEndpointSink.scala | 8 +++++--- 4 files changed, 10 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index fd675dfd..9af6deef 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,7 @@ The latest offset available for topic partition. Kafka Lag Exporter will calcul ### Labels -Each metric will include all the labels defined in the clusters and may include the following labels when reported. +Each metric may include the following labels when reported. If you define the labels property for Configuration of a cluster then those labels will also be included. * `cluster_name` - Either the statically defined Kafka cluster name, or the metadata.name of the Strimzi Kafka cluster that was discovered with the Strimzi auto discovery feature. * `topic` - The Kafka topic. diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index f9a56bde..a952fd41 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -9,16 +9,7 @@ kafka-lag-exporter { client-group-id = ${?KAFKA_LAG_EXPORTER_CLIENT_GROUP_ID} kafka-client-timeout = 10 seconds kafka-client-timeout = ${?KAFKA_LAG_EXPORTER_KAFKA_CLIENT_TIMEOUT_SECONDS} - clusters = [ - { - name = "a-cluster" - bootstrap-brokers = "localhost:9094" - labels = { - location = "ny" - zone = "us-east" - } - } - ] + clusters = [] clusters = ${?KAFKA_LAG_EXPORTER_CLUSTERS} watchers = { strimzi = "false" diff --git a/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala index 64fbe4ce..b4b4d11c 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala @@ -14,7 +14,7 @@ object MetricsSink { final case class GaugeDefinition(name: String, help: String, labels: List[String]) type MetricDefinitions = List[GaugeDefinition] - trait clusterMetric extends Metric{ + trait ClusterMetric extends Metric{ def clusterName: String } @@ -23,11 +23,11 @@ object MetricsSink { def definition: GaugeDefinition } - trait MetricValue extends clusterMetric { + trait MetricValue extends ClusterMetric { def value: Double } - trait RemoveMetric extends clusterMetric + trait RemoveMetric extends ClusterMetric } trait MetricsSink { diff --git a/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala index dd26d467..538d3fba 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala @@ -26,7 +26,7 @@ class PrometheusEndpointSink private(appConfig: AppConfig, definitions: MetricDe private val metrics: Map[String, Map[GaugeDefinition, Gauge]] = { appConfig.clusters.map { cluster => - val globalLabelNamesForCluster = appConfig.globalLabelsForCluster(cluster.name).map(_._1).toSeq + val globalLabelNamesForCluster = appConfig.globalLabelsForCluster(cluster.name).keys.toSeq cluster.name -> definitions.map(definition => definition -> Gauge.build() .name(definition.name) @@ -39,13 +39,15 @@ class PrometheusEndpointSink private(appConfig: AppConfig, definitions: MetricDe override def report(m: MetricValue): Unit = { val metric = getMetricsForClusterName(m.definition, m.clusterName) - val globalLabelValuesForCluster = appConfig.globalLabelsForCluster(m.clusterName).map(_._2).toSeq + val globalLabelValuesForCluster = appConfig.globalLabelsForCluster(m.clusterName).values.toSeq metric.labels(globalLabelValuesForCluster ++ m.labels: _*).set(m.value) } override def remove(m: RemoveMetric): Unit = { - metrics.foreach(_._2.foreach(_._2.remove(m.labels: _*))) + metrics.foreach { case (_, gaugeDefinitionsForCluster) => + gaugeDefinitionsForCluster.get(m.definition).foreach(_.remove(m.labels: _*)) + } } override def stop(): Unit = {