Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wrapper for Metadata requests #497 #900

Merged
merged 33 commits into from
Nov 27, 2019
Merged
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d8d9665
WIP: Add wrapper for Metadata requests #497
jewertow Sep 8, 2019
0509d60
Code review fixes.
jewertow Sep 14, 2019
abeba92
Fix CI compilation error.
jewertow Sep 16, 2019
fc16270
Fix CI compilation error.
jewertow Sep 16, 2019
0e6d4a9
Fix CI compilation error.
jewertow Sep 17, 2019
887c267
Fix CI compilation error.
jewertow Sep 17, 2019
47a0c47
MetadataClientTest extends TestcontainersKafkaJunit4Test instead of E…
jewertow Sep 18, 2019
9ad60c0
Add stopping consumer actor in tests.
jewertow Sep 23, 2019
64858bb
Add failing cases to MetadataClientTest and MetadataClientSpec.
jewertow Sep 25, 2019
5d9c31c
Fix compilation errors.
jewertow Sep 29, 2019
9fdac7b
Add MetadataClient.getEndOffsets and MetadataClient.getEndOffsetForPa…
jewertow Sep 29, 2019
bb6f27e
Add scaladsl.MetadataClient.getListTopics
jewertow Sep 29, 2019
2be13e6
Add javadsl.MetadataClient.getListTopics
jewertow Sep 29, 2019
42c916d
Remove unnecessary tests
jewertow Sep 29, 2019
dd2c27c
Rename method getListTopics to listTopics
jewertow Sep 29, 2019
0740da7
Add scaladsl.MetadataClient.getPartitionsFor
jewertow Sep 29, 2019
eadc7ba
Add javadsl.MetadataClient.getPartitionsFor
jewertow Sep 29, 2019
74e3093
WIP: Proposal of MetadataClient as a class.
jewertow Oct 7, 2019
ba80d6d
WIP: Proposal of MetadataClient as a companion object.
jewertow Oct 8, 2019
bedd7d7
Refactoring
jewertow Oct 8, 2019
0426290
Merge branch 'master' into wip/497-metadata-requests-wrapper
jewertow Oct 24, 2019
b8a55a6
Add MetadataClient.getEndOffsets and MetadataClient.getEndOffsetForPa…
jewertow Oct 27, 2019
b0ea82a
Add MetadataClient.listTopics
jewertow Oct 27, 2019
7a4b783
Add MetadataClient.getPartitionsFor
jewertow Oct 27, 2019
261b778
Add MetadataClient.getCommittedOffset
jewertow Nov 6, 2019
66ac730
Stop actors created by MetadataClient
jewertow Nov 7, 2019
00d227e
Update docs
jewertow Nov 22, 2019
c2e7a86
Fix docs
jewertow Nov 22, 2019
dc9cc47
Remove dumplicated comments
jewertow Nov 25, 2019
dde1d38
Rename method MetadataClient#stop to close
jewertow Nov 25, 2019
845e1ff
Response types in the reader's preferred language
jewertow Nov 25, 2019
18ca14a
Docs for MetadataClient in consumer-metadata.md
jewertow Nov 26, 2019
efd03ad
Merge branch 'master' into wip/497-metadata-requests-wrapper
jewertow Nov 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add javadsl.MetadataClient.getListTopics
  • Loading branch information
jewertow committed Sep 29, 2019
commit 2be13e6630906e9f2c521434cff2b2789cf30b3f
16 changes: 15 additions & 1 deletion core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ import java.util.concurrent.{CompletionStage, Executor}

import akka.actor.ActorRef
import akka.util.Timeout
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.compat.java8.FutureConverters._
@@ -73,4 +73,18 @@ object MetadataClient {
.map(Long.box)
.toJava
}

def getListTopics(
consumerActor: ActorRef,
timeout: Timeout,
executor: Executor
): CompletionStage[java.util.Map[java.lang.String, java.util.List[PartitionInfo]]] = {
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
akka.kafka.scaladsl.MetadataClient
.getListTopics(consumerActor, timeout)
.map { topics =>
topics.view.mapValues(partitionsInfo => partitionsInfo.asJava).toMap.asJava
}
.toJava
}
}
33 changes: 33 additions & 0 deletions tests/src/test/java/docs/javadsl/MetadataClientTest.java
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
import akka.stream.Materializer;
import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.hamcrest.core.IsInstanceOf;
import org.junit.AfterClass;
@@ -23,6 +24,7 @@
import org.junit.rules.ExpectedException;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionException;
@@ -31,7 +33,9 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static java.util.stream.Collectors.toSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.Is.is;

public class MetadataClientTest extends TestcontainersKafkaJunit4Test {
@@ -214,6 +218,35 @@ public void shouldFailInCaseOfAnExceptionDuringFetchEndOffsetForNonExistingTopic
response.toCompletableFuture().join();
}

@Test
public void shouldFetchTopicList() {
final String group = createGroupId();
final String topic1 = createTopic(1, 2);
final String topic2 = createTopic(2, 1);
final ConsumerSettings<String, String> consumerSettings = consumerDefaults().withGroupId(group);
final Timeout timeout = new Timeout(1, TimeUnit.SECONDS);
final ActorRef consumerActor = system().actorOf(KafkaConsumerActor.props(consumerSettings));

produceString(topic1, 10, 0).toCompletableFuture().join();
produceString(topic1, 10, 1).toCompletableFuture().join();
produceString(topic2, 10, 0).toCompletableFuture().join();

final CompletionStage<Map<String, List<PartitionInfo>>> response =
MetadataClient.getListTopics(consumerActor, timeout, ec);
final Map<String, List<PartitionInfo>> topics = response.toCompletableFuture().join();

final Set<Integer> partitionsForTopic1 =
topics.get(topic1).stream().map(PartitionInfo::partition).collect(toSet());

final Set<Integer> partitionsForTopic2 =
topics.get(topic2).stream().map(PartitionInfo::partition).collect(toSet());

assertThat(partitionsForTopic1, containsInAnyOrder(0, 1));
assertThat(partitionsForTopic2, containsInAnyOrder(0));

consumerActor.tell(KafkaConsumerActor.stop(), ActorRef.noSender());
}

@AfterClass
public static void afterClass() {
TestKit.shutdownActorSystem(sys);