#374 Implement the offset type: 'datetime' for incremental ingestion.
yruslan committed Sep 18, 2024
1 parent d23cab2 commit 80a2336
Showing 19 changed files with 423 additions and 1,143 deletions.
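
In summary: the old `long` offset type is replaced by `integral`, a new millisecond-precision `datetime` offset backed by `java.time.Instant` is added, and the Delta metastore persistence becomes batch-aware. A minimal sketch of the resulting `OffsetValue` API in `za.co.absa.pramen.api.offset`, based on the diff below:

import java.time.Instant
import za.co.absa.pramen.api.offset.OffsetValue

// The new "datetime" offset is backed by an Instant and serialized
// as epoch milliseconds.
val offset = OffsetValue.fromString("datetime", "1726564198000")
assert(offset.dataTypeString == "datetime")
assert(offset.valueString == "1726564198000")

// The minimum "datetime" offset corresponds to 0001-01-01T00:00:00Z.
val minimum = OffsetValue.getMinimumForType("datetime")
assert(minimum.compareTo(offset) < 0)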
@@ -17,30 +17,57 @@
package za.co.absa.pramen.api.offset

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType => SparkStringType}

import java.time.Instant

sealed trait OffsetValue extends Comparable[OffsetValue] {
def dataTypeString: String

def valueString: String

def getSparkLit: Column

def getSparkCol(col: Column): Column
}

object OffsetValue {
val LONG_TYPE_STR = "long"
val DATETIME_TYPE_STR = "datetime"
val INTEGRAL_TYPE_STR = "integral"
val STRING_TYPE_STR = "string"

case class LongType(value: Long) extends OffsetValue {
override val dataTypeString: String = LONG_TYPE_STR
val MINIMUM_TIMESTAMP_EPOCH_MILLI: Long = -62135596800000L

case class DateTimeType(t: Instant) extends OffsetValue {
override val dataTypeString: String = DATETIME_TYPE_STR

override def valueString: String = t.toEpochMilli.toString

override def getSparkLit: Column = lit(t.toEpochMilli)

override def getSparkCol(c: Column): Column = concat(unix_timestamp(c), date_format(c, "SSS")).cast(LongType)

override def compareTo(o: OffsetValue): Int = {
o match {
case DateTimeType(otherValue) => t.compareTo(otherValue)
case _ => throw new IllegalArgumentException(s"Cannot compare $dataTypeString with ${o.dataTypeString}")
}
}
}

case class IntegralType(value: Long) extends OffsetValue {
override val dataTypeString: String = INTEGRAL_TYPE_STR

override def valueString: String = value.toString

override def getSparkLit: Column = lit(value)

override def getSparkCol(c: Column): Column = c.cast(LongType)

override def compareTo(o: OffsetValue): Int = {
o match {
case LongType(otherValue) => value.compareTo(otherValue)
case IntegralType(otherValue) => value.compareTo(otherValue)
case _ => throw new IllegalArgumentException(s"Cannot compare $dataTypeString with ${o.dataTypeString}")
}
}
@@ -53,6 +80,8 @@ object OffsetValue {

override def getSparkLit: Column = lit(s)

override def getSparkCol(c: Column): Column = c.cast(SparkStringType)

override def compareTo(o: OffsetValue): Int = {
o match {
case StringType(otherValue) => s.compareTo(otherValue)
@@ -63,14 +92,16 @@

def getMinimumForType(dataType: String): OffsetValue = {
dataType match {
case LONG_TYPE_STR => LongType(Long.MinValue)
case DATETIME_TYPE_STR => DateTimeType(Instant.ofEpochMilli(MINIMUM_TIMESTAMP_EPOCH_MILLI)) // LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
case INTEGRAL_TYPE_STR => IntegralType(Long.MinValue)
case STRING_TYPE_STR => StringType("")
case _ => throw new IllegalArgumentException(s"Unknown offset data type: $dataType")
}
}

def fromString(dataType: String, value: String): OffsetValue = dataType match {
case LONG_TYPE_STR => LongType(value.toLong)
case DATETIME_TYPE_STR => DateTimeType(Instant.ofEpochMilli(value.toLong))
case INTEGRAL_TYPE_STR => IntegralType(value.toLong)
case STRING_TYPE_STR => StringType(value)
case _ => throw new IllegalArgumentException(s"Unknown offset data type: $dataType")
}
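
A note on `DateTimeType.getSparkCol`: `unix_timestamp` produces epoch seconds and `date_format(c, "SSS")` produces the zero-padded millisecond component, so concatenating the two and casting to long reconstructs epoch milliseconds. A quick way to verify, assuming an ambient SparkSession named `spark`:

import java.time.Instant
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import spark.implicits._

val ts = java.sql.Timestamp.from(Instant.ofEpochMilli(1726564198123L))
val df = Seq(ts).toDF("a")

// 1726564198 (epoch seconds) concatenated with "123" (milliseconds),
// then cast to long, yields 1726564198123.
df.select(concat(unix_timestamp(col("a")), date_format(col("a"), "SSS")).cast(LongType).as("offset")).show()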
@@ -16,30 +16,50 @@

package za.co.absa.pramen.api.offset

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType}
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.api.offset.OffsetValue.MINIMUM_TIMESTAMP_EPOCH_MILLI

import java.time.Instant

class OffsetValueSuite extends AnyWordSpec {
"OffsetValue" should {
"be able to create a LongType instance" in {
val offsetValue = OffsetValue.LongType(42)
assert(offsetValue.dataTypeString == "long")
"be able to create a DateTimeType instance" in {
val offsetValue = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L))
assert(offsetValue.dataTypeString == "datetime")
assert(offsetValue.valueString == "1726564198000")
assert(offsetValue.getSparkLit == lit(1726564198000L))
assert(offsetValue.getSparkCol(col("a")) == concat(unix_timestamp(col("a")), date_format(col("a"), "SSS")).cast(LongType))
}

"be able to create a IntegralType instance" in {
val offsetValue = OffsetValue.IntegralType(42)
assert(offsetValue.dataTypeString == "integral")
assert(offsetValue.valueString == "42")
assert(offsetValue.getSparkLit == lit(42L))
assert(offsetValue.getSparkCol(col("a")) == col("a").cast(LongType))
}

"be able to create a StringType instance" in {
val offsetValue = OffsetValue.StringType("foo")
assert(offsetValue.dataTypeString == "string")
assert(offsetValue.valueString == "foo")
assert(offsetValue.getSparkLit == lit("foo"))
assert(offsetValue.getSparkCol(col("a")) == col("a").cast(StringType))
}
}

"getMinimumForType" should {
"be able to get minimum value for long type" in {
val offsetValue = OffsetValue.getMinimumForType("long")
assert(offsetValue.dataTypeString == "long")
"be able to get minimum value for datetime type" in {
val offsetValue = OffsetValue.getMinimumForType("datetime")
assert(offsetValue.dataTypeString == "datetime")
assert(offsetValue.valueString == MINIMUM_TIMESTAMP_EPOCH_MILLI.toString)
}

"be able to get minimum value for integral type" in {
val offsetValue = OffsetValue.getMinimumForType("integral")
assert(offsetValue.dataTypeString == "integral")
assert(offsetValue.valueString == Long.MinValue.toString)
}

@@ -57,9 +77,15 @@ class OffsetValueSuite extends AnyWordSpec {
}

"fromString" should {
"be able to create a LongType instance from a string" in {
val offsetValue = OffsetValue.fromString("long", "42")
assert(offsetValue.dataTypeString == "long")
"be able to create a DateTimeType instance from a string" in {
val offsetValue = OffsetValue.fromString("datetime", "1726552310000")
assert(offsetValue.dataTypeString == "datetime")
assert(offsetValue.valueString == "1726552310000")
}

"be able to create a IntegralType instance from a string" in {
val offsetValue = OffsetValue.fromString("integral", "42")
assert(offsetValue.dataTypeString == "integral")
assert(offsetValue.valueString == "42")
}

@@ -76,4 +102,59 @@
}
}

"compareTo" should {
"compare 2 datetime values" in {
val offsetValue1 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L))
val offsetValue2 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198001L))

assert(offsetValue1.compareTo(offsetValue2) < 0)
assert(offsetValue2.compareTo(offsetValue1) > 0)
assert(offsetValue2.compareTo(offsetValue2) == 0)
}

"throw an exception when attempting to compare a datetime value with value of some other type" in {
val offsetValue1 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L))
val offsetValue2 = OffsetValue.IntegralType(42)

assertThrows[IllegalArgumentException] {
offsetValue1.compareTo(offsetValue2)
}
}

"compare 2 integral values" in {
val offsetValue1 = OffsetValue.IntegralType(42)
val offsetValue2 = OffsetValue.IntegralType(43)

assert(offsetValue1.compareTo(offsetValue2) < 0)
assert(offsetValue2.compareTo(offsetValue1) > 0)
assert(offsetValue2.compareTo(offsetValue2) == 0)
}

"throw an exception when attempting to compare an integral value with value of some other type" in {
val offsetValue1 = OffsetValue.IntegralType(42)
val offsetValue2 = OffsetValue.StringType("foo")

assertThrows[IllegalArgumentException] {
offsetValue1.compareTo(offsetValue2)
}
}

"compare 2 string values" in {
val offsetValue1 = OffsetValue.StringType("bar")
val offsetValue2 = OffsetValue.StringType("foo")

assert(offsetValue1.compareTo(offsetValue2) < 0)
assert(offsetValue2.compareTo(offsetValue1) > 0)
assert(offsetValue2.compareTo(offsetValue2) == 0)
}

"throw an exception when attempting to compare a string value with value of some other type" in {
val offsetValue1 = OffsetValue.StringType("foo")
val offsetValue2 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L))

assertThrows[IllegalArgumentException] {
offsetValue1.compareTo(offsetValue2)
}
}
}
}
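
This ordering is presumably what incremental ingestion uses to decide whether an incoming offset is newer than the last committed one; a hypothetical check:

val lastCommitted = OffsetValue.fromString("datetime", "1726552310000")
val incoming = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L))
val isNew = incoming.compareTo(lastCommitted) > 0 // true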
pramen/build.sbt: 1 addition, 1 deletion
@@ -25,7 +25,7 @@ val scala213 = "2.13.13"

ThisBuild / organization := "za.co.absa.pramen"

ThisBuild / scalaVersion := scala211
ThisBuild / scalaVersion := scala212
ThisBuild / crossScalaVersions := Seq(scala211, scala212, scala213)

ThisBuild / scalacOptions := Seq("-unchecked", "-deprecation")
@@ -53,7 +53,7 @@ object MetastorePersistence {
)
case DataFormat.Delta(query, recordsPerPartition) =>
new MetastorePersistenceDelta(
query, metaTable.infoDateColumn, metaTable.infoDateFormat, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions
query, metaTable.infoDateColumn, metaTable.infoDateFormat, metaTable.batchIdColumn, batchId, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions
)
case DataFormat.Raw(path) =>
new MetastorePersistenceRaw(path, metaTable.infoDateColumn, metaTable.infoDateFormat, saveModeOpt)
@@ -35,6 +35,8 @@ import scala.util.Try
class MetastorePersistenceDelta(query: Query,
infoDateColumn: String,
infoDateFormat: String,
batchIdColumn: String,
batchId: Long,
recordsPerPartition: Option[Long],
saveModeOpt: Option[SaveMode],
readOptions: Map[String, String],
@@ -66,23 +68,22 @@

val whereCondition = s"$infoDateColumn='$infoDateStr'"

val recordCount = numberOfRecordsEstimate match {
case Some(count) => count
case None => dfIn.count()
}
val dfRepartitioned = if (recordsPerPartition.nonEmpty) {
val recordCount = numberOfRecordsEstimate match {
case Some(count) => count
case None => dfIn.count()
}

val dfRepartitioned = applyRepartitioning(dfIn, recordCount)
applyRepartitioning(dfIn, recordCount)
} else {
dfIn
}

val saveMode = saveModeOpt.getOrElse(SaveMode.Overwrite)

val operationStr = saveMode match {
case SaveMode.Append => "Appending to"
case _ => "Writing to"
}

val isAppend = saveMode match {
case SaveMode.Append => true
case _ => false
val (isAppend, operationStr) = saveMode match {
case SaveMode.Append => (true, "Appending to")
case _ => (false, "Writing to")
}

if (log.isDebugEnabled) {
@@ -116,16 +117,29 @@
val stats = getStats(infoDate, isAppend)

stats.dataSizeBytes match {
case Some(size) => log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records (${StringUtils.prettySize(size)}) to ${query.query}")
case None => log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records to ${query.query}")
case Some(size) =>
stats.recordCountAppended match {
case Some(recordsAppended) =>
log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}, " +
s"new size: ${StringUtils.prettySize(size)}) to ${query.query}")
case None =>
log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " +
s"(${StringUtils.prettySize(size)}) to ${query.query}")
}
case None =>
stats.recordCountAppended match {
case Some(recordsAppended) =>
log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount} to ${query.query}")
case None =>
log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records to ${query.query}")
}
}

stats
}

override def getStats(infoDate: LocalDate, onlyForCurrentBatchId: Boolean): MetaTableStats = {
val df = loadTable(Option(infoDate), Option(infoDate))
val recordCount = df.count()

val sizeOpt = query match {
case Query.Path(path) =>
@@ -142,7 +156,16 @@
None
}

MetaTableStats(recordCount, None, sizeOpt)
if (onlyForCurrentBatchId && df.schema.exists(_.name.equalsIgnoreCase(batchIdColumn))) {
val batchCount = df.filter(col(batchIdColumn) === batchId).count()
val countAll = df.count()

MetaTableStats(countAll, Option(batchCount), sizeOpt)
} else {
val countAll = df.count()

MetaTableStats(countAll, None, sizeOpt)
}
}

override def createOrUpdateHiveTable(infoDate: LocalDate,
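
The batch-aware `getStats` above counts the records appended by the current batch by filtering on the batch id column when the table has one. A standalone sketch of that logic; the column name, batch id, and path are illustrative, not taken from the commit:

import org.apache.spark.sql.functions.col

val batchIdColumn = "pramen_batchid" // assumed name; the real one comes from the metatable config
val batchId = 42L                    // assumed; supplied by the pipeline run

val df = spark.read.format("delta").load("/data/my_table") // hypothetical table location
val countAll = df.count()
val appendedCountOpt =
  if (df.schema.exists(_.name.equalsIgnoreCase(batchIdColumn)))
    Some(df.filter(col(batchIdColumn) === batchId).count())
  else
    None
// These feed MetaTableStats(countAll, appendedCountOpt, sizeOpt) as above.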