From 7e8c649859ce2009074b3609206f95f653cf542d Mon Sep 17 00:00:00 2001 From: Eric Palacios Date: Mon, 2 Mar 2020 14:30:02 +0100 Subject: [PATCH] KafkaConsumerActor: don't use deprecated consumer.committed method --- core/src/main/scala/akka/kafka/Metadata.scala | 32 +++++++++++++++++++ .../kafka/internal/KafkaConsumerActor.scala | 11 +++++++ .../akka/kafka/javadsl/MetadataClient.scala | 11 +++++++ .../akka/kafka/scaladsl/MetadataClient.scala | 14 ++++++++ .../akka/kafka/scaladsl/IntegrationSpec.scala | 8 ++--- 5 files changed, 72 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/akka/kafka/Metadata.scala b/core/src/main/scala/akka/kafka/Metadata.scala index 2d7b81050..c67036dbd 100644 --- a/core/src/main/scala/akka/kafka/Metadata.scala +++ b/core/src/main/scala/akka/kafka/Metadata.scala @@ -170,7 +170,10 @@ object Metadata { /** * [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]] */ + @deprecated("use `GetCommittedOffsets`", "2.0.3") final case class GetCommittedOffset(partition: TopicPartition) extends Request with NoSerializationVerificationNeeded + + @deprecated("use `CommittedOffsets`", "2.0.3") final case class CommittedOffset(response: Try[OffsetAndMetadata], requestedPartition: TopicPartition) extends Response with NoSerializationVerificationNeeded { @@ -185,5 +188,34 @@ object Metadata { * Java API: * [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]] */ + @deprecated("use `createGetCommittedOffsets`", "2.0.3") def createGetCommittedOffset(partition: TopicPartition): GetCommittedOffset = GetCommittedOffset(partition) + + /** + * [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]] + */ + final case class GetCommittedOffsets(partitions: Set[TopicPartition]) + extends Request + with NoSerializationVerificationNeeded + final case class CommittedOffsets(response: Try[Map[TopicPartition, OffsetAndMetadata]]) + extends Response + with NoSerializationVerificationNeeded { + + /** + * Java API + */ + def getResponse: Optional[java.util.Map[TopicPartition, OffsetAndMetadata]] = + response + .map { m => + Optional.of(m.asJava) + } + .getOrElse(Optional.empty()) + } + + /** + * Java API: + * [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]] + */ + def createGetCommittedOffsets(partitions: java.util.Set[TopicPartition]): GetCommittedOffsets = + GetCommittedOffsets(partitions.asScala.toSet) } diff --git a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala index 3f3554299..716449a08 100644 --- a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala @@ -676,6 +676,17 @@ import scala.util.control.NonFatal consumer.offsetsForTimes(search, settings.getMetadataRequestTimeout).asScala.toMap }) + case Metadata.GetCommittedOffsets(partitions) => + Metadata.CommittedOffsets( + Try { + consumer + .committed(partitions.asJava, settings.getMetadataRequestTimeout) + .asScala + .filterNot(_._2 == null) + .toMap + } + ) + case Metadata.GetCommittedOffset(partition) => Metadata.CommittedOffset( Try { consumer.committed(partition, settings.getMetadataRequestTimeout) }, diff --git a/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala b/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala index c8117ccc4..95110fcf0 100644 --- a/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala +++ b/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala @@ -69,11 +69,22 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient }(ExecutionContexts.sameThreadExecutionContext) .toJava + @deprecated("use `getCommittedOffsets`", "2.0.3") def getCommittedOffset(partition: TopicPartition): CompletionStage[OffsetAndMetadata] = metadataClient .getCommittedOffset(partition) .toJava + def getCommittedOffsets( + partitions: java.util.Set[TopicPartition] + ): CompletionStage[java.util.Map[TopicPartition, OffsetAndMetadata]] = + metadataClient + .getCommittedOffsets(partitions.asScala.toSet) + .map { committedOffsets => + committedOffsets.asJava + }(ExecutionContexts.sameThreadExecutionContext) + .toJava + def close(): Unit = metadataClient.close() } diff --git a/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala b/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala index 5d88885ee..21342e370 100644 --- a/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala +++ b/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala @@ -5,14 +5,18 @@ package akka.kafka.scaladsl +import java.util.Collections + import akka.actor.{ActorRef, ActorSystem} import akka.dispatch.ExecutionContexts import akka.kafka.Metadata.{ BeginningOffsets, CommittedOffset, + CommittedOffsets, EndOffsets, GetBeginningOffsets, GetCommittedOffset, + GetCommittedOffsets, GetEndOffsets, GetPartitionsFor, ListTopics, @@ -76,6 +80,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed case Failure(e) => Future.failed(e) }(ExecutionContexts.sameThreadExecutionContext) + @deprecated("use `getCommittedOffsets`", "2.0.3") def getCommittedOffset(partition: TopicPartition): Future[OffsetAndMetadata] = (consumerActor ? GetCommittedOffset(partition))(timeout) .mapTo[CommittedOffset] @@ -85,6 +90,15 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed case Failure(e) => Future.failed(e) }(ExecutionContexts.sameThreadExecutionContext) + def getCommittedOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, OffsetAndMetadata]] = + (consumerActor ? GetCommittedOffsets(partitions))(timeout) + .mapTo[CommittedOffsets] + .map(_.response) + .flatMap { + case Success(res) => Future.successful(res) + case Failure(e) => Future.failed(e) + }(ExecutionContexts.sameThreadExecutionContext) + def close(): Unit = if (managedActor) { consumerActor ! KafkaConsumerActor.Stop diff --git a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala index 2131e3889..be57e9108 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala @@ -308,10 +308,10 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside assert(offsetAndTs.offset() == 0, "Wrong offset in OffsetsForTimes (beginning)") } - // GetCommittedOffset - inside(Await.result(consumer ? GetCommittedOffset(partition0), 10.seconds)) { - case CommittedOffset(Success(offsetMeta), _) => - assert(offsetMeta == null, "Wrong offset in GetCommittedOffset") + // GetCommittedOffsets + inside(Await.result(consumer ? GetCommittedOffsets(Set(partition0)), 10.seconds)) { + case CommittedOffsets(Success(offsetMeta)) => + assert(offsetMeta.isEmpty, "Wrong offsets in GetCommittedOffsets") } // verify that consumption still works