Skip to content

Commit

Permalink
#374 Add server timezone to JDBC reader configuration to accommodate …
Browse files Browse the repository at this point in the history
…for timezone differences in timestamp offsets.
  • Loading branch information
yruslan committed Sep 30, 2024
1 parent 5b8983c commit 8b4bfb1
Show file tree
Hide file tree
Showing 24 changed files with 184 additions and 77 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,10 @@ is determined by the pipeline configuration.
# Specifies the maximum number of records to fetch. Good for testing purposes.
#limit.records = 100
# Specify the timezone of the database server, if it is different from the default timezone.
# It is needed for incremental ingestion based on offset field that has a timestamp or datetime data type.
#server.timezone = "Africa/Johannesburg"
# Optionally, you can specify a class for a custom SQL generator for your RDMS engine.
# The class whould extend 'za.co.absa.pramen.api.sql.SqlGenerator'
#sql.generator.class = "com.example.MySqlGenerator"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package za.co.absa.pramen.api.sql
import com.typesafe.config.Config
import za.co.absa.pramen.api.offset.OffsetInfo

import java.time.ZoneId

case class SqlConfig(
infoDateColumn: String,
infoDateType: SqlColumnType,
dateFormatApp: String,
offsetInfo: Option[OffsetInfo],
serverTimeZone: ZoneId,
identifierQuotingPolicy: QuotingPolicy,
sqlGeneratorClass: Option[String],
extraConfig: Config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ trait SqlGenerator {
onlyForInfoDate: Option[LocalDate],
offsetFrom: Option[OffsetValue],
offsetTo: Option[OffsetValue],
columns: Seq[String], limit:
Option[Int]): String
columns: Seq[String]): String

/**
* Returns WHERE condition for table that has the information date field given the time period.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package za.co.absa.pramen.api.sql
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.collection.mutable.ListBuffer

/**
Expand All @@ -29,6 +30,8 @@ import scala.collection.mutable.ListBuffer
abstract class SqlGeneratorBase(sqlConfig: SqlConfig) extends SqlGenerator {
import SqlGeneratorBase._

protected val timestampGenericDbFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

/**
* This returns characters used for escaping into a mode that allows special characters in identifiers.
* For example,
Expand Down Expand Up @@ -85,14 +88,13 @@ abstract class SqlGeneratorBase(sqlConfig: SqlConfig) extends SqlGenerator {
onlyForInfoDate: Option[LocalDate],
offsetFromOpt: Option[OffsetValue],
offsetToOpt: Option[OffsetValue],
columns: Seq[String], limit:
Option[Int]): String = {
columns: Seq[String]): String = {
if (sqlConfig.offsetInfo.isEmpty)
throw new IllegalArgumentException(s"Offset information is not configured for database table: $tableName.")

val dataQuery = onlyForInfoDate match {
case Some(infoDate) => getDataQuery(tableName, infoDate, infoDate, columns, limit)
case None => getDataQuery(tableName, columns, limit)
case Some(infoDate) => getDataQuery(tableName, infoDate, infoDate, columns, None)
case None => getDataQuery(tableName, columns, None)
}

val offsetWhere = getOffsetWhereClause(sqlConfig.offsetInfo.get, offsetFromOpt, offsetToOpt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
offsetFrom: Option[OffsetValue],
offsetTo: Option[OffsetValue],
columns: Seq[String]): DataFrame = {
val sql = sqlGen.getDataQueryIncremental(tableName, onlyForInfoDate, offsetFrom, offsetTo, columns, jdbcReaderConfig.limitRecords)
val sql = sqlGen.getDataQueryIncremental(tableName, onlyForInfoDate, offsetFrom, offsetTo, columns)

log.info(s"JDBC Query: $sql")

Expand Down Expand Up @@ -248,8 +248,8 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
}

object TableReaderJdbc {
def apply(conf: Config, parent: String)(implicit spark: SparkSession): TableReaderJdbc = {
val jdbcTableReaderConfig = TableReaderJdbcConfig.load(conf, parent)
def apply(conf: Config, workflowConf: Config, parent: String)(implicit spark: SparkSession): TableReaderJdbc = {
val jdbcTableReaderConfig = TableReaderJdbcConfig.load(conf, workflowConf, parent)

val urlSelector = JdbcUrlSelector(jdbcTableReaderConfig.jdbcConfig)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ abstract class TableReaderJdbcBase(jdbcReaderConfig: TableReaderJdbcConfig,
jdbcReaderConfig.infoDateType,
jdbcReaderConfig.infoDateFormat,
jdbcReaderConfig.offsetInfoOpt,
jdbcReaderConfig.serverTimeZone,
jdbcReaderConfig.identifierQuotingPolicy,
jdbcReaderConfig.sqlGeneratorClass,
ConfigUtils.getExtraConfig(conf, "sql"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,
offsetFrom: Option[OffsetValue],
offsetTo: Option[OffsetValue],
columns: Seq[String]): DataFrame = {
val sql = sqlGen.getDataQueryIncremental(tableName, onlyForInfoDate, offsetFrom, offsetTo, columns, jdbcReaderConfig.limitRecords)
val sql = sqlGen.getDataQueryIncremental(tableName, onlyForInfoDate, offsetFrom, offsetTo, columns)
log.info(s"JDBC Query: $sql")

var df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, url, sql)
Expand All @@ -144,9 +144,10 @@ object TableReaderJdbcNative {
val FETCH_SIZE_KEY = "option.fetchsize"

def apply(conf: Config,
workflowConf: Config,
parent: String = "")
(implicit spark: SparkSession): TableReaderJdbcNative = {
val tableReaderJdbcOrig = TableReaderJdbcConfig.load(conf, parent)
val tableReaderJdbcOrig = TableReaderJdbcConfig.load(conf, workflowConf, parent)
val jdbcConfig = getJdbcConfig(tableReaderJdbcOrig, conf)
val tableReaderJdbc = tableReaderJdbcOrig.copy(jdbcConfig = jdbcConfig)
val urlSelector = JdbcUrlSelector(tableReaderJdbc.jdbcConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.offset.OffsetInfo
import za.co.absa.pramen.api.sql.{QuotingPolicy, SqlColumnType}
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.utils.ConfigUtils

import java.time.ZoneId

case class TableReaderJdbcConfig(
jdbcConfig: JdbcConfig,
hasInfoDate: Boolean,
Expand All @@ -35,6 +38,7 @@ case class TableReaderJdbcConfig(
correctDecimalsFixPrecision: Boolean = false,
enableSchemaMetadata: Boolean = false,
useJdbcNative: Boolean = false,
serverTimeZone: ZoneId = ZoneId.systemDefault(),
identifierQuotingPolicy: QuotingPolicy = QuotingPolicy.Auto,
sqlGeneratorClass: Option[String] = None
)
Expand All @@ -54,10 +58,11 @@ object TableReaderJdbcConfig {
val CORRECT_DECIMALS_FIX_PRECISION = "correct.decimals.fix.precision"
val ENABLE_SCHEMA_METADATA_KEY = "enable.schema.metadata"
val USE_JDBC_NATIVE = "use.jdbc.native"
val SERVER_TIMEZONE = "server.timezone"
val IDENTIFIER_QUOTING_POLICY = "identifier.quoting.policy"
val SQL_GENERATOR_CLASS_KEY = "sql.generator.class"

def load(conf: Config, parent: String = ""): TableReaderJdbcConfig = {
def load(conf: Config, workflowConf: Config, parent: String = ""): TableReaderJdbcConfig = {
ConfigUtils.validatePathsExistence(conf, parent, HAS_INFO_DATE :: Nil)

val hasInformationDate = conf.getBoolean(HAS_INFO_DATE)
Expand All @@ -82,6 +87,9 @@ object TableReaderJdbcConfig {

val offsetInfoOpt = OffsetInfoParser.fromConfig(conf)

val defaultTimezone = ConfigUtils.getOptionString(workflowConf, Keys.TIMEZONE).getOrElse("Africa/Johannesburg")
val serverTimezone = ZoneId.of(ConfigUtils.getOptionString(conf, SERVER_TIMEZONE).getOrElse(defaultTimezone))

val identifierQuotingPolicy = ConfigUtils.getOptionString(conf, IDENTIFIER_QUOTING_POLICY)
.map(s => QuotingPolicy.fromString(s))
.getOrElse(QuotingPolicy.Auto)
Expand All @@ -99,6 +107,7 @@ object TableReaderJdbcConfig {
correctDecimalsFixPrecision = ConfigUtils.getOptionBoolean(conf, CORRECT_DECIMALS_FIX_PRECISION).getOrElse(false),
enableSchemaMetadata = ConfigUtils.getOptionBoolean(conf, ENABLE_SCHEMA_METADATA_KEY).getOrElse(false),
useJdbcNative = ConfigUtils.getOptionBoolean(conf, USE_JDBC_NATIVE).getOrElse(false),
serverTimezone,
identifierQuotingPolicy = identifierQuotingPolicy,
sqlGeneratorClass = ConfigUtils.getOptionString(conf, SQL_GENERATOR_CLASS_KEY)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ class JdbcSource(sourceConfig: Config,
}
}

object JdbcSource extends ExternalChannelFactory[JdbcSource] {
override def apply(conf: Config, parentPath: String, spark: SparkSession): JdbcSource = {
val tableReaderJdbc = TableReaderJdbcConfig.load(conf)
object JdbcSource extends ExternalChannelFactoryV2[JdbcSource] {
override def apply(conf: Config, workflowConf: Config, parentPath: String, spark: SparkSession): JdbcSource = {
val tableReaderJdbc = TableReaderJdbcConfig.load(conf, workflowConf)

new JdbcSource(conf, parentPath, tableReaderJdbc)(spark)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.sql
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig, SqlGeneratorBase}

import java.time.LocalDate
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

class SqlGeneratorDb2(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfig) {
Expand Down Expand Up @@ -97,7 +97,9 @@ class SqlGeneratorDb2(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfig)
override def getOffsetWhereCondition(column: String, condition: String, offset: OffsetValue): String = {
offset match {
case OffsetValue.DateTimeType(ts) =>
s"$column $condition TIMESTAMP('1970-01-01', '00:00:00') + (${ts.toEpochMilli} / 1000) SECONDS"
val ldt = LocalDateTime.ofInstant(ts, sqlConfig.serverTimeZone)
val tsLiteral = timestampGenericDbFormatter.format(ldt)
s"$column $condition TIMESTAMP('$tsLiteral')"
case OffsetValue.IntegralType(value) =>
s"$column $condition $value"
case OffsetValue.StringType(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig, SqlGeneratorBase}
import za.co.absa.pramen.core.sql.dialects.DenodoDialect

import java.time.LocalDate
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

object SqlGeneratorDenodo {
Expand Down Expand Up @@ -115,7 +115,9 @@ class SqlGeneratorDenodo(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfi
override def getOffsetWhereCondition(column: String, condition: String, offset: OffsetValue): String = {
offset match {
case OffsetValue.DateTimeType(ts) =>
s"$column $condition TIMESTAMPADD('MILLISECOND', ${ts.toEpochMilli}, TO_TIMESTAMP('1970-01-01 00:00:00'))"
val ldt = LocalDateTime.ofInstant(ts, sqlConfig.serverTimeZone)
val tsLiteral = timestampGenericDbFormatter.format(ldt)
s"$column $condition TIMESTAMP '$tsLiteral'"
case OffsetValue.IntegralType(value) =>
s"$column $condition $value"
case OffsetValue.StringType(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.sql
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig, SqlGeneratorBase}

import java.time.LocalDate
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

class SqlGeneratorGeneric(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfig) {
Expand Down Expand Up @@ -97,7 +97,9 @@ class SqlGeneratorGeneric(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConf
override def getOffsetWhereCondition(column: String, condition: String, offset: OffsetValue): String = {
offset match {
case OffsetValue.DateTimeType(ts) =>
s"$column $condition TIMESTAMP '1970-01-01 00:00:00' + INTERVAL '${ts.toEpochMilli}' MILLISECOND"
val ldt = LocalDateTime.ofInstant(ts, sqlConfig.serverTimeZone)
val tsLiteral = timestampGenericDbFormatter.format(ldt)
s"$column $condition '$tsLiteral'"
case OffsetValue.IntegralType(value) =>
s"$column $condition $value"
case OffsetValue.StringType(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig, SqlGeneratorBase}
import za.co.absa.pramen.core.sql.dialects.HiveDialect

import java.time.LocalDate
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

object SqlGeneratorHive {
Expand Down Expand Up @@ -111,7 +111,9 @@ class SqlGeneratorHive(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfig)
override def getOffsetWhereCondition(column: String, condition: String, offset: OffsetValue): String = {
offset match {
case OffsetValue.DateTimeType(ts) =>
s"$column $condition from_unixtime(${ts.toEpochMilli} / 1000)"
val ldt = LocalDateTime.ofInstant(ts, sqlConfig.serverTimeZone)
val tsLiteral = timestampGenericDbFormatter.format(ldt)
s"$column $condition '$tsLiteral'"
case OffsetValue.IntegralType(value) =>
s"$column $condition $value"
case OffsetValue.StringType(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ package za.co.absa.pramen.core.sql
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig, SqlGeneratorBase}

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.TimeZone
import java.time.{LocalDate, LocalDateTime}

class SqlGeneratorHsqlDb(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfig) {
private val dateFormatterApp = DateTimeFormatter.ofPattern(sqlConfig.dateFormatApp)
Expand Down Expand Up @@ -98,10 +97,9 @@ class SqlGeneratorHsqlDb(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfi
override def getOffsetWhereCondition(column: String, condition: String, offset: OffsetValue): String = {
offset match {
case OffsetValue.DateTimeType(ts) =>
// UNIX_MILLIS() in HSQLDB always produces the epoch milli as if the timestamp is in UTC timezone
// But the actual timestamp is in Africa/Johannesburg timezone, so the time set ends up 2 hours bigger then expected.
val timeZoneOffsetHours = TimeZone.getDefault.getRawOffset
s"UNIX_MILLIS($column)-$timeZoneOffsetHours $condition ${ts.toEpochMilli}"
val ldt = LocalDateTime.ofInstant(ts, sqlConfig.serverTimeZone)
val tsLiteral = timestampGenericDbFormatter.format(ldt)
s"$column $condition TIMESTAMP '$tsLiteral'"
case OffsetValue.IntegralType(value) =>
s"$column $condition $value"
case OffsetValue.StringType(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import za.co.absa.pramen.api.sql.SqlGeneratorBase.{needsEscaping, validateIdenti
import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig, SqlGenerator}
import za.co.absa.pramen.core.utils.MutableStack

import java.time.LocalDate
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter
import scala.collection.mutable.ListBuffer

class SqlGeneratorMicrosoft(sqlConfig: SqlConfig) extends SqlGenerator {
private val dateFormatterApp = DateTimeFormatter.ofPattern(sqlConfig.dateFormatApp)
private val timestampMsDbFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
private val isIso = sqlConfig.dateFormatApp.toLowerCase.startsWith("yyyy-mm-dd")

// 23 is "yyyy-MM-dd", see https://www.mssqltips.com/sqlservertip/1145/date-and-time-conversions-using-sql-server/
Expand Down Expand Up @@ -132,14 +133,13 @@ class SqlGeneratorMicrosoft(sqlConfig: SqlConfig) extends SqlGenerator {
onlyForInfoDate: Option[LocalDate],
offsetFromOpt: Option[OffsetValue],
offsetToOpt: Option[OffsetValue],
columns: Seq[String], limit:
Option[Int]): String = {
columns: Seq[String]): String = {
if (sqlConfig.offsetInfo.isEmpty)
throw new IllegalArgumentException(s"Offset information is not configured for database table: $tableName.")

val dataQuery = onlyForInfoDate match {
case Some(infoDate) => getDataQuery(tableName, infoDate, infoDate, columns, limit)
case None => getDataQuery(tableName, columns, limit)
case Some(infoDate) => getDataQuery(tableName, infoDate, infoDate, columns, None)
case None => getDataQuery(tableName, columns, None)
}

val offsetWhere = getOffsetWhereClause(sqlConfig.offsetInfo.get, offsetFromOpt, offsetToOpt)
Expand Down Expand Up @@ -173,7 +173,9 @@ class SqlGeneratorMicrosoft(sqlConfig: SqlConfig) extends SqlGenerator {
private[core] def getOffsetWhereCondition(column: String, condition: String, offset: OffsetValue): String = {
offset match {
case OffsetValue.DateTimeType(ts) =>
s"$column $condition DATEADD(MILLISECOND, ${ts.toEpochMilli}, '1970-01-01')"
val ldt = LocalDateTime.ofInstant(ts, sqlConfig.serverTimeZone)
val tsLiteral = timestampMsDbFormatter.format(ldt)
s"$column $condition '$tsLiteral'"
case OffsetValue.IntegralType(value) =>
s"$column $condition $value"
case OffsetValue.StringType(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.sql
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig, SqlGeneratorBase}

import java.time.LocalDate
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

class SqlGeneratorMySQL(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfig) {
Expand Down Expand Up @@ -98,7 +98,9 @@ class SqlGeneratorMySQL(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfig
override def getOffsetWhereCondition(column: String, condition: String, offset: OffsetValue): String = {
offset match {
case OffsetValue.DateTimeType(ts) =>
s"$column $condition FROM_UNIXTIME(${ts.toEpochMilli} / 1000)"
val ldt = LocalDateTime.ofInstant(ts, sqlConfig.serverTimeZone)
val tsLiteral = timestampGenericDbFormatter.format(ldt)
s"$column $condition '$tsLiteral'"
case OffsetValue.IntegralType(value) =>
s"$column $condition $value"
case OffsetValue.StringType(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.sql
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.api.sql.{SqlColumnType, SqlConfig, SqlGeneratorBase}

import java.time.LocalDate
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

class SqlGeneratorOracle(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfig) {
Expand Down Expand Up @@ -101,7 +101,9 @@ class SqlGeneratorOracle(sqlConfig: SqlConfig) extends SqlGeneratorBase(sqlConfi
override def getOffsetWhereCondition(column: String, condition: String, offset: OffsetValue): String = {
offset match {
case OffsetValue.DateTimeType(ts) =>
s"$column $condition (TIMESTAMP '1970-01-01 00:00:00 UTC' + NUMTODSINTERVAL(${ts.toEpochMilli} / 1000, 'SECOND'))"
val ldt = LocalDateTime.ofInstant(ts, sqlConfig.serverTimeZone)
val tsLiteral = timestampGenericDbFormatter.format(ldt)
s"$column $condition TIMESTAMP '$tsLiteral'"
case OffsetValue.IntegralType(value) =>
s"$column $condition $value"
case OffsetValue.StringType(value) =>
Expand Down
Loading

0 comments on commit 8b4bfb1

Please sign in to comment.