From be6d25a2cd02df3b3859120b94a200efc1bfd5ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 31 Aug 2023 08:10:26 +0200 Subject: [PATCH] fix: Java ProducerPushDestination transformation and interceptor broken (#978) --- .../EventProducerPushDestinationSpec.scala | 46 +++++++++++++++++++ .../EventProducerPushDestination.scala | 5 +- 2 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationSpec.scala diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationSpec.scala new file mode 100644 index 000000000..7a704cb4e --- /dev/null +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestinationSpec.scala @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2009-2023 Lightbend Inc. + */ + +package akka.projection.grpc.consumer.javadsl + +import akka.Done +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.grpc.scaladsl.MetadataBuilder +import org.scalatest.wordspec.AnyWordSpecLike + +import java.util.Collections +import java.util.concurrent.CompletableFuture +import scala.annotation.nowarn + +class EventProducerPushDestinationSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { + + "The Java DSL EventProducerPushDestination" should { + + "correctly map the transformationForOrigin to accept Scala GRPC metadata" in { + @nowarn("msg=never used") + val epsd = EventProducerPushDestination + .create("origin-id", Collections.emptyList(), system) + .withTransformationForOrigin((originId, metadata) => Transformation.identity) + val scalaEpsd = epsd.asScala + + val transformation = scalaEpsd.transformationForOrigin("someOrigin", MetadataBuilder.empty) + + transformation should be(Transformation.identity.delegate) + } + + "correctly map the interceptor to accept Scala GRPC metadata" in { + @nowarn("msg=never used") + val epsd = EventProducerPushDestination + .create("origin-id", Collections.emptyList(), system) + .withInterceptor((originId, metadata) => CompletableFuture.completedFuture(Done)) + val scalaEpsd = epsd.asScala + + val interceptResult = scalaEpsd.interceptor.get.intercept("someOrigin", MetadataBuilder.empty) + + interceptResult.futureValue should be(Done) + } + + } + +} diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala index 248d3bbe3..f37897fc0 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala @@ -8,6 +8,7 @@ import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts +import akka.grpc.internal.JavaMetadataImpl import akka.grpc.javadsl.Metadata import akka.http.javadsl.model.HttpRequest import akka.http.javadsl.model.HttpResponse @@ -155,9 +156,9 @@ final class EventProducerPushDestination private ( new scaladsl.EventProducerPushDestination( journalPluginId.asScala, acceptedStreamId, - (origin, meta) => transformationForOrigin.apply(origin, meta.asInstanceOf[akka.grpc.javadsl.Metadata]).delegate, + (origin, meta) => transformationForOrigin.apply(origin, new JavaMetadataImpl(meta)).delegate, interceptor.asScala.map(javaInterceptor => - (streamId, meta) => javaInterceptor.intercept(streamId, meta.asInstanceOf[akka.grpc.javadsl.Metadata]).toScala), + (streamId, meta) => javaInterceptor.intercept(streamId, new JavaMetadataImpl(meta)).toScala), filters.asScala.toVector, protobufDescriptors.asScala.toVector, settings)