Skip to content

Commit

Permalink
#374 Add implementation for incremental ingestion from JDBC, and adde…
Browse files Browse the repository at this point in the history
…d first test suite for it.
  • Loading branch information
yruslan committed Sep 30, 2024
1 parent 8fe53f1 commit 5b8983c
Show file tree
Hide file tree
Showing 19 changed files with 374 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ trait Source extends ExternalChannel {
* @param onlyForInfoDate An information date to get data for. Can be empty if the source table doesn't have such a column.
* @param columns Select only specified columns. Selects all if an empty Seq is passed.
*/
def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): SourceResult
def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): SourceResult

/**
* This method is called after the ingestion is finished. You can query the output table form the output information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ abstract class SqlGeneratorBase(sqlConfig: SqlConfig) extends SqlGenerator {
val offsetWhere = getOffsetWhereClause(sqlConfig.offsetInfo.get, offsetFromOpt, offsetToOpt)

if (offsetWhere.nonEmpty) {
s"$dataQuery AND $offsetWhere"
if (onlyForInfoDate.isEmpty) {
s"$dataQuery WHERE $offsetWhere"
} else {
s"$dataQuery AND $offsetWhere"
}
} else {
dataQuery
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
} else {
latestOffset match {
case Some(maxOffset) =>
source.getIncrementalData(sourceTable.query, Option(infoDate), Option(maxOffset.maximumOffset), None, columns)
source.getDataIncremental(sourceTable.query, Option(infoDate), Option(maxOffset.maximumOffset), None, columns)
case None =>
source.getData(sourceTable.query, infoDate, infoDate, columns)
}
Expand All @@ -130,14 +130,14 @@ class IncrementalIngestionJob(operationDef: OperationDef,

om.getMaxInfoDateAndOffset(outputTable.name, Option(infoDate)) match {
case Some(offsets) =>
source.getIncrementalData(sourceTable.query, Option(infoDate), Option(offsets.minimumOffset), Option(offsets.maximumOffset), columns)
source.getDataIncremental(sourceTable.query, None, Option(offsets.minimumOffset), Option(offsets.maximumOffset), columns)
case None =>
throw new IllegalStateException(s"No offsets for '${outputTable.name}' for '$infoDate'. Cannot rerun.")
}
} else {
latestOffset match {
case Some(maxOffset) =>
source.getIncrementalData(sourceTable.query, Option(infoDate), Option(maxOffset.maximumOffset), None, columns)
source.getDataIncremental(sourceTable.query, None, Option(maxOffset.maximumOffset), None, columns)
case None =>
source.getData(sourceTable.query, infoDate, infoDate, columns)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
)(implicit spark: SparkSession) extends TableReaderJdbcBase(jdbcReaderConfig, jdbcUrlSelector, conf) {
private val log = LoggerFactory.getLogger(this.getClass)

private val infoDateFormatter = DateTimeFormatter.ofPattern(jdbcReaderConfig.infoDateFormat)

logConfiguration()

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
Expand Down Expand Up @@ -68,7 +66,14 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
}
}

override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): DataFrame = ???
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): DataFrame = {
query match {
case Query.Table(tableName) =>
getDataForTableIncremental(tableName, onlyForInfoDate, offsetFrom, offsetTo, columns)
case other =>
throw new IllegalArgumentException(s"'${other.name}' incremental ingestion is not supported by the JDBC reader. Use 'table' instead.")
}
}

private[core] def getJdbcConfig: TableReaderJdbcConfig = jdbcReaderConfig

Expand Down Expand Up @@ -149,6 +154,24 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
df
}

private[core] def getDataForTableIncremental(tableName: String,
onlyForInfoDate: Option[LocalDate],
offsetFrom: Option[OffsetValue],
offsetTo: Option[OffsetValue],
columns: Seq[String]): DataFrame = {
val sql = sqlGen.getDataQueryIncremental(tableName, onlyForInfoDate, offsetFrom, offsetTo, columns, jdbcReaderConfig.limitRecords)

log.info(s"JDBC Query: $sql")

val df = getWithRetry[DataFrame](sql, isDataQuery = true, jdbcRetries, Option(tableName))(df => {
// Make sure connection to the server is made without fetching the data
log.debug(df.schema.treeString)
df
})

df
}

private[core] def getDataForSql(sql: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = {
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd)
getWithRetry[DataFrame](filteredSql, isDataQuery = true, jdbcRetries, None)(df => filterDfColumns(df, columns))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,14 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,
}
}

override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): DataFrame = ???
override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): DataFrame = {
query match {
case Query.Table(tableName) =>
getDataForTableIncremental(tableName, onlyForInfoDate, offsetFrom, offsetTo, columns)
case other =>
throw new IllegalArgumentException(s"'${other.name}' incremental ingestion is not supported by the JDBC Native reader. Use 'table' instead.")
}
}

private[core] def getSqlExpression(query: Query): String = {
query match {
Expand Down Expand Up @@ -106,6 +113,31 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,

df
}

private[core] def getDataForTableIncremental(tableName: String,
onlyForInfoDate: Option[LocalDate],
offsetFrom: Option[OffsetValue],
offsetTo: Option[OffsetValue],
columns: Seq[String]): DataFrame = {
val sql = sqlGen.getDataQueryIncremental(tableName, onlyForInfoDate, offsetFrom, offsetTo, columns, jdbcReaderConfig.limitRecords)
log.info(s"JDBC Query: $sql")

var df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, url, sql)

if (log.isDebugEnabled) {
log.debug(df.schema.treeString)
}

if (jdbcReaderConfig.enableSchemaMetadata) {
JdbcSparkUtils.withJdbcMetadata(jdbcReaderConfig.jdbcConfig, sql) { (connection, _) =>
log.info(s"Reading JDBC metadata descriptions the table: $tableName")
df = spark.createDataFrame(df.rdd,
JdbcSparkUtils.addColumnDescriptionsFromJdbc(df.schema, sqlGen.unquote(tableName), connection))
}
}

df
}
}

object TableReaderJdbcNative {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig
import za.co.absa.pramen.core.reader.{JdbcUrlSelector, TableReaderJdbc, TableReaderJdbcNative}

Expand All @@ -35,6 +35,10 @@ class JdbcSource(sourceConfig: Config,

override def hasInfoDateColumn(query: Query): Boolean = jdbcReaderConfig.hasInfoDate

override def getOffsetInfo: Option[OffsetInfo] = {
jdbcReaderConfig.offsetInfoOpt
}

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val reader = getReader(query, isCountQuery = true)

Expand All @@ -49,6 +53,14 @@ class JdbcSource(sourceConfig: Config,
SourceResult(df)
}

override def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = {
val reader = getReader(query, isCountQuery = false)

val df = reader.getIncrementalData(query, onlyForInfoDate, offsetFrom, offsetTo, columns)

SourceResult(df)
}

private[core] def getReader(query: Query, isCountQuery: Boolean): TableReader = {
val jdbcConfig = TableReaderJdbcNative.getJdbcConfig(jdbcReaderConfig, sourceConfig)
val jdbcReaderConfigNative = jdbcReaderConfig.copy(jdbcConfig = jdbcConfig)
Expand All @@ -75,8 +87,6 @@ class JdbcSource(sourceConfig: Config,
private def canUseSparkBuiltInJdbcConnector(sql: String): Boolean = {
sql.toLowerCase.startsWith("select") && !jdbcReaderConfig.useJdbcNative
}

override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = ???
}

object JdbcSource extends ExternalChannelFactory[JdbcSource] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class LocalSparkSource(sparkSource: SparkSource,
tempPath.toString
}

override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = ???
override def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = ???
}

object LocalSparkSource extends ExternalChannelFactoryV2[LocalSparkSource] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class RawFileSource(val sourceConfig: Config,
}
}

override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = ???
override def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = ???
}

object RawFileSource extends ExternalChannelFactory[RawFileSource] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class SparkSource(val format: Option[String],
tableReader
}

override def getIncrementalData(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = {
override def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = {
val reader = getReader(query)

val df = reader.getIncrementalData(query, onlyForInfoDate, offsetFrom, offsetTo, columns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig, SqlGeneratorBase}

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.TimeZone

class SqlGeneratorHsqlDb(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfig) {
private val dateFormatterApp = DateTimeFormatter.ofPattern(sqlConfig.dateFormatApp)
Expand Down Expand Up @@ -97,7 +98,10 @@ class SqlGeneratorHsqlDb(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfi
override def getOffsetWhereCondition(column: String, condition: String, offset: OffsetValue): String = {
offset match {
case OffsetValue.DateTimeType(ts) =>
s"$column $condition TIMESTAMPADD(MILLISECOND, ${ts.toEpochMilli}, TIMESTAMP '1970-01-01 00:00:00')"
// UNIX_MILLIS() in HSQLDB always produces the epoch milli as if the timestamp is in UTC timezone
// But the actual timestamp is in Africa/Johannesburg timezone, so the time set ends up 2 hours bigger then expected.
val timeZoneOffsetHours = TimeZone.getDefault.getRawOffset
s"UNIX_MILLIS($column)-$timeZoneOffsetHours $condition ${ts.toEpochMilli}"
case OffsetValue.IntegralType(value) =>
s"$column $condition $value"
case OffsetValue.StringType(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#base.path = "/tmp"

pramen {
pipeline.name = "Integration test with a file-based source"
pipeline.name = "Incremental ingestion with a file-based source"

bookkeeping.enabled = true
stop.spark.session = false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This variable is expected to be set up by the test suite
#base.path = "/tmp"

pramen {
pipeline.name = "Incremental ingestion with a JDBC-based source"

bookkeeping.enabled = true
stop.spark.session = false

track.days = 2 # Previous day and today
}

pramen.metastore {
tables = [
{
name = "table1"
format = "delta"
path = ${base.path}/table1
}
]
}

pramen.sources.1 = [
{
name = "jdbc_source"
factory.class = "za.co.absa.pramen.core.source.JdbcSource"
jdbc {
driver = "org.hsqldb.jdbc.JDBCDriver"
connection.string = ${pramen.bookkeeping.jdbc.url}
user = ${pramen.bookkeeping.jdbc.user}
password = ${pramen.bookkeeping.jdbc.password}
}

has.information.date.column = ${has.information.date.column}
information.date.column = "info_date"
information.date.type = "string"

offset.column {
name = "last_updated"
type = "datetime"
}

format = "parquet"
}
]

pramen.operations = [
{
name = "Sourcing from a JDBC table"
type = "ingestion"
schedule.type = "incremental"

source = "jdbc_source"

info.date.expr = "@runDate"

tables = [
{
input.table = "incremental_table"
output.metastore.table = table1
}
]
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#base.path = "/tmp"

pramen {
pipeline.name = "Integration test with a file-based source"
pipeline.name = "Incremental ingestion with a file-based source"

bookkeeping.enabled = true
stop.spark.session = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package za.co.absa.pramen.core.fixtures
import org.scalatest.{BeforeAndAfterAll, Suite}

import java.sql._
import java.util.TimeZone
import scala.collection.mutable.ListBuffer

trait RelationalDbFixture extends BeforeAndAfterAll {
Expand All @@ -35,6 +36,8 @@ trait RelationalDbFixture extends BeforeAndAfterAll {
def getConnection: Connection = DriverManager.getConnection(url, user, password)

override protected def beforeAll(): Unit = {
TimeZone.setDefault(TimeZone.getTimeZone("Africa/Johannesburg"))

super.beforeAll()
Class.forName(driver)
}
Expand Down
Loading

0 comments on commit 5b8983c

Please sign in to comment.