Skip to content

Commit

Permalink
WIP: Add wrapper for Metadata requests akka#497
Browse files Browse the repository at this point in the history
  • Loading branch information
jewertow committed Sep 8, 2019
1 parent d2f0a73 commit d8d9665
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 0 deletions.
47 changes: 47 additions & 0 deletions core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.kafka.javadsl

import java.util.concurrent.{CompletionStage, Executor}

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import akka.util.Timeout
import org.apache.kafka.common.TopicPartition

import scala.concurrent.ExecutionContext
import scala.compat.java8.FutureConverters._
import scala.collection.JavaConverters._

object MetadataClient {

def getBeginningOffsets[K, V](
consumerSettings: ConsumerSettings[K, V],
partitions: java.util.Set[TopicPartition],
timeout: Timeout,
system: ActorSystem,
executor: Executor
): CompletionStage[java.util.Map[TopicPartition, java.lang.Long]] = {
val ec = ExecutionContext.fromExecutor(executor)
akka.kafka.scaladsl.MetadataClient
.getBeginningOffsets(consumerSettings, partitions.asScala.toSet, timeout)(system, ec)
.map { beginningOffsets =>
val scalaMapWithJavaValues = beginningOffsets.mapValues(long2Long)
scalaMapWithJavaValues.asJava
}(ec)
.toJava
}

def getBeginningOffsetForPartition[K, V](
consumerSettings: ConsumerSettings[K, V],
partition: TopicPartition,
timeout: Timeout,
system: ActorSystem,
executor: Executor
): CompletionStage[java.lang.Long] =
getBeginningOffsets(consumerSettings, Set(partition).asJava, timeout, system, executor)
.thenApply(_.get(partition))
}
49 changes: 49 additions & 0 deletions core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.kafka.scaladsl
import akka.actor.{ActorRef, ActorSystem}
import akka.kafka.{ConsumerSettings, KafkaConsumerActor}
import akka.kafka.Metadata.{BeginningOffsets, GetBeginningOffsets}
import akka.pattern.ask
import akka.util.Timeout
import org.apache.kafka.common.TopicPartition

import scala.concurrent.{ExecutionContext, Future}

object MetadataClient {

private val consumerActors = scala.collection.mutable.Map[ConsumerSettings[Any, Any], ActorRef]()

def getBeginningOffsets[K, V](
consumerSettings: ConsumerSettings[K, V],
partitions: Set[TopicPartition],
timeout: Timeout
)(implicit system: ActorSystem, ec: ExecutionContext): Future[Map[TopicPartition, Long]] = {
val consumerActor = getConsumerActor(consumerSettings)
(consumerActor ? GetBeginningOffsets(partitions))(timeout)
.mapTo[BeginningOffsets]
.map(_.response.get)
}

def getBeginningOffsetForPartition[K, V](
consumerSettings: ConsumerSettings[K, V],
partition: TopicPartition,
timeout: Timeout
)(implicit system: ActorSystem, ec: ExecutionContext): Future[Long] =
getBeginningOffsets(consumerSettings, Set(partition), timeout)
.map(beginningOffsets => beginningOffsets(partition))

private def getConsumerActor[K, V](consumerSettings: ConsumerSettings[K, V])(implicit system: ActorSystem) = {
val consumerActor = consumerActors.get(consumerSettings.asInstanceOf[ConsumerSettings[Any, Any]])
if (consumerActor.isEmpty) {
val newConsumerActor = system.actorOf(KafkaConsumerActor.props(consumerSettings))
consumerActors.put(consumerSettings.asInstanceOf[ConsumerSettings[Any, Any]], newConsumerActor)
newConsumerActor
} else {
consumerActor.get
}
}
}
1 change: 1 addition & 0 deletions tests/src/main/scala/akka/kafka/KafkaPorts.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ object KafkaPorts {
val ProducerExamplesTest = 9112
val KafkaConnectionCheckerTest = 9122
val PartitionAssignmentHandlerSpec = 9132
val MetadataClientTest = 9142
}
80 changes: 80 additions & 0 deletions tests/src/test/java/docs/javadsl/MetadataClientTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package docs.javadsl;

import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.KafkaPorts;
import akka.kafka.javadsl.MetadataClient;
import akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.Test;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

public class MetadataClientTest extends EmbeddedKafkaJunit4Test {

private static final ActorSystem sys = ActorSystem.create("MetadataClientTest");
private static final Materializer mat = ActorMaterializer.create(sys);
private static final Executor ec = Executors.newSingleThreadExecutor();

public MetadataClientTest() {
super(sys, mat, KafkaPorts.MetadataClientTest());
}

@Test
public void shouldFetchBeginningOffsetsForGivenPartitionsTest() {
final String topic1 = createTopic();
final String group1 = createGroupId();
final TopicPartition partition0 = new TopicPartition(topic1, 0);
final ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(group1);
final Set<TopicPartition> partitions = Collections.singleton(partition0);
final Timeout timeout = new Timeout(1, TimeUnit.SECONDS);

final CompletionStage<Map<TopicPartition, Long>> response =
MetadataClient.getBeginningOffsets(consumerSettings, partitions, timeout, sys, ec);
final Map<TopicPartition, Long> beginningOffsets = response.toCompletableFuture().join();

assertThat(beginningOffsets.get(partition0), is(0L));
}

@Test
public void shouldFetchBeginningOffsetForGivenPartitionTest() {
final String topic1 = createTopic();
final String group1 = createGroupId();
final TopicPartition partition0 = new TopicPartition(topic1, 0);
final ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(group1);
final Timeout timeout = new Timeout(1, TimeUnit.SECONDS);

final CompletionStage<Long> response =
MetadataClient.getBeginningOffsetForPartition(
consumerSettings, partition0, timeout, sys, ec);
final Long beginningOffset = response.toCompletableFuture().join();

assertThat(beginningOffset, is(0L));
}

@AfterClass
public static void afterClass() {
TestKit.shutdownActorSystem(sys);
}
}
45 changes: 45 additions & 0 deletions tests/src/test/scala/akka/kafka/scaladsl/MetadataClientSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.kafka.scaladsl

import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Await
import scala.language.postfixOps
import scala.concurrent.duration._

class MetadataClientSpec extends SpecBase with TestcontainersKafkaLike {

"MetadataClient" must {
"fetch beginning offsets for given partitions" in assertAllStagesStopped {
val topic1 = createTopic(1)
val group1 = createGroupId(1)
val partition0 = new TopicPartition(topic1, 0)
val consumerSettings = consumerDefaults.withGroupId(group1)

val beginningOffsetsFuture = MetadataClient
.getBeginningOffsets(consumerSettings, Set(partition0), 1 seconds)
val beginningOffsets = Await.result(beginningOffsetsFuture, 1 seconds)

beginningOffsets(partition0) shouldBe 0
}

"fetch beginning offset for given partition" in assertAllStagesStopped {
val topic1 = createTopic(1)
val group1 = createGroupId(1)
val partition0 = new TopicPartition(topic1, 0)
val consumerSettings = consumerDefaults.withGroupId(group1)

val beginningOffsetFuture = MetadataClient
.getBeginningOffsetForPartition(consumerSettings, partition0, 1 seconds)
val beginningOffset = Await.result(beginningOffsetFuture, 1 seconds)

beginningOffset shouldBe 0
}
}
}

0 comments on commit d8d9665

Please sign in to comment.