Skip to content

Commit

Permalink
isReplicationId adjustment
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Sep 12, 2024
1 parent 23df9b6 commit 81b00fd
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import akka.persistence.query.typed.EventEnvelope
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.internal.ReplicatedEventMetadata
import akka.projection.grpc.internal.proto.EntityIdOffset
import akka.projection.grpc.internal.proto.FilterCriteria
import akka.projection.grpc.internal.proto.PersistenceIdSeqNr
Expand Down Expand Up @@ -268,9 +269,8 @@ import org.slf4j.LoggerFactory

private def replicaIdHandledByThisStream(pid: String): Boolean = {
replicaId match {
case None => true
case Some(id) =>
!ReplicationId.isReplicationId(pid) || ReplicationId.fromString(pid).replicaId == id
case None => true
case Some(id) => ReplicationId.fromString(pid).replicaId == id
}
}

Expand Down Expand Up @@ -427,7 +427,7 @@ import org.slf4j.LoggerFactory
val pid = env.persistenceId

// replicaId is used for validation of replay requests, to avoid replay for other replicas
if (replicaId.isEmpty && ReplicationId.isReplicationId(pid))
if (replicaId.isEmpty && env.eventMetadata.exists(_.isInstanceOf[ReplicatedEventMetadata]))
replicaId = Some(ReplicationId.fromString(pid).replicaId)

if (producerFilter(env) && filter.matches(env)) {
Expand Down

0 comments on commit 81b00fd

Please sign in to comment.