diff --git a/README.md b/README.md index 9c3062d1..9af6deef 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,7 @@ The latest offset available for topic partition. Kafka Lag Exporter will calcul ### Labels -Each metric may include the following labels when reported. +Each metric may include the following labels when reported. If you define the `labels` property in the configuration of a cluster, then those labels will also be included. * `cluster_name` - Either the statically defined Kafka cluster name, or the metadata.name of the Strimzi Kafka cluster that was discovered with the Strimzi auto discovery feature. * `topic` - The Kafka topic. @@ -209,6 +209,7 @@ Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`) | `bootstrap-brokers` | `""` | Yes | Kafka bootstrap brokers. Comma delimited list of broker hostnames | | `consumer-properties` | `{}` | No | A map of key value pairs used to configure the `KafkaConsumer`. See the [Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) section of the Kafka documentation for options. | | `admin-client-properties` | `{}` | No | A map of key value pairs used to configure the `AdminClient`. See the [Admin Config](https://kafka.apache.org/documentation/#adminclientconfigs) section of the Kafka documentation for options. | +| `labels` | `{}` | No | A map of key value pairs that will be set as additional custom labels per cluster for all the metrics in Prometheus. 
| Watchers (`kafka-lag-exporters.watchers{}`) @@ -234,6 +235,10 @@ kafka-lag-exporter { admin-client-properties = { client.id = "admin-client-id" } + labels = { + location = "ny" + zone = "us-east" + } } ] } diff --git a/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml b/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml index 05f1de66..35f49282 100644 --- a/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml +++ b/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml @@ -30,6 +30,11 @@ data: {{ $key }} = {{ quote $val }} {{- end }} } + labels = { + {{- range $key, $val := $cluster.labels }} + {{ $key }} = {{ quote $val }} + {{- end }} + } } {{- end }} ] diff --git a/charts/kafka-lag-exporter/values.yaml b/charts/kafka-lag-exporter/values.yaml index 6bc87efc..ad399f12 100644 --- a/charts/kafka-lag-exporter/values.yaml +++ b/charts/kafka-lag-exporter/values.yaml @@ -16,6 +16,9 @@ clusters: {} # security.protocol: SSL # ssl.truststore.location: /path/to/my.truststore.jks # ssl.trustore.password: mypwd +# labels: +# location: ny +# zone: "us-east" ## The interval between refreshing metrics pollIntervalSeconds: 30 diff --git a/examples/standalone/application.conf b/examples/standalone/application.conf index 112c57fc..01d6a4cc 100644 --- a/examples/standalone/application.conf +++ b/examples/standalone/application.conf @@ -4,6 +4,10 @@ kafka-lag-exporter { { name = "a-cluster" bootstrap-brokers = "a-1.cluster-a.xyzcorp.com:9092,a-2.cluster-a.xyzcorp.com:9092,a-3.cluster-a.xyzcorp.com:9092" + labels = { + location = "ny" + zone = "us-east" + } } ] } \ No newline at end of file diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index a8832007..a952fd41 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -8,7 +8,7 @@ kafka-lag-exporter { client-group-id = "kafkalagexporter" client-group-id = ${?KAFKA_LAG_EXPORTER_CLIENT_GROUP_ID} kafka-client-timeout = 10 seconds - kafka-client-timeout = 
${?KAFKA_LAG_EXPORTER_KAFKA_CLIENT_TIMEOUT_SECONDS} + kafka-client-timeout = ${?KAFKA_LAG_EXPORTER_KAFKA_CLIENT_TIMEOUT_SECONDS} clusters = [] clusters = ${?KAFKA_LAG_EXPORTER_CLUSTERS} watchers = { diff --git a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala index 5fd5827b..851288a0 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala @@ -12,6 +12,7 @@ import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.compat.java8.DurationConverters._ import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.util.Try object AppConfig { def apply(config: Config): AppConfig = { @@ -32,12 +33,21 @@ object AppConfig { parseKafkaClientsProperties(clusterConfig.getConfig("admin-client-properties")) else Map.empty[String, String] + val labels = + Try { + val labels = clusterConfig.getConfig("labels") + labels.entrySet().asScala.map( + entry => (entry.getKey, entry.getValue.unwrapped().toString) + ).toMap + }.getOrElse(Map.empty[String, String]) + KafkaCluster( clusterConfig.getString("name"), clusterConfig.getString("bootstrap-brokers"), consumerProperties, - adminClientProperties + adminClientProperties, + labels ) } val strimziWatcher = c.getString("watchers.strimzi").toBoolean @@ -73,7 +83,8 @@ object AppConfig { final case class KafkaCluster(name: String, bootstrapBrokers: String, consumerProperties: Map[String, String] = Map.empty, - adminClientProperties: Map[String, String] = Map.empty) { + adminClientProperties: Map[String, String] = Map.empty, + labels: Map[String, String] = Map.empty) { override def toString(): String = { s""" | Cluster name: $name @@ -100,5 +111,11 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p | Strimzi: $strimziWatcher """.stripMargin } + + def globalLabelsForCluster(clusterName: String): Map[String, 
String] = { + clusters.find(_.name == clusterName).map { + cluster => cluster.labels + }.getOrElse(Map.empty[String, String]) + } } diff --git a/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala b/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala index f94919af..39667e3d 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/MainApp.scala @@ -29,7 +29,7 @@ object MainApp extends App { val clientCreator = (cluster: KafkaCluster) => KafkaClient(cluster, appConfig.clientGroupId, appConfig.clientTimeout)(kafkaClientEc) - val endpointCreator = () => PrometheusEndpointSink(appConfig.port, Metrics.definitions) + val endpointCreator = () => PrometheusEndpointSink(appConfig, Metrics.definitions) ActorSystem( KafkaClusterManager.init(appConfig, endpointCreator, clientCreator), "kafka-lag-exporter") diff --git a/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala index f4274ad2..b4b4d11c 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/MetricsSink.scala @@ -14,16 +14,20 @@ object MetricsSink { final case class GaugeDefinition(name: String, help: String, labels: List[String]) type MetricDefinitions = List[GaugeDefinition] + trait ClusterMetric extends Metric{ + def clusterName: String + } + trait Metric { def labels: List[String] def definition: GaugeDefinition } - trait MetricValue extends Metric { + trait MetricValue extends ClusterMetric { def value: Double } - trait RemoveMetric extends Metric + trait RemoveMetric extends ClusterMetric } trait MetricsSink { diff --git a/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala index adec176a..538d3fba 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala 
+++ b/src/main/scala/com/lightbend/kafkalagexporter/PrometheusEndpointSink.scala @@ -12,34 +12,43 @@ import io.prometheus.client.{CollectorRegistry, Gauge} import scala.util.Try object PrometheusEndpointSink { - def apply(httpPort: Int, definitions: MetricDefinitions): MetricsSink = - Try(new PrometheusEndpointSink(httpPort, definitions)) + def apply(appConfig: AppConfig, definitions: MetricDefinitions): MetricsSink = + Try(new PrometheusEndpointSink(appConfig, definitions)) .fold(t => throw new Exception("Could not create Prometheus Endpoint", t), sink => sink) } -class PrometheusEndpointSink private(httpPort: Int, definitions: MetricDefinitions) extends MetricsSink { +class PrometheusEndpointSink private(appConfig: AppConfig, definitions: MetricDefinitions) extends MetricsSink { - private val server = new HTTPServer(httpPort) + private val server = new HTTPServer(appConfig.port) private val registry = CollectorRegistry.defaultRegistry DefaultExports.initialize() - private val metrics: Map[GaugeDefinition, Gauge] = definitions.map(definition => - definition -> Gauge.build() - .name(definition.name) - .help(definition.help) - .labelNames(definition.labels: _*) - .register(registry) - ).toMap + private val metrics: Map[String, Map[GaugeDefinition, Gauge]] = { + appConfig.clusters.map { cluster => + val globalLabelNamesForCluster = appConfig.globalLabelsForCluster(cluster.name).keys.toSeq + cluster.name -> definitions.map(definition => + definition -> Gauge.build() + .name(definition.name) + .help(definition.help) + .labelNames(globalLabelNamesForCluster ++ definition.labels: _*) + .register(registry) + ).toMap + }.toMap + } override def report(m: MetricValue): Unit = { - val metric = metrics.getOrElse(m.definition, throw new IllegalArgumentException(s"No metric with definition ${m.definition.name} registered")) - metric.labels(m.labels: _*).set(m.value) + val metric = getMetricsForClusterName(m.definition, m.clusterName) + val globalLabelValuesForCluster = 
appConfig.globalLabelsForCluster(m.clusterName).values.toSeq + metric.labels(globalLabelValuesForCluster ++ m.labels: _*).set(m.value) } - override def remove(m: RemoveMetric): Unit = - metrics.get(m.definition).foreach(_.remove(m.labels: _*)) + override def remove(m: RemoveMetric): Unit = { + metrics.foreach { case (_, gaugeDefinitionsForCluster) => + gaugeDefinitionsForCluster.get(m.definition).foreach(_.remove(m.labels: _*)) + } + } override def stop(): Unit = { /* @@ -49,4 +58,9 @@ class PrometheusEndpointSink private(httpPort: Int, definitions: MetricDefinitio registry.clear() server.stop() } + + private def getMetricsForClusterName(gaugeDefinition: GaugeDefinition, clusterName: String): Gauge = { + val metricsForCluster = metrics.getOrElse(clusterName, throw new IllegalArgumentException(s"No metric for the ${clusterName} registered")) + metricsForCluster.getOrElse(gaugeDefinition, throw new IllegalArgumentException(s"No metric with definition ${gaugeDefinition.name} registered")) + } } diff --git a/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala index c5bf142a..071029ed 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala @@ -22,25 +22,44 @@ class AppConfigSpec extends FreeSpec with Matchers { | admin-client-properties = { | client.id = "admin-client-id" | } + | labels = { + | environment= "integration" + | location = "ny" + | } | } | { | name = "clusterB" | bootstrap-brokers = "b-1.cluster-b.xyzcorp.com:9092,b-2.cluster-b.xyzcorp.com:9092" + | labels = { + | environment= "production" + | } + | } + | { + | name = "clusterC" + | bootstrap-brokers = "c-1.cluster-b.xyzcorp.com:9092,c-2.cluster-b.xyzcorp.com:9092" | } | ] |}""".stripMargin) val appConfig = AppConfig(config) - appConfig.clusters.length shouldBe 2 + appConfig.clusters.length shouldBe 3 appConfig.clusters(0).name 
shouldBe "clusterA" appConfig.clusters(0).bootstrapBrokers shouldBe "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092" appConfig.clusters(0).consumerProperties("client.id") shouldBe "consumer-client-id" appConfig.clusters(0).adminClientProperties("client.id") shouldBe "admin-client-id" + appConfig.clusters(0).labels("environment") shouldBe "integration" + appConfig.clusters(0).labels("location") shouldBe "ny" appConfig.clusters(1).name shouldBe "clusterB" appConfig.clusters(1).bootstrapBrokers shouldBe "b-1.cluster-b.xyzcorp.com:9092,b-2.cluster-b.xyzcorp.com:9092" appConfig.clusters(1).consumerProperties shouldBe Map.empty appConfig.clusters(1).adminClientProperties shouldBe Map.empty + appConfig.clusters(1).labels("environment") shouldBe "production" + appConfig.clusters(2).name shouldBe "clusterC" + appConfig.clusters(2).bootstrapBrokers shouldBe "c-1.cluster-b.xyzcorp.com:9092,c-2.cluster-b.xyzcorp.com:9092" + appConfig.clusters(2).consumerProperties shouldBe Map.empty + appConfig.clusters(2).adminClientProperties shouldBe Map.empty + appConfig.clusters(2).labels shouldBe Map.empty } }