Skip to content

Commit

Permalink
Fix reruns so that they are not reported as running based on retrospe…
Browse files Browse the repository at this point in the history
…ctive updates.
  • Loading branch information
yruslan committed Oct 31, 2024
1 parent bf58920 commit 3623a87
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,14 @@ abstract class JobBase(operationDef: OperationDef,
}
}

protected def preRunTransformationCheck(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
validateTransformationAlreadyRanCases(infoDate, dependencyWarnings) match {
case Some(result) => result
case None => JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Seq.empty[String])
protected def preRunTransformationCheck(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning], runReason: TaskRunReason): JobPreRunResult = {
if (runReason == TaskRunReason.Rerun) {
JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Seq.empty[String])
} else {
validateTransformationAlreadyRanCases(infoDate, dependencyWarnings) match {
case Some(result) => result
case None => JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Seq.empty[String])
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class SinkJob(operationDef: OperationDef,
override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing

override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
val alreadyRanStatus = preRunTransformationCheck(infoDate, dependencyWarnings)
val alreadyRanStatus = preRunTransformationCheck(infoDate, dependencyWarnings, runReason)

alreadyRanStatus.status match {
case JobPreRunStatus.Ready => JobPreRunResult(Ready, Some(getDataDf(infoDate).count()), dependencyWarnings, alreadyRanStatus.warnings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class TransformationJob(operationDef: OperationDef,
override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing

override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
preRunTransformationCheck(infoDate, dependencyWarnings)
preRunTransformationCheck(infoDate, dependencyWarnings, runReason)
}

override def validate(infoDate: LocalDate, jobConfig: Config): Reason = {
Expand Down

0 comments on commit 3623a87

Please sign in to comment.