Skip to content

Commit

Permalink
#520 Add the cache layer to the offset manager calls that incur biggest load on the database.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Nov 29, 2024
1 parent b700025 commit ffa27ae
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true) {
import za.co.absa.pramen.core.utils.FutureImplicits._

private val log = LoggerFactory.getLogger(this.getClass)
private val offsetManagement = new OffsetManagerJdbc(db, batchId)
private val offsetManagement = new OffsetManagerCached(new OffsetManagerJdbc(db, batchId))

override val bookkeepingEnabled: Boolean = true

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.bookkeeper

import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}

import java.time.LocalDate
import scala.collection.mutable

/**
* The offset manager decorator handles caching of repeated queries.
*/
class OffsetManagerCached(offsetManager: OffsetManager) extends OffsetManager {
  private val log = LoggerFactory.getLogger(this.getClass)

  // Cache keyed by the exact argument pair of getMaxInfoDateAndOffset.
  // Guarded by this.synchronized; entries for a table are dropped on any write to it.
  private val aggregatedOffsetsCache = new mutable.HashMap[(String, Option[LocalDate]), Option[DataOffsetAggregated]]

  /** Delegates directly to the underlying manager (not cached). */
  override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = {
    offsetManager.getOffsets(table, infoDate)
  }

  /** Delegates directly to the underlying manager (not cached). */
  override def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Array[UncommittedOffset] = {
    offsetManager.getUncommittedOffsets(table, onlyForInfoDate)
  }

  /**
    * Returns the aggregated min/max offsets for a table, caching the result of
    * the (potentially expensive) database query. Subsequent calls with the same
    * arguments are served from the cache until a write invalidates the table.
    */
  override def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = synchronized {
    val key = (table, onlyForInfoDate)
    aggregatedOffsetsCache.get(key) match {
      case Some(cached) =>
        log.info(s"Got min/max offsets for '$table' from cache.")
        cached
      case None =>
        val value = offsetManager.getMaxInfoDateAndOffset(table, onlyForInfoDate)
        log.info(s"Got min/max offsets for '$table' from the database. Saving to cache...")
        aggregatedOffsetsCache += key -> value
        value
    }
  }

  /** Delegates directly to the underlying manager (no cached state involved). */
  override def startWriteOffsets(table: String, infoDate: LocalDate, offsetType: OffsetType): DataOffsetRequest = {
    offsetManager.startWriteOffsets(table, infoDate, offsetType)
  }

  /** Commits offsets via the underlying manager, then invalidates cached aggregates for the table. */
  override def commitOffsets(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = {
    offsetManager.commitOffsets(request, minOffset, maxOffset)

    invalidateTable(request.tableName)
  }

  /**
    * Commits a rerun via the underlying manager. The cache is invalidated
    * before delegating (matching the original ordering); a needless
    * invalidation on delegate failure is harmless, only costing a re-query.
    */
  override def commitRerun(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = {
    invalidateTable(request.tableName)

    offsetManager.commitRerun(request, minOffset, maxOffset)
  }

  /** Posts committed records via the underlying manager, then invalidates every affected table. */
  override def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit = {
    offsetManager.postCommittedRecords(commitRequests)

    commitRequests.map(_.table).distinct.foreach(invalidateTable)
  }

  /** Delegates directly to the underlying manager (rollback does not change committed aggregates). */
  override def rollbackOffsets(request: DataOffsetRequest): Unit = {
    offsetManager.rollbackOffsets(request)
  }

  /** Drops all cached aggregate entries (for any info date) belonging to the given table. */
  private def invalidateTable(table: String): Unit = synchronized {
    // Materialize the matching keys before removing so we never mutate the map
    // while iterating a view backed by it.
    val staleKeys = aggregatedOffsetsCache.keys.filter(_._1 == table).toSeq
    aggregatedOffsetsCache --= staleKeys
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
}

override def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = {
// ToDo Consider adding a caching layer for this
val maxInfoDateOpt = onlyForInfoDate.orElse(getMaximumInfoDate(table))

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import za.co.absa.pramen.api.offset.DataOffset.{CommittedOffset, UncommittedOffset}
import za.co.absa.pramen.api.offset.{OffsetType, OffsetValue}
import za.co.absa.pramen.core.bookkeeper.{OffsetManager, OffsetManagerJdbc}
import za.co.absa.pramen.core.bookkeeper.{OffsetManager, OffsetManagerCached, OffsetManagerJdbc}
import za.co.absa.pramen.core.fixtures.RelationalDbFixture
import za.co.absa.pramen.core.rdb.PramenDb
import za.co.absa.pramen.core.reader.model.JdbcConfig
Expand All @@ -44,7 +44,7 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B
}

def getOffsetManager: OffsetManager = {
new OffsetManagerJdbc(pramenDb.slickDb, 123L)
new OffsetManagerCached(new OffsetManagerJdbc(pramenDb.slickDb, 123L))
}

"getOffsets" should {
Expand Down

0 comments on commit ffa27ae

Please sign in to comment.