perf: Try to load previous seqNr if missing, in validation (#1197)
* perf: Try to load previous seqNr if missing, in validation

* makes it safer to evict from memory, while keeping more in the database
* a step towards lazy loading of offsets,
  but full lazy loading would require many more changes due to the concurrency model used in the R2dbcOffsetStore
* otherwise it would use timestampOf, which is more costly for gRPC projections
* it still falls back to timestampOf as a last resort (see the sketch after these notes)

* different SQL for SqlServer
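
For orientation before the file diffs, here is a minimal, hypothetical Scala sketch of the validation ordering this commit introduces when a pid's previous seqNr is not found in the in-memory state. All names in it (ValidationSketch, StoredRecord, readStoredOffset, fallbackTimestampValidation) are illustration-only stand-ins, not the actual akka-projection-r2dbc API; the real logic is in R2dbcOffsetStore.validate and the new validateEventTimestamp method shown in the diffs below.

import scala.concurrent.{ ExecutionContext, Future }

object ValidationSketch {
  sealed trait Validation
  case object Accepted extends Validation
  case object Duplicate extends Validation

  // Latest stored offset row for a pid, as a single-row database lookup would return it.
  final case class StoredRecord(pid: String, seqNr: Long)

  // 1) cheap single-row lookup of the latest stored offset for the pid
  //    (the record may only have been evicted from the in-memory state)
  // 2) otherwise fall back to timestampOf-based validation, i.e. querying the
  //    event journal, which is more costly, especially for gRPC projections
  def validateUnknown(
      readStoredOffset: (Int, String) => Future[Option[StoredRecord]],
      fallbackTimestampValidation: () => Future[Validation])(
      slice: Int,
      pid: String,
      seqNr: Long)(implicit ec: ExecutionContext): Future[Validation] =
    readStoredOffset(slice, pid).flatMap {
      case Some(stored) if seqNr == stored.seqNr + 1 => Future.successful(Accepted) // expected next seqNr
      case Some(stored) if seqNr <= stored.seqNr => Future.successful(Duplicate) // already processed
      case _ => fallbackTimestampValidation() // gap, or nothing stored for the pid
    }
}

In the actual change the single-row lookup is the new OffsetStoreDao.readTimestampOffset(slice, pid), backed by a SELECT ... ORDER BY seq_nr DESC LIMIT 1 query (TOP(1) for SqlServer).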
patriknw authored Oct 18, 2024
1 parent 9c7168f commit e956248
Showing 5 changed files with 133 additions and 50 deletions.
@@ -554,6 +554,37 @@ class R2dbcTimestampOffsetStoreSpec
      offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L)
    }

    "accept via loading of previous seqNr" in {
      import R2dbcOffsetStore.Validation._
      val projectionId = genRandomProjectionId()
      val eventTimestampQueryClock = TestClock.nowMicros()
      val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)

      val startTime = TestClock.nowMicros().instant()

      // reject unknown
      val pid1 = "p1"
      val env1 = createEnvelope(pid1, 7L, startTime.plusMillis(8), "e1-7")
      offsetStore.validate(env1).futureValue shouldBe RejectedSeqNr
      offsetStore.validate(backtrackingEnvelope(env1)).futureValue shouldBe RejectedBacktrackingSeqNr
      // but if there was a stored offset (maybe evicted)
      val slice = persistenceExt.sliceForPersistenceId(pid1)
      r2dbcExecutor
        .withConnection("insert offset") { con =>
          offsetStore.dao.insertTimestampOffsetInTx(
            con,
            Vector(R2dbcOffsetStore.Record(slice, pid1, 5L, startTime.minusMillis(1))))
          offsetStore.dao.insertTimestampOffsetInTx(con, Vector(R2dbcOffsetStore.Record(slice, pid1, 6L, startTime)))
        }
        .futureValue
      // then it is accepted
      offsetStore.validate(env1).futureValue shouldBe Accepted

      // and also detect duplicate that way
      offsetStore.validate(createEnvelope(pid1, 4L, startTime.minusMillis(8), "e1-4")).futureValue shouldBe Duplicate
      offsetStore.validate(createEnvelope(pid1, 6L, startTime, "e1-6")).futureValue shouldBe Duplicate
    }

    "update inflight on error and re-accept element" in {
      import R2dbcOffsetStore.Validation._
      val projectionId = genRandomProjectionId()
@@ -24,6 +24,8 @@ private[projection] trait OffsetStoreDao {

def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]]

def readTimestampOffset(slice: Int, pid: String): Future[Option[R2dbcOffsetStore.Record]]

def readPrimitiveOffset(): Future[immutable.IndexedSeq[OffsetSerialization.SingleOffset]]

def insertTimestampOffsetInTx(
@@ -68,6 +68,15 @@ private[projection] class PostgresOffsetStoreDao(
SELECT projection_key, slice, persistence_id, seq_nr, timestamp_offset
FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ?"""

protected def createSelectOneTimestampOffsetSql: String =
sql"""
SELECT seq_nr, timestamp_offset
FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? AND persistence_id = ?
ORDER BY seq_nr DESC
LIMIT 1"""

private val selectOneTimestampOffsetSql: String = createSelectOneTimestampOffsetSql

private val insertTimestampOffsetSql: String =
sql"""
INSERT INTO $timestampOffsetTable
@@ -220,6 +229,23 @@
})
}

  def readTimestampOffset(slice: Int, pid: String): Future[Option[R2dbcOffsetStore.Record]] = {
    r2dbcExecutor.selectOne("read one timestamp offset")(
      conn => {
        logger.trace("reading one timestamp offset for [{}] pid [{}]", projectionId, pid)
        conn
          .createStatement(selectOneTimestampOffsetSql)
          .bind(0, slice)
          .bind(1, projectionId.name)
          .bind(2, pid)
      },
      row => {
        val seqNr = row.get("seq_nr", classOf[java.lang.Long])
        val timestamp = row.getTimestamp("timestamp_offset")
        R2dbcOffsetStore.Record(slice, pid, seqNr, timestamp)
      })
  }

override def readPrimitiveOffset(): Future[immutable.IndexedSeq[OffsetSerialization.SingleOffset]] =
r2dbcExecutor.select("read offset")(
conn => {
@@ -218,7 +218,7 @@ private[projection] class R2dbcOffsetStore(
throw new IllegalArgumentException(
s"[$unknown] is not a dialect supported by this version of Akka Projection R2DBC")
}
private val dao = {
val dao: OffsetStoreDao = {
logger.debug("Offset store [{}] created, with dialect [{}]", projectionId, dialectName)
dialect.createOffsetStoreDao(settings, sourceProvider, system, r2dbcExecutor, projectionId)
}
@@ -643,29 +643,6 @@ private[projection] class R2dbcOffsetStore(
recordWithOffset.offset)
}

def logUnknown(): Unit = {
if (recordWithOffset.fromPubSub) {
logger.debug(
"Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
seqNr,
pid,
recordWithOffset.offset)
} else if (!recordWithOffset.fromBacktracking) {
// This may happen rather frequently when using `publish-events`, after reconnecting and such.
logger.debug(
"Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
seqNr,
pid,
recordWithOffset.offset)
} else {
logger.warn(
"Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}",
seqNr,
pid,
recordWithOffset.offset)
}
}

if (prevSeqNr > 0) {
// expecting seqNr to be +1 of previously known
val ok = seqNr == prevSeqNr + 1
@@ -693,33 +670,16 @@
// always accept starting from snapshots when there was no previous event seen
FutureAccepted
} else {
// Haven't seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the time window of the offset store.
// Backtracking will emit missed event again.
timestampOf(pid, seqNr - 1).map {
case Some(previousTimestamp) =>
val before = currentState.latestTimestamp.minus(settings.timeWindow)
if (previousTimestamp.isBefore(before)) {
logger.debug(
"Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
"is before time window [{}].",
pid,
seqNr,
previousTimestamp,
before)
Accepted
} else if (!recordWithOffset.fromBacktracking) {
logUnknown()
RejectedSeqNr
} else {
logUnknown()
// This will result in projection restart (with normal configuration)
RejectedBacktrackingSeqNr
}
dao.readTimestampOffset(recordWithOffset.record.slice, pid).flatMap {
case Some(loadedRecord) =>
if (seqNr == loadedRecord.seqNr + 1)
FutureAccepted
else if (seqNr <= loadedRecord.seqNr)
FutureDuplicate
else
validateEventTimestamp(currentState, recordWithOffset)
case None =>
// previous not found, could have been deleted
Accepted
validateEventTimestamp(currentState, recordWithOffset)
}
}
} else {
@@ -736,6 +696,64 @@
}
}

  private def validateEventTimestamp(currentState: State, recordWithOffset: RecordWithOffset) = {
    import Validation._
    val pid = recordWithOffset.record.pid
    val seqNr = recordWithOffset.record.seqNr

    def logUnknown(): Unit = {
      if (recordWithOffset.fromPubSub) {
        logger.debug(
          "Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
          seqNr,
          pid,
          recordWithOffset.offset)
      } else if (!recordWithOffset.fromBacktracking) {
        // This may happen rather frequently when using `publish-events`, after reconnecting and such.
        logger.debug(
          "Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
          seqNr,
          pid,
          recordWithOffset.offset)
      } else {
        logger.warn(
          "Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}",
          seqNr,
          pid,
          recordWithOffset.offset)
      }
    }

    // Haven't seen this pid within the time window. Since events can be missed
    // when read at the tail we will only accept it if the event with previous seqNr has timestamp
    // before the time window of the offset store.
    // Backtracking will emit missed event again.
    timestampOf(pid, seqNr - 1).map {
      case Some(previousTimestamp) =>
        val before = currentState.latestTimestamp.minus(settings.timeWindow)
        if (previousTimestamp.isBefore(before)) {
          logger.debug(
            "Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
            "is before time window [{}].",
            pid,
            seqNr,
            previousTimestamp,
            before)
          Accepted
        } else if (!recordWithOffset.fromBacktracking) {
          logUnknown()
          RejectedSeqNr
        } else {
          logUnknown()
          // This will result in projection restart (with normal configuration)
          RejectedBacktrackingSeqNr
        }
      case None =>
        // previous not found, could have been deleted
        Accepted
    }
  }

@tailrec final def addInflight[Envelope](envelope: Envelope): Unit = {
createRecordWithOffset(envelope) match {
case Some(recordWithOffset) =>
@@ -39,6 +39,12 @@ private[projection] class SqlServerOffsetStoreDao(

override protected implicit def timestampCodec: TimestampCodec = SqlServerTimestampCodec

override protected def createSelectOneTimestampOffsetSql: String =
sql"""
SELECT TOP(1) seq_nr, timestamp_offset
FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? AND persistence_id = ?
ORDER BY seq_nr DESC"""

override protected def createUpsertOffsetSql() =
sql"""
UPDATE $offsetTable SET