Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Don't trigger replay for duplicates #837

Merged
merged 1 commit into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ import scala.util.matching.Regex

private case class ReplayEnvelope(entityId: String, env: Option[EventEnvelope[Any]])

private case class ReplaySession(fromSeqNr: Long, queue: SinkQueueWithCancel[EventEnvelope[Any]])

}

/**
Expand Down Expand Up @@ -120,7 +122,7 @@ import scala.util.matching.Regex
private var replayHasBeenPulled = false
private var pendingReplayRequests: Vector[EntityIdOffset] = Vector.empty
// several replay streams may be in progress at the same time
private var replayInProgress: Map[String, SinkQueueWithCancel[EventEnvelope[Any]]] = Map.empty
private var replayInProgress: Map[String, ReplaySession] = Map.empty
private val replayCallback = getAsyncCallback[Try[ReplayEnvelope]] {
case Success(replayEnv) => onReplay(replayEnv)
case Failure(exc) => failStage(exc)
Expand Down Expand Up @@ -160,7 +162,7 @@ import scala.util.matching.Regex
logPrefix,
env.persistenceId,
env.sequenceNr)
val queue = replayInProgress(entityId)
val queue = replayInProgress(entityId).queue
queue.cancel()
replayCompleted()
}
Expand Down Expand Up @@ -254,19 +256,33 @@ import scala.util.matching.Regex
val fromSeqNr = entityOffset.seqNr
val pid = PersistenceId(entityType, entityId)
if (handledByThisStream(pid)) {
replayInProgress.get(entityId).foreach { replay =>
log.debug2("Stream [{}]: Cancel replay of entityId [{}], replaced by new replay", logPrefix, entityId)
replay.queue.cancel()
replayInProgress -= entityId
}
val sameInProgress =
replayInProgress.get(entityId) match {
case Some(replay) if replay.fromSeqNr == fromSeqNr =>
// no point in cancel and starting new from same seqNr
true
case Some(replay) =>
log.debug2("Stream [{}]: Cancel replay of entityId [{}], replaced by new replay", logPrefix, entityId)
replay.queue.cancel()
replayInProgress -= entityId
false
case None =>
false
}

if (replayInProgress.size < ReplayParallelism) {
if (sameInProgress) {
log.debugN(
"Stream [{}]: Replay of entityId [{}] already in progress from seqNr [{}], replaced by new replay",
logPrefix,
entityId,
fromSeqNr)
} else if (replayInProgress.size < ReplayParallelism) {
log.debugN("Stream [{}]: Starting replay of entityId [{}], from seqNr [{}]", logPrefix, entityId, fromSeqNr)
val queue =
currentEventsByPersistenceIdQuery
.currentEventsByPersistenceIdTyped[Any](pid.id, fromSeqNr, Long.MaxValue)
.runWith(Sink.queue())(materializer)
replayInProgress = replayInProgress.updated(entityId, queue)
replayInProgress = replayInProgress.updated(entityId, ReplaySession(fromSeqNr, queue))
tryPullReplay(entityId)
} else {
log.debugN("Stream [{}]: Queueing replay of entityId [{}], from seqNr [{}]", logPrefix, entityId, fromSeqNr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ class R2dbcTimestampOffsetStoreSpec
}

"accept known sequence numbers and reject unknown" in {
import R2dbcOffsetStore.IsAcceptedResult._
val projectionId = genRandomProjectionId()
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)
Expand All @@ -393,97 +394,84 @@ class R2dbcTimestampOffsetStoreSpec

// seqNr 1 is always accepted
val env1 = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
offsetStore.isAccepted(env1).futureValue shouldBe true
offsetStore.isAccepted(backtrackingEnvelope(env1)).futureValue shouldBe true
offsetStore.isAccepted(env1).futureValue shouldBe Accepted
offsetStore.isAccepted(backtrackingEnvelope(env1)).futureValue shouldBe Accepted
// but not if already inflight, seqNr 1 was accepted
offsetStore.addInflight(env1)
val env1Later = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
offsetStore.isAccepted(env1Later).futureValue shouldBe false
offsetStore.isAccepted(backtrackingEnvelope(env1Later)).futureValue shouldBe false
offsetStore.isAccepted(env1Later).futureValue shouldBe Duplicate
offsetStore.isAccepted(backtrackingEnvelope(env1Later)).futureValue shouldBe Duplicate
// subsequent seqNr is accepted
val env2 = createEnvelope("p4", 2L, startTime.plusMillis(2), "e4-2")
offsetStore.isAccepted(env2).futureValue shouldBe true
offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe true
offsetStore.isAccepted(env2).futureValue shouldBe Accepted
offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe Accepted
offsetStore.addInflight(env2)
// but not when gap
val envP4SeqNr4 = createEnvelope("p4", 4L, startTime.plusMillis(3), "e4-4")
offsetStore.isAccepted(envP4SeqNr4).futureValue shouldBe false
offsetStore.isAccepted(envP4SeqNr4).futureValue shouldBe RejectedSeqNr
// hard reject when gap from backtracking
(offsetStore
.isAccepted(backtrackingEnvelope(envP4SeqNr4))
.failed
.futureValue
.getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unexpected sequence number.*")
offsetStore.isAccepted(backtrackingEnvelope(envP4SeqNr4)).futureValue shouldBe RejectedBacktrackingSeqNr
// reject filtered event when gap
offsetStore.isAccepted(filteredEnvelope(envP4SeqNr4)).futureValue shouldBe false
offsetStore.isAccepted(filteredEnvelope(envP4SeqNr4)).futureValue shouldBe RejectedSeqNr
// hard reject when filtered event with gap from backtracking
(offsetStore
offsetStore
.isAccepted(backtrackingEnvelope(filteredEnvelope(envP4SeqNr4)))
.failed
.futureValue
.getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unexpected sequence number.*")
.futureValue shouldBe RejectedBacktrackingSeqNr
// and not if later already inflight, seqNr 2 was accepted
offsetStore.isAccepted(createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe false
offsetStore.isAccepted(createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe Duplicate

// +1 to known is accepted
val env3 = createEnvelope("p1", 4L, startTime.plusMillis(4), "e1-4")
offsetStore.isAccepted(env3).futureValue shouldBe true
offsetStore.isAccepted(env3).futureValue shouldBe Accepted
// but not same
offsetStore.isAccepted(createEnvelope("p3", 5L, startTime, "e3-5")).futureValue shouldBe false
offsetStore.isAccepted(createEnvelope("p3", 5L, startTime, "e3-5")).futureValue shouldBe Duplicate
// but not same, even if it's 1
offsetStore.isAccepted(createEnvelope("p2", 1L, startTime, "e2-1")).futureValue shouldBe false
offsetStore.isAccepted(createEnvelope("p2", 1L, startTime, "e2-1")).futureValue shouldBe Duplicate
// and not less
offsetStore.isAccepted(createEnvelope("p3", 4L, startTime, "e3-4")).futureValue shouldBe false
offsetStore.isAccepted(createEnvelope("p3", 4L, startTime, "e3-4")).futureValue shouldBe Duplicate
offsetStore.addInflight(env3)
// and then it's not accepted again
offsetStore.isAccepted(env3).futureValue shouldBe false
offsetStore.isAccepted(backtrackingEnvelope(env3)).futureValue shouldBe false
offsetStore.isAccepted(env3).futureValue shouldBe Duplicate
offsetStore.isAccepted(backtrackingEnvelope(env3)).futureValue shouldBe Duplicate
// and not when later seqNr is inflight
offsetStore.isAccepted(env2).futureValue shouldBe false
offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe false
offsetStore.isAccepted(env2).futureValue shouldBe Duplicate
offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe Duplicate

// +1 to known, and then also subsequent are accepted (needed for grouped)
val env4 = createEnvelope("p3", 6L, startTime.plusMillis(5), "e3-6")
offsetStore.isAccepted(env4).futureValue shouldBe true
offsetStore.isAccepted(env4).futureValue shouldBe Accepted
offsetStore.addInflight(env4)
val env5 = createEnvelope("p3", 7L, startTime.plusMillis(6), "e3-7")
offsetStore.isAccepted(env5).futureValue shouldBe true
offsetStore.isAccepted(env5).futureValue shouldBe Accepted
offsetStore.addInflight(env5)
val env6 = createEnvelope("p3", 8L, startTime.plusMillis(7), "e3-8")
offsetStore.isAccepted(env6).futureValue shouldBe true
offsetStore.isAccepted(env6).futureValue shouldBe Accepted
offsetStore.addInflight(env6)

// reject unknown
val env7 = createEnvelope("p5", 7L, startTime.plusMillis(8), "e5-7")
offsetStore.isAccepted(env7).futureValue shouldBe false
(offsetStore.isAccepted(backtrackingEnvelope(env7)).failed.futureValue.getMessage should fullyMatch)
.regex("Rejected envelope from backtracking.*unknown sequence number.*")
offsetStore.isAccepted(env7).futureValue shouldBe RejectedSeqNr
offsetStore.isAccepted(backtrackingEnvelope(env7)).futureValue shouldBe RejectedBacktrackingSeqNr
// but ok when previous is old
eventTimestampQueryClock.setInstant(startTime.minusSeconds(3600))
val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7")
offsetStore.isAccepted(env8).futureValue shouldBe true
offsetStore.isAccepted(env8).futureValue shouldBe Accepted
eventTimestampQueryClock.setInstant(startTime)
offsetStore.addInflight(env8)
// and subsequent seqNr is accepted
val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8")
offsetStore.isAccepted(env9).futureValue shouldBe true
offsetStore.isAccepted(env9).futureValue shouldBe Accepted
offsetStore.addInflight(env9)

// reject unknown filtered
val env10 = filteredEnvelope(createEnvelope("p6", 7L, startTime.plusMillis(10), "e6-7"))
offsetStore.isAccepted(env10).futureValue shouldBe false
offsetStore.isAccepted(env10).futureValue shouldBe RejectedSeqNr
// hard reject when unknown from backtracking
(offsetStore
.isAccepted(backtrackingEnvelope(env10))
.failed
.futureValue
.getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unknown sequence number.*")
offsetStore.isAccepted(backtrackingEnvelope(env10)).futureValue shouldBe RejectedBacktrackingSeqNr
// hard reject when unknown filtered event from backtracking
(offsetStore
offsetStore
.isAccepted(backtrackingEnvelope(filteredEnvelope(env10)))
.failed
.futureValue
.getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unknown sequence number.*")
.futureValue shouldBe RejectedBacktrackingSeqNr

// it's keeping the inflight that are not in the "stored" state
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8, "p4" -> 2L, "p5" -> 8)
Expand All @@ -498,6 +486,7 @@ class R2dbcTimestampOffsetStoreSpec
}

"update inflight on error and re-accept element" in {
import R2dbcOffsetStore.IsAcceptedResult._
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)

Expand All @@ -508,7 +497,7 @@ class R2dbcTimestampOffsetStoreSpec
val envelope3 = createEnvelope("p1", 3L, startTime.plusMillis(2), "e1-2")

// seqNr 1 is always accepted
offsetStore.isAccepted(envelope1).futureValue shouldBe true
offsetStore.isAccepted(envelope1).futureValue shouldBe Accepted
offsetStore.addInflight(envelope1)
offsetStore.getInflight() shouldBe Map("p1" -> 1L)
offsetStore
Expand All @@ -517,19 +506,19 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.getInflight() shouldBe empty

// seqNr 2 is accepts since it follows seqNr 1 that is stored in state
offsetStore.isAccepted(envelope2).futureValue shouldBe true
offsetStore.isAccepted(envelope2).futureValue shouldBe Accepted
// simulate envelope processing error by not adding envelope2 to inflight

// seqNr 3 is not accepted, still waiting for seqNr 2
offsetStore.isAccepted(envelope3).futureValue shouldBe false
offsetStore.isAccepted(envelope3).futureValue shouldBe RejectedSeqNr

// offer seqNr 2 once again
offsetStore.isAccepted(envelope2).futureValue shouldBe true
offsetStore.isAccepted(envelope2).futureValue shouldBe Accepted
offsetStore.addInflight(envelope2)
offsetStore.getInflight() shouldBe Map("p1" -> 2L)

// offer seqNr 3 once more
offsetStore.isAccepted(envelope3).futureValue shouldBe true
offsetStore.isAccepted(envelope3).futureValue shouldBe Accepted
offsetStore.addInflight(envelope3)
offsetStore.getInflight() shouldBe Map("p1" -> 3L)

Expand All @@ -540,7 +529,8 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.getInflight() shouldBe empty
}

"filter accepted" in {
"mapIsAccepted" in {
import R2dbcOffsetStore.IsAcceptedResult._
val projectionId = genRandomProjectionId()
val startTime = TestClock.nowMicros().instant()
val offsetStore = createOffsetStore(projectionId)
Expand All @@ -561,11 +551,17 @@ class R2dbcTimestampOffsetStoreSpec
// but not when previous is unknown
val env5 = createEnvelope("p3", 7L, startTime.plusMillis(5), "e3-7")

offsetStore.filterAccepted(List(env1, env2, env3, env4, env5)).futureValue shouldBe List(env1, env2, env4)
offsetStore.mapIsAccepted(List(env1, env2, env3, env4, env5)).futureValue shouldBe List(
env1 -> Accepted,
env2 -> Accepted,
env3 -> RejectedSeqNr,
env4 -> Accepted,
env5 -> RejectedSeqNr)

}

"accept new revisions for durable state" in {
import R2dbcOffsetStore.IsAcceptedResult._
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)

Expand All @@ -577,50 +573,50 @@ class R2dbcTimestampOffsetStoreSpec

// seqNr 1 is always accepted
val env1 = createUpdatedDurableState("p4", 1L, startTime.plusMillis(1), "s4-1")
offsetStore.isAccepted(env1).futureValue shouldBe true
offsetStore.isAccepted(env1).futureValue shouldBe Accepted
// but not if already inflight, seqNr 1 was accepted
offsetStore.addInflight(env1)
offsetStore
.isAccepted(createUpdatedDurableState("p4", 1L, startTime.plusMillis(1), "s4-1"))
.futureValue shouldBe false
.futureValue shouldBe Duplicate
// subsequent seqNr is accepted
val env2 = createUpdatedDurableState("p4", 2L, startTime.plusMillis(2), "s4-2")
offsetStore.isAccepted(env2).futureValue shouldBe true
offsetStore.isAccepted(env2).futureValue shouldBe Accepted
offsetStore.addInflight(env2)
// and also ok with gap
offsetStore
.isAccepted(createUpdatedDurableState("p4", 4L, startTime.plusMillis(3), "s4-4"))
.futureValue shouldBe true
.futureValue shouldBe Accepted
// and not if later already inflight, seqNr 2 was accepted
offsetStore
.isAccepted(createUpdatedDurableState("p4", 1L, startTime.plusMillis(1), "s4-1"))
.futureValue shouldBe false
.futureValue shouldBe Duplicate

// greater than known is accepted
val env3 = createUpdatedDurableState("p1", 4L, startTime.plusMillis(4), "s1-4")
offsetStore.isAccepted(env3).futureValue shouldBe true
offsetStore.isAccepted(env3).futureValue shouldBe Accepted
// but not same
offsetStore.isAccepted(createUpdatedDurableState("p3", 5L, startTime, "s3-5")).futureValue shouldBe false
offsetStore.isAccepted(createUpdatedDurableState("p3", 5L, startTime, "s3-5")).futureValue shouldBe Duplicate
// but not same, even if it's 1
offsetStore.isAccepted(createUpdatedDurableState("p2", 1L, startTime, "s2-1")).futureValue shouldBe false
offsetStore.isAccepted(createUpdatedDurableState("p2", 1L, startTime, "s2-1")).futureValue shouldBe Duplicate
// and not less
offsetStore.isAccepted(createUpdatedDurableState("p3", 4L, startTime, "s3-4")).futureValue shouldBe false
offsetStore.isAccepted(createUpdatedDurableState("p3", 4L, startTime, "s3-4")).futureValue shouldBe Duplicate
offsetStore.addInflight(env3)

// greater than known, and then also subsequent are accepted (needed for grouped)
val env4 = createUpdatedDurableState("p3", 8L, startTime.plusMillis(5), "s3-6")
offsetStore.isAccepted(env4).futureValue shouldBe true
offsetStore.isAccepted(env4).futureValue shouldBe Accepted
offsetStore.addInflight(env4)
val env5 = createUpdatedDurableState("p3", 9L, startTime.plusMillis(6), "s3-7")
offsetStore.isAccepted(env5).futureValue shouldBe true
offsetStore.isAccepted(env5).futureValue shouldBe Accepted
offsetStore.addInflight(env5)
val env6 = createUpdatedDurableState("p3", 20L, startTime.plusMillis(7), "s3-8")
offsetStore.isAccepted(env6).futureValue shouldBe true
offsetStore.isAccepted(env6).futureValue shouldBe Accepted
offsetStore.addInflight(env6)

// accept unknown
val env7 = createUpdatedDurableState("p5", 7L, startTime.plusMillis(8), "s5-7")
offsetStore.isAccepted(env7).futureValue shouldBe true
offsetStore.isAccepted(env7).futureValue shouldBe Accepted
offsetStore.addInflight(env7)

// it's keeping the inflight that are not in the "stored" state
Expand Down
Loading