#374 Add support for incremental ingestion into Delta metastore tables.
yruslan committed Sep 18, 2024
1 parent 5f5d7d6 commit 98a9072
Showing 13 changed files with 1,418 additions and 1,132 deletions.
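The gist of the change: when a metastore table uses the Delta format, incremental runs append the new slice of data rather than overwriting the information date, and records carry a batch id column so the current run's output can be told apart from previously ingested data. Below is a minimal, hypothetical sketch of that write pattern in plain Spark, not Pramen's actual API; the table path and the `info_date` / `pramen_batchid` column names are illustrative assumptions.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.lit

// Hypothetical sketch only -- not Pramen's API. Shows the general pattern:
// append the current batch to a Delta table, tagged with a batch id column,
// so later reads (and stats) can single out records written by this run.
object DeltaIncrementalAppendSketch {
  def appendBatch(df: DataFrame, infoDate: String, batchId: Long, tablePath: String): Unit = {
    df.withColumn("info_date", lit(infoDate))       // illustrative info date column name
      .withColumn("pramen_batchid", lit(batchId))   // illustrative batch id column name
      .write
      .format("delta")
      .mode(SaveMode.Append)                        // incremental: append instead of overwrite
      .partitionBy("info_date")
      .save(tablePath)
  }
}
```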
@@ -16,9 +16,9 @@

package za.co.absa.pramen.api.offset

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType => SparkStringType}
import org.apache.spark.sql.{Column, functions}

import java.time.Instant

@@ -46,7 +46,7 @@ object OffsetValue {

override def getSparkLit: Column = lit(t.toEpochMilli)

override def getSparkCol(c: Column): Column = functions.concat(unix_timestamp(c), date_format(c, "SSS")).cast(LongType)
override def getSparkCol(c: Column): Column = concat(unix_timestamp(c), date_format(c, "SSS")).cast(LongType)

override def compareTo(o: OffsetValue): Int = {
o match {
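For context, the `getSparkCol` expression above turns a timestamp column into an epoch-millisecond offset: `unix_timestamp` yields whole seconds since the epoch and `date_format(.., "SSS")` yields the zero-padded millisecond component, so their string concatenation cast to `LongType` is the offset value in milliseconds. A small standalone sketch (assuming a local Spark session and an example timestamp):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{concat, date_format, to_timestamp, unix_timestamp}
import org.apache.spark.sql.types.LongType

// Standalone illustration of the offset expression used by getSparkCol above.
object EpochMillisOffsetSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("offset-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq("2024-09-17 09:09:58.123").toDF("ts_str")
      .withColumn("ts", to_timestamp($"ts_str"))
      // seconds since epoch ++ zero-padded milliseconds, cast to Long = epoch millis
      .withColumn("offset", concat(unix_timestamp($"ts"), date_format($"ts", "SSS")).cast(LongType))

    df.select("ts", "offset").show(truncate = false)

    spark.stop()
  }
}
```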
@@ -16,40 +16,44 @@

package za.co.absa.pramen.api.offset

import org.apache.spark.sql.functions.{from_unixtime, lit}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType}
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.api.offset.OffsetValue.MINIMUM_TIMESTAMP_EPOCH_MILLI

import java.time.Instant

class OffsetValueSuite extends AnyWordSpec {
"OffsetValue" should {
"be able to create a TimestampType instance" in {
"be able to create a DateTimeType instance" in {
val offsetValue = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L))
assert(offsetValue.dataTypeString == "timestamp")
assert(offsetValue.dataTypeString == "datetime")
assert(offsetValue.valueString == "1726564198000")
assert(offsetValue.getSparkLit == from_unixtime(lit(1726564198000L)))
assert(offsetValue.getSparkLit == lit(1726564198000L))
assert(offsetValue.getSparkCol(col("a")) == concat(unix_timestamp(col("a")), date_format(col("a"), "SSS")).cast(LongType))
}

"be able to create a IntegralType instance" in {
val offsetValue = OffsetValue.IntegralType(42)
assert(offsetValue.dataTypeString == "integral")
assert(offsetValue.valueString == "42")
assert(offsetValue.getSparkLit == lit(42L))
assert(offsetValue.getSparkCol(col("a")) == col("a").cast(LongType))
}

"be able to create a StringType instance" in {
val offsetValue = OffsetValue.StringType("foo")
assert(offsetValue.dataTypeString == "string")
assert(offsetValue.valueString == "foo")
assert(offsetValue.getSparkLit == lit("foo"))
assert(offsetValue.getSparkCol(col("a")) == col("a").cast(StringType))
}
}

"getMinimumForType" should {
"be able to get minimum value for timestamp type" in {
val offsetValue = OffsetValue.getMinimumForType("timestamp")
assert(offsetValue.dataTypeString == "timestamp")
"be able to get minimum value for datetime type" in {
val offsetValue = OffsetValue.getMinimumForType("datetime")
assert(offsetValue.dataTypeString == "datetime")
assert(offsetValue.valueString == MINIMUM_TIMESTAMP_EPOCH_MILLI.toString)
}

@@ -73,6 +77,12 @@ class OffsetValueSuite extends AnyWordSpec {
}

"fromString" should {
"be able to create a DateTimeType instance from a string" in {
val offsetValue = OffsetValue.fromString("datetime", "1726552310000")
assert(offsetValue.dataTypeString == "datetime")
assert(offsetValue.valueString == "1726552310000")
}

"be able to create a IntegralType instance from a string" in {
val offsetValue = OffsetValue.fromString("integral", "42")
assert(offsetValue.dataTypeString == "integral")
@@ -92,4 +102,59 @@ }
}
}

"compareTo" should {
"compare 2 datetime values" in {
val offsetValue1 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L))
val offsetValue2 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198001L))

assert(offsetValue1.compareTo(offsetValue2) < 0)
assert(offsetValue2.compareTo(offsetValue1) > 0)
assert(offsetValue2.compareTo(offsetValue2) == 0)
}

"throw an exception when attempting to compare a datetime value with value of some other type" in {
val offsetValue1 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L))
val offsetValue2 = OffsetValue.IntegralType(42)

assertThrows[IllegalArgumentException] {
offsetValue1.compareTo(offsetValue2)
}
}

"compare 2 integral values" in {
val offsetValue1 = OffsetValue.IntegralType(42)
val offsetValue2 = OffsetValue.IntegralType(43)

assert(offsetValue1.compareTo(offsetValue2) < 0)
assert(offsetValue2.compareTo(offsetValue1) > 0)
assert(offsetValue2.compareTo(offsetValue2) == 0)
}

"throw an exception when attempting to compare an integral value with value of some other type" in {
val offsetValue1 = OffsetValue.IntegralType(42)
val offsetValue2 = OffsetValue.StringType("foo")

assertThrows[IllegalArgumentException] {
offsetValue1.compareTo(offsetValue2)
}
}

"compare 2 string values" in {
val offsetValue1 = OffsetValue.StringType("bar")
val offsetValue2 = OffsetValue.StringType("foo")

assert(offsetValue1.compareTo(offsetValue2) < 0)
assert(offsetValue2.compareTo(offsetValue1) > 0)
assert(offsetValue2.compareTo(offsetValue2) == 0)
}

"throw an exception when attempting to compare a string value with value of some other type" in {
val offsetValue1 = OffsetValue.StringType("foo")
val offsetValue2 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L))

assertThrows[IllegalArgumentException] {
offsetValue1.compareTo(offsetValue2)
}
}
}
}
7 changes: 1 addition & 6 deletions pramen/build.sbt
@@ -14,18 +14,13 @@
* limitations under the License.
*/

import Dependencies._
import Versions._
import BuildInfoTemplateSettings._
import com.github.sbt.jacoco.report.JacocoReportSettings

val scala211 = "2.11.12"
val scala212 = "2.12.19"
val scala213 = "2.13.13"

ThisBuild / organization := "za.co.absa.pramen"

ThisBuild / scalaVersion := scala211
ThisBuild / scalaVersion := scala212
ThisBuild / crossScalaVersions := Seq(scala211, scala212, scala213)

ThisBuild / scalacOptions := Seq("-unchecked", "-deprecation")
@@ -53,7 +53,7 @@ object MetastorePersistence {
)
case DataFormat.Delta(query, recordsPerPartition) =>
new MetastorePersistenceDelta(
query, metaTable.infoDateColumn, metaTable.infoDateFormat, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions
query, metaTable.infoDateColumn, metaTable.infoDateFormat, metaTable.batchIdColumn, batchId, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions
)
case DataFormat.Raw(path) =>
new MetastorePersistenceRaw(path, metaTable.infoDateColumn, metaTable.infoDateFormat, saveModeOpt)
@@ -35,6 +35,8 @@ import scala.util.Try
class MetastorePersistenceDelta(query: Query,
infoDateColumn: String,
infoDateFormat: String,
batchIdColumn: String,
batchId: Long,
recordsPerPartition: Option[Long],
saveModeOpt: Option[SaveMode],
readOptions: Map[String, String],
@@ -66,23 +68,22 @@

val whereCondition = s"$infoDateColumn='$infoDateStr'"

val recordCount = numberOfRecordsEstimate match {
case Some(count) => count
case None => dfIn.count()
}
val dfRepartitioned = if (recordsPerPartition.nonEmpty) {
val recordCount = numberOfRecordsEstimate match {
case Some(count) => count
case None => dfIn.count()
}

val dfRepartitioned = applyRepartitioning(dfIn, recordCount)
applyRepartitioning(dfIn, recordCount)
} else {
dfIn
}

val saveMode = saveModeOpt.getOrElse(SaveMode.Overwrite)

val operationStr = saveMode match {
case SaveMode.Append => "Appending to"
case _ => "Writing to"
}

val isAppend = saveMode match {
case SaveMode.Append => true
case _ => false
val (isAppend, operationStr) = saveMode match {
case SaveMode.Append => (true, "Appending to")
case _ => (false, "Writing to")
}

if (log.isDebugEnabled) {
@@ -116,16 +117,29 @@
val stats = getStats(infoDate, isAppend)

stats.dataSizeBytes match {
case Some(size) => log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records (${StringUtils.prettySize(size)}) to ${query.query}")
case None => log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records to ${query.query}")
case Some(size) =>
stats.recordCountAppended match {
case Some(recordsAppended) =>
log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}, " +
s"new size: ${StringUtils.prettySize(size)}) to ${query.query}")
case None =>
log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " +
s"(${StringUtils.prettySize(size)}) to ${query.query}")
}
case None =>
stats.recordCountAppended match {
case Some(recordsAppended) =>
log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}) to ${query.query}")
case None =>
log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records to ${query.query}")
}
}

stats
}

override def getStats(infoDate: LocalDate, onlyForCurrentBatchId: Boolean): MetaTableStats = {
val df = loadTable(Option(infoDate), Option(infoDate))
val recordCount = df.count()

val sizeOpt = query match {
case Query.Path(path) =>
@@ -142,7 +156,16 @@
None
}

MetaTableStats(recordCount, None, sizeOpt)
if (onlyForCurrentBatchId && df.schema.exists(_.name.equalsIgnoreCase(batchIdColumn))) {
val batchCount = df.filter(col(batchIdColumn) === batchId).count()
val countAll = df.count()

MetaTableStats(countAll, Option(batchCount), sizeOpt)
} else {
val countAll = df.count()

MetaTableStats(countAll, None, sizeOpt)
}
}

override def createOrUpdateHiveTable(infoDate: LocalDate,
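A side note on the stats code above: the new `getStats` runs two Spark actions over the loaded DataFrame (a filtered count for the current batch plus a total count). Under the assumption that a single scan is preferable, both numbers could also be computed with one conditional aggregation; this is only a sketch, not part of the commit, and `batchIdColumn` / `batchId` mirror the parameters added to `MetastorePersistenceDelta`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, lit, when}

// Sketch: total count and current-batch count in a single pass.
def countAllAndBatch(df: DataFrame, batchIdColumn: String, batchId: Long): (Long, Long) = {
  val row = df.agg(
    count(lit(1)).as("count_all"),                                  // all records for the info date
    count(when(col(batchIdColumn) === batchId, lit(1))).as("count_batch") // records of this batch only
  ).head()

  (row.getLong(0), row.getLong(1))
}
```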
@@ -81,17 +81,15 @@ class IncrementalIngestionJob(operationDef: OperationDef,
mt.getTable(outputTable.name, Option(infoDate), Option(infoDate))
} catch {
case ex: AnalysisException =>
log.warn(s"No data found for ${outputTable.name}. Rolling back uncommitted offsets...", ex)

uncommittedOffsets.foreach { of =>
log.warn(s"Cleaning uncommitted offset: $of...")
om.rollbackOffsets(DataOffsetRequest(outputTable.name, infoDate, of.minOffset, of.createdAt))
}

latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None)
rollbackOffsets(infoDate, om, uncommittedOffsets)
return
}

if (df.isEmpty) {
rollbackOffsets(infoDate, om, uncommittedOffsets)
return
}

if (!df.schema.fields.exists(_.name.equalsIgnoreCase(offsetInfo.offsetColumn))) {
throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'. Cannot update uncommitted offsets.")
}
@@ -116,6 +114,17 @@
latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None)
}

private[core] def rollbackOffsets(infoDate: LocalDate, om: OffsetManager, uncommittedOffsets: Array[DataOffset]): Unit = {
log.warn(s"No data found for ${outputTable.name}. Rolling back uncommitted offsets...")

uncommittedOffsets.foreach { of =>
log.warn(s"Cleaning uncommitted offset: $of...")
om.rollbackOffsets(DataOffsetRequest(outputTable.name, infoDate, of.minOffset, of.createdAt))
}

latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None)
}

override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
if (source.getOffsetInfo(sourceTable.query).nonEmpty) {
Reason.Ready

This file was deleted.
