#374 Implement the 'datetime' offset type for incremental ingestion.
yruslan committed Sep 17, 2024
1 parent d23cab2 commit 5f5d7d6
Showing 15 changed files with 398 additions and 128 deletions.
OffsetValue.scala
@@ -16,31 +16,58 @@

package za.co.absa.pramen.api.offset

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType => SparkStringType}
import org.apache.spark.sql.{Column, functions}

import java.time.Instant

sealed trait OffsetValue extends Comparable[OffsetValue] {
def dataTypeString: String

def valueString: String

def getSparkLit: Column

def getSparkCol(col: Column): Column
}

object OffsetValue {
val LONG_TYPE_STR = "long"
val DATETIME_TYPE_STR = "datetime"
val INTEGRAL_TYPE_STR = "integral"
val STRING_TYPE_STR = "string"

case class LongType(value: Long) extends OffsetValue {
override val dataTypeString: String = LONG_TYPE_STR
val MINIMUM_TIMESTAMP_EPOCH_MILLI: Long = -62135596800000L

case class DateTimeType(t: Instant) extends OffsetValue {
override val dataTypeString: String = DATETIME_TYPE_STR

override def valueString: String = t.toEpochMilli.toString

override def getSparkLit: Column = lit(t.toEpochMilli)

override def getSparkCol(c: Column): Column = functions.concat(unix_timestamp(c), date_format(c, "SSS")).cast(LongType)

override def compareTo(o: OffsetValue): Int = {
o match {
case DateTimeType(otherValue) => t.compareTo(otherValue)
case _ => throw new IllegalArgumentException(s"Cannot compare $dataTypeString with ${o.dataTypeString}")
}
}
}

case class IntegralType(value: Long) extends OffsetValue {
override val dataTypeString: String = INTEGRAL_TYPE_STR

override def valueString: String = value.toString

override def getSparkLit: Column = lit(value)

override def getSparkCol(c: Column): Column = c.cast(LongType)

override def compareTo(o: OffsetValue): Int = {
o match {
case LongType(otherValue) => value.compareTo(otherValue)
case IntegralType(otherValue) => value.compareTo(otherValue)
case _ => throw new IllegalArgumentException(s"Cannot compare $dataTypeString with ${o.dataTypeString}")
}
}
@@ -53,6 +80,8 @@ object OffsetValue {

override def getSparkLit: Column = lit(s)

override def getSparkCol(c: Column): Column = c.cast(SparkStringType)

override def compareTo(o: OffsetValue): Int = {
o match {
case StringType(otherValue) => s.compareTo(otherValue)
@@ -63,14 +92,16 @@

def getMinimumForType(dataType: String): OffsetValue = {
dataType match {
case LONG_TYPE_STR => LongType(Long.MinValue)
case DATETIME_TYPE_STR => DateTimeType(Instant.ofEpochMilli(MINIMUM_TIMESTAMP_EPOCH_MILLI)) // LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
case INTEGRAL_TYPE_STR => IntegralType(Long.MinValue)
case STRING_TYPE_STR => StringType("")
case _ => throw new IllegalArgumentException(s"Unknown offset data type: $dataType")
}
}

def fromString(dataType: String, value: String): OffsetValue = dataType match {
case LONG_TYPE_STR => LongType(value.toLong)
case DATETIME_TYPE_STR => DateTimeType(Instant.ofEpochMilli(value.toLong))
case INTEGRAL_TYPE_STR => IntegralType(value.toLong)
case STRING_TYPE_STR => StringType(value)
case _ => throw new IllegalArgumentException(s"Unknown offset data type: $dataType")
}
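For orientation, a minimal usage sketch of the offset API above (not part of the commit; the values are illustrative and only the OffsetValue companion shown in this file is assumed):

import java.time.Instant
import za.co.absa.pramen.api.offset.OffsetValue

object OffsetValueUsageSketch {
  def main(args: Array[String]): Unit = {
    // Offsets are persisted as strings; datetime offsets round-trip through epoch milliseconds.
    val committed = OffsetValue.fromString("datetime", "1726564198000")
    val sameInstant = OffsetValue.DateTimeType(Instant.parse("2024-09-17T09:09:58Z"))

    // When nothing has been committed yet, the minimum for the configured type is used instead.
    val minimum = OffsetValue.getMinimumForType("datetime")

    // Offsets of the same type are comparable; mixing types throws IllegalArgumentException.
    assert(committed.compareTo(sameInstant) == 0)
    assert(minimum.compareTo(committed) < 0)

    println(s"${committed.dataTypeString} offset: ${committed.valueString}")
  }
}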
OffsetValueSuite.scala
@@ -16,14 +16,24 @@

package za.co.absa.pramen.api.offset

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.{from_unixtime, lit}
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.api.offset.OffsetValue.MINIMUM_TIMESTAMP_EPOCH_MILLI

import java.time.Instant

class OffsetValueSuite extends AnyWordSpec {
"OffsetValue" should {
"be able to create a LongType instance" in {
val offsetValue = OffsetValue.LongType(42)
assert(offsetValue.dataTypeString == "long")
"be able to create a TimestampType instance" in {
val offsetValue = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L))
assert(offsetValue.dataTypeString == "timestamp")
assert(offsetValue.valueString == "1726564198000")
assert(offsetValue.getSparkLit == from_unixtime(lit(1726564198000L)))
}

"be able to create a IntegralType instance" in {
val offsetValue = OffsetValue.IntegralType(42)
assert(offsetValue.dataTypeString == "integral")
assert(offsetValue.valueString == "42")
assert(offsetValue.getSparkLit == lit(42L))
}
@@ -37,9 +47,15 @@ class OffsetValueSuite extends AnyWordSpec {
}

"getMinimumForType" should {
"be able to get minimum value for long type" in {
val offsetValue = OffsetValue.getMinimumForType("long")
assert(offsetValue.dataTypeString == "long")
"be able to get minimum value for timestamp type" in {
val offsetValue = OffsetValue.getMinimumForType("timestamp")
assert(offsetValue.dataTypeString == "timestamp")
assert(offsetValue.valueString == MINIMUM_TIMESTAMP_EPOCH_MILLI.toString)
}

"be able to get minimum value for integral type" in {
val offsetValue = OffsetValue.getMinimumForType("integral")
assert(offsetValue.dataTypeString == "integral")
assert(offsetValue.valueString == Long.MinValue.toString)
}

@@ -57,9 +73,9 @@
}

"fromString" should {
"be able to create a LongType instance from a string" in {
val offsetValue = OffsetValue.fromString("long", "42")
assert(offsetValue.dataTypeString == "long")
"be able to create a IntegralType instance from a string" in {
val offsetValue = OffsetValue.fromString("integral", "42")
assert(offsetValue.dataTypeString == "integral")
assert(offsetValue.valueString == "42")
}

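A further check in the same style could pin down the cross-type comparison guard (a sketch only, not part of the commit; it would live inside the suite above and reuses its existing imports):

"throw an exception when comparing offsets of different types" in {
  val dateTimeOffset = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L))
  val integralOffset = OffsetValue.IntegralType(42)

  // compareTo is only defined between offsets of the same type (see OffsetValue above).
  assertThrows[IllegalArgumentException] {
    dateTimeOffset.compareTo(integralOffset)
  }
}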
IncrementalIngestionJob.scala
@@ -18,7 +18,7 @@ package za.co.absa.pramen.core.pipeline

import com.typesafe.config.Config
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, LongType, ShortType, StringType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.offset.{DataOffset, OffsetInfo, OffsetValue}
import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
@@ -54,6 +54,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,

override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
val om = bookkeeper.getOffsetManager
latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None)

val uncommittedOffsets = om.getOffsets(outputTable.name, infoDate).filter(_.committedAt.isEmpty)

@@ -98,8 +99,7 @@
val newMaxOffset = if (df.isEmpty) {
minOffset
} else {
val row = df.agg(max(col(offsetInfo.offsetColumn)).cast(StringType)).collect()(0)
OffsetValue.fromString(offsetInfo.minimalOffset.dataTypeString, row(0).asInstanceOf[String])
getMaximumOffsetFromDf(df, offsetInfo)
}

log.warn(s"Fixing uncommitted offsets. New offset to commit for ${outputTable.name} at $infoDate: " +
@@ -186,8 +186,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
om.rollbackOffsets(req)
}
} else {
val row = updatedDf.agg(max(col(offsetInfo.offsetColumn)).cast(StringType)).collect()(0)
val maxOffset = OffsetValue.fromString(offsetInfo.minimalOffset.dataTypeString, row(0).asInstanceOf[String])
val maxOffset = getMaximumOffsetFromDf(df, offsetInfo)

if (isRerun) {
om.commitRerun(req, maxOffset)
@@ -222,6 +221,13 @@ class IncrementalIngestionJob(operationDef: OperationDef,
SaveResult(stats, warnings = tooLongWarnings)
}

private[core] def getMaximumOffsetFromDf(df: DataFrame, offsetInfo: OffsetInfo): OffsetValue = {
val row = df.agg(max(offsetInfo.minimalOffset.getSparkCol(col(offsetInfo.offsetColumn)))
.cast(StringType))
.collect()(0)
OffsetValue.fromString(offsetInfo.minimalOffset.dataTypeString, row(0).asInstanceOf[String])
}

private[core] def validateOffsetColumn(df: DataFrame, offsetInfo: OffsetInfo): Unit = {
if (!df.schema.fields.exists(_.name.equalsIgnoreCase(offsetInfo.offsetColumn))) {
throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'.")
@@ -230,7 +236,12 @@ class IncrementalIngestionJob(operationDef: OperationDef,
val field = df.schema.fields.find(_.name.equalsIgnoreCase(offsetInfo.offsetColumn)).get

offsetInfo.minimalOffset match {
case v: OffsetValue.LongType =>
case v: OffsetValue.DateTimeType =>
if (!field.dataType.isInstanceOf[TimestampType]) {
throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' has type '${field.dataType}'. " +
s"But only '${TimestampType.typeName}' is supported for offset type '${v.dataTypeString}'.")
}
case v: OffsetValue.IntegralType =>
if (!field.dataType.isInstanceOf[ShortType] && !field.dataType.isInstanceOf[IntegerType] && !field.dataType.isInstanceOf[LongType]) {
throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' has type '${field.dataType}'. " +
s"But only integral types are supported for offset type '${v.dataTypeString}'.")
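For illustration, the logic of the new getMaximumOffsetFromDf helper can be reproduced outside the job class roughly as follows (a sketch, not repository code; the DataFrame and column name are invented):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max}
import org.apache.spark.sql.types.StringType
import za.co.absa.pramen.api.offset.OffsetValue

object MaxOffsetSketch {
  // Same shape as getMaximumOffsetFromDf: normalise the offset column via getSparkCol,
  // take its maximum as a string, then parse it back into an offset of the same type.
  def maxOffset(df: DataFrame, offsetColumn: String, minimalOffset: OffsetValue): OffsetValue = {
    val row = df.agg(max(minimalOffset.getSparkCol(col(offsetColumn))).cast(StringType)).collect()(0)
    OffsetValue.fromString(minimalOffset.dataTypeString, row(0).asInstanceOf[String])
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("max-offset-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(1L, 5L, 3L).toDF("id") // hypothetical integral offset column
    println(maxOffset(df, "id", OffsetValue.getMinimumForType("integral")).valueString) // prints 5

    spark.stop()
  }
}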
TableReaderSpark.scala
@@ -18,18 +18,22 @@ package za.co.absa.pramen.core.reader

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
import za.co.absa.pramen.api.sql.SqlColumnType
import za.co.absa.pramen.api.{Query, TableReader}

import java.sql.Date
import java.time.LocalDate
import java.time.format.DateTimeFormatter

class TableReaderSpark(formatOpt: Option[String],
schemaOpt: Option[String],
hasInfoDateColumn: Boolean,
infoDateColumn: String,
infoDateDataType: SqlColumnType,
infoDateFormat: String = "yyyy-MM-dd",
offsetInfoOpt: Option[OffsetInfo],
options: Map[String, String] = Map.empty[String, String]
@@ -68,15 +72,16 @@

override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = {
val offsetInfo = offsetInfoOpt.getOrElse(throw new IllegalArgumentException(s"Offset column and type are not defined for ${query.query}."))
val offsetCol = offsetInfo.minimalOffset.getSparkCol(col(offsetInfo.offsetColumn))
infoDateOpt match {
case Some(infoDate) if hasInfoDateColumn =>
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} > ${minOffset.valueString}")
getData(query, infoDate, infoDate, columns)
.filter(col(offsetInfo.offsetColumn) > minOffset.getSparkLit)
.filter(offsetCol > minOffset.getSparkLit)
case _ =>
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${minOffset.valueString}")
getBaseDataFrame(query)
.filter(col(offsetInfo.offsetColumn) > minOffset.getSparkLit)
.filter(offsetCol > minOffset.getSparkLit)
}
}

@@ -87,9 +92,10 @@
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate'")
getData(query, infoDate, infoDate, columns)
case _ =>
val offsetCol = offsetInfo.minimalOffset.getSparkCol(col(offsetInfo.offsetColumn))
log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${minOffset.valueString} AND ${offsetInfo.offsetColumn} <= ${maxOffset.valueString}")
getBaseDataFrame(query)
.filter(col(offsetInfo.offsetColumn) > minOffset.getSparkLit && col(offsetInfo.offsetColumn) <= maxOffset.getSparkLit)
.filter(offsetCol > minOffset.getSparkLit && offsetCol <= maxOffset.getSparkLit)
}
}

@@ -128,15 +134,26 @@
}

private[core] def getFilteredDataFrame(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): DataFrame = {
val infoDateBeginStr = dateFormatter.format(infoDateBegin)
val infoDateEndStr = dateFormatter.format(infoDateEnd)

if (infoDateBegin.equals(infoDateEnd)) {
getBaseDataFrame(query)
.filter(col(s"$infoDateColumn") === lit(infoDateBeginStr))
} else {
getBaseDataFrame(query)
.filter(col(s"$infoDateColumn") >= lit(infoDateBeginStr) && col(s"$infoDateColumn") <= lit(infoDateEndStr))
infoDateDataType match {
case SqlColumnType.DATETIME =>
if (infoDateBegin.equals(infoDateEnd)) {
getBaseDataFrame(query)
.filter(col(infoDateColumn).cast(DateType) === lit(Date.valueOf(infoDateBegin)))
} else {
getBaseDataFrame(query)
.filter(col(infoDateColumn).cast(DateType) >= lit(Date.valueOf(infoDateBegin)) && col(infoDateColumn).cast(DateType) <= lit(Date.valueOf(infoDateEnd)))
}
case _ =>
val infoDateBeginStr = dateFormatter.format(infoDateBegin)
val infoDateEndStr = dateFormatter.format(infoDateEnd)

if (infoDateBegin.equals(infoDateEnd)) {
getBaseDataFrame(query)
.filter(col(infoDateColumn) === lit(infoDateBeginStr))
} else {
getBaseDataFrame(query)
.filter(col(infoDateColumn) >= lit(infoDateBeginStr) && col(infoDateColumn) <= lit(infoDateEndStr))
}
}
}

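The effect of routing the offset column through getSparkCol before filtering, as getIncrementalData and getIncrementalDataRange now do, can be sketched like this (not repository code; the column name and data are invented):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.pramen.api.offset.OffsetValue

object IncrementalFilterSketch {
  def main(args: Array[String]): Unit = {
    // Session time zone pinned to UTC so the epoch-millisecond comparison below is deterministic.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("incremental-filter-sketch")
      .config("spark.sql.session.timeZone", "UTC")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical source table with a timestamp column used as the incremental offset.
    val df = Seq("2024-09-17 09:09:58.000", "2024-09-17 10:00:00.000")
      .toDF("sync_watermark")
      .select(col("sync_watermark").cast("timestamp").as("sync_watermark"))

    // Both sides of the comparison are epoch milliseconds: the column via getSparkCol,
    // the committed offset via getSparkLit. Rows at or before the offset are skipped.
    val minOffset = OffsetValue.fromString("datetime", "1726564198000")
    val offsetCol = minOffset.getSparkCol(col("sync_watermark"))

    df.filter(offsetCol > minOffset.getSparkLit).show(truncate = false)

    spark.stop()
  }
}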
TableReaderJdbcConfig.scala
@@ -101,7 +101,7 @@ object TableReaderJdbcConfig {
infoDateFormat,
hasOffsetColumn,
ConfigUtils.getOptionString(conf, OFFSET_COLUMN_NAME_KEY).getOrElse(""),
ConfigUtils.getOptionString(conf, OFFSET_COLUMN_TYPE_KEY).getOrElse("long"),
ConfigUtils.getOptionString(conf, OFFSET_COLUMN_TYPE_KEY).getOrElse("integral"),
limitRecords = ConfigUtils.getOptionInt(conf, JDBC_SYNC_LIMIT_RECORDS),
saveTimestampsAsDates,
correctDecimalsInSchema = ConfigUtils.getOptionBoolean(conf, CORRECT_DECIMALS_IN_SCHEMA).getOrElse(false),
ConcurrentJobRunnerImpl.scala
@@ -92,7 +92,7 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,
completedJobsChannel.send((job, Nil, isSucceeded))
} catch {
case ex: FatalErrorWrapper if ex.cause != null => onFatalException(ex.cause, job, isTransient)
case NonFatal(ex) => sendFailure(ex, job, isTransient)
case NonFatal(ex) => onNonFatalException(ex, job, isTransient)
case ex: Throwable => onFatalException(ex, job, isTransient)
}
}
@@ -105,6 +105,11 @@
sendFailure(fatalEx, job, isTransient)
}

private[core] def onNonFatalException(ex: Throwable, job: Job, isTransient: Boolean): Unit = {
log.error(s"${Emoji.FAILURE} Job '${job.name}' outputting to '${job.outputTable.name}' has thrown an error", ex)
sendFailure(ex, job, isTransient)
}

private[core] def sendFailure(ex: Throwable, job: Job, isTransient: Boolean): Unit = {
completedJobsChannel.send((job,
TaskResult(job.name,
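As an aside, the handler ordering in the worker loop above (fatal wrapper first, then non-fatal, then everything else) can be sketched in isolation (stand-in types only; FatalErrorWrapper here is a local placeholder, not the real class):

import scala.util.control.NonFatal

object FailureDispatchSketch {
  // Local stand-in for the real FatalErrorWrapper, used only for this sketch.
  final class FatalErrorWrapper(val cause: Throwable) extends RuntimeException(cause)

  def onFatal(ex: Throwable): Unit = println(s"fatal: ${ex.getMessage}")
  def onNonFatal(ex: Throwable): Unit = println(s"non-fatal, logged then reported: ${ex.getMessage}")

  def run(body: => Unit): Unit = {
    try body
    catch {
      case ex: FatalErrorWrapper if ex.cause != null => onFatal(ex.cause)
      case NonFatal(ex) => onNonFatal(ex) // previously went straight to sendFailure; now logged first
      case ex: Throwable => onFatal(ex)
    }
  }

  def main(args: Array[String]): Unit =
    run(throw new IllegalStateException("boom")) // hits the NonFatal branch
}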
ScheduleStrategyIncremental.scala
@@ -43,19 +43,13 @@ class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated], has
val infoDate = evaluateRunDate(runDate, infoDateExpression)
log.info(s"Normal run strategy: runDate=$runDate, infoDate=$infoDate")

val runInfoDays = lastOffsets match {
case Some(offset) =>
if (offset.maximumInfoDate.isAfter(infoDate)) {
Seq.empty
} else {
Seq(infoDate)
.map(d => TaskPreDef(d, TaskRunReason.New))
}
case None =>
if (hasInfoDateColumn)
Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New))
else
Seq(TaskPreDef(infoDate, TaskRunReason.New))
val runInfoDays = if (hasInfoDateColumn)
Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New))
else {
lastOffsets match {
case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => Seq.empty
case _ => Seq(TaskPreDef(infoDate, TaskRunReason.New))
}
}

log.info(s"Days to run: ${runInfoDays.map(_.infoDate).mkString(", ")}")
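The reworked branch boils down to the following decision, shown here as a standalone sketch (not repository code; TaskPreDef and the offset bookkeeping are replaced with plain LocalDate values):

import java.time.LocalDate

object IncrementalScheduleSketch {
  // Mirrors the new logic: with an information date column, the previous and current info dates
  // are both scheduled; without one, a single day is scheduled unless the bookkeeper already
  // holds a later maximum info date.
  def daysToRun(infoDate: LocalDate, hasInfoDateColumn: Boolean, lastMaxInfoDate: Option[LocalDate]): Seq[LocalDate] = {
    if (hasInfoDateColumn)
      Seq(infoDate.minusDays(1), infoDate)
    else lastMaxInfoDate match {
      case Some(maxDate) if maxDate.isAfter(infoDate) => Seq.empty
      case _ => Seq(infoDate)
    }
  }

  def main(args: Array[String]): Unit = {
    val infoDate = LocalDate.of(2024, 9, 17)
    println(daysToRun(infoDate, hasInfoDateColumn = true, None))                        // 2024-09-16 and 2024-09-17
    println(daysToRun(infoDate, hasInfoDateColumn = false, Some(infoDate.plusDays(1)))) // empty: bookkeeper is already ahead
  }
}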