Skip to content

Commit

Permalink
fix: DynamoDB Timestamp validation of previous seqNr (#1258)
Browse files Browse the repository at this point in the history
* base the validaiton time window for the previous sequence number
on start timestamp and backtracking window
  • Loading branch information
patriknw authored Nov 22, 2024
1 parent 17cd73d commit 197d260
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,15 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
private def createOffsetStore(
projectionId: ProjectionId,
customSettings: DynamoDBProjectionSettings = settings,
offsetStoreClock: TestClock = clock,
eventTimestampQueryClock: TestClock = clock) =
new DynamoDBOffsetStore(
projectionId,
Some(new TestTimestampSourceProvider(0, persistenceExt.numberOfSlices - 1, eventTimestampQueryClock)),
system,
customSettings,
client)
client,
offsetStoreClock)

def createEnvelope(pid: Pid, seqNr: SeqNr, timestamp: Instant, event: String): EventEnvelope[String] = {
val entityType = PersistenceId.extractEntityType(pid)
Expand Down Expand Up @@ -559,6 +561,11 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)

// some validation require the startTimestamp, which is set from readOffset
offsetStore.getState().startTimestampBySlice.size shouldBe 0
offsetStore.readOffset().futureValue
offsetStore.getState().startTimestampBySlice.values.toSet shouldBe Set(clock.instant())

val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, "p3" -> 5L))
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
Expand Down Expand Up @@ -1033,5 +1040,59 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
// "set offset" in {
// "clear offset" in {

"validate timestamp of previous sequence number" in {
import DynamoDBOffsetStore.Validation._

val projectionName = UUID.randomUUID().toString

def offsetStore(minSlice: Int, maxSlice: Int) =
new DynamoDBOffsetStore(
ProjectionId(projectionName, s"$minSlice-$maxSlice"),
Some(new TestTimestampSourceProvider(minSlice, maxSlice, clock)),
system,
settings,
client,
clock)

// one projection at lower scale
val offsetStore1 = offsetStore(512, 1023)

// two projections at higher scale
val offsetStore2 = offsetStore(512, 767)

val p1 = "p-0960" // slice 576
val p2 = "p-6009" // slice 640
val p3 = "p-3039" // slice 832

val t0 = clock.instant().minusSeconds(100)
def time(step: Int) = t0.plusSeconds(step)

// starting with 2 projections, testing 512-1023
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p3 -> 1L)), p3, 1L)).futureValue

// scaled up to 4 projections, testing 512-767
offsetStore2.readOffset().futureValue
offsetStore2.getState().startTimestampBySlice(576) shouldBe time(2)
val slice640StartTimestamp = offsetStore2.getState().startTimestampBySlice(640)
slice640StartTimestamp shouldBe clock.instant()
val latestTime = time(10)
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue
offsetStore2.getState().latestTimestamp shouldBe latestTime

// clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr.
// rejected if timestamp of previous seqNr is after start timestamp minus backtracking window
clock.setInstant(slice640StartTimestamp.minus(settings.backtrackingWindow.minusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe RejectedBacktrackingSeqNr
// accepted if timestamp of previous seqNr is before start timestamp minus backtracking window
clock.setInstant(slice640StartTimestamp.minus(settings.timeWindow.plusSeconds(1)))
offsetStore2
.validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
.futureValue shouldBe Accepted

}

}
}
6 changes: 6 additions & 0 deletions akka-projection-dynamodb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ akka.projection.dynamodb {
# within this time window from latest offset.
time-window = 5 minutes

# Backtracking window of the source (query). Should be equal to
# the akka.persistence.dynamodb.query.backtracking.window that is used for the
# SourceProvider.
# It should not be larger than the akka.projection.dynamodb.offset-store.time-window.
backtracking-window = ${akka.persistence.dynamodb.query.backtracking.window}

# Trying to batch insert offsets in batches of this size.
offset-batch-size = 20

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ object DynamoDBProjectionSettings {
timestampOffsetTable = config.getString("offset-store.timestamp-offset-table"),
useClient = config.getString("use-client"),
timeWindow = config.getDuration("offset-store.time-window"),
backtrackingWindow = config.getDuration("offset-store.backtracking-window"),
keepNumberOfEntries = 0,
evictInterval = JDuration.ZERO,
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
Expand All @@ -62,6 +63,7 @@ final class DynamoDBProjectionSettings private (
val timestampOffsetTable: String,
val useClient: String,
val timeWindow: JDuration,
val backtrackingWindow: JDuration,
@deprecated("Not used, evict is only based on time window", "1.6.2")
val keepNumberOfEntries: Int,
@deprecated("Not used, evict is not periodic", "1.6.2")
Expand All @@ -84,6 +86,12 @@ final class DynamoDBProjectionSettings private (
def withTimeWindow(timeWindow: JDuration): DynamoDBProjectionSettings =
copy(timeWindow = timeWindow)

def withBacktrackingWindow(backtrackingWindow: FiniteDuration): DynamoDBProjectionSettings =
copy(backtrackingWindow = backtrackingWindow.toJava)

def withBacktrackingWindow(backtrackingWindow: JDuration): DynamoDBProjectionSettings =
copy(backtrackingWindow = backtrackingWindow)

@deprecated("Not used, evict is only based on time window", "1.6.2")
def withKeepNumberOfEntries(keepNumberOfEntries: Int): DynamoDBProjectionSettings =
this
Expand Down Expand Up @@ -116,6 +124,7 @@ final class DynamoDBProjectionSettings private (
timestampOffsetTable: String = timestampOffsetTable,
useClient: String = useClient,
timeWindow: JDuration = timeWindow,
backtrackingWindow: JDuration = backtrackingWindow,
warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
offsetBatchSize: Int = offsetBatchSize,
offsetSliceReadParallelism: Int = offsetSliceReadParallelism,
Expand All @@ -125,6 +134,7 @@ final class DynamoDBProjectionSettings private (
timestampOffsetTable,
useClient,
timeWindow,
backtrackingWindow,
keepNumberOfEntries,
evictInterval,
warnAboutFilteredEventsInFlow,
Expand All @@ -134,7 +144,7 @@ final class DynamoDBProjectionSettings private (
retrySettings)

override def toString =
s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $backtrackingWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
}

object TimeToLiveSettings {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.projection.dynamodb.internal

import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.concurrent.atomic.AtomicReference
Expand Down Expand Up @@ -73,18 +74,21 @@ private[projection] object DynamoDBOffsetStore {
fromSnapshot: Boolean)

object State {
val empty: State = State(Map.empty, Map.empty, Map.empty)
val empty: State = State(Map.empty, Map.empty, Map.empty, Map.empty)

def apply(offsetBySlice: Map[Int, TimestampOffset]): State =
if (offsetBySlice.isEmpty) empty
else new State(Map.empty, Map.empty, offsetBySlice)
def apply(offsetBySlice: Map[Int, TimestampOffset], startTimestampBySlice: Map[Int, Instant]): State =
if (offsetBySlice.isEmpty && startTimestampBySlice.isEmpty)
empty
else
new State(Map.empty, Map.empty, offsetBySlice, startTimestampBySlice)

}

final case class State(
byPid: Map[Pid, Record],
bySliceSorted: Map[Int, TreeSet[Record]],
offsetBySlice: Map[Int, TimestampOffset]) {
offsetBySlice: Map[Int, TimestampOffset],
startTimestampBySlice: Map[Int, Instant]) {

def size: Int = byPid.size

Expand Down Expand Up @@ -191,7 +195,8 @@ private[projection] class DynamoDBOffsetStore(
sourceProvider: Option[BySlicesSourceProvider],
system: ActorSystem[_],
settings: DynamoDBProjectionSettings,
client: DynamoDbAsyncClient) {
client: DynamoDbAsyncClient,
clock: Clock = Clock.systemUTC()) {

import DynamoDBOffsetStore._

Expand Down Expand Up @@ -282,7 +287,13 @@ private[projection] class DynamoDBOffsetStore(
})

offsetBySliceFut.map { offsetBySlice =>
val newState = State(offsetBySlice)
val now = clock.instant()
val startTimestampBySlice =
(minSlice to maxSlice).map { slice =>
slice -> offsetBySlice.get(slice).map(_.timestamp).getOrElse(now)
}.toMap

val newState = State(offsetBySlice, startTimestampBySlice)

if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")
Expand Down Expand Up @@ -560,32 +571,6 @@ private[projection] class DynamoDBOffsetStore(
recordWithOffset.offset)
}

def logUnknown(): Unit = {
if (recordWithOffset.fromPubSub) {
logger.debug(
"{} Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
logPrefix,
seqNr,
pid,
recordWithOffset.offset)
} else if (!recordWithOffset.fromBacktracking) {
// This may happen rather frequently when using `publish-events`, after reconnecting and such.
logger.debug(
"{} Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
logPrefix,
seqNr,
pid,
recordWithOffset.offset)
} else {
logger.warn(
"{} Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}",
logPrefix,
seqNr,
pid,
recordWithOffset.offset)
}
}

if (prevSeqNr > 0) {
// expecting seqNr to be +1 of previously known
val ok = seqNr == prevSeqNr + 1
Expand Down Expand Up @@ -613,35 +598,7 @@ private[projection] class DynamoDBOffsetStore(
// always accept starting from snapshots when there was no previous event seen
FutureAccepted
} else {
// Haven't see seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the time window of the offset store.
// Backtracking will emit missed event again.
timestampOf(pid, seqNr - 1).map {
case Some(previousTimestamp) =>
val before = currentState.latestTimestamp.minus(settings.timeWindow)
if (previousTimestamp.isBefore(before)) {
logger.debug(
"{} Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
"is before time window [{}].",
logPrefix,
pid,
seqNr,
previousTimestamp,
before)
Accepted
} else if (!recordWithOffset.fromBacktracking) {
logUnknown()
RejectedSeqNr
} else {
logUnknown()
// This will result in projection restart (with normal configuration)
RejectedBacktrackingSeqNr
}
case None =>
// previous not found, could have been deleted
Accepted
}
validateEventTimestamp(currentState, recordWithOffset)
}
} else {
// strictSeqNr == false is for durable state where each revision might not be visible
Expand All @@ -663,6 +620,66 @@ private[projection] class DynamoDBOffsetStore(
}
}

private def validateEventTimestamp(currentState: State, recordWithOffset: RecordWithOffset) = {
import Validation._
val pid = recordWithOffset.record.pid
val seqNr = recordWithOffset.record.seqNr
val slice = recordWithOffset.record.slice

// Haven't see seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the startTimestamp minus backtracking window
timestampOf(pid, seqNr - 1).map {
case Some(previousTimestamp) =>
val acceptBefore =
currentState.startTimestampBySlice(slice).minus(settings.backtrackingWindow)

if (previousTimestamp.isBefore(acceptBefore)) {
logger.debug(
"Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
"is before start timestamp [{}] minus backtracking window [{}].",
pid,
seqNr,
previousTimestamp,
currentState.startTimestampBySlice(slice),
settings.backtrackingWindow)
Accepted
} else if (recordWithOffset.fromPubSub) {
logger.debug(
"Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
seqNr,
pid,
recordWithOffset.offset)
RejectedSeqNr
} else if (recordWithOffset.fromBacktracking) {
// This will result in projection restart (with normal configuration)
logger.warn(
"Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " +
"is after start timestamp [{}] minus backtracking window [{}].",
seqNr,
pid,
recordWithOffset.offset,
previousTimestamp,
currentState.startTimestampBySlice(slice),
settings.backtrackingWindow)
RejectedBacktrackingSeqNr
} else {
// This may happen rather frequently when using `publish-events`, after reconnecting and such.
logger.debug(
"Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
seqNr,
pid,
recordWithOffset.offset)
// Backtracking will emit missed event again.
RejectedSeqNr
}
case None =>
// previous not found, could have been deleted
logger.debug("Accepting envelope with pid [{}], seqNr [{}], where previous event not found.", pid, seqNr)
Accepted
}
}

@tailrec final def addInflight[Envelope](envelope: Envelope): Unit = {
createRecordWithOffset(envelope) match {
case Some(recordWithOffset) =>
Expand Down

0 comments on commit 197d260

Please sign in to comment.