diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala index ad14fa81f..14c547a2a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala @@ -25,7 +25,7 @@ import za.co.absa.pramen.core.expr.DateExprEvaluator import za.co.absa.pramen.core.metastore.Metastore import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.utils.Emoji._ -import za.co.absa.pramen.core.utils.TimeUtils +import za.co.absa.pramen.core.utils.{Emoji, TimeUtils} import java.time.{Instant, LocalDate} import scala.util.{Failure, Success, Try} @@ -114,15 +114,40 @@ abstract class JobBase(operationDef: OperationDef, } protected def validateTransformationAlreadyRanCases(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning]): Option[JobPreRunResult] = { - if (bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate).isDefined) { - log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.") - Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String])) - } else { - log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.") - None + bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate) match { + case Some(chunk) => + val outOfDateTables = getOutdatedTables(infoDate, chunk.jobFinished) + if (outOfDateTables.nonEmpty) { + log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate, but has outdated tables: ${outOfDateTables.mkString(", ")}") + val warning = s"Based on outdated tables: ${outOfDateTables.mkString(", ")}" + Some(JobPreRunResult(JobPreRunStatus.NeedsUpdate, None, dependencyWarnings, Seq(warning))) + } else { + log.info(s"Job for table ${outputTableDef.name} as already ran for $infoDate.") + Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String])) + } + case None => + log.info(s"Job for table ${outputTableDef.name} has not yet ran $infoDate.") + None } } + private def getOutdatedTables(infoDate: LocalDate, targetJobFinishedSeconds: Long): Seq[String] = { + operationDef.dependencies + .filter(d => !d.isOptional && !d.isPassive) + .flatMap(_.tables) + .distinct + .filter { table => + bookkeeper.getLatestDataChunk(table, infoDate, infoDate) match { + case Some(chunk) if chunk.jobFinished >= targetJobFinishedSeconds => + log.warn(s"${Emoji.WARNING} The dependent table '$table' has been updated at ${Instant.ofEpochSecond(chunk.jobFinished)} retrospectively " + + s"after the transformation at ${Instant.ofEpochSecond(targetJobFinishedSeconds)} .") + true + case _ => + false + } + } + } + protected def checkDependency(dep: MetastoreDependency, infoDate: LocalDate): Option[DependencyFailure] = { val evaluator = new DateExprEvaluator evaluator.setValue("infoDate", infoDate) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala index a62c300dd..070c7d7b5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala @@ -214,7 +214,7 @@ object ScheduleStrategyUtils { dependency.tables.foldLeft(false)((acc, table) => { bookkeeper.getLatestDataChunk(table, dateFrom, dateTo) match { case Some(dependencyUpdated) => - val isUpdatedRetrospectively = dependencyUpdated.jobFinished > lastUpdated.jobFinished + val isUpdatedRetrospectively = dependencyUpdated.jobFinished >= lastUpdated.jobFinished if (isUpdatedRetrospectively) { log.warn(s"Input table '$table' has updated retrospectively${renderPeriod(Option(dateFrom), Option(dateTo))}. " + s"Adding '$outputTable' to rerun for $infoDate.")