Skip to content

Commit

Permalink
#374 Simplify the incremental ingestion interface for data sources.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 30, 2024
1 parent 537b85d commit 83b3c7a
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 111 deletions.
27 changes: 7 additions & 20 deletions pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,30 +67,17 @@ trait Source extends ExternalChannel {
def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): SourceResult

/**
* Returns the incremental data greater than the specified offset.
*
* If an information date is provided and available at the source, the query will be limited to that date
*
* @param minOffset This is an exclusive parameter the query will be SELECT ... WHERE offset_col > min_offset
* @param infoDateOpt An information date to get data for. Can be empty if the source table doesn't have such a column.
* @param columns Select only specified columns. Selects all if an empty Seq is passed.
*/
def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): SourceResult

/**
* Returns the incremental data between specified offsets.
* Returns the incremental data between the specified offsets. The offset interval can be half-open,
* e.g. only offsetFrom or only offsetTo is specified.
*
* If an information date is provided and available at the source, the query will be limited to that date.
*
* This method is used for re-runs for a particular information day. For sources that have information date column
* the returned data will be for the full information date, even outside the specified offsets.
*
* @param minOffset This is an exclusive parameter the query will be SELECT ... WHERE offset_col > min_offset
* @param maxOffset This is an inclusive parameter the query will be SELECT ... WHERE offset_col <= max_offset
* @param infoDateOpt An information date to get data for. Can be empty if the source table doesn't have such a column.
* @param columns Select only specified columns. Selects all if an empty Seq is passed.
* @param offsetFromOpt This is an exclusive lower bound: the query will be SELECT ... WHERE offset_col > offset_from
* @param offsetToOpt This is an inclusive upper bound: the query will be SELECT ... WHERE offset_col <= offset_to
* @param onlyForInfoDate An information date to get data for. Can be empty if the source table doesn't have such a column.
* @param columns Select only specified columns. Selects all if an empty Seq is passed.
*/
def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): SourceResult
def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): SourceResult

/**
* This method is called after the ingestion is finished. You can query the output table form the output information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,5 @@ trait TableReader {

def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame

def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame

def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame
def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): DataFrame
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package za.co.absa.pramen.api.sql

import za.co.absa.pramen.api.offset.OffsetValue

import java.sql.Connection
import java.time.LocalDate

Expand All @@ -36,6 +38,11 @@ trait SqlGenerator {
*/
def getCountQuery(tableName: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): String

/**
* Generates a query that returns the record count of a table for the given period when the table does have the information date field.
*/
def getCountQuery(tableName: String, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue]): String = ""

/**
* Generates a query that returns the record count of an SQL query that is already formed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,23 @@ class IncrementalIngestionJob(operationDef: OperationDef,
}

override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
val hasInfoDate = source.hasInfoDateColumn(sourceTable.query)
if (source.getOffsetInfo.nonEmpty) {
if (runReason == TaskRunReason.Rerun) {
val om = bookkeeper.getOffsetManager
if (hasInfoDate) {
log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate'.")
Reason.Ready
} else {
val om = bookkeeper.getOffsetManager

om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match {
case Some(a) =>
log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate' for ${a.minimumOffset.dataTypeString} < offset <= ${a.maximumOffset.dataTypeString}.")
Reason.Ready
case None =>
log.info(s"Offsets not found for '${outputTable.name}' at '$infoDate'.")
Reason.SkipOnce("No offsets registered")
om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match {
case Some(offsets) =>
log.info(s"Rerunning ingestion to '${outputTable.name}' at '$infoDate' for ${offsets.minimumOffset.valueString} < offsets <= ${offsets.maximumOffset.valueString}.")
Reason.Ready
case None =>
log.info(s"Offsets not found for '${outputTable.name}' at '$infoDate'.")
Reason.SkipOnce("No offsets registered")
}
}
} else {
latestOffset match {
Expand All @@ -105,25 +111,37 @@ class IncrementalIngestionJob(operationDef: OperationDef,
Seq.empty[String]
}

val sourceResult = latestOffset match {
case None =>
if (runReason == TaskRunReason.Rerun) {
val om = bookkeeper.getOffsetManager
val hasInfoDate = source.hasInfoDateColumn(sourceTable.query)

om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match {
case Some(offsets) =>
source.getIncrementalDataRange(sourceTable.query, offsets.minimumOffset, offsets.maximumOffset, Some(infoDate), columns)
case None =>
throw new IllegalStateException(s"No offsets for '${outputTable.name}' for '$infoDate'. Cannot rerun.")
}
} else {
source.getData(sourceTable.query, infoDate, infoDate, columns)
val sourceResult = if (hasInfoDate) {
if (runReason == TaskRunReason.Rerun) {
source.getData(sourceTable.query, infoDate, infoDate, columns)
} else {
latestOffset match {
case Some(maxOffset) =>
source.getIncrementalData(sourceTable.query, Option(infoDate), Option(maxOffset.maximumOffset), None, columns)
case None =>
source.getData(sourceTable.query, infoDate, infoDate, columns)
}
}
} else {
if (runReason == TaskRunReason.Rerun) {
val om = bookkeeper.getOffsetManager

om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match {
case Some(offsets) =>
source.getIncrementalData(sourceTable.query, Option(infoDate), Option(offsets.minimumOffset), Option(offsets.maximumOffset), columns)
case None =>
throw new IllegalStateException(s"No offsets for '${outputTable.name}' for '$infoDate'. Cannot rerun.")
}
} else {
latestOffset match {
case Some(maxOffset) =>
source.getIncrementalData(sourceTable.query, Option(infoDate), Option(maxOffset.maximumOffset), None, columns)
case None =>
source.getData(sourceTable.query, infoDate, infoDate, columns)
}
case Some(maxOffset) =>
if (runReason == TaskRunReason.Rerun)
source.getIncrementalDataRange(sourceTable.query, maxOffset.minimumOffset, maxOffset.maximumOffset, Some(infoDate), columns)
else
source.getIncrementalData(sourceTable.query, maxOffset.maximumOffset, Some(infoDate), columns)
}
}

val sanitizedDf = sanitizeDfColumns(sourceResult.data, specialCharacters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
}
}

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ???

override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ???
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): DataFrame = ???

private[core] def getJdbcConfig: TableReaderJdbcConfig = jdbcReaderConfig

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,
}
}

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ???

override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ???
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): DataFrame = ???

private[core] def getSqlExpression(query: Query): String = {
query match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,32 +70,47 @@ class TableReaderSpark(formatOpt: Option[String],
}
}

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = {
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): DataFrame = {
val offsetInfo = offsetInfoOpt.getOrElse(throw new IllegalArgumentException(s"Offset column and type is not defined for ${query.query}."))
val offsetCol = offsetInfo.minimalOffset.getSparkCol(col(offsetInfo.offsetColumn))
infoDateOpt match {
case Some(infoDate) if hasInfoDateColumn =>
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} > ${minOffset.valueString}")
getData(query, infoDate, infoDate, columns)
.filter(offsetCol > minOffset.getSparkLit)
case _ =>
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${minOffset.valueString}")
getBaseDataFrame(query)
.filter(offsetCol > minOffset.getSparkLit)
}
}

override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = {
val offsetInfo = offsetInfoOpt.getOrElse(throw new IllegalArgumentException(s"Offset column and type is not defined for ${query.query}."))
infoDateOpt match {
onlyForInfoDate match {
case Some(infoDate) if hasInfoDateColumn =>
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate'")
getData(query, infoDate, infoDate, columns)
(offsetFromOpt, offsetToOpt) match {
case (Some(offsetFrom), Some(offsetTo)) =>
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} > ${offsetFrom.valueString} AND ${offsetInfo.offsetColumn} <= ${offsetTo.valueString}")
getData(query, infoDate, infoDate, columns)
.filter(offsetCol > offsetFrom.getSparkLit && offsetCol <= offsetTo.getSparkLit)
case (Some(offsetFrom), None) =>
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} > ${offsetFrom.valueString}")
getData(query, infoDate, infoDate, columns)
.filter(offsetCol > offsetFrom.getSparkLit)
case (None, Some(offsetTo)) =>
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} <= ${offsetTo.valueString}")
getData(query, infoDate, infoDate, columns)
.filter(offsetCol <= offsetTo.getSparkLit)
case (None, None) =>
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate'")
getData(query, infoDate, infoDate, columns)
}
case _ =>
val offsetCol = offsetInfo.minimalOffset.getSparkCol(col(offsetInfo.offsetColumn))
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${minOffset.valueString} AND ${offsetInfo.offsetColumn} <= ${maxOffset.valueString}")
getBaseDataFrame(query)
.filter(offsetCol > minOffset.getSparkLit && offsetCol <= maxOffset.getSparkLit)
(offsetFromOpt, offsetToOpt) match {
case (Some(offsetFrom), Some(offsetTo)) =>
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${offsetFrom.valueString} AND ${offsetInfo.offsetColumn} <= ${offsetTo.valueString}")
getBaseDataFrame(query)
.filter(offsetCol > offsetFrom.getSparkLit && offsetCol <= offsetTo.getSparkLit)
case (Some(offsetFrom), None) =>
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${offsetFrom.valueString}")
getBaseDataFrame(query)
.filter(offsetCol > offsetFrom.getSparkLit)
case (None, Some(offsetTo)) =>
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} <= ${offsetTo.valueString}")
getBaseDataFrame(query)
.filter(offsetCol <= offsetTo.getSparkLit)
case (None, None) =>
log.info(s"Reading * FROM ${query.query}")
getBaseDataFrame(query)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ class JdbcSource(sourceConfig: Config,
sql.toLowerCase.startsWith("select") && !jdbcReaderConfig.useJdbcNative
}

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = ???

override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = ???
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = ???
}

object JdbcSource extends ExternalChannelFactory[JdbcSource] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ class LocalSparkSource(sparkSource: SparkSource,
tempPath.toString
}

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = ???

override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = ???
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = ???
}

object LocalSparkSource extends ExternalChannelFactoryV2[LocalSparkSource] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ class RawFileSource(val sourceConfig: Config,
}
}

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = ???

override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = ???
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = ???
}

object RawFileSource extends ExternalChannelFactory[RawFileSource] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,10 @@ class SparkSource(val format: Option[String],
tableReader
}

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = {
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = {
val reader = getReader(query)

val df = reader.getIncrementalData(query, minOffset, infoDate, columns)

val filesRead = getFilesRead(query, df)

SourceResult(df, filesRead)
}

override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = {
val reader = getReader(query)

val df = reader.getIncrementalDataRange(query, minOffset, maxOffset, infoDate, columns)
val df = reader.getIncrementalData(query, onlyForInfoDate, offsetFrom, offsetTo, columns)

val filesRead = getFilesRead(query, df)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ class SourceSpy(sourceConfig: Config = ConfigFactory.empty(),
SourceResult(df)
}

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = ???

override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = ???
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): SourceResult = ???
}

object SourceSpy extends ExternalChannelFactory[SourceSpy] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,5 @@ class ReaderSpy(numRecords: Long,
df
}

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ???

override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ???
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): DataFrame = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,5 @@ class ReaderStub extends TableReader {

override def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = null

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ???

override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ???
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): DataFrame = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ class SourceMock(
SourceResult(df, Nil, Nil)
}

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = ???

override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDate: Option[LocalDate], columns: Seq[String]): SourceResult = ???
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): SourceResult = ???
}

object SourceMock extends ExternalChannelFactory[SourceMock] {
Expand Down
Loading

0 comments on commit 83b3c7a

Please sign in to comment.