diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index bebd17e9..3510de6d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -55,67 +55,26 @@ class IncrementalIngestionJob(operationDef: OperationDef, override def trackDays: Int = 0 override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = { - if (source.getOffsetInfo.isEmpty) { - return throw new IllegalArgumentException(s"Offset column is not configured for source '$sourceName' of '${operationDef.name}'") - } - - val om = bookkeeper.getOffsetManager - latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None) - - val onlyForInfoDate = if (source.hasInfoDateColumn(sourceTable.query)) - Some(infoDate) - else - None - - val uncommittedOffsets = om.getUncommittedOffsets(outputTable.name, onlyForInfoDate) - - if (uncommittedOffsets.nonEmpty) { - if (onlyForInfoDate.isEmpty) { - log.warn(s"Found uncommitted offsets for ${outputTable.name} at $infoDate. Fixing...") - } else { - log.warn(s"Found uncommitted offsets for ${outputTable.name}. Fixing...") - } - - handleUncommittedOffsets(om, metastore, uncommittedOffsets) - } - JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil) } override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = { + val om = bookkeeper.getOffsetManager val hasInfoDate = source.hasInfoDateColumn(sourceTable.query) val isReRun = runReason == TaskRunReason.Rerun + val onlyForInfoDate = if (hasInfoDate) + Some(infoDate) + else + None + if (source.getOffsetInfo.isEmpty) { return Reason.NotReady(s"Offset column is not configured for source '$sourceName' of '${operationDef.name}'") } - (hasInfoDate, isReRun) match { - case (false, false) => - latestOffset match { - case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => - log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.") - Reason.Skip("Incremental ingestion cannot be retrospective") - case _ => - Reason.Ready - } - case (false, true) => - val om = bookkeeper.getOffsetManager + validateUncommittedOffsets(onlyForInfoDate, om) - om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match { - case Some(offsets) => - log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate' for ${offsets.minimumOffset.valueString} < offsets <= ${offsets.maximumOffset.valueString}.") - Reason.Ready - case None => - log.info(s"Offsets not found for '${outputTable.name}' at '$infoDate'.") - Reason.SkipOnce("No offsets registered") - } - case (true, false) => - Reason.Ready - case (true, true) => - log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate'.") - Reason.Ready - } + validateReadiness(infoDate, om, hasInfoDate, isReRun) } override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = { @@ -234,6 +193,48 @@ class IncrementalIngestionJob(operationDef: OperationDef, SaveResult(stats, warnings = tooLongWarnings) } + private[core] def validateReadiness(infoDate: LocalDate, om: OffsetManager, hasInfoDate: Boolean, isRerun: Boolean): Reason = { + (hasInfoDate, isRerun) match { + case (false, false) => + latestOffset match { + case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => + log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.") + Reason.Skip("Incremental ingestion cannot be retrospective") + case _ => + Reason.Ready + } + case (false, true) => + om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match { + case Some(offsets) => + log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate' for ${offsets.minimumOffset.valueString} < offsets <= ${offsets.maximumOffset.valueString}.") + Reason.Ready + case None => + log.info(s"Offsets not found for '${outputTable.name}' at '$infoDate'.") + Reason.SkipOnce("No offsets registered") + } + case (true, false) => + Reason.Ready + case (true, true) => + log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate'.") + Reason.Ready + } + } + + private[core] def validateUncommittedOffsets(onlyForInfoDate: Option[LocalDate], om: OffsetManager): Unit = { + val uncommittedOffsets = om.getUncommittedOffsets(outputTable.name, onlyForInfoDate) + + if (uncommittedOffsets.nonEmpty) { + onlyForInfoDate match { + case Some(date) => log.warn(s"Found uncommitted offsets for ${outputTable.name} at $date. Fixing...") + case None => log.warn(s"Found uncommitted offsets for ${outputTable.name}. Fixing...") + } + + handleUncommittedOffsets(om, metastore, uncommittedOffsets) + } + + latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None) + } + private[core] def handleUncommittedOffsets(om: OffsetManager, mt: Metastore, uncommittedOffsets: Array[UncommittedOffset]): Unit = { import za.co.absa.pramen.core.utils.DateUtils._