diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala index cc332641b..b454419fc 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala @@ -28,6 +28,7 @@ import akka.projection.grpc.internal.proto.ExcludeTags import akka.projection.grpc.internal.proto.FilterCriteria import akka.projection.grpc.internal.proto.FilterReq import akka.projection.grpc.internal.proto.IncludeEntityIds +import akka.projection.grpc.internal.proto.IncludeTags import akka.projection.grpc.internal.proto.PersistenceIdSeqNr import akka.projection.grpc.internal.proto.ReplayReq import akka.projection.grpc.internal.proto.StreamIn @@ -236,7 +237,13 @@ class FilterStageSpec extends ScalaTestWithActorTestKit(""" override lazy val allEnvelopes = envelopes ++ Vector( createEnvelope(PersistenceId(entityType, "d"), 1, "d1"), - createEnvelope(PersistenceId(entityType, "d"), 2, "d2")) + createEnvelope(PersistenceId(entityType, "d"), 2, "d2", tags = Set("WIP"))) + + // filter should not exclude events from replay, e.g. d1 without the WIP tag + val filterCriteria = List( + FilterCriteria(FilterCriteria.Message.ExcludeMatchingEntityIds(ExcludeRegexEntityIds(List(".*")))), + FilterCriteria(FilterCriteria.Message.IncludeTags(IncludeTags(List("WIP"))))) + inPublisher.sendNext(StreamIn(StreamIn.Message.Filter(FilterReq(filterCriteria)))) inPublisher.sendNext( StreamIn( @@ -253,7 +260,7 @@ class FilterStageSpec extends ScalaTestWithActorTestKit(""" StreamIn(StreamIn.Message.Replay(ReplayReq(List(PersistenceIdSeqNr(PersistenceId(entityType, "d").id, 1L)))))) // it will not emit replayed event until there is some progress from the ordinary envSource, probably ok outProbe.expectNoMessage() - envPublisher.sendNext(createEnvelope(PersistenceId(entityType, "e"), 1, "e1")) + envPublisher.sendNext(createEnvelope(PersistenceId(entityType, "e"), 1, "e1", tags = Set("WIP"))) outProbe.expectNext().event shouldBe "e1" outProbe.expectNext().event shouldBe "d1" outProbe.expectNext().event shouldBe "d2" diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala index 03e31d4ad..eeeb670cf 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala @@ -183,26 +183,15 @@ import org.slf4j.LoggerFactory replayHasBeenPulled = false replayEnv match { - case ReplayEnvelope(persistenceId, Some(env)) => + case ReplayEnvelope(_, Some(env)) => // the predicate to replay events from start for a given pid - // Note: we do not apply the producer filter here as that may be what triggered the replay - if (filter.matches(env)) { - log.traceN( - "Stream [{}]: Push replayed event persistenceId [{}], seqNr [{}]", - logPrefix, - env.persistenceId, - env.sequenceNr) - push(outEnv, env) - } else { - log.debugN( - "Stream [{}]: Filter out replayed event persistenceId [{}], seqNr [{}]. Cancel remaining replay.", - logPrefix, - env.persistenceId, - env.sequenceNr) - val queue = replayInProgress(persistenceId).queue - queue.cancel() - replayCompleted() - } + // Note: we do not apply the filter here as that may be what triggered the replay + log.traceN( + "Stream [{}]: Push replayed event persistenceId [{}], seqNr [{}]", + logPrefix, + env.persistenceId, + env.sequenceNr) + push(outEnv, env) case ReplayEnvelope(persistenceId, None) => log.debug2("Stream [{}]: Completed replay of persistenceId [{}]", logPrefix, persistenceId)