Skip to content

Commit

Permalink
Add wrapper for Metadata requests #497 (#900)
Browse files Browse the repository at this point in the history
  • Loading branch information
jewertow authored and ennru committed Nov 27, 2019
1 parent ada44cd commit 97c1cff
Show file tree
Hide file tree
Showing 6 changed files with 668 additions and 12 deletions.
97 changes: 97 additions & 0 deletions core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.kafka.javadsl

import java.util.concurrent.{CompletionStage, Executor}

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.ExecutionContexts
import akka.kafka.ConsumerSettings
import akka.util.Timeout
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import scala.compat.java8.FutureConverters._
import scala.collection.compat._
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContextExecutor

class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient) {

def getBeginningOffsets[K, V](
partitions: java.util.Set[TopicPartition]
): CompletionStage[java.util.Map[TopicPartition, java.lang.Long]] =
metadataClient
.getBeginningOffsets(partitions.asScala.toSet)
.map { beginningOffsets =>
beginningOffsets.view.mapValues(Long.box).toMap.asJava
}(ExecutionContexts.sameThreadExecutionContext)
.toJava

def getBeginningOffsetForPartition[K, V](partition: TopicPartition): CompletionStage[java.lang.Long] =
metadataClient
.getBeginningOffsetForPartition(partition)
.map(Long.box)(ExecutionContexts.sameThreadExecutionContext)
.toJava

def getEndOffsets(
partitions: java.util.Set[TopicPartition]
): CompletionStage[java.util.Map[TopicPartition, java.lang.Long]] =
metadataClient
.getEndOffsets(partitions.asScala.toSet)
.map { endOffsets =>
endOffsets.view.mapValues(Long.box).toMap.asJava
}(ExecutionContexts.sameThreadExecutionContext)
.toJava

def getEndOffsetForPartition(partition: TopicPartition): CompletionStage[java.lang.Long] =
metadataClient
.getEndOffsetForPartition(partition)
.map(Long.box)(ExecutionContexts.sameThreadExecutionContext)
.toJava

def listTopics(): CompletionStage[java.util.Map[java.lang.String, java.util.List[PartitionInfo]]] =
metadataClient
.listTopics()
.map { topics =>
topics.view.mapValues(partitionsInfo => partitionsInfo.asJava).toMap.asJava
}(ExecutionContexts.sameThreadExecutionContext)
.toJava

def getPartitionsFor(topic: java.lang.String): CompletionStage[java.util.List[PartitionInfo]] =
metadataClient
.getPartitionsFor(topic)
.map { partitionsInfo =>
partitionsInfo.asJava
}(ExecutionContexts.sameThreadExecutionContext)
.toJava

def getCommittedOffset(partition: TopicPartition): CompletionStage[OffsetAndMetadata] =
metadataClient
.getCommittedOffset(partition)
.toJava

def close(): Unit =
metadataClient.close()
}

object MetadataClient {

def create(consumerActor: ActorRef, timeout: Timeout, executor: Executor): MetadataClient = {
implicit val ec: ExecutionContextExecutor = ExecutionContexts.fromExecutor(executor)
val metadataClient = akka.kafka.scaladsl.MetadataClient.create(consumerActor, timeout)
new MetadataClient(metadataClient)
}

def create[K, V](consumerSettings: ConsumerSettings[K, V],
timeout: Timeout,
system: ActorSystem,
executor: Executor): MetadataClient = {
val metadataClient = akka.kafka.scaladsl.MetadataClient
.create(consumerSettings, timeout)(system, ExecutionContexts.fromExecutor(executor))
new MetadataClient(metadataClient)
}
}
106 changes: 106 additions & 0 deletions core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.kafka.scaladsl

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.ExecutionContexts
import akka.kafka.Metadata.{
BeginningOffsets,
CommittedOffset,
EndOffsets,
GetBeginningOffsets,
GetCommittedOffset,
GetEndOffsets,
GetPartitionsFor,
ListTopics,
PartitionsFor,
Topics
}
import akka.kafka.{ConsumerSettings, KafkaConsumerActor}
import akka.pattern.ask
import akka.util.Timeout
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managedActor: Boolean)(
implicit ec: ExecutionContext
) {

def getBeginningOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
(consumerActor ? GetBeginningOffsets(partitions))(timeout)
.mapTo[BeginningOffsets]
.map(_.response)
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)

def getBeginningOffsetForPartition(partition: TopicPartition): Future[Long] =
getBeginningOffsets(Set(partition))
.map(beginningOffsets => beginningOffsets(partition))

def getEndOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
(consumerActor ? GetEndOffsets(partitions))(timeout)
.mapTo[EndOffsets]
.map(_.response)
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)

def getEndOffsetForPartition(partition: TopicPartition): Future[Long] =
getEndOffsets(Set(partition))
.map(endOffsets => endOffsets(partition))

def listTopics(): Future[Map[String, List[PartitionInfo]]] =
(consumerActor ? ListTopics)(timeout)
.mapTo[Topics]
.map(_.response)
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)

def getPartitionsFor(topic: String): Future[List[PartitionInfo]] =
(consumerActor ? GetPartitionsFor(topic))(timeout)
.mapTo[PartitionsFor]
.map(_.response)
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)

def getCommittedOffset(partition: TopicPartition): Future[OffsetAndMetadata] =
(consumerActor ? GetCommittedOffset(partition))(timeout)
.mapTo[CommittedOffset]
.map(_.response)
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)

def close(): Unit =
if (managedActor) {
consumerActor ! KafkaConsumerActor.Stop
}
}

object MetadataClient {

def create(consumerActor: ActorRef, timeout: Timeout)(implicit ec: ExecutionContext): MetadataClient =
new MetadataClient(consumerActor, timeout, false)

def create[K, V](
consumerSettings: ConsumerSettings[K, V],
timeout: Timeout
)(implicit system: ActorSystem, ec: ExecutionContext): MetadataClient = {
val consumerActor = system.actorOf(KafkaConsumerActor.props(consumerSettings))
new MetadataClient(consumerActor, timeout, true)
}
}
61 changes: 49 additions & 12 deletions docs/src/main/paradox/consumer-metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,52 @@ project.description: Access Kafka consumer metadata by sending messages to the a
---
# Consumer Metadata

To access the Kafka consumer metadata you need to create the @apidoc[akka.kafka.KafkaConsumerActor$] as described in the @ref[Consumer documentation](consumer.md#sharing-the-kafkaconsumer-instance) and send messages from @scaladoc[Metadata](akka.kafka.Metadata$) to it.
## Metadata Client

`MetadataClient` is a thin wrapper for @apidoc[akka.kafka.KafkaConsumerActor$] hiding the ask calls and mapping to the correct response types.

To access the Kafka consumer metadata you need to create the @apidoc[akka.kafka.KafkaConsumerActor$] as described in the @ref[Consumer documentation](consumer.md#sharing-the-kafkaconsumer-instance) pass it to `MetadataClient`'s factory method `create`.

Another approach to create metadata client is passing the `ConsumerSettings` and `ActorSystem` objects to the factory method. Then the metadata client manages the internal actor and stops it when the `close` method is called.

The metadata the Kafka Consumer provides is documented in the @javadoc[KafkaConsumer](org.apache.kafka.clients.consumer.KafkaConsumer) API.
The metadata the `MetadataClient` provides is documented in the @javadoc[Kafka Consumer API](org.apache.kafka.clients.consumer.KafkaConsumer) API.

## Supported metadata
## Supported metadata by MetadataClient

The supported metadata are

| Metadata | Response type |
|-------| ------- |
| Topics list | @scala[Future[Map[String, List[PartitionInfo]]]]@java[CompletionStage[java.util.Map[java.lang.String, java.util.List[PartitionInfo]]]] |
| Partitions | @scala[Future[List[PartitionInfo]]]@java[CompletionStage[java.util.List[PartitionInfo]]] |
| Beginning offsets | @scala[Future[Map[TopicPartition, Long]]]@java[CompletionStage[java.util.Map[TopicPartition, java.lang.Long]]] |
| End offsets | @scala[Future[Map[TopicPartition, Long]]]@java[CompletionStage[java.util.Map[TopicPartition, java.lang.Long]]] |
| Committed offset | @scala[Future[OffsetAndMetadata]]@java[CompletionStage[OffsetAndMetadata]] |

@@@ warning

Processing of these requests blocks the actor loop. The @apidoc[akka.kafka.KafkaConsumerActor$] is configured to run on its own dispatcher, so just as the other remote calls to Kafka, the blocking happens within a designated thread pool.

However, calling these during consuming might affect performance and even cause timeouts in extreme cases.

Please consider to use a dedicated @apidoc[akka.kafka.KafkaConsumerActor$] to create metadata client requests against.

@@@

Example:

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala) { #metadataClient }

Java
: @@ snip [snip](/tests/src/test/java/docs/javadsl/MetadataClientTest.java) { #metadataClient }


## Accessing metadata using KafkaConsumerActor

To access the Kafka consumer metadata you need to create the @apidoc[akka.kafka.KafkaConsumerActor$] as described in the @ref[Consumer documentation](consumer.md#sharing-the-kafkaconsumer-instance) and send messages from @scaladoc[Metadata](akka.kafka.Metadata$) to it.

## Supported metadata by KafkaConsumerActor

The supported metadata are

Expand All @@ -20,19 +61,15 @@ The supported metadata are
| GetOffsetsForTimes | OffsetsForTimes |
| GetCommittedOffset | CommittedOffset |

These requests are blocking within the Kafka client library up to a timeout configured by `metadata-request-timeout` or @apidoc[ConsumerSettings.withMetadataRequestTimeout](ConsumerSettings) { java="#withMetadataRequestTimeout(metadataRequestTimeout:java.time.Duration):akka.kafka.ConsumerSettings[K,V]" scala="#withMetadataRequestTimeout(metadataRequestTimeout:scala.concurrent.duration.FiniteDuration):akka.kafka.ConsumerSettings[K,V]" } respectively.

@@@ warning
These requests are blocking within the Kafka client library up to a timeout configured by `metadata-request-timeout` or `ConsumerSettings.withMetadataRequestTimeout` respectively.

Processing of these requests blocks the actor loop. The @apidoc[akka.kafka.KafkaConsumerActor$] is configured to run on its own dispatcher, so just as the other remote calls to Kafka, the blocking happens within a designated thread pool.

However, calling these during consuming might affect performance and even cause timeouts in extreme cases.
@@@ warning

Please consider to use a dedicated @apidoc[akka.kafka.KafkaConsumerActor$] to run metadata requests against.
Accessing the Kafka consumer metadata using the `KafkaConsumerActor` is not a recommended approach. It is reasonable only when you need to perform a request `GetOffsetsForTimes` which is not supported by the `MetadataClient` yet.

@@@
@@@

## Example
Example:

Scala
: @@ snip [snip](/tests/src/test/scala/docs/scaladsl/FetchMetadata.scala) { #metadata }
Expand Down
Loading

0 comments on commit 97c1cff

Please sign in to comment.