Skip to content

Commit

Permalink
fix: adopt offsets from other projection keys (#1218)
Browse files Browse the repository at this point in the history
* improvement for projection scaling

(cherry picked from commit 9c7168f)
  • Loading branch information
pvlugter authored and patriknw committed Oct 18, 2024
1 parent 45e7926 commit e407ee1
Show file tree
Hide file tree
Showing 6 changed files with 354 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1338,5 +1338,195 @@ class R2dbcTimestampOffsetStoreSpec
TimestampOffset.toTimestampOffset(offsetStore4.getOffset().futureValue.get).timestamp shouldBe time4
}

"adopt latest-by-slice offsets from other projection keys" in {
import R2dbcOffsetStore.Validation._

val projectionName = UUID.randomUUID().toString

def offsetStore(minSlice: Int, maxSlice: Int) =
new R2dbcOffsetStore(
ProjectionId(projectionName, s"$minSlice-$maxSlice"),
Some(new TestTimestampSourceProvider(minSlice, maxSlice, clock)),
system,
settings.withTimeWindow(JDuration.ofSeconds(10)),
r2dbcExecutor)

// two projections at higher scale
val offsetStore1 = offsetStore(512, 767)
val offsetStore2 = offsetStore(768, 1023)

// one projection at lower scale
val offsetStore3 = offsetStore(512, 1023)

val p1 = "p-0960" // slice 576
val p2 = "p-6009" // slice 640
val p3 = "p-3039" // slice 832
val p4 = "p-2049" // slice 896

val t0 = clock.instant().minusSeconds(100)
def time(step: Int) = t0.plusSeconds(step)

// scaled to 4 projections, testing 512-767 and 768-1023

// key: 512-767 — this projection is further behind
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(0), Map(p1 -> 1L)), p1, 1L)).futureValue
offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(1), Map(p2 -> 1L)), p2, 1L)).futureValue

// key: 768-1023
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p3 -> 2L)), p3, 2L)).futureValue
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(3), Map(p3 -> 3L)), p3, 3L)).futureValue
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(4), Map(p3 -> 4L)), p3, 4L)).futureValue
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(5), Map(p3 -> 5L)), p3, 5L)).futureValue
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(6), Map(p4 -> 6L)), p4, 6L)).futureValue
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(7), Map(p4 -> 7L)), p4, 7L)).futureValue
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(8), Map(p4 -> 8L)), p4, 8L)).futureValue
offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(time(9), Map(p4 -> 9L)), p4, 9L)).futureValue

// scaled down to 2 projections, testing 512-1023

// reload: start offset is latest from 512-767 (earliest of latest by slice range)
val startOffset1 = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get)
startOffset1.timestamp shouldBe time(1)
startOffset1.seen shouldBe Map(p2 -> 1L)

val state1 = offsetStore3.getState()
state1.size shouldBe 4
state1.latestBySlice.size shouldBe 4

offsetStore3.getForeignOffsets().size shouldBe 4 // all latest are from other projection keys
offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload

// simulate replay from start offset, only up to latest from 512-767
offsetStore3.validate(createEnvelope(p2, 1L, time(1), "event1")).futureValue shouldBe Duplicate

// triggering adoption task will adopt the offsets from 512-767
offsetStore3.getLatestSeen() shouldBe time(1) // updated by validation of duplicates
offsetStore3.adoptForeignOffsets().futureValue shouldBe 2

// reload: start offset is from 512-1023, which has adopted offsets, but before latest from 768-1023
val startOffset2 = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get)
startOffset2.timestamp shouldBe time(1)
startOffset2.seen shouldBe Map(p2 -> 1L)

val state2 = offsetStore3.getState()
state2.size shouldBe 4
state2.latestBySlice.size shouldBe 4

offsetStore3.getForeignOffsets().size shouldBe 2 // latest by slice still from other projection keys (768-1023)
offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload

// simulate replay from start offset
offsetStore3.validate(createEnvelope(p2, 1L, time(1), "event1")).futureValue shouldBe Duplicate
offsetStore3.validate(createEnvelope(p2, 2L, time(2), "event2")).futureValue shouldBe Accepted
offsetStore3.addInflight(createEnvelope(p2, 2L, time(2), "event2"))
offsetStore3.getLatestSeen() shouldBe time(1) // only duplicates move the latest seen forward for validation
offsetStore3.validate(createEnvelope(p3, 2L, time(2), "event2")).futureValue shouldBe Duplicate
offsetStore3.validate(createEnvelope(p2, 3L, time(3), "event3")).futureValue shouldBe Accepted
offsetStore3.addInflight(createEnvelope(p2, 3L, time(3), "event3"))
offsetStore3.validate(createEnvelope(p3, 3L, time(3), "event3")).futureValue shouldBe Duplicate
offsetStore3.validate(createEnvelope(p2, 4L, time(4), "event4")).futureValue shouldBe Accepted
offsetStore3.addInflight(createEnvelope(p2, 4L, time(4), "event4"))
offsetStore3.validate(createEnvelope(p3, 4L, time(4), "event4")).futureValue shouldBe Duplicate
offsetStore3.validate(createEnvelope(p2, 5L, time(5), "event5")).futureValue shouldBe Accepted
offsetStore3.addInflight(createEnvelope(p2, 5L, time(5), "event5"))
offsetStore3.getLatestSeen() shouldBe time(4) // updated by validation of duplicates

// move slice 640 forward, up to the latest for slice 832 (still under 768-1023 key)
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p2 -> 2L)), p2, 2L)).futureValue
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(3), Map(p2 -> 3L)), p2, 3L)).futureValue
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(4), Map(p2 -> 4L)), p2, 4L)).futureValue
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(5), Map(p2 -> 5L)), p2, 5L)).futureValue

// triggering adoption task will adopt the offset for slice 832 (under 768-1023 key)
offsetStore3.getLatestSeen() shouldBe time(5) // updated by save offsets
offsetStore3.adoptForeignOffsets().futureValue shouldBe 1

// reload: start offset is from 512-1023, which has new and adopted offsets, but before latest from 768-1023
val startOffset3 = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get)
startOffset3.timestamp shouldBe time(5)
startOffset3.seen shouldBe Map(p2 -> 5L)

val state3 = offsetStore3.getState()
state3.size shouldBe 4
state3.latestBySlice.size shouldBe 4

offsetStore3.getForeignOffsets().size shouldBe 1 // latest by slice still from 768-1023
offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload

// simulate replay from start offset
offsetStore3.validate(createEnvelope(p2, 5L, time(5), "event5")).futureValue shouldBe Duplicate
offsetStore3.validate(createEnvelope(p3, 5L, time(5), "event5")).futureValue shouldBe Duplicate
offsetStore3.validate(createEnvelope(p1, 2L, time(6), "event2")).futureValue shouldBe Accepted
offsetStore3.addInflight(createEnvelope(p1, 2L, time(6), "event2"))
offsetStore3.validate(createEnvelope(p4, 6L, time(6), "event6")).futureValue shouldBe Duplicate
offsetStore3.validate(createEnvelope(p2, 6L, time(7), "event6")).futureValue shouldBe Accepted
offsetStore3.addInflight(createEnvelope(p2, 6L, time(7), "event6"))
offsetStore3.validate(createEnvelope(p4, 7L, time(7), "event7")).futureValue shouldBe Duplicate
offsetStore3.validate(createEnvelope(p3, 6L, time(8), "event6")).futureValue shouldBe Accepted
offsetStore3.addInflight(createEnvelope(p3, 6L, time(8), "event6"))
offsetStore3.validate(createEnvelope(p4, 8L, time(8), "event8")).futureValue shouldBe Duplicate

// move slices 576, 640, 832 forward but not past slice 896 yet
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(6), Map(p1 -> 2L)), p1, 2L)).futureValue
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(7), Map(p2 -> 6L)), p2, 6L)).futureValue
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(8), Map(p3 -> 6L)), p3, 6L)).futureValue

// triggering adoption task will not adopt any offsets
offsetStore3.getLatestSeen() shouldBe time(8)
offsetStore3.adoptForeignOffsets().futureValue shouldBe 0

// reload: start offset is from 512-1023, which has new and adopted offsets, but still before latest from 768-1023
val startOffset4 = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get)
startOffset4.timestamp shouldBe time(8)
startOffset4.seen shouldBe Map(p3 -> 6L)

val state4 = offsetStore3.getState()
state4.size shouldBe 4
state4.latestBySlice.size shouldBe 4

offsetStore3.getForeignOffsets().size shouldBe 1 // latest by slice still from 768-1023
offsetStore3.getLatestSeen() shouldBe Instant.EPOCH // latest seen is reset on reload

// simulate replay from start offset
offsetStore3.validate(createEnvelope(p3, 6L, time(8), "event6")).futureValue shouldBe Duplicate
offsetStore3.validate(createEnvelope(p4, 8L, time(8), "event8")).futureValue shouldBe Duplicate
offsetStore3.validate(createEnvelope(p4, 9L, time(9), "event9")).futureValue shouldBe Duplicate
offsetStore3.validate(createEnvelope(p1, 3L, time(10), "event3")).futureValue shouldBe Accepted
offsetStore3.addInflight(createEnvelope(p1, 3L, time(10), "event3"))
offsetStore3.validate(createEnvelope(p2, 7L, time(11), "event7")).futureValue shouldBe Accepted
offsetStore3.addInflight(createEnvelope(p2, 7L, time(11), "event7"))
offsetStore3.validate(createEnvelope(p3, 7L, time(12), "event7")).futureValue shouldBe Accepted
offsetStore3.addInflight(createEnvelope(p3, 7L, time(12), "event7"))

// triggering adoption task will adopt the offset for slice 896 (under 768-1023 key)
offsetStore3.getLatestSeen() shouldBe time(9) // updated by validation of duplicates
offsetStore3.adoptForeignOffsets().futureValue shouldBe 1

// move slices past the latest from 768-1023
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(10), Map(p1 -> 3L)), p1, 3L)).futureValue
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(11), Map(p2 -> 7L)), p2, 7L)).futureValue
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(12), Map(p3 -> 7L)), p3, 7L)).futureValue

// triggering adoption task will not adopt any offsets (no foreign offsets remaining)
offsetStore3.getLatestSeen() shouldBe time(9) // latest seen is no longer updated
offsetStore3.adoptForeignOffsets().futureValue shouldBe 0

// reload: start offset is latest now, all offsets from 512-1023
val startOffset5 = TimestampOffset.toTimestampOffset(offsetStore3.readOffset().futureValue.get)
startOffset5.timestamp shouldBe time(12)
startOffset5.seen shouldBe Map(p3 -> 7L)

val state5 = offsetStore3.getState()
state5.size shouldBe 4
state5.latestBySlice.size shouldBe 4

offsetStore3.getForeignOffsets() shouldBe empty
offsetStore3.getLatestSeen() shouldBe Instant.EPOCH

// outdated offsets, included those for 768-1023, will eventually be deleted
offsetStore3.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p1 -> 4L)), p1, 4L)).futureValue
offsetStore3.deleteOldTimestampOffsets().futureValue shouldBe 17
}

}
}
4 changes: 4 additions & 0 deletions akka-projection-r2dbc/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ akka.projection.r2dbc {
# with this frequency. Can be disabled with `off`.
delete-interval = 1 minute

# Adopt latest-by-slice entries from other projection keys with this frequency.
# Can be disabled with `off`.
adopt-interval = 1 minute

# Trying to batch insert offsets in batches of this size.
offset-batch-size = 20
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ object R2dbcProjectionSettings {
case _ => config.getDuration("offset-store.delete-interval")
}

val adoptInterval = config.getString("offset-store.adopt-interval").toLowerCase(Locale.ROOT) match {
case "off" => JDuration.ZERO
case _ => config.getDuration("offset-store.adopt-interval")
}

new R2dbcProjectionSettings(
schema = Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty),
offsetTable = config.getString("offset-store.offset-table"),
Expand All @@ -54,6 +59,7 @@ object R2dbcProjectionSettings {
keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"),
evictInterval = config.getDuration("offset-store.evict-interval"),
deleteInterval,
adoptInterval,
logDbCallsExceeding,
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
offsetBatchSize = config.getInt("offset-store.offset-batch-size"))
Expand All @@ -77,6 +83,7 @@ final class R2dbcProjectionSettings private (
val keepNumberOfEntries: Int,
val evictInterval: JDuration,
val deleteInterval: JDuration,
val adoptInterval: JDuration,
val logDbCallsExceeding: FiniteDuration,
val warnAboutFilteredEventsInFlow: Boolean,
val offsetBatchSize: Int) {
Expand Down Expand Up @@ -121,6 +128,12 @@ final class R2dbcProjectionSettings private (
def withDeleteInterval(deleteInterval: JDuration): R2dbcProjectionSettings =
copy(deleteInterval = deleteInterval)

def withAdoptInterval(adoptInterval: FiniteDuration): R2dbcProjectionSettings =
copy(adoptInterval = adoptInterval.toJava)

def withAdoptInterval(adoptInterval: JDuration): R2dbcProjectionSettings =
copy(adoptInterval = adoptInterval)

def withLogDbCallsExceeding(logDbCallsExceeding: FiniteDuration): R2dbcProjectionSettings =
copy(logDbCallsExceeding = logDbCallsExceeding)

Expand All @@ -143,6 +156,7 @@ final class R2dbcProjectionSettings private (
keepNumberOfEntries: Int = keepNumberOfEntries,
evictInterval: JDuration = evictInterval,
deleteInterval: JDuration = deleteInterval,
adoptInterval: JDuration = adoptInterval,
logDbCallsExceeding: FiniteDuration = logDbCallsExceeding,
warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
offsetBatchSize: Int = offsetBatchSize) =
Expand All @@ -156,6 +170,7 @@ final class R2dbcProjectionSettings private (
keepNumberOfEntries,
evictInterval,
deleteInterval,
adoptInterval,
logDbCallsExceeding,
warnAboutFilteredEventsInFlow,
offsetBatchSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ private[projection] trait OffsetStoreDao {

def deleteNewTimestampOffsetsInTx(connection: Connection, timestamp: Instant): Future[Long]

def adoptTimestampOffsets(latestBySlice: Seq[LatestBySlice]): Future[Long]

def clearTimestampOffset(): Future[Long]

def clearPrimitiveOffset(): Future[Long]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,22 @@ private[projection] class PostgresOffsetStoreDao(
private val deleteNewTimestampOffsetSql: String =
sql"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ? AND timestamp_offset >= ?"

private val adoptTimestampOffsetSql: String =
sql"""
UPDATE $timestampOffsetTable
SET projection_key = ?
WHERE projection_name = ? AND slice = ? AND persistence_id = ? AND seq_nr = ?
"""

private def adoptTimestampOffsetBatchSql(offsets: Int): String = {
val conditions = (1 to offsets).map(_ => "(slice = ? AND persistence_id = ? AND seq_nr = ?)").mkString(" OR ")
sql"""
UPDATE $timestampOffsetTable
SET projection_key = ?
WHERE projection_name = ? AND ($conditions)
"""
}

private val clearTimestampOffsetSql: String =
sql"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ?"

Expand Down Expand Up @@ -338,6 +354,39 @@ private[projection] class PostgresOffsetStoreDao(
.bindTimestamp(3, timestamp))
}

override def adoptTimestampOffsets(latestBySlice: Seq[LatestBySlice]): Future[Long] = {
def bindCondition(statement: Statement, latest: LatestBySlice, startIndex: Int): Statement = {
statement
.bind(startIndex + 0, latest.slice)
.bind(startIndex + 1, latest.pid)
.bind(startIndex + 2, latest.seqNr)
}
if (latestBySlice.size == 1) {
r2dbcExecutor.updateOne("adopt timestamp offset") { connection =>
val statement = connection
.createStatement(adoptTimestampOffsetSql)
.bind(0, projectionId.key)
.bind(1, projectionId.name)
bindCondition(statement, latestBySlice.head, startIndex = 2)
}
} else {
val batches = latestBySlice.sliding(settings.offsetBatchSize, settings.offsetBatchSize).toIndexedSeq
r2dbcExecutor
.update("adopt timestamp offsets") { connection =>
batches.map { batch =>
val batchStatement = connection
.createStatement(adoptTimestampOffsetBatchSql(batch.size))
.bind(0, projectionId.key)
.bind(1, projectionId.name)
batch.zipWithIndex.foldLeft(batchStatement) {
case (statement, (latest, index)) => bindCondition(statement, latest, startIndex = 2 + index * 3)
}
}
}
.map(_.sum)
}
}

override def clearTimestampOffset(): Future[Long] = {
val minSlice = timestampOffsetBySlicesSourceProvider.minSlice
val maxSlice = timestampOffsetBySlicesSourceProvider.maxSlice
Expand Down
Loading

0 comments on commit e407ee1

Please sign in to comment.