Skip to content

Commit

Permalink
perf: avoid replay requests for another replica (#843)
Browse files Browse the repository at this point in the history
* optimization to avoid fruitless replay queries that are for another RES replica
  • Loading branch information
patriknw authored Apr 5, 2023
1 parent 3cfac26 commit edb4b31
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import akka.persistence.query.TimestampOffset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.projection.grpc.internal.proto.EntityIdOffset
import akka.projection.grpc.internal.proto.ExcludeEntityIds
import akka.projection.grpc.internal.proto.ExcludeRegexEntityIds
Expand Down Expand Up @@ -247,6 +249,33 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
outProbe.expectNext().event shouldBe "d2"
}

// Verifies that a replay request is served when it targets the replica whose events flow through
// this stream, and that a replay request for another replica of the same entity emits nothing.
"replay from ReplayReq with RES ReplicaId" in new Setup {
// envelopes for entity "a": seqNr 1 and 2 written by replica A, seqNr 1 written by replica B
override lazy val allEnvelopes = Vector(
createEnvelope(ReplicationId(entityType, "a", ReplicaId("A")).persistenceId, 1, "a1"),
createEnvelope(ReplicationId(entityType, "a", ReplicaId("B")).persistenceId, 1, "b1"),
createEnvelope(ReplicationId(entityType, "a", ReplicaId("A")).persistenceId, 2, "a2"))

// push the last envelope (a2, from replica A) through the ordinary envelope source first
// NOTE(review): presumably this is also how the stage under test learns that it serves replica A - confirm
envPublisher.sendNext(allEnvelopes.last)
outProbe.request(10)
outProbe.expectNext().event shouldBe "a2"

// request replay for replica A's persistence id starting at seqNr 1
inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(
ReplayReq(List(PersistenceIdSeqNr(ReplicationId(entityType, "a", ReplicaId("A")).persistenceId.id, 1L))))))
// replayed events are not emitted until the ordinary envSource makes some progress (probably ok),
// so push an unrelated envelope e1 to trigger the emission
envPublisher.sendNext(createEnvelope(PersistenceId(entityType, "e"), 1, "e1"))
outProbe.expectNext().event shouldBe "e1"
// the replay for replica A then yields a1 followed by a2
outProbe.expectNext().event shouldBe "a1"
outProbe.expectNext().event shouldBe "a2"

// but ignored if it's a request for another replicaId: nothing is emitted for replica B,
// even though allEnvelopes contains b1 at the requested seqNr
inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(
ReplayReq(List(PersistenceIdSeqNr(ReplicationId(entityType, "a", ReplicaId("B")).persistenceId.id, 1L))))))
outProbe.expectNoMessage()
}

"handle many replay requests" in new Setup {
lazy val entityIds = (1 to 20).map(n => s"entity-$n")
override lazy val allEnvelopes = envelopes ++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,6 @@ final class GrpcReadJournal private (
StreamIn(StreamIn.Message.Filter(FilterReq(protoCriteria)))

case ConsumerFilter.Replay(`streamId`, persistenceIdOffsets) =>
// FIXME for RES, would it be possible to skip replicaId not handled by this stream here?

val protoPersistenceIdOffsets = persistenceIdOffsets.collect {
case ConsumerFilter.PersistenceIdOffset(pid, seqNr) if sliceHandledByThisStream(pid) =>
PersistenceIdSeqNr(pid, seqNr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import akka.persistence.Persistence
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.projection.grpc.internal.proto.EntityIdOffset
import akka.projection.grpc.internal.proto.FilterCriteria
Expand Down Expand Up @@ -130,6 +131,8 @@ import org.slf4j.LoggerFactory
case Failure(exc) => failStage(exc)
}

// Replica id handled by this stream. Starts as None and is set once, from the first grabbed
// envelope whose persistence id is a replication id. Replay requests whose persistence id
// belongs to a different replica are then skipped.
private var replicaId: Option[ReplicaId] = None

private val logPrefix = s"$streamId (${sliceRange.min}-${sliceRange.max})"

override def preStart(): Unit = {
Expand Down Expand Up @@ -221,6 +224,14 @@ import org.slf4j.LoggerFactory
log.trace2("Stream [{}]: updated filter to [{}}]", logPrefix, filter)
}

// Decides whether `pid` may be replayed by this stream with respect to its replica.
// - No replica id recorded yet: every pid is accepted.
// - pid is not a replication id: accepted.
// - Otherwise the replica id parsed from pid must equal the recorded replica id.
private def replicaIdHandledByThisStream(pid: String): Boolean =
  replicaId.forall { knownReplica =>
    if (ReplicationId.isReplicationId(pid))
      ReplicationId.fromString(pid).replicaId == knownReplica
    else
      true
  }

private def sliceHandledByThisStream(pid: String): Boolean = {
val slice = persistence.sliceForPersistenceId(pid)
sliceRange.contains(slice)
Expand Down Expand Up @@ -272,9 +283,8 @@ import org.slf4j.LoggerFactory

private def replay(persistenceIdOffset: PersistenceIdSeqNr): Unit = {
val fromSeqNr = persistenceIdOffset.seqNr
// FIXME what about the replicaId when using RES?
val pid = persistenceIdOffset.persistenceId
if (sliceHandledByThisStream(pid)) {
if (replicaIdHandledByThisStream(pid) && sliceHandledByThisStream(pid)) {
val sameInProgress =
replayInProgress.get(pid) match {
case Some(replay) if replay.fromSeqNr == fromSeqNr =>
Expand Down Expand Up @@ -361,6 +371,10 @@ import org.slf4j.LoggerFactory
val env = grab(inEnv)
val pid = env.persistenceId

// replicaId is used for validation of replay requests, to avoid replay for other replicas
if (replicaId.isEmpty && ReplicationId.isReplicationId(pid))
replicaId = Some(ReplicationId.fromString(pid).replicaId)

// Note that the producer filter has higher priority - if a producer decides to filter events out the consumer
// can never include them
if (producerFilter(env) && filter.matches(pid)) {
Expand Down

0 comments on commit edb4b31

Please sign in to comment.