Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Adds custom labels for every cluster #61

Merged
merged 2 commits into from
Sep 14, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ The latest offset available for topic partition. Kafka Lag Exporter will calcul

### Labels

Each metric may include the following labels when reported.
Each metric will include all of the custom labels defined for its cluster, and may also include the following labels when reported.
anbarasantr marked this conversation as resolved.
Show resolved Hide resolved

* `cluster_name` - Either the statically defined Kafka cluster name, or the metadata.name of the Strimzi Kafka cluster that was discovered with the Strimzi auto discovery feature.
* `topic` - The Kafka topic.
Expand Down Expand Up @@ -209,6 +209,7 @@ Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)
| `bootstrap-brokers` | `""` | Yes | Kafka bootstrap brokers. Comma delimited list of broker hostnames |
| `consumer-properties` | `{}` | No | A map of key value pairs used to configure the `KafkaConsumer`. See the [Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) section of the Kafka documentation for options. |
| `admin-client-properties` | `{}` | No | A map of key value pairs used to configure the `AdminClient`. See the [Admin Config](https://kafka.apache.org/documentation/#adminclientconfigs) section of the Kafka documentation for options. |
| `labels` | `{}` | No | A map of key-value pairs that will be set as additional custom labels on every metric reported for this cluster in Prometheus. |

Watchers (`kafka-lag-exporter.watchers{}`)

Expand All @@ -234,6 +235,10 @@ kafka-lag-exporter {
admin-client-properties = {
client.id = "admin-client-id"
}
labels = {
location = "ny"
zone = "us-east"
}
}
]
}
Expand Down
5 changes: 5 additions & 0 deletions charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ data:
{{ $key }} = {{ quote $val }}
{{- end }}
}
labels = {
{{- range $key, $val := $cluster.labels }}
{{ $key }} = {{ quote $val }}
{{- end }}
}
}
{{- end }}
]
Expand Down
3 changes: 3 additions & 0 deletions charts/kafka-lag-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ clusters: {}
# security.protocol: SSL
# ssl.truststore.location: /path/to/my.truststore.jks
# ssl.trustore.password: mypwd
# labels:
# location: ny
# zone: "us-east"

## The interval between refreshing metrics
pollIntervalSeconds: 30
Expand Down
4 changes: 4 additions & 0 deletions examples/standalone/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ kafka-lag-exporter {
{
name = "a-cluster"
bootstrap-brokers = "a-1.cluster-a.xyzcorp.com:9092,a-2.cluster-a.xyzcorp.com:9092,a-3.cluster-a.xyzcorp.com:9092"
labels = {
location = "ny"
zone = "us-east"
}
}
]
}
13 changes: 11 additions & 2 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,17 @@ kafka-lag-exporter {
client-group-id = "kafkalagexporter"
client-group-id = ${?KAFKA_LAG_EXPORTER_CLIENT_GROUP_ID}
kafka-client-timeout = 10 seconds
kafka-client-timeout = ${?KAFKA_LAG_EXPORTER_KAFKA_CLIENT_TIMEOUT_SECONDS}
clusters = []
kafka-client-timeout = ${?KAFKA_LAG_EXPORTER_KAFKA_CLIENT_TIMEOUT_SECONDS}
clusters = [
anbarasantr marked this conversation as resolved.
Show resolved Hide resolved
{
name = "a-cluster"
bootstrap-brokers = "localhost:9094"
labels = {
location = "ny"
zone = "us-east"
}
}
]
clusters = ${?KAFKA_LAG_EXPORTER_CLUSTERS}
watchers = {
strimzi = "false"
Expand Down
21 changes: 19 additions & 2 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.compat.java8.DurationConverters._
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.Try

object AppConfig {
def apply(config: Config): AppConfig = {
Expand All @@ -32,12 +33,21 @@ object AppConfig {
parseKafkaClientsProperties(clusterConfig.getConfig("admin-client-properties"))
else
Map.empty[String, String]
val labels =
Try {
val labels = clusterConfig.getConfig("labels")
labels.entrySet().asScala.map(
entry => (entry.getKey, entry.getValue.unwrapped().toString)
).toMap
}.getOrElse(Map.empty[String, String])


KafkaCluster(
clusterConfig.getString("name"),
clusterConfig.getString("bootstrap-brokers"),
consumerProperties,
adminClientProperties
adminClientProperties,
labels
)
}
val strimziWatcher = c.getString("watchers.strimzi").toBoolean
Expand Down Expand Up @@ -73,7 +83,8 @@ object AppConfig {

final case class KafkaCluster(name: String, bootstrapBrokers: String,
consumerProperties: Map[String, String] = Map.empty,
adminClientProperties: Map[String, String] = Map.empty) {
adminClientProperties: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty) {
override def toString(): String = {
s"""
| Cluster name: $name
Expand All @@ -100,5 +111,11 @@ final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, p
| Strimzi: $strimziWatcher
""".stripMargin
}

/** Looks up the statically configured custom labels for the given cluster.
  *
  * @param clusterName the `name` of a configured Kafka cluster
  * @return that cluster's label map, or an empty map when no cluster with
  *         this name is configured
  */
def globalLabelsForCluster(clusterName: String): Map[String, String] =
  clusters
    .find(_.name == clusterName)
    .fold(Map.empty[String, String])(_.labels)
}

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object MainApp extends App {

val clientCreator = (cluster: KafkaCluster) =>
KafkaClient(cluster, appConfig.clientGroupId, appConfig.clientTimeout)(kafkaClientEc)
val endpointCreator = () => PrometheusEndpointSink(appConfig.port, Metrics.definitions)
val endpointCreator = () => PrometheusEndpointSink(appConfig, Metrics.definitions)

ActorSystem(
KafkaClusterManager.init(appConfig, endpointCreator, clientCreator), "kafka-lag-exporter")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,20 @@ object MetricsSink {
final case class GaugeDefinition(name: String, help: String, labels: List[String])
type MetricDefinitions = List[GaugeDefinition]

/** A [[Metric]] that is associated with a specific Kafka cluster, so sinks can
  * look up the cluster's per-cluster gauges and custom labels.
  */
// NOTE(review): Scala convention is UpperCamelCase for trait names
// (`ClusterMetric`); renaming would touch every implementor outside this file,
// so the existing public name is kept here.
trait clusterMetric extends Metric {
  def clusterName: String
}

/** Base contract for a reported metric: the gauge it belongs to and the
  * ordered label values to report it under.
  */
trait Metric {
  def labels: List[String]
  def definition: GaugeDefinition
}

/** A cluster metric carrying a gauge value to set. */
trait MetricValue extends clusterMetric {
  def value: Double
}

/** A cluster metric indicating that its time series should be removed. */
trait RemoveMetric extends clusterMetric
}

trait MetricsSink {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,41 @@ import io.prometheus.client.{CollectorRegistry, Gauge}
import scala.util.Try

object PrometheusEndpointSink {
  /** Builds the sink, wrapping any construction failure (e.g. the HTTP port
    * already being in use) in a single descriptive exception.
    *
    * @param appConfig   application config providing the port and the clusters
    *                    whose gauges (with custom labels) are registered
    * @param definitions the gauge definitions to register per cluster
    */
  def apply(appConfig: AppConfig, definitions: MetricDefinitions): MetricsSink =
    Try(new PrometheusEndpointSink(appConfig, definitions))
      .fold(t => throw new Exception("Could not create Prometheus Endpoint", t), sink => sink)
}

class PrometheusEndpointSink private(httpPort: Int, definitions: MetricDefinitions) extends MetricsSink {
class PrometheusEndpointSink private(appConfig: AppConfig, definitions: MetricDefinitions) extends MetricsSink {

private val server = new HTTPServer(httpPort)
private val server = new HTTPServer(appConfig.port)
private val registry = CollectorRegistry.defaultRegistry

DefaultExports.initialize()

private val metrics: Map[GaugeDefinition, Gauge] = definitions.map(definition =>
definition -> Gauge.build()
.name(definition.name)
.help(definition.help)
.labelNames(definition.labels: _*)
.register(registry)
).toMap
// One Gauge per (cluster name, metric definition). Each cluster's gauges are
// registered with that cluster's custom label NAMES prepended to the metric's
// own label names; report/remove must supply values in the same order.
// NOTE(review): registering the same metric name once per cluster on the
// shared default registry may throw "Collector already registered" when more
// than one cluster is configured — confirm against the Prometheus simpleclient
// CollectorRegistry semantics.
private val metrics: Map[String, Map[GaugeDefinition, Gauge]] =
  appConfig.clusters.map { cluster =>
    val globalLabelNames = appConfig.globalLabelsForCluster(cluster.name).keys.toSeq
    val gaugesByDefinition = definitions.map { definition =>
      val gauge = Gauge.build()
        .name(definition.name)
        .help(definition.help)
        .labelNames(globalLabelNames ++ definition.labels: _*)
        .register(registry)
      definition -> gauge
    }.toMap
    cluster.name -> gaugesByDefinition
  }.toMap

/** Sets the gauge value for this metric on its own cluster's gauge.
  *
  * The cluster's custom label VALUES are prepended to the metric's label
  * values, matching the label-name order used when the gauge was registered.
  *
  * @throws IllegalArgumentException if no gauge is registered for the
  *                                  metric's cluster or definition
  */
override def report(m: MetricValue): Unit = {
  val gauge = getMetricsForClusterName(m.definition, m.clusterName)
  val globalLabelValues = appConfig.globalLabelsForCluster(m.clusterName).map(_._2).toSeq
  gauge.labels(globalLabelValues ++ m.labels: _*).set(m.value)
}


override def remove(m: RemoveMetric): Unit =
metrics.get(m.definition).foreach(_.remove(m.labels: _*))
/** Removes this metric's time series from its own cluster's gauge only.
  *
  * The label values must match the full label set used at registration time:
  * the cluster's custom label values followed by the metric's label values.
  * (Calling `remove(m.labels)` alone would target a series that does not
  * exist, since the gauges were registered with the cluster's custom label
  * names prepended; and iterating every cluster's gauges would touch series
  * belonging to unrelated clusters.)
  */
override def remove(m: RemoveMetric): Unit = {
  val globalLabelValues = appConfig.globalLabelsForCluster(m.clusterName).map(_._2).toSeq
  metrics.get(m.clusterName)
    .flatMap(_.get(m.definition))
    .foreach(_.remove(globalLabelValues ++ m.labels: _*))
}

override def stop(): Unit = {
/*
Expand All @@ -49,4 +56,9 @@ class PrometheusEndpointSink private(httpPort: Int, definitions: MetricDefinitio
registry.clear()
server.stop()
}

/** Resolves the registered gauge for the given cluster and gauge definition.
  *
  * @throws IllegalArgumentException when the cluster or the definition has no
  *                                  registered gauge
  */
private def getMetricsForClusterName(gaugeDefinition: GaugeDefinition, clusterName: String): Gauge =
  metrics.get(clusterName) match {
    case None =>
      throw new IllegalArgumentException(s"No metric for the ${clusterName} registered")
    case Some(gaugesByDefinition) =>
      gaugesByDefinition.get(gaugeDefinition) match {
        case Some(gauge) => gauge
        case None =>
          throw new IllegalArgumentException(s"No metric with definition ${gaugeDefinition.name} registered")
      }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,44 @@ class AppConfigSpec extends FreeSpec with Matchers {
| admin-client-properties = {
| client.id = "admin-client-id"
| }
| labels = {
| environment= "integration"
| location = "ny"
| }
| }
| {
| name = "clusterB"
| bootstrap-brokers = "b-1.cluster-b.xyzcorp.com:9092,b-2.cluster-b.xyzcorp.com:9092"
| labels = {
| environment= "production"
| }
| }
| {
| name = "clusterC"
| bootstrap-brokers = "c-1.cluster-b.xyzcorp.com:9092,c-2.cluster-b.xyzcorp.com:9092"
| }
| ]
|}""".stripMargin)

val appConfig = AppConfig(config)

appConfig.clusters.length shouldBe 2
appConfig.clusters.length shouldBe 3
appConfig.clusters(0).name shouldBe "clusterA"
appConfig.clusters(0).bootstrapBrokers shouldBe "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092"
appConfig.clusters(0).consumerProperties("client.id") shouldBe "consumer-client-id"
appConfig.clusters(0).adminClientProperties("client.id") shouldBe "admin-client-id"
appConfig.clusters(0).labels("environment") shouldBe "integration"
appConfig.clusters(0).labels("location") shouldBe "ny"
appConfig.clusters(1).name shouldBe "clusterB"
appConfig.clusters(1).bootstrapBrokers shouldBe "b-1.cluster-b.xyzcorp.com:9092,b-2.cluster-b.xyzcorp.com:9092"
appConfig.clusters(1).consumerProperties shouldBe Map.empty
appConfig.clusters(1).adminClientProperties shouldBe Map.empty
appConfig.clusters(1).labels("environment") shouldBe "production"
appConfig.clusters(2).name shouldBe "clusterC"
appConfig.clusters(2).bootstrapBrokers shouldBe "c-1.cluster-b.xyzcorp.com:9092,c-2.cluster-b.xyzcorp.com:9092"
appConfig.clusters(2).consumerProperties shouldBe Map.empty
appConfig.clusters(2).adminClientProperties shouldBe Map.empty
appConfig.clusters(2).labels shouldBe Map.empty
}
}

Expand Down