#520 Add chain-commit transaction for incremental jobs with transient job dependencies
yruslan committed Nov 28, 2024
1 parent aeac5df commit 254dafa
Showing 16 changed files with 136 additions and 56 deletions.
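The change in brief: incremental jobs no longer write offsets table-by-table as they run. Each job stages its tracking tables via commitIncrementalStage(), and the task runner then either commits everything staged by the current thread in one batched operation (commitIncrementalTables()) or discards it (rollbackIncrementalTables()). A minimal, self-contained sketch of that flow — the types and method bodies below are simplified stand-ins for the real Pramen classes, not the actual API:

import java.time.{Instant, LocalDate}
import scala.collection.mutable.ListBuffer

// Stand-in for TrackingTable: what a job stages before the runner decides the outcome.
final case class StagedTable(threadId: Long, name: String, infoDate: LocalDate, createdAt: Instant)

object ChainCommitSketch {
  // Shared across threads, like MetastoreImpl.globalTrackingTables.
  private val staged = ListBuffer.empty[StagedTable]

  // Jobs stage instead of committing offsets directly (commitIncrementalStage).
  def stage(name: String, infoDate: LocalDate): Unit = synchronized {
    staged += StagedTable(Thread.currentThread().getId, name, infoDate, Instant.now())
  }

  // The runner commits everything the current thread staged (commitIncrementalTables).
  def commitThisThread(): Unit = synchronized {
    val mine = staged.filter(_.threadId == Thread.currentThread().getId)
    mine.foreach(t => println(s"committing offsets for '${t.name}' for '${t.infoDate}'"))
    staged --= mine
  }

  // The runner discards whatever was staged but never committed (rollbackIncrementalTables).
  def rollbackThisThread(): Unit = synchronized {
    staged --= staged.filter(_.threadId == Thread.currentThread().getId)
  }

  def main(args: Array[String]): Unit = {
    stage("table1", LocalDate.now()) // a transformation stages its output table
    stage("table2", LocalDate.now()) // a dependent transient job stages too
    commitThisThread()               // on success, both are committed in one step
    rollbackThisThread()             // no-op here: nothing is left staged for this thread
  }
}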
@@ -0,0 +1,29 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.bookkeeper

import za.co.absa.pramen.api.offset.OffsetValue

import java.time.{Instant, LocalDate}

case class OffsetCommitRequest(
table: String,
infoDate: LocalDate,
minOffset: OffsetValue,
maxOffset: OffsetValue,
createdAt: Instant
)
@@ -82,7 +82,7 @@ trait OffsetManager {
/**
* Combines both startWriteOffsets() and commitOffsets() into one operation when it is applicable.
*/
def postCommittedRecord(table: String, infoDate: LocalDate, minOffset: OffsetValue, maxOffset: OffsetValue): Unit
def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit
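A hypothetical caller of the new batched entry point — om is an OffsetManager obtained from the bookkeeper, and OffsetValue.IntegralValue is assumed here as the integral-offset constructor:

import java.time.{Instant, LocalDate}
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.core.bookkeeper.{OffsetCommitRequest, OffsetManager}

object BatchedCommitExample {
  def commitBoth(om: OffsetManager): Unit = {
    val infoDate = LocalDate.of(2024, 11, 28)
    val requests = Seq(
      OffsetCommitRequest("table1", infoDate, OffsetValue.IntegralValue(1), OffsetValue.IntegralValue(100), Instant.now()),
      OffsetCommitRequest("table2", infoDate, OffsetValue.IntegralValue(101), OffsetValue.IntegralValue(200), Instant.now())
    )
    // One call replaces a startWriteOffsets()/commitOffsets() pair per table.
    om.postCommittedRecords(requests)
  }
}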

/**
* Rolls back an offset request
@@ -116,13 +116,15 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
).execute()
}

override def postCommittedRecord(table: String, infoDate: LocalDate, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = {
val createdAt = Instant.now()
override def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit = {
val committedAt = Instant.now()

val record = OffsetRecord(table, infoDate.toString, minOffset.dataType.dataTypeString, minOffset.valueString, maxOffset.valueString, batchId, createdAt.toEpochMilli, Some(createdAt.toEpochMilli))
val records = commitRequests.map { req =>
OffsetRecord(req.table, req.infoDate.toString, req.minOffset.dataType.dataTypeString, req.minOffset.valueString, req.maxOffset.valueString, batchId, req.createdAt.toEpochMilli, Some(committedAt.toEpochMilli))
}

db.run(
OffsetRecords.records += record
OffsetRecords.records ++= records
).execute()
}
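Design note on the change above: switching from += to ++= turns the former single-row insert into one batched insert, so every staged table is committed together; each record keeps the createdAt of its own commit request, while all of them share a single committedAt timestamp taken when the batch is posted.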

@@ -55,4 +55,8 @@ trait Metastore {
def getStats(tableName: String, infoDate: LocalDate): MetaTableStats

def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader

def commitIncrementalTables(): Unit

def rollbackIncrementalTables(): Unit
}
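These two methods are the runner-facing half of the chain commit: TaskRunnerBase (below) calls commitIncrementalTables() after a successful save and rollbackIncrementalTables() in its finally block, so staged offsets never outlive a failed task.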
@@ -27,7 +27,7 @@ import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT
import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetCommitRequest}
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.metastore.model.{MetaTable, TrackingTable}
import za.co.absa.pramen.core.metastore.peristence.{MetastorePersistence, TransientJobManager}
@@ -269,15 +269,9 @@ class MetastoreImpl(appConfig: Config,

override def metadataManager: MetadataManager = metadata

override def commitIncremental(isTransient: Boolean): Unit = {
if (isTransient) {
metastore.addTrackingTables(trackingTables.toSeq)
trackingTables.clear()
} else {
metastore.commitIncremental(trackingTables.toSeq)
metastore.commitGlobalTrackingTables()
trackingTables.clear()
}
override def commitIncrementalStage(): Unit = {
metastore.addTrackingTables(trackingTables.toSeq)
trackingTables.clear()
}

private def validateTable(tableName: String): Unit = {
@@ -313,12 +307,14 @@
log.info(s"Starting offset commit for table '$trackingName' for '$infoDate''")

val trackingTable = TrackingTable(
Thread.currentThread().getId,
tableName,
outputTable,
trackingName,
tableDef.batchIdColumn,
offsets.map(_.maximumOffset),
infoDate
infoDate,
Instant.now()
)

trackingTables += trackingTable
@@ -329,6 +325,18 @@
}
}

override def commitIncrementalTables(): Unit = synchronized {
val threadId = Thread.currentThread().getId
val tablesToCommit = globalTrackingTables.filter(_.threadId == threadId)
commitIncremental(tablesToCommit.toSeq)
globalTrackingTables --= tablesToCommit
}

override def rollbackIncrementalTables(): Unit = synchronized {
val threadId = Thread.currentThread().getId
globalTrackingTables --= globalTrackingTables.filter(_.threadId == threadId)
}

private[core] def prepareHiveSchema(schema: StructType, mt: MetaTable): StructType = {
val fieldType = if (mt.infoDateFormat == DEFAULT_DATE_FORMAT) DateType else StringType

@@ -343,28 +351,31 @@
globalTrackingTables ++= trackingTables
}

private[core] def commitGlobalTrackingTables(): Unit = synchronized {
commitIncremental(globalTrackingTables.toSeq)
globalTrackingTables.clear()
}

private[core] def commitIncremental(trackingTables: Seq[TrackingTable]): Unit = {
val om = if (trackingTables.nonEmpty) bookkeeper.getOffsetManager else null

trackingTables.foreach { trackingTable =>
log.info(s"Committing offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}'")
val commitRequests = trackingTables.flatMap { trackingTable =>
val df = getTable(trackingTable.inputTable, Option(trackingTable.infoDate), Option(trackingTable.infoDate))

getMinMaxOffsetFromDf(df, trackingTable.batchIdColumn, trackingTable.currentMaxOffset) match {
case Some((minOffset, maxOffset)) =>
val offsetType = if (df.schema.fields.find(_.name == trackingTable.batchIdColumn).get.dataType == StringType) OffsetType.StringType else OffsetType.IntegralType
val req = om.startWriteOffsets(trackingTable.trackingName, trackingTable.infoDate, offsetType)
om.commitOffsets(req, minOffset, maxOffset)
log.info(s"Commited offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}' with min='${minOffset.valueString}', max='${maxOffset.valueString}'.")
Some(OffsetCommitRequest(
trackingTable.trackingName,
trackingTable.infoDate,
minOffset,
maxOffset,
trackingTable.createdAt
))
case None =>
log.info(s"No new data processed that requires offsets update of table '${trackingTable.trackingName}' for '${trackingTable.infoDate}'.")
None
}
}

if (commitRequests.nonEmpty) {
val om = bookkeeper.getOffsetManager
om.postCommittedRecords(commitRequests)
log.info(s"Committed ${commitRequests.length} requests.'")
}
}
}
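Note that commitIncrementalTables() and rollbackIncrementalTables() filter globalTrackingTables by Thread.currentThread().getId, so parallel tasks commit or discard only the tables they themselves staged — exactly what the old ToDo in TransformationJob ("Use ThreadId for parallel jobs", removed below) asked for.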

@@ -19,5 +19,5 @@ package za.co.absa.pramen.core.metastore
import za.co.absa.pramen.api.MetastoreReader

trait MetastoreReaderCore extends MetastoreReader {
def commitIncremental(isTransient: Boolean): Unit
def commitIncrementalStage(): Unit
}
@@ -18,13 +18,15 @@ package za.co.absa.pramen.core.metastore.model

import za.co.absa.pramen.api.offset.OffsetValue

import java.time.LocalDate
import java.time.{Instant, LocalDate}

case class TrackingTable(
threadId: Long,
inputTable: String,
outputTable: String,
trackingName: String,
batchIdColumn: String,
currentMaxOffset: Option[OffsetValue],
infoDate: LocalDate
infoDate: LocalDate,
createdAt: Instant
)
@@ -72,7 +72,7 @@ class MetastorePersistenceRaw(path: String,

fsUtilsTrg.createDirectoryRecursive(outputDir)

var totalSize = 0L
var copiedSize = 0L

if (files.isEmpty) {
log.info("Nohting to save")
@@ -84,16 +84,37 @@

log.info(s"Copying file from $srcPath to $trgPath")

totalSize += fsSrc.getContentSummary(srcPath).getLength
copiedSize += fsSrc.getContentSummary(srcPath).getLength
fsUtilsTrg.copyFile(srcPath, trgPath)
})
}

MetaTableStats(
Option(totalSize),
None,
Some(totalSize)
)
val stats = if (saveModeOpt.contains(SaveMode.Append)) {
val list = getListOfFilesRange(infoDate, infoDate)
if (list.isEmpty) {
MetaTableStats(
Option(copiedSize),
None,
Some(copiedSize)
)
} else {
val totalSize = list.map(_.getLen).sum
MetaTableStats(
Option(totalSize),
Some(copiedSize),
Some(totalSize)
)
}
} else {
MetaTableStats(
Option(copiedSize),
None,
Some(copiedSize)
)
}

log.info(s"Stats: ${stats}")
stats
}

override def getStats(infoDate: LocalDate, onlyForCurrentBatchId: Boolean): MetaTableStats = {
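The stats change above: previously all three fields were derived from the bytes copied in this batch. In append mode the info-date directory may already hold files from earlier batches, so the totals are now recomputed from the full file listing while the new middle field reports only this batch's copied bytes; when the listing is empty, or the save mode is not Append, the stats fall back to the copied size alone.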
@@ -610,9 +610,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot

task.runStatus match {
case s: Succeeded =>
s.recordCount match {
case Some(recordCount) => renderDifferenceSize(recordCount, s.recordCountOld)
case None => ""
(s.recordCount, s.recordsAppended) match {
case (Some(sizeTotal), Some(sizeAppended)) => renderDifferenceSize(sizeTotal, Some(sizeTotal - sizeAppended))
case (Some(sizeTotal), None) => renderDifferenceSize(sizeTotal, s.recordCountOld)
case _ => ""
}
case d: InsufficientData => renderDifferenceSize(d.actual, d.recordCountOld)
case _ => ""
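With the appended count available, the previous total can be derived as recordCount − recordsAppended — for example, 1,000 records total with 200 appended renders as a difference against 800 — while the old recordCountOld comparison remains the fallback when no appended count is present.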
@@ -21,6 +21,7 @@ import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import za.co.absa.pramen.api.Reason
import za.co.absa.pramen.api.status.{TaskDef, TaskRunReason}
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.runner.splitter.ScheduleStrategy

@@ -31,6 +32,8 @@ trait Job {

val outputTable: MetaTable

val metastore: Metastore

val operation: OperationDef

val scheduleStrategy: ScheduleStrategy
@@ -20,7 +20,7 @@ import com.typesafe.config.Config
import org.apache.spark.sql.types.StructType
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.pramen.api.jobdef.Schedule
import za.co.absa.pramen.api.status.{DependencyFailure, DependencyWarning, JobType, MetastoreDependency, TaskDef, TaskRunReason}
import za.co.absa.pramen.api.status._
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.expr.DateExprEvaluator
import za.co.absa.pramen.core.metastore.Metastore
@@ -32,7 +32,7 @@ import java.time.{Instant, LocalDate}
import scala.util.{Failure, Success, Try}

abstract class JobBase(operationDef: OperationDef,
metastore: Metastore,
val metastore: Metastore,
bookkeeper: Bookkeeper,
jobNotificationTargets: Seq[JobNotificationTarget],
outputTableDef: MetaTable
@@ -148,14 +148,15 @@ class SinkJob(operationDef: OperationDef,
isTransient
)

metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncrementalStage()

val stats = MetaTableStats(Option(sinkResult.recordsSent), None, None)
SaveResult(stats, sinkResult.filesSent, sinkResult.hiveTables, sinkResult.warnings ++ tooLongWarnings)
} catch {
case NonFatal(ex) => throw new IllegalStateException("Unable to write to the sink.", ex)
} finally {
Try {
sink.close()
metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncremental(false)
}
}
}
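SinkJob follows the same pattern as TransformationJob below: the staging call moves onto the success path (before the stats are built), and the finally block now only closes the sink.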
@@ -58,16 +58,10 @@ class TransformationJob(operationDef: OperationDef,
}

override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = {
val isTransient = outputTable.format.isTransient
val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = false)
val runResult = try {
RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions))
} finally {
// (!) ToDo Commit only on success, and commit only on save.
// Rollback everything on failure
// Use ThreadId for parallel jobs
metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncremental(isTransient)
}
val runResult = RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions))

metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncrementalStage()

runResult
}
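TransformationJob now stages offsets only after transformer.run() returns successfully — the try/finally that committed unconditionally is gone, and the actual commit is deferred to the task runner, which resolves the removed ToDo about committing only on success and rolling back on failure.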
@@ -390,6 +390,10 @@ abstract class TaskRunnerBase(conf: Config,
task.job.save(dfTransformed, task.infoDate, task.reason, conf, started, validationResult.inputRecordsCount)
}

if (!isTransient) {
task.job.metastore.commitIncrementalTables()
}

val hiveWarnings = if (task.job.outputTable.hiveTable.nonEmpty) {
val recreate = schemaChangesBeforeTransform.nonEmpty || schemaChangesAfterTransform.nonEmpty || task.reason == TaskRunReason.Rerun
task.job.createOrRefreshHiveTable(dfTransformed.schema, task.infoDate, recreate)
@@ -432,6 +436,7 @@
case ex: Throwable => Failure(new FatalErrorWrapper("Fatal error has occurred.", ex))
} finally {
if (!isTransient) {
task.job.metastore.rollbackIncrementalTables()
lock.release()
}
}
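The commit runs only on the success path (and only for non-transient outputs), while the rollback runs unconditionally in the finally block. The pairing is safe because commitIncrementalTables() removes committed tables from the shared set, so for a successful task the subsequent rollback finds nothing to discard, and for a failed one it clears exactly what was staged.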
@@ -21,12 +21,13 @@ import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import za.co.absa.pramen.api.status.{TaskDef, TaskRunReason}
import za.co.absa.pramen.api.{DataFormat, Reason}
import za.co.absa.pramen.core.{OperationDefFactory, TaskDefFactory}
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore}
import za.co.absa.pramen.core.mocks.MetaTableFactory.getDummyMetaTable
import za.co.absa.pramen.core.mocks.metastore.MetastoreSpy
import za.co.absa.pramen.core.pipeline._
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing}
import za.co.absa.pramen.core.{OperationDefFactory, TaskDefFactory}

import java.time.{Instant, LocalDate}

Expand Down Expand Up @@ -59,6 +60,8 @@ class JobSpy(jobName: String = "Dummy Job",

override val outputTable: MetaTable = getDummyMetaTable(outputTableIn, format = outputTableFormat, hiveTable = hiveTable)

override val metastore: Metastore = new MetastoreSpy()

override val operation: OperationDef = operationDef

override val scheduleStrategy: ScheduleStrategy = scheduleStrategyIn
@@ -171,7 +171,11 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"),
}
}

override def commitIncremental(isTransient: Boolean): Unit = {}
override def commitIncrementalStage(): Unit = {}
}
}

override def commitIncrementalTables(): Unit = {}

override def rollbackIncrementalTables(): Unit = {}
}
