Skip to content

Commit

Permalink
always trigger deletion after eviction
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Aug 29, 2024
1 parent 5e4b32c commit 9e0e35a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.getState().byPid.keySet shouldBe Set(p3, p6, p7, p8)
}

"delete old records triggered by number of entries, after eviction, given new persistence ids" in {
"delete old records triggered after eviction" in {
val projectionId = genRandomProjectionId()
val evictSettings = settings
.withKeepNumberOfEntries(5)
Expand All @@ -1206,7 +1206,6 @@ class R2dbcTimestampOffsetStoreSpec
val p6 = "p3080"
val p7 = "p4290"
val p8 = "p20180"
val p9 = "p21390"

val t1 = t0.plusSeconds(1)
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t1, Map(p1 -> 1L)), p1, 1L)).futureValue
Expand Down Expand Up @@ -1238,29 +1237,11 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue

offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8) // evicted t1|p1, t2|p2, and t3|p3
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion, as already evicted older records
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deletion triggered by eviction

val t9 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(4))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p7 -> 2L)), p7, 2L)).futureValue

offsetStore.getState().byPid.size shouldBe 5 // no eviction (within time window and limit)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (still within limit)

val t10 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(5))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t10, Map(p8 -> 2L)), p8, 2L)).futureValue

offsetStore.getState().byPid.size shouldBe 5 // no eviction (within time window and limit)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (still within limit)

val t11 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(6))
offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t11, Map(p9 -> 1L)), p9, 1L)).futureValue // new pid

offsetStore.getState().byPid.size shouldBe 6 // no eviction (over limit but still within time window)
offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deleted t1|p1, t2|p2, and t3|p3

offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8, p9)
offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8)
offsetStore.readOffset().futureValue // reload from database
offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8, p9)
offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8)
}

"set offset" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ private[projection] class R2dbcOffsetStore(
// To avoid delete requests when no new offsets have been stored since previous delete
private val idle = new AtomicBoolean(false)

// To trigger next deletion after in-memory eviction
private val triggerDeletion = new AtomicBoolean(false)

system.scheduler.scheduleWithFixedDelay(
settings.deleteInterval,
settings.deleteInterval,
Expand Down Expand Up @@ -438,6 +441,7 @@ private[projection] class R2dbcOffsetStore(
.compareTo(evictWindow) > 0) {
val evictUntil = newState.latestTimestamp.minus(settings.timeWindow)
val s = newState.evict(evictUntil, settings.keepNumberOfEntries)
triggerDeletion.set(true)
logger.debugN(
"Evicted [{}] records until [{}], keeping [{}] records. Latest [{}].",
newState.size - s.size,
Expand Down Expand Up @@ -722,7 +726,7 @@ private[projection] class R2dbcOffsetStore(
Future.successful(0)
} else {
val currentState = getState()
if (currentState.size <= settings.keepNumberOfEntries && currentState.window.compareTo(settings.timeWindow) < 0) {
if (!triggerDeletion.getAndSet(false) && currentState.window.compareTo(settings.timeWindow) < 0) {
// it hasn't filled up the window yet
Future.successful(0)
} else {
Expand Down

0 comments on commit 9e0e35a

Please sign in to comment.