From 219b19b9567ad8ef649379bb6b8741d6d302b3f2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 18 Jun 2024 13:53:03 +0200 Subject: [PATCH] integration test --- ...hReplicationMigrationIntegrationSpec.scala | 329 ++++++++++++++++++ .../ReplicationMigrationIntegrationSpec.scala | 317 +++++++++++++++++ .../grpc/internal/EventPusher.scala | 34 +- build.sbt | 4 + project/Dependencies.scala | 2 + 5 files changed, 671 insertions(+), 15 deletions(-) create mode 100644 akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/PushReplicationMigrationIntegrationSpec.scala create mode 100644 akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationMigrationIntegrationSpec.scala diff --git a/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/PushReplicationMigrationIntegrationSpec.scala b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/PushReplicationMigrationIntegrationSpec.scala new file mode 100644 index 000000000..dd285f14f --- /dev/null +++ b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/PushReplicationMigrationIntegrationSpec.scala @@ -0,0 +1,329 @@ +/* + * Copyright (C) 2009-2023 Lightbend Inc. + */ + +package akka.projection.grpc.replication + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt + +import akka.actor.testkit.typed.scaladsl.ActorTestKit +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.LoggerOps +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps +import akka.cluster.MemberStatus +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import akka.grpc.GrpcClientSettings +import akka.http.scaladsl.Http +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.ReplicaId +import akka.projection.grpc.TestContainerConf +import akka.projection.grpc.TestDbLifecycle +import akka.projection.grpc.producer.EventProducerSettings +import akka.projection.grpc.replication.scaladsl.Replica +import akka.projection.grpc.replication.scaladsl.Replication +import akka.projection.grpc.replication.scaladsl.Replication.EdgeReplication +import akka.projection.grpc.replication.scaladsl.ReplicationSettings +import akka.projection.r2dbc.scaladsl.R2dbcReplication +import akka.testkit.SocketUtil +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterAll +import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory + +object PushReplicationMigrationIntegrationSpec { + + private def config(dc: ReplicaId): Config = { + val journalTable = if (dc.id == "") "event_journal" else s"event_journal_${dc.id}" + val timestampOffsetTable = + if (dc.id == "") "akka_projection_timestamp_offset_store" else s"akka_projection_timestamp_offset_store_${dc.id}" + ConfigFactory.parseString(s""" + akka.actor.provider = cluster + akka.http.server.preview.enable-http2 = on + akka.persistence.r2dbc { + journal.table = "$journalTable" + query { + refresh-interval = 500 millis + # reducing this to have quicker test, triggers backtracking earlier + backtracking.behind-current-time = 3 seconds + } + } + akka.projection.grpc { + producer { + query-plugin-id = "akka.persistence.r2dbc.query" + } + } + akka.projection.r2dbc.offset-store { + timestamp-offset-table = "$timestampOffsetTable" + } + akka.remote.artery.canonical.host = "127.0.0.1" + akka.remote.artery.canonical.port = 0 + akka.actor.testkit.typed { + filter-leeway = 10s + system-shutdown-default = 30s + } + """) + } + + private val DCA = ReplicaId("DCA") + private val DCB = ReplicaId("DCB") + private val EdgeReplicaA = ReplicaId("") + +} + +class PushReplicationMigrationIntegrationSpec(testContainerConf: TestContainerConf) + extends ScalaTestWithActorTestKit( + akka.actor + .ActorSystem( + "ReplicationMigrationIntegrationSpecA", + PushReplicationMigrationIntegrationSpec + .config(PushReplicationMigrationIntegrationSpec.DCA) + .withFallback(testContainerConf.config)) + .toTyped) + with AnyWordSpecLike + with TestDbLifecycle + with BeforeAndAfterAll + with LogCapturing { + import PushReplicationMigrationIntegrationSpec._ + import ReplicationMigrationIntegrationSpec.HelloWorld + implicit val ec: ExecutionContext = system.executionContext + + def this() = this(new TestContainerConf) + + private val logger = LoggerFactory.getLogger(classOf[PushReplicationMigrationIntegrationSpec]) + override def typedSystem: ActorSystem[_] = testKit.system + + private val systems = Seq[ActorSystem[_]]( + typedSystem, + akka.actor + .ActorSystem( + "ReplicationMigrationIntegrationSpecB", + PushReplicationMigrationIntegrationSpec + .config(PushReplicationMigrationIntegrationSpec.DCB) + .withFallback(testContainerConf.config)) + .toTyped, + akka.actor + .ActorSystem( + "ReplicationMigrationIntegrationSpecEdgeA", + PushReplicationMigrationIntegrationSpec + .config(PushReplicationMigrationIntegrationSpec.EdgeReplicaA) + .withFallback(testContainerConf.config)) + .toTyped) + + private val grpcPorts = SocketUtil.temporaryServerAddresses(2, "127.0.0.1").map(_.getPort) + def grpcClientSettings(index: Int) = + GrpcClientSettings.connectToServiceAt("127.0.0.1", grpcPorts(index)).withTls(false) + private val replicaA = Replica(DCA, 2, grpcClientSettings(0)) + private val replicaB = Replica(DCB, 2, grpcClientSettings(1)) + private val allCloudReplicas = Set(replicaA, replicaB) + + /* + private val _ = Replica( + EdgeReplicaA, + 2, + // Note: there is no way to actively connect to this replica, instead the GrpcClientSettings would be how _it_ connects + // (to DCA in this case). The normal replicas does not have the Replica in their lists of all replicas + replicaA.grpcClientSettings) + */ + + private val testKitsPerDc = + Map(DCA -> testKit, DCB -> ActorTestKit(systems(1)), EdgeReplicaA -> ActorTestKit(systems(2))) + private val systemPerDc = Map(DCA -> system, DCB -> systems(1), EdgeReplicaA -> systems(2)) + private var replicationA: Replication[HelloWorld.Command] = _ + private var replicationB: Replication[HelloWorld.Command] = _ + private var edgeReplicationA: EdgeReplication[HelloWorld.Command] = _ + private val entityIdOne = "one" + private val entityIdTwo = "two" + + override protected def beforeAll(): Unit = { + super.beforeAll() + systemPerDc.values.foreach(beforeAllDeleteFromTables) + } + + def startReplica(replicaSystem: ActorSystem[_], replica: Replica): Future[Replication[HelloWorld.Command]] = { + logger + .infoN( + "Starting replica [{}], system [{}] on port [{}]", + replica.replicaId, + replicaSystem.name, + replica.grpcClientSettings.defaultPort) + + val grpcPort = replica.grpcClientSettings.defaultPort + val settings = ReplicationSettings[HelloWorld.Command]( + HelloWorld.EntityType, + replica.replicaId, + EventProducerSettings(replicaSystem), + allCloudReplicas, + 10.seconds, + 8, + R2dbcReplication()).withEdgeReplication(true) + val started = + Replication.grpcReplication(settings)(HelloWorld.replicated)(replicaSystem) + + // start producer server + Http()(system) + .newServerAt("127.0.0.1", grpcPort) + .bind(started.createSingleServiceHandler()) + .map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext) + .map(_ => started) + } + + def startEdgeReplica( + replicaSystem: ActorSystem[_], + selfReplicaId: ReplicaId, + connectTo: Replica): EdgeReplication[HelloWorld.Command] = { + val settings = ReplicationSettings[HelloWorld.Command]( + HelloWorld.EntityType, + selfReplicaId, + EventProducerSettings(replicaSystem), + Set(connectTo), + 10.seconds, + // few on edge node (but 2 rather than 1 here to make sure test actually covers parallel updates) + 2, + R2dbcReplication()).withEdgeReplication(true) + Replication.grpcEdgeReplication(settings)(HelloWorld.replicated)(replicaSystem) + } + + "Replication over gRPC" should { + "form one edge cluster" in { + // Probably not realistic to start with non-replicated at edge, and then make it replicated, + // but that will test the push aspect, which is most different compared to ReplicationMigrationIntegrationSpec + val testKit = testKitsPerDc(EdgeReplicaA) + val cluster = Cluster(testKit.system) + cluster.manager ! Join(cluster.selfMember.address) + testKit.createTestProbe().awaitAssert { + cluster.selfMember.status should ===(MemberStatus.Up) + } + } + + "persist events with non-replicated EventSourcedBehavior" in { + val testKit = testKitsPerDc(EdgeReplicaA) + Set(entityIdOne, entityIdTwo).foreach { entityId => + withClue(s"for entity id $entityId") { + val entity = testKit.spawn(HelloWorld.nonReplicated(PersistenceId.of(HelloWorld.EntityType.name, entityId))) + + import akka.actor.typed.scaladsl.AskPattern._ + entity.ask(HelloWorld.SetGreeting("hello 1 from non-replicated", _)).futureValue + entity.ask(HelloWorld.SetGreeting("hello 2 from non-replicated", _)).futureValue + testKit.stop(entity) + } + } + } + + "form three one node clusters" in { + testKitsPerDc.foreach { + case (dc, testKit) => + if (dc != EdgeReplicaA) { + val cluster = Cluster(testKit.system) + cluster.manager ! Join(cluster.selfMember.address) + testKit.createTestProbe().awaitAssert { + cluster.selfMember.status should ===(MemberStatus.Up) + } + } + } + } + + "start two cloud replicas and one edge replica" in { + replicationA = startReplica(systemPerDc(DCA), replicaA).futureValue + replicationB = startReplica(systemPerDc(DCB), replicaB).futureValue + edgeReplicationA = startEdgeReplica(systemPerDc(EdgeReplicaA), EdgeReplicaA, replicaA) + logger.info("All three replication/producer services bound") + } + + "recover from non-replicated events" in { + testKitsPerDc.values.foreach { testKit => + withClue(s"on ${testKit.system.name}") { + val probe = testKit.createTestProbe() + + Set(entityIdOne, entityIdTwo).foreach { entityId => + withClue(s"for entity id $entityId") { + val entityRef = ClusterSharding(testKit.system) + .entityRefFor(HelloWorld.EntityType, entityId) + + probe.awaitAssert({ + entityRef + .ask(HelloWorld.Get.apply) + .futureValue should ===("hello 2 from non-replicated") + }, 10.seconds) + } + } + } + } + } + + "replicate writes directly from cloud to edge" in { + for { + n <- 1 to 5 + entityId <- Set(entityIdOne, entityIdTwo) + } { + logger.infoN("Updating greeting {} for [{}] from dc [{}]", n, entityId, DCA) + replicationA + .entityRefFactory(entityId) + .ask(HelloWorld.SetGreeting(s"hello $n from ${DCA.id}", _)) + .futureValue + + val edgeEntityRef = edgeReplicationA.entityRefFactory(entityId) + val probe = testKit.createTestProbe() + probe.awaitAssert({ + edgeEntityRef + .ask(HelloWorld.Get.apply) + .futureValue should ===(s"hello $n from ${DCA.id}") + }, 10.seconds) + + // and also B ofc (unrelated to edge replication but for good measure) + val dcBEntityRef = replicationB.entityRefFactory(entityId) + probe.awaitAssert({ + dcBEntityRef + .ask(HelloWorld.Get.apply) + .futureValue should ===(s"hello $n from ${DCA.id}") + }, 10.seconds) + } + } + + "replicate writes from edge node to cloud" in { + for { + n <- 6 to 10 + entityId <- Set(entityIdOne, entityIdTwo) + } { + logger.infoN("Updating greeting {} for [{}] from edge dc", n, entityId) + edgeReplicationA + .entityRefFactory(entityId) + .ask(HelloWorld.SetGreeting(s"hello $n from edge", _)) + .futureValue + + val probe = testKit.createTestProbe() + // should reach the direct replica + val dcAEntityRef = replicationA.entityRefFactory(entityId) + probe.awaitAssert({ + dcAEntityRef + .ask(HelloWorld.Get.apply) + .futureValue should ===(s"hello $n from edge") + }, 10.seconds) + + // then indirectly replica B + val dcBEntityRef = replicationB.entityRefFactory(entityId) + probe.awaitAssert({ + dcBEntityRef + .ask(HelloWorld.Get.apply) + .futureValue should ===(s"hello $n from edge") + }, 10.seconds) + } + } + } + + protected override def afterAll(): Unit = { + logger.info("Shutting down all three DCs") + systems.foreach(_.terminate()) // speed up termination by terminating all at once + // and then make sure they are completely shutdown + systems.foreach { system => + ActorTestKit.shutdown(system) + } + super.afterAll() + } +} diff --git a/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationMigrationIntegrationSpec.scala b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationMigrationIntegrationSpec.scala new file mode 100644 index 000000000..11cbdd303 --- /dev/null +++ b/akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/replication/ReplicationMigrationIntegrationSpec.scala @@ -0,0 +1,317 @@ +/* + * Copyright (C) 2009-2023 Lightbend Inc. + */ + +package akka.projection.grpc.replication + +import java.time.Instant + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt + +import akka.Done +import akka.actor.testkit.typed.scaladsl.ActorTestKit +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.LoggerOps +import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps +import akka.cluster.MemberStatus +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import akka.grpc.GrpcClientSettings +import akka.http.scaladsl.Http +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.projection.grpc.TestContainerConf +import akka.projection.grpc.TestDbLifecycle +import akka.projection.grpc.producer.EventProducerSettings +import akka.projection.grpc.replication.scaladsl.Replica +import akka.projection.grpc.replication.scaladsl.ReplicatedBehaviors +import akka.projection.grpc.replication.scaladsl.Replication +import akka.projection.grpc.replication.scaladsl.ReplicationSettings +import akka.projection.r2dbc.scaladsl.R2dbcReplication +import akka.serialization.jackson.JsonSerializable +import akka.testkit.SocketUtil +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterAll +import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory + +object ReplicationMigrationIntegrationSpec { + + private def config(dc: ReplicaId): Config = { + val journalTable = if (dc.id == "") "event_journal" else s"event_journal_${dc.id}" + val timestampOffsetTable = + if (dc.id == "") "akka_projection_timestamp_offset_store" else s"akka_projection_timestamp_offset_store_${dc.id}" + ConfigFactory.parseString(s""" + akka.actor.provider = cluster + akka.http.server.preview.enable-http2 = on + akka.persistence.r2dbc { + journal.table = "$journalTable" + query { + refresh-interval = 500 millis + # reducing this to have quicker test, triggers backtracking earlier + backtracking.behind-current-time = 3 seconds + } + } + akka.projection.grpc { + producer { + query-plugin-id = "akka.persistence.r2dbc.query" + } + } + akka.projection.r2dbc.offset-store { + timestamp-offset-table = "$timestampOffsetTable" + } + akka.remote.artery.canonical.host = "127.0.0.1" + akka.remote.artery.canonical.port = 0 + akka.actor.testkit.typed { + filter-leeway = 10s + system-shutdown-default = 30s + } + """) + } + + private val DCA = ReplicaId("") + private val DCB = ReplicaId("DCB") + private val DCC = ReplicaId("DCC") + + object HelloWorld { + + val EntityType: EntityTypeKey[Command] = EntityTypeKey[Command]("hello-world") + + sealed trait Command extends JsonSerializable + final case class Get(replyTo: ActorRef[String]) extends Command + final case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) extends Command + + sealed trait Event extends JsonSerializable + final case class GreetingChanged(greeting: String, timestamp: Instant) extends Event + + object State { + val initial = + State("Hello world", Instant.EPOCH) + } + + case class State(greeting: String, timestamp: Instant) extends JsonSerializable + + def replicated(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]): Behavior[Command] = + replicatedBehaviors.setup { replicationContext => + nonReplicated(replicationContext.persistenceId) + } + + def nonReplicated(persistenceId: PersistenceId): EventSourcedBehavior[Command, Event, State] = + EventSourcedBehavior[Command, Event, State]( + persistenceId, + State.initial, { + case (State(greeting, _), Get(replyTo)) => + replyTo ! greeting + Effect.none + case (_, SetGreeting(greeting, replyTo)) => + Effect + .persist(GreetingChanged(greeting, Instant.now())) + .thenRun((_: State) => replyTo ! Done) + }, { + case (currentState, GreetingChanged(newGreeting, newTimestamp)) => + if (newTimestamp.isAfter(currentState.timestamp)) + currentState.copy(newGreeting, newTimestamp) + else currentState + }) + } +} + +class ReplicationMigrationIntegrationSpec(testContainerConf: TestContainerConf) + extends ScalaTestWithActorTestKit( + akka.actor + .ActorSystem( + "ReplicationMigrationIntegrationSpecA", + ReplicationMigrationIntegrationSpec + .config(ReplicationMigrationIntegrationSpec.DCA) + .withFallback(testContainerConf.config)) + .toTyped) + with AnyWordSpecLike + with TestDbLifecycle + with BeforeAndAfterAll + with LogCapturing { + import ReplicationMigrationIntegrationSpec._ + implicit val ec: ExecutionContext = system.executionContext + + def this() = this(new TestContainerConf) + + private val logger = LoggerFactory.getLogger(classOf[ReplicationMigrationIntegrationSpec]) + override def typedSystem: ActorSystem[_] = testKit.system + + private val systems = Seq[ActorSystem[_]]( + typedSystem, + akka.actor + .ActorSystem( + "ReplicationMigrationIntegrationSpecB", + ReplicationMigrationIntegrationSpec.config(DCB).withFallback(testContainerConf.config)) + .toTyped, + akka.actor + .ActorSystem( + "ReplicationMigrationIntegrationSpecC", + ReplicationMigrationIntegrationSpec.config(DCC).withFallback(testContainerConf.config)) + .toTyped) + + private val grpcPorts = SocketUtil.temporaryServerAddresses(systems.size, "127.0.0.1").map(_.getPort) + private val allDcsAndPorts = Seq(DCA, DCB, DCC).zip(grpcPorts) + private val allReplicas = allDcsAndPorts.map { + case (id, port) => + Replica(id, 2, GrpcClientSettings.connectToServiceAt("127.0.0.1", port).withTls(false)) + }.toSet + + private val testKitsPerDc = Map(DCA -> testKit, DCB -> ActorTestKit(systems(1)), DCC -> ActorTestKit(systems(2))) + private val systemPerDc = Map(DCA -> system, DCB -> systems(1), DCC -> systems(2)) + private val entityIds = Set("one", "two", "three") + + override protected def beforeAll(): Unit = { + super.beforeAll() + systemPerDc.values.foreach(beforeAllDeleteFromTables) + } + + def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): Replication[HelloWorld.Command] = { + val settings = ReplicationSettings[HelloWorld.Command]( + HelloWorld.EntityType.name, + selfReplicaId, + EventProducerSettings(replicaSystem), + allReplicas, + 10.seconds, + 8, + R2dbcReplication()) + Replication.grpcReplication(settings)(ReplicationMigrationIntegrationSpec.HelloWorld.replicated)(replicaSystem) + } + + "Replication over gRPC" should { + "form one cluster" in { + val testKit = testKitsPerDc.values.head + val cluster = Cluster(testKit.system) + cluster.manager ! Join(cluster.selfMember.address) + testKit.createTestProbe().awaitAssert { + cluster.selfMember.status should ===(MemberStatus.Up) + } + } + + "persist events with non-replicated EventSourcedBehavior" in { + val testKit = testKitsPerDc.values.head + entityIds.foreach { entityId => + withClue(s"for entity id $entityId") { + val entity = testKit.spawn(HelloWorld.nonReplicated(PersistenceId.of(HelloWorld.EntityType.name, entityId))) + + import akka.actor.typed.scaladsl.AskPattern._ + entity.ask(HelloWorld.SetGreeting("hello 1 from non-replicated", _)).futureValue + entity.ask(HelloWorld.SetGreeting("hello 2 from non-replicated", _)).futureValue + testKit.stop(entity) + } + } + } + + "form three one node clusters" in { + // one has already been started + testKitsPerDc.values.drop(1).foreach { testKit => + val cluster = Cluster(testKit.system) + cluster.manager ! Join(cluster.selfMember.address) + testKit.createTestProbe().awaitAssert { + cluster.selfMember.status should ===(MemberStatus.Up) + } + } + } + + "start three replicas" in { + val replicasStarted = Future.sequence(allReplicas.zipWithIndex.map { + case (replica, index) => + val system = systems(index) + logger + .infoN( + "Starting replica [{}], system [{}] on port [{}]", + replica.replicaId, + system.name, + replica.grpcClientSettings.defaultPort) + val started = startReplica(system, replica.replicaId) + val grpcPort = grpcPorts(index) + + // start producer server + Http()(system) + .newServerAt("127.0.0.1", grpcPort) + .bind(started.createSingleServiceHandler()) + .map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext) + .map(_ => replica.replicaId -> started) + }) + + replicasStarted.futureValue + logger.info("All three replication/producer services bound") + } + + "recover from non-replicated events" in { + testKitsPerDc.values.foreach { testKit => + withClue(s"on ${testKit.system.name}") { + val probe = testKit.createTestProbe() + + entityIds.foreach { entityId => + withClue(s"for entity id $entityId") { + val entityRef = ClusterSharding(testKit.system) + .entityRefFor(HelloWorld.EntityType, entityId) + + probe.awaitAssert({ + entityRef + .ask(HelloWorld.Get.apply) + .futureValue should ===("hello 2 from non-replicated") + }, 10.seconds) + } + } + } + } + } + + "replicate writes from one dc to the other two" in { + systemPerDc.keys.foreach { dc => + withClue(s"from ${dc.id}") { + Future + .sequence(entityIds.map { entityId => + logger.infoN("Updating greeting for [{}] from dc [{}]", entityId, dc.id) + ClusterSharding(systemPerDc(dc)) + .entityRefFor(HelloWorld.EntityType, entityId) + .ask(HelloWorld.SetGreeting(s"hello 3 from ${dc.id}", _)) + }) + .futureValue + + testKitsPerDc.values.foreach { testKit => + withClue(s"on ${testKit.system.name}") { + val probe = testKit.createTestProbe() + + entityIds.foreach { entityId => + withClue(s"for entity id $entityId") { + val entityRef = ClusterSharding(testKit.system) + .entityRefFor(HelloWorld.EntityType, entityId) + + probe.awaitAssert({ + entityRef + .ask(HelloWorld.Get.apply) + .futureValue should ===(s"hello 3 from ${dc.id}") + }, 10.seconds) + } + } + } + } + } + } + } + } + + protected override def afterAll(): Unit = { + logger.info("Shutting down all three DCs") + systems.foreach(_.terminate()) // speed up termination by terminating all at the once + // and then make sure they are completely shutdown + systems.foreach { system => + ActorTestKit.shutdown(system) + } + super.afterAll() + } +} diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala index 078903c2b..626a4b81d 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala @@ -108,24 +108,30 @@ private[akka] object EventPusher { Flow[(EventEnvelope[Event], ProjectionContext)] .mapAsync(eps.settings.transformationParallelism) { case (envelope, projectionContext) => + val envelopeWithMetadata = + eps.replicatedEventMetadataTransformation(envelope.asInstanceOf[EventEnvelope[Any]]) match { + case None => envelope + case Some(metadata) => envelope.withMetadata(metadata) + } + val filteredTransformed = - if (replicatedEventOriginFilter(envelope) && eps.producerFilter( - envelope.asInstanceOf[EventEnvelope[Any]]) && - consumerFilter.matches(envelope)) { + if (replicatedEventOriginFilter(envelopeWithMetadata) && eps.producerFilter( + envelopeWithMetadata.asInstanceOf[EventEnvelope[Any]]) && + consumerFilter.matches(envelopeWithMetadata)) { if (logger.isTraceEnabled()) logger.trace( "Pushing event persistence id [{}], sequence number [{}]{}", - envelope.persistenceId, - envelope.sequenceNr, + envelopeWithMetadata.persistenceId, + envelopeWithMetadata.sequenceNr, startMessage.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]")) - transformAndEncodeEvent(eps.transformation, envelope, protoAnySerialization) + transformAndEncodeEvent(eps.transformation, envelopeWithMetadata, protoAnySerialization) } else { if (logger.isTraceEnabled()) logger.trace( "Filtering event persistence id [{}], sequence number [{}]{}", - envelope.persistenceId, - envelope.sequenceNr, + envelopeWithMetadata.persistenceId, + envelopeWithMetadata.sequenceNr, startMessage.replicaInfo.fold("")(ri => s", remote replica [${ri.replicaId}]")) Future.successful(None) @@ -135,13 +141,11 @@ private[akka] object EventPusher { case None => // Filtered or transformed to None, we still need to push a placeholder to not get seqnr gaps on the receiving side ( - ConsumeEventIn( - ConsumeEventIn.Message.FilteredEvent( - FilteredEvent( - persistenceId = envelope.persistenceId, - seqNr = envelope.sequenceNr, - slice = envelope.slice, - offset = offsetToProtoOffset(envelope.offset)))), + ConsumeEventIn(ConsumeEventIn.Message.FilteredEvent(FilteredEvent( + persistenceId = envelopeWithMetadata.persistenceId, + seqNr = envelopeWithMetadata.sequenceNr, + slice = envelopeWithMetadata.slice, + offset = offsetToProtoOffset(envelopeWithMetadata.offset)))), projectionContext) } } diff --git a/build.sbt b/build.sbt index 3a6b778e7..fe03c426d 100644 --- a/build.sbt +++ b/build.sbt @@ -6,6 +6,10 @@ ThisBuild / dynverSeparator := "-" // append -SNAPSHOT to version when isSnapshot ThisBuild / dynverSonatypeSnapshots := true ThisBuild / resolvers += "Akka library repository".at("https://repo.akka.io/maven") +ThisBuild / resolvers ++= + (if (Dependencies.Versions.akka.endsWith("-SNAPSHOT")) + Seq("Akka library snapshot repository".at("https://repo.akka.io/snapshots")) + else Seq.empty) lazy val core = Project(id = "akka-projection-core", base = file("akka-projection-core")) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 492b2e4bb..253844ddd 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -77,6 +77,7 @@ object Dependencies { val persistenceTestkit = "com.typesafe.akka" %% "akka-persistence-testkit" % Versions.akka % sbt.Test val akkaDiscovery = "com.typesafe.akka" %% "akka-discovery" % Versions.akka % sbt.Test val akkaClusterShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % Versions.akka % sbt.Test + val akkaPki = "com.typesafe.akka" %% "akka-pki" % Versions.akka % sbt.Test val scalatest = "org.scalatest" %% "scalatest" % Versions.scalaTest % sbt.Test val scalatestJUnit = "org.scalatestplus" %% "junit-4-13" % (Versions.scalaTest + ".0") % sbt.Test @@ -258,6 +259,7 @@ object Dependencies { Test.logback, Test.scalatest, Test.akkaDiscovery, + Test.akkaPki, Test.postgresDriver, Test.h2Driver, Compile.r2dbcH2,