#520 Implement offset management for incremental processing.
yruslan committed Nov 23, 2024
1 parent d90bd7d commit e5755e7
Showing 12 changed files with 130 additions and 30 deletions.
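The commit tracks, per input table, transformation output ("tracking name"), and info date, the highest batch-id value a transformer has already consumed, and commits a new minimum/maximum offset pair after each incremental run. A minimal, runnable sketch of the idea in plain Scala (illustrative only; the names and types are not the commit's API):

// Minimal sketch of incremental offset tracking (illustrative, not the commit's API).
object OffsetTrackingSketch {
  // The last committed maximum offset per (tracking name, info date), if any.
  private var committed = Map.empty[(String, String), Long]

  // Return only records newer than the committed offset, then remember the new maximum.
  def processIncrementally(trackingName: String, infoDate: String, batchIds: Seq[Long]): Seq[Long] = {
    val lastMax = committed.get((trackingName, infoDate))
    val newData = lastMax.fold(batchIds)(m => batchIds.filter(_ > m))
    if (newData.nonEmpty)
      committed = committed.updated((trackingName, infoDate), newData.max)
    newData
  }

  def main(args: Array[String]): Unit = {
    println(processIncrementally("table1->output1", "2024-11-23", Seq(1L, 2L, 3L))) // List(1, 2, 3)
    println(processIncrementally("table1->output1", "2024-11-23", Seq(3L, 4L, 5L))) // List(4, 5)
  }
}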
@@ -216,7 +216,7 @@ abstract class SqlGeneratorBase(sqlConfig: SqlConfig) extends SqlGenerator {
}

object SqlGeneratorBase {
val MAX_STRING_OFFSET_CHARACTERS = 64
val MAX_STRING_OFFSET_CHARACTERS = 128

val forbiddenCharacters = ";'\\"
val normalCharacters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_."
@@ -79,6 +79,11 @@ trait OffsetManager {
*/
def commitRerun(request: DataOffsetRequest, minOffset: OffsetValue, maxOffset: OffsetValue): Unit

/**
* Combines startWriteOffsets() and commitOffsets() into a single operation, where applicable.
*/
def postCommittedRecord(table: String, infoDate: LocalDate, minOffset: OffsetValue, maxOffset: OffsetValue): Unit

/**
* Rolls back an offset request
*/
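A hedged usage sketch of the new trait method alongside the existing two-step flow (table name, info date, and offset values are made up; OffsetManager is the trait shown above, and its package is assumed to be the bookkeeper package used elsewhere in this commit):

import java.time.LocalDate
import za.co.absa.pramen.api.offset.{OffsetType, OffsetValue}

// `om` is an OffsetManager implementation such as the OffsetManagerJdbc shown below.
def commitExamples(om: OffsetManager): Unit = {
  val infoDate  = LocalDate.parse("2024-11-23")
  val minOffset = OffsetValue.fromString(OffsetType.IntegralType.dataTypeString, "101").get
  val maxOffset = OffsetValue.fromString(OffsetType.IntegralType.dataTypeString, "205").get

  // Two-step flow: open an uncommitted offset record, then commit it.
  val request = om.startWriteOffsets("table1->output1", infoDate, OffsetType.IntegralType)
  om.commitOffsets(request, minOffset, maxOffset)

  // Single-step flow added by this commit: write an already-committed record directly.
  om.postCommittedRecord("table1->output1", infoDate, minOffset, maxOffset)
}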
@@ -116,6 +116,16 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
).execute()
}

override def postCommittedRecord(table: String, infoDate: LocalDate, minOffset: OffsetValue, maxOffset: OffsetValue): Unit = {
val createdAt = Instant.now()

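// The record is written as already committed: committedAt is set to the same instant as createdAt.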
val record = OffsetRecord(table, infoDate.toString, minOffset.dataType.dataTypeString, minOffset.valueString, maxOffset.valueString, batchId, createdAt.toEpochMilli, Some(createdAt.toEpochMilli))

db.run(
OffsetRecords.records += record
).execute()
}

override def rollbackOffsets(request: DataOffsetRequest): Unit = {
db.run(
OffsetRecords.records
@@ -19,11 +19,13 @@ package za.co.absa.pramen.core.bookkeeper.model
import slick.jdbc.H2Profile.api._

class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") {
def pramenTableName = column[String]("table_name", O.Length(128))
def pramenTableName = column[String]("table_name", O.Length(200))
def infoDate = column[String]("info_date", O.Length(20))
def dataType = column[String]("data_type", O.Length(20))
def minOffset = column[String]("min_offset", O.Length(64))
def maxOffset = column[String]("max_offset", O.Length(64))

def minOffset = column[String]("min_offset", O.Length(128))

def maxOffset = column[String]("max_offset", O.Length(128))
def batchId = column[Long]("batch_id")
def createdAt = column[Long]("created_at")
def committedAt = column[Option[Long]]("committed_at")
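For illustration, the kind of row that postCommittedRecord() stores in this table when both offsets are known upfront (values are made up; the field order follows the OffsetRecord constructor call shown above in OffsetManagerJdbc):

// Illustrative only - not part of the commit; OffsetRecord is the bookkeeper model case class used above.
val example = OffsetRecord(
  "table1->output1",    // table_name (now up to 200 characters)
  "2024-11-23",         // info_date
  "integral",           // data_type (assumed literal, taken from OffsetType.dataTypeString)
  "101",                // min_offset (now up to 128 characters)
  "205",                // max_offset (now up to 128 characters)
  12345L,               // batch_id
  1732320000000L,       // created_at, epoch milliseconds
  Some(1732320000000L)  // committed_at equals created_at for the single-call commit
)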
@@ -18,18 +18,18 @@ package za.co.absa.pramen.core.metastore

import com.typesafe.config.Config
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.functions.{col, lit, max, min}
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
import za.co.absa.pramen.api.offset.DataOffset
import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT
import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.model.{MetaTable, TrackingTable}
import za.co.absa.pramen.core.metastore.peristence.{MetastorePersistence, TransientJobManager}
import za.co.absa.pramen.core.utils.ConfigUtils
import za.co.absa.pramen.core.utils.hive.{HiveFormat, HiveHelper}
Expand Down Expand Up @@ -206,7 +206,7 @@ class MetastoreImpl(appConfig: Config,
val metastore = this

new MetastoreReaderCore {
private val incrementalInputTables = new ListBuffer[String]
private val trackingTables = new ListBuffer[TrackingTable]

override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
validateTable(tableName)
@@ -220,7 +220,6 @@
if (isPostProcessing && isIncremental) {
metastore.getBatch(tableName, infoDate, None)
} else if (isIncremental && !isRerun && !isPostProcessing) {
incrementalInputTables += tableName
getIncremental(tableName, outputTable, infoDate)
} else
metastore.getTable(tableName, Option(infoDate), Option(infoDate))
@@ -269,7 +268,45 @@
override def metadataManager: MetadataManager = metadata

override def commitIncremental(): Unit = {
// ToDo Replace this with proper offset management implementation
val om = if (trackingTables.nonEmpty) bookkeeper.getOffsetManager else null

trackingTables.foreach { trackingTable =>
log.info(s"Committing offsets for table '${trackingTable.trackingName}' for '$infoDate'")

val df = metastore.getTable(trackingTable.inputTable, Option(infoDate), Option(infoDate))

getMinMaxOffsetFromDf(df, trackingTable.batchIdColumn, trackingTable.currentMaxOffset) match {
case Some((minOffset, maxOffset)) =>
val offsetType = if (df.schema.fields.find(_.name == trackingTable.batchIdColumn).get.dataType == StringType) OffsetType.StringType else OffsetType.IntegralType
val req = om.startWriteOffsets(trackingTable.trackingName, infoDate, offsetType)
om.commitOffsets(req, minOffset, maxOffset)
log.info(s"Commited offsets for table '${trackingTable.trackingName}' for '$infoDate' with min='${minOffset.valueString}', max='${maxOffset.valueString}'.")
case None =>
log.info(s"No new data processed that requires offsets update of table '${trackingTable.trackingName}' for '$infoDate'.")
}
}
}

private def getMinMaxOffsetFromDf(dfIn: DataFrame, batchIdColumn: String, currentMax: Option[OffsetValue]): Option[(OffsetValue, OffsetValue)] = {
val df = currentMax match {
case Some(currentMax) =>
dfIn.filter(col(batchIdColumn) > currentMax.getSparkLit)
case None =>
dfIn
}

if (df.isEmpty) {
None
} else {
val offsetType = if (df.schema.fields.find(_.name == batchIdColumn).get.dataType == StringType) OffsetType.StringType else OffsetType.IntegralType
val row = df.agg(min(offsetType.getSparkCol(col(batchIdColumn))).cast(StringType),
max(offsetType.getSparkCol(col(batchIdColumn))).cast(StringType))
.collect()(0)

val minValue = OffsetValue.fromString(offsetType.dataTypeString, row(0).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}"))
val maxValue = OffsetValue.fromString(offsetType.dataTypeString, row(1).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}"))
Some((minValue, maxValue))
}
}

private def validateTable(tableName: String): Unit = {
@@ -279,12 +316,46 @@
}

private def getIncremental(tableName: String, transformationOutputTable: String, infoDate: LocalDate): DataFrame = {
// Don't forget to use incrementalDryRun to decide if we need to commit
val trackingName = s"$tableName->$transformationOutputTable"
val tableDef = getTableDef(tableName)
val offsetType = if (tableDef.format.isInstanceOf[DataFormat.Raw]) OffsetType.StringType else OffsetType.IntegralType
val needsToCommit = !isPostProcessing && !incrementalDryRun
val om = bookkeeper.getOffsetManager
val tableDf = metastore.getTable(tableName, Option(infoDate), Option(infoDate))

if (!tableDf.schema.exists(_.name == tableDef.batchIdColumn)) {
throw new IllegalArgumentException(s"Table '$tableName' does not contain column '${tableDef.batchIdColumn}' needed for incremental processing.")
}

// ToDo Handle uncommitted offsets

val offsets = om.getMaxInfoDateAndOffset(trackingName, Option(infoDate))

val df = offsets match {
case Some(values) =>
log.info(s"Getting incremental table '$trackingName' for '$infoDate', column '${tableDef.batchIdColumn}' > ${values.maximumOffset.valueString}")
tableDf.filter(col(tableDef.batchIdColumn) > values.maximumOffset.getSparkLit)
case None =>
log.info(s"Getting incremental table '$trackingName' for '$infoDate''")
tableDf
}

if (needsToCommit && !trackingTables.exists(t => t.trackingName == trackingName && t.infoDate == infoDate)) {
log.info(s"Starting offset commit for table '$trackingName' for '$infoDate''")

val trackingTable = TrackingTable(
tableName,
outputTable,
trackingName,
tableDef.batchIdColumn,
offsets.map(_.maximumOffset),
infoDate
)

trackingTables += trackingTable
}

// ToDo Replace this with proper offset management implementation
metastore.getBatch(tableName, infoDate, None)
df
}
}
}
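For context, a minimal runnable sketch (not part of the commit) of the two steps implemented above in getIncremental() and getMinMaxOffsetFromDf(): drop rows at or below the last committed maximum of the batch-id column, then take the min/max of the remainder as the offset range to commit. The column name below is illustrative; in the real code it is configurable per table.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, min}

object OffsetRangeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("offset-range-sketch").getOrCreate()
    import spark.implicits._

    // "pramen_batchid" stands in for the configured batch-id column; 10L is the last committed maximum.
    val df = Seq((1L, "a"), (10L, "b"), (11L, "c"), (12L, "d")).toDF("pramen_batchid", "value")
    val lastCommittedMax = Option(10L)

    // Step 1: keep only rows newer than the committed offset (what getIncremental() returns).
    val newData = lastCommittedMax.fold(df)(m => df.filter(col("pramen_batchid") > m))

    // Step 2: derive the offset range to commit (what getMinMaxOffsetFromDf() computes).
    if (newData.isEmpty) {
      println("No new data - nothing to commit")
    } else {
      val row = newData.agg(min("pramen_batchid"), max("pramen_batchid")).collect()(0)
      println(s"Committing offsets: min=${row.get(0)}, max=${row.get(1)}") // min=11, max=12
    }

    spark.stop()
  }
}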
@@ -16,12 +16,15 @@

package za.co.absa.pramen.core.metastore.model

import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.core.bookkeeper.model.DataOffsetRequest
import za.co.absa.pramen.api.offset.OffsetValue

import java.time.LocalDate

case class TrackingTable(
inputTable: String,
outputTable: String,
commitRequest: DataOffsetRequest,
data: DataFrame
trackingName: String,
batchIdColumn: String,
currentMaxOffset: Option[OffsetValue],
infoDate: LocalDate
)
@@ -119,7 +119,7 @@ class OperationSplitter(conf: Config,
val notificationTargets = operationDef.notificationTargets
.map(targetName => getNotificationTarget(conf, targetName, operationDef.operationConf))

Seq(new TransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, clazz, transformer))
Seq(new TransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, clazz, transformer, batchId))
}

def createPythonTransformation(operationDef: OperationDef,
@@ -122,7 +122,7 @@ class SinkJob(operationDef: OperationDef,
case NonFatal(ex) => throw new IllegalStateException("Unable to connect to the sink.", ex)
}

val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = true, isPostProcessing = false)
val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = false)

try {
val sinkResult = sink.send(df,
@@ -33,7 +33,8 @@ class TransformationJob(operationDef: OperationDef,
notificationTargets: Seq[JobNotificationTarget],
outputTable: MetaTable,
transformerFactoryClass: String,
transformer: Transformer)
transformer: Transformer,
batchId: Long)
(implicit spark: SparkSession)
extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, outputTable) {

@@ -53,11 +54,12 @@
}

override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = false), infoDate, operationDef.extraOptions)
transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = true, isPostProcessing = false), infoDate, operationDef.extraOptions)
}

override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = {
val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = true, isPostProcessing = false)
val isTransient = outputTable.format.isTransient
val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = !isTransient)
val runResult = try {
RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions))
} finally {
@@ -17,8 +17,8 @@
package za.co.absa.pramen.core.runner.task

import com.typesafe.config.Config
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
import za.co.absa.pramen.api.status._
@@ -219,8 +219,6 @@ abstract class TaskRunnerBase(conf: Config,
* @return an instance of TaskResult on the check failure or optional record count on success.
*/
private[core] def preRunCheck(task: Task, started: Instant): Either[TaskResult, JobPreRunResult] = {
val jobName = task.job.name
val outputTable = MetaTable.getMetaTableDef(task.job.outputTable)
val outputTableName = task.job.outputTable.name
val options = task.job.operation.extraOptions
val isTransient = task.job.outputTable.format.isTransient
@@ -358,7 +356,15 @@
dfWithTimestamp.withColumn(task.job.outputTable.infoDateColumn, lit(Date.valueOf(task.infoDate)))
}

val postProcessed = task.job.postProcessing(dfWithInfoDate, task.infoDate, conf)
val batchIdColumn = task.job.outputTable.batchIdColumn

val dfWithBatchIdColumn = if (dfWithInfoDate.schema.exists(f => f.name == batchIdColumn)) {
dfWithInfoDate
} else {
dfWithInfoDate.withColumn(batchIdColumn, lit(pipelineState.getBatchId))
}

val postProcessed = task.job.postProcessing(dfWithBatchIdColumn, task.infoDate, conf)

val dfTransformed = applyFilters(
applyTransformations(postProcessed, task.job.operation.schemaTransformations),
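The stamping step above in condensed, standalone form (a sketch, not the commit's helper): if the job output already carries the batch-id column it is left untouched, otherwise the column is added as a literal with the current pipeline batch id.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

def withBatchId(df: DataFrame, batchIdColumn: String, batchId: Long): DataFrame =
  if (df.schema.exists(_.name == batchIdColumn)) df    // column already present - keep as-is
  else df.withColumn(batchIdColumn, lit(batchId))      // stamp every row with the batch id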
@@ -22,7 +22,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue}
import za.co.absa.pramen.core.metastore.peristence.MetastorePersistenceRaw.{RAW_OFFSET_FIELD_KEY, RAW_PATH_FIELD_KEY}
import za.co.absa.pramen.core.utils.{ConfigUtils, FsUtils}

@@ -93,6 +93,8 @@ class RawFileSource(val sourceConfig: Config,

override val config: Config = sourceConfig

override def getOffsetInfo: Option[OffsetInfo] = Some(OffsetInfo(RAW_OFFSET_FIELD_KEY, OffsetType.StringType))

override def hasInfoDateColumn(query: Query): Boolean = {
query match {
case Query.Path(pathPattern) => pathPattern.contains("{{")
Expand Down Expand Up @@ -121,7 +123,6 @@ class RawFileSource(val sourceConfig: Config,

override def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): SourceResult = {
val filePaths = getPaths(query, onlyForInfoDate.get, onlyForInfoDate.get)
val fileNames = filePaths.map(_.getPath.getName).sorted
val list = filePaths.map { path =>
(path.getPath.toString, path.getPath.getName)
}.filter {
@@ -136,7 +137,7 @@

val df = listOfFilesToDataFrame(list)

SourceResult(df, fileNames)
SourceResult(df, list.map(_._2).sorted)
}


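The raw-file source now advertises a string offset (getOffsetInfo above), and getDataIncremental() filters the listed files against the supplied offset range. A hedged sketch of what that presumably means: file names serve as the offset values and are compared as strings (names are illustrative; the exact boundary handling lives in the filter elided from this hunk):

// Illustrative only - not part of the commit.
val lastCommittedMax = Option("data_2024-11-20.csv")
val files = Seq("data_2024-11-19.csv", "data_2024-11-20.csv", "data_2024-11-21.csv")

val newFiles = lastCommittedMax match {
  case Some(maxName) => files.filter(_ > maxName) // lexicographic comparison of file names
  case None          => files
}
// newFiles == Seq("data_2024-11-21.csv")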
@@ -137,7 +137,7 @@ class TransformationJobSuite extends AnyWordSpec with SparkTestBase {

val outputTable = MetaTableFactory.getDummyMetaTable(name = "table1")

(new TransformationJob(operation, metastore, bk, Nil, outputTable, "dummy_class", transformer), metastore)
(new TransformationJob(operation, metastore, bk, Nil, outputTable, "dummy_class", transformer, 1), metastore)
}

}
