Skip to content

Commit

Permalink
fix: condition for deleting old timestamp offsets (#1186)
Browse files Browse the repository at this point in the history
* add test cases for eviction/deletion overlap scenarios
* always trigger deletion after eviction
  • Loading branch information
pvlugter authored Sep 3, 2024
1 parent a6345e4 commit 88aa4dc
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,172 @@ class R2dbcTimestampOffsetStoreSpec
}
}

"delete old records triggered by time window, while still within entries limit" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings
.withKeepNumberOfEntries(10)
.withTimeWindow(JDuration.ofSeconds(100))
.withEvictInterval(JDuration.ofSeconds(10))
val offsetStore = createOffsetStore(projectionId, evictSettings)

import evictSettings.{ evictInterval, timeWindow }

val t0 = TestClock.nowMicros().instant()
log.debug("Start time [{}]", t0)

val p1 = "p500" // slice 645
val p2 = "p92" // slice 905
val p3 = "p108" // slice 905
val p4 = "p863" // slice 645
val p5 = "p984" // slice 645
val p6 = "p3080" // slice 645
val p7 = "p4290" // slice 645
val p8 = "p20180" // slice 645

val t1 = t0.plusSeconds(1)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t1, Map(p1 -> 1L)), p1, 1L)).futureValue

val t2 = t0.plusSeconds(2)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t2, Map(p2 -> 1L)), p2, 1L)).futureValue

val t3 = t0.plusSeconds(3)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t3, Map(p3 -> 1L)), p3, 1L)).futureValue

val t4 = t0.plus(evictInterval).plusSeconds(1)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t4, Map(p4 -> 1L)), p4, 1L)).futureValue

val t5 = t0.plus(evictInterval).plusSeconds(2)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t5, Map(p5 -> 1L)), p5, 1L)).futureValue

val t6 = t0.plus(evictInterval).plusSeconds(3)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t6, Map(p6 -> 1L)), p6, 1L)).futureValue

offsetStore.getState().size shouldBe 6

val t7 = t0.plus(timeWindow.minusSeconds(10))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t7, Map(p7 -> 1L)), p7, 1L)).futureValue

offsetStore.getState().size shouldBe 7 // no eviction
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (within time window)

val t8 = t0.plus(timeWindow.plus(evictInterval).minusSeconds(3))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue

offsetStore.getState().size shouldBe 8 // no eviction
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p1@t1 and p2@t2, kept p3@t3 (latest)

val t9 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(3))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p8 -> 2L)), p8, 2L)).futureValue

offsetStore.getState().size shouldBe 8 // no eviction (outside eviction window, but within keep-number-of-entries)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p4@t4 and p5@t5, kept p3@t3 (latest)

offsetStore.getState().byPid.keySet shouldBe Set(p1, p2, p3, p4, p5, p6, p7, p8)
offsetStore.readOffset().futureValue // reload from database
offsetStore.getState().byPid.keySet shouldBe Set(p3, p6, p7, p8)
}

"delete old records triggered after eviction" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings
.withKeepNumberOfEntries(5)
.withTimeWindow(JDuration.ofSeconds(100))
.withEvictInterval(JDuration.ofSeconds(10))
val offsetStore = createOffsetStore(projectionId, evictSettings)

import evictSettings.{ evictInterval, timeWindow }

val t0 = TestClock.nowMicros().instant()
log.debug("Start time [{}]", t0)

// all slice 645
val p1 = "p500"
val p2 = "p621"
val p3 = "p742"
val p4 = "p863"
val p5 = "p984"
val p6 = "p3080"
val p7 = "p4290"
val p8 = "p20180"
val p9 = "p21390"
val p10 = "p31070"
val p11 = "p31191"
val p12 = "p32280"

val t1 = t0.plusSeconds(1)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t1, Map(p1 -> 1L)), p1, 1L)).futureValue

val t2 = t0.plusSeconds(2)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t2, Map(p2 -> 1L)), p2, 1L)).futureValue

val t3 = t0.plusSeconds(3)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t3, Map(p3 -> 1L)), p3, 1L)).futureValue

val t4 = t0.plus(evictInterval).plusSeconds(7)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t4, Map(p4 -> 1L)), p4, 1L)).futureValue

val t5 = t0.plus(evictInterval).plusSeconds(8)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t5, Map(p5 -> 1L)), p5, 1L)).futureValue

val t6 = t0.plus(evictInterval).plusSeconds(9)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t6, Map(p6 -> 1L)), p6, 1L)).futureValue

offsetStore.getState().size shouldBe 6 // no eviction
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion

val t7 = t0.plus(timeWindow.minus(evictInterval))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t7, Map(p7 -> 1L)), p7, 1L)).futureValue

offsetStore.getState().size shouldBe 7 // no eviction
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (within time window)

val t8 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(3))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue

offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8) // evicted p1@t1, p2@t2, and p3@t3
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deletion triggered by eviction

val t9 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(10))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p8 -> 2L)), p8, 2L)).futureValue

offsetStore.getState().size shouldBe 5 // no eviction (outside time window, but still within limit)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deleted p4@t4, p5@t5, p6@t6 (outside window)

val t10 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(11))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t10, Map(p9 -> 1L)), p9, 1L)).futureValue

offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8, p9) // evicted p4@t4
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete

val t11 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(12))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t11, Map(p10 -> 1L)), p10, 1L)).futureValue

offsetStore.getState().byPid.keySet shouldBe Set(p6, p7, p8, p9, p10) // evicted p5@t5
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete

val t12 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(13))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t12, Map(p11 -> 1L)), p11, 1L)).futureValue

offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11) // evicted p6@t6
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete

val t13 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(14))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t13, Map(p12 -> 1L)), p12, 1L)).futureValue

offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11, p12) // no eviction (within time window)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion

val t14 = t0.plus(timeWindow.multipliedBy(2).plus(evictInterval.multipliedBy(3)).plusSeconds(1))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t14, Map(p12 -> 2L)), p12, 2L)).futureValue

offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12) // evicted p7@t7
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // triggered by evict, deleted p7@t7, p8@t8, p8@t9

offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12)
offsetStore.readOffset().futureValue // reload from database
offsetStore.getState().byPid.keySet shouldBe Set(p9, p10, p11, p12)
}

"set offset" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,9 @@ private[projection] object R2dbcOffsetStore {
def evict(until: Instant, keepNumberOfEntries: Int): State = {
if (oldestTimestamp.isBefore(until) && size > keepNumberOfEntries) {
val newState = State(
sortedByTimestamp
.take(size - keepNumberOfEntries)
.filterNot(_.timestamp.isBefore(until)) ++ sortedByTimestamp
.takeRight(keepNumberOfEntries) ++ latestBySlice)
sortedByTimestamp.take(size - keepNumberOfEntries).filterNot(_.timestamp.isBefore(until))
++ sortedByTimestamp.takeRight(keepNumberOfEntries)
++ latestBySlice)
newState.copy(sizeAfterEvict = newState.size)
} else
this
Expand Down Expand Up @@ -241,6 +240,9 @@ private[projection] class R2dbcOffsetStore(
// To avoid delete requests when no new offsets have been stored since previous delete
private val idle = new AtomicBoolean(false)

// To trigger next deletion after in-memory eviction
private val triggerDeletion = new AtomicBoolean(false)

system.scheduler.scheduleWithFixedDelay(
settings.deleteInterval,
settings.deleteInterval,
Expand Down Expand Up @@ -455,6 +457,7 @@ private[projection] class R2dbcOffsetStore(
.compareTo(evictWindow) > 0) {
val evictUntil = newState.latestTimestamp.minus(settings.timeWindow)
val s = newState.evict(evictUntil, settings.keepNumberOfEntries)
triggerDeletion.set(true)
logger.debugN(
"Evicted [{}] records until [{}], keeping [{}] records. Latest [{}].",
newState.size - s.size,
Expand Down Expand Up @@ -739,7 +742,7 @@ private[projection] class R2dbcOffsetStore(
Future.successful(0)
} else {
val currentState = getState()
if (currentState.size <= settings.keepNumberOfEntries || currentState.window.compareTo(settings.timeWindow) < 0) {
if (!triggerDeletion.getAndSet(false) && currentState.window.compareTo(settings.timeWindow) < 0) {
// it hasn't filled up the window yet
Future.successful(0)
} else {
Expand Down

0 comments on commit 88aa4dc

Please sign in to comment.