Skip to content

Commit

Permalink
fix: Don't trigger replay for duplicates
Browse files Browse the repository at this point in the history
* duplicates, less seqNr, have been processed and shouldn't trigger replay
* need more details from R2dbcOffsetStore.isAccepted so changed return value to adt
  • Loading branch information
patriknw committed Mar 30, 2023
1 parent f10894b commit 1eebb60
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ import scala.util.matching.Regex

private case class ReplayEnvelope(entityId: String, env: Option[EventEnvelope[Any]])

private case class ReplaySession(fromSeqNr: Long, queue: SinkQueueWithCancel[EventEnvelope[Any]])

}

/**
Expand Down Expand Up @@ -120,7 +122,7 @@ import scala.util.matching.Regex
private var replayHasBeenPulled = false
private var pendingReplayRequests: Vector[EntityIdOffset] = Vector.empty
// several replay streams may be in progress at the same time
private var replayInProgress: Map[String, SinkQueueWithCancel[EventEnvelope[Any]]] = Map.empty
private var replayInProgress: Map[String, ReplaySession] = Map.empty
private val replayCallback = getAsyncCallback[Try[ReplayEnvelope]] {
case Success(replayEnv) => onReplay(replayEnv)
case Failure(exc) => failStage(exc)
Expand Down Expand Up @@ -160,7 +162,7 @@ import scala.util.matching.Regex
logPrefix,
env.persistenceId,
env.sequenceNr)
val queue = replayInProgress(entityId)
val queue = replayInProgress(entityId).queue
queue.cancel()
replayCompleted()
}
Expand Down Expand Up @@ -254,19 +256,33 @@ import scala.util.matching.Regex
val fromSeqNr = entityOffset.seqNr
val pid = PersistenceId(entityType, entityId)
if (handledByThisStream(pid)) {
replayInProgress.get(entityId).foreach { replay =>
log.debug2("Stream [{}]: Cancel replay of entityId [{}], replaced by new replay", logPrefix, entityId)
replay.queue.cancel()
replayInProgress -= entityId
}
val sameInProgress =
replayInProgress.get(entityId) match {
case Some(replay) if replay.fromSeqNr == fromSeqNr =>
// no point in cancel and starting new from same seqNr
true
case Some(replay) =>
log.debug2("Stream [{}]: Cancel replay of entityId [{}], replaced by new replay", logPrefix, entityId)
replay.queue.cancel()
replayInProgress -= entityId
false
case None =>
false
}

if (replayInProgress.size < ReplayParallelism) {
if (sameInProgress) {
log.debugN(
"Stream [{}]: Replay of entityId [{}] already in progress from seqNr [{}], replaced by new replay",
logPrefix,
entityId,
fromSeqNr)
} else if (replayInProgress.size < ReplayParallelism) {
log.debugN("Stream [{}]: Starting replay of entityId [{}], from seqNr [{}]", logPrefix, entityId, fromSeqNr)
val queue =
currentEventsByPersistenceIdQuery
.currentEventsByPersistenceIdTyped[Any](pid.id, fromSeqNr, Long.MaxValue)
.runWith(Sink.queue())(materializer)
replayInProgress = replayInProgress.updated(entityId, queue)
replayInProgress = replayInProgress.updated(entityId, ReplaySession(fromSeqNr, queue))
tryPullReplay(entityId)
} else {
log.debugN("Stream [{}]: Queueing replay of entityId [{}], from seqNr [{}]", logPrefix, entityId, fromSeqNr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ class R2dbcTimestampOffsetStoreSpec
}

"accept known sequence numbers and reject unknown" in {
import R2dbcOffsetStore.IsAcceptedResult._
val projectionId = genRandomProjectionId()
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)
Expand All @@ -393,97 +394,84 @@ class R2dbcTimestampOffsetStoreSpec

// seqNr 1 is always accepted
val env1 = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
offsetStore.isAccepted(env1).futureValue shouldBe true
offsetStore.isAccepted(backtrackingEnvelope(env1)).futureValue shouldBe true
offsetStore.isAccepted(env1).futureValue shouldBe Accepted
offsetStore.isAccepted(backtrackingEnvelope(env1)).futureValue shouldBe Accepted
// but not if already inflight, seqNr 1 was accepted
offsetStore.addInflight(env1)
val env1Later = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")
offsetStore.isAccepted(env1Later).futureValue shouldBe false
offsetStore.isAccepted(backtrackingEnvelope(env1Later)).futureValue shouldBe false
offsetStore.isAccepted(env1Later).futureValue shouldBe Duplicate
offsetStore.isAccepted(backtrackingEnvelope(env1Later)).futureValue shouldBe Duplicate
// subsequent seqNr is accepted
val env2 = createEnvelope("p4", 2L, startTime.plusMillis(2), "e4-2")
offsetStore.isAccepted(env2).futureValue shouldBe true
offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe true
offsetStore.isAccepted(env2).futureValue shouldBe Accepted
offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe Accepted
offsetStore.addInflight(env2)
// but not when gap
val envP4SeqNr4 = createEnvelope("p4", 4L, startTime.plusMillis(3), "e4-4")
offsetStore.isAccepted(envP4SeqNr4).futureValue shouldBe false
offsetStore.isAccepted(envP4SeqNr4).futureValue shouldBe RejectedSeqNr
// hard reject when gap from backtracking
(offsetStore
.isAccepted(backtrackingEnvelope(envP4SeqNr4))
.failed
.futureValue
.getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unexpected sequence number.*")
offsetStore.isAccepted(backtrackingEnvelope(envP4SeqNr4)).futureValue shouldBe RejectedBacktrackingSeqNr
// reject filtered event when gap
offsetStore.isAccepted(filteredEnvelope(envP4SeqNr4)).futureValue shouldBe false
offsetStore.isAccepted(filteredEnvelope(envP4SeqNr4)).futureValue shouldBe RejectedSeqNr
// hard reject when filtered event with gap from backtracking
(offsetStore
offsetStore
.isAccepted(backtrackingEnvelope(filteredEnvelope(envP4SeqNr4)))
.failed
.futureValue
.getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unexpected sequence number.*")
.futureValue shouldBe RejectedBacktrackingSeqNr
// and not if later already inflight, seqNr 2 was accepted
offsetStore.isAccepted(createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe false
offsetStore.isAccepted(createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe Duplicate

// +1 to known is accepted
val env3 = createEnvelope("p1", 4L, startTime.plusMillis(4), "e1-4")
offsetStore.isAccepted(env3).futureValue shouldBe true
offsetStore.isAccepted(env3).futureValue shouldBe Accepted
// but not same
offsetStore.isAccepted(createEnvelope("p3", 5L, startTime, "e3-5")).futureValue shouldBe false
offsetStore.isAccepted(createEnvelope("p3", 5L, startTime, "e3-5")).futureValue shouldBe Duplicate
// but not same, even if it's 1
offsetStore.isAccepted(createEnvelope("p2", 1L, startTime, "e2-1")).futureValue shouldBe false
offsetStore.isAccepted(createEnvelope("p2", 1L, startTime, "e2-1")).futureValue shouldBe Duplicate
// and not less
offsetStore.isAccepted(createEnvelope("p3", 4L, startTime, "e3-4")).futureValue shouldBe false
offsetStore.isAccepted(createEnvelope("p3", 4L, startTime, "e3-4")).futureValue shouldBe Duplicate
offsetStore.addInflight(env3)
// and then it's not accepted again
offsetStore.isAccepted(env3).futureValue shouldBe false
offsetStore.isAccepted(backtrackingEnvelope(env3)).futureValue shouldBe false
offsetStore.isAccepted(env3).futureValue shouldBe Duplicate
offsetStore.isAccepted(backtrackingEnvelope(env3)).futureValue shouldBe Duplicate
// and not when later seqNr is inflight
offsetStore.isAccepted(env2).futureValue shouldBe false
offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe false
offsetStore.isAccepted(env2).futureValue shouldBe Duplicate
offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe Duplicate

// +1 to known, and then also subsequent are accepted (needed for grouped)
val env4 = createEnvelope("p3", 6L, startTime.plusMillis(5), "e3-6")
offsetStore.isAccepted(env4).futureValue shouldBe true
offsetStore.isAccepted(env4).futureValue shouldBe Accepted
offsetStore.addInflight(env4)
val env5 = createEnvelope("p3", 7L, startTime.plusMillis(6), "e3-7")
offsetStore.isAccepted(env5).futureValue shouldBe true
offsetStore.isAccepted(env5).futureValue shouldBe Accepted
offsetStore.addInflight(env5)
val env6 = createEnvelope("p3", 8L, startTime.plusMillis(7), "e3-8")
offsetStore.isAccepted(env6).futureValue shouldBe true
offsetStore.isAccepted(env6).futureValue shouldBe Accepted
offsetStore.addInflight(env6)

// reject unknown
val env7 = createEnvelope("p5", 7L, startTime.plusMillis(8), "e5-7")
offsetStore.isAccepted(env7).futureValue shouldBe false
(offsetStore.isAccepted(backtrackingEnvelope(env7)).failed.futureValue.getMessage should fullyMatch)
.regex("Rejected envelope from backtracking.*unknown sequence number.*")
offsetStore.isAccepted(env7).futureValue shouldBe RejectedSeqNr
offsetStore.isAccepted(backtrackingEnvelope(env7)).futureValue shouldBe RejectedBacktrackingSeqNr
// but ok when previous is old
eventTimestampQueryClock.setInstant(startTime.minusSeconds(3600))
val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7")
offsetStore.isAccepted(env8).futureValue shouldBe true
offsetStore.isAccepted(env8).futureValue shouldBe Accepted
eventTimestampQueryClock.setInstant(startTime)
offsetStore.addInflight(env8)
// and subsequent seqNr is accepted
val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8")
offsetStore.isAccepted(env9).futureValue shouldBe true
offsetStore.isAccepted(env9).futureValue shouldBe Accepted
offsetStore.addInflight(env9)

// reject unknown filtered
val env10 = filteredEnvelope(createEnvelope("p6", 7L, startTime.plusMillis(10), "e6-7"))
offsetStore.isAccepted(env10).futureValue shouldBe false
offsetStore.isAccepted(env10).futureValue shouldBe RejectedSeqNr
// hard reject when unknown from backtracking
(offsetStore
.isAccepted(backtrackingEnvelope(env10))
.failed
.futureValue
.getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unknown sequence number.*")
offsetStore.isAccepted(backtrackingEnvelope(env10)).futureValue shouldBe RejectedBacktrackingSeqNr
// hard reject when unknown filtered event from backtracking
(offsetStore
offsetStore
.isAccepted(backtrackingEnvelope(filteredEnvelope(env10)))
.failed
.futureValue
.getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unknown sequence number.*")
.futureValue shouldBe RejectedBacktrackingSeqNr

// it's keeping the inflight that are not in the "stored" state
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8, "p4" -> 2L, "p5" -> 8)
Expand All @@ -498,6 +486,7 @@ class R2dbcTimestampOffsetStoreSpec
}

"update inflight on error and re-accept element" in {
import R2dbcOffsetStore.IsAcceptedResult._
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)

Expand All @@ -508,7 +497,7 @@ class R2dbcTimestampOffsetStoreSpec
val envelope3 = createEnvelope("p1", 3L, startTime.plusMillis(2), "e1-2")

// seqNr 1 is always accepted
offsetStore.isAccepted(envelope1).futureValue shouldBe true
offsetStore.isAccepted(envelope1).futureValue shouldBe Accepted
offsetStore.addInflight(envelope1)
offsetStore.getInflight() shouldBe Map("p1" -> 1L)
offsetStore
Expand All @@ -517,19 +506,19 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.getInflight() shouldBe empty

// seqNr 2 is accepts since it follows seqNr 1 that is stored in state
offsetStore.isAccepted(envelope2).futureValue shouldBe true
offsetStore.isAccepted(envelope2).futureValue shouldBe Accepted
// simulate envelope processing error by not adding envelope2 to inflight

// seqNr 3 is not accepted, still waiting for seqNr 2
offsetStore.isAccepted(envelope3).futureValue shouldBe false
offsetStore.isAccepted(envelope3).futureValue shouldBe RejectedSeqNr

// offer seqNr 2 once again
offsetStore.isAccepted(envelope2).futureValue shouldBe true
offsetStore.isAccepted(envelope2).futureValue shouldBe Accepted
offsetStore.addInflight(envelope2)
offsetStore.getInflight() shouldBe Map("p1" -> 2L)

// offer seqNr 3 once more
offsetStore.isAccepted(envelope3).futureValue shouldBe true
offsetStore.isAccepted(envelope3).futureValue shouldBe Accepted
offsetStore.addInflight(envelope3)
offsetStore.getInflight() shouldBe Map("p1" -> 3L)

Expand All @@ -540,7 +529,8 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.getInflight() shouldBe empty
}

"filter accepted" in {
"mapIsAccepted" in {
import R2dbcOffsetStore.IsAcceptedResult._
val projectionId = genRandomProjectionId()
val startTime = TestClock.nowMicros().instant()
val offsetStore = createOffsetStore(projectionId)
Expand All @@ -561,11 +551,17 @@ class R2dbcTimestampOffsetStoreSpec
// but not when previous is unknown
val env5 = createEnvelope("p3", 7L, startTime.plusMillis(5), "e3-7")

offsetStore.filterAccepted(List(env1, env2, env3, env4, env5)).futureValue shouldBe List(env1, env2, env4)
offsetStore.mapIsAccepted(List(env1, env2, env3, env4, env5)).futureValue shouldBe List(
env1 -> Accepted,
env2 -> Accepted,
env3 -> RejectedSeqNr,
env4 -> Accepted,
env5 -> RejectedSeqNr)

}

"accept new revisions for durable state" in {
import R2dbcOffsetStore.IsAcceptedResult._
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)

Expand All @@ -577,50 +573,50 @@ class R2dbcTimestampOffsetStoreSpec

// seqNr 1 is always accepted
val env1 = createUpdatedDurableState("p4", 1L, startTime.plusMillis(1), "s4-1")
offsetStore.isAccepted(env1).futureValue shouldBe true
offsetStore.isAccepted(env1).futureValue shouldBe Accepted
// but not if already inflight, seqNr 1 was accepted
offsetStore.addInflight(env1)
offsetStore
.isAccepted(createUpdatedDurableState("p4", 1L, startTime.plusMillis(1), "s4-1"))
.futureValue shouldBe false
.futureValue shouldBe Duplicate
// subsequent seqNr is accepted
val env2 = createUpdatedDurableState("p4", 2L, startTime.plusMillis(2), "s4-2")
offsetStore.isAccepted(env2).futureValue shouldBe true
offsetStore.isAccepted(env2).futureValue shouldBe Accepted
offsetStore.addInflight(env2)
// and also ok with gap
offsetStore
.isAccepted(createUpdatedDurableState("p4", 4L, startTime.plusMillis(3), "s4-4"))
.futureValue shouldBe true
.futureValue shouldBe Accepted
// and not if later already inflight, seqNr 2 was accepted
offsetStore
.isAccepted(createUpdatedDurableState("p4", 1L, startTime.plusMillis(1), "s4-1"))
.futureValue shouldBe false
.futureValue shouldBe Duplicate

// greater than known is accepted
val env3 = createUpdatedDurableState("p1", 4L, startTime.plusMillis(4), "s1-4")
offsetStore.isAccepted(env3).futureValue shouldBe true
offsetStore.isAccepted(env3).futureValue shouldBe Accepted
// but not same
offsetStore.isAccepted(createUpdatedDurableState("p3", 5L, startTime, "s3-5")).futureValue shouldBe false
offsetStore.isAccepted(createUpdatedDurableState("p3", 5L, startTime, "s3-5")).futureValue shouldBe Duplicate
// but not same, even if it's 1
offsetStore.isAccepted(createUpdatedDurableState("p2", 1L, startTime, "s2-1")).futureValue shouldBe false
offsetStore.isAccepted(createUpdatedDurableState("p2", 1L, startTime, "s2-1")).futureValue shouldBe Duplicate
// and not less
offsetStore.isAccepted(createUpdatedDurableState("p3", 4L, startTime, "s3-4")).futureValue shouldBe false
offsetStore.isAccepted(createUpdatedDurableState("p3", 4L, startTime, "s3-4")).futureValue shouldBe Duplicate
offsetStore.addInflight(env3)

// greater than known, and then also subsequent are accepted (needed for grouped)
val env4 = createUpdatedDurableState("p3", 8L, startTime.plusMillis(5), "s3-6")
offsetStore.isAccepted(env4).futureValue shouldBe true
offsetStore.isAccepted(env4).futureValue shouldBe Accepted
offsetStore.addInflight(env4)
val env5 = createUpdatedDurableState("p3", 9L, startTime.plusMillis(6), "s3-7")
offsetStore.isAccepted(env5).futureValue shouldBe true
offsetStore.isAccepted(env5).futureValue shouldBe Accepted
offsetStore.addInflight(env5)
val env6 = createUpdatedDurableState("p3", 20L, startTime.plusMillis(7), "s3-8")
offsetStore.isAccepted(env6).futureValue shouldBe true
offsetStore.isAccepted(env6).futureValue shouldBe Accepted
offsetStore.addInflight(env6)

// accept unknown
val env7 = createUpdatedDurableState("p5", 7L, startTime.plusMillis(8), "s5-7")
offsetStore.isAccepted(env7).futureValue shouldBe true
offsetStore.isAccepted(env7).futureValue shouldBe Accepted
offsetStore.addInflight(env7)

// it's keeping the inflight that are not in the "stored" state
Expand Down
Loading

0 comments on commit 1eebb60

Please sign in to comment.