Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Don't filter events from replay #883

Merged
merged 1 commit into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import akka.projection.grpc.internal.proto.ExcludeTags
import akka.projection.grpc.internal.proto.FilterCriteria
import akka.projection.grpc.internal.proto.FilterReq
import akka.projection.grpc.internal.proto.IncludeEntityIds
import akka.projection.grpc.internal.proto.IncludeTags
import akka.projection.grpc.internal.proto.PersistenceIdSeqNr
import akka.projection.grpc.internal.proto.ReplayReq
import akka.projection.grpc.internal.proto.StreamIn
Expand Down Expand Up @@ -236,7 +237,13 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
override lazy val allEnvelopes = envelopes ++
Vector(
createEnvelope(PersistenceId(entityType, "d"), 1, "d1"),
createEnvelope(PersistenceId(entityType, "d"), 2, "d2"))
createEnvelope(PersistenceId(entityType, "d"), 2, "d2", tags = Set("WIP")))

// filter should not exclude events from replay, e.g. d1 without the WIP tag
val filterCriteria = List(
FilterCriteria(FilterCriteria.Message.ExcludeMatchingEntityIds(ExcludeRegexEntityIds(List(".*")))),
FilterCriteria(FilterCriteria.Message.IncludeTags(IncludeTags(List("WIP")))))
inPublisher.sendNext(StreamIn(StreamIn.Message.Filter(FilterReq(filterCriteria))))

inPublisher.sendNext(
StreamIn(
Expand All @@ -253,7 +260,7 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
StreamIn(StreamIn.Message.Replay(ReplayReq(List(PersistenceIdSeqNr(PersistenceId(entityType, "d").id, 1L))))))
// it will not emit replayed event until there is some progress from the ordinary envSource, probably ok
outProbe.expectNoMessage()
envPublisher.sendNext(createEnvelope(PersistenceId(entityType, "e"), 1, "e1"))
envPublisher.sendNext(createEnvelope(PersistenceId(entityType, "e"), 1, "e1", tags = Set("WIP")))
outProbe.expectNext().event shouldBe "e1"
outProbe.expectNext().event shouldBe "d1"
outProbe.expectNext().event shouldBe "d2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,26 +183,15 @@ import org.slf4j.LoggerFactory
replayHasBeenPulled = false

replayEnv match {
case ReplayEnvelope(persistenceId, Some(env)) =>
case ReplayEnvelope(_, Some(env)) =>
// the predicate to replay events from start for a given pid
// Note: we do not apply the producer filter here as that may be what triggered the replay
if (filter.matches(env)) {
log.traceN(
"Stream [{}]: Push replayed event persistenceId [{}], seqNr [{}]",
logPrefix,
env.persistenceId,
env.sequenceNr)
push(outEnv, env)
} else {
log.debugN(
"Stream [{}]: Filter out replayed event persistenceId [{}], seqNr [{}]. Cancel remaining replay.",
logPrefix,
env.persistenceId,
env.sequenceNr)
val queue = replayInProgress(persistenceId).queue
queue.cancel()
replayCompleted()
}
// Note: we do not apply the filter here as that may be what triggered the replay
log.traceN(
"Stream [{}]: Push replayed event persistenceId [{}], seqNr [{}]",
logPrefix,
env.persistenceId,
env.sequenceNr)
push(outEnv, env)

case ReplayEnvelope(persistenceId, None) =>
log.debug2("Stream [{}]: Completed replay of persistenceId [{}]", logPrefix, persistenceId)
Expand Down