From edb4b3197a32a0fa28f6c18ca65f9f7245de8b5b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 5 Apr 2023 15:26:36 +0200 Subject: [PATCH] perf: avoid replay requests for another replica (#843) * optimization to avoid fruitless replay queries that are for another RES replica --- .../grpc/internal/FilterStageSpec.scala | 29 +++++++++++++++++++ .../consumer/scaladsl/GrpcReadJournal.scala | 2 -- .../grpc/internal/FilterStage.scala | 18 ++++++++++-- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala index 68ae23a41..e51c16df2 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala @@ -19,6 +19,8 @@ import akka.persistence.query.TimestampOffset import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery import akka.persistence.typed.PersistenceId +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId import akka.projection.grpc.internal.proto.EntityIdOffset import akka.projection.grpc.internal.proto.ExcludeEntityIds import akka.projection.grpc.internal.proto.ExcludeRegexEntityIds @@ -247,6 +249,33 @@ class FilterStageSpec extends ScalaTestWithActorTestKit(""" outProbe.expectNext().event shouldBe "d2" } + "replay from ReplayReq with RES ReplicaId" in new Setup { + // some more envelopes + override lazy val allEnvelopes = Vector( + createEnvelope(ReplicationId(entityType, "a", ReplicaId("A")).persistenceId, 1, "a1"), + createEnvelope(ReplicationId(entityType, "a", ReplicaId("B")).persistenceId, 1, "b1"), + createEnvelope(ReplicationId(entityType, "a", ReplicaId("A")).persistenceId, 2, "a2")) + + envPublisher.sendNext(allEnvelopes.last) + outProbe.request(10) + outProbe.expectNext().event shouldBe "a2" + + inPublisher.sendNext( + StreamIn(StreamIn.Message.Replay( + ReplayReq(List(PersistenceIdSeqNr(ReplicationId(entityType, "a", ReplicaId("A")).persistenceId.id, 1L)))))) + // it will not emit replayed event until there is some progress from the ordinary envSource, probably ok + envPublisher.sendNext(createEnvelope(PersistenceId(entityType, "e"), 1, "e1")) + outProbe.expectNext().event shouldBe "e1" + outProbe.expectNext().event shouldBe "a1" + outProbe.expectNext().event shouldBe "a2" + + // but ignored if it's a request for another replicaId + inPublisher.sendNext( + StreamIn(StreamIn.Message.Replay( + ReplayReq(List(PersistenceIdSeqNr(ReplicationId(entityType, "a", ReplicaId("B")).persistenceId.id, 1L)))))) + outProbe.expectNoMessage() + } + "handle many replay requests" in new Setup { lazy val entityIds = (1 to 20).map(n => s"entity-$n") override lazy val allEnvelopes = envelopes ++ diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index 5213b9285..76445293e 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -293,8 +293,6 @@ final class GrpcReadJournal private ( StreamIn(StreamIn.Message.Filter(FilterReq(protoCriteria))) case ConsumerFilter.Replay(`streamId`, persistenceIdOffsets) => - // FIXME for RES, would it be possible to skip replicaId not handled by this stream here? - val protoPersistenceIdOffsets = persistenceIdOffsets.collect { case ConsumerFilter.PersistenceIdOffset(pid, seqNr) if sliceHandledByThisStream(pid) => PersistenceIdSeqNr(pid, seqNr) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala index 3dc1a4681..584dc28cd 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala @@ -17,6 +17,7 @@ import akka.persistence.Persistence import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery import akka.persistence.typed.PersistenceId +import akka.persistence.typed.ReplicaId import akka.persistence.typed.ReplicationId import akka.projection.grpc.internal.proto.EntityIdOffset import akka.projection.grpc.internal.proto.FilterCriteria @@ -130,6 +131,8 @@ import org.slf4j.LoggerFactory case Failure(exc) => failStage(exc) } + private var replicaId: Option[ReplicaId] = None + private val logPrefix = s"$streamId (${sliceRange.min}-${sliceRange.max})" override def preStart(): Unit = { @@ -221,6 +224,14 @@ import org.slf4j.LoggerFactory log.trace2("Stream [{}]: updated filter to [{}}]", logPrefix, filter) } + private def replicaIdHandledByThisStream(pid: String): Boolean = { + replicaId match { + case None => true + case Some(id) => + !ReplicationId.isReplicationId(pid) || ReplicationId.fromString(pid).replicaId == id + } + } + private def sliceHandledByThisStream(pid: String): Boolean = { val slice = persistence.sliceForPersistenceId(pid) sliceRange.contains(slice) @@ -272,9 +283,8 @@ import org.slf4j.LoggerFactory private def replay(persistenceIdOffset: PersistenceIdSeqNr): Unit = { val fromSeqNr = persistenceIdOffset.seqNr - // FIXME what about the replicaId when using RES? val pid = persistenceIdOffset.persistenceId - if (sliceHandledByThisStream(pid)) { + if (replicaIdHandledByThisStream(pid) && sliceHandledByThisStream(pid)) { val sameInProgress = replayInProgress.get(pid) match { case Some(replay) if replay.fromSeqNr == fromSeqNr => @@ -361,6 +371,10 @@ import org.slf4j.LoggerFactory val env = grab(inEnv) val pid = env.persistenceId + // replicaId is used for validation of replay requests, to avoid replay for other replicas + if (replicaId.isEmpty && ReplicationId.isReplicationId(pid)) + replicaId = Some(ReplicationId.fromString(pid).replicaId) + // Note that the producer filter has higher priority - if a producer decides to filter events out the consumer // can never include them if (producerFilter(env) && filter.matches(pid)) {