diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetCommitRequest.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetCommitRequest.scala new file mode 100644 index 00000000..b38fda93 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetCommitRequest.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.bookkeeper + +import za.co.absa.pramen.api.offset.OffsetValue + +import java.time.{Instant, LocalDate} + +case class OffsetCommitRequest( + table: String, + infoDate: LocalDate, + minOffset: OffsetValue, + maxOffset: OffsetValue, + createdAt: Instant + ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala index 947f9f6b..d04401c5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala @@ -82,7 +82,7 @@ trait OffsetManager { /** * Combines both startWriteOffsets() and commitOffsets() into one operation when it is applicable. */ - def postCommittedRecord(table: String, infoDate: LocalDate, minOffset: OffsetValue, maxOffset: OffsetValue): Unit + def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit /** * Rolls back an offset request diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala index d2f65acc..b3358d8e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerJdbc.scala @@ -116,13 +116,15 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager { ).execute() } - override def postCommittedRecord(table: String, infoDate: LocalDate, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = { - val createdAt = Instant.now() + override def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit = { + val committedAt = Instant.now() - val record = OffsetRecord(table, infoDate.toString, minOffset.dataType.dataTypeString, minOffset.valueString, maxOffset.valueString, batchId, createdAt.toEpochMilli, Some(createdAt.toEpochMilli)) + val records = commitRequests.map { req => + OffsetRecord(req.table, req.infoDate.toString, req.minOffset.dataType.dataTypeString, req.minOffset.valueString, req.maxOffset.valueString, batchId, req.createdAt.toEpochMilli, Some(committedAt.toEpochMilli)) + } db.run( - OffsetRecords.records += record + OffsetRecords.records ++= records ).execute() } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala index 1d94d446..85750f0e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala @@ -55,4 +55,8 @@ trait Metastore { def getStats(tableName: String, infoDate: LocalDate): MetaTableStats def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader + + def commitIncrementalTables(): Unit + + def rollbackIncrementalTables(): Unit } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index fa7000c0..edd13af6 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -27,7 +27,7 @@ import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue} import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig} -import za.co.absa.pramen.core.bookkeeper.Bookkeeper +import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetCommitRequest} import za.co.absa.pramen.core.config.Keys import za.co.absa.pramen.core.metastore.model.{MetaTable, TrackingTable} import za.co.absa.pramen.core.metastore.peristence.{MetastorePersistence, TransientJobManager} @@ -269,15 +269,9 @@ class MetastoreImpl(appConfig: Config, override def metadataManager: MetadataManager = metadata - override def commitIncremental(isTransient: Boolean): Unit = { - if (isTransient) { - metastore.addTrackingTables(trackingTables.toSeq) - trackingTables.clear() - } else { - metastore.commitIncremental(trackingTables.toSeq) - metastore.commitGlobalTrackingTables() - trackingTables.clear() - } + override def commitIncrementalStage(): Unit = { + metastore.addTrackingTables(trackingTables.toSeq) + trackingTables.clear() } private def validateTable(tableName: String): Unit = { @@ -313,12 +307,14 @@ class MetastoreImpl(appConfig: Config, log.info(s"Starting offset commit for table '$trackingName' for '$infoDate''") val trackingTable = TrackingTable( + Thread.currentThread().getId, tableName, outputTable, trackingName, tableDef.batchIdColumn, offsets.map(_.maximumOffset), - infoDate + infoDate, + Instant.now() ) trackingTables += trackingTable @@ -329,6 +325,18 @@ class MetastoreImpl(appConfig: Config, } } + override def commitIncrementalTables(): Unit = synchronized { + val threadId = Thread.currentThread().getId + val tablesToCommit = globalTrackingTables.filter(_.threadId == threadId) + commitIncremental(tablesToCommit.toSeq) + globalTrackingTables --= tablesToCommit + } + + override def rollbackIncrementalTables(): Unit = synchronized { + val threadId = Thread.currentThread().getId + globalTrackingTables --= globalTrackingTables.filter(_.threadId == threadId) + } + private[core] def prepareHiveSchema(schema: StructType, mt: MetaTable): StructType = { val fieldType = if (mt.infoDateFormat == DEFAULT_DATE_FORMAT) DateType else StringType @@ -343,28 +351,31 @@ class MetastoreImpl(appConfig: Config, globalTrackingTables ++= trackingTables } - private[core] def commitGlobalTrackingTables(): Unit = synchronized { - commitIncremental(globalTrackingTables.toSeq) - globalTrackingTables.clear() - } - private[core] def commitIncremental(trackingTables: Seq[TrackingTable]): Unit = { - val om = if (trackingTables.nonEmpty) bookkeeper.getOffsetManager else null - - trackingTables.foreach { trackingTable => - log.info(s"Committing offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}'") + val commitRequests = trackingTables.flatMap { trackingTable => val df = getTable(trackingTable.inputTable, Option(trackingTable.infoDate), Option(trackingTable.infoDate)) getMinMaxOffsetFromDf(df, trackingTable.batchIdColumn, trackingTable.currentMaxOffset) match { case Some((minOffset, maxOffset)) => - val offsetType = if (df.schema.fields.find(_.name == trackingTable.batchIdColumn).get.dataType == StringType) OffsetType.StringType else OffsetType.IntegralType - val req = om.startWriteOffsets(trackingTable.trackingName, trackingTable.infoDate, offsetType) - om.commitOffsets(req, minOffset, maxOffset) log.info(s"Commited offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}' with min='${minOffset.valueString}', max='${maxOffset.valueString}'.") + Some(OffsetCommitRequest( + trackingTable.trackingName, + trackingTable.infoDate, + minOffset, + maxOffset, + trackingTable.createdAt + )) case None => log.info(s"No new data processed that requires offsets update of table '${trackingTable.trackingName}' for '${trackingTable.infoDate}'.") + None } } + + if (commitRequests.nonEmpty) { + val om = bookkeeper.getOffsetManager + om.postCommittedRecords(commitRequests) + log.info(s"Committed ${commitRequests.length} requests.'") + } } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala index 24cc56fb..11393d5d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala @@ -19,5 +19,5 @@ package za.co.absa.pramen.core.metastore import za.co.absa.pramen.api.MetastoreReader trait MetastoreReaderCore extends MetastoreReader { - def commitIncremental(isTransient: Boolean): Unit + def commitIncrementalStage(): Unit } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala index a67894fe..326e85d9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala @@ -18,13 +18,15 @@ package za.co.absa.pramen.core.metastore.model import za.co.absa.pramen.api.offset.OffsetValue -import java.time.LocalDate +import java.time.{Instant, LocalDate} case class TrackingTable( + threadId: Long, inputTable: String, outputTable: String, trackingName: String, batchIdColumn: String, currentMaxOffset: Option[OffsetValue], - infoDate: LocalDate + infoDate: LocalDate, + createdAt: Instant ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala index ce2bb436..a3c9016c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala @@ -72,7 +72,7 @@ class MetastorePersistenceRaw(path: String, fsUtilsTrg.createDirectoryRecursive(outputDir) - var totalSize = 0L + var copiedSize = 0L if (files.isEmpty) { log.info("Nohting to save") @@ -84,16 +84,37 @@ class MetastorePersistenceRaw(path: String, log.info(s"Copying file from $srcPath to $trgPath") - totalSize += fsSrc.getContentSummary(srcPath).getLength + copiedSize += fsSrc.getContentSummary(srcPath).getLength fsUtilsTrg.copyFile(srcPath, trgPath) }) } - MetaTableStats( - Option(totalSize), - None, - Some(totalSize) - ) + val stats = if (saveModeOpt.contains(SaveMode.Append)) { + val list = getListOfFilesRange(infoDate, infoDate) + if (list.isEmpty) { + MetaTableStats( + Option(copiedSize), + None, + Some(copiedSize) + ) + } else { + val totalSize = list.map(_.getLen).sum + MetaTableStats( + Option(totalSize), + Some(copiedSize), + Some(totalSize) + ) + } + } else { + MetaTableStats( + Option(copiedSize), + None, + Some(copiedSize) + ) + } + + log.info(s"Stats: ${stats}") + stats } override def getStats(infoDate: LocalDate, onlyForCurrentBatchId: Boolean): MetaTableStats = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index ecbf834a..36c07e88 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -610,9 +610,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot task.runStatus match { case s: Succeeded => - s.recordCount match { - case Some(recordCount) => renderDifferenceSize(recordCount, s.recordCountOld) - case None => "" + (s.recordCount, s.recordsAppended) match { + case (Some(sizeTotal), Some(sizeAppended)) => renderDifferenceSize(sizeTotal, Some(sizeTotal - sizeAppended)) + case (Some(sizeTotal), None) => renderDifferenceSize(sizeTotal, s.recordCountOld) + case _ => "" } case d: InsufficientData => renderDifferenceSize(d.actual, d.recordCountOld) case _ => "" diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala index 20d05030..69adfe57 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType import za.co.absa.pramen.api.Reason import za.co.absa.pramen.api.status.{TaskDef, TaskRunReason} +import za.co.absa.pramen.core.metastore.Metastore import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.runner.splitter.ScheduleStrategy @@ -31,6 +32,8 @@ trait Job { val outputTable: MetaTable + val metastore: Metastore + val operation: OperationDef val scheduleStrategy: ScheduleStrategy diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala index dd36fe88..cff40fd5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala @@ -20,7 +20,7 @@ import com.typesafe.config.Config import org.apache.spark.sql.types.StructType import org.slf4j.{Logger, LoggerFactory} import za.co.absa.pramen.api.jobdef.Schedule -import za.co.absa.pramen.api.status.{DependencyFailure, DependencyWarning, JobType, MetastoreDependency, TaskDef, TaskRunReason} +import za.co.absa.pramen.api.status._ import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.expr.DateExprEvaluator import za.co.absa.pramen.core.metastore.Metastore @@ -32,7 +32,7 @@ import java.time.{Instant, LocalDate} import scala.util.{Failure, Success, Try} abstract class JobBase(operationDef: OperationDef, - metastore: Metastore, + val metastore: Metastore, bookkeeper: Bookkeeper, jobNotificationTargets: Seq[JobNotificationTarget], outputTableDef: MetaTable diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala index dffe5091..5c617126 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala @@ -148,6 +148,8 @@ class SinkJob(operationDef: OperationDef, isTransient ) + metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncrementalStage() + val stats = MetaTableStats(Option(sinkResult.recordsSent), None, None) SaveResult(stats, sinkResult.filesSent, sinkResult.hiveTables, sinkResult.warnings ++ tooLongWarnings) } catch { @@ -155,7 +157,6 @@ class SinkJob(operationDef: OperationDef, } finally { Try { sink.close() - metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncremental(false) } } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala index 77833380..4997df14 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala @@ -58,16 +58,10 @@ class TransformationJob(operationDef: OperationDef, } override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = { - val isTransient = outputTable.format.isTransient val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = false) - val runResult = try { - RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions)) - } finally { - // (!) ToDo Commit only on success, and commit only on save. - // Rollback everything on failure - // Use ThreadId for parallrl jobs - metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncremental(isTransient) - } + val runResult = RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions)) + + metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncrementalStage() runResult } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index 8b930012..39b7ac47 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -390,6 +390,10 @@ abstract class TaskRunnerBase(conf: Config, task.job.save(dfTransformed, task.infoDate, task.reason, conf, started, validationResult.inputRecordsCount) } + if (!isTransient) { + task.job.metastore.commitIncrementalTables() + } + val hiveWarnings = if (task.job.outputTable.hiveTable.nonEmpty) { val recreate = schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty || task.reason == TaskRunReason.Rerun task.job.createOrRefreshHiveTable(dfTransformed.schema, task.infoDate, recreate) @@ -432,6 +436,7 @@ abstract class TaskRunnerBase(conf: Config, case ex: Throwable => Failure(new FatalErrorWrapper("Fatal error has occurred.", ex)) } finally { if (!isTransient) { + task.job.metastore.rollbackIncrementalTables() lock.release() } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala index 359c77d3..8f920784 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala @@ -21,12 +21,13 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType import za.co.absa.pramen.api.status.{TaskDef, TaskRunReason} import za.co.absa.pramen.api.{DataFormat, Reason} -import za.co.absa.pramen.core.{OperationDefFactory, TaskDefFactory} -import za.co.absa.pramen.core.metastore.MetaTableStats import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore} import za.co.absa.pramen.core.mocks.MetaTableFactory.getDummyMetaTable +import za.co.absa.pramen.core.mocks.metastore.MetastoreSpy import za.co.absa.pramen.core.pipeline._ import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing} +import za.co.absa.pramen.core.{OperationDefFactory, TaskDefFactory} import java.time.{Instant, LocalDate} @@ -59,6 +60,8 @@ class JobSpy(jobName: String = "Dummy Job", override val outputTable: MetaTable = getDummyMetaTable(outputTableIn, format = outputTableFormat, hiveTable = hiveTable) + override val metastore: Metastore = new MetastoreSpy() + override val operation: OperationDef = operationDef override val scheduleStrategy: ScheduleStrategy = scheduleStrategyIn diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala index c52ca210..b730de97 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala @@ -171,7 +171,11 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), } } - override def commitIncremental(isTransient: Boolean): Unit = {} + override def commitIncrementalStage(): Unit = {} } } + + override def commitIncrementalTables(): Unit = {} + + override def rollbackIncrementalTables(): Unit = {} }