Skip to content

Commit

Permalink
KafkaConsumerActor: don't use deprecated consumer.committed method
Browse files Browse the repository at this point in the history
  • Loading branch information
epalace authored and Eric Palacios committed Mar 3, 2020
1 parent 7690e69 commit 7e8c649
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 4 deletions.
32 changes: 32 additions & 0 deletions core/src/main/scala/akka/kafka/Metadata.scala
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ object Metadata {
/**
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
@deprecated("use `GetCommittedOffsets`", "2.0.3")
final case class GetCommittedOffset(partition: TopicPartition) extends Request with NoSerializationVerificationNeeded

@deprecated("use `CommittedOffsets`", "2.0.3")
final case class CommittedOffset(response: Try[OffsetAndMetadata], requestedPartition: TopicPartition)
extends Response
with NoSerializationVerificationNeeded {
Expand All @@ -185,5 +188,34 @@ object Metadata {
* Java API:
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
@deprecated("use `createGetCommittedOffsets`", "2.0.3")
def createGetCommittedOffset(partition: TopicPartition): GetCommittedOffset = GetCommittedOffset(partition)

/**
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
final case class GetCommittedOffsets(partitions: Set[TopicPartition])
extends Request
with NoSerializationVerificationNeeded
final case class CommittedOffsets(response: Try[Map[TopicPartition, OffsetAndMetadata]])
extends Response
with NoSerializationVerificationNeeded {

/**
* Java API
*/
def getResponse: Optional[java.util.Map[TopicPartition, OffsetAndMetadata]] =
response
.map { m =>
Optional.of(m.asJava)
}
.getOrElse(Optional.empty())
}

/**
* Java API:
* [[org.apache.kafka.clients.consumer.KafkaConsumer#committed()]]
*/
def createGetCommittedOffsets(partitions: java.util.Set[TopicPartition]): GetCommittedOffsets =
GetCommittedOffsets(partitions.asScala.toSet)
}
11 changes: 11 additions & 0 deletions core/src/main/scala/akka/kafka/internal/KafkaConsumerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,17 @@ import scala.util.control.NonFatal
consumer.offsetsForTimes(search, settings.getMetadataRequestTimeout).asScala.toMap
})

case Metadata.GetCommittedOffsets(partitions) =>
Metadata.CommittedOffsets(
Try {
consumer
.committed(partitions.asJava, settings.getMetadataRequestTimeout)
.asScala
.filterNot(_._2 == null)
.toMap
}
)

case Metadata.GetCommittedOffset(partition) =>
Metadata.CommittedOffset(
Try { consumer.committed(partition, settings.getMetadataRequestTimeout) },
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,22 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
}(ExecutionContexts.sameThreadExecutionContext)
.toJava

@deprecated("use `getCommittedOffsets`", "2.0.3")
def getCommittedOffset(partition: TopicPartition): CompletionStage[OffsetAndMetadata] =
metadataClient
.getCommittedOffset(partition)
.toJava

def getCommittedOffsets(
partitions: java.util.Set[TopicPartition]
): CompletionStage[java.util.Map[TopicPartition, OffsetAndMetadata]] =
metadataClient
.getCommittedOffsets(partitions.asScala.toSet)
.map { committedOffsets =>
committedOffsets.asJava
}(ExecutionContexts.sameThreadExecutionContext)
.toJava

def close(): Unit =
metadataClient.close()
}
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@

package akka.kafka.scaladsl

import java.util.Collections

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.ExecutionContexts
import akka.kafka.Metadata.{
BeginningOffsets,
CommittedOffset,
CommittedOffsets,
EndOffsets,
GetBeginningOffsets,
GetCommittedOffset,
GetCommittedOffsets,
GetEndOffsets,
GetPartitionsFor,
ListTopics,
Expand Down Expand Up @@ -76,6 +80,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)

@deprecated("use `getCommittedOffsets`", "2.0.3")
def getCommittedOffset(partition: TopicPartition): Future[OffsetAndMetadata] =
(consumerActor ? GetCommittedOffset(partition))(timeout)
.mapTo[CommittedOffset]
Expand All @@ -85,6 +90,15 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)

def getCommittedOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, OffsetAndMetadata]] =
(consumerActor ? GetCommittedOffsets(partitions))(timeout)
.mapTo[CommittedOffsets]
.map(_.response)
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)

def close(): Unit =
if (managedActor) {
consumerActor ! KafkaConsumerActor.Stop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,10 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside
assert(offsetAndTs.offset() == 0, "Wrong offset in OffsetsForTimes (beginning)")
}

// GetCommittedOffset
inside(Await.result(consumer ? GetCommittedOffset(partition0), 10.seconds)) {
case CommittedOffset(Success(offsetMeta), _) =>
assert(offsetMeta == null, "Wrong offset in GetCommittedOffset")
// GetCommittedOffsets
inside(Await.result(consumer ? GetCommittedOffsets(Set(partition0)), 10.seconds)) {
case CommittedOffsets(Success(offsetMeta)) =>
assert(offsetMeta.isEmpty, "Wrong offsets in GetCommittedOffsets")
}

// verify that consumption still works
Expand Down

0 comments on commit 7e8c649

Please sign in to comment.