diff --git a/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala b/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala new file mode 100644 index 000000000..bc87d00d5 --- /dev/null +++ b/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. + */ + +package akka.kafka.javadsl + +import java.util.concurrent.{CompletionStage, Executor} + +import akka.actor.ActorSystem +import akka.kafka.ConsumerSettings +import akka.util.Timeout +import org.apache.kafka.common.TopicPartition + +import scala.concurrent.ExecutionContext +import scala.compat.java8.FutureConverters._ +import scala.collection.JavaConverters._ + +object MetadataClient { + + def getBeginningOffsets[K, V]( + consumerSettings: ConsumerSettings[K, V], + partitions: java.util.Set[TopicPartition], + timeout: Timeout, + system: ActorSystem, + executor: Executor + ): CompletionStage[java.util.Map[TopicPartition, java.lang.Long]] = { + val ec = ExecutionContext.fromExecutor(executor) + akka.kafka.scaladsl.MetadataClient + .getBeginningOffsets(consumerSettings, partitions.asScala.toSet, timeout)(system, ec) + .map { beginningOffsets => + val scalaMapWithJavaValues = beginningOffsets.mapValues(long2Long) + scalaMapWithJavaValues.asJava + }(ec) + .toJava + } + + def getBeginningOffsetForPartition[K, V]( + consumerSettings: ConsumerSettings[K, V], + partition: TopicPartition, + timeout: Timeout, + system: ActorSystem, + executor: Executor + ): CompletionStage[java.lang.Long] = + getBeginningOffsets(consumerSettings, Set(partition).asJava, timeout, system, executor) + .thenApply(_.get(partition)) +} diff --git a/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala b/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala new file mode 100644 index 000000000..dadccd895 --- /dev/null +++ b/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. + */ + +package akka.kafka.scaladsl +import akka.actor.{ActorRef, ActorSystem} +import akka.kafka.{ConsumerSettings, KafkaConsumerActor} +import akka.kafka.Metadata.{BeginningOffsets, GetBeginningOffsets} +import akka.pattern.ask +import akka.util.Timeout +import org.apache.kafka.common.TopicPartition + +import scala.concurrent.{ExecutionContext, Future} + +object MetadataClient { + + private val consumerActors = scala.collection.mutable.Map[ConsumerSettings[Any, Any], ActorRef]() + + def getBeginningOffsets[K, V]( + consumerSettings: ConsumerSettings[K, V], + partitions: Set[TopicPartition], + timeout: Timeout + )(implicit system: ActorSystem, ec: ExecutionContext): Future[Map[TopicPartition, Long]] = { + val consumerActor = getConsumerActor(consumerSettings) + (consumerActor ? GetBeginningOffsets(partitions))(timeout) + .mapTo[BeginningOffsets] + .map(_.response.get) + } + + def getBeginningOffsetForPartition[K, V]( + consumerSettings: ConsumerSettings[K, V], + partition: TopicPartition, + timeout: Timeout + )(implicit system: ActorSystem, ec: ExecutionContext): Future[Long] = + getBeginningOffsets(consumerSettings, Set(partition), timeout) + .map(beginningOffsets => beginningOffsets(partition)) + + private def getConsumerActor[K, V](consumerSettings: ConsumerSettings[K, V])(implicit system: ActorSystem) = { + val consumerActor = consumerActors.get(consumerSettings.asInstanceOf[ConsumerSettings[Any, Any]]) + if (consumerActor.isEmpty) { + val newConsumerActor = system.actorOf(KafkaConsumerActor.props(consumerSettings)) + consumerActors.put(consumerSettings.asInstanceOf[ConsumerSettings[Any, Any]], newConsumerActor) + newConsumerActor + } else { + consumerActor.get + } + } +} diff --git a/tests/src/main/scala/akka/kafka/KafkaPorts.scala b/tests/src/main/scala/akka/kafka/KafkaPorts.scala index d9bb08091..2cdcf5a0d 100644 --- a/tests/src/main/scala/akka/kafka/KafkaPorts.scala +++ b/tests/src/main/scala/akka/kafka/KafkaPorts.scala @@ -25,4 +25,5 @@ object KafkaPorts { val ProducerExamplesTest = 9112 val KafkaConnectionCheckerTest = 9122 val PartitionAssignmentHandlerSpec = 9132 + val MetadataClientTest = 9142 } diff --git a/tests/src/test/java/docs/javadsl/MetadataClientTest.java b/tests/src/test/java/docs/javadsl/MetadataClientTest.java new file mode 100644 index 000000000..2548d4b3a --- /dev/null +++ b/tests/src/test/java/docs/javadsl/MetadataClientTest.java @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. + */ + +package docs.javadsl; + +import akka.actor.ActorSystem; +import akka.kafka.ConsumerSettings; +import akka.kafka.KafkaPorts; +import akka.kafka.javadsl.MetadataClient; +import akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.testkit.javadsl.TestKit; +import akka.util.Timeout; +import org.apache.kafka.common.TopicPartition; +import org.junit.AfterClass; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class MetadataClientTest extends EmbeddedKafkaJunit4Test { + + private static final ActorSystem sys = ActorSystem.create("MetadataClientTest"); + private static final Materializer mat = ActorMaterializer.create(sys); + private static final Executor ec = Executors.newSingleThreadExecutor(); + + public MetadataClientTest() { + super(sys, mat, KafkaPorts.MetadataClientTest()); + } + + @Test + public void shouldFetchBeginningOffsetsForGivenPartitionsTest() { + final String topic1 = createTopic(); + final String group1 = createGroupId(); + final TopicPartition partition0 = new TopicPartition(topic1, 0); + final ConsumerSettings consumerSettings = + consumerDefaults().withGroupId(group1); + final Set partitions = Collections.singleton(partition0); + final Timeout timeout = new Timeout(1, TimeUnit.SECONDS); + + final CompletionStage> response = + MetadataClient.getBeginningOffsets(consumerSettings, partitions, timeout, sys, ec); + final Map beginningOffsets = response.toCompletableFuture().join(); + + assertThat(beginningOffsets.get(partition0), is(0L)); + } + + @Test + public void shouldFetchBeginningOffsetForGivenPartitionTest() { + final String topic1 = createTopic(); + final String group1 = createGroupId(); + final TopicPartition partition0 = new TopicPartition(topic1, 0); + final ConsumerSettings consumerSettings = + consumerDefaults().withGroupId(group1); + final Timeout timeout = new Timeout(1, TimeUnit.SECONDS); + + final CompletionStage response = + MetadataClient.getBeginningOffsetForPartition( + consumerSettings, partition0, timeout, sys, ec); + final Long beginningOffset = response.toCompletableFuture().join(); + + assertThat(beginningOffset, is(0L)); + } + + @AfterClass + public static void afterClass() { + TestKit.shutdownActorSystem(sys); + } +} diff --git a/tests/src/test/scala/akka/kafka/scaladsl/MetadataClientSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/MetadataClientSpec.scala new file mode 100644 index 000000000..22163e18a --- /dev/null +++ b/tests/src/test/scala/akka/kafka/scaladsl/MetadataClientSpec.scala @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2014 - 2016 Softwaremill + * Copyright (C) 2016 - 2019 Lightbend Inc. + */ + +package akka.kafka.scaladsl + +import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike +import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped +import org.apache.kafka.common.TopicPartition + +import scala.concurrent.Await +import scala.language.postfixOps +import scala.concurrent.duration._ + +class MetadataClientSpec extends SpecBase with TestcontainersKafkaLike { + + "MetadataClient" must { + "fetch beginning offsets for given partitions" in assertAllStagesStopped { + val topic1 = createTopic(1) + val group1 = createGroupId(1) + val partition0 = new TopicPartition(topic1, 0) + val consumerSettings = consumerDefaults.withGroupId(group1) + + val beginningOffsetsFuture = MetadataClient + .getBeginningOffsets(consumerSettings, Set(partition0), 1 seconds) + val beginningOffsets = Await.result(beginningOffsetsFuture, 1 seconds) + + beginningOffsets(partition0) shouldBe 0 + } + + "fetch beginning offset for given partition" in assertAllStagesStopped { + val topic1 = createTopic(1) + val group1 = createGroupId(1) + val partition0 = new TopicPartition(topic1, 0) + val consumerSettings = consumerDefaults.withGroupId(group1) + + val beginningOffsetFuture = MetadataClient + .getBeginningOffsetForPartition(consumerSettings, partition0, 1 seconds) + val beginningOffset = Await.result(beginningOffsetFuture, 1 seconds) + + beginningOffset shouldBe 0 + } + } +}