diff --git a/core/src/main/scala/akka/kafka/Metadata.scala b/core/src/main/scala/akka/kafka/Metadata.scala index 5ab588396..151c7e9c7 100644 --- a/core/src/main/scala/akka/kafka/Metadata.scala +++ b/core/src/main/scala/akka/kafka/Metadata.scala @@ -170,7 +170,10 @@ object Metadata { /** * [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]] */ + @deprecated("use `GetCommittedOffsets`", "2.0.3") final case class GetCommittedOffset(partition: TopicPartition) extends Request with NoSerializationVerificationNeeded + + @deprecated("use `CommittedOffsets`", "2.0.3") final case class CommittedOffset(response: Try[OffsetAndMetadata], requestedPartition: TopicPartition) extends Response with NoSerializationVerificationNeeded { @@ -185,5 +188,30 @@ object Metadata { * Java API: * [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]] */ + @deprecated("use `createGetCommittedOffsets`", "2.0.3") def createGetCommittedOffset(partition: TopicPartition): GetCommittedOffset = GetCommittedOffset(partition) + + /** + * [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]] + */ + final case class GetCommittedOffsets(partitions: Set[TopicPartition]) + extends Request + with NoSerializationVerificationNeeded + final case class CommittedOffsets(response: Try[Map[TopicPartition, OffsetAndMetadata]]) + extends Response + with NoSerializationVerificationNeeded { + + /** + * Java API + */ + def getResponse: Optional[java.util.Map[TopicPartition, OffsetAndMetadata]] = + Optional.ofNullable(response.toOption.map(_.asJava).orNull) + } + + /** + * Java API: + * [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]] + */ + def createGetCommittedOffsets(partitions: java.util.Set[TopicPartition]): GetCommittedOffsets = + GetCommittedOffsets(partitions.asScala.toSet) } diff --git a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala index fa8dbfdac..57a257d25 100644 --- a/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala +++ b/core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala @@ -676,6 +676,17 @@ import scala.util.control.NonFatal consumer.offsetsForTimes(search, settings.getMetadataRequestTimeout).asScala.toMap }) + case Metadata.GetCommittedOffsets(partitions) => + Metadata.CommittedOffsets( + Try { + consumer + .committed(partitions.asJava, settings.getMetadataRequestTimeout) + .asScala + .filterNot(_._2 == null) + .toMap + } + ) + case Metadata.GetCommittedOffset(partition) => Metadata.CommittedOffset( Try { consumer.committed(partition, settings.getMetadataRequestTimeout) }, diff --git a/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala b/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala index 413889488..caa39600c 100644 --- a/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala +++ b/core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala @@ -69,11 +69,22 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient }(ExecutionContexts.sameThreadExecutionContext) .toJava + @deprecated("use `getCommittedOffsets`", "2.0.3") def getCommittedOffset(partition: TopicPartition): CompletionStage[OffsetAndMetadata] = metadataClient .getCommittedOffset(partition) .toJava + def getCommittedOffsets( + partitions: java.util.Set[TopicPartition] + ): CompletionStage[java.util.Map[TopicPartition, OffsetAndMetadata]] = + metadataClient + .getCommittedOffsets(partitions.asScala.toSet) + .map { committedOffsets => + committedOffsets.asJava + }(ExecutionContexts.sameThreadExecutionContext) + .toJava + def close(): Unit = metadataClient.close() } diff --git a/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala b/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala index 8d15eb46f..fa65caa44 100644 --- a/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala +++ b/core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala @@ -5,14 +5,18 @@ package akka.kafka.scaladsl +import java.util.Collections + import akka.actor.{ActorRef, ActorSystem} import akka.dispatch.ExecutionContexts import akka.kafka.Metadata.{ BeginningOffsets, CommittedOffset, + CommittedOffsets, EndOffsets, GetBeginningOffsets, GetCommittedOffset, + GetCommittedOffsets, GetEndOffsets, GetPartitionsFor, ListTopics, @@ -76,6 +80,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed case Failure(e) => Future.failed(e) }(ExecutionContexts.sameThreadExecutionContext) + @deprecated("use `getCommittedOffsets`", "2.0.3") def getCommittedOffset(partition: TopicPartition): Future[OffsetAndMetadata] = (consumerActor ? GetCommittedOffset(partition))(timeout) .mapTo[CommittedOffset] @@ -85,6 +90,15 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed case Failure(e) => Future.failed(e) }(ExecutionContexts.sameThreadExecutionContext) + def getCommittedOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, OffsetAndMetadata]] = + (consumerActor ? GetCommittedOffsets(partitions))(timeout) + .mapTo[CommittedOffsets] + .map(_.response) + .flatMap { + case Success(res) => Future.successful(res) + case Failure(e) => Future.failed(e) + }(ExecutionContexts.sameThreadExecutionContext) + def close(): Unit = if (managedActor) { consumerActor ! KafkaConsumerActor.Stop diff --git a/docs/src/main/paradox/consumer-metadata.md b/docs/src/main/paradox/consumer-metadata.md index bcbf73363..243ffe2b4 100644 --- a/docs/src/main/paradox/consumer-metadata.md +++ b/docs/src/main/paradox/consumer-metadata.md @@ -23,7 +23,7 @@ The supported metadata are | Partitions | @scala[Future[List[PartitionInfo]]]@java[CompletionStage[java.util.List[PartitionInfo]]] | | Beginning offsets | @scala[Future[Map[TopicPartition, Long]]]@java[CompletionStage[java.util.Map[TopicPartition, java.lang.Long]]] | | End offsets | @scala[Future[Map[TopicPartition, Long]]]@java[CompletionStage[java.util.Map[TopicPartition, java.lang.Long]]] | -| Committed offset | @scala[Future[OffsetAndMetadata]]@java[CompletionStage[OffsetAndMetadata]] | +| Committed offsets | @scala[Future[Map[TopicPartition, OffsetAndMetadata]]]@java[CompletionStage[java.util.Map[TopicPartition, OffsetAndMetadata]]] | @@@ warning @@ -59,7 +59,7 @@ The supported metadata are | GetBeginningOffsets | BeginningOffsets | | GetEndOffsets | EndOffsets | | GetOffsetsForTimes | OffsetsForTimes | -| GetCommittedOffset | CommittedOffset | +| GetCommittedOffsets | CommittedOffsets | These requests are blocking within the Kafka client library up to a timeout configured by `metadata-request-timeout` or `ConsumerSettings.withMetadataRequestTimeout` respectively. diff --git a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala index a8e0ac39a..0ee470d55 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala @@ -308,10 +308,10 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside assert(offsetAndTs.offset() == 0, "Wrong offset in OffsetsForTimes (beginning)") } - // GetCommittedOffset - inside(Await.result(consumer ? GetCommittedOffset(partition0), 10.seconds)) { - case CommittedOffset(Success(offsetMeta), _) => - assert(offsetMeta == null, "Wrong offset in GetCommittedOffset") + // GetCommittedOffsets + inside(Await.result(consumer ? GetCommittedOffsets(Set(partition0)), 10.seconds)) { + case CommittedOffsets(Success(offsetMeta)) => + assert(offsetMeta.isEmpty, "Wrong offsets in GetCommittedOffsets") } // verify that consumption still works