Skip to content

Commit

Permalink
add opt-in config setting for replay
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Nov 29, 2024
1 parent 5984826 commit 5d9dcb8
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -904,12 +904,18 @@ class DynamoDBTimestampOffsetProjectionSpec

val sourceProvider =
createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true)

implicit val offsetStore: DynamoDBOffsetStore =
new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client)

val projectionRef = spawn(
ProjectionBehavior(DynamoDBProjection
.atLeastOnce(projectionId, Some(settings), sourceProvider, handler = () => new ConcatHandler(repository))))
ProjectionBehavior(
DynamoDBProjection
.atLeastOnce(
projectionId,
Some(settings.withReplayOnRejectedSequenceNumbers(true)),
sourceProvider,
handler = () => new ConcatHandler(repository))))

eventually {
projectedValueShouldBe("e1|e2|e3|e4|e5|e6")(pid1)
Expand Down Expand Up @@ -960,8 +966,13 @@ class DynamoDBTimestampOffsetProjectionSpec
new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client)

val projectionRef = spawn(
ProjectionBehavior(DynamoDBProjection
.atLeastOnce(projectionId, Some(settings), sourceProvider, handler = () => new ConcatHandler(repository))))
ProjectionBehavior(
DynamoDBProjection
.atLeastOnce(
projectionId,
Some(settings.withReplayOnRejectedSequenceNumbers(true)),
sourceProvider,
handler = () => new ConcatHandler(repository))))

eventually {
projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid1)
Expand Down
3 changes: 3 additions & 0 deletions akka-projection-dynamodb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ akka.projection.dynamodb {
}
}

# Replay missed events for a particular persistence id when a sequence number is rejected by validation.
replay-on-rejected-sequence-numbers = off

# By default it shares DynamoDB client with akka-persistence-dynamodb (write side).
# To use a separate client for projections this can be
# set to another config path that defines the config based on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ object DynamoDBProjectionSettings {
offsetBatchSize = config.getInt("offset-store.offset-batch-size"),
offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"),
timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")),
retrySettings = RetrySettings(config.getConfig("offset-store.retries")))
retrySettings = RetrySettings(config.getConfig("offset-store.retries")),
replayOnRejectedSequenceNumbers = config.getBoolean("replay-on-rejected-sequence-numbers"))
}

/**
Expand All @@ -72,7 +73,8 @@ final class DynamoDBProjectionSettings private (
val offsetBatchSize: Int,
val offsetSliceReadParallelism: Int,
val timeToLiveSettings: TimeToLiveSettings,
val retrySettings: RetrySettings) {
val retrySettings: RetrySettings,
val replayOnRejectedSequenceNumbers: Boolean) {

// 25 is a hard limit of batch writes in DynamoDB
require(offsetBatchSize <= 25, s"offset-batch-size must be <= 25, was [$offsetBatchSize]")
Expand Down Expand Up @@ -122,6 +124,9 @@ final class DynamoDBProjectionSettings private (
def withRetrySettings(retrySettings: RetrySettings): DynamoDBProjectionSettings =
copy(retrySettings = retrySettings)

def withReplayOnRejectedSequenceNumbers(replayOnRejectedSequenceNumbers: Boolean): DynamoDBProjectionSettings =
copy(replayOnRejectedSequenceNumbers = replayOnRejectedSequenceNumbers)

@nowarn("msg=deprecated")
private def copy(
timestampOffsetTable: String = timestampOffsetTable,
Expand All @@ -132,7 +137,8 @@ final class DynamoDBProjectionSettings private (
offsetBatchSize: Int = offsetBatchSize,
offsetSliceReadParallelism: Int = offsetSliceReadParallelism,
timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings,
retrySettings: RetrySettings = retrySettings) =
retrySettings: RetrySettings = retrySettings,
replayOnRejectedSequenceNumbers: Boolean = replayOnRejectedSequenceNumbers) =
new DynamoDBProjectionSettings(
timestampOffsetTable,
useClient,
Expand All @@ -144,10 +150,24 @@ final class DynamoDBProjectionSettings private (
offsetBatchSize,
offsetSliceReadParallelism,
timeToLiveSettings,
retrySettings)
retrySettings,
replayOnRejectedSequenceNumbers)

override def toString =
s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $backtrackingWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
@nowarn("msg=deprecated")
override def toString: String =
s"DynamoDBProjectionSettings(" +
s"timestampOffsetTable=$timestampOffsetTable, " +
s"useClient=$useClient, " +
s"timeWindow=$timeWindow, " +
s"backtrackingWindow=$backtrackingWindow, " +
s"keepNumberOfEntries=$keepNumberOfEntries, " +
s"evictInterval=$evictInterval, " +
s"warnAboutFilteredEventsInFlow=$warnAboutFilteredEventsInFlow, " +
s"offsetBatchSize=$offsetBatchSize, " +
s"offsetSliceReadParallelism=$offsetSliceReadParallelism, " +
s"timeToLiveSettings=$timeToLiveSettings, " +
s"retrySettings=$retrySettings, " +
s"replayOnRejectedSequenceNumbers=$replayOnRejectedSequenceNumbers)"
}

object TimeToLiveSettings {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private[projection] class DynamoDBOffsetStore(
projectionId: ProjectionId,
sourceProvider: Option[BySlicesSourceProvider],
system: ActorSystem[_],
settings: DynamoDBProjectionSettings,
val settings: DynamoDBProjectionSettings,
client: DynamoDbAsyncClient,
clock: Clock = Clock.systemUTC()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ private[projection] object DynamoDBProjectionImpl {
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
// FIXME config to make this case opt in
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked] =>
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
offsetStore.storedSeqNr(persistenceId).flatMap { storedSeqNr =>
val fromSeqNr = storedSeqNr + 1
Expand Down

0 comments on commit 5d9dcb8

Please sign in to comment.