Skip to content

Commit

Permalink
fix: DynamoDB Opt-in to replay rejected events
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Nov 28, 2024
1 parent c5a614b commit 7163160
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import akka.projection.StatusObserver
import akka.projection.dynamodb.DynamoDBProjectionSettings
import akka.projection.dynamodb.internal.DynamoDBOffsetStore.RejectedEnvelope
import akka.projection.dynamodb.scaladsl.DynamoDBTransactHandler
import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider
import akka.projection.internal.ActorHandlerInit
import akka.projection.internal.AtLeastOnce
import akka.projection.internal.AtMostOnce
Expand Down Expand Up @@ -202,9 +203,10 @@ private[projection] object DynamoDBProjectionImpl {
case Duplicate =>
FutureDone
case RejectedSeqNr =>
triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map(_ => Done)(ExecutionContext.parasitic)
replayIfPossible(sourceProvider, offsetStore, envelope, delegate).map(_ => Done)(
ExecutionContext.parasitic)
case RejectedBacktrackingSeqNr =>
triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map {
replayIfPossible(sourceProvider, offsetStore, envelope, delegate).map {
case true => Done
case false => throwRejectedEnvelope(sourceProvider, envelope)
}
Expand Down Expand Up @@ -420,6 +422,46 @@ private[projection] object DynamoDBProjectionImpl {
}
}

private def replayIfPossible[Offset, Envelope](
sourceProvider: SourceProvider[Offset, Envelope],
offsetStore: DynamoDBOffsetStore,
envelope: Envelope,
handler: Handler[Envelope])(implicit ec: ExecutionContext, system: ActorSystem[_]): Future[Boolean] = {
envelope match {
case env: EventEnvelope[Any @unchecked] if env.sequenceNr > 1 =>
sourceProvider match {
// FIXME config to make this case opt in
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] =>
offsetStore.storedSeqNr(env.persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
provider.currentEventsByPersistenceId(env.persistenceId, fromSeqNr, env.sequenceNr) match {
case Some(querySource) =>
querySource
.mapAsync(1) { env =>
handler.process(env.asInstanceOf[Envelope])
}
.run()
.map(_ => true)
.recoverWith { exc =>
log.warn(
"Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].",
env.persistenceId,
fromSeqNr,
env.sequenceNr)
triggerReplayIfPossible(sourceProvider, offsetStore, envelope)
}
case None => FutureFalse
}
}

case _ =>
triggerReplayIfPossible(sourceProvider, offsetStore, envelope)
}
case _ =>
FutureFalse // no replay support for non typed envelopes
}
}

private def throwRejectedEnvelope[Offset, Envelope](
sourceProvider: SourceProvider[Offset, Envelope],
envelope: Envelope): Nothing = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import akka.persistence.query.Offset
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.javadsl.EventsByTagQuery
import akka.persistence.query.javadsl.ReadJournal
import akka.persistence.query.typed.javadsl.CurrentEventsByPersistenceIdTypedQuery
import akka.persistence.query.typed.javadsl.EventTimestampQuery
import akka.persistence.query.typed.javadsl.EventsBySliceQuery
import akka.persistence.query.typed.javadsl.EventsBySliceStartingFromSnapshotsQuery
import akka.persistence.query.typed.javadsl.LoadEventQuery
import akka.projection.BySlicesSourceProvider
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider
import akka.projection.internal.CanTriggerReplay
import akka.projection.javadsl
import akka.projection.javadsl.SourceProvider
Expand Down Expand Up @@ -277,7 +279,8 @@ object EventSourcedProvider {
extends SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]]
with BySlicesSourceProvider
with EventTimestampQuerySourceProvider
with LoadEventQuerySourceProvider {
with LoadEventQuerySourceProvider
with LoadEventsByPersistenceIdSourceProvider[Event] {

override def readJournal: ReadJournal = eventsBySlicesQuery

Expand All @@ -296,6 +299,20 @@ object EventSourcedProvider {
override def extractCreationTime(envelope: akka.persistence.query.typed.EventEnvelope[Event]): Long =
envelope.timestamp

/**
* INTERNAL API
*/
@InternalApi override private[akka] def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long)
: Option[akka.stream.scaladsl.Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] = {
eventsBySlicesQuery match {
case q: CurrentEventsByPersistenceIdTypedQuery =>
Some(q.currentEventsByPersistenceIdTyped[Event](persistenceId, fromSequenceNr, toSequenceNr).asScala)
case _ => None // not supported by this query
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import scala.concurrent.Future

import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.EventsByTagQuery
import akka.persistence.query.scaladsl.ReadJournal
import akka.persistence.query.typed
import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery
import akka.persistence.query.typed.scaladsl.EventTimestampQuery
import akka.persistence.query.typed.scaladsl.EventsBySliceQuery
import akka.persistence.query.typed.scaladsl.EventsBySliceStartingFromSnapshotsQuery
Expand Down Expand Up @@ -251,7 +254,8 @@ object EventSourcedProvider {
extends SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]]
with BySlicesSourceProvider
with EventTimestampQuerySourceProvider
with LoadEventQuerySourceProvider {
with LoadEventQuerySourceProvider
with LoadEventsByPersistenceIdSourceProvider[Event] {
implicit val executionContext: ExecutionContext = system.executionContext

override def readJournal: ReadJournal = eventsBySlicesQuery
Expand All @@ -271,6 +275,19 @@ object EventSourcedProvider {
override def extractCreationTime(envelope: akka.persistence.query.typed.EventEnvelope[Event]): Long =
envelope.timestamp

/**
* INTERNAL API
*/
@InternalApi override private[akka] def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Option[Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] = {
eventsBySlicesQuery match {
case q: CurrentEventsByPersistenceIdTypedQuery =>
Some(q.currentEventsByPersistenceIdTyped[Event](persistenceId, fromSequenceNr, toSequenceNr))
case _ => None // not supported by this query
}
}
}

private class EventsBySlicesStartingFromSnapshotsSourceProvider[Snapshot, Event](
Expand Down Expand Up @@ -341,4 +358,18 @@ object EventSourcedProvider {
}
}

/**
* INTERNAL API
*/
@InternalApi private[akka] trait LoadEventsByPersistenceIdSourceProvider[Event] {

/**
* INTERNAL API
*/
@InternalApi private[akka] def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Option[Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]]
}

}

0 comments on commit 7163160

Please sign in to comment.