diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index a60d574df..caeb263be 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -37,6 +37,7 @@ import akka.projection.StatusObserver import akka.projection.dynamodb.DynamoDBProjectionSettings import akka.projection.dynamodb.internal.DynamoDBOffsetStore.RejectedEnvelope import akka.projection.dynamodb.scaladsl.DynamoDBTransactHandler +import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider import akka.projection.internal.ActorHandlerInit import akka.projection.internal.AtLeastOnce import akka.projection.internal.AtMostOnce @@ -202,9 +203,10 @@ private[projection] object DynamoDBProjectionImpl { case Duplicate => FutureDone case RejectedSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map(_ => Done)(ExecutionContext.parasitic) + replayIfPossible(sourceProvider, offsetStore, envelope, delegate).map(_ => Done)( + ExecutionContext.parasitic) case RejectedBacktrackingSeqNr => - triggerReplayIfPossible(sourceProvider, offsetStore, envelope).map { + replayIfPossible(sourceProvider, offsetStore, envelope, delegate).map { case true => Done case false => throwRejectedEnvelope(sourceProvider, envelope) } @@ -420,6 +422,46 @@ private[projection] object DynamoDBProjectionImpl { } } + private def replayIfPossible[Offset, Envelope]( + sourceProvider: SourceProvider[Offset, Envelope], + offsetStore: DynamoDBOffsetStore, + envelope: Envelope, + handler: Handler[Envelope])(implicit ec: ExecutionContext, system: ActorSystem[_]): Future[Boolean] = { + envelope match { + case env: EventEnvelope[Any @unchecked] if env.sequenceNr > 1 => + sourceProvider match { + // FIXME config to make this case opt in + case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] => + offsetStore.storedSeqNr(env.persistenceId).flatMap { storedSeqNr => + val fromSeqNr = storedSeqNr + 1 + provider.currentEventsByPersistenceId(env.persistenceId, fromSeqNr, env.sequenceNr) match { + case Some(querySource) => + querySource + .mapAsync(1) { env => + handler.process(env.asInstanceOf[Envelope]) + } + .run() + .map(_ => true) + .recoverWith { exc => + log.warn( + "Replay due to rejected envelope failed. PersistenceId [{}] from seqNr [{}] to [{}].", + env.persistenceId, + fromSeqNr, + env.sequenceNr) + triggerReplayIfPossible(sourceProvider, offsetStore, envelope) + } + case None => FutureFalse + } + } + + case _ => + triggerReplayIfPossible(sourceProvider, offsetStore, envelope) + } + case _ => + FutureFalse // no replay support for non typed envelopes + } + } + private def throwRejectedEnvelope[Offset, Envelope]( sourceProvider: SourceProvider[Offset, Envelope], envelope: Envelope): Nothing = { diff --git a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala index c67a849ac..563286e65 100644 --- a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala +++ b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala @@ -22,12 +22,14 @@ import akka.persistence.query.Offset import akka.persistence.query.PersistenceQuery import akka.persistence.query.javadsl.EventsByTagQuery import akka.persistence.query.javadsl.ReadJournal +import akka.persistence.query.typed.javadsl.CurrentEventsByPersistenceIdTypedQuery import akka.persistence.query.typed.javadsl.EventTimestampQuery import akka.persistence.query.typed.javadsl.EventsBySliceQuery import akka.persistence.query.typed.javadsl.EventsBySliceStartingFromSnapshotsQuery import akka.persistence.query.typed.javadsl.LoadEventQuery import akka.projection.BySlicesSourceProvider import akka.projection.eventsourced.EventEnvelope +import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider import akka.projection.internal.CanTriggerReplay import akka.projection.javadsl import akka.projection.javadsl.SourceProvider @@ -277,7 +279,8 @@ object EventSourcedProvider { extends SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] with BySlicesSourceProvider with EventTimestampQuerySourceProvider - with LoadEventQuerySourceProvider { + with LoadEventQuerySourceProvider + with LoadEventsByPersistenceIdSourceProvider[Event] { override def readJournal: ReadJournal = eventsBySlicesQuery @@ -296,6 +299,20 @@ object EventSourcedProvider { override def extractCreationTime(envelope: akka.persistence.query.typed.EventEnvelope[Event]): Long = envelope.timestamp + /** + * INTERNAL API + */ + @InternalApi override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long) + : Option[akka.stream.scaladsl.Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] = { + eventsBySlicesQuery match { + case q: CurrentEventsByPersistenceIdTypedQuery => + Some(q.currentEventsByPersistenceIdTyped[Event](persistenceId, fromSequenceNr, toSequenceNr).asScala) + case _ => None // not supported by this query + } + } } /** diff --git a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala index ecdd38985..f1cc98b34 100644 --- a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala +++ b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala @@ -12,11 +12,14 @@ import scala.concurrent.Future import akka.NotUsed import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi import akka.persistence.query.NoOffset import akka.persistence.query.Offset import akka.persistence.query.PersistenceQuery import akka.persistence.query.scaladsl.EventsByTagQuery import akka.persistence.query.scaladsl.ReadJournal +import akka.persistence.query.typed +import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery import akka.persistence.query.typed.scaladsl.EventTimestampQuery import akka.persistence.query.typed.scaladsl.EventsBySliceQuery import akka.persistence.query.typed.scaladsl.EventsBySliceStartingFromSnapshotsQuery @@ -251,7 +254,8 @@ object EventSourcedProvider { extends SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] with BySlicesSourceProvider with EventTimestampQuerySourceProvider - with LoadEventQuerySourceProvider { + with LoadEventQuerySourceProvider + with LoadEventsByPersistenceIdSourceProvider[Event] { implicit val executionContext: ExecutionContext = system.executionContext override def readJournal: ReadJournal = eventsBySlicesQuery @@ -271,6 +275,19 @@ object EventSourcedProvider { override def extractCreationTime(envelope: akka.persistence.query.typed.EventEnvelope[Event]): Long = envelope.timestamp + /** + * INTERNAL API + */ + @InternalApi override private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Option[Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] = { + eventsBySlicesQuery match { + case q: CurrentEventsByPersistenceIdTypedQuery => + Some(q.currentEventsByPersistenceIdTyped[Event](persistenceId, fromSequenceNr, toSequenceNr)) + case _ => None // not supported by this query + } + } } private class EventsBySlicesStartingFromSnapshotsSourceProvider[Snapshot, Event]( @@ -341,4 +358,18 @@ object EventSourcedProvider { } } + /** + * INTERNAL API + */ + @InternalApi private[akka] trait LoadEventsByPersistenceIdSourceProvider[Event] { + + /** + * INTERNAL API + */ + @InternalApi private[akka] def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Option[Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] + } + }