Skip to content

Commit

Permalink
#496 Fix handling of retrospectively updated jobs.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Oct 14, 2024
1 parent 31bc89f commit c44e6d5
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import za.co.absa.pramen.core.expr.DateExprEvaluator
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.TimeUtils
import za.co.absa.pramen.core.utils.{Emoji, TimeUtils}

import java.time.{Instant, LocalDate}
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -114,15 +114,40 @@ abstract class JobBase(operationDef: OperationDef,
}

protected def validateTransformationAlreadyRanCases(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning]): Option[JobPreRunResult] = {
if (bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate).isDefined) {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.")
Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String]))
} else {
log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.")
None
bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate) match {
case Some(chunk) =>
val outOfDateTables = getOutdatedTables(infoDate, chunk.jobFinished)
if (outOfDateTables.nonEmpty) {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate, but has outdated tables: ${outOfDateTables.mkString(", ")}")
val warning = s"Based on outdated tables: ${outOfDateTables.mkString(", ")}"
Some(JobPreRunResult(JobPreRunStatus.NeedsUpdate, None, dependencyWarnings, Seq(warning)))
} else {
log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.")
Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String]))
}
case None =>
log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.")
None
}
}

private def getOutdatedTables(infoDate: LocalDate, targetJobFinishedSeconds: Long): Seq[String] = {
operationDef.dependencies
.filter(d => !d.isOptional && !d.isPassive)
.flatMap(_.tables)
.distinct
.filter { table =>
bookkeeper.getLatestDataChunk(table, infoDate, infoDate) match {
case Some(chunk) if chunk.jobFinished >= targetJobFinishedSeconds =>
log.warn(s"${Emoji.WARNING} The dependent table '$table' has been updated at ${Instant.ofEpochSecond(chunk.jobFinished)} retrospectively " +
s"after the transformation at ${Instant.ofEpochSecond(targetJobFinishedSeconds)} .")
true
case _ =>
false
}
}
}

protected def checkDependency(dep: MetastoreDependency, infoDate: LocalDate): Option[DependencyFailure] = {
val evaluator = new DateExprEvaluator
evaluator.setValue("infoDate", infoDate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ object ScheduleStrategyUtils {
dependency.tables.foldLeft(false)((acc, table) => {
bookkeeper.getLatestDataChunk(table, dateFrom, dateTo) match {
case Some(dependencyUpdated) =>
val isUpdatedRetrospectively = dependencyUpdated.jobFinished > lastUpdated.jobFinished
val isUpdatedRetrospectively = dependencyUpdated.jobFinished >= lastUpdated.jobFinished
if (isUpdatedRetrospectively) {
log.warn(s"Input table '$table' has updated retrospectively${renderPeriod(Option(dateFrom), Option(dateTo))}. " +
s"Adding '$outputTable' to rerun for $infoDate.")
Expand Down

0 comments on commit c44e6d5

Please sign in to comment.