From 1eebb608e0437f609abee46126bf0c3c36475387 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 30 Mar 2023 11:46:17 +0200 Subject: [PATCH] fix: Don't trigger replay for duplicates * duplicates, less seqNr, have been processed and shouldn't trigger replay * need more details from R2dbcOffsetStore.isAccepted so changed return value to adt --- .../grpc/internal/FilterStage.scala | 34 +++-- .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 124 +++++++++--------- .../r2dbc/internal/R2dbcOffsetStore.scala | 68 ++++++---- .../r2dbc/internal/R2dbcProjectionImpl.scala | 119 ++++++++++++----- 4 files changed, 213 insertions(+), 132 deletions(-) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala index 931252cb9..8a2c35c45 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala @@ -87,6 +87,8 @@ import scala.util.matching.Regex private case class ReplayEnvelope(entityId: String, env: Option[EventEnvelope[Any]]) + private case class ReplaySession(fromSeqNr: Long, queue: SinkQueueWithCancel[EventEnvelope[Any]]) + } /** @@ -120,7 +122,7 @@ import scala.util.matching.Regex private var replayHasBeenPulled = false private var pendingReplayRequests: Vector[EntityIdOffset] = Vector.empty // several replay streams may be in progress at the same time - private var replayInProgress: Map[String, SinkQueueWithCancel[EventEnvelope[Any]]] = Map.empty + private var replayInProgress: Map[String, ReplaySession] = Map.empty private val replayCallback = getAsyncCallback[Try[ReplayEnvelope]] { case Success(replayEnv) => onReplay(replayEnv) case Failure(exc) => failStage(exc) @@ -160,7 +162,7 @@ import scala.util.matching.Regex logPrefix, env.persistenceId, env.sequenceNr) - val queue = replayInProgress(entityId) + val queue = replayInProgress(entityId).queue queue.cancel() replayCompleted() } @@ -254,19 +256,33 @@ import scala.util.matching.Regex val fromSeqNr = entityOffset.seqNr val pid = PersistenceId(entityType, entityId) if (handledByThisStream(pid)) { - replayInProgress.get(entityId).foreach { replay => - log.debug2("Stream [{}]: Cancel replay of entityId [{}], replaced by new replay", logPrefix, entityId) - replay.queue.cancel() - replayInProgress -= entityId - } + val sameInProgress = + replayInProgress.get(entityId) match { + case Some(replay) if replay.fromSeqNr == fromSeqNr => + // no point in cancel and starting new from same seqNr + true + case Some(replay) => + log.debug2("Stream [{}]: Cancel replay of entityId [{}], replaced by new replay", logPrefix, entityId) + replay.queue.cancel() + replayInProgress -= entityId + false + case None => + false + } - if (replayInProgress.size < ReplayParallelism) { + if (sameInProgress) { + log.debugN( + "Stream [{}]: Replay of entityId [{}] already in progress from seqNr [{}], replaced by new replay", + logPrefix, + entityId, + fromSeqNr) + } else if (replayInProgress.size < ReplayParallelism) { log.debugN("Stream [{}]: Starting replay of entityId [{}], from seqNr [{}]", logPrefix, entityId, fromSeqNr) val queue = currentEventsByPersistenceIdQuery .currentEventsByPersistenceIdTyped[Any](pid.id, fromSeqNr, Long.MaxValue) .runWith(Sink.queue())(materializer) - replayInProgress = replayInProgress.updated(entityId, queue) + replayInProgress = replayInProgress.updated(entityId, ReplaySession(fromSeqNr, queue)) tryPullReplay(entityId) } else { log.debugN("Stream [{}]: Queueing replay of entityId [{}], from seqNr [{}]", logPrefix, entityId, fromSeqNr) diff --git a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index b09ca9e12..e829ba614 100644 --- a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -381,6 +381,7 @@ class R2dbcTimestampOffsetStoreSpec } "accept known sequence numbers and reject unknown" in { + import R2dbcOffsetStore.IsAcceptedResult._ val projectionId = genRandomProjectionId() val eventTimestampQueryClock = TestClock.nowMicros() val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock) @@ -393,97 +394,84 @@ class R2dbcTimestampOffsetStoreSpec // seqNr 1 is always accepted val env1 = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1") - offsetStore.isAccepted(env1).futureValue shouldBe true - offsetStore.isAccepted(backtrackingEnvelope(env1)).futureValue shouldBe true + offsetStore.isAccepted(env1).futureValue shouldBe Accepted + offsetStore.isAccepted(backtrackingEnvelope(env1)).futureValue shouldBe Accepted // but not if already inflight, seqNr 1 was accepted offsetStore.addInflight(env1) val env1Later = createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1") - offsetStore.isAccepted(env1Later).futureValue shouldBe false - offsetStore.isAccepted(backtrackingEnvelope(env1Later)).futureValue shouldBe false + offsetStore.isAccepted(env1Later).futureValue shouldBe Duplicate + offsetStore.isAccepted(backtrackingEnvelope(env1Later)).futureValue shouldBe Duplicate // subsequent seqNr is accepted val env2 = createEnvelope("p4", 2L, startTime.plusMillis(2), "e4-2") - offsetStore.isAccepted(env2).futureValue shouldBe true - offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe true + offsetStore.isAccepted(env2).futureValue shouldBe Accepted + offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe Accepted offsetStore.addInflight(env2) // but not when gap val envP4SeqNr4 = createEnvelope("p4", 4L, startTime.plusMillis(3), "e4-4") - offsetStore.isAccepted(envP4SeqNr4).futureValue shouldBe false + offsetStore.isAccepted(envP4SeqNr4).futureValue shouldBe RejectedSeqNr // hard reject when gap from backtracking - (offsetStore - .isAccepted(backtrackingEnvelope(envP4SeqNr4)) - .failed - .futureValue - .getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unexpected sequence number.*") + offsetStore.isAccepted(backtrackingEnvelope(envP4SeqNr4)).futureValue shouldBe RejectedBacktrackingSeqNr // reject filtered event when gap - offsetStore.isAccepted(filteredEnvelope(envP4SeqNr4)).futureValue shouldBe false + offsetStore.isAccepted(filteredEnvelope(envP4SeqNr4)).futureValue shouldBe RejectedSeqNr // hard reject when filtered event with gap from backtracking - (offsetStore + offsetStore .isAccepted(backtrackingEnvelope(filteredEnvelope(envP4SeqNr4))) - .failed - .futureValue - .getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unexpected sequence number.*") + .futureValue shouldBe RejectedBacktrackingSeqNr // and not if later already inflight, seqNr 2 was accepted - offsetStore.isAccepted(createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe false + offsetStore.isAccepted(createEnvelope("p4", 1L, startTime.plusMillis(1), "e4-1")).futureValue shouldBe Duplicate // +1 to known is accepted val env3 = createEnvelope("p1", 4L, startTime.plusMillis(4), "e1-4") - offsetStore.isAccepted(env3).futureValue shouldBe true + offsetStore.isAccepted(env3).futureValue shouldBe Accepted // but not same - offsetStore.isAccepted(createEnvelope("p3", 5L, startTime, "e3-5")).futureValue shouldBe false + offsetStore.isAccepted(createEnvelope("p3", 5L, startTime, "e3-5")).futureValue shouldBe Duplicate // but not same, even if it's 1 - offsetStore.isAccepted(createEnvelope("p2", 1L, startTime, "e2-1")).futureValue shouldBe false + offsetStore.isAccepted(createEnvelope("p2", 1L, startTime, "e2-1")).futureValue shouldBe Duplicate // and not less - offsetStore.isAccepted(createEnvelope("p3", 4L, startTime, "e3-4")).futureValue shouldBe false + offsetStore.isAccepted(createEnvelope("p3", 4L, startTime, "e3-4")).futureValue shouldBe Duplicate offsetStore.addInflight(env3) // and then it's not accepted again - offsetStore.isAccepted(env3).futureValue shouldBe false - offsetStore.isAccepted(backtrackingEnvelope(env3)).futureValue shouldBe false + offsetStore.isAccepted(env3).futureValue shouldBe Duplicate + offsetStore.isAccepted(backtrackingEnvelope(env3)).futureValue shouldBe Duplicate // and not when later seqNr is inflight - offsetStore.isAccepted(env2).futureValue shouldBe false - offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe false + offsetStore.isAccepted(env2).futureValue shouldBe Duplicate + offsetStore.isAccepted(backtrackingEnvelope(env2)).futureValue shouldBe Duplicate // +1 to known, and then also subsequent are accepted (needed for grouped) val env4 = createEnvelope("p3", 6L, startTime.plusMillis(5), "e3-6") - offsetStore.isAccepted(env4).futureValue shouldBe true + offsetStore.isAccepted(env4).futureValue shouldBe Accepted offsetStore.addInflight(env4) val env5 = createEnvelope("p3", 7L, startTime.plusMillis(6), "e3-7") - offsetStore.isAccepted(env5).futureValue shouldBe true + offsetStore.isAccepted(env5).futureValue shouldBe Accepted offsetStore.addInflight(env5) val env6 = createEnvelope("p3", 8L, startTime.plusMillis(7), "e3-8") - offsetStore.isAccepted(env6).futureValue shouldBe true + offsetStore.isAccepted(env6).futureValue shouldBe Accepted offsetStore.addInflight(env6) // reject unknown val env7 = createEnvelope("p5", 7L, startTime.plusMillis(8), "e5-7") - offsetStore.isAccepted(env7).futureValue shouldBe false - (offsetStore.isAccepted(backtrackingEnvelope(env7)).failed.futureValue.getMessage should fullyMatch) - .regex("Rejected envelope from backtracking.*unknown sequence number.*") + offsetStore.isAccepted(env7).futureValue shouldBe RejectedSeqNr + offsetStore.isAccepted(backtrackingEnvelope(env7)).futureValue shouldBe RejectedBacktrackingSeqNr // but ok when previous is old eventTimestampQueryClock.setInstant(startTime.minusSeconds(3600)) val env8 = createEnvelope("p5", 7L, startTime.plusMillis(5), "e5-7") - offsetStore.isAccepted(env8).futureValue shouldBe true + offsetStore.isAccepted(env8).futureValue shouldBe Accepted eventTimestampQueryClock.setInstant(startTime) offsetStore.addInflight(env8) // and subsequent seqNr is accepted val env9 = createEnvelope("p5", 8L, startTime.plusMillis(9), "e5-8") - offsetStore.isAccepted(env9).futureValue shouldBe true + offsetStore.isAccepted(env9).futureValue shouldBe Accepted offsetStore.addInflight(env9) // reject unknown filtered val env10 = filteredEnvelope(createEnvelope("p6", 7L, startTime.plusMillis(10), "e6-7")) - offsetStore.isAccepted(env10).futureValue shouldBe false + offsetStore.isAccepted(env10).futureValue shouldBe RejectedSeqNr // hard reject when unknown from backtracking - (offsetStore - .isAccepted(backtrackingEnvelope(env10)) - .failed - .futureValue - .getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unknown sequence number.*") + offsetStore.isAccepted(backtrackingEnvelope(env10)).futureValue shouldBe RejectedBacktrackingSeqNr // hard reject when unknown filtered event from backtracking - (offsetStore + offsetStore .isAccepted(backtrackingEnvelope(filteredEnvelope(env10))) - .failed - .futureValue - .getMessage should fullyMatch).regex("Rejected envelope from backtracking.*unknown sequence number.*") + .futureValue shouldBe RejectedBacktrackingSeqNr // it's keeping the inflight that are not in the "stored" state offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8, "p4" -> 2L, "p5" -> 8) @@ -498,6 +486,7 @@ class R2dbcTimestampOffsetStoreSpec } "update inflight on error and re-accept element" in { + import R2dbcOffsetStore.IsAcceptedResult._ val projectionId = genRandomProjectionId() val offsetStore = createOffsetStore(projectionId) @@ -508,7 +497,7 @@ class R2dbcTimestampOffsetStoreSpec val envelope3 = createEnvelope("p1", 3L, startTime.plusMillis(2), "e1-2") // seqNr 1 is always accepted - offsetStore.isAccepted(envelope1).futureValue shouldBe true + offsetStore.isAccepted(envelope1).futureValue shouldBe Accepted offsetStore.addInflight(envelope1) offsetStore.getInflight() shouldBe Map("p1" -> 1L) offsetStore @@ -517,19 +506,19 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getInflight() shouldBe empty // seqNr 2 is accepts since it follows seqNr 1 that is stored in state - offsetStore.isAccepted(envelope2).futureValue shouldBe true + offsetStore.isAccepted(envelope2).futureValue shouldBe Accepted // simulate envelope processing error by not adding envelope2 to inflight // seqNr 3 is not accepted, still waiting for seqNr 2 - offsetStore.isAccepted(envelope3).futureValue shouldBe false + offsetStore.isAccepted(envelope3).futureValue shouldBe RejectedSeqNr // offer seqNr 2 once again - offsetStore.isAccepted(envelope2).futureValue shouldBe true + offsetStore.isAccepted(envelope2).futureValue shouldBe Accepted offsetStore.addInflight(envelope2) offsetStore.getInflight() shouldBe Map("p1" -> 2L) // offer seqNr 3 once more - offsetStore.isAccepted(envelope3).futureValue shouldBe true + offsetStore.isAccepted(envelope3).futureValue shouldBe Accepted offsetStore.addInflight(envelope3) offsetStore.getInflight() shouldBe Map("p1" -> 3L) @@ -540,7 +529,8 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getInflight() shouldBe empty } - "filter accepted" in { + "mapIsAccepted" in { + import R2dbcOffsetStore.IsAcceptedResult._ val projectionId = genRandomProjectionId() val startTime = TestClock.nowMicros().instant() val offsetStore = createOffsetStore(projectionId) @@ -561,11 +551,17 @@ class R2dbcTimestampOffsetStoreSpec // but not when previous is unknown val env5 = createEnvelope("p3", 7L, startTime.plusMillis(5), "e3-7") - offsetStore.filterAccepted(List(env1, env2, env3, env4, env5)).futureValue shouldBe List(env1, env2, env4) + offsetStore.mapIsAccepted(List(env1, env2, env3, env4, env5)).futureValue shouldBe List( + env1 -> Accepted, + env2 -> Accepted, + env3 -> RejectedSeqNr, + env4 -> Accepted, + env5 -> RejectedSeqNr) } "accept new revisions for durable state" in { + import R2dbcOffsetStore.IsAcceptedResult._ val projectionId = genRandomProjectionId() val offsetStore = createOffsetStore(projectionId) @@ -577,50 +573,50 @@ class R2dbcTimestampOffsetStoreSpec // seqNr 1 is always accepted val env1 = createUpdatedDurableState("p4", 1L, startTime.plusMillis(1), "s4-1") - offsetStore.isAccepted(env1).futureValue shouldBe true + offsetStore.isAccepted(env1).futureValue shouldBe Accepted // but not if already inflight, seqNr 1 was accepted offsetStore.addInflight(env1) offsetStore .isAccepted(createUpdatedDurableState("p4", 1L, startTime.plusMillis(1), "s4-1")) - .futureValue shouldBe false + .futureValue shouldBe Duplicate // subsequent seqNr is accepted val env2 = createUpdatedDurableState("p4", 2L, startTime.plusMillis(2), "s4-2") - offsetStore.isAccepted(env2).futureValue shouldBe true + offsetStore.isAccepted(env2).futureValue shouldBe Accepted offsetStore.addInflight(env2) // and also ok with gap offsetStore .isAccepted(createUpdatedDurableState("p4", 4L, startTime.plusMillis(3), "s4-4")) - .futureValue shouldBe true + .futureValue shouldBe Accepted // and not if later already inflight, seqNr 2 was accepted offsetStore .isAccepted(createUpdatedDurableState("p4", 1L, startTime.plusMillis(1), "s4-1")) - .futureValue shouldBe false + .futureValue shouldBe Duplicate // greater than known is accepted val env3 = createUpdatedDurableState("p1", 4L, startTime.plusMillis(4), "s1-4") - offsetStore.isAccepted(env3).futureValue shouldBe true + offsetStore.isAccepted(env3).futureValue shouldBe Accepted // but not same - offsetStore.isAccepted(createUpdatedDurableState("p3", 5L, startTime, "s3-5")).futureValue shouldBe false + offsetStore.isAccepted(createUpdatedDurableState("p3", 5L, startTime, "s3-5")).futureValue shouldBe Duplicate // but not same, even if it's 1 - offsetStore.isAccepted(createUpdatedDurableState("p2", 1L, startTime, "s2-1")).futureValue shouldBe false + offsetStore.isAccepted(createUpdatedDurableState("p2", 1L, startTime, "s2-1")).futureValue shouldBe Duplicate // and not less - offsetStore.isAccepted(createUpdatedDurableState("p3", 4L, startTime, "s3-4")).futureValue shouldBe false + offsetStore.isAccepted(createUpdatedDurableState("p3", 4L, startTime, "s3-4")).futureValue shouldBe Duplicate offsetStore.addInflight(env3) // greater than known, and then also subsequent are accepted (needed for grouped) val env4 = createUpdatedDurableState("p3", 8L, startTime.plusMillis(5), "s3-6") - offsetStore.isAccepted(env4).futureValue shouldBe true + offsetStore.isAccepted(env4).futureValue shouldBe Accepted offsetStore.addInflight(env4) val env5 = createUpdatedDurableState("p3", 9L, startTime.plusMillis(6), "s3-7") - offsetStore.isAccepted(env5).futureValue shouldBe true + offsetStore.isAccepted(env5).futureValue shouldBe Accepted offsetStore.addInflight(env5) val env6 = createUpdatedDurableState("p3", 20L, startTime.plusMillis(7), "s3-8") - offsetStore.isAccepted(env6).futureValue shouldBe true + offsetStore.isAccepted(env6).futureValue shouldBe Accepted offsetStore.addInflight(env6) // accept unknown val env7 = createUpdatedDurableState("p5", 7L, startTime.plusMillis(8), "s5-7") - offsetStore.isAccepted(env7).futureValue shouldBe true + offsetStore.isAccepted(env7).futureValue shouldBe Accepted offsetStore.addInflight(env7) // it's keeping the inflight that are not in the "stored" state diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index b914c27f6..0300b585a 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -168,6 +168,20 @@ private[projection] object R2dbcOffsetStore { final class RejectedEnvelope(message: String) extends IllegalStateException(message) + sealed trait IsAcceptedResult + + object IsAcceptedResult { + case object Accepted extends IsAcceptedResult + case object Duplicate extends IsAcceptedResult + case object RejectedSeqNr extends IsAcceptedResult + case object RejectedBacktrackingSeqNr extends IsAcceptedResult + + val FutureAccepted: Future[IsAcceptedResult] = Future.successful(Accepted) + val FutureDuplicate: Future[IsAcceptedResult] = Future.successful(Duplicate) + val FutureRejectedSeqNr: Future[IsAcceptedResult] = Future.successful(RejectedSeqNr) + val FutureRejectedBacktrackingSeqNr: Future[IsAcceptedResult] = Future.successful(RejectedBacktrackingSeqNr) + } + val FutureDone: Future[Done] = Future.successful(Done) val FutureTrue: Future[Boolean] = Future.successful(true) val FutureFalse: Future[Boolean] = Future.successful(false) @@ -626,23 +640,25 @@ private[projection] class R2dbcOffsetStore( def isDuplicate(record: Record): Boolean = getState().isDuplicate(record) - def filterAccepted[Envelope](envelopes: immutable.Seq[Envelope]): Future[immutable.Seq[Envelope]] = { + def mapIsAccepted[Envelope]( + envelopes: immutable.Seq[Envelope]): Future[immutable.Seq[(Envelope, IsAcceptedResult)]] = { + import IsAcceptedResult._ envelopes - .foldLeft(Future.successful((getInflight(), Vector.empty[Envelope]))) { (acc, envelope) => + .foldLeft(Future.successful((getInflight(), Vector.empty[(Envelope, IsAcceptedResult)]))) { (acc, envelope) => acc.flatMap { case (inflight, filteredEnvelopes) => createRecordWithOffset(envelope) match { case Some(recordWithOffset) => isAccepted(recordWithOffset, inflight).map { - case true => + case Accepted => ( inflight.updated(recordWithOffset.record.pid, recordWithOffset.record.seqNr), - filteredEnvelopes :+ envelope) - case false => - (inflight, filteredEnvelopes) + filteredEnvelopes :+ (envelope, Accepted)) + case rejected => + (inflight, filteredEnvelopes :+ (envelope, rejected)) } case None => - Future.successful((inflight, filteredEnvelopes :+ envelope)) + Future.successful((inflight, filteredEnvelopes :+ (envelope, Accepted))) } } } @@ -656,14 +672,17 @@ private[projection] class R2dbcOffsetStore( * Completed with `true` if accepted, otherwise `false` if rejected or failed with `EnvelopeRejected` if * backtracking envelope is rejected. */ - def isAccepted[Envelope](envelope: Envelope): Future[Boolean] = { + def isAccepted[Envelope](envelope: Envelope): Future[IsAcceptedResult] = { createRecordWithOffset(envelope) match { case Some(recordWithOffset) => isAccepted(recordWithOffset, getInflight()) - case None => FutureTrue + case None => IsAcceptedResult.FutureAccepted } } - private def isAccepted(recordWithOffset: RecordWithOffset, currentInflight: Map[Pid, SeqNr]): Future[Boolean] = { + private def isAccepted( + recordWithOffset: RecordWithOffset, + currentInflight: Map[Pid, SeqNr]): Future[IsAcceptedResult] = { + import IsAcceptedResult._ val pid = recordWithOffset.record.pid val seqNr = recordWithOffset.record.seqNr val currentState = getState() @@ -672,7 +691,7 @@ private[projection] class R2dbcOffsetStore( if (duplicate) { logger.trace("Filtering out duplicate sequence number [{}] for pid [{}]", seqNr, pid) - FutureFalse + FutureDuplicate } else if (recordWithOffset.strictSeqNr) { // strictSeqNr == true is for event sourced val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L)) @@ -728,25 +747,22 @@ private[projection] class R2dbcOffsetStore( // expecting seqNr to be +1 of previously known val ok = seqNr == prevSeqNr + 1 if (ok) { - FutureTrue + FutureAccepted } else if (seqNr <= currentInflight.getOrElse(pid, 0L)) { // currentInFlight contains those that have been processed or about to be processed in Flow, // but offset not saved yet => ok to handle as duplicate - FutureFalse + FutureDuplicate } else if (!recordWithOffset.fromBacktracking) { logUnexpected() - FutureFalse + FutureRejectedSeqNr } else { logUnexpected() // This will result in projection restart (with normal configuration) - Future.failed( - new RejectedEnvelope( - s"Rejected envelope from backtracking, persistenceId [$pid], seqNr [$seqNr] " + - "due to unexpected sequence number.")) + FutureRejectedBacktrackingSeqNr } } else if (seqNr == 1) { // always accept first event if no other event for that pid has been seen - FutureTrue + FutureAccepted } else { // Haven't see seen this pid within the time window. Since events can be missed // when read at the tail we will only accept it if the event with previous seqNr has timestamp @@ -763,20 +779,18 @@ private[projection] class R2dbcOffsetStore( seqNr, previousTimestamp, before) - true + Accepted } else if (!recordWithOffset.fromBacktracking) { logUnknown() - false + RejectedSeqNr } else { logUnknown() // This will result in projection restart (with normal configuration) - throw new RejectedEnvelope( - s"Rejected envelope from backtracking, persistenceId [$pid], seqNr [$seqNr], " + - "due to unknown sequence number.") + RejectedBacktrackingSeqNr } case None => // previous not found, could have been deleted - true + Accepted } } } else { @@ -785,10 +799,10 @@ private[projection] class R2dbcOffsetStore( val ok = seqNr > prevSeqNr if (ok) { - FutureTrue + FutureAccepted } else { logger.traceN("Filtering out earlier revision [{}] for pid [{}], previous revision [{}]", seqNr, pid, prevSeqNr) - FutureFalse + FutureDuplicate } } } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala index 6246a268b..420ba79e6 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala @@ -184,10 +184,11 @@ private[projection] object R2dbcProjectionImpl { new AdaptedR2dbcHandler(handlerFactory()) { override def process(envelope: Envelope): Future[Done] = { + import R2dbcOffsetStore.IsAcceptedResult._ offsetStore .isAccepted(envelope) .flatMap { - case true => + case Accepted => if (isFilteredEvent(envelope)) { val offset = extractOffsetPidSeqNr(sourceProvider, envelope) offsetStore.saveOffset(offset) @@ -205,14 +206,16 @@ private[projection] object R2dbcProjectionImpl { } } } - case false => + case Duplicate => + FutureDone + case RejectedSeqNr => triggerReplayIfPossible(sourceProvider, offsetStore, envelope) FutureDone - } - .recover { - case ex: RejectedEnvelope => - if (triggerReplayIfPossible(sourceProvider, offsetStore, envelope)) Done - else throw ex + case RejectedBacktrackingSeqNr => + if (triggerReplayIfPossible(sourceProvider, offsetStore, envelope)) + FutureDone + else + throwRejectedEnvelope(sourceProvider, envelope) } } } @@ -229,7 +232,23 @@ private[projection] object R2dbcProjectionImpl { new AdaptedR2dbcHandler(handlerFactory()) { override def process(envelopes: immutable.Seq[Envelope]): Future[Done] = { - offsetStore.filterAccepted(envelopes).flatMap { acceptedEnvelopes => + import R2dbcOffsetStore.IsAcceptedResult._ + offsetStore.mapIsAccepted(envelopes).flatMap { isAcceptedEnvelopes => + isAcceptedEnvelopes.foreach { + case (env, RejectedSeqNr) => + triggerReplayIfPossible(sourceProvider, offsetStore, env) + case (env, RejectedBacktrackingSeqNr) => + if (triggerReplayIfPossible(sourceProvider, offsetStore, env)) + FutureDone + else + throwRejectedEnvelope(sourceProvider, env) + case _ => + } + + val acceptedEnvelopes = isAcceptedEnvelopes.collect { + case (env, Accepted) => env + } + if (acceptedEnvelopes.isEmpty) { FutureDone } else { @@ -263,10 +282,11 @@ private[projection] object R2dbcProjectionImpl { () => new AdaptedR2dbcHandler(handlerFactory()) { override def process(envelope: Envelope): Future[Done] = { + import R2dbcOffsetStore.IsAcceptedResult._ offsetStore .isAccepted(envelope) .flatMap { - case true => + case Accepted => if (isFilteredEvent(envelope)) { offsetStore.addInflight(envelope) FutureDone @@ -284,14 +304,16 @@ private[projection] object R2dbcProjectionImpl { } } } - case false => + case Duplicate => + FutureDone + case RejectedSeqNr => triggerReplayIfPossible(sourceProvider, offsetStore, envelope) FutureDone - } - .recover { - case ex: RejectedEnvelope => - if (triggerReplayIfPossible(sourceProvider, offsetStore, envelope)) Done - else throw ex + case RejectedBacktrackingSeqNr => + if (triggerReplayIfPossible(sourceProvider, offsetStore, envelope)) + FutureDone + else + throwRejectedEnvelope(sourceProvider, envelope) } } } @@ -304,10 +326,11 @@ private[projection] object R2dbcProjectionImpl { () => new AdaptedHandler(handlerFactory()) { override def process(envelope: Envelope): Future[Done] = { + import R2dbcOffsetStore.IsAcceptedResult._ offsetStore .isAccepted(envelope) .flatMap { - case true => + case Accepted => if (isFilteredEvent(envelope)) { offsetStore.addInflight(envelope) FutureDone @@ -321,14 +344,16 @@ private[projection] object R2dbcProjectionImpl { } } } - case false => + case Duplicate => + FutureDone + case RejectedSeqNr => triggerReplayIfPossible(sourceProvider, offsetStore, envelope) FutureDone - } - .recover { - case ex: RejectedEnvelope => - if (triggerReplayIfPossible(sourceProvider, offsetStore, envelope)) Done - else throw ex + case RejectedBacktrackingSeqNr => + if (triggerReplayIfPossible(sourceProvider, offsetStore, envelope)) + FutureDone + else + throwRejectedEnvelope(sourceProvider, envelope) } } } @@ -344,7 +369,23 @@ private[projection] object R2dbcProjectionImpl { new AdaptedHandler(handlerFactory()) { override def process(envelopes: immutable.Seq[Envelope]): Future[Done] = { - offsetStore.filterAccepted(envelopes).flatMap { acceptedEnvelopes => + import R2dbcOffsetStore.IsAcceptedResult._ + offsetStore.mapIsAccepted(envelopes).flatMap { isAcceptedEnvelopes => + isAcceptedEnvelopes.foreach { + case (env, RejectedSeqNr) => + triggerReplayIfPossible(sourceProvider, offsetStore, env) + case (env, RejectedBacktrackingSeqNr) => + if (triggerReplayIfPossible(sourceProvider, offsetStore, env)) + FutureDone + else + throwRejectedEnvelope(sourceProvider, env) + case _ => + } + + val acceptedEnvelopes = isAcceptedEnvelopes.collect { + case (env, Accepted) => env + } + if (acceptedEnvelopes.isEmpty) { FutureDone } else { @@ -375,13 +416,14 @@ private[projection] object R2dbcProjectionImpl { offsetStore: R2dbcOffsetStore, settings: R2dbcProjectionSettings)( implicit system: ActorSystem[_]): FlowWithContext[Envelope, ProjectionContext, Done, ProjectionContext, _] = { + import R2dbcOffsetStore.IsAcceptedResult._ implicit val ec: ExecutionContext = system.executionContext FlowWithContext[Envelope, ProjectionContext] .mapAsync(1) { env => offsetStore .isAccepted(env) - .flatMap { ok => - if (ok) { + .flatMap { + case Accepted => if (isFilteredEvent(env) && settings.warnAboutFilteredEventsInFlow) { log.info("atLeastOnceFlow doesn't support of skipping envelopes. Envelope [{}] still emitted.", env) } @@ -389,15 +431,16 @@ private[projection] object R2dbcProjectionImpl { offsetStore.addInflight(loadedEnvelope) Some(loadedEnvelope) } - } else { + case Duplicate => + Future.successful(None) + case RejectedSeqNr => triggerReplayIfPossible(sourceProvider, offsetStore, env) Future.successful(None) - } - } - .recover { - case ex: RejectedEnvelope => - if (triggerReplayIfPossible(sourceProvider, offsetStore, env)) None - else throw ex + case RejectedBacktrackingSeqNr => + if (triggerReplayIfPossible(sourceProvider, offsetStore, env)) + Future.successful(None) + else + throwRejectedEnvelope(sourceProvider, env) } } .collect { @@ -427,6 +470,18 @@ private[projection] object R2dbcProjectionImpl { } } + private def throwRejectedEnvelope[Offset, Envelope]( + sourceProvider: SourceProvider[Offset, Envelope], + envelope: Envelope): Nothing = { + extractOffsetPidSeqNr(sourceProvider, envelope) match { + case OffsetPidSeqNr(_, Some((pid, seqNr))) => + throw new RejectedEnvelope( + s"Rejected envelope from backtracking, persistenceId [$pid], seqNr [$seqNr] due to unexpected sequence number.") + case OffsetPidSeqNr(_, None) => + throw new RejectedEnvelope(s"Rejected envelope from backtracking.") + } + } + @nowarn("msg=never used") abstract class AdaptedR2dbcHandler[E](val delegate: R2dbcHandler[E])( implicit