diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala
index b3358d8e..8f054599 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala
@@ -16,7 +16,6 @@
 
 package za.co.absa.pramen.core.bookkeeper
 
-import org.slf4j.LoggerFactory
 import slick.jdbc.H2Profile.api._
 import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
 import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
@@ -29,8 +28,6 @@ import scala.util.control.NonFatal
 class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
   import za.co.absa.pramen.core.utils.FutureImplicits._
 
-  private val log = LoggerFactory.getLogger(this.getClass)
-
   override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = {
     val offsets = getOffsetRecords(table, infoDate)
 
@@ -60,6 +57,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
   }
 
   override def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = {
+    // ToDo Consider adding a caching layer for this
     val maxInfoDateOpt = onlyForInfoDate.orElse(getMaximumInfoDate(table))
 
     try {
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala
index 85750f0e..30bbc997 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala
@@ -54,7 +54,7 @@ trait Metastore {
 
   def getStats(tableName: String, infoDate: LocalDate): MetaTableStats
 
-  def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader
+  def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader
 
   def commitIncrementalTables(): Unit
 
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala
index 6b2f607c..044a8ddc 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala
@@ -204,7 +204,7 @@ class MetastoreImpl(appConfig: Config,
     MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).getStats(infoDate, onlyForCurrentBatchId = false)
   }
 
-  override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader = {
+  override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader = {
     val metastore = this
 
     new MetastoreReaderCore {
@@ -219,7 +219,7 @@ class MetastoreImpl(appConfig: Config,
       override def getCurrentBatch(tableName: String): DataFrame = {
         validateTable(tableName)
 
-        if (isPostProcessing && isIncremental && !isRerun) {
+        if (isIncremental && !isRerun && isPostProcessing) {
           metastore.getBatch(tableName, infoDate, None)
         } else if (isIncremental && !isRerun) {
           getIncremental(tableName, outputTable, infoDate)
@@ -284,7 +284,6 @@ class MetastoreImpl(appConfig: Config,
     val trackingName = s"$tableName->$transformationOutputTable"
     val tableDef = getTableDef(tableName)
     val offsetType = if (tableDef.format.isInstanceOf[DataFormat.Raw]) OffsetType.StringType else OffsetType.IntegralType
-    val needsToCommit = !isPostProcessing && !incrementalDryRun
 
     val om = bookkeeper.getOffsetManager
     val tableDf = metastore.getTable(tableName, Option(infoDate), Option(infoDate))
@@ -303,7 +302,7 @@ class MetastoreImpl(appConfig: Config,
       tableDf
     }
 
-    if (needsToCommit && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) {
+    if (commitChanges && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) {
       log.info(s"Starting offset commit for table '$trackingName' for '$infoDate''")
 
       val trackingTable = TrackingTable(
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
index e486db87..73b50e4e 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
@@ -130,25 +130,18 @@ class IncrementalIngestionJob(operationDef: OperationDef,
                     jobStarted: Instant,
                     inputRecordCount: Option[Long]): SaveResult = {
     val isRerun = runReason == TaskRunReason.Rerun
-
     val dfToSave = df.withColumn(outputTable.batchIdColumn, lit(batchId))
-
     val om = bookkeeper.getOffsetManager
-
     val offsetInfo = source.getOffsetInfo.getOrElse(
-      throw new IllegalArgumentException(s"Offset type is not configured for the source '$sourceName' outputting to '${outputTable.name}''")
+      throw new IllegalArgumentException(s"Offset type is not configured for the source '$sourceName' outputting to '${outputTable.name}'")
     )
 
     validateOffsetColumn(df, offsetInfo)
-
     val req = om.startWriteOffsets(outputTable.name, infoDate, offsetInfo.offsetType)
 
     val stats = try {
-      val statsToReturn = if (isRerun) {
-        metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Overwrite))
-      } else {
-        metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append))
-      }
+      val saveMode = if (isRerun) SaveMode.Overwrite else SaveMode.Append
+      val statsToReturn = metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(saveMode))
 
       val updatedDf = if (outputTable.format.isInstanceOf[DataFormat.Raw])
         df
@@ -176,7 +169,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
       source.postProcess(
         sourceTable.query,
         outputTable.name,
-        metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = true, incrementalDryRun = false, isPostProcessing = true),
+        metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = true, commitChanges = false, isPostProcessing = true),
         infoDate,
         operationDef.extraOptions
       )
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala
index a0296316..5e370112 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala
@@ -173,7 +173,7 @@ class IngestionJob(operationDef: OperationDef,
       source.postProcess(
         sourceTable.query,
         outputTable.name,
-        metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = false, incrementalDryRun = false, isPostProcessing = true),
+        metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = false, commitChanges = false, isPostProcessing = true),
         infoDate,
         operationDef.extraOptions
       )
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala
index 5c617126..c53c1ca2 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala
@@ -122,7 +122,7 @@ class SinkJob(operationDef: OperationDef,
       case NonFatal(ex) => throw new IllegalStateException("Unable to connect to the sink.", ex)
     }
 
-    val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = false)
+    val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = true, isPostProcessing = false)
 
     try {
       val sinkResult = sink.send(df,
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala
index 4997df14..e09d365c 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala
@@ -54,11 +54,11 @@ class TransformationJob(operationDef: OperationDef,
   }
 
   override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
-    transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = true, isPostProcessing = false), infoDate, operationDef.extraOptions)
+    transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = false, isPostProcessing = false), infoDate, operationDef.extraOptions)
   }
 
   override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = {
-    val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = false)
+    val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = true, isPostProcessing = false)
     val runResult = RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions))
 
     metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncrementalStage()
@@ -83,7 +83,7 @@ class TransformationJob(operationDef: OperationDef,
       else
         SaveResult(metastore.saveTable(outputTable.name, infoDate, df, None))
 
-    val metastoreReader = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = true)
+    val metastoreReader = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, isIncremental, commitChanges = false, isPostProcessing = true)
 
     try {
       transformer.postProcess(
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala
index 179bba18..1b3e832e 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala
@@ -390,7 +390,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF
 
       m.saveTable("table1", infoDate, getDf)
 
-      val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false)
+      val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
 
       val df1 = reader.getTable("table1", Some(infoDate), Some(infoDate))
 
@@ -404,7 +404,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF
 
       m.saveTable("table1", infoDate, getDf)
 
-      val reader = m.getMetastoreReader("table2" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false)
+      val reader = m.getMetastoreReader("table2" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
 
       val ex = intercept[TableNotConfigured] {
         reader.getTable("table1", Some(infoDate), Some(infoDate))
@@ -420,7 +420,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF
 
       m.saveTable("table1", infoDate, getDf)
 
-      val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false)
+      val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
 
       val runInfo1 = reader.getTableRunInfo("table1", infoDate)
       val runInfo2 = reader.getTableRunInfo("table1", infoDate.plusDays(1))
@@ -438,7 +438,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF
 
       m.saveTable("table1", infoDate, getDf)
 
-      val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false)
+      val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
       val metadataManager = reader.metadataManager
 
       metadataManager.setMetadata("table1", infoDate, "key1", "value1")
@@ -456,7 +456,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF
       m.saveTable("table1", infoDate, getDf)
       m.saveTable("table1", infoDate.plusDays(1), getDf)
 
-      val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, "output_table", infoDate.plusDays(10), TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false)
+      val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, "output_table", infoDate.plusDays(10), TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
 
       val date1 = reader.getLatestAvailableDate("table1")
       val date2 = reader.getLatestAvailableDate("table1", Some(infoDate))
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala
index b730de97..db32d380 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala
@@ -105,7 +105,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"),
     stats
   }
 
-  override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, taskRunReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader = {
+  override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, taskRunReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader = {
     val metastore = this
 
     new MetastoreReaderCore {
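
The rename from incrementalDryRun to commitChanges also inverts the flag's polarity: MetastoreImpl used to derive needsToCommit = !isPostProcessing && !incrementalDryRun inside getIncremental, and the call sites now pass that decision in directly. A minimal sketch of the equivalence at the touched call sites, assuming only the hunks above; the object and helper names below are hypothetical and not part of the patch:

// Hypothetical illustration of the flag mapping; not Pramen code.
object CommitChangesMapping {
  // The expression MetastoreImpl.getIncremental evaluated before this change.
  private def needsToCommit(isPostProcessing: Boolean, incrementalDryRun: Boolean): Boolean =
    !isPostProcessing && !incrementalDryRun

  def main(args: Array[String]): Unit = {
    // TransformationJob.validate: was incrementalDryRun = true  -> now commitChanges = false
    assert(!needsToCommit(isPostProcessing = false, incrementalDryRun = true))
    // TransformationJob.run and SinkJob.run: was incrementalDryRun = false -> now commitChanges = true
    assert(needsToCommit(isPostProcessing = false, incrementalDryRun = false))
    // postProcess readers (isPostProcessing = true): now commitChanges = false
    assert(!needsToCommit(isPostProcessing = true, incrementalDryRun = false))
  }
}

Passing commitChanges explicitly keeps the offset-commit decision at the call site instead of reconstructing it inside getIncremental from two loosely related flags.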