Skip to content

Commit

Permalink
#520 Rename the commit flag to make it easier to understand.
Browse files: browse the repository at this point in the history
  • Loading branch information
yruslan committed Nov 29, 2024
1 parent c381cf5 commit b700025
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package za.co.absa.pramen.core.bookkeeper

import org.slf4j.LoggerFactory
import slick.jdbc.H2Profile.api._
import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
Expand All @@ -29,8 +28,6 @@ import scala.util.control.NonFatal
class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
import za.co.absa.pramen.core.utils.FutureImplicits._

private val log = LoggerFactory.getLogger(this.getClass)

override def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset] = {
val offsets = getOffsetRecords(table, infoDate)

Expand Down Expand Up @@ -60,6 +57,7 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
}

override def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = {
// ToDo Consider adding a caching layer for this
val maxInfoDateOpt = onlyForInfoDate.orElse(getMaximumInfoDate(table))

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ trait Metastore {

def getStats(tableName: String, infoDate: LocalDate): MetaTableStats

def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader
def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader

def commitIncrementalTables(): Unit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class MetastoreImpl(appConfig: Config,
MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).getStats(infoDate, onlyForCurrentBatchId = false)
}

override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader = {
override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader = {
val metastore = this

new MetastoreReaderCore {
Expand All @@ -219,7 +219,7 @@ class MetastoreImpl(appConfig: Config,

override def getCurrentBatch(tableName: String): DataFrame = {
validateTable(tableName)
if (isPostProcessing && isIncremental && !isRerun) {
if (isIncremental && !isRerun && isPostProcessing) {
metastore.getBatch(tableName, infoDate, None)
} else if (isIncremental && !isRerun) {
getIncremental(tableName, outputTable, infoDate)
Expand Down Expand Up @@ -284,7 +284,6 @@ class MetastoreImpl(appConfig: Config,
val trackingName = s"$tableName->$transformationOutputTable"
val tableDef = getTableDef(tableName)
val offsetType = if (tableDef.format.isInstanceOf[DataFormat.Raw]) OffsetType.StringType else OffsetType.IntegralType
val needsToCommit = !isPostProcessing && !incrementalDryRun
val om = bookkeeper.getOffsetManager
val tableDf = metastore.getTable(tableName, Option(infoDate), Option(infoDate))

Expand All @@ -303,7 +302,7 @@ class MetastoreImpl(appConfig: Config,
tableDf
}

if (needsToCommit && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) {
if (commitChanges && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) {
log.info(s"Starting offset commit for table '$trackingName' for '$infoDate''")

val trackingTable = TrackingTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,25 +130,18 @@ class IncrementalIngestionJob(operationDef: OperationDef,
jobStarted: Instant,
inputRecordCount: Option[Long]): SaveResult = {
val isRerun = runReason == TaskRunReason.Rerun

val dfToSave = df.withColumn(outputTable.batchIdColumn, lit(batchId))

val om = bookkeeper.getOffsetManager

val offsetInfo = source.getOffsetInfo.getOrElse(
throw new IllegalArgumentException(s"Offset type is not configured for the source '$sourceName' outputting to '${outputTable.name}''")
throw new IllegalArgumentException(s"Offset type is not configured for the source '$sourceName' outputting to '${outputTable.name}'")
)

validateOffsetColumn(df, offsetInfo)

val req = om.startWriteOffsets(outputTable.name, infoDate, offsetInfo.offsetType)

val stats = try {
val statsToReturn = if (isRerun) {
metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Overwrite))
} else {
metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append))
}
val saveMode = if (isRerun) SaveMode.Overwrite else SaveMode.Append
val statsToReturn = metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(saveMode))

val updatedDf = if (outputTable.format.isInstanceOf[DataFormat.Raw])
df
Expand Down Expand Up @@ -176,7 +169,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
source.postProcess(
sourceTable.query,
outputTable.name,
metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = true, incrementalDryRun = false, isPostProcessing = true),
metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = true, commitChanges = false, isPostProcessing = true),
infoDate,
operationDef.extraOptions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class IngestionJob(operationDef: OperationDef,
source.postProcess(
sourceTable.query,
outputTable.name,
metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = false, incrementalDryRun = false, isPostProcessing = true),
metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = false, commitChanges = false, isPostProcessing = true),
infoDate,
operationDef.extraOptions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class SinkJob(operationDef: OperationDef,
case NonFatal(ex) => throw new IllegalStateException("Unable to connect to the sink.", ex)
}

val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = false)
val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = true, isPostProcessing = false)

try {
val sinkResult = sink.send(df,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ class TransformationJob(operationDef: OperationDef,
}

override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = true, isPostProcessing = false), infoDate, operationDef.extraOptions)
transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = false, isPostProcessing = false), infoDate, operationDef.extraOptions)
}

override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = {
val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = false)
val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = true, isPostProcessing = false)
val runResult = RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions))

metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncrementalStage()
Expand All @@ -83,7 +83,7 @@ class TransformationJob(operationDef: OperationDef,
else
SaveResult(metastore.saveTable(outputTable.name, infoDate, df, None))

val metastoreReader = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = true)
val metastoreReader = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, isIncremental, commitChanges = false, isPostProcessing = true)

try {
transformer.postProcess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF

m.saveTable("table1", infoDate, getDf)

val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false)
val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)

val df1 = reader.getTable("table1", Some(infoDate), Some(infoDate))

Expand All @@ -404,7 +404,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF

m.saveTable("table1", infoDate, getDf)

val reader = m.getMetastoreReader("table2" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false)
val reader = m.getMetastoreReader("table2" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)

val ex = intercept[TableNotConfigured] {
reader.getTable("table1", Some(infoDate), Some(infoDate))
Expand All @@ -420,7 +420,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF

m.saveTable("table1", infoDate, getDf)

val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false)
val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
val runInfo1 = reader.getTableRunInfo("table1", infoDate)
val runInfo2 = reader.getTableRunInfo("table1", infoDate.plusDays(1))

Expand All @@ -438,7 +438,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF

m.saveTable("table1", infoDate, getDf)

val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false)
val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
val metadataManager = reader.metadataManager

metadataManager.setMetadata("table1", infoDate, "key1", "value1")
Expand All @@ -456,7 +456,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF
m.saveTable("table1", infoDate, getDf)
m.saveTable("table1", infoDate.plusDays(1), getDf)

val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, "output_table", infoDate.plusDays(10), TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false)
val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, "output_table", infoDate.plusDays(10), TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)

val date1 = reader.getLatestAvailableDate("table1")
val date2 = reader.getLatestAvailableDate("table1", Some(infoDate))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"),
stats
}

override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, taskRunReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader = {
override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, taskRunReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader = {
val metastore = this

new MetastoreReaderCore {
Expand Down

0 comments on commit b700025

Please sign in to comment.