#374 Add offset configuration for JDBC sources.
yruslan committed Sep 19, 2024
1 parent ca224ca commit 86f02ad
Showing 16 changed files with 141 additions and 73 deletions.
6 changes: 3 additions & 3 deletions pramen/api/src/main/scala/za/co/absa/pramen/api/Source.scala
@@ -45,11 +45,11 @@ trait Source extends ExternalChannel {
def hasInfoDateColumn(query: Query): Boolean = true

/**
* If non-empty, the source + query is configured for incremental ingestion, returns minimum value with type
* If non-empty, the source is configured for incremental ingestion, returns minimum value with type
*
* If empty, the source + query can't be used for incremental ingestion.
* If empty, the source can't be used for incremental ingestion.
*/
def getOffsetInfo(query: Query): Option[OffsetInfo] = None
def getOffsetInfo: Option[OffsetInfo] = None

/**
* Validates if the source is okay and the ingestion can proceed.
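With this change, offset configuration becomes a property of the source itself rather than of each query. A custom source now implements the parameterless method; a minimal sketch (the class name and constructor parameter are illustrative, and the remaining Source members are elided):

import za.co.absa.pramen.api.Source
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}

// Sketch only: a source that advertises an integral offset column when one is configured.
class MyIncrementalSource(offsetColumn: Option[String]) extends Source {
  // The offset column now comes from the source configuration, not from the query.
  override def getOffsetInfo: Option[OffsetInfo] =
    offsetColumn.map(column => OffsetInfo(column, OffsetValue.getMinimumForType("integral")))

  // ...remaining Source members elided...
}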
@@ -44,4 +44,20 @@ object SqlColumnType {
case _ => None
}
}

def fromStringStrict(s: String, configPath: String = ""): SqlColumnType = {
s.toLowerCase() match {
case DATE.toString => DATE
case DATETIME.toString => DATETIME
case STRING.toString => STRING
case NUMBER.toString => NUMBER
case _ =>
val allowedTypes = Seq(DATE.toString, DATETIME.toString, STRING.toString, NUMBER.toString).mkString(", ")
if (configPath.nonEmpty) {
throw new IllegalArgumentException(s"Unknown information type '$s' configured at $configPath. Allowed valued: $allowedTypes.")
} else {
throw new IllegalArgumentException(s"Unknown information type '$s'. Allowed valued: $allowedTypes.")
}
}
}
}
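The strict variant differs from fromString by throwing instead of returning None, so a misconfigured type surfaces together with the config path that caused it. A usage sketch (the config path is illustrative):

SqlColumnType.fromStringStrict("Date")                         // DATE (matching is case-insensitive)
SqlColumnType.fromStringStrict("datetime", "pramen.sources.1") // DATETIME
SqlColumnType.fromStringStrict("bogus", "pramen.sources.1")
// throws IllegalArgumentException: Unknown information type 'bogus' configured at pramen.sources.1.
// Allowed values: date, datetime, string, number.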
@@ -17,11 +17,13 @@
package za.co.absa.pramen.api.sql

import com.typesafe.config.Config
import za.co.absa.pramen.api.offset.OffsetInfo

case class SqlConfig(
infoDateColumn: String,
infoDateType: SqlColumnType,
dateFormatApp: String,
offsetInfo: Option[OffsetInfo],
identifierQuotingPolicy: QuotingPolicy,
sqlGeneratorClass: Option[String],
extraConfig: Config
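The new offsetInfo field makes the offset column visible wherever the SqlConfig is passed, alongside the information date settings. Constructing one directly might look like this (a sketch with illustrative values; only offsetInfo is new in this commit):

import com.typesafe.config.ConfigFactory
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
import za.co.absa.pramen.api.sql.{QuotingPolicy, SqlColumnType, SqlConfig}

val sqlConfig = SqlConfig(
  infoDateColumn = "info_date",
  infoDateType = SqlColumnType.DATE,
  dateFormatApp = "yyyy-MM-dd",
  offsetInfo = Some(OffsetInfo("id", OffsetValue.getMinimumForType("integral"))),
  identifierQuotingPolicy = QuotingPolicy.Auto,
  sqlGeneratorClass = None,
  extraConfig = ConfigFactory.empty()
)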
@@ -74,7 +74,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
private def handleUncommittedOffsets(om: OffsetManager, mt: Metastore, infoDate: LocalDate, uncommittedOffsets: Array[DataOffset]): Unit = {
val minOffset = uncommittedOffsets.map(_.minOffset).min

val offsetInfo = source.getOffsetInfo(sourceTable.query).getOrElse(throw new IllegalArgumentException(s"Offset column not defined for the ingestion job '${operationDef.name}', " +
val offsetInfo = source.getOffsetInfo.getOrElse(throw new IllegalArgumentException(s"Offset column not defined for the ingestion job '${operationDef.name}', " +
s"query: '${sourceTable.query.query}''"))

val df = try {
@@ -126,7 +126,7 @@
}

override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
if (source.getOffsetInfo(sourceTable.query).nonEmpty) {
if (source.getOffsetInfo.nonEmpty) {
Reason.Ready
} else {
Reason.NotReady(s"Offset column is not configured for source '$sourceName' of '${operationDef.name}'")
@@ -169,7 +169,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,

val om = bookkeeper.getOffsetManager

val offsetInfo = source.getOffsetInfo(sourceTable.query).getOrElse(
val offsetInfo = source.getOffsetInfo.getOrElse(
throw new IllegalArgumentException(s"Offset type is not configured for the source '$sourceName' outputting to '${outputTable.name}''")
)

@@ -20,9 +20,9 @@ import com.typesafe.config.Config
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.Query
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig
import za.co.absa.pramen.core.reader.model.{OffsetInfoParser, TableReaderJdbcConfig}
import za.co.absa.pramen.core.utils.{ConfigUtils, JdbcNativeUtils, JdbcSparkUtils, TimeUtils}

import java.time.format.DateTimeFormatter
@@ -19,6 +19,7 @@ package za.co.absa.pramen.core.reader
import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.TableReader
import za.co.absa.pramen.api.offset.OffsetInfo
import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig}
import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig
import za.co.absa.pramen.core.sql.SqlGeneratorLoader
@@ -44,18 +45,13 @@ abstract class TableReaderJdbcBase(jdbcReaderConfig: TableReaderJdbcConfig,
}

private[core] def getSqlConfig: SqlConfig = {
val dateFieldType = SqlColumnType.fromString(jdbcReaderConfig.infoDateType)
dateFieldType match {
case Some(infoDateType) =>
SqlConfig(jdbcReaderConfig.infoDateColumn,
infoDateType,
jdbcReaderConfig.infoDateFormat,
jdbcReaderConfig.identifierQuotingPolicy,
jdbcReaderConfig.sqlGeneratorClass,
ConfigUtils.getExtraConfig(conf, "sql"))
case None => throw new IllegalArgumentException(s"Unknown info date type specified (${jdbcReaderConfig.infoDateType}). " +
s"It should be one of: date, string, number")
}
SqlConfig(jdbcReaderConfig.infoDateColumn,
jdbcReaderConfig.infoDateType,
jdbcReaderConfig.infoDateFormat,
jdbcReaderConfig.offsetInfoOpt,
jdbcReaderConfig.identifierQuotingPolicy,
jdbcReaderConfig.sqlGeneratorClass,
ConfigUtils.getExtraConfig(conf, "sql"))
}

protected def logConfiguration(): Unit = {
@@ -20,8 +20,8 @@ import com.typesafe.config.Config
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.Query
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.core.reader.model.{JdbcConfig, TableReaderJdbcConfig}
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
import za.co.absa.pramen.core.reader.model.{JdbcConfig, OffsetInfoParser, TableReaderJdbcConfig}
import za.co.absa.pramen.core.utils.{JdbcNativeUtils, JdbcSparkUtils, StringUtils, TimeUtils}

import java.time.{Instant, LocalDate}
@@ -0,0 +1,34 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.reader.model

import com.typesafe.config.Config
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
import za.co.absa.pramen.core.utils.ConfigUtils

object OffsetInfoParser {
val OFFSET_COLUMN_NAME_KEY = "offset.column.name"
val OFFSET_COLUMN_TYPE_KEY = "offset.column.type"

def fromConfig(conf: Config): Option[OffsetInfo] = {
for {
columnName <- ConfigUtils.getOptionString(conf, OFFSET_COLUMN_NAME_KEY)
columnType <- ConfigUtils.getOptionString(conf, OFFSET_COLUMN_TYPE_KEY)
} yield OffsetInfo(columnName, OffsetValue.getMinimumForType(columnType))
}

}
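Both the JDBC and Spark sources delegate to this parser, so an offset column counts as configured whenever both keys are present; the previous offset.column.enabled flag is no longer needed. A usage sketch:

import com.typesafe.config.ConfigFactory

val conf = ConfigFactory.parseString(
  """offset.column {
    |  name = "id"
    |  type = "integral"
    |}
    |""".stripMargin)

OffsetInfoParser.fromConfig(conf)                  // Some(OffsetInfo("id", <minimum integral offset>))
OffsetInfoParser.fromConfig(ConfigFactory.empty()) // None: offsets are not configured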
@@ -18,18 +18,17 @@ package za.co.absa.pramen.core.reader.model

import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.sql.QuotingPolicy
import za.co.absa.pramen.api.offset.OffsetInfo
import za.co.absa.pramen.api.sql.{QuotingPolicy, SqlColumnType}
import za.co.absa.pramen.core.utils.ConfigUtils

case class TableReaderJdbcConfig(
jdbcConfig: JdbcConfig,
hasInfoDate: Boolean,
infoDateColumn: String,
infoDateType: String,
infoDateType: SqlColumnType,
infoDateFormat: String = "yyyy-MM-dd",
hasOffsetColumn: Boolean,
offsetColumn: String,
offsetColumnType: String,
offsetInfoOpt: Option[OffsetInfo],
limitRecords: Option[Int] = None,
saveTimestampsAsDates: Boolean = false,
correctDecimalsInSchema: Boolean = false,
@@ -49,10 +48,6 @@ object TableReaderJdbcConfig {
val INFORMATION_DATE_FORMAT = "information.date.format"
val INFORMATION_DATE_APP_FORMAT = "information.date.app.format"

val OFFSET_COLUMN_ENABLED_KEY = "offset.column.enabled"
val OFFSET_COLUMN_NAME_KEY = "offset.column.name"
val OFFSET_COLUMN_TYPE_KEY = "offset.column.type"

val JDBC_SYNC_LIMIT_RECORDS = "limit.records"
val JDBC_TIMESTAMPS_AS_DATES = "save.timestamps.as.dates"
val CORRECT_DECIMALS_IN_SCHEMA = "correct.decimals.in.schema"
@@ -73,13 +68,9 @@
INFORMATION_DATE_COLUMN :: INFORMATION_DATE_TYPE :: Nil)
}

val hasOffsetColumn = ConfigUtils.getOptionBoolean(conf, OFFSET_COLUMN_ENABLED_KEY).getOrElse(false)
val infoDateTypeStr = ConfigUtils.getOptionString(conf, INFORMATION_DATE_TYPE).getOrElse("date")

if (hasOffsetColumn) {
ConfigUtils.validatePathsExistence(conf,
parent,
OFFSET_COLUMN_NAME_KEY :: OFFSET_COLUMN_TYPE_KEY :: Nil)
}
val infoDateType = SqlColumnType.fromStringStrict(infoDateTypeStr, parent)

val saveTimestampsAsDates = ConfigUtils.getOptionBoolean(conf, JDBC_TIMESTAMPS_AS_DATES).getOrElse(false)

@@ -89,6 +80,8 @@

val infoDateFormat = getInfoDateFormat(conf)

val offsetInfoOpt = OffsetInfoParser.fromConfig(conf)

val identifierQuotingPolicy = ConfigUtils.getOptionString(conf, IDENTIFIER_QUOTING_POLICY)
.map(s => QuotingPolicy.fromString(s))
.getOrElse(QuotingPolicy.Auto)
@@ -97,11 +90,9 @@
jdbcConfig = JdbcConfig.load(conf, parent),
hasInfoDate = conf.getBoolean(HAS_INFO_DATE),
infoDateColumn = ConfigUtils.getOptionString(conf, INFORMATION_DATE_COLUMN).getOrElse(""),
infoDateType = ConfigUtils.getOptionString(conf, INFORMATION_DATE_TYPE).getOrElse("date"),
infoDateType = infoDateType,
infoDateFormat,
hasOffsetColumn,
ConfigUtils.getOptionString(conf, OFFSET_COLUMN_NAME_KEY).getOrElse(""),
ConfigUtils.getOptionString(conf, OFFSET_COLUMN_TYPE_KEY).getOrElse("integral"),
offsetInfoOpt,
limitRecords = ConfigUtils.getOptionInt(conf, JDBC_SYNC_LIMIT_RECORDS),
saveTimestampsAsDates,
correctDecimalsInSchema = ConfigUtils.getOptionBoolean(conf, CORRECT_DECIMALS_IN_SCHEMA).getOrElse(false),
@@ -25,6 +25,7 @@ import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
import za.co.absa.pramen.api.sql.SqlColumnType
import za.co.absa.pramen.core.config.Keys.KEYS_TO_REDACT
import za.co.absa.pramen.core.reader.TableReaderSpark
import za.co.absa.pramen.core.reader.model.OffsetInfoParser
import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig._
import za.co.absa.pramen.core.utils.{ConfigUtils, FsUtils}

@@ -36,9 +37,7 @@ class SparkSource(val format: Option[String],
val infoDateColumn: String,
val infoDateType: SqlColumnType,
val infoDateFormat: String,
val hasOffsetColumn: Boolean,
val offsetColumn: String,
val offsetColumnType: String,
val offsetInfo: Option[OffsetInfo],
val sourceConfig: Config,
val options: Map[String, String])(implicit spark: SparkSession) extends Source {
private val log = LoggerFactory.getLogger(this.getClass)
@@ -47,12 +46,8 @@

override def hasInfoDateColumn(query: Query): Boolean = hasInfoDateCol

override def getOffsetInfo(query: Query): Option[OffsetInfo] = {
if (hasOffsetColumn) {
Option(OffsetInfo(offsetColumn, OffsetValue.getMinimumForType(offsetColumnType)))
} else {
None
}
override def getOffsetInfo: Option[OffsetInfo] = {
offsetInfo
}

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
@@ -72,17 +67,16 @@
}

def getReader(query: Query): TableReader = {
val offsetInfoOpt = getOffsetInfo(query)
val tableReader = query match {
case Query.Table(table) =>
log.info(s"Using TableReaderSpark to read table: $table")
new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateType, infoDateFormat, offsetInfoOpt, options)
new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateType, infoDateFormat, getOffsetInfo, options)
case Query.Sql(sql) =>
log.info(s"Using TableReaderSpark to read SQL for: $sql")
new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateType, infoDateFormat, offsetInfoOpt, options)
new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateType, infoDateFormat, getOffsetInfo, options)
case Query.Path(path) =>
log.info(s"Using TableReaderSpark to read '$format' from: $path")
new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateType, infoDateFormat, offsetInfoOpt, options)
new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateType, infoDateFormat, getOffsetInfo, options)
case other => throw new IllegalArgumentException(s"'${other.name}' is not supported by the Spark source. Use 'path', 'table' or 'sql' instead.")
}

@@ -144,16 +138,10 @@ object SparkSource extends ExternalChannelFactory[SparkSource] {
("", SqlColumnType.DATE, "")
}

val hasOffsetColumn = ConfigUtils.getOptionBoolean(conf, OFFSET_COLUMN_ENABLED_KEY).getOrElse(false)

val (offsetColumn, offsetDataType) = if (hasOffsetColumn) {
(conf.getString(OFFSET_COLUMN_NAME_KEY), conf.getString(OFFSET_COLUMN_TYPE_KEY))
} else {
("", "long")
}
val offsetInfoOpt = OffsetInfoParser.fromConfig(conf)

val options = ConfigUtils.getExtraOptions(conf, "option")

new SparkSource(format, schema, hasInfoDate, infoDateColumn, infoDateType, infoDateFormat, hasOffsetColumn, offsetColumn, offsetDataType, conf, options)(spark)
new SparkSource(format, schema, hasInfoDate, infoDateColumn, infoDateType, infoDateFormat, offsetInfoOpt, conf, options)(spark)
}
}
@@ -49,7 +49,6 @@ pramen.sources.1 = [
information.date.format = "yyyy-MM-dd"

offset.column {
enabled = true
name = "id"
type = "integral"
}
@@ -48,7 +48,6 @@ pramen.sources.1 = [
information.date.type = "datetime"

offset.column {
enabled = true
name = "ts"
type = "datetime"
}
@@ -36,7 +36,7 @@ class DisableCountQueryLongSuite extends AnyWordSpec with SparkTestBase with Tem
|{"id":"3","name":"Jill"}
|""".stripMargin

"be able to access inner source configuration" in {
"be able to access inner source configintegruration" in {
withTempDirectory("integration_disable_count_query") { tempDir =>
val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir)

@@ -17,18 +17,21 @@
package za.co.absa.pramen.core.mocks

import com.typesafe.config.ConfigFactory
import za.co.absa.pramen.api.offset.OffsetInfo
import za.co.absa.pramen.api.sql.{QuotingPolicy, SqlColumnType, SqlConfig}

object DummySqlConfigFactory {
def getDummyConfig(infoDateColumn: String = "col",
infoDateType: SqlColumnType = SqlColumnType.DATE,
dateFormatApp: String = "yyyy-MM-dd",
offsetInfo: Option[OffsetInfo] = None,
identifierQuotingPolicy: QuotingPolicy = QuotingPolicy.Auto,
sqlGeneratorClass: Option[String] = None
): SqlConfig = SqlConfig(
infoDateColumn = infoDateColumn,
infoDateType = infoDateType,
dateFormatApp = dateFormatApp,
offsetInfo = offsetInfo,
identifierQuotingPolicy = identifierQuotingPolicy,
sqlGeneratorClass = sqlGeneratorClass,
ConfigFactory.empty())
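Tests that need an offset-aware SQL config can pass the new parameter explicitly; a brief sketch (values illustrative):

import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}

val sqlConfig = DummySqlConfigFactory.getDummyConfig(
  infoDateColumn = "info_date",
  offsetInfo = Some(OffsetInfo("id", OffsetValue.getMinimumForType("integral")))
)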