diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index 6ae5ebb12..8c9bf7316 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -1054,6 +1054,172 @@ class R2dbcTimestampOffsetStoreSpec } } + "delete old records triggered by time window, while still within entries limit" in { + val projectionId = genRandomProjectionId() + val evictSettings = settings + .withKeepNumberOfEntries(10) + .withTimeWindow(JDuration.ofSeconds(100)) + .withEvictInterval(JDuration.ofSeconds(10)) + val offsetStore = createOffsetStore(projectionId, evictSettings) + + import evictSettings.{ evictInterval, timeWindow } + + val t0 = TestClock.nowMicros().instant() + log.debug("Start time [{}]", t0) + + val p1 = "p500" // slice 645 + val p2 = "p92" // slice 905 + val p3 = "p108" // slice 905 + val p4 = "p863" // slice 645 + val p5 = "p984" // slice 645 + val p6 = "p3080" // slice 645 + val p7 = "p4290" // slice 645 + val p8 = "p20180" // slice 645 + + val t1 = t0.plusSeconds(1) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t1, Map(p1 -> 1L)), p1, 1L)).futureValue + + val t2 = t0.plusSeconds(2) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t2, Map(p2 -> 1L)), p2, 1L)).futureValue + + val t3 = t0.plusSeconds(3) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t3, Map(p3 -> 1L)), p3, 1L)).futureValue + + val t4 = t0.plus(evictInterval).plusSeconds(1) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t4, Map(p4 -> 1L)), p4, 1L)).futureValue + + val t5 = t0.plus(evictInterval).plusSeconds(2) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t5, Map(p5 -> 1L)), p5, 1L)).futureValue + + val t6 = t0.plus(evictInterval).plusSeconds(3) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t6, Map(p6 -> 1L)), p6, 1L)).futureValue + + offsetStore.getState().size shouldBe 6 + + val t7 = t0.plus(timeWindow.minusSeconds(10)) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t7, Map(p7 -> 1L)), p7, 1L)).futureValue + + offsetStore.getState().size shouldBe 7 // no eviction + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (within time window) + + val t8 = t0.plus(timeWindow.plus(evictInterval).minusSeconds(3)) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue + + offsetStore.getState().size shouldBe 8 // no eviction + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p1@t1 and p2@t2, kept p3@t3 (latest) + + val t9 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(3)) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p8 -> 2L)), p8, 2L)).futureValue + + offsetStore.getState().size shouldBe 8 // no eviction (outside eviction window, but within keep-number-of-entries) + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 2 // deleted p4@t4 and p5@t5, kept p3@t3 (latest) + + offsetStore.getState().byPid.keySet shouldBe Set(p1, p2, p3, p4, p5, p6, p7, p8) + offsetStore.readOffset().futureValue // reload from database + offsetStore.getState().byPid.keySet shouldBe Set(p3, p6, p7, p8) + } + + "delete old records triggered after eviction" in { + val projectionId = genRandomProjectionId() + val evictSettings = settings + .withKeepNumberOfEntries(5) + .withTimeWindow(JDuration.ofSeconds(100)) + .withEvictInterval(JDuration.ofSeconds(10)) + val offsetStore = createOffsetStore(projectionId, evictSettings) + + import evictSettings.{ evictInterval, timeWindow } + + val t0 = TestClock.nowMicros().instant() + log.debug("Start time [{}]", t0) + + // all slice 645 + val p1 = "p500" + val p2 = "p621" + val p3 = "p742" + val p4 = "p863" + val p5 = "p984" + val p6 = "p3080" + val p7 = "p4290" + val p8 = "p20180" + val p9 = "p21390" + val p10 = "p31070" + val p11 = "p31191" + val p12 = "p32280" + + val t1 = t0.plusSeconds(1) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t1, Map(p1 -> 1L)), p1, 1L)).futureValue + + val t2 = t0.plusSeconds(2) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t2, Map(p2 -> 1L)), p2, 1L)).futureValue + + val t3 = t0.plusSeconds(3) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t3, Map(p3 -> 1L)), p3, 1L)).futureValue + + val t4 = t0.plus(evictInterval).plusSeconds(7) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t4, Map(p4 -> 1L)), p4, 1L)).futureValue + + val t5 = t0.plus(evictInterval).plusSeconds(8) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t5, Map(p5 -> 1L)), p5, 1L)).futureValue + + val t6 = t0.plus(evictInterval).plusSeconds(9) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t6, Map(p6 -> 1L)), p6, 1L)).futureValue + + offsetStore.getState().size shouldBe 6 // no eviction + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion + + val t7 = t0.plus(timeWindow.minus(evictInterval)) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t7, Map(p7 -> 1L)), p7, 1L)).futureValue + + offsetStore.getState().size shouldBe 7 // no eviction + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion (within time window) + + val t8 = t0.plus(timeWindow.plus(evictInterval).plusSeconds(3)) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t8, Map(p8 -> 1L)), p8, 1L)).futureValue + + offsetStore.getState().byPid.keySet shouldBe Set(p4, p5, p6, p7, p8) // evicted p1@t1, p2@t2, and p3@t3 + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deletion triggered by eviction + + val t9 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(10)) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t9, Map(p8 -> 2L)), p8, 2L)).futureValue + + offsetStore.getState().size shouldBe 5 // no eviction (outside time window, but still within limit) + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // deleted p4@t4, p5@t5, p6@t6 (outside window) + + val t10 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(11)) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t10, Map(p9 -> 1L)), p9, 1L)).futureValue + + offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8, p9) // evicted p4@t4 + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete + + val t11 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(12)) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t11, Map(p10 -> 1L)), p10, 1L)).futureValue + + offsetStore.getState().byPid.keySet shouldBe Set(p6, p7, p8, p9, p10) // evicted p5@t5 + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete + + val t12 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(13)) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t12, Map(p11 -> 1L)), p11, 1L)).futureValue + + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11) // evicted p6@t6 + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // deletion triggered, but nothing to delete + + val t13 = t0.plus(timeWindow.plus(evictInterval.multipliedBy(2)).plusSeconds(14)) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t13, Map(p12 -> 1L)), p12, 1L)).futureValue + + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8, p9, p10, p11, p12) // no eviction (within time window) + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 0 // no deletion + + val t14 = t0.plus(timeWindow.multipliedBy(2).plus(evictInterval.multipliedBy(3)).plusSeconds(1)) + offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(t14, Map(p12 -> 2L)), p12, 2L)).futureValue + + offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12) // evicted p7@t7 + offsetStore.deleteOldTimestampOffsets().futureValue shouldBe 3 // triggered by evict, deleted p7@t7, p8@t8, p8@t9 + + offsetStore.getState().byPid.keySet shouldBe Set(p8, p9, p10, p11, p12) + offsetStore.readOffset().futureValue // reload from database + offsetStore.getState().byPid.keySet shouldBe Set(p9, p10, p11, p12) + } + "set offset" in { val projectionId = genRandomProjectionId() val offsetStore = createOffsetStore(projectionId) diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index db3b3816a..37994dc47 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -153,10 +153,9 @@ private[projection] object R2dbcOffsetStore { def evict(until: Instant, keepNumberOfEntries: Int): State = { if (oldestTimestamp.isBefore(until) && size > keepNumberOfEntries) { val newState = State( - sortedByTimestamp - .take(size - keepNumberOfEntries) - .filterNot(_.timestamp.isBefore(until)) ++ sortedByTimestamp - .takeRight(keepNumberOfEntries) ++ latestBySlice) + sortedByTimestamp.take(size - keepNumberOfEntries).filterNot(_.timestamp.isBefore(until)) + ++ sortedByTimestamp.takeRight(keepNumberOfEntries) + ++ latestBySlice) newState.copy(sizeAfterEvict = newState.size) } else this @@ -241,6 +240,9 @@ private[projection] class R2dbcOffsetStore( // To avoid delete requests when no new offsets have been stored since previous delete private val idle = new AtomicBoolean(false) + // To trigger next deletion after in-memory eviction + private val triggerDeletion = new AtomicBoolean(false) + system.scheduler.scheduleWithFixedDelay( settings.deleteInterval, settings.deleteInterval, @@ -455,6 +457,7 @@ private[projection] class R2dbcOffsetStore( .compareTo(evictWindow) > 0) { val evictUntil = newState.latestTimestamp.minus(settings.timeWindow) val s = newState.evict(evictUntil, settings.keepNumberOfEntries) + triggerDeletion.set(true) logger.debugN( "Evicted [{}] records until [{}], keeping [{}] records. Latest [{}].", newState.size - s.size, @@ -739,7 +742,7 @@ private[projection] class R2dbcOffsetStore( Future.successful(0) } else { val currentState = getState() - if (currentState.size <= settings.keepNumberOfEntries || currentState.window.compareTo(settings.timeWindow) < 0) { + if (!triggerDeletion.getAndSet(false) && currentState.window.compareTo(settings.timeWindow) < 0) { // it hasn't filled up the window yet Future.successful(0) } else {