Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Skip backtracking, PubSub after idle (backport 1.2) #615

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# PR #599 internals
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.copy*")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState.unapply")
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# internals
ProblemFilters.exclude[Problem]("akka.persistence.r2dbc.internal.BySliceQuery#QueryState*")
209 changes: 143 additions & 66 deletions core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@

package akka.persistence.r2dbc.internal

import java.time.Clock

import scala.collection.immutable
import java.time.Instant
import java.time.{ Duration => JDuration }

import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import akka.NotUsed
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.InternalApi
Expand Down Expand Up @@ -43,7 +47,11 @@ import org.slf4j.Logger
backtrackingExpectFiltered = 0,
buckets = Buckets.empty,
previous = TimestampOffset.Zero,
previousBacktracking = TimestampOffset.Zero)
previousBacktracking = TimestampOffset.Zero,
startTimestamp = Instant.EPOCH,
startWallClock = Instant.EPOCH,
currentQueryWallClock = Instant.EPOCH,
previousQueryWallClock = Instant.EPOCH)
}

final case class QueryState(
Expand All @@ -58,24 +66,32 @@ import org.slf4j.Logger
backtrackingExpectFiltered: Int,
buckets: Buckets,
previous: TimestampOffset,
previousBacktracking: TimestampOffset) {
previousBacktracking: TimestampOffset,
startTimestamp: Instant,
startWallClock: Instant,
currentQueryWallClock: Instant,
previousQueryWallClock: Instant) {

/** `true` when `backtrackingCount` is greater than zero. */
def backtracking: Boolean = backtrackingCount > 0

/**
 * The offset that applies to the current mode: `latestBacktracking` while
 * backtracking, otherwise `latest`.
 */
def currentOffset: TimestampOffset =
  if (!backtracking) latest
  else latestBacktracking

def nextQueryFromTimestamp: Instant =
if (backtracking) latestBacktracking.timestamp
else latest.timestamp
/**
 * Lower timestamp bound of the next query.
 *
 * When not backtracking this is `latest.timestamp`. While backtracking it is
 * the later of `latestBacktracking.timestamp` and `latest.timestamp` minus the
 * given window, so the backtracking query never starts further back than
 * `backtrackingWindow` before `latest`.
 *
 * @param backtrackingWindow how far behind `latest` a backtracking query may start
 */
def nextQueryFromTimestamp(backtrackingWindow: JDuration): Instant =
  if (!backtracking) latest.timestamp
  else {
    val windowStart = latest.timestamp.minus(backtrackingWindow)
    if (windowStart.isAfter(latestBacktracking.timestamp)) windowStart
    else latestBacktracking.timestamp
  }

/**
 * Result of `highestSeenSeqNr` for the offset pair of the current mode:
 * the backtracking pair while backtracking, otherwise `previous`/`latest`.
 */
def nextQueryFromSeqNr: Option[Long] = {
  val (older, newer) =
    if (backtracking) (previousBacktracking, latestBacktracking)
    else (previous, latest)
  highestSeenSeqNr(older, newer)
}

def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = {
buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match {
def nextQueryToTimestamp(backtrackingWindow: JDuration, atLeastNumberOfEvents: Int): Option[Instant] = {
buckets.findTimeForLimit(nextQueryFromTimestamp(backtrackingWindow), atLeastNumberOfEvents) match {
case Some(t) =>
if (backtracking)
if (t.isAfter(latest.timestamp)) Some(latest.timestamp) else Some(t)
Expand Down Expand Up @@ -209,15 +225,18 @@ import org.slf4j.Logger
dao: BySliceQuery.Dao[Row],
createEnvelope: (TimestampOffset, Row) => Envelope,
extractOffset: Envelope => TimestampOffset,
createHeartbeat: Instant => Option[Envelope],
clock: Clock,
settings: R2dbcSettings,
log: Logger)(implicit val ec: ExecutionContext) {
import BySliceQuery._
import TimestampOffset.toTimestampOffset

private val backtrackingWindow = JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis)
private val halfBacktrackingWindow = backtrackingWindow.dividedBy(2)
private val firstBacktrackingQueryWindow =
backtrackingWindow.plus(JDuration.ofMillis(settings.querySettings.backtrackingBehindCurrentTime.toMillis))
private val backtrackingBehindCurrentTime =
JDuration.ofMillis(settings.querySettings.backtrackingBehindCurrentTime.toMillis)
private val firstBacktrackingQueryWindow = backtrackingWindow.plus(backtrackingBehindCurrentTime)
private val eventBucketCountInterval = JDuration.ofSeconds(60)

def currentBySlices(
Expand All @@ -229,8 +248,12 @@ import org.slf4j.Logger
filterEventsBeforeSnapshots: (String, Long, String) => Boolean = (_, _, _) => true): Source[Envelope, NotUsed] = {
val initialOffset = toTimestampOffset(offset)

def nextOffset(state: QueryState, envelope: Envelope): QueryState =
state.copy(latest = extractOffset(envelope), rowCount = state.rowCount + 1)
/**
 * State transition applied for each emitted envelope. Heartbeat envelopes
 * leave the state untouched; any other envelope becomes the new `latest`
 * offset and increments `rowCount`.
 */
def nextOffset(state: QueryState, envelope: Envelope): QueryState =
  if (!EnvelopeOrigin.isHeartbeatEvent(envelope)) {
    val offset = extractOffset(envelope)
    state.copy(latest = offset, rowCount = state.rowCount + 1)
  } else state

def nextQuery(state: QueryState, endTimestamp: Instant): (QueryState, Option[Source[Envelope, NotUsed]]) = {
// Note that we can't know how many events with the same timestamp that are filtered out
Expand All @@ -242,7 +265,7 @@ import org.slf4j.Logger
val fromTimestamp = state.latest.timestamp
val fromSeqNr = highestSeenSeqNr(state.previous, state.latest)

val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize) match {
case Some(t) =>
if (t.isBefore(endTimestamp)) t else endTimestamp
case None =>
Expand Down Expand Up @@ -334,45 +357,49 @@ import org.slf4j.Logger
initialOffset.timestamp)

def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
val offset = extractOffset(envelope)
if (state.backtracking) {
if (offset.timestamp.isBefore(state.latestBacktracking.timestamp))
throw new IllegalArgumentException(
s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")

val newSeenCount =
if (offset.timestamp == state.latestBacktracking.timestamp &&
highestSeenSeqNr(state.previousBacktracking, offset) ==
highestSeenSeqNr(state.previousBacktracking, state.latestBacktracking))
state.latestBacktrackingSeenCount + 1
else 1

state.copy(
latestBacktracking = offset,
latestBacktrackingSeenCount = newSeenCount,
rowCount = state.rowCount + 1)
if (EnvelopeOrigin.isHeartbeatEvent(envelope))
state
else {
val offset = extractOffset(envelope)
if (state.backtracking) {
if (offset.timestamp.isBefore(state.latestBacktracking.timestamp))
throw new IllegalArgumentException(
s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")

val newSeenCount =
if (offset.timestamp == state.latestBacktracking.timestamp &&
highestSeenSeqNr(state.previousBacktracking, offset) ==
highestSeenSeqNr(state.previousBacktracking, state.latestBacktracking))
state.latestBacktrackingSeenCount + 1
else 1

} else {
if (offset.timestamp.isBefore(state.latest.timestamp))
throw new IllegalArgumentException(s"Unexpected offset [$offset] before latest [${state.latest}].")
state.copy(
latestBacktracking = offset,
latestBacktrackingSeenCount = newSeenCount,
rowCount = state.rowCount + 1)

if (log.isDebugEnabled()) {
if (state.latestBacktracking.seen.nonEmpty &&
offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow)))
log.debugN(
"{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]",
logPrefix,
state.latestBacktracking,
offset)
}
} else {
if (offset.timestamp.isBefore(state.latest.timestamp))
throw new IllegalArgumentException(s"Unexpected offset [$offset] before latest [${state.latest}].")

if (log.isDebugEnabled()) {
if (state.latestBacktracking.seen.nonEmpty &&
offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow)))
log.debugN(
"{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]",
logPrefix,
state.latestBacktracking,
offset)
}

state.copy(latest = offset, rowCount = state.rowCount + 1)
state.copy(latest = offset, rowCount = state.rowCount + 1)
}
}
}

def delayNextQuery(state: QueryState): Option[FiniteDuration] = {
if (switchFromBacktracking(state)) {
// switch from from backtracking immediately
// switch from backtracking immediately
None
} else {
val delay = ContinuousQuery.adjustNextDelay(
Expand All @@ -399,20 +426,38 @@ import org.slf4j.Logger
state.backtracking && state.rowCount < settings.querySettings.bufferSize - state.backtrackingExpectFiltered
}

/**
 * Decides whether the next query should switch from the normal mode to backtracking.
 *
 * Preconditions: backtracking is enabled, the state is not already backtracking,
 * `latest` is not `TimestampOffset.Zero`, and the query is not considered far behind
 * the current wall clock. When they hold, the switch happens if any of these is true:
 * the idle count reached 5, the rows seen since the last backtracking reached
 * three times the buffer size, or `latest` is more than half a backtracking window
 * ahead of `latestBacktracking`.
 *
 * @param state        current query state
 * @param newIdleCount idle count that will apply to the next query
 */
def switchToBacktracking(state: QueryState, newIdleCount: Long): Boolean = {
  // Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after
  // the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow

  val querySettings = settings.querySettings

  // Backtracking is disabled while the query has moved past the initial offset but is still
  // further behind the wall clock than the first backtracking query window.
  def farBehindWallClock: Boolean = {
    val pastInitialOffset =
      initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp)
    if (pastInitialOffset) {
      val reference =
        if (state.previous == TimestampOffset.Zero) state.latest.timestamp
        else state.previous.timestamp
      // the clock is only read when the first condition holds
      reference.isBefore(clock.instant().minus(firstBacktrackingQueryWindow))
    } else false
  }

  def idleLongEnough: Boolean =
    newIdleCount >= 5

  def manyRowsSinceBacktracking: Boolean =
    state.rowCountSinceBacktracking + state.rowCount >= querySettings.bufferSize * 3

  def backtrackingLagsBehind: Boolean = {
    val lag = JDuration.between(state.latestBacktracking.timestamp, state.latest.timestamp)
    lag.compareTo(halfBacktrackingWindow) > 0
  }

  if (!querySettings.backtrackingEnabled || state.backtracking || state.latest == TimestampOffset.Zero) false
  else if (farBehindWallClock) false
  else idleLongEnough || manyRowsSinceBacktracking || backtrackingLagsBehind
}

def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0
// only start tracking query wall clock (for heartbeats) after initial backtracking query
val newQueryWallClock = if (state.latestBacktracking != TimestampOffset.Zero) clock.instant() else Instant.EPOCH
val newState =
if (settings.querySettings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero &&
(newIdleCount >= 5 ||
state.rowCountSinceBacktracking + state.rowCount >= settings.querySettings.bufferSize * 3 ||
JDuration
.between(state.latestBacktracking.timestamp, state.latest.timestamp)
.compareTo(halfBacktrackingWindow) > 0)) {
// FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0`

// Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after
// the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow

if (switchToBacktracking(state, newIdleCount)) {
// switching to backtracking
val fromOffset =
if (state.latestBacktracking == TimestampOffset.Zero)
Expand All @@ -427,15 +472,19 @@ import org.slf4j.Logger
idleCount = newIdleCount,
backtrackingCount = 1,
latestBacktracking = fromOffset,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount)
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
} else if (switchFromBacktracking(state)) {
// switch from backtracking
// switching from backtracking
state.copy(
rowCount = 0,
rowCountSinceBacktracking = 0,
queryCount = state.queryCount + 1,
idleCount = newIdleCount,
backtrackingCount = 0)
backtrackingCount = 0,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
} else {
// continue
val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
Expand All @@ -445,16 +494,18 @@ import org.slf4j.Logger
queryCount = state.queryCount + 1,
idleCount = newIdleCount,
backtrackingCount = newBacktrackingCount,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount)
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
}

val behindCurrentTime =
if (newState.backtracking) settings.querySettings.backtrackingBehindCurrentTime
else settings.querySettings.behindCurrentTime

val fromTimestamp = newState.nextQueryFromTimestamp
val fromTimestamp = newState.nextQueryFromTimestamp(backtrackingWindow)
val fromSeqNr = newState.nextQueryFromSeqNr
val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize)
val toTimestamp = newState.nextQueryToTimestamp(backtrackingWindow, settings.querySettings.bufferSize)

if (log.isDebugEnabled()) {
val backtrackingInfo =
Expand Down Expand Up @@ -502,12 +553,38 @@ import org.slf4j.Logger
.via(deserializeAndAddOffset(newState.currentOffset)))
}

ContinuousQuery[QueryState, Envelope](
initialState = QueryState.empty.copy(latest = initialOffset),
updateState = nextOffset,
delayNextQuery = delayNextQuery,
nextQuery = nextQuery,
beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _))
/**
 * Optional heartbeat envelope for the given state.
 *
 * Nothing is produced unless the query has been idle for at least one round
 * and `previousQueryWallClock` has been set (is not `Instant.EPOCH`).
 * Otherwise the heartbeat timestamp is `startTimestamp` advanced by the wall
 * clock time elapsed between `startWallClock` and the previous query, reduced
 * by `backtrackingBehindCurrentTime`. Whether an envelope is created for that
 * timestamp is up to `createHeartbeat`.
 */
def heartbeat(state: QueryState): Option[Envelope] = {
  val idle = state.idleCount >= 1
  val wallClockTracked = state.previousQueryWallClock != Instant.EPOCH
  if (!idle || !wallClockTracked) None
  else {
    // using wall clock to measure duration since the start time (database timestamp) up to idle backtracking limit
    val idleBacktrackingLimit = state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)
    val elapsed = JDuration.between(state.startWallClock, idleBacktrackingLimit)
    createHeartbeat(state.startTimestamp.plus(elapsed))
  }
}

// Heartbeats are only wired in when settings.journalPublishEvents is set;
// otherwise the heartbeat function always yields None.
val nextHeartbeat: QueryState => Option[Envelope] =
if (settings.journalPublishEvents) heartbeat else _ => None

// Start timestamp of the query: InstantFactory.now() when settings.useAppTimestamp is set,
// otherwise the value returned by dao.currentDbTimestamp for minSlice.
val currentTimestamp =
if (settings.useAppTimestamp) Future.successful(InstantFactory.now())
else dao.currentDbTimestamp(minSlice)

// The continuous query is created once the start timestamp is available. The wall clock is
// sampled at that point and both values are stored in the initial state (startTimestamp and
// startWallClock). The materialized value of the future source is replaced with NotUsed.
Source
.futureSource[Envelope, NotUsed] {
currentTimestamp.map { currentTime =>
val currentWallClock = clock.instant()
ContinuousQuery[QueryState, Envelope](
initialState = QueryState.empty
.copy(latest = initialOffset, startTimestamp = currentTime, startWallClock = currentWallClock),
updateState = nextOffset,
delayNextQuery = delayNextQuery,
nextQuery = nextQuery,
beforeQuery = beforeQuery(logPrefix, entityType, minSlice, maxSlice, _),
heartbeat = nextHeartbeat)
}
}
.mapMaterializedValue(_ => NotUsed)
}

private def beforeQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ private[r2dbc] object ContinuousQuery {
updateState: (S, T) => S,
delayNextQuery: S => Option[FiniteDuration],
nextQuery: S => (S, Option[Source[T, NotUsed]]),
beforeQuery: S => Option[Future[S]] = (_: S) => None): Source[T, NotUsed] =
Source.fromGraph(new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery))
beforeQuery: S => Option[Future[S]] = (_: S) => None,
heartbeat: S => Option[T] = (_: S) => None): Source[T, NotUsed] =
Source.fromGraph(
new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery, heartbeat))

private case object NextQuery

Expand Down Expand Up @@ -69,7 +71,8 @@ final private[r2dbc] class ContinuousQuery[S, T](
updateState: (S, T) => S,
delayNextQuery: S => Option[FiniteDuration],
nextQuery: S => (S, Option[Source[T, NotUsed]]),
beforeQuery: S => Option[Future[S]])
beforeQuery: S => Option[Future[S]],
heartbeat: S => Option[T])
extends GraphStage[SourceShape[T]] {
import ContinuousQuery._

Expand Down Expand Up @@ -150,8 +153,12 @@ final private[r2dbc] class ContinuousQuery[S, T](
next()
}
})
// When the heartbeat function yields an element for the new state, that element is
// emitted first, followed by the elements of the query source; otherwise the query
// source is used as is.
val sourceWithHeartbeat = heartbeat(newState) match {
case None => source
case Some(h) => Source.single(h).concat(source)
}
val graph = Source
.fromGraph(source)
.fromGraph(sourceWithHeartbeat)
.to(sinkIn.sink)
interpreter.subFusingMaterializer.materialize(graph)
sinkIn.pull()
Expand Down
Loading
Loading