From 4ab19bf445284c38fae603e7b3be806c79d1c00e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 20 Oct 2023 08:21:09 +0200 Subject: [PATCH] fix: EventWriter snapshot event * The EventWriter.Write now takes a isSnapshotEvent parameter, which means that sequence number gaps before the snapshot will be filled with filtered event payloads * The EventEnvelope.source is "SN" for snapshot events * No dependency to akka-persistence-r2dbc from akka-projection-grpc so hardcoding source "SN" (can't use EnvelopeOrigin) --- .../EventPusherConsumerServiceImpl.scala | 17 ++++++++++++++++- project/Dependencies.scala | 2 +- samples/grpc/iot-service-scala/build.sbt | 2 +- samples/grpc/local-drone-control-java/pom.xml | 2 +- .../grpc/local-drone-control-scala/build.sbt | 2 +- .../pom.xml | 2 +- .../build.sbt | 2 +- .../shopping-analytics-service-java/pom.xml | 2 +- .../shopping-analytics-service-scala/build.sbt | 2 +- samples/grpc/shopping-cart-service-java/pom.xml | 2 +- .../grpc/shopping-cart-service-scala/build.sbt | 2 +- .../shopping-cart-service-java/pom.xml | 2 +- .../shopping-cart-service-scala/build.sbt | 2 +- 13 files changed, 28 insertions(+), 13 deletions(-) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index 786033495..0c023a177 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -25,10 +25,23 @@ import akka.projection.grpc.internal.proto.EventConsumerServicePowerApi import akka.stream.scaladsl.Source import io.grpc.Status import org.slf4j.LoggerFactory - import scala.concurrent.ExecutionContext import scala.concurrent.Promise +import akka.persistence.query.typed.EventEnvelope + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object EventPusherConsumerServiceImpl { + // See akka.persistence.r2dbc.internal.EnvelopeOrigin, but we don't have a dependency + // to akka-persistence-r2dbc here + def fromSnapshot(env: EventEnvelope[_]): Boolean = + env.source == "SN" + +} + /** * INTERNAL API * @@ -40,6 +53,7 @@ private[akka] final class EventPusherConsumerServiceImpl( preferProtobuf: ProtoAnySerialization.Prefer)(implicit system: ActorSystem[_]) extends EventConsumerServicePowerApi { + import EventPusherConsumerServiceImpl.fromSnapshot import ProtobufProtocolConversions._ private val logger = LoggerFactory.getLogger(classOf[EventPusherConsumerServiceImpl]) @@ -134,6 +148,7 @@ private[akka] final class EventPusherConsumerServiceImpl( transformedEventEnvelope.persistenceId, transformedEventEnvelope.sequenceNr, transformedEventEnvelope.eventOption.getOrElse(FilteredPayload), + isSnapshotEvent = fromSnapshot(transformedEventEnvelope), transformedEventEnvelope.eventMetadata, transformedEventEnvelope.tags, _))(destination.eventProducerPushDestination.settings.journalWriteTimeout, system.scheduler) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4d9d33b86..2002b4a94 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -19,7 +19,7 @@ object Dependencies { val AkkaProjectionVersionInDocs = "1.5" object Versions { - val akka = sys.props.getOrElse("build.akka.version", "2.9.0-M3") + val akka = sys.props.getOrElse("build.akka.version", "2.9.0-M3+24-4e2bc5b5-SNAPSHOT") val akkaPersistenceCassandra = "1.2.0-M1" val akkaPersistenceJdbc = "5.3.0-M1" val akkaPersistenceR2dbc = "1.2.0-M7" diff --git a/samples/grpc/iot-service-scala/build.sbt b/samples/grpc/iot-service-scala/build.sbt index 2fff10770..f20781453 100644 --- a/samples/grpc/iot-service-scala/build.sbt +++ b/samples/grpc/iot-service-scala/build.sbt @@ -28,7 +28,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.9.0-M3" +val AkkaVersion = "2.9.0-M3+24-4e2bc5b5-SNAPSHOT" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M7" diff --git a/samples/grpc/local-drone-control-java/pom.xml b/samples/grpc/local-drone-control-java/pom.xml index 334b0375f..6678c7c9c 100644 --- a/samples/grpc/local-drone-control-java/pom.xml +++ b/samples/grpc/local-drone-control-java/pom.xml @@ -17,7 +17,7 @@ UTF-8 - 2.9.0-M3 + 2.9.0-M3+24-4e2bc5b5-SNAPSHOT 1.5.0-M5 1.2.0-M7 1.5.0-M1 diff --git a/samples/grpc/local-drone-control-scala/build.sbt b/samples/grpc/local-drone-control-scala/build.sbt index 2879e738c..58c7baa2b 100644 --- a/samples/grpc/local-drone-control-scala/build.sbt +++ b/samples/grpc/local-drone-control-scala/build.sbt @@ -31,7 +31,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.9.0-M3" +val AkkaVersion = "2.9.0-M3+24-4e2bc5b5-SNAPSHOT" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M7" diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml b/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml index 11d581e24..ae30d6c31 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml +++ b/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml @@ -17,7 +17,7 @@ UTF-8 - 2.9.0-M3 + 2.9.0-M3+24-4e2bc5b5-SNAPSHOT 1.5.0-M5 1.2.0-M6 1.5.0-M1 diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt index c96b158cd..cc26433ef 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt @@ -28,7 +28,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.9.0-M3" +val AkkaVersion = "2.9.0-M3+24-4e2bc5b5-SNAPSHOT" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M6" diff --git a/samples/grpc/shopping-analytics-service-java/pom.xml b/samples/grpc/shopping-analytics-service-java/pom.xml index d3e6f518e..40c9942ef 100644 --- a/samples/grpc/shopping-analytics-service-java/pom.xml +++ b/samples/grpc/shopping-analytics-service-java/pom.xml @@ -17,7 +17,7 @@ UTF-8 - 2.9.0-M3 + 2.9.0-M3+24-4e2bc5b5-SNAPSHOT 1.5.0-M5 1.2.0-M7 1.5.0-M1 diff --git a/samples/grpc/shopping-analytics-service-scala/build.sbt b/samples/grpc/shopping-analytics-service-scala/build.sbt index 7ef402bb1..c2bff6e50 100644 --- a/samples/grpc/shopping-analytics-service-scala/build.sbt +++ b/samples/grpc/shopping-analytics-service-scala/build.sbt @@ -28,7 +28,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.9.0-M3" +val AkkaVersion = "2.9.0-M3+24-4e2bc5b5-SNAPSHOT" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M7" diff --git a/samples/grpc/shopping-cart-service-java/pom.xml b/samples/grpc/shopping-cart-service-java/pom.xml index 3831b892d..69ca83d0e 100644 --- a/samples/grpc/shopping-cart-service-java/pom.xml +++ b/samples/grpc/shopping-cart-service-java/pom.xml @@ -17,7 +17,7 @@ UTF-8 - 2.9.0-M3 + 2.9.0-M3+24-4e2bc5b5-SNAPSHOT 1.5.0-M5 1.2.0-M7 1.5.0-M1 diff --git a/samples/grpc/shopping-cart-service-scala/build.sbt b/samples/grpc/shopping-cart-service-scala/build.sbt index 54912c5ab..f0e1f4a6c 100644 --- a/samples/grpc/shopping-cart-service-scala/build.sbt +++ b/samples/grpc/shopping-cart-service-scala/build.sbt @@ -28,7 +28,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = "2.9.0-M3" +val AkkaVersion = "2.9.0-M3+24-4e2bc5b5-SNAPSHOT" val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M7" diff --git a/samples/replicated/shopping-cart-service-java/pom.xml b/samples/replicated/shopping-cart-service-java/pom.xml index 918c7df3a..4d32d42c6 100644 --- a/samples/replicated/shopping-cart-service-java/pom.xml +++ b/samples/replicated/shopping-cart-service-java/pom.xml @@ -17,7 +17,7 @@ UTF-8 - 2.9.0-M3 + 2.9.0-M3+24-4e2bc5b5-SNAPSHOT 1.5.0-M5 1.2.0-M7 1.5.0-M1 diff --git a/samples/replicated/shopping-cart-service-scala/build.sbt b/samples/replicated/shopping-cart-service-scala/build.sbt index 1aa75bcaf..db45817f9 100644 --- a/samples/replicated/shopping-cart-service-scala/build.sbt +++ b/samples/replicated/shopping-cart-service-scala/build.sbt @@ -29,7 +29,7 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = sys.props.getOrElse("akka.version", "2.9.0-M3") +val AkkaVersion = sys.props.getOrElse("akka.version", "2.9.0-M3+24-4e2bc5b5-SNAPSHOT") val AkkaHttpVersion = "10.6.0-M2" val AkkaManagementVersion = "1.5.0-M1" val AkkaPersistenceR2dbcVersion = "1.2.0-M7"