From ad9d39f41150e9ac78d96ce1c6c5412113628c4b Mon Sep 17 00:00:00 2001
From: Sean Glover
Date: Sun, 22 Sep 2019 17:33:21 -0400
Subject: [PATCH] Implement consumer group whitelist & misc. refactorings

---
 README.md                                  |   8 +-
 .../templates/030-ConfigMap.yaml           |  22 +-
 charts/kafka-lag-exporter/values.yaml      |  17 +-
 src/main/resources/reference.conf          |   1 -
 .../kafkalagexporter/AppConfig.scala       |  19 +-
 .../kafkalagexporter/KafkaClient.scala     |  90 ++++---
 .../kafkalagexporter/AppConfigSpec.scala   |   4 +
 .../kafkalagexporter/KafkaClientSpec.scala | 228 ++++++++++--------
 8 files changed, 243 insertions(+), 146 deletions(-)

diff --git a/README.md b/README.md
index 3b85b9c8..0d11a383 100644
--- a/README.md
+++ b/README.md
@@ -212,7 +212,6 @@ General Configuration (`kafka-lag-exporter{}`)
 | `clusters`                | `[]`          | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
 | `watchers`                | `{}`          | Settings for Kafka cluster "watchers" used for auto-discovery. |
 | `metric-whitelist`        | `[".*"]`      | Regex of metrics to be exposed via Prometheus endpoint. Eg. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |
-| `topic-whitelist`         | `[".*"]`      | Regex of topics monitored, e.g. `["input-.+", "output-.+"]` |
 
 Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)
 
 |---------------------------|-------------|----------|--------------------------------------------------------------------------------------------------------------------|
 | `name`                    | `""`        | Yes      | A unique cluster name to for this Kafka connection detail object |
 | `bootstrap-brokers`       | `""`        | Yes      | Kafka bootstrap brokers. Comma delimited list of broker hostnames |
+| `group-whitelist`         | `[".*"]`    | No       | A list of regexes of consumer groups to monitor. For example, to expose only groups with the `input` and `output` prefixes, use `["^input-.+", "^output-.+"]`. |
+| `topic-whitelist`         | `[".*"]`    | No       | A list of regexes of topics to monitor. For example, to expose only certain topics, use either `["^topic.+"]` or `["topic1", "topic2"]`. |
 | `consumer-properties`     | `{}`        | No       | A map of key value pairs used to configure the `KafkaConsumer`. See the [Consumer Config](https://kafka.apache.org/documentation/#consumerconfigs) section of the Kafka documentation for options. |
 | `admin-client-properties` | `{}`        | No       | A map of key value pairs used to configure the `AdminClient`. See the [Admin Config](https://kafka.apache.org/documentation/#adminclientconfigs) section of the Kafka documentation for options. |
-| `labels`                  | `{}`        | No       | A map of key value pairs will be set as additional custom labels per cluster for all the metrics in prometheus. |
+| `labels`                  | `{}`        | No       | A map of key value pairs that will be set as additional custom labels per cluster for all the metrics in Prometheus. |
 
 Watchers (`kafka-lag-exporters.watchers{}`)
 
@@ -242,6 +243,9 @@ kafka-lag-exporter {
     {
       name = "a-cluster"
       bootstrap-brokers = "a-1.cluster-a.xyzcorp.com:9092,a-2.cluster-a.xyzcorp.com:9092,a-3.cluster-a.xyzcorp.com:9092"
+      topic-whitelist = [
+        "widgets-.+"
+      ]
      consumer-properties = {
        client.id = "consumer-client-id"
      }
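
Both whitelists are lists of regular expressions evaluated against the full group or topic name with Java's `String.matches`, so a plain name such as `"topic1"` behaves as an exact match and the leading `^` anchors are optional. A minimal sketch of the matching semantics (the names below are illustrative, not part of the patch):

```scala
// Sketch: whitelist filtering semantics. String.matches requires the pattern
// to cover the entire name, so "input-.+" matches "input-orders" but a bare
// "input" does not.
object WhitelistSemantics extends App {
  def isWhitelisted(name: String, whitelist: List[String]): Boolean =
    whitelist.exists(pattern => name.matches(pattern))

  val groupWhitelist = List("^input-.+", "^output-.+")
  assert(isWhitelisted("input-orders", groupWhitelist))
  assert(!isWhitelisted("other-consumer-group", groupWhitelist))
  assert(!isWhitelisted("input-orders", Nil)) // an empty whitelist matches nothing
}
```
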
diff --git a/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml b/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
index a658c70c..ee9c1e87 100644
--- a/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
+++ b/charts/kafka-lag-exporter/templates/030-ConfigMap.yaml
@@ -20,6 +20,22 @@ data:
         {
           name = "{{ $cluster.name }}"
           bootstrap-brokers = "{{ $cluster.bootstrapBrokers }}"
+          {{- if $cluster.groupWhitelist }}
+          group-whitelist = [
+            {{- $lastIndex := sub (len $cluster.groupWhitelist) 1}}
+            {{- range $i, $whitelist := $cluster.groupWhitelist }}
+            {{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
+            {{- end }}
+          ]
+          {{- end }}
+          {{- if $cluster.topicWhitelist }}
+          topic-whitelist = [
+            {{- $lastIndex := sub (len $cluster.topicWhitelist) 1}}
+            {{- range $i, $whitelist := $cluster.topicWhitelist }}
+            {{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
+            {{- end }}
+          ]
+          {{- end }}
           consumer-properties = {
             {{- range $key, $val := $cluster.consumerProperties }}
             {{ $key }} = {{ quote $val }}
@@ -47,12 +63,6 @@ data:
         {{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
         {{- end }}
       ]
-      topic-whitelist = [
-        {{- $lastIndex := sub (len .Values.topicWhitelist) 1}}
-        {{- range $i, $whitelist := .Values.topicWhitelist }}
-        {{ quote $whitelist }}{{- if ne $i $lastIndex -}}, {{ end }}
-        {{- end }}
-      ]
     }
 
     akka {
diff --git a/charts/kafka-lag-exporter/values.yaml b/charts/kafka-lag-exporter/values.yaml
index 4c5591bb..71e1d419 100644
--- a/charts/kafka-lag-exporter/values.yaml
+++ b/charts/kafka-lag-exporter/values.yaml
@@ -4,6 +4,10 @@ clusters: {}
 #clusters:
 #  - name: "default"
 #    bootstrapBrokers: "simple-strimzi-kafka-bootstrap.strimzi.svc.cluster.local:9092"
+#    topicWhitelist:
+#    - "^xyz-corp-topics\\..+"
+#    groupWhitelist:
+#    - "^analytics-app-.+"
 #    ## Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
 #    ## can be defined in this configuration section.
 #    ## https://kafka.apache.org/documentation/#consumerconfigs
@@ -52,19 +56,6 @@ serviceAccount:
 metricWhitelist:
   - .*
 
-## You can use regex to control the set of topics monitored.
-## For example, if you only wish to expose only certain topics, use either:
-## topicWhitelist:
-##   - ^topic.+
-##
-## Or
-##
-## topicWhitelist:
-##   - topic1
-##   - topic2
-topicWhitelist:
-  - .*
-
 ## The log level of the ROOT logger
 rootLogLevel: INFO
 ## The log level of Kafka Lag Exporter
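
With per-cluster values like the commented example above, the chart renders each whitelist as a quoted, comma-separated HOCON list inside the cluster block. A quick way to sanity-check that shape is to parse an equivalent snippet with Typesafe Config, which the exporter itself uses (the snippet below is illustrative, not actual chart output):

```scala
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

// Sketch: the kind of HOCON the ConfigMap template renders for one cluster.
object RenderedConfigCheck extends App {
  val rendered =
    """
      |clusters = [
      |  {
      |    name = "default"
      |    bootstrap-brokers = "simple-strimzi-kafka-bootstrap.strimzi.svc.cluster.local:9092"
      |    group-whitelist = ["^analytics-app-.+"]
      |    topic-whitelist = ["^xyz-corp-topics\\..+"]
      |  }
      |]
    """.stripMargin

  val cluster = ConfigFactory.parseString(rendered).getConfigList("clusters").get(0)
  assert(cluster.getStringList("group-whitelist").asScala.toList == List("^analytics-app-.+"))
  assert(cluster.getStringList("topic-whitelist").asScala.toList == List("^xyz-corp-topics\\..+"))
}
```
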
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index 06afc6db..cd3c2a24 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -16,7 +16,6 @@ kafka-lag-exporter {
     strimzi = ${?KAFKA_LAG_EXPORTER_STRIMZI}
   }
   metric-whitelist = [".*"]
-  topic-whitelist = [".*"]
 }
 
 akka {
diff --git a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
index e883221d..9142eb09 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
@@ -42,11 +42,18 @@ object AppConfig {
         ).toMap
       }.getOrElse(Map.empty[String, String])
 
-      val topicWhitelist = c.getStringList("topic-whitelist").asScala.toList
+      val groupWhitelist = if (clusterConfig.hasPath("group-whitelist"))
+        clusterConfig.getStringList("group-whitelist").asScala.toList
+      else KafkaCluster.GroupWhitelistDefault
+
+      val topicWhitelist = if (clusterConfig.hasPath("topic-whitelist"))
+        clusterConfig.getStringList("topic-whitelist").asScala.toList
+      else KafkaCluster.TopicWhitelistDefault
 
       KafkaCluster(
         clusterConfig.getString("name"),
         clusterConfig.getString("bootstrap-brokers"),
+        groupWhitelist,
         topicWhitelist,
         consumerProperties,
         adminClientProperties,
@@ -85,7 +92,14 @@ object AppConfig {
   }
 }
 
-final case class KafkaCluster(name: String, bootstrapBrokers: String, topicWhitelist: List[String] = List(".*"),
+object KafkaCluster {
+  val GroupWhitelistDefault = List(".*")
+  val TopicWhitelistDefault = List(".*")
+}
+
+final case class KafkaCluster(name: String, bootstrapBrokers: String,
+                              groupWhitelist: List[String] = KafkaCluster.GroupWhitelistDefault,
+                              topicWhitelist: List[String] = KafkaCluster.TopicWhitelistDefault,
                               consumerProperties: Map[String, String] = Map.empty,
                               adminClientProperties: Map[String, String] = Map.empty,
                               labels: Map[String, String] = Map.empty) {
@@ -93,6 +107,7 @@ final case class KafkaCluster(name: String, bootstrapBrokers: String, topicWhite
     s"""
        | Cluster name: $name
        | Cluster Kafka bootstrap brokers: $bootstrapBrokers
+       | Consumer group whitelist: [${groupWhitelist.mkString(", ")}]
        | Topic whitelist: [${topicWhitelist.mkString(", ")}]
      """.stripMargin
 }
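
With the whitelists now defined per cluster rather than globally, each key is optional: `hasPath` guards the lookup and missing keys fall back to the match-everything defaults on `KafkaCluster`. The behavior in isolation (illustrative, using the same Typesafe Config API):

```scala
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

// Sketch: absent whitelist keys fall back to List(".*"), i.e. monitor everything.
object WhitelistDefaults extends App {
  val clusterConfig = ConfigFactory.parseString(
    """
      |name = "a-cluster"
      |bootstrap-brokers = "broker:9092"
    """.stripMargin)

  val groupWhitelist =
    if (clusterConfig.hasPath("group-whitelist"))
      clusterConfig.getStringList("group-whitelist").asScala.toList
    else List(".*") // KafkaCluster.GroupWhitelistDefault in the patch

  assert(groupWhitelist == List(".*"))
}
```
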
diff --git a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
index 8472bfe9..4afb34da 100644
--- a/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
+++ b/src/main/scala/com/lightbend/kafkalagexporter/KafkaClient.scala
@@ -10,7 +10,7 @@ import java.util.concurrent.TimeUnit
 import java.{lang, util}
 
 import com.lightbend.kafkalagexporter.Domain.{GroupOffsets, PartitionOffsets}
-import com.lightbend.kafkalagexporter.KafkaClient.KafkaClientContract
+import com.lightbend.kafkalagexporter.KafkaClient.{AdminKafkaClientContract, ConsumerKafkaClientContract, KafkaClientContract}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
@@ -28,8 +28,12 @@ object KafkaClient {
   val CommonClientConfigRetryBackoffMs = 1000 // longer interval between retry attempts so we don't overload clusters (default = 100ms)
   val ConsumerConfigAutoCommit = false
 
-  def apply(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration)(implicit ec: ExecutionContext): KafkaClientContract =
-    new KafkaClient(cluster, groupId, clientTimeout)(ec)
+  def apply(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration)(implicit ec: ExecutionContext): KafkaClientContract = {
+    val consumer = new ConsumerKafkaClient(createConsumerClient(cluster, groupId, clientTimeout), clientTimeout)
+    val adminKafkaClient = new AdminKafkaClient(createAdminClient(cluster, clientTimeout), clientTimeout)
+    new KafkaClient(cluster, consumer, adminKafkaClient)(ec)
+  }
+
 
   trait KafkaClientContract {
     def getGroups(): Future[(List[String], List[Domain.GroupTopicPartition])]
@@ -64,7 +68,7 @@ object KafkaClient {
   private def createConsumerClient(cluster: KafkaCluster, groupId: String, clientTimeout: FiniteDuration): KafkaConsumer[Byte, Byte] = {
     val props = new Properties()
     val deserializer = (new ByteArrayDeserializer).getClass.getName
-    // https://kafka.apache.org/documentation/#consumerconfigs
+    // KafkaConsumer config: https://kafka.apache.org/documentation/#consumerconfigs
     props.putAll(cluster.consumerProperties.asJava)
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapBrokers)
     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
@@ -85,37 +89,72 @@ object KafkaClient {
   private[kafkalagexporter] implicit class KafkaTopicPartitionOps(tp: Domain.TopicPartition) {
     def asKafka: KafkaTopicPartition = new KafkaTopicPartition(tp.topic, tp.partition)
   }
+
+  /**
+   * AdminClient wrapper. Encapsulates calls to `AdminClient`. This abstraction exists so the `AdminClient` can be
+   * mocked in tests, because the various `*Result` types it returns cannot be mocked.
+   */
+  trait AdminKafkaClientContract {
+    def listConsumerGroups(): Future[util.Collection[ConsumerGroupListing]]
+    def describeConsumerGroups(groupIds: List[String]): Future[util.Map[String, ConsumerGroupDescription]]
+    def listConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]]
+    def close(): Unit
+  }
+
+  class AdminKafkaClient private[kafkalagexporter](client: AdminClient, clientTimeout: FiniteDuration)
+                                                  (implicit ec: ExecutionContext) extends AdminKafkaClientContract {
+    private implicit val _clientTimeout: Duration = clientTimeout.toJava
+
+    private val listGroupOptions = new ListConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis.toInt)
+    private val describeGroupOptions = new DescribeConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis.toInt)
+    private val listConsumerGroupsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(_clientTimeout.toMillis.toInt)
+
+    def listConsumerGroups(): Future[util.Collection[ConsumerGroupListing]] = kafkaFuture(client.listConsumerGroups(listGroupOptions).all())
+    def describeConsumerGroups(groupIds: List[String]): Future[util.Map[String, ConsumerGroupDescription]] =
+      kafkaFuture(client.describeConsumerGroups(groupIds.asJava, describeGroupOptions).all())
+    def listConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]] =
+      kafkaFuture(client.listConsumerGroupOffsets(group, listConsumerGroupsOptions).partitionsToOffsetAndMetadata())
+    def close(): Unit = client.close(_clientTimeout)
+  }
+
+  trait ConsumerKafkaClientContract {
+    def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long]
+    def close(): Unit
+  }
+
+  class ConsumerKafkaClient private[kafkalagexporter](consumer: KafkaConsumer[Byte,Byte], clientTimeout: FiniteDuration) extends ConsumerKafkaClientContract {
+    private val _clientTimeout: Duration = clientTimeout.toJava
+
+    def endOffsets(partitions: util.Collection[KafkaTopicPartition]): util.Map[KafkaTopicPartition, java.lang.Long] =
+      consumer.endOffsets(partitions, _clientTimeout)
+    def close(): Unit = consumer.close(_clientTimeout)
+  }
+
 }
 
 class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
-                                            groupId: String,
-                                            clientTimeout: FiniteDuration)
+                                            consumer: ConsumerKafkaClientContract,
+                                            adminClient: AdminKafkaClientContract)
                                            (implicit ec: ExecutionContext) extends KafkaClientContract {
   import KafkaClient._
 
-  private implicit val _clientTimeout: Duration = clientTimeout.toJava
-
-  private lazy val adminClient: AdminClient = createAdminClient(cluster, clientTimeout)
-  private lazy val consumer: KafkaConsumer[Byte,Byte] = createConsumerClient(cluster, groupId, clientTimeout)
-
-  private lazy val listGroupOptions = new ListConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis().toInt)
-  private lazy val describeGroupOptions = new DescribeConsumerGroupsOptions().timeoutMs(_clientTimeout.toMillis().toInt)
-  private lazy val listConsumerGroupsOptions = new ListConsumerGroupOffsetsOptions().timeoutMs(_clientTimeout.toMillis().toInt)
-
   /**
    * Get a list of consumer groups
    */
   def getGroups(): Future[(List[String], List[Domain.GroupTopicPartition])] = {
     for {
-      groups <- kafkaFuture(adminClient.listConsumerGroups(listGroupOptions).all())
-      groupIds = groups.asScala.map(_.groupId()).toList
-      groupDescriptions <- kafkaFuture(adminClient.describeConsumerGroups(groupIds.asJava, describeGroupOptions).all())
+      groups <- adminClient.listConsumerGroups()
+      groupIds = getGroupIds(groups)
+      groupDescriptions <- adminClient.describeConsumerGroups(groupIds)
     } yield {
       val gtps = groupDescriptions.asScala.flatMap { case (id, desc) => groupTopicPartitions(id, desc) }.toList
       (groupIds, gtps)
     }
   }
 
+  private[kafkalagexporter] def getGroupIds(groups: util.Collection[ConsumerGroupListing]): List[String] =
+    groups.asScala.map(_.groupId()).toList.filter(g => cluster.groupWhitelist.exists(r => g.matches(r)))
+
   private[kafkalagexporter] def groupTopicPartitions(groupId: String, desc: ConsumerGroupDescription): List[Domain.GroupTopicPartition] = {
     val groupTopicPartitions = for {
       member <- desc.members().asScala
@@ -135,7 +174,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
    * Get latest offsets for a set of topic partitions.
    */
   def getLatestOffsets(now: Long, topicPartitions: Set[Domain.TopicPartition]): Try[PartitionOffsets] = Try {
-    val offsets: util.Map[KafkaTopicPartition, lang.Long] = consumer.endOffsets(topicPartitions.map(_.asKafka).asJava, _clientTimeout)
+    val offsets: util.Map[KafkaTopicPartition, lang.Long] = consumer.endOffsets(topicPartitions.map(_.asKafka).asJava)
     topicPartitions.map(tp => tp -> LookupTable.Point(offsets.get(tp.asKafka).toLong,now)).toMap
   }
 
@@ -148,7 +187,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
     val groupOffsetsF: Future[List[GroupOffsets]] = Future.sequence {
       groups.map { group =>
         val gtps = allGtps.filter(_.id == group)
-        getListConsumerGroupOffsets(group)
+        adminClient.listConsumerGroupOffsets(group)
           .map(offsetMap => getGroupOffsets(now, gtps, offsetMap.asScala.toMap))
       }
     }
@@ -162,12 +201,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
    * Call to `AdminClient` to get group offset info. This is only its own method so it can be mocked out in a test
    * because it's not possible to instantiate or mock the `ListConsumerGroupOffsetsResult` type for some reason.
    */
-  private[kafkalagexporter] def getListConsumerGroupOffsets(group: String): Future[util.Map[KafkaTopicPartition, OffsetAndMetadata]] =
-    kafkaFuture {
-      adminClient
-        .listConsumerGroupOffsets(group, listConsumerGroupsOptions)
-        .partitionsToOffsetAndMetadata()
-    }
+
 
   /**
    * Backfill any group topic partitions with no offset as None
@@ -189,7 +223,7 @@ class KafkaClient private[kafkalagexporter](cluster: KafkaCluster,
     } yield gtp -> Some(LookupTable.Point(offset, now))).toMap
 
   def close(): Unit = {
-    adminClient.close(_clientTimeout)
-    consumer.close(_clientTimeout)
+    adminClient.close()
+    consumer.close()
   }
 }
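
The wrapper methods above funnel every `AdminClient` call through the pre-existing `kafkaFuture` helper in `KafkaClient.scala`, which adapts Kafka's `KafkaFuture` to a Scala `Future`. Its definition is outside this diff; a minimal adapter of that shape, assuming only the public `KafkaFuture.whenComplete` API, looks like:

```scala
import org.apache.kafka.common.KafkaFuture
import scala.concurrent.{Future, Promise}

// Sketch: adapt Kafka's KafkaFuture[T] to a Scala Future[T] by completing a
// Promise from KafkaFuture's completion callback. The real helper may differ.
object KafkaFutureAdapter {
  def kafkaFutureSketch[T](kf: KafkaFuture[T]): Future[T] = {
    val p = Promise[T]()
    kf.whenComplete(new KafkaFuture.BiConsumer[T, Throwable] {
      override def accept(value: T, error: Throwable): Unit =
        if (error != null) p.failure(error) else p.success(value)
    })
    p.future
  }
}
```
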
diff --git a/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala b/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala
index 071029ed..39ca972d 100644
--- a/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala
+++ b/src/test/scala/com/lightbend/kafkalagexporter/AppConfigSpec.scala
@@ -16,6 +16,8 @@ class AppConfigSpec extends FreeSpec with Matchers {
        |  {
        |    name = "clusterA"
        |    bootstrap-brokers = "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092"
+       |    group-whitelist = ["group-a", "group-b"]
+       |    topic-whitelist = ["topic-a", "topic-b"]
        |    consumer-properties = {
        |      client.id = "consumer-client-id"
        |    }
@@ -46,6 +48,8 @@ class AppConfigSpec extends FreeSpec with Matchers {
       appConfig.clusters.length shouldBe 3
       appConfig.clusters(0).name shouldBe "clusterA"
       appConfig.clusters(0).bootstrapBrokers shouldBe "b-1.cluster-a.xyzcorp.com:9092,b-2.cluster-a.xyzcorp.com:9092"
+      appConfig.clusters(0).groupWhitelist shouldBe List("group-a", "group-b")
+      appConfig.clusters(0).topicWhitelist shouldBe List("topic-a", "topic-b")
       appConfig.clusters(0).consumerProperties("client.id") shouldBe "consumer-client-id"
       appConfig.clusters(0).adminClientProperties("client.id") shouldBe "admin-client-id"
       appConfig.clusters(0).labels("environment") shouldBe "integration"
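
The KafkaClientSpec rewrite below drops the `spy`/`doReturn` overrides of `getListConsumerGroupOffsets` in favor of stubbing the new wrapper traits with mockito-scala. The core pattern, reduced to a self-contained example (the trait and values here are illustrative):

```scala
import org.mockito.MockitoSugar
import scala.concurrent.Future

// Sketch: stubbing a small trait with mockito-scala's MockitoSugar, the same
// when(...).thenReturn(...) style the rewritten spec uses for the admin client.
object StubbingPattern extends MockitoSugar {
  trait OffsetSource { def offsetsFor(group: String): Future[Long] }

  val source = mock[OffsetSource]
  when(source.offsetsFor("group-a")).thenReturn(Future.successful(42L))
}
```
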
"KafkaClient" - { - implicit val ec: ExecutionContextExecutor = ExecutionContext.global + "getGroupOffsets" - { + "returns None offsets for missing partitions and doesn't overwrite results for shared topic partitions" in { + val groupId0 = "testGroupId0" + val groupId1 = "testGroupId1" - "getGroupOffsets returns None offsets for missing partitions and doesn't overwrite results for shared topic partitions" in { - val groupId0 = "testGroupId0" - val groupId1 = "testGroupId1" + val groups = List(groupId0, groupId1) - val groups = List(groupId0, groupId1) + val gtp0_0 = gtp0.copy(id = groupId0) + val gtp1_0 = gtp1.copy(id = groupId0) + val gtp2_0 = gtp2.copy(id = groupId0) + val gtp0_1 = gtp0.copy(id = groupId1) - val gtp0_0 = gtp0.copy(id = groupId0) - val gtp1_0 = gtp1.copy(id = groupId0) - val gtp2_0 = gtp2.copy(id = groupId0) - val gtp0_1 = gtp0.copy(id = groupId1) + val gtps = List(gtp0_0, gtp1_0, gtp2_0, gtp0_1) - val gtps = List(gtp0_0, gtp1_0, gtp2_0, gtp0_1) + val (adminKafkaClient, _, client) = getClient() - val client = spy(new KafkaClient(cluster, groupId, FiniteDuration(0, "ms"))) + val groupId0Results = Future.successful(Map( + topicPartition0.asKafka -> new OffsetAndMetadata(0, Optional.empty(), "") + // missing topicPartition1 + // missing topicPartition2 + ).asJava) + when(adminKafkaClient.listConsumerGroupOffsets(groupId0)).thenReturn(groupId0Results) - val groupId0Results = Future.successful(Map( - topicPartition0.asKafka -> new OffsetAndMetadata(0, Optional.empty(), "") - // missing topicPartition1 - // missing topicPartition2 - ).asJava) - doReturn(groupId0Results).when(client).getListConsumerGroupOffsets(groupId0) + val groupId1Results = Future.successful(Map( + topicPartition0.asKafka -> new OffsetAndMetadata(1, Optional.empty(), "") + ).asJava) + when(adminKafkaClient.listConsumerGroupOffsets(groupId1)).thenReturn(groupId1Results) - val groupId1Results = Future.successful(Map( - topicPartition0.asKafka -> new OffsetAndMetadata(1, Optional.empty(), "") - ).asJava) + val groupOffsets = client.getGroupOffsets(0, groups, gtps).futureValue - doReturn(groupId1Results).when(client).getListConsumerGroupOffsets(groupId1) + groupOffsets.size shouldEqual 4 + groupOffsets(gtp0_0) shouldEqual Some(LookupTable.Point(0, 0)) + groupOffsets(gtp1_0) shouldEqual None // missing partition + groupOffsets(gtp2_0) shouldEqual None // missing partition + groupOffsets(gtp0_1) shouldEqual Some(LookupTable.Point(1, 0)) + } - val groupOffsets = client.getGroupOffsets(0, groups, gtps).futureValue + "returns distinct offsets when multiple groups subscribe to same partitions" in { + val groupId0 = "testGroupId0" + val groupId1 = "testGroupId1" - groupOffsets.size shouldEqual 4 - groupOffsets(gtp0_0) shouldEqual Some(LookupTable.Point(0, 0)) - groupOffsets(gtp1_0) shouldEqual None // missing partition - groupOffsets(gtp2_0) shouldEqual None // missing partition - groupOffsets(gtp0_1) shouldEqual Some(LookupTable.Point(1, 0)) + val groups = List(groupId0, groupId1) - } + val gtp0_0 = gtp0.copy(id = groupId0) + val gtp0_1 = gtp0.copy(id = groupId1) - "getGroupOffsets returns distinct offsets when multiple groups subscribe to same partitions" in { - val groupId0 = "testGroupId0" - val groupId1 = "testGroupId1" - - val groups = List(groupId0, groupId1) - - val gtp0_0 = gtp0.copy(id = groupId0) - val gtp0_1 = gtp0.copy(id = groupId1) + val gtps = List(gtp0_0, gtp0_1) - val gtps = List(gtp0_0, gtp0_1) + val (adminKafkaClient, _, client) = getClient() - val client = spy(new KafkaClient(cluster, groupId, 
FiniteDuration(0, "ms"))) + val groupId0Results = Future.successful(Map( + topicPartition0.asKafka -> new OffsetAndMetadata(0, Optional.empty(), "") + ).asJava) + when(adminKafkaClient.listConsumerGroupOffsets(groupId0)).thenReturn(groupId0Results) - val groupId0Results = Future.successful(Map( - topicPartition0.asKafka -> new OffsetAndMetadata(0, Optional.empty(), "") - ).asJava) - doReturn(groupId0Results).when(client).getListConsumerGroupOffsets(groupId0) + val groupId1Results = Future.successful(Map( + topicPartition0.asKafka -> new OffsetAndMetadata(1, Optional.empty(), "") + ).asJava) + when(adminKafkaClient.listConsumerGroupOffsets(groupId1)).thenReturn(groupId1Results) - val groupId1Results = Future.successful(Map( - topicPartition0.asKafka -> new OffsetAndMetadata(1, Optional.empty(), "") - ).asJava) - doReturn(groupId1Results).when(client).getListConsumerGroupOffsets(groupId1) + val groupOffsets = client.getGroupOffsets(0, groups, gtps).futureValue - val groupOffsets = client.getGroupOffsets(0, groups, gtps).futureValue - - groupOffsets(gtp0_0) shouldEqual Some(LookupTable.Point(0, 0)) - groupOffsets(gtp0_1) shouldEqual Some(LookupTable.Point(1, 0)) + groupOffsets(gtp0_0) shouldEqual Some(LookupTable.Point(0, 0)) + groupOffsets(gtp0_1) shouldEqual Some(LookupTable.Point(1, 0)) + } } "getOffsetOrZero returns offsets of None (Option[Point]) for missing partitions" in { - implicit val ec = ExecutionContext.global - val client = new KafkaClient(cluster, groupId, FiniteDuration(0, "ms")) + val (_,_,client) = getClient() // create offsetMap with missing partition 2 val offsetMap = GroupOffsets( @@ -110,51 +103,98 @@ class KafkaClientSpec extends FreeSpec with Matchers with TestData with MockitoS groupOffsets(gtp2) shouldEqual None } - "groupTopicPartition will default to fetching all topics" in { + "groupTopicPartition" - { val tmpHost = "brokers" - val client = new KafkaClient(cluster, groupId, FiniteDuration(0, "ms")) - val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava - val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava + "will default to fetching all topics" in { + val (_,_,client) = getClient() + val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava + val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava - val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node) + val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node) - client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gtp0, gtp1, gtp2, gt2p0).map(gtp => gtp.copy(host = tmpHost)) - } + client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gtp0, gtp1, gtp2, gt2p0).map(gtp => gtp.copy(host = tmpHost)) + } - "groupTopicPartition will only fetch whitelisted topic when whitelist contains a single topic" in { - val tmpHost = "brokers" - val tmpCluster = cluster.copy(topicWhitelist = List(topic2)) - val client = new KafkaClient(tmpCluster, groupId, FiniteDuration(0, "ms")) - val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava - val members = List(new MemberDescription(consumerId, clientId, tmpHost, new 
MemberAssignment(tps))).asJava + "will only fetch whitelisted topic when whitelist contains a single topic" in { + val tmpCluster = cluster.copy(topicWhitelist = List(topic2)) + val (_,_,client) = getClient(tmpCluster) + val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava + val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava - val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node) + val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node) - client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gt2p0).map(gtp => gtp.copy(host = tmpHost)) - } + client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gt2p0).map(gtp => gtp.copy(host = tmpHost)) + } - "groupTopicPartition will only fetch whitelisted topics when whitelist is a regex against multiple topics" in { - val tmpHost = "brokers" - val tmpCluster = cluster.copy(topicWhitelist = List("test.+")) - val client = new KafkaClient(tmpCluster, groupId, FiniteDuration(0, "ms")) - val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava - val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava + "will only fetch whitelisted topics when whitelist is a regex against multiple topics" in { + val tmpCluster = cluster.copy(topicWhitelist = List("test.+")) + val (_,_,client) = getClient(tmpCluster) + val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava + val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava - val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node) + val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node) - client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gtp0, gtp1, gtp2, gt2p0).map(gtp => gtp.copy(host = tmpHost)) - } + client.groupTopicPartitions(groupId, description) should contain theSameElementsAs List(gtp0, gtp1, gtp2, gt2p0).map(gtp => gtp.copy(host = tmpHost)) + } - "groupTopicPartition will not return any topics if the whitelist is empty" in { - val tmpHost = "brokers" - val tmpCluster = cluster.copy(topicWhitelist = List.empty) - val client = new KafkaClient(tmpCluster, groupId, FiniteDuration(0, "ms")) - val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava - val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava + "will not return any topics if the whitelist is empty" in { + val tmpCluster = cluster.copy(topicWhitelist = List.empty) + val (_,_,client) = getClient(tmpCluster) + val tps = Set(topicPartition0, topicPartition1, topicPartition2, topic2Partition0).map(tp => new TP(tp.topic, tp.partition)).asJava + val members = List(new MemberDescription(consumerId, clientId, tmpHost, new MemberAssignment(tps))).asJava - val description = new ConsumerGroupDescription(groupId, true, members, "", ConsumerGroupState.STABLE, node) + val description = new ConsumerGroupDescription(groupId, true, members, "", 
ConsumerGroupState.STABLE, node) - client.groupTopicPartitions(groupId, description) shouldBe empty + client.groupTopicPartitions(groupId, description) shouldBe empty + } } + + "getGroupIds" - { + val groupId1 = "testGroupId1" + val groupId2 = "testGroupId2" + val groupIdIotData = "iot-data" + + val groups = List( + new ConsumerGroupListing(groupId1, false), + new ConsumerGroupListing(groupId2, false), + new ConsumerGroupListing(groupIdIotData, false) + ).asJavaCollection + + "will default to fetching all group IDs" in { + val tmpCluster = cluster + val (_,_,client) = getClient(tmpCluster) + val groupIds = client.getGroupIds(groups) + groupIds should contain theSameElementsAs List(groupId1, groupId2, groupIdIotData) + } + + "will only return one group when whitelist contains a single group" in { + val tmpCluster = cluster.copy(groupWhitelist = List(groupId1)) + val (_,_,client) = getClient(tmpCluster) + val groupIds = client.getGroupIds(groups) + groupIds should contain theSameElementsAs List(groupId1) + } + + "will only return groups for whitelist that contains a regex that matches multiple groups" in { + val tmpCluster = cluster.copy(groupWhitelist = List("testGroupId[0-9]")) + val (_,_,client) = getClient(tmpCluster) + val groupIds = client.getGroupIds(groups) + groupIds should contain theSameElementsAs List(groupId1, groupId2) + } + + "will not return any groups when whitelist is empty" in { + val tmpCluster = cluster.copy(groupWhitelist = List.empty) + val (_,_,client) = getClient(tmpCluster) + val groupIds = client.getGroupIds(groups) + groupIds shouldBe List.empty + } + } + } + + def getClient(cluster: KafkaCluster = cluster, groupId: String = groupId) = { + implicit val ec: ExecutionContextExecutor = ExecutionContext.global + val adminKafkaClient = mock[AdminKafkaClientContract] + val consumer = mock[ConsumerKafkaClientContract] + val client = spy(new KafkaClient(cluster, consumer, adminKafkaClient)) + (adminKafkaClient, consumer, client) } }
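
Taken together, the two whitelists act at different stages of a poll: `getGroupIds` drops non-matching consumer groups before any offsets are fetched, and `groupTopicPartitions` then drops assignments whose topics are not whitelisted. A condensed sketch of the combined effect (pure functions with illustrative names, not the exporter's API):

```scala
// Sketch: the combined effect of group-whitelist and topic-whitelist on what
// ends up being monitored.
object CombinedWhitelists extends App {
  final case class Assignment(group: String, topic: String, partition: Int)

  def monitored(assignments: List[Assignment],
                groupWhitelist: List[String],
                topicWhitelist: List[String]): List[Assignment] =
    assignments
      .filter(a => groupWhitelist.exists(a.group.matches)) // getGroupIds stage
      .filter(a => topicWhitelist.exists(a.topic.matches)) // groupTopicPartitions stage

  val all = List(
    Assignment("input-app", "widgets-raw", 0),
    Assignment("input-app", "internal-metrics", 0),
    Assignment("legacy-app", "widgets-raw", 0))

  // Only the whitelisted group/topic combination survives.
  assert(monitored(all, List("input-.+"), List("widgets-.+")) ==
    List(Assignment("input-app", "widgets-raw", 0)))
}
```
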