#520 Improve performance of committing of transformers and sinks output tables.
yruslan committed Dec 2, 2024
1 parent 2636b3d commit b9d4aba
Showing 7 changed files with 67 additions and 26 deletions.
@@ -115,14 +115,25 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {

override def postCommittedRecords(commitRequests: Seq[OffsetCommitRequest]): Unit = {
val committedAt = Instant.now()
val committedAtMilli = committedAt.toEpochMilli

val records = commitRequests.map { req =>
OffsetRecord(req.table, req.infoDate.toString, req.minOffset.dataType.dataTypeString, req.minOffset.valueString, req.maxOffset.valueString, batchId, req.createdAt.toEpochMilli, Some(committedAt.toEpochMilli))
OffsetRecord(req.table, req.infoDate.toString, req.minOffset.dataType.dataTypeString, req.minOffset.valueString, req.maxOffset.valueString, batchId, req.createdAt.toEpochMilli, Some(committedAtMilli))
}

db.run(
OffsetRecords.records ++= records
).execute()

commitRequests.map(r => (r.table, r.infoDate))
.distinct
.foreach { case (table, infoDate) =>
db.run(
OffsetRecords.records
.filter(r => r.pramenTableName === table && r.infoDate === infoDate.toString && r.committedAt =!= committedAtMilli)
.delete
).execute()
}
}
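
Note on the block added above: besides switching to the precomputed committedAtMilli, the commit now cleans up after itself. Once the fresh records are inserted, every previously stored record for the same (table, infoDate) pair is deleted, so only the rows written by the current commit survive. A minimal, self-contained sketch of that consolidation rule on an in-memory list (Rec and consolidate are illustrative names, not the actual Slick code):

import java.time.Instant

object OffsetConsolidationSketch extends App {
  // Illustrative stand-in for an offset record row.
  case class Rec(table: String, infoDate: String, committedAt: Long)

  // Keep only the rows written with the current commit timestamp for every
  // (table, infoDate) pair that was just committed; leave other pairs alone.
  def consolidate(existing: Seq[Rec], committed: Seq[Rec], committedAtMilli: Long): Seq[Rec] = {
    val touched = committed.map(r => (r.table, r.infoDate)).toSet
    val survivors = existing.filterNot { r =>
      touched.contains((r.table, r.infoDate)) && r.committedAt != committedAtMilli
    }
    survivors ++ committed
  }

  val now = Instant.now().toEpochMilli
  val before = Seq(Rec("table1", "2024-12-01", now - 60000), Rec("table2", "2024-12-01", now - 60000))
  val after  = consolidate(before, Seq(Rec("table1", "2024-12-01", now)), now)

  assert(after.count(_.table == "table1") == 1) // older record for table1 was dropped
  assert(after.exists(_.table == "table2"))     // table2 was not part of the commit, so it stays
}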

override def rollbackOffsets(request: DataOffsetRequest): Unit = {
@@ -220,11 +220,15 @@ class MetastoreImpl(appConfig: Config,
override def getCurrentBatch(tableName: String): DataFrame = {
validateTable(tableName)
if (readMode == ReaderMode.IncrementalPostProcessing && !isRerun) {
log.info(s"Getting the current batch for table '$tableName' at '$infoDate'...")
metastore.getBatch(tableName, infoDate, None)
} else if ((readMode == ReaderMode.IncrementalValidation || readMode == ReaderMode.IncrementalRun) && !isRerun) {
log.info(s"Getting the current incremental chunk for table '$tableName' at '$infoDate'...")
getIncremental(tableName, infoDate)
} else
} else {
log.info(s"Getting daily data for table '$tableName' at '$infoDate'...")
metastore.getTable(tableName, Option(infoDate), Option(infoDate))
}
}
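
To make the three read paths of getCurrentBatch() easier to follow, here is a compact restatement of the branching above; the mode names mirror ReaderMode, but everything else (the helper and the returned strings) is illustrative:

object GetCurrentBatchSketch extends App {
  sealed trait Mode
  case object IncrementalPostProcessing extends Mode
  case object IncrementalValidation extends Mode
  case object IncrementalRun extends Mode
  case object Batch extends Mode

  // Which slice of the table a reader gets, depending on mode and rerun flag.
  def readPath(mode: Mode, isRerun: Boolean): String = mode match {
    case IncrementalPostProcessing if !isRerun              => "whole current batch"
    case IncrementalValidation | IncrementalRun if !isRerun => "current incremental chunk"
    case _                                                  => "full daily data for the info date"
  }

  assert(readPath(IncrementalRun, isRerun = false) == "current incremental chunk")
  assert(readPath(IncrementalRun, isRerun = true)  == "full daily data for the info date")
}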

override def getLatest(tableName: String, until: Option[LocalDate] = None): DataFrame = {
@@ -269,9 +273,25 @@

override def metadataManager: MetadataManager = metadata

override def commitTable(tableName: String, trackingName: String): Unit = {
override def commitOutputTable(tableName: String, trackingName: String): Unit = {
if (readMode != ReaderMode.Batch) {
getIncrementalDf(tableName, trackingName, infoDate, commit = true)
val om = bookkeeper.getOffsetManager
val minMax = om.getMaxInfoDateAndOffset(trackingName, Option(infoDate))
val batchIdValue = OffsetValue.IntegralValue(batchId)
log.info(s"Starting offset commit for output table '$trackingName' for '$infoDate'.")
val trackingTable = TrackingTable(
Thread.currentThread().getId,
tableName,
outputTable,
trackingName,
"",
minMax.map(_.minimumOffset),
minMax.map(_.maximumOffset),
infoDate,
Instant.now()
)

trackingTables += trackingTable
}
}
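
The reworked commitOutputTable() above is the core of the performance change for transformers and sinks: instead of re-reading the just-written output table, it asks the offset manager for the current maximum offsets and only queues a TrackingTable entry; the actual commit happens once, later, in commitIncremental(). A rough sketch of that deferred-commit pattern (PendingCommit, postAll and the buffer are illustrative, not the Pramen API):

import java.time.Instant
import scala.collection.mutable.ListBuffer

object DeferredCommitSketch extends App {
  case class PendingCommit(trackingName: String, infoDate: String, createdAt: Instant)

  private val pending = ListBuffer.empty[PendingCommit]

  // Cheap: records intent only, no data is read here.
  def commitOutputTable(trackingName: String, infoDate: String): Unit =
    pending += PendingCommit(trackingName, infoDate, Instant.now())

  // One bulk write at the end of the stage instead of per-table work up front.
  def commitIncrementalStage(postAll: Seq[PendingCommit] => Unit): Unit = {
    if (pending.nonEmpty) postAll(pending.toList)
    pending.clear()
  }

  commitOutputTable("table1->my_sink", "2024-12-01")
  commitOutputTable("table2->my_sink", "2024-12-01")
  commitIncrementalStage(reqs => println(s"Committed ${reqs.length} requests"))
}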

@@ -318,14 +338,15 @@ class MetastoreImpl(appConfig: Config,
}

if (commit && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) {
log.info(s"Starting offset commit for table '$trackingName' for '$infoDate''")
log.info(s"Starting offset commit for table '$trackingName' for '$infoDate'")

val trackingTable = TrackingTable(
Thread.currentThread().getId,
tableName,
outputTable,
trackingName,
tableDef.batchIdColumn,
offsets.map(_.minimumOffset),
offsets.map(_.maximumOffset),
infoDate,
Instant.now()
@@ -366,12 +387,16 @@ class MetastoreImpl(appConfig: Config,
}

private[core] def commitIncremental(trackingTables: Seq[TrackingTable]): Unit = {
val commitRequests = trackingTables.flatMap { trackingTable =>
val df = getTable(trackingTable.inputTable, Option(trackingTable.infoDate), Option(trackingTable.infoDate))
if (trackingTables.isEmpty)
return

if (df.isEmpty) {
None
} else {
val om = bookkeeper.getOffsetManager
val batchIdValue = OffsetValue.IntegralValue(batchId)

val commitRequests = trackingTables.flatMap { trackingTable =>
val tableDef = getTableDef(trackingTable.inputTable)
if (tableDef.format.isInstanceOf[DataFormat.Raw]) {
val df = getTable(trackingTable.inputTable, Option(trackingTable.infoDate), Option(trackingTable.infoDate))
getMinMaxOffsetFromMetastoreDf(df, trackingTable.batchIdColumn, trackingTable.currentMaxOffset) match {
case Some((minOffset, maxOffset)) =>
log.info(s"Commited offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}' with min='${minOffset.valueString}', max='${maxOffset.valueString}'.")
@@ -386,11 +411,20 @@ class MetastoreImpl(appConfig: Config,
log.info(s"No new data processed that requires offsets update of table '${trackingTable.trackingName}' for '${trackingTable.infoDate}'.")
None
}
} else {
val minOffset = trackingTable.currentMinOffset.getOrElse(batchIdValue)
log.info(s"Commited offsets for table '${trackingTable.trackingName}' for '${trackingTable.infoDate}' with min='${minOffset.valueString}', max='$batchId'.")
Some(OffsetCommitRequest(
trackingTable.trackingName,
trackingTable.infoDate,
minOffset,
batchIdValue,
trackingTable.createdAt
))
}
}

if (commitRequests.nonEmpty) {
val om = bookkeeper.getOffsetManager
om.postCommittedRecords(commitRequests)
log.info(s"Committed ${commitRequests.length} requests.'")
}
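
The rewritten commitIncremental() above only re-reads data for Raw-format tables, where the min/max offsets must be scanned from the stored data; every other format commits the batch id directly, which removes the extra metastore reads this commit targets. A simplified sketch of that decision (Format, Offsets and scanMinMax are stand-ins for the real types):

object OffsetSelectionSketch extends App {
  sealed trait Format
  case object Raw extends Format
  case object Parquet extends Format

  final case class Offsets(min: Long, max: Long)

  // Only Raw tables invoke the (expensive) scan; others use the batch id as the offset.
  def offsetsToCommit(format: Format,
                      batchId: Long,
                      currentMin: Option[Long],
                      scanMinMax: () => Option[Offsets]): Option[Offsets] =
    format match {
      case Raw => scanMinMax()
      case _   => Some(Offsets(currentMin.getOrElse(batchId), batchId))
    }

  val committed = offsetsToCommit(Parquet, batchId = 42L, currentMin = None,
    scanMinMax = () => sys.error("should not be called for non-raw formats"))
  assert(committed.contains(Offsets(42L, 42L)))
}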
@@ -451,14 +485,7 @@ object MetastoreImpl {
}
}

private[core] def getMinMaxOffsetFromMetastoreDf(dfIn: DataFrame, batchIdColumn: String, currentMax: Option[OffsetValue]): Option[(OffsetValue, OffsetValue)] = {
val df = currentMax match {
case Some(currentMax) =>
dfIn.filter(col(batchIdColumn) > currentMax.getSparkLit)
case None =>
dfIn
}

private[core] def getMinMaxOffsetFromMetastoreDf(df: DataFrame, batchIdColumn: String, currentMax: Option[OffsetValue]): Option[(OffsetValue, OffsetValue)] = {
val offsetType = if (df.schema.fields.find(_.name == batchIdColumn).get.dataType == StringType) OffsetType.StringType else OffsetType.IntegralType
OffsetManagerUtils.getMinMaxValueFromData(df, batchIdColumn, offsetType)
}
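
For reference, the min/max extraction that getMinMaxOffsetFromMetastoreDf() delegates to boils down to an aggregation over the batch-id column. A rough Spark illustration for the integral case (the column name and data are made up; the real code also handles string columns and goes through OffsetManagerUtils):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, min}

object MinMaxOffsetSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("minmax-sketch").getOrCreate()
  import spark.implicits._

  // Toy data frame with an integral batch-id column.
  val df = Seq((1L, "a"), (2L, "b"), (2L, "c")).toDF("pramen_batchid", "payload")

  val row = df.agg(min(col("pramen_batchid")), max(col("pramen_batchid"))).collect().head
  val (lo, hi) = (row.getLong(0), row.getLong(1))

  println(s"min=$lo, max=$hi") // min=1, max=2
  spark.stop()
}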
@@ -19,7 +19,7 @@ package za.co.absa.pramen.core.metastore
import za.co.absa.pramen.api.MetastoreReader

trait MetastoreReaderCore extends MetastoreReader {
def commitTable(tableName: String, trackingName: String): Unit
def commitOutputTable(tableName: String, trackingName: String): Unit

def commitIncrementalStage(): Unit
}
@@ -26,6 +26,7 @@ case class TrackingTable(
outputTable: String,
trackingName: String,
batchIdColumn: String,
currentMinOffset: Option[OffsetValue],
currentMaxOffset: Option[OffsetValue],
infoDate: LocalDate,
createdAt: Instant
@@ -143,8 +143,6 @@ class SinkJob(operationDef: OperationDef,

val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode)

metastoreReader.asInstanceOf[MetastoreReaderCore].commitTable(sinkTable.metaTableName, s"${sinkTable.metaTableName}->$sinkName")

try {
val sinkResult = sink.send(df,
sinkTable.metaTableName,
@@ -153,10 +151,14 @@
sinkTable.options
)

val jobFinished = Instant.now

val isTransient = outputTable.format.isTransient

if (!isTransient) {
metastoreReader.asInstanceOf[MetastoreReaderCore].commitOutputTable(sinkTable.metaTableName, s"${sinkTable.metaTableName}->$sinkName")
}

val jobFinished = Instant.now

val tooLongWarnings = getTookTooLongWarnings(jobStarted, jobFinished, sinkTable.warnMaxExecutionTimeSeconds)

bookkeeper.setRecordCount(outputTable.name,
@@ -102,7 +102,7 @@ class TransformationJob(operationDef: OperationDef,
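
The SinkJob change above reorders the commit: offsets for the sink's tracking table are now recorded only after sink.send() has succeeded, and only when the output is not transient, so a failed send no longer leaves committed offsets behind. A small sketch of that ordering (runSink, send and commit are illustrative):

object CommitAfterSendSketch extends App {
  def runSink(send: () => Long, commit: () => Unit, isTransient: Boolean): Long = {
    val recordsSent = send()    // may throw; nothing is committed in that case
    if (!isTransient) commit()  // commit strictly after a successful send
    recordsSent
  }

  val sent = runSink(send = () => 100L, commit = () => println("offsets committed"), isTransient = false)
  assert(sent == 100L)
}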
val readerMode = if (isIncremental) ReaderMode.IncrementalRun else ReaderMode.Batch
val metastoreReaderRun = metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, readerMode)

metastoreReaderRun.asInstanceOf[MetastoreReaderCore].commitTable(outputTable.name, outputTable.name)
metastoreReaderRun.asInstanceOf[MetastoreReaderCore].commitOutputTable(outputTable.name, outputTable.name)
metastoreReaderRun.asInstanceOf[MetastoreReaderCore].commitIncrementalStage()
}

@@ -171,7 +171,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"),
}
}

override def commitTable(tableName: String, trackingName: String): Unit = {}
override def commitOutputTable(tableName: String, trackingName: String): Unit = {}

override def commitIncrementalStage(): Unit = {}
}
