Skip to content

Commit

Permalink
feat: Start projection from custom offset (#943)
Browse files Browse the repository at this point in the history
mostly intended for Projections over gRPC
  • Loading branch information
patriknw authored Jul 6, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent c4db6b6 commit 27a7d5a
Showing 9 changed files with 452 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# internal changes
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.state.javadsl.DurableStateSourceProvider#DurableStateBySlicesSourceProvider.executionContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.state.javadsl.DurableStateSourceProvider#DurableStateStoreQuerySourceProvider.executionContext")

Original file line number Diff line number Diff line change
@@ -8,9 +8,7 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.annotation.nowarn

import akka.NotUsed
import akka.actor.typed.ActorSystem
@@ -53,21 +51,22 @@ object DurableStateSourceProvider {
new DurableStateStoreQuerySourceProvider(durableStateStoreQuery, tag, system)
}

/**
* INTERNAL API
*/
@InternalApi
@nowarn("msg=never used") // system
private class DurableStateStoreQuerySourceProvider[A](
durableStateStoreQuery: DurableStateStoreQuery[A],
tag: String,
system: ActorSystem[_])
extends javadsl.SourceProvider[Offset, DurableStateChange[A]] {
implicit val executionContext: ExecutionContext = system.executionContext

override def source(offsetAsync: Supplier[CompletionStage[Optional[Offset]]])
: CompletionStage[Source[DurableStateChange[A], NotUsed]] = {
val source: Future[Source[DurableStateChange[A], NotUsed]] = offsetAsync.get().toScala.map { offsetOpt =>
durableStateStoreQuery
.changes(tag, offsetOpt.orElse(NoOffset))
offsetAsync.get().thenApply { storedOffset =>
durableStateStoreQuery.changes(tag, storedOffset.orElse(NoOffset))
}
source.toJava
}

override def extractOffset(stateChange: DurableStateChange[A]): Offset = stateChange.offset
@@ -117,6 +116,11 @@ object DurableStateSourceProvider {
.getDurableStateStoreFor(classOf[DurableStateStoreBySliceQuery[Any]], durableStateStoreQueryPluginId)
.sliceRanges(numberOfRanges)

/**
* INTERNAL API
*/
@InternalApi
@nowarn("msg=never used") // system
private class DurableStateBySlicesSourceProvider[A](
durableStateStoreQuery: DurableStateStoreBySliceQuery[A],
entityType: String,
@@ -126,15 +130,12 @@ object DurableStateSourceProvider {
extends SourceProvider[Offset, DurableStateChange[A]]
with BySlicesSourceProvider
with DurableStateStore[A] {
implicit val executionContext: ExecutionContext = system.executionContext

override def source(offsetAsync: Supplier[CompletionStage[Optional[Offset]]])
: CompletionStage[Source[DurableStateChange[A], NotUsed]] = {
val source: Future[Source[DurableStateChange[A], NotUsed]] = offsetAsync.get().toScala.map { offsetOpt =>
durableStateStoreQuery
.changesBySlices(entityType, minSlice, maxSlice, offsetOpt.orElse(NoOffset))
offsetAsync.get().thenApply { storedOffset =>
durableStateStoreQuery.changesBySlices(entityType, minSlice, maxSlice, storedOffset.orElse(NoOffset))
}
source.toJava
}

override def extractOffset(stateChange: DurableStateChange[A]): Offset = stateChange.offset
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# internal changes
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.eventsourced.javadsl.EventSourcedProvider#EventsBySlicesSourceProvider.executionContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.eventsourced.javadsl.EventSourcedProvider#EventsBySlicesSourceProvider.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.eventsourced.javadsl.EventSourcedProvider#EventsByTagSourceProvider.executionContext")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.projection.eventsourced.scaladsl.EventSourcedProvider#EventsBySlicesSourceProvider.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.projection.eventsourced.scaladsl.EventSourcedProvider#EventsByTagSourceProvider.this")
Original file line number Diff line number Diff line change
@@ -9,10 +9,9 @@ import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import java.util.function.{ Function => JFunction }

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.annotation.nowarn

import akka.NotUsed
import akka.actor.typed.ActorSystem
@@ -56,21 +55,20 @@ object EventSourcedProvider {
* INTERNAL API
*/
@InternalApi
@nowarn("msg=never used") // system
private class EventsByTagSourceProvider[Event](
system: ActorSystem[_],
eventsByTagQuery: EventsByTagQuery,
tag: String)
extends javadsl.SourceProvider[Offset, EventEnvelope[Event]] {
implicit val executionContext: ExecutionContext = system.executionContext

override def source(offsetAsync: Supplier[CompletionStage[Optional[Offset]]])
: CompletionStage[Source[EventEnvelope[Event], NotUsed]] = {
val source: Future[Source[EventEnvelope[Event], NotUsed]] = offsetAsync.get().toScala.map { offsetOpt =>
offsetAsync.get().thenApply { storedOffset =>
eventsByTagQuery
.eventsByTag(tag, offsetOpt.orElse(NoOffset))
.eventsByTag(tag, storedOffset.orElse(NoOffset))
.map(env => EventEnvelope(env))
}
source.toJava
}

override def extractOffset(envelope: EventEnvelope[Event]): Offset = envelope.offset
@@ -89,23 +87,67 @@ object EventSourcedProvider {
eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice)
}

/**
* By default, the `SourceProvider` uses the stored offset when starting the Projection. This offset can be adjusted
* by defining the `adjustStartOffset` function, which is a function from loaded offset (if any) to the
* adjusted offset that will be used to by the `eventsBySlicesQuery`.
*/
def eventsBySlices[Event](
system: ActorSystem[_],
readJournalPluginId: String,
entityType: String,
minSlice: Int,
maxSlice: Int,
adjustStartOffset: JFunction[Optional[Offset], CompletionStage[Optional[Offset]]])
: SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = {
val eventsBySlicesQuery =
PersistenceQuery(system).getReadJournalFor(classOf[EventsBySliceQuery], readJournalPluginId)
eventsBySlices(system, eventsBySlicesQuery, entityType, minSlice, maxSlice, adjustStartOffset)
}

def eventsBySlices[Event](
system: ActorSystem[_],
eventsBySlicesQuery: EventsBySliceQuery,
entityType: String,
minSlice: Int,
maxSlice: Int): SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = {
maxSlice: Int): SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] =
eventsBySlices(
system,
eventsBySlicesQuery,
entityType,
minSlice,
maxSlice,
(offset: Optional[Offset]) => CompletableFuture.completedFuture(offset))

/**
* By default, the `SourceProvider` uses the stored offset when starting the Projection. This offset can be adjusted
* by defining the `adjustStartOffset` function, which is a function from loaded offset (if any) to the
* adjusted offset that will be used to by the `eventsBySlicesQuery`.
*/
def eventsBySlices[Event](
system: ActorSystem[_],
eventsBySlicesQuery: EventsBySliceQuery,
entityType: String,
minSlice: Int,
maxSlice: Int,
adjustStartOffset: JFunction[Optional[Offset], CompletionStage[Optional[Offset]]])
: SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = {
eventsBySlicesQuery match {
case query: EventsBySliceQuery with CanTriggerReplay =>
new EventsBySlicesSourceProvider[Event](eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
with CanTriggerReplay {
new EventsBySlicesSourceProvider[Event](
system,
eventsBySlicesQuery,
entityType,
minSlice,
maxSlice,
adjustStartOffset) with CanTriggerReplay {

private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr)

}
case _ =>
new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
new EventsBySlicesSourceProvider(system, eventsBySlicesQuery, entityType, minSlice, maxSlice, adjustStartOffset)
}
}

@@ -122,36 +164,82 @@ object EventSourcedProvider {
eventsBySlicesStartingFromSnapshots(system, eventsBySlicesQuery, entityType, minSlice, maxSlice, transformSnapshot)
}

/**
* By default, the `SourceProvider` uses the stored offset when starting the Projection. This offset can be adjusted
* by defining the `adjustStartOffset` function, which is a function from loaded offset (if any) to the
* adjusted offset that will be used to by the `eventsBySlicesQuery`.
*/
def eventsBySlicesStartingFromSnapshots[Snapshot, Event](
system: ActorSystem[_],
readJournalPluginId: String,
entityType: String,
minSlice: Int,
maxSlice: Int,
transformSnapshot: java.util.function.Function[Snapshot, Event],
adjustStartOffset: JFunction[Optional[Offset], CompletionStage[Optional[Offset]]])
: SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = {
val eventsBySlicesQuery =
PersistenceQuery(system).getReadJournalFor(classOf[EventsBySliceStartingFromSnapshotsQuery], readJournalPluginId)
eventsBySlicesStartingFromSnapshots(
system,
eventsBySlicesQuery,
entityType,
minSlice,
maxSlice,
transformSnapshot,
adjustStartOffset)
}

def eventsBySlicesStartingFromSnapshots[Snapshot, Event](
system: ActorSystem[_],
eventsBySlicesQuery: EventsBySliceStartingFromSnapshotsQuery,
entityType: String,
minSlice: Int,
maxSlice: Int,
transformSnapshot: java.util.function.Function[Snapshot, Event])
: SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] =
eventsBySlicesStartingFromSnapshots(
system,
eventsBySlicesQuery,
entityType,
minSlice,
maxSlice,
transformSnapshot,
(offset: Optional[Offset]) => CompletableFuture.completedFuture(offset))

def eventsBySlicesStartingFromSnapshots[Snapshot, Event](
system: ActorSystem[_],
eventsBySlicesQuery: EventsBySliceStartingFromSnapshotsQuery,
entityType: String,
minSlice: Int,
maxSlice: Int,
transformSnapshot: java.util.function.Function[Snapshot, Event],
adjustStartOffset: JFunction[Optional[Offset], CompletionStage[Optional[Offset]]])
: SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]] = {
eventsBySlicesQuery match {
case query: EventsBySliceQuery with CanTriggerReplay =>
new EventsBySlicesStartingFromSnapshotsSourceProvider[Snapshot, Event](
system,
eventsBySlicesQuery,
entityType,
minSlice,
maxSlice,
transformSnapshot,
system) with CanTriggerReplay {
adjustStartOffset) with CanTriggerReplay {

private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr)

}
case _ =>
new EventsBySlicesStartingFromSnapshotsSourceProvider(
system,
eventsBySlicesQuery,
entityType,
minSlice,
maxSlice,
transformSnapshot,
system)
adjustStartOffset)
}
}

@@ -172,28 +260,29 @@ object EventSourcedProvider {
* INTERNAL API
*/
@InternalApi
@nowarn("msg=never used") // system
private class EventsBySlicesSourceProvider[Event](
system: ActorSystem[_],
eventsBySlicesQuery: EventsBySliceQuery,
entityType: String,
override val minSlice: Int,
override val maxSlice: Int,
system: ActorSystem[_])
adjustStartOffset: JFunction[Optional[Offset], CompletionStage[Optional[Offset]]])
extends SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]]
with BySlicesSourceProvider
with EventTimestampQuerySourceProvider
with LoadEventQuerySourceProvider {
implicit val executionContext: ExecutionContext = system.executionContext

override def readJournal: ReadJournal = eventsBySlicesQuery

override def source(offsetAsync: Supplier[CompletionStage[Optional[Offset]]])
: CompletionStage[Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] = {
val source: Future[Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] =
offsetAsync.get().toScala.map { offsetOpt =>
offsetAsync.get().thenCompose { storedOffset =>
adjustStartOffset(storedOffset).thenApply { startOffset =>
eventsBySlicesQuery
.eventsBySlices(entityType, minSlice, maxSlice, offsetOpt.orElse(NoOffset))
.eventsBySlices(entityType, minSlice, maxSlice, startOffset.orElse(NoOffset))
}
source.toJava
}
}

override def extractOffset(envelope: akka.persistence.query.typed.EventEnvelope[Event]): Offset = envelope.offset
@@ -207,34 +296,35 @@ object EventSourcedProvider {
* INTERNAL API
*/
@InternalApi
@nowarn("msg=never used") // system
private class EventsBySlicesStartingFromSnapshotsSourceProvider[Snapshot, Event](
system: ActorSystem[_],
eventsBySlicesQuery: EventsBySliceStartingFromSnapshotsQuery,
entityType: String,
override val minSlice: Int,
override val maxSlice: Int,
transformSnapshot: java.util.function.Function[Snapshot, Event],
system: ActorSystem[_])
adjustStartOffset: JFunction[Optional[Offset], CompletionStage[Optional[Offset]]])
extends SourceProvider[Offset, akka.persistence.query.typed.EventEnvelope[Event]]
with BySlicesSourceProvider
with EventTimestampQuerySourceProvider
with LoadEventQuerySourceProvider {
implicit val executionContext: ExecutionContext = system.executionContext

override def readJournal: ReadJournal = eventsBySlicesQuery

override def source(offsetAsync: Supplier[CompletionStage[Optional[Offset]]])
: CompletionStage[Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] = {
val source: Future[Source[akka.persistence.query.typed.EventEnvelope[Event], NotUsed]] =
offsetAsync.get().toScala.map { offsetOpt =>
offsetAsync.get().thenCompose { storedOffset =>
adjustStartOffset(storedOffset).thenApply { startOffset =>
eventsBySlicesQuery
.eventsBySlicesStartingFromSnapshots(
entityType,
minSlice,
maxSlice,
offsetOpt.orElse(NoOffset),
startOffset.orElse(NoOffset),
transformSnapshot)
}
source.toJava
}
}

override def extractOffset(envelope: akka.persistence.query.typed.EventEnvelope[Event]): Offset = envelope.offset
Loading

0 comments on commit 27a7d5a

Please sign in to comment.