From e956248d81634c8d6b234bf4a3600e2889ffbd94 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 18 Oct 2024 12:32:48 +0200 Subject: [PATCH] perf: Try to load previous seqNr if missing, in validation (#1197) * perf: Try to load previous seqNr if missing, in validation * makes it more safe to evict from memory, but keep more in database * a step towards lazy loading of offsets, but full lazy loading would require many more changes due to the concurrency model used in the R2dbcOffsetStore * otherwise it will use timestampOf, which are be more costly for gRPC projections * it still falls back to timestampOf as last resort * different sql for SqlServer --- .../r2dbc/R2dbcTimestampOffsetStoreSpec.scala | 31 +++++ .../r2dbc/internal/OffsetStoreDao.scala | 2 + .../internal/PostgresOffsetStoreDao.scala | 26 ++++ .../r2dbc/internal/R2dbcOffsetStore.scala | 118 ++++++++++-------- .../internal/SqlServerOffsetStoreDao.scala | 6 + 5 files changed, 133 insertions(+), 50 deletions(-) diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index 7f4606eef..872ad79e1 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -554,6 +554,37 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L) } + "accept via loading of previous seqNr" in { + import R2dbcOffsetStore.Validation._ + val projectionId = genRandomProjectionId() + val eventTimestampQueryClock = TestClock.nowMicros() + val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock) + + val startTime = TestClock.nowMicros().instant() + + // reject unknown + val pid1 = "p1" + val env1 = createEnvelope(pid1, 7L, startTime.plusMillis(8), "e1-7") + offsetStore.validate(env1).futureValue shouldBe RejectedSeqNr + offsetStore.validate(backtrackingEnvelope(env1)).futureValue shouldBe RejectedBacktrackingSeqNr + // but if there was a stored offset (maybe evicted) + val slice = persistenceExt.sliceForPersistenceId(pid1) + r2dbcExecutor + .withConnection("insert offset") { con => + offsetStore.dao.insertTimestampOffsetInTx( + con, + Vector(R2dbcOffsetStore.Record(slice, pid1, 5L, startTime.minusMillis(1)))) + offsetStore.dao.insertTimestampOffsetInTx(con, Vector(R2dbcOffsetStore.Record(slice, pid1, 6L, startTime))) + } + .futureValue + // then it is accepted + offsetStore.validate(env1).futureValue shouldBe Accepted + + // and also detect duplicate that way + offsetStore.validate(createEnvelope(pid1, 4L, startTime.minusMillis(8), "e1-4")).futureValue shouldBe Duplicate + offsetStore.validate(createEnvelope(pid1, 6L, startTime, "e1-6")).futureValue shouldBe Duplicate + } + "update inflight on error and re-accept element" in { import R2dbcOffsetStore.Validation._ val projectionId = genRandomProjectionId() diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala index 418717c47..e9d0a54b4 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/OffsetStoreDao.scala @@ -24,6 +24,8 @@ private[projection] trait OffsetStoreDao { def readTimestampOffset(): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] + def readTimestampOffset(slice: Int, pid: String): Future[Option[R2dbcOffsetStore.Record]] + def readPrimitiveOffset(): Future[immutable.IndexedSeq[OffsetSerialization.SingleOffset]] def insertTimestampOffsetInTx( diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala index 744d43985..651a7af52 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/PostgresOffsetStoreDao.scala @@ -68,6 +68,15 @@ private[projection] class PostgresOffsetStoreDao( SELECT projection_key, slice, persistence_id, seq_nr, timestamp_offset FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ?""" + protected def createSelectOneTimestampOffsetSql: String = + sql""" + SELECT seq_nr, timestamp_offset + FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? AND persistence_id = ? + ORDER BY seq_nr DESC + LIMIT 1""" + + private val selectOneTimestampOffsetSql: String = createSelectOneTimestampOffsetSql + private val insertTimestampOffsetSql: String = sql""" INSERT INTO $timestampOffsetTable @@ -220,6 +229,23 @@ private[projection] class PostgresOffsetStoreDao( }) } + def readTimestampOffset(slice: Int, pid: String): Future[Option[R2dbcOffsetStore.Record]] = { + r2dbcExecutor.selectOne("read one timestamp offset")( + conn => { + logger.trace("reading one timestamp offset for [{}] pid [{}]", projectionId, pid) + conn + .createStatement(selectOneTimestampOffsetSql) + .bind(0, slice) + .bind(1, projectionId.name) + .bind(2, pid) + }, + row => { + val seqNr = row.get("seq_nr", classOf[java.lang.Long]) + val timestamp = row.getTimestamp("timestamp_offset") + R2dbcOffsetStore.Record(slice, pid, seqNr, timestamp) + }) + } + override def readPrimitiveOffset(): Future[immutable.IndexedSeq[OffsetSerialization.SingleOffset]] = r2dbcExecutor.select("read offset")( conn => { diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index c22cb8fde..22db1e7e9 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -218,7 +218,7 @@ private[projection] class R2dbcOffsetStore( throw new IllegalArgumentException( s"[$unknown] is not a dialect supported by this version of Akka Projection R2DBC") } - private val dao = { + val dao: OffsetStoreDao = { logger.debug("Offset store [{}] created, with dialect [{}]", projectionId, dialectName) dialect.createOffsetStoreDao(settings, sourceProvider, system, r2dbcExecutor, projectionId) } @@ -643,29 +643,6 @@ private[projection] class R2dbcOffsetStore( recordWithOffset.offset) } - def logUnknown(): Unit = { - if (recordWithOffset.fromPubSub) { - logger.debug( - "Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}", - seqNr, - pid, - recordWithOffset.offset) - } else if (!recordWithOffset.fromBacktracking) { - // This may happen rather frequently when using `publish-events`, after reconnecting and such. - logger.debug( - "Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}", - seqNr, - pid, - recordWithOffset.offset) - } else { - logger.warn( - "Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}", - seqNr, - pid, - recordWithOffset.offset) - } - } - if (prevSeqNr > 0) { // expecting seqNr to be +1 of previously known val ok = seqNr == prevSeqNr + 1 @@ -693,33 +670,16 @@ private[projection] class R2dbcOffsetStore( // always accept starting from snapshots when there was no previous event seen FutureAccepted } else { - // Haven't see seen this pid within the time window. Since events can be missed - // when read at the tail we will only accept it if the event with previous seqNr has timestamp - // before the time window of the offset store. - // Backtracking will emit missed event again. - timestampOf(pid, seqNr - 1).map { - case Some(previousTimestamp) => - val before = currentState.latestTimestamp.minus(settings.timeWindow) - if (previousTimestamp.isBefore(before)) { - logger.debug( - "Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " + - "is before time window [{}].", - pid, - seqNr, - previousTimestamp, - before) - Accepted - } else if (!recordWithOffset.fromBacktracking) { - logUnknown() - RejectedSeqNr - } else { - logUnknown() - // This will result in projection restart (with normal configuration) - RejectedBacktrackingSeqNr - } + dao.readTimestampOffset(recordWithOffset.record.slice, pid).flatMap { + case Some(loadedRecord) => + if (seqNr == loadedRecord.seqNr + 1) + FutureAccepted + else if (seqNr <= loadedRecord.seqNr) + FutureDuplicate + else + validateEventTimestamp(currentState, recordWithOffset) case None => - // previous not found, could have been deleted - Accepted + validateEventTimestamp(currentState, recordWithOffset) } } } else { @@ -736,6 +696,64 @@ private[projection] class R2dbcOffsetStore( } } + private def validateEventTimestamp(currentState: State, recordWithOffset: RecordWithOffset) = { + import Validation._ + val pid = recordWithOffset.record.pid + val seqNr = recordWithOffset.record.seqNr + + def logUnknown(): Unit = { + if (recordWithOffset.fromPubSub) { + logger.debug( + "Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}", + seqNr, + pid, + recordWithOffset.offset) + } else if (!recordWithOffset.fromBacktracking) { + // This may happen rather frequently when using `publish-events`, after reconnecting and such. + logger.debug( + "Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}", + seqNr, + pid, + recordWithOffset.offset) + } else { + logger.warn( + "Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}", + seqNr, + pid, + recordWithOffset.offset) + } + } + + // Haven't see seen this pid within the time window. Since events can be missed + // when read at the tail we will only accept it if the event with previous seqNr has timestamp + // before the time window of the offset store. + // Backtracking will emit missed event again. + timestampOf(pid, seqNr - 1).map { + case Some(previousTimestamp) => + val before = currentState.latestTimestamp.minus(settings.timeWindow) + if (previousTimestamp.isBefore(before)) { + logger.debug( + "Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " + + "is before time window [{}].", + pid, + seqNr, + previousTimestamp, + before) + Accepted + } else if (!recordWithOffset.fromBacktracking) { + logUnknown() + RejectedSeqNr + } else { + logUnknown() + // This will result in projection restart (with normal configuration) + RejectedBacktrackingSeqNr + } + case None => + // previous not found, could have been deleted + Accepted + } + } + @tailrec final def addInflight[Envelope](envelope: Envelope): Unit = { createRecordWithOffset(envelope) match { case Some(recordWithOffset) => diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala index b5bff7ee4..9fd356dcf 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/SqlServerOffsetStoreDao.scala @@ -39,6 +39,12 @@ private[projection] class SqlServerOffsetStoreDao( override protected implicit def timestampCodec: TimestampCodec = SqlServerTimestampCodec + override protected def createSelectOneTimestampOffsetSql: String = + sql""" + SELECT TOP(1) seq_nr, timestamp_offset + FROM $timestampOffsetTable WHERE slice = ? AND projection_name = ? AND persistence_id = ? + ORDER BY seq_nr DESC""" + override protected def createUpsertOffsetSql() = sql""" UPDATE $offsetTable SET