#520 Implement incremental transformation job tracking logic.
yruslan committed Nov 29, 2024
1 parent ec92fc8 commit 2636b3d
Showing 10 changed files with 130 additions and 56 deletions.
@@ -40,12 +40,17 @@ class OffsetManagerCached(offsetManager: OffsetManager) extends OffsetManager {
   }
 
   def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = synchronized {
+    val tbl = onlyForInfoDate match {
+      case Some(date) => s"'$table' for '$date'"
+      case None => s"'$table'"
+    }
+
     if (aggregatedOffsetsCache.contains((table, onlyForInfoDate))) {
-      log.info(s"Got min/max offsets for '$table' from cache.")
+      log.info(s"Got min/max offsets for $tbl from cache.")
       aggregatedOffsetsCache((table, onlyForInfoDate))
     } else {
       val value = offsetManager.getMaxInfoDateAndOffset(table, onlyForInfoDate)
-      log.info(s"Got min/max offsets for '$table' from the database. Saving to cache...")
+      log.info(s"Got min/max offsets for $tbl from the database. Saving to cache...")
       aggregatedOffsetsCache += (table, onlyForInfoDate) -> value
       value
     }
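For reference, `OffsetManagerCached` memoizes offset lookups per `(table, optional info date)` key, so repeated queries within a run hit the database only once. A minimal self-contained sketch of the same pattern (`Offsets` stands in for `DataOffsetAggregated`; the map type and loader signature are assumptions, not the Pramen API):

```scala
import scala.collection.mutable

// Stand-in for DataOffsetAggregated from the diff above.
final case class Offsets(min: Long, max: Long)

// Memoizes lookups per (table, optional info date), mirroring aggregatedOffsetsCache.
class CachedOffsetLookup(load: (String, Option[String]) => Option[Offsets]) {
  private val cache = mutable.Map.empty[(String, Option[String]), Option[Offsets]]

  def get(table: String, infoDate: Option[String]): Option[Offsets] = synchronized {
    // Compute once via `load`, then serve every later call from the cache.
    cache.getOrElseUpdate((table, infoDate), load(table, infoDate))
  }
}
```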
@@ -222,7 +222,7 @@ class MetastoreImpl(appConfig: Config,
         if (readMode == ReaderMode.IncrementalPostProcessing && !isRerun) {
           metastore.getBatch(tableName, infoDate, None)
         } else if ((readMode == ReaderMode.IncrementalValidation || readMode == ReaderMode.IncrementalRun) && !isRerun) {
-          getIncremental(tableName, outputTable, infoDate)
+          getIncremental(tableName, infoDate)
         } else
           metastore.getTable(tableName, Option(infoDate), Option(infoDate))
       }
@@ -269,6 +269,12 @@
 
   override def metadataManager: MetadataManager = metadata
 
+  override def commitTable(tableName: String, trackingName: String): Unit = {
+    if (readMode != ReaderMode.Batch) {
+      getIncrementalDf(tableName, trackingName, infoDate, commit = true)
+    }
+  }
+
   override def commitIncrementalStage(): Unit = {
     metastore.addTrackingTables(trackingTables.toSeq)
     trackingTables.clear()
@@ -280,30 +286,38 @@
     }
   }
 
-  private def getIncremental(tableName: String, transformationOutputTable: String, infoDate: LocalDate): DataFrame = {
+  private def getIncremental(tableName: String, infoDate: LocalDate): DataFrame = {
     val commitChanges = readMode == ReaderMode.IncrementalRun
-    val trackingName = s"$tableName->$transformationOutputTable"
-    val tableDef = getTableDef(tableName)
-    val offsetType = if (tableDef.format.isInstanceOf[DataFormat.Raw]) OffsetType.StringType else OffsetType.IntegralType
-    val om = bookkeeper.getOffsetManager
-    val tableDf = metastore.getTable(tableName, Option(infoDate), Option(infoDate))
+    val trackingName = s"$tableName->$outputTable"
 
-    if (!tableDf.schema.exists(_.name == tableDef.batchIdColumn)) {
-      throw new IllegalArgumentException(s"Table '$tableName' does not contain column '${tableDef.batchIdColumn}' needed for incremental processing.")
-    }
+    getIncrementalDf(tableName, trackingName, infoDate, commitChanges)
+  }
 
+  private def getIncrementalDf(tableName: String, trackingName: String, infoDate: LocalDate, commit: Boolean): DataFrame = {
+    val tableDef = metastore.getTableDef(tableName)
+    val om = bookkeeper.getOffsetManager
+    val tableDf = metastore.getTable(tableName, Option(infoDate), Option(infoDate))
     val offsets = om.getMaxInfoDateAndOffset(trackingName, Option(infoDate))
 
-    val df = offsets match {
-      case Some(values) =>
-        log.info(s"Getting incremental table '$trackingName' for '$infoDate', column '${tableDef.batchIdColumn}' > ${values.maximumOffset.valueString}")
-        tableDf.filter(col(tableDef.batchIdColumn) > values.maximumOffset.getSparkLit)
-      case None =>
-        log.info(s"Getting incremental table '$trackingName' for '$infoDate'")
-        tableDf
+    val df = if (tableDf.isEmpty) {
+      tableDf
+    } else {
+      if (!tableDf.schema.exists(_.name == tableDef.batchIdColumn)) {
+        log.error(tableDf.schema.treeString)
+        throw new IllegalArgumentException(s"Table '$tableName' does not contain column '${tableDef.batchIdColumn}' needed for incremental processing.")
+      }
+
+      offsets match {
+        case Some(values) =>
+          log.info(s"Getting incremental table '$trackingName' for '$infoDate', column '${tableDef.batchIdColumn}' > ${values.maximumOffset.valueString}")
+          tableDf.filter(col(tableDef.batchIdColumn) > values.maximumOffset.getSparkLit)
+        case None =>
+          log.info(s"Getting incremental table '$trackingName' for '$infoDate'")
+          tableDf
+      }
     }
 
-    if (commitChanges && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) {
+    if (commit && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) {
       log.info(s"Starting offset commit for table '$trackingName' for '$infoDate'")
 
       val trackingTable = TrackingTable(
@@ -355,19 +369,23 @@
     val commitRequests = trackingTables.flatMap { trackingTable =>
       val df = getTable(trackingTable.inputTable, Option(trackingTable.infoDate), Option(trackingTable.infoDate))
 
-      getMinMaxOffsetFromMetastoreDf(df, trackingTable.batchIdColumn, trackingTable.currentMaxOffset) match {
-        case Some((minOffset, maxOffset)) =>
-          log.info(s"Committed offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}' with min='${minOffset.valueString}', max='${maxOffset.valueString}'.")
-          Some(OffsetCommitRequest(
-            trackingTable.trackingName,
-            trackingTable.infoDate,
-            minOffset,
-            maxOffset,
-            trackingTable.createdAt
-          ))
-        case None =>
-          log.info(s"No new data processed that requires offsets update of table '${trackingTable.trackingName}' for '${trackingTable.infoDate}'.")
-          None
-      }
+      if (df.isEmpty) {
+        None
+      } else {
+        getMinMaxOffsetFromMetastoreDf(df, trackingTable.batchIdColumn, trackingTable.currentMaxOffset) match {
+          case Some((minOffset, maxOffset)) =>
+            log.info(s"Committed offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}' with min='${minOffset.valueString}', max='${maxOffset.valueString}'.")
+            Some(OffsetCommitRequest(
+              trackingTable.trackingName,
+              trackingTable.infoDate,
+              minOffset,
+              maxOffset,
+              trackingTable.createdAt
+            ))
+          case None =>
+            log.info(s"No new data processed that requires offsets update of table '${trackingTable.trackingName}' for '${trackingTable.infoDate}'.")
+            None
+        }
+      }
     }
 
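When a stage is committed, each tracked table is re-read and only rows above the previously committed offset contribute to the new min/max pair. A hedged sketch of that computation for an integral batch-id column (the real `getMinMaxOffsetFromMetastoreDf` also handles string offsets; the function name and signature here are illustrative):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max, min}

// Returns the (min, max) batch ids strictly above the previously committed
// maximum, or None when no new rows were processed.
// Assumes the batch id column is LongType.
def minMaxAboveOffset(df: DataFrame, batchIdColumn: String, currentMax: Option[Long]): Option[(Long, Long)] = {
  val newData = currentMax.fold(df)(m => df.filter(col(batchIdColumn) > m))
  if (newData.isEmpty) {
    None // nothing new, so nothing to commit (matches the df.isEmpty guard above)
  } else {
    val row = newData.agg(min(col(batchIdColumn)), max(col(batchIdColumn))).head()
    Some((row.getLong(0), row.getLong(1)))
  }
}
```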
@@ -19,5 +19,7 @@ package za.co.absa.pramen.core.metastore
 import za.co.absa.pramen.api.MetastoreReader
 
 trait MetastoreReaderCore extends MetastoreReader {
+  def commitTable(tableName: String, trackingName: String): Unit
+
   def commitIncrementalStage(): Unit
 }
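The two methods form a stage-then-flush contract: `commitTable` stages offsets for one table under its tracking name, and `commitIncrementalStage` persists everything staged at once. A self-contained sketch of that contract (the types and `persist` callback are stand-ins, not the Pramen API):

```scala
import scala.collection.mutable

// Stage-then-flush contract implied by MetastoreReaderCore.
trait IncrementalCommitter {
  def commitTable(tableName: String, trackingName: String): Unit
  def commitIncrementalStage(): Unit
}

class BufferingCommitter(persist: Seq[(String, String)] => Unit) extends IncrementalCommitter {
  private val staged = mutable.ListBuffer.empty[(String, String)]

  // Stage a table under its tracking name; nothing is persisted yet.
  override def commitTable(tableName: String, trackingName: String): Unit =
    staged += ((tableName, trackingName))

  // Flush all staged entries in one shot, then reset the buffer.
  override def commitIncrementalStage(): Unit = {
    persist(staged.toList)
    staged.clear()
  }
}
```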
@@ -119,7 +119,11 @@ class OperationSplitter(conf: Config,
     val notificationTargets = operationDef.notificationTargets
       .map(targetName => getNotificationTarget(conf, targetName, operationDef.operationConf))
 
-    Seq(new TransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, clazz, transformer, batchId))
+    val latestInfoDateOpt = if (operationDef.schedule == Schedule.Incremental) {
+      bookkeeper.getOffsetManager.getMaxInfoDateAndOffset(outputTable, None).map(_.maximumInfoDate)
+    } else None
+
+    Seq(new TransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, clazz, transformer, latestInfoDateOpt))
   }
 
   def createPythonTransformation(operationDef: OperationDef,
@@ -139,7 +143,11 @@
     val notificationTargets = operationDef.notificationTargets
       .map(targetName => getNotificationTarget(conf, targetName, operationDef.operationConf))
 
-    Seq(new PythonTransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, pythonClass, pramenPyConfig, processRunner, databricksClientOpt))
+    val latestInfoDateOpt = if (operationDef.schedule == Schedule.Incremental) {
+      bookkeeper.getOffsetManager.getMaxInfoDateAndOffset(outputTable, None).map(_.maximumInfoDate)
+    } else None
+
+    Seq(new PythonTransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, pythonClass, pramenPyConfig, processRunner, databricksClientOpt, latestInfoDateOpt))
   }
 
   def createSink(operationDef: OperationDef,
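Both constructors now seed the schedule strategy with the latest info date already committed for the output table. A sketch of how such a date could drive which info dates an incremental schedule still needs to run; the semantics are inferred from the diff, not taken from the actual `ScheduleStrategyIncremental` implementation:

```scala
import java.time.LocalDate

// Info dates an incremental job still has to process, given the latest
// committed info date. Inferred behavior: the real strategy also covers
// reruns and late data.
def infoDatesToRun(latestCommitted: Option[LocalDate], today: LocalDate): Seq[LocalDate] =
  latestCommitted match {
    case Some(last) if last.isBefore(today) =>
      // Catch up: every day after the last committed one, up to today.
      Iterator.iterate(last.plusDays(1))(_.plusDays(1)).takeWhile(!_.isAfter(today)).toSeq
    case _ =>
      Seq(today) // nothing committed yet, or already up to date: run today's increment
  }
```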
@@ -29,7 +29,7 @@ import za.co.absa.pramen.core.metastore.MetastoreImpl.DEFAULT_RECORDS_PER_PARTIT
 import za.co.absa.pramen.core.metastore.model.MetaTable
 import za.co.absa.pramen.core.pipeline.PythonTransformationJob._
 import za.co.absa.pramen.core.process.ProcessRunner
-import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing}
+import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental, ScheduleStrategySourcing}
 import za.co.absa.pramen.core.utils.StringUtils.escapeString
 
 import java.io.{BufferedWriter, File, FileWriter}
@@ -63,15 +63,21 @@ class PythonTransformationJob(operationDef: OperationDef,
                               pythonClass: String,
                               pramenPyCmdConfigOpt: Option[PramenPyCmdConfig],
                               processRunner: ProcessRunner,
-                              databricksClientOpt: Option[DatabricksClient])
+                              databricksClientOpt: Option[DatabricksClient],
+                              latestInfoDate: Option[LocalDate])
                              (implicit spark: SparkSession)
   extends JobBase(operationDef, metastore, bookkeeper,notificationTargets, outputTable) {
 
   override val jobType: JobType = JobType.PythonTransformation(pythonClass)
 
   private val minimumRecords: Int = operationDef.extraOptions.getOrElse(MINIMUM_RECORDS_OPTION, "0").toInt
 
-  override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing
+  override val scheduleStrategy: ScheduleStrategy = {
+    if (isIncremental)
+      new ScheduleStrategyIncremental(latestInfoDate, true)
+    else
+      new ScheduleStrategySourcing
+  }
 
   override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
     validateTransformationAlreadyRanCases(infoDate, dependencyWarnings) match {
@@ -20,12 +20,12 @@ import com.typesafe.config.Config
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import za.co.absa.pramen.api.jobdef.SinkTable
 import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason}
-import za.co.absa.pramen.api.{Reason, Sink}
+import za.co.absa.pramen.api.{MetastoreReader, Reason, Sink}
 import za.co.absa.pramen.core.bookkeeper.Bookkeeper
 import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode}
 import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderCore}
 import za.co.absa.pramen.core.pipeline.JobPreRunStatus.Ready
-import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing}
+import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental, ScheduleStrategySourcing}
 import za.co.absa.pramen.core.utils.ConfigUtils
 import za.co.absa.pramen.core.utils.SparkUtils._
@@ -49,13 +49,20 @@ class SinkJob(operationDef: OperationDef,
 
   private val inputTables = operationDef.dependencies.flatMap(_.tables).distinct
 
-  override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing
+  override val scheduleStrategy: ScheduleStrategy = {
+    if (isIncremental)
+      new ScheduleStrategyIncremental(None, true)
+    else
+      new ScheduleStrategySourcing
+  }
 
   override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
     val alreadyRanStatus = preRunTransformationCheck(infoDate, runReason, dependencyWarnings)
+    val readerMode = if (isIncremental) ReaderMode.IncrementalValidation else ReaderMode.Batch
+    val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode)
 
     alreadyRanStatus.status match {
-      case JobPreRunStatus.Ready => JobPreRunResult(Ready, Some(getDataDf(infoDate).count()), dependencyWarnings, alreadyRanStatus.warnings)
+      case JobPreRunStatus.Ready => JobPreRunResult(Ready, Some(getDataDf(infoDate, metastoreReader).count()), dependencyWarnings, alreadyRanStatus.warnings)
       case _ => alreadyRanStatus
     }
   }
@@ -65,7 +72,10 @@
 
     minimumRecordsOpt.foreach(n => log.info(s"Minimum records to send: $n"))
 
-    val df = getDataDf(infoDate)
+    val readerMode = if (isIncremental) ReaderMode.IncrementalValidation else ReaderMode.Batch
+    val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode)
+
+    val df = getDataDf(infoDate, metastoreReader)
 
     val inputRecordCount = df.count()
 
@@ -86,7 +96,14 @@
   }
 
   override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = {
-    RunResult(getDataDf(infoDate))
+    val readerMode = if (isIncremental) ReaderMode.IncrementalRun else ReaderMode.Batch
+    val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode)
+
+    val result = RunResult(getDataDf(infoDate, metastoreReader))
+
+    metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncrementalStage()
+
+    result
   }
 
   def postProcessing(df: DataFrame,
@@ -122,17 +139,20 @@
       case NonFatal(ex) => throw new IllegalStateException("Unable to connect to the sink.", ex)
     }
 
-    val readerMode = if (isIncremental) ReaderMode.IncrementalPostProcessing else ReaderMode.Batch
+    val readerMode = if (isIncremental) ReaderMode.IncrementalRun else ReaderMode.Batch
 
     val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode)
 
+    metastoreReader.asInstanceOf[MetastoreReaderCore].commitTable(sinkTable.metaTableName, s"${sinkTable.metaTableName}->$sinkName")
+
     try {
       val sinkResult = sink.send(df,
         sinkTable.metaTableName,
         metastoreReader,
         infoDate,
         sinkTable.options
       )
+
       val jobFinished = Instant.now
 
       val isTransient = outputTable.format.isTransient
@@ -163,10 +183,14 @@
     }
   }
 
-  private def getDataDf(infoDate: LocalDate): DataFrame = {
+  private def getDataDf(infoDate: LocalDate, metastoreReader: MetastoreReader): DataFrame = {
     try {
-      val (from, to) = getInfoDateRange(infoDate, sinkTable.rangeFromExpr, sinkTable.rangeToExpr)
-      metastore.getTable(sinkTable.metaTableName, Option(from), Option(to))
+      if (isIncremental) {
+        metastoreReader.getCurrentBatch(sinkTable.metaTableName)
+      } else {
+        val (from, to) = getInfoDateRange(infoDate, sinkTable.rangeFromExpr, sinkTable.rangeToExpr)
+        metastore.getTable(sinkTable.metaTableName, Option(from), Option(to))
+      }
     } catch {
       case NonFatal(ex) => throw new IllegalStateException(s"Unable to read input table ${sinkTable.metaTableName} for $infoDate.", ex)
     }
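Across `SinkJob`, each phase now opens its own reader with a mode matched to its role: validation reads without tracking, the run itself tracks offsets and flushes them, and incremental inputs come from the current batch rather than a date range. A self-contained sketch of that mapping (the enum values mirror the source; the phase names and the mapping function are illustrative, read off this diff rather than taken from it):

```scala
sealed trait ReaderMode
object ReaderMode {
  case object Batch                 extends ReaderMode // date-range reads, no tracking
  case object IncrementalValidation extends ReaderMode // pre-run checks: read only
  case object IncrementalRun        extends ReaderMode // run: read and track offsets
}

// Phase-to-mode mapping as it appears in SinkJob after this change.
def modeFor(phase: String, isIncremental: Boolean): ReaderMode =
  if (!isIncremental) ReaderMode.Batch
  else phase match {
    case "preRunCheck" | "validate" => ReaderMode.IncrementalValidation
    case _                          => ReaderMode.IncrementalRun
  }
```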
@@ -34,7 +34,7 @@ class TransformationJob(operationDef: OperationDef,
                         outputTable: MetaTable,
                         transformerFactoryClass: String,
                         transformer: Transformer,
-                        batchId: Long)
+                        latestInfoDate: Option[LocalDate])
                        (implicit spark: SparkSession)
   extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, outputTable) {
@@ -43,9 +43,9 @@
   private val inputTables = operationDef.dependencies.flatMap(_.tables).distinct
 
   override val scheduleStrategy: ScheduleStrategy = {
-    if (isIncremental)
-      new ScheduleStrategyIncremental(None, true)
-    else
+    if (isIncremental) {
+      new ScheduleStrategyIncremental(latestInfoDate, true)
+    } else
       new ScheduleStrategySourcing
   }
 
@@ -86,18 +86,26 @@
       SaveResult(metastore.saveTable(outputTable.name, infoDate, df, None))
 
     val readerMode = if (isIncremental) ReaderMode.IncrementalPostProcessing else ReaderMode.Batch
-    val metastoreReader = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, readerMode)
+    val metastoreReaderPostProcess = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, readerMode)
 
     try {
       transformer.postProcess(
         outputTable.name,
-        metastoreReader,
+        metastoreReaderPostProcess,
         infoDate, operationDef.extraOptions
       )
     } catch {
       case _: AbstractMethodError => log.warn(s"Transformers were built using old version of Pramen that does not support post processing. Ignoring...")
     }
 
+    if (!outputTable.format.isTransient) {
+      val readerMode = if (isIncremental) ReaderMode.IncrementalRun else ReaderMode.Batch
+      val metastoreReaderRun = metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, readerMode)
+
+      metastoreReaderRun.asInstanceOf[MetastoreReaderCore].commitTable(outputTable.name, outputTable.name)
+      metastoreReaderRun.asInstanceOf[MetastoreReaderCore].commitIncrementalStage()
+    }
+
     val jobFinished = Instant.now
     val tooLongWarnings = getTookTooLongWarnings(jobStarted, jobFinished, None)
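For incremental transformers the diff settles on a save, post-process, then commit sequence, with the output table tracked against itself and offsets committed only for persistent formats. A hedged sketch of that flow (function parameters are stand-ins; error handling omitted):

```scala
// Save -> post-process -> commit sequence for incremental transformers,
// as read from this diff.
def runIncrementalTransformer(
    save: () => Unit,
    postProcess: () => Unit,
    commitOutput: () => Unit,
    isTransient: Boolean): Unit = {
  save()                            // 1. write the transformer output for the info date
  postProcess()                     // 2. post-process the saved batch
  if (!isTransient) commitOutput()  // 3. persistent outputs only: track offsets and flush
}
```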
@@ -171,6 +171,8 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"),
       }
     }
 
+    override def commitTable(tableName: String, trackingName: String): Unit = {}
+
     override def commitIncrementalStage(): Unit = {}
   }
 }
@@ -611,7 +611,8 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi
       "python_class",
       pramenPyConfigOpt,
       processRunner,
-      databricksClientOpt
+      databricksClientOpt,
+      None
     )
 
     (job, processRunner, pramenPyConfigOpt, databricksClientOpt)
@@ -137,7 +137,7 @@ class TransformationJobSuite extends AnyWordSpec with SparkTestBase {
 
     val outputTable = MetaTableFactory.getDummyMetaTable(name = "table1")
 
-    (new TransformationJob(operation, metastore, bk, Nil, outputTable, "dummy_class", transformer, 1), metastore)
+    (new TransformationJob(operation, metastore, bk, Nil, outputTable, "dummy_class", transformer, None), metastore)
   }
 
 }
