#374 Add support for historical runs, and for re-committing uncommitted offsets.
yruslan committed Sep 16, 2024
1 parent 5c311e4 commit d23cab2
Showing 4 changed files with 509 additions and 47 deletions.
7 changes: 7 additions & 0 deletions pramen/core/src/main/resources/reference.conf
@@ -104,6 +104,13 @@ pramen {
# If this is set, the current date will be overridden by the specified value.
#current.date =

# Optionally, you can specify a date range to run a historical pipeline for:
#load.date.from = "2022-01-01"
#load.date.to = "2022-01-15"

# Specify one of the run modes for a historical run: fill_gaps, check_updates (default), force.
#runtime.run.mode = force

#spark.conf = {
# Pass arbitrary Spark Configuration when initializing Spark Session

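Taken together, these keys let you configure a date-range backfill. A minimal sketch of a historical run, using only the keys documented above (the dates and run mode are illustrative):

pramen {
  # Re-process the first half of January 2022, including dates that
  # have already run successfully.
  load.date.from = "2022-01-01"
  load.date.to = "2022-01-15"
  runtime.run.mode = force
}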
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
@@ -18,13 +18,13 @@ package za.co.absa.pramen.core.pipeline

import com.typesafe.config.Config
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.offset.OffsetValue
import org.apache.spark.sql.types.{IntegerType, LongType, ShortType, StringType}
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.offset.{DataOffset, OffsetInfo, OffsetValue}
import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
import za.co.absa.pramen.api.{Reason, Source}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.bookkeeper.model.DataOffsetAggregated
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}
import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager}
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental}
@@ -36,7 +36,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
metastore: Metastore,
bookkeeper: Bookkeeper,
notificationTargets: Seq[JobNotificationTarget],
latestOffset: Option[DataOffsetAggregated],
latestOffsetIn: Option[DataOffsetAggregated],
batchId: Long,
sourceName: String,
source: Source,
@@ -46,28 +46,76 @@
(implicit spark: SparkSession)
extends IngestionJob(operationDef, metastore, bookkeeper, notificationTargets, sourceName, source, sourceTable, outputTable, specialCharacters, None, false) {

private var latestOffset = latestOffsetIn

override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategyIncremental(latestOffset, source.hasInfoDateColumn(sourceTable.query))

override def trackDays: Int = 0

override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
val om = bookkeeper.getOffsetManager

val uncommittedOffsets = om.getOffsets(outputTable.name, infoDate).filter(_.committedAt.isEmpty)

if (uncommittedOffsets.nonEmpty) {
log.warn(s"Found uncommitted offsets for ${outputTable.name} at $infoDate. Fixing...")
handleUncommittedOffsets(om, metastore, infoDate, uncommittedOffsets)
}

val hasInfoDateColumn = source.hasInfoDateColumn(sourceTable.query)
if (hasInfoDateColumn && runReason == TaskRunReason.Rerun) {
super.preRunCheckJob(infoDate, runReason, jobConfig, dependencyWarnings)
} else {
latestOffset match {
case Some(offset) =>
if (offset.maximumInfoDate.isAfter(infoDate) && !hasInfoDateColumn) {
JobPreRunResult(JobPreRunStatus.Skip("Retrospective runs are not allowed yet"), None, dependencyWarnings, Nil)
} else {
JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil)
}
case None =>
JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil)
}
JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil)
}
}

private def handleUncommittedOffsets(om: OffsetManager, mt: Metastore, infoDate: LocalDate, uncommittedOffsets: Array[DataOffset]): Unit = {
val minOffset = uncommittedOffsets.map(_.minOffset).min

val offsetInfo = source.getOffsetInfo(sourceTable.query).getOrElse(throw new IllegalArgumentException(s"Offset column not defined for the ingestion job '${operationDef.name}', " +
s"query: '${sourceTable.query.query}'"))

val df = try {
mt.getTable(outputTable.name, Option(infoDate), Option(infoDate))
} catch {
case ex: AnalysisException =>
log.warn(s"No data found for ${outputTable.name}. Rolling back uncommitted offsets...", ex)

uncommittedOffsets.foreach { of =>
log.warn(s"Cleaning uncommitted offset: $of...")
om.rollbackOffsets(DataOffsetRequest(outputTable.name, infoDate, of.minOffset, of.createdAt))
}

latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None)
return
}

if (!df.schema.fields.exists(_.name.equalsIgnoreCase(offsetInfo.offsetColumn))) {
throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'. Cannot update uncommitted offsets.")
}

val newMaxOffset = if (df.isEmpty) {
minOffset
} else {
val row = df.agg(max(col(offsetInfo.offsetColumn)).cast(StringType)).collect()(0)
OffsetValue.fromString(offsetInfo.minimalOffset.dataTypeString, row(0).asInstanceOf[String])
}

log.warn(s"Fixing uncommitted offsets. New offset to commit for ${outputTable.name} at $infoDate: " +
s"min offset: ${minOffset.valueString}, max offset: ${newMaxOffset.valueString}.")

val req = om.startWriteOffsets(outputTable.name, infoDate, minOffset)
om.commitOffsets(req, newMaxOffset)

uncommittedOffsets.foreach { of =>
log.warn(s"Cleaning uncommitted offset: $of...")
om.rollbackOffsets(DataOffsetRequest(outputTable.name, infoDate, of.minOffset, of.createdAt))
}

latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None)
}
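The method above resolves dangling offsets in one of two ways: if no data actually landed for the day, the uncommitted records are simply rolled back; if data is present, the true maximum offset is recomputed from the stored table, committed as a fresh record, and only then are the stale records rolled back. A condensed sketch of that decision, using only OffsetManager calls that appear in this diff ("my_table", infoDate, dataExists and maxOffsetFromData are illustrative stand-ins):

// Hypothetical recovery walk-through for a single table and info date.
val om = bookkeeper.getOffsetManager
val uncommitted = om.getOffsets("my_table", infoDate).filter(_.committedAt.isEmpty)

if (uncommitted.nonEmpty) {
  if (!dataExists) {
    // Nothing was written: just roll the dangling offsets back.
    uncommitted.foreach(of => om.rollbackOffsets(DataOffsetRequest("my_table", infoDate, of.minOffset, of.createdAt)))
  } else {
    // Data landed: commit the true span first, then drop the stale records.
    val req = om.startWriteOffsets("my_table", infoDate, uncommitted.map(_.minOffset).min)
    om.commitOffsets(req, maxOffsetFromData) // recomputed from the stored table, as above
    uncommitted.foreach(of => om.rollbackOffsets(DataOffsetRequest("my_table", infoDate, of.minOffset, of.createdAt)))
  }
}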

override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
if (source.getOffsetInfo(sourceTable.query).nonEmpty) {
Reason.Ready
@@ -116,6 +164,8 @@
throw new IllegalArgumentException(s"Offset type is not configured for the source '$sourceName' outputting to '${outputTable.name}''")
)

validateOffsetColumn(df, offsetInfo)

val minimumOffset = latestOffset.map(_.maximumOffset).getOrElse(offsetInfo.minimalOffset)

val req = om.startWriteOffsets(outputTable.name, infoDate, minimumOffset)
@@ -171,4 +221,25 @@

SaveResult(stats, warnings = tooLongWarnings)
}

private[core] def validateOffsetColumn(df: DataFrame, offsetInfo: OffsetInfo): Unit = {
if (!df.schema.fields.exists(_.name.equalsIgnoreCase(offsetInfo.offsetColumn))) {
throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'.")
}

val field = df.schema.fields.find(_.name.equalsIgnoreCase(offsetInfo.offsetColumn)).get

offsetInfo.minimalOffset match {
case v: OffsetValue.LongType =>
if (!field.dataType.isInstanceOf[ShortType] && !field.dataType.isInstanceOf[IntegerType] && !field.dataType.isInstanceOf[LongType]) {
throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' has type '${field.dataType}'. " +
s"But only integral types are supported for offset type '${v.dataTypeString}'.")
}
case v: OffsetValue.StringType =>
if (!field.dataType.isInstanceOf[StringType]) {
throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' has type '${field.dataType}'. " +
s"But only string type is supported for offset type '${v.dataTypeString}'.")
}
}
}
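validateOffsetColumn enforces a simple compatibility rule: integral offsets accept ShortType, IntegerType or LongType columns, while string offsets accept only StringType. A self-contained sketch of the same rule against a plain Spark schema (the schema, and the boolean flag standing in for pramen's OffsetValue kinds, are illustrative):

import org.apache.spark.sql.types._

// True when a column's Spark type is acceptable for the given offset kind.
def isCompatible(dataType: DataType, integralOffset: Boolean): Boolean =
  if (integralOffset) dataType == ShortType || dataType == IntegerType || dataType == LongType
  else dataType == StringType

val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
assert(isCompatible(schema("id").dataType, integralOffset = true))    // integral column: accepted
assert(!isCompatible(schema("name").dataType, integralOffset = true)) // string column: rejected for an integral offset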
}
@@ -58,6 +58,7 @@ pramen.sources.1 = [

option {
header = true
inferSchema = ${infer.schema}
}
}
]
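The ${infer.schema} reference above is a standard HOCON substitution, so the test can toggle CSV schema inference without editing the file. A sketch of how such a value might be supplied at load time with the Typesafe Config API (the resource name is hypothetical):

import com.typesafe.config.ConfigFactory

val conf = ConfigFactory
  .parseString("infer.schema = true")                             // supplies the substituted value
  .withFallback(ConfigFactory.parseResources("test_source.conf")) // hypothetical resource name
  .resolve()                                                      // resolves ${infer.schema}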
