diff --git a/README.md b/README.md index 52547044..caa53000 100644 --- a/README.md +++ b/README.md @@ -240,7 +240,8 @@ Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`) | `name` | `""` | Yes | A unique cluster name to for this Kafka connection detail object | | `bootstrap-brokers` | `""` | Yes | Kafka bootstrap brokers. Comma delimited list of broker hostnames | | `group-whitelist` | `[".*"]` | No | A list of Regex of consumer groups monitored. For example, if you only wish to expose only certain groups with `input` and `output` prefixes, use `["^input-.+", "^output-.+"]`. | -| `topic-whitelist` | `[".*"]` No | A list of Regex of topics monitored. For example, if you only wish to expose only certain topics, use either `["^topic.+"]` or `["topic1", "topic2"]`. | +| `topic-whitelist` | `[".*"]` | No | A list of Regex of topics monitored. For example, if you only wish to expose only certain topics, use either `["^topic.+"]` or `["topic1", "topic2"]`. | +| `topic-blacklist` | `[]` | No | A list of Regex of topics **not** monitored. For example, if you wish **not** expose certain topics , use either `["^unmonitored-topic.+"]` or `["unmonitored-topic1", "unmonitored-topic2"]`. | | `consumer-properties` | `{}` | No | A map of key value pairs used to configure the `KafkaConsumer`. See the [Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) section of the Kafka documentation for options. | | `admin-client-properties` | `{}` | No | A map of key value pairs used to configure the `AdminClient`. See the [Admin Config](https://kafka.apache.org/documentation/#adminclientconfigs) section of the Kafka documentation for options. | | `labels` | `{}` | No | A map of key value pairs will be set as additional custom labels per cluster for all the metrics in prometheus. | diff --git a/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml b/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml index ee9c1e87..4b2bae7f 100644 --- a/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml +++ b/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml @@ -30,12 +30,20 @@ data: {{- end }} {{- if $cluster.topicWhitelist }} topic-whitelist = [ - {{- $lastIndex := sub (len $cluster.groupWhitelist) 1}} - {{- range $i, $whitelist := $cluster.groupWhitelist }} + {{- $lastIndex := sub (len $cluster.topicWhitelist) 1}} + {{- range $i, $whitelist := $cluster.topicWhitelist }} {{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }} {{- end }} ] {{- end }} + {{- if $cluster.topicBlacklist }} + topic-blacklist = [ + {{- $lastIndex := sub (len $cluster.topicBlacklist) 1}} + {{- range $i, $blacklist := $cluster.topicBlacklist }} + {{ quote $blacklist }}{{- if ne $i $lastIndex -}}, {{ end }} + {{- end }} + ] + {{- end }} consumer-properties = { {{- range $key, $val := $cluster.consumerProperties }} {{ $key }} = {{ quote $val }} diff --git a/charts/kafka-lag-exporter/values.yaml b/charts/kafka-lag-exporter/values.yaml index 3df1040a..318264d2 100644 --- a/charts/kafka-lag-exporter/values.yaml +++ b/charts/kafka-lag-exporter/values.yaml @@ -6,6 +6,8 @@ clusters: {} # bootstrapBrokers: "simple-strimzi-kafka-bootstrap.strimzi.svc.cluster.local:9092" # topicWhitelist: # - "^xyz-corp-topics\..+" +# topicBlacklist: +# - "^unmonitored-topics-.+" # groupWhitelist: # - "^analytics-app-.+" # # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig diff --git a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala index 9142eb09..7b43909e 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala @@ -50,11 +50,16 @@ object AppConfig { clusterConfig.getStringList("topic-whitelist").asScala.toList else KafkaCluster.TopicWhitelistDefault + val topicBlacklist = if (clusterConfig.hasPath("topic-blacklist")) + clusterConfig.getStringList("topic-blacklist").asScala.toList + else KafkaCluster.TopicBlacklistDefault + KafkaCluster( clusterConfig.getString("name"), clusterConfig.getString("bootstrap-brokers"), groupWhitelist, topicWhitelist, + topicBlacklist, consumerProperties, adminClientProperties, labels @@ -95,11 +100,13 @@ object AppConfig { object KafkaCluster { val GroupWhitelistDefault = List(".*") val TopicWhitelistDefault = List(".*") + val TopicBlacklistDefault = List.empty[String] } final case class KafkaCluster(name: String, bootstrapBrokers: String, groupWhitelist: List[String] = KafkaCluster.GroupWhitelistDefault, topicWhitelist: List[String] = KafkaCluster.TopicWhitelistDefault, + topicBlacklist: List[String] = KafkaCluster.TopicBlacklistDefault, consumerProperties: Map[String, String] = Map.empty, adminClientProperties: Map[String, String] = Map.empty, labels: Map[String, String] = Map.empty) { @@ -109,6 +116,7 @@ final case class KafkaCluster(name: String, bootstrapBrokers: String, | Cluster Kafka bootstrap brokers: $bootstrapBrokers | Consumer group whitelist: [${groupWhitelist.mkString(", ")}] | Topic whitelist: [${topicWhitelist.mkString(", ")}] + | Topic blacklist: [${topicBlacklist.mkString(", ")}] """.stripMargin } } diff --git a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala index 5d27925c..6e6cd04c 100644 --- a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala +++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala @@ -162,7 +162,9 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster, private[kafkalagexporter] def groupTopicPartitions(groupId: String, desc: ConsumerGroupDescription): List[Domain.GroupTopicPartition] = { val groupTopicPartitions = for { member <- desc.members().asScala - ktp <- member.assignment().topicPartitions().asScala if cluster.topicWhitelist.exists(r => ktp.topic().matches(r)) + ktp <- member.assignment().topicPartitions().asScala + if cluster.topicWhitelist.exists(r => ktp.topic().matches(r)) + if !cluster.topicBlacklist.exists(r => ktp.topic().matches(r)) } yield Domain.GroupTopicPartition( groupId, member.clientId(), diff --git a/src/test/scala/com/lightbend/kafkalagexporter/KafkaClientSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/KafkaClientSpec.scala index c3213390..d8904e10 100644 --- a/src/test/scala/com/lightbend/kafkalagexporter/KafkaClientSpec.scala +++ b/src/test/scala/com/lightbend/kafkalagexporter/KafkaClientSpec.scala @@ -126,6 +126,17 @@ class KafkaClientSpec extends FreeSpec with Matchers with TestData with MockitoS client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gt2p0).map(gtp => gtp.copy(host = tmpHost)) } + "will only fetch non blacklisted topics" in { + val tmpCluster: KafkaCluster = cluster.copy(topicWhitelist = List(topic, topic2), topicBlacklist = List(topic)) + val (_,_,client) = getClient(tmpCluster) + val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava + val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava + + val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node) + + client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gt2p0).map(gtp => gtp.copy(host = tmpHost)) + } + "will only fetch whitelisted topics when whitelist is a regex against multiple topics" in { val tmpCluster = cluster.copy(topicWhitelist = List("test.+")) val (_,_,client) = getClient(tmpCluster) @@ -137,6 +148,17 @@ class KafkaClientSpec extends FreeSpec with Matchers with TestData with MockitoS client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gtp0, gtp1, gtp2, gt2p0).map(gtp => gtp.copy(host = tmpHost)) } + "will exclude topics that match with blacklist regex" in { + val tmpCluster = cluster.copy(topicWhitelist = List("test.+"), topicBlacklist = List(".+topic-2")) + val (_,_,client) = getClient(tmpCluster) + val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava + val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava + + val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node) + + client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gtp0, gtp1, gtp2).map(gtp => gtp.copy(host = tmpHost)) + } + "will not return any topics if the whitelist is empty" in { val tmpCluster = cluster.copy(topicWhitelist = List.empty) val (_,_,client) = getClient(tmpCluster)