Add MetadataClient getCommittedOffsets
* To avoid the use of the deprecated `consumer.committed` method
* Deprecate the `GetCommittedOffset` variant
ennru authored Mar 5, 2020
1 parent b8a9871 commit 843db85
Showing 6 changed files with 70 additions and 6 deletions.
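
For context, here is a minimal sketch of how the new scaladsl `MetadataClient.getCommittedOffsets` introduced in this commit might be used. It assumes an already-created `akka.kafka.scaladsl.MetadataClient` and a hypothetical set of partitions; the `logCommittedOffsets` helper and the printing are purely illustrative.

```scala
import scala.concurrent.{ ExecutionContext, Future }

import akka.kafka.scaladsl.MetadataClient
import org.apache.kafka.common.TopicPartition

// `metadataClient` is assumed to be an already-created scaladsl MetadataClient;
// the partitions passed in are placeholders.
def logCommittedOffsets(metadataClient: MetadataClient, partitions: Set[TopicPartition])(
    implicit ec: ExecutionContext
): Future[Unit] =
  // One request covering all partitions, instead of one deprecated
  // getCommittedOffset(partition) call per partition.
  metadataClient
    .getCommittedOffsets(partitions)
    .map { committed =>
      committed.foreach { case (tp, offsetMeta) => println(s"$tp -> ${offsetMeta.offset()}") }
    }
```
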
28 changes: 28 additions & 0 deletions core/src/main/scala/akka/kafka/Metadata.scala
@@ -170,7 +170,10 @@ object Metadata {
/**
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
@deprecated("use `GetCommittedOffsets`", "2.0.3")
final case class GetCommittedOffset(partition: TopicPartition) extends Request with NoSerializationVerificationNeeded

@deprecated("use `CommittedOffsets`", "2.0.3")
final case class CommittedOffset(response: Try[OffsetAndMetadata], requestedPartition: TopicPartition)
extends Response
with NoSerializationVerificationNeeded {
@@ -185,5 +188,30 @@ object Metadata {
* Java API:
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
@deprecated("use `createGetCommittedOffsets`", "2.0.3")
def createGetCommittedOffset(partition: TopicPartition): GetCommittedOffset = GetCommittedOffset(partition)

/**
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
final case class GetCommittedOffsets(partitions: Set[TopicPartition])
extends Request
with NoSerializationVerificationNeeded
final case class CommittedOffsets(response: Try[Map[TopicPartition, OffsetAndMetadata]])
extends Response
with NoSerializationVerificationNeeded {

/**
* Java API
*/
def getResponse: Optional[java.util.Map[TopicPartition, OffsetAndMetadata]] =
Optional.ofNullable(response.toOption.map(_.asJava).orNull)
}

/**
* Java API:
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
def createGetCommittedOffsets(partitions: java.util.Set[TopicPartition]): GetCommittedOffsets =
GetCommittedOffsets(partitions.asScala.toSet)
}
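
The same data can also be fetched by asking the consumer actor directly with the new `GetCommittedOffsets`/`CommittedOffsets` messages, which is what the scaladsl `MetadataClient` below does internally. A sketch, assuming `consumerActor` was created from `KafkaConsumerActor.props(consumerSettings)` and that the timeout value is arbitrary:

```scala
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }

import akka.actor.ActorRef
import akka.kafka.Metadata
import akka.pattern.ask
import akka.util.Timeout
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// `consumerActor` is assumed to be a KafkaConsumerActor; partitions are placeholders.
def committedViaAsk(consumerActor: ActorRef, partitions: Set[TopicPartition])(
    implicit ec: ExecutionContext
): Future[Map[TopicPartition, OffsetAndMetadata]] = {
  implicit val timeout: Timeout = Timeout(5.seconds)
  (consumerActor ? Metadata.GetCommittedOffsets(partitions))
    .mapTo[Metadata.CommittedOffsets]
    .flatMap(_.response match {
      case Success(offsets) => Future.successful(offsets)
      case Failure(e) => Future.failed(e)
    })
}
```
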
11 changes: 11 additions & 0 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
@@ -676,6 +676,17 @@ import scala.util.control.NonFatal
consumer.offsetsForTimes(search, settings.getMetadataRequestTimeout).asScala.toMap
})

case Metadata.GetCommittedOffsets(partitions) =>
Metadata.CommittedOffsets(
Try {
consumer
.committed(partitions.asJava, settings.getMetadataRequestTimeout)
.asScala
.filterNot(_._2 == null)
.toMap
}
)

case Metadata.GetCommittedOffset(partition) =>
Metadata.CommittedOffset(
Try { consumer.committed(partition, settings.getMetadataRequestTimeout) },
11 changes: 11 additions & 0 deletions core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala
@@ -69,11 +69,22 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
}(ExecutionContexts.sameThreadExecutionContext)
.toJava

@deprecated("use `getCommittedOffsets`", "2.0.3")
def getCommittedOffset(partition: TopicPartition): CompletionStage[OffsetAndMetadata] =
metadataClient
.getCommittedOffset(partition)
.toJava

def getCommittedOffsets(
partitions: java.util.Set[TopicPartition]
): CompletionStage[java.util.Map[TopicPartition, OffsetAndMetadata]] =
metadataClient
.getCommittedOffsets(partitions.asScala.toSet)
.map { committedOffsets =>
committedOffsets.asJava
}(ExecutionContexts.sameThreadExecutionContext)
.toJava

def close(): Unit =
metadataClient.close()
}
14 changes: 14 additions & 0 deletions core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala
@@ -5,14 +5,18 @@

package akka.kafka.scaladsl

import java.util.Collections

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.ExecutionContexts
import akka.kafka.Metadata.{
BeginningOffsets,
CommittedOffset,
CommittedOffsets,
EndOffsets,
GetBeginningOffsets,
GetCommittedOffset,
GetCommittedOffsets,
GetEndOffsets,
GetPartitionsFor,
ListTopics,
@@ -76,6 +80,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)

@deprecated("use `getCommittedOffsets`", "2.0.3")
def getCommittedOffset(partition: TopicPartition): Future[OffsetAndMetadata] =
(consumerActor ? GetCommittedOffset(partition))(timeout)
.mapTo[CommittedOffset]
@@ -85,6 +90,15 @@
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)

def getCommittedOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, OffsetAndMetadata]] =
(consumerActor ? GetCommittedOffsets(partitions))(timeout)
.mapTo[CommittedOffsets]
.map(_.response)
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)

def close(): Unit =
if (managedActor) {
consumerActor ! KafkaConsumerActor.Stop
4 changes: 2 additions & 2 deletions docs/src/main/paradox/consumer-metadata.md
@@ -23,7 +23,7 @@ The supported metadata are
| Partitions | @scala[Future[List[PartitionInfo]]]@java[CompletionStage[java.util.List[PartitionInfo]]] |
| Beginning offsets | @scala[Future[Map[TopicPartition, Long]]]@java[CompletionStage[java.util.Map[TopicPartition, java.lang.Long]]] |
| End offsets | @scala[Future[Map[TopicPartition, Long]]]@java[CompletionStage[java.util.Map[TopicPartition, java.lang.Long]]] |
| Committed offset | @scala[Future[OffsetAndMetadata]]@java[CompletionStage[OffsetAndMetadata]] |
| Committed offsets | @scala[Future[Map[TopicPartition, OffsetAndMetadata]]]@java[CompletionStage[java.util.Map[TopicPartition, OffsetAndMetadata]]] |

@@@ warning

@@ -59,7 +59,7 @@ The supported metadata are
| GetBeginningOffsets | BeginningOffsets |
| GetEndOffsets | EndOffsets |
| GetOffsetsForTimes | OffsetsForTimes |
| GetCommittedOffset | CommittedOffset |
| GetCommittedOffsets | CommittedOffsets |

These requests are blocking within the Kafka client library up to a timeout configured by `metadata-request-timeout` or `ConsumerSettings.withMetadataRequestTimeout` respectively.
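
A sketch of setting that timeout programmatically via `ConsumerSettings.withMetadataRequestTimeout`, which the docs reference above; the bootstrap server address and group id are placeholders, and the same value can instead be set in configuration under `akka.kafka.consumer.metadata-request-timeout`:

```scala
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.common.serialization.StringDeserializer

// Placeholder broker address and group id; adjust the timeout as needed.
def consumerSettings(system: ActorSystem): ConsumerSettings[String, String] =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("example-group")
    .withMetadataRequestTimeout(10.seconds)
```
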

@@ -308,10 +308,10 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside
assert(offsetAndTs.offset() == 0, "Wrong offset in OffsetsForTimes (beginning)")
}

// GetCommittedOffset
inside(Await.result(consumer ? GetCommittedOffset(partition0), 10.seconds)) {
case CommittedOffset(Success(offsetMeta), _) =>
assert(offsetMeta == null, "Wrong offset in GetCommittedOffset")
// GetCommittedOffsets
inside(Await.result(consumer ? GetCommittedOffsets(Set(partition0)), 10.seconds)) {
case CommittedOffsets(Success(offsetMeta)) =>
assert(offsetMeta.isEmpty, "Wrong offsets in GetCommittedOffsets")
}

// verify that consumption still works
