Skip to content

Commit

Permalink
feat: Make it possible to disable offset deletion (#1195)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Sep 12, 2024
1 parent 15c944d commit b6418a0
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
2 changes: 1 addition & 1 deletion akka-projection-r2dbc/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ akka.projection.r2dbc {
evict-interval = 10 seconds

# Remove old entries outside the time-window from the offset store database
# with this frequency.
# with this frequency. Can be disabled with `off`.
delete-interval = 1 minute

# Trying to batch insert offsets in batches of this size.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ object R2dbcProjectionSettings {
case _ => config.getDuration("log-db-calls-exceeding").asScala
}

val deleteInterval = config.getString("offset-store.delete-interval").toLowerCase(Locale.ROOT) match {
case "off" => JDuration.ZERO
case _ => config.getDuration("offset-store.delete-interval")
}

new R2dbcProjectionSettings(
schema = Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty),
offsetTable = config.getString("offset-store.offset-table"),
Expand All @@ -48,7 +53,7 @@ object R2dbcProjectionSettings {
timeWindow = config.getDuration("offset-store.time-window"),
keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"),
evictInterval = config.getDuration("offset-store.evict-interval"),
deleteInterval = config.getDuration("offset-store.delete-interval"),
deleteInterval,
logDbCallsExceeding,
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
offsetBatchSize = config.getInt("offset-store.offset-batch-size"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,12 @@ private[projection] class R2dbcOffsetStore(
// To trigger next deletion after in-memory eviction
private val triggerDeletion = new AtomicBoolean(false)

system.scheduler.scheduleWithFixedDelay(
settings.deleteInterval,
settings.deleteInterval,
() => deleteOldTimestampOffsets(),
system.executionContext)
if (!settings.deleteInterval.isZero && !settings.deleteInterval.isNegative)
system.scheduler.scheduleWithFixedDelay(
settings.deleteInterval,
settings.deleteInterval,
() => deleteOldTimestampOffsets(),
system.executionContext)

private def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] = {
sourceProvider match {
Expand Down

0 comments on commit b6418a0

Please sign in to comment.