From 9c7168fcd2975fcc67d674df969e018945008947 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 18 Oct 2024 21:00:23 +1300 Subject: [PATCH] fix: adopt offsets from other projection keys (#1218) * improvement for projection scaling --- .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 190 ++++++++++++++++++ .../src/main/resources/reference.conf | 4 + .../r2dbc/R2dbcProjectionSettings.scala | 15 ++ .../r2dbc/internal/OffsetStoreDao.scala | 2 + .../internal/PostgresOffsetStoreDao.scala | 49 +++++ .../r2dbc/internal/R2dbcOffsetStore.scala | 94 +++++++++ 6 files changed, 354 insertions(+) diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index 43dba8bf4..7f4606eef 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -1338,5 +1338,195 @@ class R2dbcTimestampOffsetStoreSpec TimestampOffset.toTimestampOffset(offsetStore4.getOffset().futureValue.get).timestamp shouldBe time4 } + "adopt latest-by-slice offsets from other projection keys" in { + import R2dbcOffsetStore.Validation._ + + val projectionName = UUID.randomUUID().toString + + def offsetStore(minSlice: Int, maxSlice: Int) = + new R2dbcOffsetStore( + ProjectionId(projectionName, s"$minSlice-$maxSlice"), + Some(new TestTimestampSourceProvider(minSlice, maxSlice, clock)), + system, + settings.withTimeWindow(JDuration.ofSeconds(10)), + r2dbcExecutor) + + // two projections at higher scale + val offsetStore1 = offsetStore(512, 767) + val offsetStore2 = offsetStore(768, 1023) + + // one projection at lower scale + val offsetStore3 = offsetStore(512, 1023) + + val p1 = "p-0960" // slice 576 + val p2 = "p-6009" // slice 640 
+ val p3 = "p-3039" // slice 832 + val p4 = "p-2049" // slice 896 + + val t0 = clock.instant().minusSeconds(100) + def time(step: Int) = t0.plusSeconds(step) + + // scaled to 4 projections, testing 512-767 and 768-1023 + + // key: 512-767 — this projection is further behind + offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(0), Map(p1 -> 1L)), p1, 1L)).futureValue + offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(1), Map(p2 -> 1L)), p2, 1L)).futureValue + + // key: 768-1023 + offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p3 -> 2L)), p3, 2L)).futureValue + offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(3), Map(p3 -> 3L)), p3, 3L)).futureValue + offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(4), Map(p3 -> 4L)), p3, 4L)).futureValue + offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(5), Map(p3 -> 5L)), p3, 5L)).futureValue + offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(6), Map(p4 -> 6L)), p4, 6L)).futureValue + offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(7), Map(p4 -> 7L)), p4, 7L)).futureValue + offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(8), Map(p4 -> 8L)), p4, 8L)).futureValue + offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(9), Map(p4 -> 9L)), p4, 9L)).futureValue + + // scaled down to 2 projections, testing 512-1023 + + // reload: start offset is latest from 512-767 (earliest of latest by slice range) + val startOffset1 = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get) + startOffset1.timestamp shouldBe time(1) + startOffset1.seen shouldBe Map(p2 -> 1L) + + val state1 = offsetStore3.getState() + state1.size shouldBe 4 + state1.latestBySlice.size shouldBe 4 + + offsetStore3.getForeignOffsets().size shouldBe 4 // all latest are from other projection keys + offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload + + // simulate replay from start offset, only up to 
latest from 512-767 + offsetStore3.validate(createEnvelope(p2, 1L, time(1), "event1")).futureValue shouldBe Duplicate + + // triggering adoption task will adopt the offsets from 512-767 + offsetStore3.getLatestSeen() shouldBe time(1) // updated by validation of duplicates + offsetStore3.adoptForeignOffsets().futureValue shouldBe 2 + + // reload: start offset is from 512-1023, which has adopted offsets, but before latest from 768-1023 + val startOffset2 = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get) + startOffset2.timestamp shouldBe time(1) + startOffset2.seen shouldBe Map(p2 -> 1L) + + val state2 = offsetStore3.getState() + state2.size shouldBe 4 + state2.latestBySlice.size shouldBe 4 + + offsetStore3.getForeignOffsets().size shouldBe 2 // latest by slice still from other projection keys (768-1023) + offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload + + // simulate replay from start offset + offsetStore3.validate(createEnvelope(p2, 1L, time(1), "event1")).futureValue shouldBe Duplicate + offsetStore3.validate(createEnvelope(p2, 2L, time(2), "event2")).futureValue shouldBe Accepted + offsetStore3.addInflight(createEnvelope(p2, 2L, time(2), "event2")) + offsetStore3.getLatestSeen() shouldBe time(1) // only duplicates move the latest seen forward for validation + offsetStore3.validate(createEnvelope(p3, 2L, time(2), "event2")).futureValue shouldBe Duplicate + offsetStore3.validate(createEnvelope(p2, 3L, time(3), "event3")).futureValue shouldBe Accepted + offsetStore3.addInflight(createEnvelope(p2, 3L, time(3), "event3")) + offsetStore3.validate(createEnvelope(p3, 3L, time(3), "event3")).futureValue shouldBe Duplicate + offsetStore3.validate(createEnvelope(p2, 4L, time(4), "event4")).futureValue shouldBe Accepted + offsetStore3.addInflight(createEnvelope(p2, 4L, time(4), "event4")) + offsetStore3.validate(createEnvelope(p3, 4L, time(4), "event4")).futureValue shouldBe Duplicate + 
offsetStore3.validate(createEnvelope(p2, 5L, time(5), "event5")).futureValue shouldBe Accepted + offsetStore3.addInflight(createEnvelope(p2, 5L, time(5), "event5")) + offsetStore3.getLatestSeen() shouldBe time(4) // updated by validation of duplicates + + // move slice 640 forward, up to the latest for slice 832 (still under 768-1023 key) + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p2 -> 2L)), p2, 2L)).futureValue + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(3), Map(p2 -> 3L)), p2, 3L)).futureValue + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(4), Map(p2 -> 4L)), p2, 4L)).futureValue + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(5), Map(p2 -> 5L)), p2, 5L)).futureValue + + // triggering adoption task will adopt the offset for slice 832 (under 768-1023 key) + offsetStore3.getLatestSeen() shouldBe time(5) // updated by save offsets + offsetStore3.adoptForeignOffsets().futureValue shouldBe 1 + + // reload: start offset is from 512-1023, which has new and adopted offsets, but before latest from 768-1023 + val startOffset3 = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get) + startOffset3.timestamp shouldBe time(5) + startOffset3.seen shouldBe Map(p2 -> 5L) + + val state3 = offsetStore3.getState() + state3.size shouldBe 4 + state3.latestBySlice.size shouldBe 4 + + offsetStore3.getForeignOffsets().size shouldBe 1 // latest by slice still from 768-1023 + offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload + + // simulate replay from start offset + offsetStore3.validate(createEnvelope(p2, 5L, time(5), "event5")).futureValue shouldBe Duplicate + offsetStore3.validate(createEnvelope(p3, 5L, time(5), "event5")).futureValue shouldBe Duplicate + offsetStore3.validate(createEnvelope(p1, 2L, time(6), "event2")).futureValue shouldBe Accepted + offsetStore3.addInflight(createEnvelope(p1, 2L, time(6), "event2")) + 
offsetStore3.validate(createEnvelope(p4, 6L, time(6), "event6")).futureValue shouldBe Duplicate + offsetStore3.validate(createEnvelope(p2, 6L, time(7), "event6")).futureValue shouldBe Accepted + offsetStore3.addInflight(createEnvelope(p2, 6L, time(7), "event6")) + offsetStore3.validate(createEnvelope(p4, 7L, time(7), "event7")).futureValue shouldBe Duplicate + offsetStore3.validate(createEnvelope(p3, 6L, time(8), "event6")).futureValue shouldBe Accepted + offsetStore3.addInflight(createEnvelope(p3, 6L, time(8), "event6")) + offsetStore3.validate(createEnvelope(p4, 8L, time(8), "event8")).futureValue shouldBe Duplicate + + // move slices 576, 640, 832 forward but not past slice 896 yet + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(6), Map(p1 -> 2L)), p1, 2L)).futureValue + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(7), Map(p2 -> 6L)), p2, 6L)).futureValue + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(8), Map(p3 -> 6L)), p3, 6L)).futureValue + + // triggering adoption task will not adopt any offsets + offsetStore3.getLatestSeen() shouldBe time(8) + offsetStore3.adoptForeignOffsets().futureValue shouldBe 0 + + // reload: start offset is from 512-1023, which has new and adopted offsets, but still before latest from 768-1023 + val startOffset4 = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get) + startOffset4.timestamp shouldBe time(8) + startOffset4.seen shouldBe Map(p3 -> 6L) + + val state4 = offsetStore3.getState() + state4.size shouldBe 4 + state4.latestBySlice.size shouldBe 4 + + offsetStore3.getForeignOffsets().size shouldBe 1 // latest by slice still from 768-1023 + offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload + + // simulate replay from start offset + offsetStore3.validate(createEnvelope(p3, 6L, time(8), "event6")).futureValue shouldBe Duplicate + offsetStore3.validate(createEnvelope(p4, 8L, time(8), "event8")).futureValue shouldBe Duplicate + 
+ offsetStore3.validate(createEnvelope(p4, 9L, time(9), "event9")).futureValue shouldBe Duplicate + offsetStore3.validate(createEnvelope(p1, 3L, time(10), "event3")).futureValue shouldBe Accepted + offsetStore3.addInflight(createEnvelope(p1, 3L, time(10), "event3")) + offsetStore3.validate(createEnvelope(p2, 7L, time(11), "event7")).futureValue shouldBe Accepted + offsetStore3.addInflight(createEnvelope(p2, 7L, time(11), "event7")) + offsetStore3.validate(createEnvelope(p3, 7L, time(12), "event7")).futureValue shouldBe Accepted + offsetStore3.addInflight(createEnvelope(p3, 7L, time(12), "event7")) + + // triggering adoption task will adopt the offset for slice 896 (under 768-1023 key) + offsetStore3.getLatestSeen() shouldBe time(9) // updated by validation of duplicates + offsetStore3.adoptForeignOffsets().futureValue shouldBe 1 + + // move slices past the latest from 768-1023 + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(10), Map(p1 -> 3L)), p1, 3L)).futureValue + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(11), Map(p2 -> 7L)), p2, 7L)).futureValue + offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(12), Map(p3 -> 7L)), p3, 7L)).futureValue + + // triggering adoption task will not adopt any offsets (no foreign offsets remaining) + offsetStore3.getLatestSeen() shouldBe time(9) // latest seen is no longer updated + offsetStore3.adoptForeignOffsets().futureValue shouldBe 0 + + // reload: start offset is latest now, all offsets from 512-1023 + val startOffset5 = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get) + startOffset5.timestamp shouldBe time(12) + startOffset5.seen shouldBe Map(p3 -> 7L) + + val state5 = offsetStore3.getState() + state5.size shouldBe 4 + state5.latestBySlice.size shouldBe 4 + + offsetStore3.getForeignOffsets() shouldBe empty + offsetStore3.getLatestSeen() shouldBe Instant.EPOCH + + // outdated offsets, including those for 768-1023, will eventually be deleted +
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p1 -> 4L)), p1, 4L)).futureValue + offsetStore3.deleteOldTimestampOffsets().futureValue shouldBe 17 + } + } } diff --git a/akka-projection-r2dbc/src/main/resources/reference.conf b/akka-projection-r2dbc/src/main/resources/reference.conf index 7435a83ec..cdb91a62a 100644 --- a/akka-projection-r2dbc/src/main/resources/reference.conf +++ b/akka-projection-r2dbc/src/main/resources/reference.conf @@ -33,6 +33,10 @@ akka.projection.r2dbc { # with this frequency. Can be disabled with `off`. delete-interval = 1 minute + # Adopt latest-by-slice entries from other projection keys with this frequency. + # Can be disabled with `off`. + adopt-interval = 1 minute + # Trying to batch insert offsets in batches of this size. offset-batch-size = 20 } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala index c7ceb6a2f..864bf6526 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/R2dbcProjectionSettings.scala @@ -44,6 +44,11 @@ object R2dbcProjectionSettings { case _ => config.getDuration("offset-store.delete-interval") } + val adoptInterval = config.getString("offset-store.adopt-interval").toLowerCase(Locale.ROOT) match { + case "off" => JDuration.ZERO + case _ => config.getDuration("offset-store.adopt-interval") + } + new R2dbcProjectionSettings( schema = Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty), offsetTable = config.getString("offset-store.offset-table"), @@ -54,6 +59,7 @@ object R2dbcProjectionSettings { keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"), evictInterval = config.getDuration("offset-store.evict-interval"), deleteInterval, + adoptInterval, logDbCallsExceeding, warnAboutFilteredEventsInFlow 
= config.getBoolean("warn-about-filtered-events-in-flow"), offsetBatchSize = config.getInt("offset-store.offset-batch-size")) @@ -77,6 +83,7 @@ final class R2dbcProjectionSettings private ( val keepNumberOfEntries: Int, val evictInterval: JDuration, val deleteInterval: JDuration, + val adoptInterval: JDuration, val logDbCallsExceeding: FiniteDuration, val warnAboutFilteredEventsInFlow: Boolean, val offsetBatchSize: Int) { @@ -121,6 +128,12 @@ final class R2dbcProjectionSettings private ( def withDeleteInterval(deleteInterval: JDuration): R2dbcProjectionSettings = copy(deleteInterval = deleteInterval) + def withAdoptInterval(adoptInterval: FiniteDuration): R2dbcProjectionSettings = + copy(adoptInterval = adoptInterval.toJava) + + def withAdoptInterval(adoptInterval: JDuration): R2dbcProjectionSettings = + copy(adoptInterval = adoptInterval) + def withLogDbCallsExceeding(logDbCallsExceeding: FiniteDuration): R2dbcProjectionSettings = copy(logDbCallsExceeding = logDbCallsExceeding) @@ -143,6 +156,7 @@ final class R2dbcProjectionSettings private ( keepNumberOfEntries: Int = keepNumberOfEntries, evictInterval: JDuration = evictInterval, deleteInterval: JDuration = deleteInterval, + adoptInterval: JDuration = adoptInterval, logDbCallsExceeding: FiniteDuration = logDbCallsExceeding, warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow, offsetBatchSize: Int = offsetBatchSize) = @@ -156,6 +170,7 @@ final class R2dbcProjectionSettings private ( keepNumberOfEntries, evictInterval, deleteInterval, + adoptInterval, logDbCallsExceeding, warnAboutFilteredEventsInFlow, offsetBatchSize) diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala index 198b867c7..418717c47 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala +++ 
b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala @@ -39,6 +39,8 @@ private[projection] trait OffsetStoreDao { def deleteNewTimestampOffsetsInTx(connection: Connection, timestamp: Instant): Future[Long] + def adoptTimestampOffsets(latestBySlice: Seq[LatestBySlice]): Future[Long] + def clearTimestampOffset(): Future[Long] def clearPrimitiveOffset(): Future[Long] diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala index 9acb743fd..744d43985 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala @@ -111,6 +111,22 @@ private[projection] class PostgresOffsetStoreDao( private val deleteNewTimestampOffsetSql: String = sql"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ? AND timestamp_offset >= ?" + private val adoptTimestampOffsetSql: String = + sql""" + UPDATE $timestampOffsetTable + SET projection_key = ? + WHERE projection_name = ? AND slice = ? AND persistence_id = ? AND seq_nr = ? + """ + + private def adoptTimestampOffsetBatchSql(offsets: Int): String = { + val conditions = (1 to offsets).map(_ => "(slice = ? AND persistence_id = ? AND seq_nr = ?)").mkString(" OR ") + sql""" + UPDATE $timestampOffsetTable + SET projection_key = ? + WHERE projection_name = ? AND ($conditions) + """ + } + private val clearTimestampOffsetSql: String = sql"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ?" 
@@ -336,6 +352,53 @@ private[projection] class PostgresOffsetStoreDao(
         .bindTimestamp(3, timestamp))
   }
 
+  /**
+   * Adopt the given latest-by-slice offsets by setting their projection_key to the key of this projection.
+   * Rows are matched on the name of this projection and the slice, persistence id and sequence number of each
+   * entry. A single entry uses the fixed single-row statement. Several entries are split into batches of at
+   * most settings.offsetBatchSize entries, with one generated statement per batch.
+   *
+   * @return sum of the results of the executed updates (presumably the number of updated rows), or 0 without
+   *         using a connection when latestBySlice is empty
+   */
+  override def adoptTimestampOffsets(latestBySlice: Seq[LatestBySlice]): Future[Long] = {
+    // Parameter index 0 is the projection key and 1 the projection name, followed by three per offset.
+    def bindCondition(statement: Statement, latest: LatestBySlice, startIndex: Int): Statement = {
+      statement
+        .bind(startIndex + 0, latest.slice)
+        .bind(startIndex + 1, latest.pid)
+        .bind(startIndex + 2, latest.seqNr)
+    }
+    if (latestBySlice.isEmpty) {
+      // Nothing to adopt: skip the database instead of running an update with zero statements.
+      Future.successful(0L)
+    } else if (latestBySlice.size == 1) {
+      r2dbcExecutor.updateOne("adopt timestamp offset") { connection =>
+        val statement = connection
+          .createStatement(adoptTimestampOffsetSql)
+          .bind(0, projectionId.key)
+          .bind(1, projectionId.name)
+        bindCondition(statement, latestBySlice.head, startIndex = 2)
+      }
+    } else {
+      // Non-overlapping batches of at most offsetBatchSize entries.
+      val batches = latestBySlice.grouped(settings.offsetBatchSize).toIndexedSeq
+      r2dbcExecutor
+        .update("adopt timestamp offsets") { connection =>
+          batches.map { batch =>
+            val batchStatement = connection
+              .createStatement(adoptTimestampOffsetBatchSql(batch.size))
+              .bind(0, projectionId.key)
+              .bind(1, projectionId.name)
+            batch.zipWithIndex.foldLeft(batchStatement) {
+              case (statement, (latest, index)) => bindCondition(statement, latest, startIndex = 2 + index * 3)
+            }
+          }
+        }
+        .map(_.sum)
+    }
+  }
+
   override def clearTimestampOffset(): Future[Long] = {
     val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
     val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
index a22a93457..c22cb8fde 100644
--- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -248,6 +248,26 @@ private[projection] class R2dbcOffsetStore(
       () => deleteOldTimestampOffsets(),
       system.executionContext)
 
+  //
Foreign offsets (latest by slice offsets from other projection keys) that should be adopted when passed in time. + // Contains remaining offsets to adopt. Sorted by timestamp. Can be updated concurrently with CAS retries. + private val foreignOffsets = new AtomicReference(Seq.empty[RecordWithProjectionKey]) + + // The latest timestamp as seen by validation or saving offsets. For determining when to update foreign offsets. + // Can be updated concurrently with CAS retries. + private val latestSeen = new AtomicReference(Instant.EPOCH) + + private val adoptingForeignOffsets = !settings.adoptInterval.isZero && !settings.adoptInterval.isNegative + + private val scheduledAdoptForeignOffsets = + if (adoptingForeignOffsets) + Some( + system.scheduler.scheduleWithFixedDelay( + settings.adoptInterval, + settings.adoptInterval, + () => adoptForeignOffsets(), + system.executionContext)) + else None + private def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = { sourceProvider match { case Some(timestampQuery: EventTimestampQuery) => @@ -305,6 +325,8 @@ private[projection] class R2dbcOffsetStore( if (!state.compareAndSet(oldState, newState)) throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.") clearInflight() + clearForeignOffsets() + clearLatestSeen() if (newState == State.empty) { None } else if (moreThanOneProjectionKey(recordsWithKey)) { @@ -321,6 +343,12 @@ private[projection] class R2dbcOffsetStore( // Only needed if there's more than one projection key within the latest offsets by slice. // To handle restarts after previous downscaling, and all latest are from the same instance. 
if (moreThanOneProjectionKey(latestBySliceWithKey)) { + if (adoptingForeignOffsets) { + val foreignOffsets = latestBySliceWithKey + .filter(_.projectionKey != projectionId.key) + .sortBy(_.record.timestamp) + setForeignOffsets(foreignOffsets) + } // Use the earliest of the latest from each projection instance (distinct projection key). val latestByKey = latestBySliceWithKey.groupBy(_.projectionKey).map { @@ -442,6 +470,12 @@ private[projection] class R2dbcOffsetStore( .toVector } } + if (hasForeignOffsets() && records.nonEmpty) { + val latestTimestamp = + if (records.size == 1) records.head.timestamp + else records.maxBy(_.timestamp).timestamp + updateLatestSeen(latestTimestamp) + } if (filteredRecords.isEmpty) { FutureDone } else { @@ -578,6 +612,8 @@ private[projection] class R2dbcOffsetStore( if (duplicate) { logger.trace("Filtering out duplicate sequence number [{}] for pid [{}]", seqNr, pid) + // also move latest seen forward, for adopting foreign offsets on replay of duplicates + if (hasForeignOffsets()) updateLatestSeen(recordWithOffset.offset.timestamp) FutureDuplicate } else if (recordWithOffset.strictSeqNr) { // strictSeqNr == true is for event sourced @@ -776,6 +812,64 @@ private[projection] class R2dbcOffsetStore( } } + def getForeignOffsets(): Seq[RecordWithProjectionKey] = + foreignOffsets.get() + + def hasForeignOffsets(): Boolean = + adoptingForeignOffsets && getForeignOffsets().nonEmpty + + @tailrec private def setForeignOffsets(records: Seq[RecordWithProjectionKey]): Unit = { + val currentForeignOffsets = getForeignOffsets() + if (!foreignOffsets.compareAndSet(currentForeignOffsets, records)) + setForeignOffsets(records) // CAS retry, concurrent update of foreignOffsets + } + + // return the adoptable foreign offsets up to the latest timestamp, and set to the remaining foreign offsets + @tailrec private def takeAdoptableForeignOffsets(latestTimestamp: Instant): Seq[RecordWithProjectionKey] = { + val currentForeignOffsets = getForeignOffsets() + 
val adoptable = currentForeignOffsets.takeWhile(_.record.timestamp.compareTo(latestTimestamp) <= 0) + if (adoptable.isEmpty) Seq.empty + else { + val remainingForeignOffsets = currentForeignOffsets.drop(adoptable.size) + if (foreignOffsets.compareAndSet(currentForeignOffsets, remainingForeignOffsets)) adoptable + else takeAdoptableForeignOffsets(latestTimestamp) // CAS retry, concurrent update of foreignOffsets + } + } + + private def clearForeignOffsets(): Unit = setForeignOffsets(Seq.empty) + + def getLatestSeen(): Instant = + latestSeen.get() + + @tailrec private def updateLatestSeen(instant: Instant): Unit = { + val currentLatestSeen = getLatestSeen() + if (instant.isAfter(currentLatestSeen)) { + if (!latestSeen.compareAndSet(currentLatestSeen, instant)) + updateLatestSeen(instant) // CAS retry, concurrent update of latestSeen + } + } + + @tailrec private def clearLatestSeen(): Unit = { + val currentLatestSeen = getLatestSeen() + if (!latestSeen.compareAndSet(currentLatestSeen, Instant.EPOCH)) + clearLatestSeen() // CAS retry, concurrent update of latestSeen + } + + def adoptForeignOffsets(): Future[Long] = { + if (!hasForeignOffsets()) { + scheduledAdoptForeignOffsets.foreach(_.cancel()) + Future.successful(0) + } else { + val latestTimestamp = getLatestSeen() + val adoptableRecords = takeAdoptableForeignOffsets(latestTimestamp) + if (!hasForeignOffsets()) scheduledAdoptForeignOffsets.foreach(_.cancel()) + val adoptableLatestBySlice = adoptableRecords.map { adoptable => + LatestBySlice(adoptable.record.slice, adoptable.record.pid, adoptable.record.seqNr) + } + dao.adoptTimestampOffsets(adoptableLatestBySlice) + } + } + /** * Resetting an offset. Deletes newer offsets. Used from ProjectionManagement. Doesn't update in-memory state because * the projection is supposed to be stopped/started for this operation.