From ffa27ae53f6f444d38e8280720b231d7cf90db8e Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 29 Nov 2024 12:38:15 +0100 Subject: [PATCH] #520 Add the cache layer to the offset manager calls that incur biggest load on the database. --- .../core/bookkeeper/BookkeeperJdbc.scala | 2 +- .../core/bookkeeper/OffsetManagerCached.scala | 86 +++++++++++++++++++ .../core/bookkeeper/OffsetManagerJdbc.scala | 1 - .../bookkeeper/OffsetManagerJdbcSuite.scala | 4 +- 4 files changed, 89 insertions(+), 4 deletions(-) create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala index de82d0ea3..8a6fbf9e3 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala @@ -33,7 +33,7 @@ class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true) { import za.co.absa.pramen.core.utils.FutureImplicits._ private val log = LoggerFactory.getLogger(this.getClass) - private val offsetManagement = new OffsetManagerJdbc(db, batchId) + private val offsetManagement = new OffsetManagerCached(new OffsetManagerJdbc(db, batchId)) override val bookkeepingEnabled: Boolean = true diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala new file mode 100644 index 000000000..040788aba --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala @@ -0,0 +1,86 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.bookkeeper + +import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset +import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue} +import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest} + +import java.time.LocalDate +import scala.collection.mutable + +/** + * The offset manager decorator handles caching or repeated queries. + */ +class OffsetManagerCached(offsetManager: OffsetManager) extends OffsetManager { + private val log = LoggerFactory.getLogger(this.getClass) + private val aggregatedOffsetsCache = new mutable.HashMap[(String, Option[LocalDate]), Option[DataOffsetAggregated]] + + def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = { + offsetManager.getOffsets(table, infoDate) + } + + def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Array[UncommittedOffset] = { + offsetManager.getUncommittedOffsets(table, onlyForInfoDate) + } + + def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = synchronized { + if (aggregatedOffsetsCache.contains((table, onlyForInfoDate))) { + log.info(s"Got min/max offsets for '$table' from cache.") + aggregatedOffsetsCache((table, onlyForInfoDate)) + } else { + val value = offsetManager.getMaxInfoDateAndOffset(table, onlyForInfoDate) + log.info(s"Got min/max offsets for '$table' from the database. Saving to cache...") + aggregatedOffsetsCache += (table, onlyForInfoDate) -> value + value + } + } + + def startWriteOffsets(table: String, infoDate: LocalDate, offsetType: OffsetType): DataOffsetRequest = { + offsetManager.startWriteOffsets(table, infoDate, offsetType) + } + + def commitOffsets(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = { + offsetManager.commitOffsets(request, minOffset, maxOffset) + + this.synchronized { + aggregatedOffsetsCache --= aggregatedOffsetsCache.keys.filter(_._1 == request.tableName) + } + } + + def commitRerun(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = { + this.synchronized { + aggregatedOffsetsCache --= aggregatedOffsetsCache.keys.filter(_._1 == request.tableName) + } + + offsetManager.commitRerun(request, minOffset, maxOffset) + } + + def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit = { + offsetManager.postCommittedRecords(commitRequests) + + val updatedTables = commitRequests.map(_.table).toSet + this.synchronized { + aggregatedOffsetsCache --= aggregatedOffsetsCache.keys.filter(k => updatedTables.contains(k._1)) + } + } + + def rollbackOffsets(request: DataOffsetRequest): Unit = { + offsetManager.rollbackOffsets(request) + } +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala index 8f0545997..678307530 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala @@ -57,7 +57,6 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager { } override def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = { - // ToDo Consider adding a caching layer for this val maxInfoDateOpt = onlyForInfoDate.orElse(getMaximumInfoDate(table)) try { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala index e0896d60b..a09ed347a 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala @@ -20,7 +20,7 @@ import org.scalatest.wordspec.AnyWordSpec import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import za.co.absa.pramen.api.offset.DataOffset.{CommittedOffset, UncommittedOffset} import za.co.absa.pramen.api.offset.{OffsetType, OffsetValue} -import za.co.absa.pramen.core.bookkeeper.{OffsetManager, OffsetManagerJdbc} +import za.co.absa.pramen.core.bookkeeper.{OffsetManager, OffsetManagerCached, OffsetManagerJdbc} import za.co.absa.pramen.core.fixtures.RelationalDbFixture import za.co.absa.pramen.core.rdb.PramenDb import za.co.absa.pramen.core.reader.model.JdbcConfig @@ -44,7 +44,7 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B } def getOffsetManager: OffsetManager = { - new OffsetManagerJdbc(pramenDb.slickDb, 123L) + new OffsetManagerCached(new OffsetManagerJdbc(pramenDb.slickDb, 123L)) } "getOffsets" should {