Skip to content

Commit

Permalink
#374 Add initial support for incremental transformers.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 12, 2024
1 parent 0328aaa commit 608884c
Show file tree
Hide file tree
Showing 22 changed files with 126 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ package za.co.absa.pramen.core.metastore

case class MetaTableStats(
recordCount: Long,
recordCountAppended: Option[Long],
dataSizeBytes: Option[Long]
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package za.co.absa.pramen.core.metastore

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SaveMode}
import za.co.absa.pramen.api._
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.utils.hive.HiveHelper
Expand Down Expand Up @@ -53,5 +53,5 @@ trait Metastore {

def getStats(tableName: String, infoDate: LocalDate): MetaTableStats

def getMetastoreReader(tables: Seq[String], infoDate: LocalDate): MetastoreReader
def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, isIncremental: Boolean): MetastoreReader
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class MetastoreImpl(appConfig: Config,
val isTransient = mt.format.isTransient
val start = Instant.now.getEpochSecond

var stats = MetaTableStats(0, None)
var stats = MetaTableStats(0, None, None)

withSparkConfig(mt.sparkConfig) {
stats = MetastorePersistence.fromMetaTable(mt, appConfig, saveModeOverride, batchId).saveTable(infoDate, df, inputRecordCount)
Expand Down Expand Up @@ -194,7 +194,7 @@ class MetastoreImpl(appConfig: Config,
MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).getStats(infoDate, onlyForCurrentBatchId = false)
}

override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate): MetastoreReader = {
override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, isIncremental: Boolean): MetastoreReader = {
val metastore = this

new MetastoreReader {
Expand All @@ -207,7 +207,10 @@ class MetastoreImpl(appConfig: Config,

override def getCurrentBatch(tableName: String): DataFrame = {
validateTable(tableName)
metastore.getCurrentBatch(tableName, infoDate)
if (isIncremental)
metastore.getCurrentBatch(tableName, infoDate)
else
metastore.getTable(tableName, Option(infoDate), Option(infoDate))
}

override def getLatest(tableName: String, until: Option[LocalDate] = None): DataFrame = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class MetastorePersistenceDelta(query: Query,
None
}

MetaTableStats(recordCount, sizeOpt)
MetaTableStats(recordCount, None, sizeOpt)
}

override def createOrUpdateHiveTable(infoDate: LocalDate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ class MetastorePersistenceParquet(path: String,

fsUtils.createDirectoryRecursive(new Path(path))

val dfIn = if (df.schema.exists(_.name.equalsIgnoreCase(infoDateColumn))) {
df.drop(infoDateColumn)
} else {
df
}

val saveMode = saveModeOpt.getOrElse(SaveMode.Overwrite)

val isAppend = saveMode match {
Expand All @@ -83,18 +77,34 @@ class MetastorePersistenceParquet(path: String,
false
}

val recordCount = numberOfRecordsEstimate match {
case Some(count) => count
case None => dfIn.count()
val dfIn = if (df.schema.exists(_.name.equalsIgnoreCase(infoDateColumn))) {
df.drop(infoDateColumn)
} else {
df
}

val dfRepartitioned = applyRepartitioning(dfIn, recordCount)
val dfRepartitioned = if (recordsPerPartition.nonEmpty) {
val recordCount = numberOfRecordsEstimate match {
case Some(count) => count
case None => dfIn.count()
}

applyRepartitioning(dfIn, recordCount)
} else {
dfIn
}

writeAndCleanOnFailure(dfRepartitioned, outputDirStr, fsUtils, saveMode)

val stats = getStats(infoDate, isAppend)

log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records (${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir")
if (isAppend) {
log.info(s"$SUCCESS Successfully saved ${stats.recordCountAppended.get} records (new count: ${stats.recordCount}, " +
s"new size: ${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir")
} else {
log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " +
s"(${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir")
}

stats
}
Expand All @@ -106,15 +116,18 @@ class MetastorePersistenceParquet(path: String,

val df = spark.read.parquet(outputDirStr)

val actualCount = if (onlyForCurrentBatchId && df.schema.exists(_.name.equalsIgnoreCase(batchIdColumn))) {
df.filter(col(batchIdColumn) === batchId).count()
} else {
df.count()
}

val size = fsUtils.getDirectorySize(outputDirStr)

MetaTableStats(actualCount, Option(size))
if (onlyForCurrentBatchId) {
val batchCount = df.filter(col(batchIdColumn) === batchId).count()
val countAll = df.count()

MetaTableStats(countAll, Option(batchCount), Option(size))
} else {
val countAll = df.count()

MetaTableStats(countAll, None, Option(size))
}
}

override def createOrUpdateHiveTable(infoDate: LocalDate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class MetastorePersistenceRaw(path: String,

MetaTableStats(
totalSize,
None,
Some(totalSize)
)
}
Expand All @@ -108,6 +109,7 @@ class MetastorePersistenceRaw(path: String,

MetaTableStats(
files.length,
None,
Some(totalSize)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class MetastorePersistenceTransientEager(tempPathOpt: Option[String],

MetaTableStats(
recordCount,
None,
sizeBytesOpt
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
(implicit spark: SparkSession)
extends IngestionJob(operationDef, metastore, bookkeeper, notificationTargets, sourceName, source, sourceTable, outputTable, specialCharacters, None, false) {

override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategyIncremental(latestOffset)
override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategyIncremental(latestOffset, source.hasInfoDateColumn(sourceTable.query))

override def trackDays: Int = 0

Expand Down Expand Up @@ -140,7 +140,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
source.postProcess(
sourceTable.query,
outputTable.name,
metastore.getMetastoreReader(Seq(outputTable.name), infoDate),
metastore.getMetastoreReader(Seq(outputTable.name), infoDate, isIncremental = true),
infoDate,
operationDef.extraOptions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class IngestionJob(operationDef: OperationDef,
source.postProcess(
sourceTable.query,
outputTable.name,
metastore.getMetastoreReader(Seq(outputTable.name), infoDate),
metastore.getMetastoreReader(Seq(outputTable.name), infoDate, isIncremental = false),
infoDate,
operationDef.extraOptions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.expr.DateExprEvaluator
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.schedule.Schedule
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.TimeUtils

Expand Down Expand Up @@ -52,6 +53,8 @@ abstract class JobBase(operationDef: OperationDef,

def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult

def isIncremental: Boolean = operationDef.schedule == Schedule.Incremental

final override def preRunCheck(infoDate: LocalDate,
runReason: TaskRunReason,
conf: Config): JobPreRunResult = {
Expand Down Expand Up @@ -114,7 +117,7 @@ abstract class JobBase(operationDef: OperationDef,
}

protected def validateTransformationAlreadyRanCases(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning]): Option[JobPreRunResult] = {
if (bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate).isDefined) {
if (!isIncremental && bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate).isDefined) {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.")
Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String]))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class SinkJob(operationDef: OperationDef,
try {
val sinkResult = sink.send(df,
sinkTable.metaTableName,
metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, infoDate),
metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, infoDate, isIncremental),
infoDate,
sinkTable.options
)
Expand All @@ -142,7 +142,7 @@ class SinkJob(operationDef: OperationDef,
isTransient
)

val stats = MetaTableStats(sinkResult.recordsSent, None)
val stats = MetaTableStats(sinkResult.recordsSent, None, None)
SaveResult(stats, sinkResult.filesSent, sinkResult.hiveTables, sinkResult.warnings ++ tooLongWarnings)
} catch {
case NonFatal(ex) => throw new IllegalStateException("Unable to write to the sink.", ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
package za.co.absa.pramen.core.pipeline

import com.typesafe.config.Config
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
import za.co.absa.pramen.api.{Reason, Transformer}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental, ScheduleStrategySourcing}
import za.co.absa.pramen.core.schedule.Schedule

import java.time.{Instant, LocalDate}

Expand All @@ -40,8 +39,8 @@ class TransformationJob(operationDef: OperationDef,
private val inputTables = operationDef.dependencies.flatMap(_.tables).distinct

override val scheduleStrategy: ScheduleStrategy = {
if (operationDef.schedule == Schedule.Incremental)
new ScheduleStrategyIncremental(None)
if (isIncremental)
new ScheduleStrategyIncremental(None, true)
else
new ScheduleStrategySourcing
}
Expand All @@ -51,11 +50,11 @@ class TransformationJob(operationDef: OperationDef,
}

override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
transformer.validate(metastore.getMetastoreReader(inputTables, infoDate), infoDate, operationDef.extraOptions)
transformer.validate(metastore.getMetastoreReader(inputTables, infoDate, isIncremental), infoDate, operationDef.extraOptions)
}

override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = {
RunResult(transformer.run(metastore.getMetastoreReader(inputTables, infoDate), infoDate, operationDef.extraOptions))
RunResult(transformer.run(metastore.getMetastoreReader(inputTables, infoDate, isIncremental), infoDate, operationDef.extraOptions))
}

def postProcessing(df: DataFrame,
Expand All @@ -70,12 +69,15 @@ class TransformationJob(operationDef: OperationDef,
conf: Config,
jobStarted: Instant,
inputRecordCount: Option[Long]): SaveResult = {
val saveResults = SaveResult(metastore.saveTable(outputTable.name, infoDate, df, None))
val saveResults = if (isIncremental)
SaveResult(metastore.saveTable(outputTable.name, infoDate, df, None, saveModeOverride = Some(SaveMode.Append)))
else
SaveResult(metastore.saveTable(outputTable.name, infoDate, df, None))

try {
transformer.postProcess(
outputTable.name,
metastore.getMetastoreReader(inputTables :+ outputTable.name, infoDate),
metastore.getMetastoreReader(inputTables :+ outputTable.name, infoDate, isIncremental),
infoDate, operationDef.extraOptions
)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ package za.co.absa.pramen.core.runner.splitter
import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.bookkeeper.model.DataOffsetAggregated
import za.co.absa.pramen.core.pipeline
import za.co.absa.pramen.core.pipeline.TaskPreDef
import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils._
import za.co.absa.pramen.core.schedule.Schedule

import java.time.LocalDate

class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated]) extends ScheduleStrategy {
class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated], hasInfoDateColumn: Boolean) extends ScheduleStrategy {
private val log = org.slf4j.LoggerFactory.getLogger(this.getClass)

override def getDaysToRun(
Expand All @@ -50,11 +49,13 @@ class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated]) ext
Seq.empty
} else {
Seq(infoDate)
.map(d => pipeline.TaskPreDef(d, TaskRunReason.New))
.map(d => TaskPreDef(d, TaskRunReason.New))
}
case None =>
Seq(infoDate)
.map(d => pipeline.TaskPreDef(d, TaskRunReason.New))
if (hasInfoDateColumn)
Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New))
else
Seq(TaskPreDef(infoDate, TaskRunReason.New))
}

log.info(s"Days to run: ${runInfoDays.map(_.infoDate).mkString(", ")}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ abstract class TaskRunnerBase(conf: Config,

val saveResult = if (runtimeConfig.isDryRun) {
log.warn(s"$WARNING DRY RUN mode, no actual writes to ${task.job.outputTable.name} for ${task.infoDate} will be performed.")
SaveResult(MetaTableStats(dfTransformed.count(), None))
SaveResult(MetaTableStats(dfTransformed.count(), None, None))
} else {
task.job.save(dfTransformed, task.infoDate, task.reason, conf, started, validationResult.inputRecordsCount)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class IdentityTransformer extends Transformer {
options: Map[String, String]): DataFrame = {
val tableName = options.getOrElse(INPUT_TABLE_KEY, options(INPUT_TABLE_LEGACY_KEY))

metastore.getTable(tableName, Option(infoDate), Option(infoDate))
metastore.getCurrentBatch(tableName)
}
}

Expand Down
Loading

0 comments on commit 608884c

Please sign in to comment.