diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala index 67830753..6da3d18b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala @@ -115,14 +115,25 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager { override def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit = { val committedAt = Instant.now() + val committedAtMilli = committedAt.toEpochMilli val records = commitRequests.map { req => - OffsetRecord(req.table, req.infoDate.toString, req.minOffset.dataType.dataTypeString, req.minOffset.valueString, req.maxOffset.valueString, batchId, req.createdAt.toEpochMilli, Some(committedAt.toEpochMilli)) + OffsetRecord(req.table, req.infoDate.toString, req.minOffset.dataType.dataTypeString, req.minOffset.valueString, req.maxOffset.valueString, batchId, req.createdAt.toEpochMilli, Some(committedAtMilli)) } db.run( OffsetRecords.records ++= records ).execute() + + commitRequests.map(r => (r.table, r.infoDate)) + .distinct + .foreach { case (table, infoDate) => + db.run( + OffsetRecords.records + .filter(r => r.pramenTableName === table && r.infoDate === infoDate.toString && r.committedAt =!= committedAtMilli) + .delete + ).execute() + } } override def rollbackOffsets(request: DataOffsetRequest): Unit = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index 4eac866b..72cfb45e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -220,11 +220,15 @@ class MetastoreImpl(appConfig: Config, override def getCurrentBatch(tableName: String): DataFrame = { validateTable(tableName) if (readMode == ReaderMode.IncrementalPostProcessing && !isRerun) { + log.info(s"Getting the current batch for table '$tableName' at '$infoDate'...") metastore.getBatch(tableName, infoDate, None) } else if ((readMode == ReaderMode.IncrementalValidation || readMode == ReaderMode.IncrementalRun) && !isRerun) { + log.info(s"Getting the current incremental chunk for table '$tableName' at '$infoDate'...") getIncremental(tableName, infoDate) - } else + } else { + log.info(s"Getting daily data for table '$tableName' at '$infoDate'...") metastore.getTable(tableName, Option(infoDate), Option(infoDate)) + } } override def getLatest(tableName: String, until: Option[LocalDate] = None): DataFrame = { @@ -269,9 +273,25 @@ class MetastoreImpl(appConfig: Config, override def metadataManager: MetadataManager = metadata - override def commitTable(tableName: String, trackingName: String): Unit = { + override def commitOutputTable(tableName: String, trackingName: String): Unit = { if (readMode != ReaderMode.Batch) { - getIncrementalDf(tableName, trackingName, infoDate, commit = true) + val om = bookkeeper.getOffsetManager + val minMax = om.getMaxInfoDateAndOffset(trackingName, Option(infoDate)) + val batchIdValue = OffsetValue.IntegralValue(batchId) + log.info(s"Starting offset commit for output table '$trackingName' for '$infoDate'.") + val trackingTable = TrackingTable( + Thread.currentThread().getId, + tableName, + outputTable, + trackingName, + "", + minMax.map(_.minimumOffset), + minMax.map(_.maximumOffset), + infoDate, + Instant.now() + ) + + trackingTables += trackingTable } } @@ -318,7 +338,7 @@ class MetastoreImpl(appConfig: Config, } if (commit && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) { - log.info(s"Starting offset commit for table '$trackingName' for '$infoDate''") + log.info(s"Starting offset commit for table '$trackingName' for '$infoDate'") val trackingTable = TrackingTable( Thread.currentThread().getId, @@ -326,6 +346,7 @@ class MetastoreImpl(appConfig: Config, outputTable, trackingName, tableDef.batchIdColumn, + offsets.map(_.minimumOffset), offsets.map(_.maximumOffset), infoDate, Instant.now() @@ -366,12 +387,16 @@ class MetastoreImpl(appConfig: Config, } private[core] def commitIncremental(trackingTables: Seq[TrackingTable]): Unit = { - val commitRequests = trackingTables.flatMap { trackingTable => - val df = getTable(trackingTable.inputTable, Option(trackingTable.infoDate), Option(trackingTable.infoDate)) + if (trackingTables.isEmpty) + return - if (df.isEmpty) { - None - } else { + val om = bookkeeper.getOffsetManager + val batchIdValue = OffsetValue.IntegralValue(batchId) + + val commitRequests = trackingTables.flatMap { trackingTable => + val tableDef = getTableDef(trackingTable.inputTable) + if (tableDef.format.isInstanceOf[DataFormat.Raw]) { + val df = getTable(trackingTable.inputTable, Option(trackingTable.infoDate), Option(trackingTable.infoDate)) getMinMaxOffsetFromMetastoreDf(df, trackingTable.batchIdColumn, trackingTable.currentMaxOffset) match { case Some((minOffset, maxOffset)) => log.info(s"Commited offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}' with min='${minOffset.valueString}', max='${maxOffset.valueString}'.") @@ -386,11 +411,20 @@ class MetastoreImpl(appConfig: Config, log.info(s"No new data processed that requires offsets update of table '${trackingTable.trackingName}' for '${trackingTable.infoDate}'.") None } + } else { + val minOffset = trackingTable.currentMinOffset.getOrElse(batchIdValue) + log.info(s"Commited offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}' with min='${minOffset.valueString}', max='$batchId'.") + Some(OffsetCommitRequest( + trackingTable.trackingName, + trackingTable.infoDate, + minOffset, + batchIdValue, + trackingTable.createdAt + )) } } if (commitRequests.nonEmpty) { - val om = bookkeeper.getOffsetManager om.postCommittedRecords(commitRequests) log.info(s"Committed ${commitRequests.length} requests.'") } @@ -451,14 +485,7 @@ object MetastoreImpl { } } - private[core] def getMinMaxOffsetFromMetastoreDf(dfIn: DataFrame, batchIdColumn: String, currentMax: Option[OffsetValue]): Option[(OffsetValue, OffsetValue)] = { - val df = currentMax match { - case Some(currentMax) => - dfIn.filter(col(batchIdColumn) > currentMax.getSparkLit) - case None => - dfIn - } - + private[core] def getMinMaxOffsetFromMetastoreDf(df: DataFrame, batchIdColumn: String, currentMax: Option[OffsetValue]): Option[(OffsetValue, OffsetValue)] = { val offsetType = if (df.schema.fields.find(_.name == batchIdColumn).get.dataType == StringType) OffsetType.StringType else OffsetType.IntegralType OffsetManagerUtils.getMinMaxValueFromData(df, batchIdColumn, offsetType) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala index 08ce5a78..d35efd6e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.metastore import za.co.absa.pramen.api.MetastoreReader trait MetastoreReaderCore extends MetastoreReader { - def commitTable(tableName: String, trackingName: String): Unit + def commitOutputTable(tableName: String, trackingName: String): Unit def commitIncrementalStage(): Unit } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala index 326e85d9..bc984380 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala @@ -26,6 +26,7 @@ case class TrackingTable( outputTable: String, trackingName: String, batchIdColumn: String, + currentMinOffset: Option[OffsetValue], currentMaxOffset: Option[OffsetValue], infoDate: LocalDate, createdAt: Instant diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala index 44ef1d57..592ba32e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala @@ -143,8 +143,6 @@ class SinkJob(operationDef: OperationDef, val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode) - metastoreReader.asInstanceOf[MetastoreReaderCore].commitTable(sinkTable.metaTableName, s"${sinkTable.metaTableName}->$sinkName") - try { val sinkResult = sink.send(df, sinkTable.metaTableName, @@ -153,10 +151,14 @@ class SinkJob(operationDef: OperationDef, sinkTable.options ) - val jobFinished = Instant.now - val isTransient = outputTable.format.isTransient + if (!isTransient) { + metastoreReader.asInstanceOf[MetastoreReaderCore].commitOutputTable(sinkTable.metaTableName, s"${sinkTable.metaTableName}->$sinkName") + } + + val jobFinished = Instant.now + val tooLongWarnings = getTookTooLongWarnings(jobStarted, jobFinished, sinkTable.warnMaxExecutionTimeSeconds) bookkeeper.setRecordCount(outputTable.name, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala index 66dc6765..b12e473f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala @@ -102,7 +102,7 @@ class TransformationJob(operationDef: OperationDef, val readerMode = if (isIncremental) ReaderMode.IncrementalRun else ReaderMode.Batch val metastoreReaderRun = metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, readerMode) - metastoreReaderRun.asInstanceOf[MetastoreReaderCore].commitTable(outputTable.name, outputTable.name) + metastoreReaderRun.asInstanceOf[MetastoreReaderCore].commitOutputTable(outputTable.name, outputTable.name) metastoreReaderRun.asInstanceOf[MetastoreReaderCore].commitIncrementalStage() } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala index 2f3d4d50..86a708e8 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala @@ -1274,7 +1274,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec val om = new OffsetManagerJdbc(pramenDb.db, 123L) val offsets = om.getOffsets("table1->table2", infoDate).map(_.asInstanceOf[CommittedOffset]) - assert(offsets.length == 2) + assert(offsets.length == 1) } succeed } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala index 1d1a79f0..90416219 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala @@ -171,7 +171,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), } } - override def commitTable(tableName: String, trackingName: String): Unit = {} + override def commitOutputTable(tableName: String, trackingName: String): Unit = {} override def commitIncrementalStage(): Unit = {} }