diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala index 8814c2120..9b5abc0ae 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala @@ -19,6 +19,7 @@ import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.PersistenceId import akka.persistence.typed.ReplicaId import akka.persistence.typed.ReplicationId +import akka.persistence.typed.internal.ReplicatedEventMetadata import akka.projection.grpc.internal.proto.EntityIdOffset import akka.projection.grpc.internal.proto.FilterCriteria import akka.projection.grpc.internal.proto.PersistenceIdSeqNr @@ -268,9 +269,8 @@ import org.slf4j.LoggerFactory private def replicaIdHandledByThisStream(pid: String): Boolean = { replicaId match { - case None => true - case Some(id) => - !ReplicationId.isReplicationId(pid) || ReplicationId.fromString(pid).replicaId == id + case None => true + case Some(id) => ReplicationId.fromString(pid).replicaId == id } } @@ -427,7 +427,7 @@ import org.slf4j.LoggerFactory val pid = env.persistenceId // replicaId is used for validation of replay requests, to avoid replay for other replicas - if (replicaId.isEmpty && ReplicationId.isReplicationId(pid)) + if (replicaId.isEmpty && env.eventMetadata.exists(_.isInstanceOf[ReplicatedEventMetadata])) replicaId = Some(ReplicationId.fromString(pid).replicaId) if (producerFilter(env) && filter.matches(env)) {