Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Add kafka topic blacklist #90

Merged
merged 6 commits into from
Nov 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)
| `name` | `""` | Yes | A unique cluster name to for this Kafka connection detail object |
| `bootstrap-brokers` | `""` | Yes | Kafka bootstrap brokers. Comma delimited list of broker hostnames |
| `group-whitelist` | `[".*"]` | No | A list of Regex of consumer groups monitored. For example, if you only wish to expose only certain groups with `input` and `output` prefixes, use `["^input-.+", "^output-.+"]`. |
| `topic-whitelist` | `[".*"]` No | A list of Regex of topics monitored. For example, if you only wish to expose only certain topics, use either `["^topic.+"]` or `["topic1", "topic2"]`. |
| `topic-whitelist` | `[".*"]` | No | A list of Regex of topics monitored. For example, if you only wish to expose only certain topics, use either `["^topic.+"]` or `["topic1", "topic2"]`. |
| `topic-blacklist` | `[]` | No | A list of Regex of topics **not** monitored. For example, if you wish **not** expose certain topics , use either `["^unmonitored-topic.+"]` or `["unmonitored-topic1", "unmonitored-topic2"]`. |
| `consumer-properties` | `{}` | No | A map of key value pairs used to configure the `KafkaConsumer`. See the [Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) section of the Kafka documentation for options. |
| `admin-client-properties` | `{}` | No | A map of key value pairs used to configure the `AdminClient`. See the [Admin Config](https://kafka.apache.org/documentation/#adminclientconfigs) section of the Kafka documentation for options. |
| `labels` | `{}` | No | A map of key value pairs will be set as additional custom labels per cluster for all the metrics in prometheus. |
Expand Down
12 changes: 10 additions & 2 deletions charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,20 @@ data:
{{- end }}
{{- if $cluster.topicWhitelist }}
topic-whitelist = [
{{- $lastIndex := sub (len $cluster.groupWhitelist) 1}}
{{- range $i, $whitelist := $cluster.groupWhitelist }}
{{- $lastIndex := sub (len $cluster.topicWhitelist) 1}}
{{- range $i, $whitelist := $cluster.topicWhitelist }}
{{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
{{- end }}
{{- if $cluster.topicBlacklist }}
topic-blacklist = [
{{- $lastIndex := sub (len $cluster.topicBlacklist) 1}}
{{- range $i, $blacklist := $cluster.topicBlacklist }}
{{ quote $blacklist }}{{- if ne $i $lastIndex -}}, {{ end }}
{{- end }}
]
{{- end }}
consumer-properties = {
{{- range $key, $val := $cluster.consumerProperties }}
{{ $key }} = {{ quote $val }}
Expand Down
2 changes: 2 additions & 0 deletions charts/kafka-lag-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ clusters: {}
# bootstrapBrokers: "simple-strimzi-kafka-bootstrap.strimzi.svc.cluster.local:9092"
# topicWhitelist:
# - "^xyz-corp-topics\..+"
# topicBlacklist:
# - "^unmonitored-topics-.+"
# groupWhitelist:
# - "^analytics-app-.+"
# # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,16 @@ object AppConfig {
clusterConfig.getStringList("topic-whitelist").asScala.toList
else KafkaCluster.TopicWhitelistDefault

val topicBlacklist = if (clusterConfig.hasPath("topic-blacklist"))
clusterConfig.getStringList("topic-blacklist").asScala.toList
else KafkaCluster.TopicBlacklistDefault

KafkaCluster(
clusterConfig.getString("name"),
clusterConfig.getString("bootstrap-brokers"),
groupWhitelist,
topicWhitelist,
topicBlacklist,
consumerProperties,
adminClientProperties,
labels
Expand Down Expand Up @@ -95,11 +100,13 @@ object AppConfig {
object KafkaCluster {
val GroupWhitelistDefault = List(".*")
val TopicWhitelistDefault = List(".*")
val TopicBlacklistDefault = List.empty[String]
}

final case class KafkaCluster(name: String, bootstrapBrokers: String,
groupWhitelist: List[String] = KafkaCluster.GroupWhitelistDefault,
topicWhitelist: List[String] = KafkaCluster.TopicWhitelistDefault,
topicBlacklist: List[String] = KafkaCluster.TopicBlacklistDefault,
consumerProperties: Map[String, String] = Map.empty,
adminClientProperties: Map[String, String] = Map.empty,
labels: Map[String, String] = Map.empty) {
Expand All @@ -109,6 +116,7 @@ final case class KafkaCluster(name: String, bootstrapBrokers: String,
| Cluster Kafka bootstrap brokers: $bootstrapBrokers
| Consumer group whitelist: [${groupWhitelist.mkString(", ")}]
| Topic whitelist: [${topicWhitelist.mkString(", ")}]
| Topic blacklist: [${topicBlacklist.mkString(", ")}]
""".stripMargin
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
private[kafkalagexporter] def groupTopicPartitions(groupId: String, desc: ConsumerGroupDescription): List[Domain.GroupTopicPartition] = {
val groupTopicPartitions = for {
member <- desc.members().asScala
ktp <- member.assignment().topicPartitions().asScala if cluster.topicWhitelist.exists(r => ktp.topic().matches(r))
ktp <- member.assignment().topicPartitions().asScala
if cluster.topicWhitelist.exists(r => ktp.topic().matches(r))
if !cluster.topicBlacklist.exists(r => ktp.topic().matches(r))
} yield Domain.GroupTopicPartition(
groupId,
member.clientId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ class KafkaClientSpec extends FreeSpec with Matchers with TestData with MockitoS
client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gt2p0).map(gtp => gtp.copy(host = tmpHost))
}

"will only fetch non blacklisted topics" in {
val tmpCluster: KafkaCluster = cluster.copy(topicWhitelist = List(topic, topic2), topicBlacklist = List(topic))
val (_,_,client) = getClient(tmpCluster)
val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava
val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava

val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node)

client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gt2p0).map(gtp => gtp.copy(host = tmpHost))
}

"will only fetch whitelisted topics when whitelist is a regex against multiple topics" in {
val tmpCluster = cluster.copy(topicWhitelist = List("test.+"))
val (_,_,client) = getClient(tmpCluster)
Expand All @@ -137,6 +148,17 @@ class KafkaClientSpec extends FreeSpec with Matchers with TestData with MockitoS
client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gtp0, gtp1, gtp2, gt2p0).map(gtp => gtp.copy(host = tmpHost))
}

"will exclude topics that match with blacklist regex" in {
val tmpCluster = cluster.copy(topicWhitelist = List("test.+"), topicBlacklist = List(".+topic-2"))
val (_,_,client) = getClient(tmpCluster)
val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava
val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava

val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node)

client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gtp0, gtp1, gtp2).map(gtp => gtp.copy(host = tmpHost))
}

"will not return any topics if the whitelist is empty" in {
val tmpCluster = cluster.copy(topicWhitelist = List.empty)
val (_,_,client) = getClient(tmpCluster)
Expand Down