Skip to content

Commit

Permalink
#374 Fix new incremental ingestion and integration tests to match inc…
Browse files Browse the repository at this point in the history
…lusive intervals.
  • Loading branch information
yruslan committed Oct 4, 2024
1 parent 7bce016 commit 99b3689
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 18 deletions.
11 changes: 10 additions & 1 deletion pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,17 @@ trait Source extends ExternalChannel {
*
* If an information date is provided and available at the source, the query will be limited to that date.
*
* <ul>
* <li> When both `offsetFrom` from and `offsetTo` are passed the source should return offsets using an inclusive interval
* (offsetFrom <= offset <= offsetTo) </li>
* <li> When only `offsetFrom` is present the source should return offsets using an exclusive interval interval
* (offset > offsetFrom)</li>
* <li> When only `offsetTo` is present the source should return offsets using an inclusive interval
* (offset <= offsetTo)</li>
*</ul>
*
* @param offsetFromOpt This is an exclusive parameter the query will be SELECT ... WHERE offset_col > min_offset
* @param offsetToOpt This is an exclusive parameter the query will be SELECT ... WHERE offset_col > min_offset
* @param offsetToOpt This is an exclusive parameter the query will be SELECT ... WHERE offset_col <= min_offset
* @param onlyForInfoDate An information date to get data for. Can be empty if the source table doesn't have such a column.
* @param columns Select only specified columns. Selects all if an empty Seq is passed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ trait SqlGenerator {

/**
* Generates a query for incremental ingestion, the result can be restricted by an information column, if present, but also by offset range.
*
* <ul>
* <li> When both `offsetFrom` from and `offsetTo` are passed the generator should return offsets using an inclusive interval
* (offsetFrom <= offset <= offsetTo) </li>
* <li> When only `offsetFrom` is present the generator should return offsets using an exclusive interval interval
* (offset > offsetFrom)</li>
* <li> When only `offsetTo` is present the generator should return offsets using an inclusive interval
* (offset <= offsetTo)</li>
*</ul>
*/
def getDataQueryIncremental(tableName: String,
onlyForInfoDate: Option[LocalDate],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ abstract class SqlGeneratorBase(sqlConfig: SqlConfig) extends SqlGenerator {
s"${getOffsetWhereCondition(offsetColumn, ">=", offsetFrom)} AND ${getOffsetWhereCondition(offsetColumn, "<=", offsetTo)}"
case (Some(offsetFrom), None) =>
validateOffsetValue(offsetFrom)
s"${getOffsetWhereCondition(offsetColumn, ">=", offsetFrom)}"
s"${getOffsetWhereCondition(offsetColumn, ">", offsetFrom)}"
case (None, Some(offsetTo)) =>
validateOffsetValue(offsetTo)
s"${getOffsetWhereCondition(offsetColumn, "<=", offsetTo)}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ class TableReaderSpark(formatOpt: Option[String],
getData(query, infoDate, infoDate, columns)
.filter(offsetCol >= offsetFrom.getSparkLit && offsetCol <= offsetTo.getSparkLit)
case (Some(offsetFrom), None) =>
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} >= ${offsetFrom.valueString}")
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} > ${offsetFrom.valueString}")
getData(query, infoDate, infoDate, columns)
.filter(offsetCol >= offsetFrom.getSparkLit)
.filter(offsetCol > offsetFrom.getSparkLit)
case (None, Some(offsetTo)) =>
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} <= ${offsetTo.valueString}")
getData(query, infoDate, infoDate, columns)
Expand All @@ -100,9 +100,9 @@ class TableReaderSpark(formatOpt: Option[String],
getBaseDataFrame(query)
.filter(offsetCol >= offsetFrom.getSparkLit && offsetCol <= offsetTo.getSparkLit)
case (Some(offsetFrom), None) =>
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} >= ${offsetFrom.valueString}")
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${offsetFrom.valueString}")
getBaseDataFrame(query)
.filter(offsetCol >= offsetFrom.getSparkLit)
.filter(offsetCol > offsetFrom.getSparkLit)
case (None, Some(offsetTo)) =>
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} <= ${offsetTo.valueString}")
getBaseDataFrame(query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec

assert(offsets.length == 1)

assert(offsets.head.minOffset.get.valueString.toLong == Long.MinValue)
assert(offsets.head.minOffset.get.valueString.toLong == 1)
assert(offsets.head.maxOffset.get.valueString.toLong == 3)
assert(offsets.head.committedAt.nonEmpty)
}
Expand Down Expand Up @@ -674,7 +674,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec

assert(offsets.length == 1)

assert(offsets.head.minOffset.get.valueString.toLong == Long.MinValue)
assert(offsets.head.minOffset.get.valueString.toLong == 1)
assert(offsets.head.maxOffset.get.valueString.toLong == 3)
assert(offsets.head.committedAt.nonEmpty)
}
Expand Down Expand Up @@ -736,10 +736,10 @@ class IncrementalPipelineLongFixture extends AnyWordSpec

assert(offsets.length == 2)

assert(offsets.head.minOffset.get.valueString.toLong == Long.MinValue)
assert(offsets.head.minOffset.get.valueString.toLong == 1)
assert(offsets.head.maxOffset.get.valueString.toLong == 3)
assert(offsets.head.committedAt.nonEmpty)
assert(offsets(1).minOffset.get.valueString.toLong == 3)
assert(offsets(1).minOffset.get.valueString.toLong == 4)
assert(offsets(1).maxOffset.get.valueString.toLong == 6)
assert(offsets(1).committedAt.nonEmpty)
}
Expand Down Expand Up @@ -1043,20 +1043,19 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
val om = new OffsetManagerJdbc(pramenDb.db, 123L)

val offsets1 = om.getOffsets("table1", infoDate.minusDays(1))
assert(offsets1.head.minOffset.get.valueString.toLong == -62135596800000L)
assert(offsets1.head.minOffset.get.valueString.toLong == 1613563930000L)
assert(offsets1.head.maxOffset.get.valueString.toLong == 1613563930000L)
assert(offsets1.head.committedAt.nonEmpty)


val offsets2 = om.getOffsets("table1", infoDate)
assert(offsets2.length == 1)
assert(offsets2.head.minOffset.get.valueString.toLong == 1613563930000L)
assert(offsets2.head.minOffset.get.valueString.toLong == 1613639398123L)
assert(offsets2.head.maxOffset.get.valueString.toLong == 1613639399123L)
assert(offsets2.head.committedAt.nonEmpty)

val offsets3 = om.getOffsets("table1", infoDate.plusDays(1))
assert(offsets3.length == 1)
assert(offsets3.head.minOffset.get.valueString.toLong == 1613639399123L)
assert(offsets3.head.minOffset.get.valueString.toLong == 1613740330000L)
assert(offsets3.head.maxOffset.get.valueString.toLong == 1613740330000L)
assert(offsets3.head.committedAt.nonEmpty)
}
Expand Down Expand Up @@ -1109,13 +1108,13 @@ class IncrementalPipelineLongFixture extends AnyWordSpec

val offsets1 = om.getOffsets("table1", infoDate.minusDays(1))
assert(offsets1.length == 1)
assert(offsets1.head.minOffset.get.valueString.toLong == Long.MinValue)
assert(offsets1.head.minOffset.get.valueString.toLong == 1)
assert(offsets1.head.maxOffset.get.valueString.toLong == 2)
assert(offsets1.head.committedAt.nonEmpty)

val offsets2 = om.getOffsets("table1", infoDate)
assert(offsets2.length == 1)
assert(offsets2.head.minOffset.get.valueString.toLong == 2)
assert(offsets2.head.minOffset.get.valueString.toLong == 3)
assert(offsets2.head.maxOffset.get.valueString.toLong == 4)
assert(offsets2.head.committedAt.nonEmpty)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class SqlGeneratorGenericSuite extends AnyWordSpec {
"work with only from offset" in {
val sql = genDate.getDataQueryIncremental("table1", None, Some(OffsetValue.IntegralValue(1)), None, Seq.empty)

assert(sql == "SELECT * FROM table1 WHERE offset >= 1")
assert(sql == "SELECT * FROM table1 WHERE offset > 1")
}

"work with only to offset" in {
Expand Down Expand Up @@ -242,7 +242,7 @@ class SqlGeneratorGenericSuite extends AnyWordSpec {
"work with only from offset" in {
val sql = genDate.getDataQueryIncremental("table1", Some(date1), Some(OffsetValue.IntegralValue(1)), None, Seq.empty)

assert(sql == "SELECT * FROM table1 WHERE D = date'2020-08-17' AND offset >= 1")
assert(sql == "SELECT * FROM table1 WHERE D = date'2020-08-17' AND offset > 1")
}

"work with only to offset" in {
Expand Down

0 comments on commit 99b3689

Please sign in to comment.