Skip to content

Commit

Permalink
#374 Refactor validation for the incremental ingestion job.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Oct 9, 2024
1 parent d44a8db commit b3f91b1
Showing 1 changed file with 50 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,67 +55,26 @@ class IncrementalIngestionJob(operationDef: OperationDef,
// This job overrides the inherited `trackDays` with a constant 0.
// NOTE(review): presumably this means no past days are tracked for incremental ingestion - confirm against the base job.
override def trackDays: Int = 0

/**
  * Prepares offset bookkeeping before the job is run for `infoDate`.
  *
  * In order, the method:
  *  1. fails fast when the source has no offset information configured,
  *  2. caches the latest known offset of the output table in `latestOffset`,
  *  3. looks up uncommitted offsets (only for `infoDate` when the source query has an information date
  *     column, for all dates otherwise) and passes them to `handleUncommittedOffsets`.
  *
  * @param infoDate           the information date the job is about to run for
  * @param runReason          not used by this check
  * @param jobConfig          not used by this check
  * @param dependencyWarnings passed through unchanged into the result
  * @return a result with the status `JobPreRunStatus.Ready`
  * @throws IllegalArgumentException if the source does not provide offset information
  */
override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
  if (source.getOffsetInfo.isEmpty) {
    // 'throw' is already an expression of type Nothing, so no 'return' is needed in front of it.
    throw new IllegalArgumentException(s"Offset column is not configured for source '$sourceName' of '${operationDef.name}'")
  }

  val om = bookkeeper.getOffsetManager
  latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None)

  val onlyForInfoDate = if (source.hasInfoDateColumn(sourceTable.query))
    Some(infoDate)
  else
    None

  val uncommittedOffsets = om.getUncommittedOffsets(outputTable.name, onlyForInfoDate)

  if (uncommittedOffsets.nonEmpty) {
    // Mention the date only when the lookup was actually restricted to it
    // (the two messages used to be swapped).
    onlyForInfoDate match {
      case Some(date) => log.warn(s"Found uncommitted offsets for ${outputTable.name} at $date. Fixing...")
      case None => log.warn(s"Found uncommitted offsets for ${outputTable.name}. Fixing...")
    }

    handleUncommittedOffsets(om, metastore, uncommittedOffsets)
  }

  JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil)
}

/**
  * Decides whether the job can run for `infoDate`.
  *
  * Returns `Reason.NotReady` when the source has no offset information configured; otherwise the
  * returned value is whatever `validateReadiness` produces. `jobConfig` is not used here.
  *
  * @param infoDate  the information date being validated
  * @param runReason compared against `TaskRunReason.Rerun` to detect reruns
  * @param jobConfig not used by this method
  */
override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
val om = bookkeeper.getOffsetManager
val hasInfoDate = source.hasInfoDateColumn(sourceTable.query)
val isReRun = runReason == TaskRunReason.Rerun

// Offset lookups are restricted to `infoDate` only when the source query has an information date column.
val onlyForInfoDate = if (hasInfoDate)
Some(infoDate)
else
None

// Without offset information the job is reported as not ready instead of throwing.
if (source.getOffsetInfo.isEmpty) {
return Reason.NotReady(s"Offset column is not configured for source '$sourceName' of '${operationDef.name}'")
}

// NOTE(review): the value of this match is discarded - only its side effects (logging, the offset
// manager lookup and the validateUncommittedOffsets call in the rerun branch) take place. The same
// decision table is evaluated again by validateReadiness below, which supplies the actual result.
// This looks like pre- and post-refactoring code merged together - confirm which part should remain.
(hasInfoDate, isReRun) match {
case (false, false) =>
latestOffset match {
case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) =>
log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.")
Reason.Skip("Incremental ingestion cannot be retrospective")
case _ =>
Reason.Ready
}
case (false, true) =>
// This local 'om' shadows the one defined at the top of the method.
val om = bookkeeper.getOffsetManager
// NOTE(review): uncommitted offsets are handled only in this branch (rerun without an information
// date column) - verify whether the call was meant to run for every combination.
validateUncommittedOffsets(onlyForInfoDate, om)

om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match {
case Some(offsets) =>
log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate' for ${offsets.minimumOffset.valueString} < offsets <= ${offsets.maximumOffset.valueString}.")
Reason.Ready
case None =>
log.info(s"Offsets not found for '${outputTable.name}' at '$infoDate'.")
Reason.SkipOnce("No offsets registered")
}
case (true, false) =>
Reason.Ready
case (true, true) =>
log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate'.")
Reason.Ready
}
// The result of the method: readiness as computed by the extracted helper.
validateReadiness(infoDate, om, hasInfoDate, isReRun)
}

override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = {
Expand Down Expand Up @@ -234,6 +193,48 @@ class IncrementalIngestionJob(operationDef: OperationDef,
SaveResult(stats, warnings = tooLongWarnings)
}

/**
  * Works out whether ingestion into the output table may proceed for `infoDate`.
  *
  * Outcomes by combination of flags:
  *  - info date column present: always `Reason.Ready` (reruns are additionally logged);
  *  - no info date column, rerun: `Reason.Ready` when the offset manager has offsets registered for
  *    `infoDate`, `Reason.SkipOnce` otherwise;
  *  - no info date column, normal run: `Reason.Skip` when the cached `latestOffset` has a maximum
  *    information date after `infoDate`, `Reason.Ready` otherwise.
  *
  * @param infoDate    the information date being validated
  * @param om          the offset manager queried for offsets registered at `infoDate` (rerun case only)
  * @param hasInfoDate whether the source query has an information date column
  * @param isRerun     whether the job is being rerun
  * @return the readiness verdict for the given date
  */
private[core] def validateReadiness(infoDate: LocalDate, om: OffsetManager, hasInfoDate: Boolean, isRerun: Boolean): Reason = {
  if (hasInfoDate) {
    // Both normal runs and reruns are ready here; only the rerun leaves a trace in the log.
    if (isRerun) {
      log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate'.")
    }
    Reason.Ready
  } else if (isRerun) {
    // A rerun without an info date column needs previously registered offsets for that exact date.
    val registeredForDate = om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate))

    registeredForDate match {
      case None =>
        log.info(s"Offsets not found for '${outputTable.name}' at '$infoDate'.")
        Reason.SkipOnce("No offsets registered")
      case Some(offsets) =>
        log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate' for ${offsets.minimumOffset.valueString} < offsets <= ${offsets.maximumOffset.valueString}.")
        Reason.Ready
    }
  } else {
    // A normal run must not go back in time relative to the offsets that are already known.
    latestOffset match {
      case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) =>
        log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.")
        Reason.Skip("Incremental ingestion cannot be retrospective")
      case _ =>
        Reason.Ready
    }
  }
}

/**
  * Deals with offsets of the output table that were left uncommitted, then refreshes `latestOffset`.
  *
  * When the offset manager reports uncommitted offsets, a warning is logged and they are passed to
  * `handleUncommittedOffsets`. Regardless of whether any were found, `latestOffset` is reloaded from
  * the offset manager afterwards.
  *
  * @param onlyForInfoDate when defined, only uncommitted offsets of that date are requested
  * @param om              the offset manager used for the lookups
  */
private[core] def validateUncommittedOffsets(onlyForInfoDate: Option[LocalDate], om: OffsetManager): Unit = {
  val pendingOffsets = om.getUncommittedOffsets(outputTable.name, onlyForInfoDate)

  if (!pendingOffsets.isEmpty) {
    // The date is mentioned only when the lookup was restricted to it.
    val warning = onlyForInfoDate.fold(s"Found uncommitted offsets for ${outputTable.name}. Fixing...") { date =>
      s"Found uncommitted offsets for ${outputTable.name} at $date. Fixing..."
    }
    log.warn(warning)

    handleUncommittedOffsets(om, metastore, pendingOffsets)
  }

  // Reload after the fix-up so the cached value reflects the state left by it.
  latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None)
}

private[core] def handleUncommittedOffsets(om: OffsetManager, mt: Metastore, uncommittedOffsets: Array[UncommittedOffset]): Unit = {
import za.co.absa.pramen.core.utils.DateUtils._

Expand Down

0 comments on commit b3f91b1

Please sign in to comment.