Skip to content

Commit

Permalink
move replayIfPossible to adapted handler implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Nov 29, 2024
1 parent df90091 commit 5984826
Showing 1 changed file with 83 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,95 @@ private[projection] object DynamoDBProjectionImpl {
case Duplicate =>
FutureDone
case RejectedSeqNr =>
replayIfPossible(sourceProvider, offsetStore, envelope, delegate).map(_ => Done)(
ExecutionContext.parasitic)
replayIfPossible(envelope).map(_ => Done)(ExecutionContext.parasitic)
case RejectedBacktrackingSeqNr =>
replayIfPossible(sourceProvider, offsetStore, envelope, delegate).map {
replayIfPossible(envelope).map {
case true => Done
case false => throwRejectedEnvelope(sourceProvider, envelope)
}
}
}

private def replayIfPossible(
originalEnvelope: Envelope)(implicit ec: ExecutionContext, system: ActorSystem[_]): Future[Boolean] = {
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
// FIXME config to make this case opt in
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] =>
val persistenceId = originalEventEnvelope.persistenceId
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
.mapAsync(1) { envelope =>
import DynamoDBOffsetStore.Validation._
offsetStore
.validate(envelope)
.flatMap {
case Accepted =>
if (isFilteredEvent(envelope)) {
offsetStore.addInflight(envelope)
FutureDone
} else {
delegate
.process(envelope.asInstanceOf[Envelope])
.map { _ =>
offsetStore.addInflight(envelope)
Done
}
}
case Duplicate =>
FutureDone
case RejectedSeqNr =>
// this shouldn't happen
throw new RejectedEnvelope(
s"Replay due to rejected envelope was rejected. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].")
case RejectedBacktrackingSeqNr =>
// this shouldn't happen
throw new RejectedEnvelope(
s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].")
}
}
.runFold(0) { case (acc, _) => acc + 1 }
.map { count =>
val expected = toSeqNr - fromSeqNr
if (count == expected) {
true
} else {
// it's expected to find all events, otherwise fail the replay attempt
log.warn(
"Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].",
count,
expected,
persistenceId,
fromSeqNr,
toSeqNr)
false
}
}
.recoverWith { exc =>
log.warn(
"Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].",
persistenceId,
fromSeqNr,
originalEventEnvelope.sequenceNr,
exc)
Future.failed(exc)
}
case None => FutureFalse
}
}

case _ =>
triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope)
}
case _ =>
FutureFalse // no replay support for non typed envelopes
}
}
}
}

Expand Down Expand Up @@ -422,90 +502,6 @@ private[projection] object DynamoDBProjectionImpl {
}
}

private def replayIfPossible[Offset, Envelope](
sourceProvider: SourceProvider[Offset, Envelope],
offsetStore: DynamoDBOffsetStore,
originalEnvelope: Envelope,
handler: Handler[Envelope])(implicit ec: ExecutionContext, system: ActorSystem[_]): Future[Boolean] = {
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
// FIXME config to make this case opt in
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] =>
val persistenceId = originalEventEnvelope.persistenceId
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
val toSeqNr = originalEventEnvelope.sequenceNr
provider.currentEventsByPersistenceId(persistenceId, fromSeqNr, toSeqNr) match {
case Some(querySource) =>
querySource
.mapAsync(1) { envelope =>
import DynamoDBOffsetStore.Validation._
offsetStore
.validate(envelope)
.flatMap {
case Accepted =>
if (isFilteredEvent(envelope)) {
offsetStore.addInflight(envelope)
FutureDone
} else {
handler
.process(envelope.asInstanceOf[Envelope])
.map { _ =>
offsetStore.addInflight(envelope)
Done
}
}
case Duplicate =>
FutureDone
case RejectedSeqNr =>
// this shouldn't happen
throw new RejectedEnvelope(
s"Replay due to rejected envelope was rejected. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].")
case RejectedBacktrackingSeqNr =>
// this shouldn't happen
throw new RejectedEnvelope(
s"Replay due to rejected envelope was rejected. Should not be from backtracking. PersistenceId [$persistenceId] seqNr [${envelope.sequenceNr}].")
}
}
.runFold(0) { case (acc, _) => acc + 1 }
.map { count =>
val expected = toSeqNr - fromSeqNr
if (count == expected) {
true
} else {
// it's expected to find all events, otherwise fail the replay attempt
log.warn(
"Replay due to rejected envelope found [{}] events, but expected [{}]. PersistenceId [{}] from seqNr [{}] to [{}].",
count,
expected,
persistenceId,
fromSeqNr,
toSeqNr)
false
}
}
.recoverWith { exc =>
log.warn(
"Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].",
persistenceId,
fromSeqNr,
originalEventEnvelope.sequenceNr,
exc)
Future.failed(exc)
}
case None => FutureFalse
}
}

case _ =>
triggerReplayIfPossible(sourceProvider, offsetStore, originalEnvelope)
}
case _ =>
FutureFalse // no replay support for non typed envelopes
}
}

private def throwRejectedEnvelope[Offset, Envelope](
sourceProvider: SourceProvider[Offset, Envelope],
envelope: Envelope): Nothing = {
Expand Down

0 comments on commit 5984826

Please sign in to comment.