Skip to content

Commit

Permalink
#374 Fix the logic of incremental ingestion when information date is …
Browse files Browse the repository at this point in the history
…present.
  • Loading branch information
yruslan committed Sep 30, 2024
1 parent 8b4bfb1 commit 8b3ed71
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental}
import za.co.absa.pramen.core.utils.SparkUtils._

import java.time.{Instant, LocalDate}
import java.time.{Instant, LocalDate, ZoneId}

class IncrementalIngestionJob(operationDef: OperationDef,
metastore: Metastore,
Expand Down Expand Up @@ -91,12 +91,16 @@ class IncrementalIngestionJob(operationDef: OperationDef,
}
}
} else {
latestOffset match {
case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) =>
log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.")
Reason.Skip("Incremental ingestion cannot be retrospective")
case _ =>
Reason.Ready
if (hasInfoDate) {
Reason.Ready
} else {
latestOffset match {
case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) =>
log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.")
Reason.Skip("Incremental ingestion cannot be retrospective")
case _ =>
Reason.Ready
}
}
}
} else {
Expand All @@ -117,10 +121,14 @@ class IncrementalIngestionJob(operationDef: OperationDef,
if (runReason == TaskRunReason.Rerun) {
source.getData(sourceTable.query, infoDate, infoDate, columns)
} else {
latestOffset match {
val om = bookkeeper.getOffsetManager
val infoDateLatestOffset = om.getMaxInfoDateAndOffset(outputTable.name, Some(infoDate))
infoDateLatestOffset match {
case Some(maxOffset) =>
log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for offset > ${maxOffset.maximumOffset.valueString}.")
source.getDataIncremental(sourceTable.query, Option(infoDate), Option(maxOffset.maximumOffset), None, columns)
case None =>
log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for all data available at the day.")
source.getData(sourceTable.query, infoDate, infoDate, columns)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ class SqlGeneratorMicrosoft(sqlConfig: SqlConfig) extends SqlGenerator {
val offsetWhere = getOffsetWhereClause(sqlConfig.offsetInfo.get, offsetFromOpt, offsetToOpt)

if (offsetWhere.nonEmpty) {
s"$dataQuery AND $offsetWhere"
if (onlyForInfoDate.isEmpty) {
s"$dataQuery WHERE $offsetWhere"
} else {
s"$dataQuery AND $offsetWhere"
}
} else {
dataQuery
}
Expand Down

0 comments on commit 8b3ed71

Please sign in to comment.