Skip to content

Commit

Permalink
fix: [DynamoDB] Ensure that inflight persistence IDs are not evicted (#…
Browse files Browse the repository at this point in the history
…1269)


---------

Co-authored-by: Patrik Nordwall <patrik.nordwall@gmail.com>
Co-authored-by: Peter Vlugter <59895+pvlugter@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 26, 2024
1 parent 39ec093 commit a639151
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
state6.offsetBySlice(slice("p10084")) shouldBe TimestampOffset(t0.plusMillis(3), Map("p3" -> 10L, "p10084" -> 9))
}

val allowAll: Any => Boolean = _ => true

"evict old" in {
val p1 = "p500" // slice 645
val p2 = "p621" // slice 645
Expand All @@ -107,17 +109,17 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L)

// keep all
state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000)) shouldBe state1
state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000), allowAll) shouldBe state1

// evict older than time window
val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2))
val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2), allowAll)
state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L)

val state3 = state1.add(Vector(createRecord(p5, 5, t0.plusMillis(100)), createRecord(p7, 7, t0.plusMillis(10))))
val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2))
val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2), allowAll)
state4.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p5 -> 5L, p6 -> 6L, p7 -> 7L)

val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2))
val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2), allowAll)
state5.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(
p1 -> 1L,
p2 -> 2L,
Expand Down Expand Up @@ -156,21 +158,43 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match
val timeWindow = JDuration.ofMillis(1)

val state2 = slices.foldLeft(state1) {
case (acc, slice) => acc.evict(slice, timeWindow)
case (acc, slice) => acc.evict(slice, timeWindow, allowAll)
}
// note that p92 is evicted because it has same slice as p108
// p1 is kept because keeping one for each slice
state2.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L)

val state3 = slices.foldLeft(state2) {
case (acc, slice) => acc.evict(slice, timeWindow)
case (acc, slice) => acc.evict(slice, timeWindow, allowAll)
}
// still keeping one for each slice
state3.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p1" -> 1L, "p108" -> 3L, "p4" -> 4L, "p5" -> 5L)
}

"evict old but only those allowed to be evicted" in {
val t0 = TestClock.nowMillis().instant()
val state1 = State.empty.add(
Vector(
createRecord("p92", 2, t0.plusMillis(1)),
createRecord("p108", 3, t0.plusMillis(20)),
createRecord("p229", 4, t0.plusMillis(30))))

val slices = state1.bySliceSorted.keySet
slices.size shouldBe 1

state1
.evict(slices.head, JDuration.ofMillis(1), allowAll)
.byPid
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p229" -> 4) // p92 and p108 evicted

state1
.evict(slices.head, JDuration.ofMillis(1), _.pid == "p108")
.byPid // allow only p108 to be evicted
.map { case (pid, r) => pid -> r.seqNr } shouldBe Map("p92" -> 2, "p229" -> 4)
}

"find duplicate" in {
val t0 = TestClock.nowMillis().instant()
val state =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,19 @@ private[projection] object DynamoDBOffsetStore {
type Pid = String

final case class Record(slice: Int, pid: Pid, seqNr: SeqNr, timestamp: Instant) extends Ordered[Record] {

override def compare(that: Record): Int = {
val result = this.timestamp.compareTo(that.timestamp)
if (result == 0) {
if (this.slice == that.slice)
if (this.pid == that.pid)
if (this.seqNr == that.seqNr)
0
else
java.lang.Long.compare(this.seqNr, that.seqNr)
else
this.pid.compareTo(that.pid)
else Integer.compare(this.slice, that.slice)
} else {
result
override def compare(that: Record): Int =
timestamp.compareTo(that.timestamp) match {
case 0 =>
Integer.compare(slice, that.slice) match {
case 0 =>
pid.compareTo(that.pid) match {
case 0 => java.lang.Long.compare(seqNr, that.seqNr)
case result => result
}
case result => result
}
case result => result
}
}
}

final case class RecordWithOffset(
Expand Down Expand Up @@ -147,14 +143,27 @@ private[projection] object DynamoDBOffsetStore {
}
}

def evict(slice: Int, timeWindow: JDuration): State = {
def evict(slice: Int, timeWindow: JDuration, ableToEvictRecord: Record => Boolean): State = {
val recordsSortedByTimestamp = bySliceSorted.getOrElse(slice, TreeSet.empty[Record])
if (recordsSortedByTimestamp.isEmpty) {
this
} else {
val until = recordsSortedByTimestamp.last.timestamp.minus(timeWindow)
val filtered = recordsSortedByTimestamp.dropWhile(_.timestamp.isBefore(until))
if (filtered.size == recordsSortedByTimestamp.size) {
val filtered = {
// Records comparing >= this record by recordOrdering will definitely be kept,
// Records comparing < this record by recordOrdering are subject to eviction
// Slice will be equal, and pid will compare lexicographically less than any valid pid
val untilRecord = Record(slice, "", 0, until)
val newerRecords = recordsSortedByTimestamp.rangeFrom(untilRecord) // inclusive of until
val olderRecords = recordsSortedByTimestamp.rangeUntil(untilRecord) // exclusive of until
val filteredOlder = olderRecords.filterNot(ableToEvictRecord)

if (filteredOlder.size == olderRecords.size) recordsSortedByTimestamp
else newerRecords.union(filteredOlder)
}

// adding back filtered is linear in the size of filtered, but so is checking if we're able to evict
if (filtered eq recordsSortedByTimestamp) {
this
} else {
val byPidOtherSlices = byPid.filterNot { case (_, r) => r.slice == slice }
Expand Down Expand Up @@ -394,11 +403,12 @@ private[projection] class DynamoDBOffsetStore(
storeSequenceNumbers: IndexedSeq[Record] => Future[Done],
canBeConcurrent: Boolean): Future[Done] = {
load(records.map(_.pid)).flatMap { oldState =>
val filteredRecords = {
val filteredRecords =
if (records.size <= 1)
records.filterNot(oldState.isDuplicate)
else {
// use last record for each pid
// Can assume (given other projection guarantees) that records for the same pid
// have montonically increasing sequence numbers
records
.groupBy(_.pid)
.valuesIterator
Expand All @@ -407,7 +417,7 @@ private[projection] class DynamoDBOffsetStore(
}
.toVector
}
}

if (filteredRecords.isEmpty) {
FutureDone
} else {
Expand All @@ -417,8 +427,18 @@ private[projection] class DynamoDBOffsetStore(
if (filteredRecords.size == 1) Set(filteredRecords.head.slice)
else filteredRecords.iterator.map(_.slice).toSet

val currentInflight = getInflight()
val evictedNewState = slices.foldLeft(newState) {
case (s, slice) => s.evict(slice, settings.timeWindow)
case (s, slice) =>
s.evict(
slice,
settings.timeWindow,
// Only persistence IDs that aren't inflight are evictable,
// if only so that those persistence IDs can be removed from
// inflight... in the absence of further records from that
// persistence ID, the next store will evict (further records
// would make that persistence ID recent enough to not be evicted)
record => !currentInflight.contains(record.pid))
}

// FIXME we probably don't have to store the latest offset per slice all the time, but can
Expand Down Expand Up @@ -463,7 +483,7 @@ private[projection] class DynamoDBOffsetStore(
if (newInflight.size >= 10000) {
throw new IllegalStateException(
s"Too many envelopes in-flight [${newInflight.size}]. " +
"Please report this issue at https://github.com/akka/akka-persistence-dynamodb")
"Please report this issue at https://github.com/akka/akka-projection")
}
if (!inflight.compareAndSet(currentInflight, newInflight))
cleanupInflight(newState) // CAS retry, concurrent update of inflight
Expand Down

0 comments on commit a639151

Please sign in to comment.