#374 Fix a scenario when uncommitted offsets are not properly handled.
This happens when the input table does not have an information date field and the uncommitted offsets are old. Previously, such offsets weren't checked.
yruslan committed Oct 9, 2024
1 parent 30ee28b commit d44a8db
Showing 7 changed files with 171 additions and 13 deletions.
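To illustrate the scenario described in the commit message, here is a minimal sketch (not part of the commit) of the decision this change introduces: the uncommitted-offset lookup is restricted to the current information date only when the source actually has an information date column; otherwise all dates are scanned, so old uncommitted offsets are no longer missed. The stub types and names below are hypothetical stand-ins for the Pramen API.

import java.time.LocalDate

// Hypothetical stand-ins for the Pramen API types; the real ones live in
// za.co.absa.pramen.api and za.co.absa.pramen.core.
final case class UncommittedOffsetStub(table: String, infoDate: LocalDate)

trait OffsetLookup {
  def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Array[UncommittedOffsetStub]
}

// The essence of the fix: restrict the lookup to the current run date only when
// the source has an information date column; otherwise scan all dates so that
// old uncommitted offsets are not silently skipped.
def findUncommitted(om: OffsetLookup,
                    table: String,
                    infoDate: LocalDate,
                    hasInfoDateColumn: Boolean): Array[UncommittedOffsetStub] = {
  val onlyForInfoDate = if (hasInfoDateColumn) Some(infoDate) else None
  om.getUncommittedOffsets(table, onlyForInfoDate)
}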
@@ -16,6 +16,7 @@

package za.co.absa.pramen.core.bookkeeper

import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}

@@ -43,6 +44,11 @@ trait OffsetManager {
*/
def getOffsets(table: String, infoDate: LocalDate): Array[DataOffset]

/**
* Returns only uncommitted offsets for a given table.
*/
def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Array[UncommittedOffset]

/**
* Returns the maximum information date the bookkeeping has offsets for.
*
@@ -18,6 +18,7 @@ package za.co.absa.pramen.core.bookkeeper

import org.slf4j.LoggerFactory
import slick.jdbc.H2Profile.api._
import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.{DataOffset, OffsetType, OffsetValue}
import za.co.absa.pramen.core.bookkeeper.model._
import za.co.absa.pramen.core.utils.SlickUtils
@@ -40,6 +41,24 @@ class OffsetManagerJdbc(db: Database, batchId: Long) extends OffsetManager {
offsets.map(OffsetRecordConverter.toDataOffset)
}

override def getUncommittedOffsets(table: String, onlyForInfoDate: Option[LocalDate]): Array[UncommittedOffset] = {
val query = onlyForInfoDate match {
case Some(infoDate) =>
val infoDateStr = infoDate.toString
OffsetRecords.records
.filter(r => r.pramenTableName === table && r.infoDate === infoDateStr && r.committedAt.isEmpty)
.sorted(r => r.infoDate)
case None =>
OffsetRecords.records
.filter(r => r.pramenTableName === table && r.committedAt.isEmpty)
.sorted(r => r.infoDate)
}

SlickUtils.executeQuery[OffsetRecords, OffsetRecord](db, query)
.toArray[OffsetRecord]
.map(record => OffsetRecordConverter.toDataOffset(record).asInstanceOf[UncommittedOffset])
}

override def getMaxInfoDateAndOffset(table: String, onlyForInfoDate: Option[LocalDate]): Option[DataOffsetAggregated] = {
val maxInfoDateOpt = onlyForInfoDate.orElse(getMaximumInfoDate(table))

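As a usage sketch (assumptions: `db` is a Slick database that already contains the bookkeeping `offsets` table, and the batch id 123L is arbitrary, mirroring the integration test further down), the new JDBC method can be queried across all information dates like this:

import slick.jdbc.H2Profile.api._
import za.co.absa.pramen.core.bookkeeper.OffsetManagerJdbc

// `db` is assumed to be a configured bookkeeping database (see the test below).
def listStaleOffsets(db: Database): Unit = {
  val om = new OffsetManagerJdbc(db, 123L)

  // None = do not filter by information date, which is the code path that
  // previously missed old uncommitted offsets.
  val stale = om.getUncommittedOffsets("table1", onlyForInfoDate = None)
  stale.map(_.infoDate).distinct.foreach(d => println(s"Uncommitted offsets found for $d"))
}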
@@ -29,6 +29,7 @@ class OffsetRecords(tag: Tag) extends Table[OffsetRecord](tag, "offsets") {
def committedAt = column[Option[Long]]("committed_at")
def * = (pramenTableName, infoDate, dataType, minOffset, maxOffset, batchId, createdAt, committedAt) <> (OffsetRecord.tupled, OffsetRecord.unapply)
def idx1 = index("offset_idx_1", (pramenTableName, infoDate, createdAt), unique = true)
def idx2 = index("offset_idx_2", (pramenTableName, committedAt), unique = false)
}

object OffsetRecords {
@@ -55,16 +55,28 @@ class IncrementalIngestionJob(operationDef: OperationDef,
override def trackDays: Int = 0

override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
if (source.getOffsetInfo.isEmpty) {
throw new IllegalArgumentException(s"Offset column is not configured for source '$sourceName' of '${operationDef.name}'")
}

val om = bookkeeper.getOffsetManager
latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None)

val uncommittedOffsets = om.getOffsets(outputTable.name, infoDate)
.filter(!_.isCommitted)
.map(_.asInstanceOf[UncommittedOffset])
val onlyForInfoDate = if (source.hasInfoDateColumn(sourceTable.query))
Some(infoDate)
else
None

val uncommittedOffsets = om.getUncommittedOffsets(outputTable.name, onlyForInfoDate)

if (uncommittedOffsets.nonEmpty) {
log.warn(s"Found uncommitted offsets for ${outputTable.name} at $infoDate. Fixing...")
handleUncommittedOffsets(om, metastore, infoDate, uncommittedOffsets)
if (onlyForInfoDate.isEmpty) {
log.warn(s"Found uncommitted offsets for ${outputTable.name}. Fixing...")
} else {
log.warn(s"Found uncommitted offsets for ${outputTable.name} at $infoDate. Fixing...")
}

handleUncommittedOffsets(om, metastore, uncommittedOffsets)
}

JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil)
@@ -222,19 +234,31 @@ class IncrementalIngestionJob(operationDef: OperationDef,
SaveResult(stats, warnings = tooLongWarnings)
}

private[core] def handleUncommittedOffsets(om: OffsetManager, mt: Metastore, infoDate: LocalDate, uncommittedOffsets: Array[UncommittedOffset]): Unit = {
private[core] def handleUncommittedOffsets(om: OffsetManager, mt: Metastore, uncommittedOffsets: Array[UncommittedOffset]): Unit = {
import za.co.absa.pramen.core.utils.DateUtils._

val offsetInfo = source.getOffsetInfo.getOrElse(throw new IllegalArgumentException(s"Offset column not defined for the ingestion job '${operationDef.name}', " +
s"query: '${sourceTable.query.query}''"))

val infoDates = uncommittedOffsets.map(_.infoDate).distinct.sorted

infoDates.foreach { infoDate =>
handleUncommittedOffsetsForDay(om, mt, uncommittedOffsets.filter(_.infoDate == infoDate), infoDate, offsetInfo)
}
}

private[core] def handleUncommittedOffsetsForDay(om: OffsetManager, mt: Metastore, uncommittedOffsets: Array[UncommittedOffset], infoDate: LocalDate, offsetInfo: OffsetInfo): Unit = {
val df = try {
mt.getTable(outputTable.name, Option(infoDate), Option(infoDate))
} catch {
case ex: AnalysisException =>
case _: AnalysisException =>
log.warn(s"Table ${outputTable.name} has empty partition for $infoDate. Rolling back uncommitted offsets..")
rollbackOffsets(infoDate, om, uncommittedOffsets)
return
}

if (df.isEmpty) {
log.warn(s"Table ${outputTable.name} is empty for $infoDate. Rolling back uncommitted offsets...")
rollbackOffsets(infoDate, om, uncommittedOffsets)
return
}
@@ -243,12 +267,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'. Cannot update uncommitted offsets.")
}

val (newMinOffset, newMaxOffset) = if (df.isEmpty) {
rollbackOffsets(infoDate, om, uncommittedOffsets)
return
} else {
getMinMaxOffsetFromDf(df, offsetInfo)
}
val (newMinOffset, newMaxOffset) = getMinMaxOffsetFromDf(df, offsetInfo)

log.warn(s"Fixing uncommitted offsets. New offset to commit for ${outputTable.name} at $infoDate: " +
s"min offset: ${newMinOffset.valueString}, max offset: ${newMaxOffset.valueString}.")
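The reconciliation above relies on getMinMaxOffsetFromDf, which is not part of this diff. A minimal sketch of the idea, assuming a plain Spark aggregation over the configured offset column (the real helper wraps the results into OffsetValue instances and may differ):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max, min}

// Hypothetical sketch: derive the minimum and maximum offset present in the
// already-written partition, returned as strings for logging/committing.
def minMaxOffsetSketch(df: DataFrame, offsetColumn: String): (String, String) = {
  val row = df.agg(min(col(offsetColumn)).cast("string"), max(col(offsetColumn)).cast("string")).head()
  (row.getString(0), row.getString(1))
}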
@@ -28,6 +28,10 @@ class IncrementalPipelineDeltaLongSuite extends IncrementalPipelineLongFixture {
testOffsetOnlyRunningOutOfOrderOffsets(format)
}

"work for uncommitted late offsets" in {
testOffsetOnlyUncommittedLateOffsets(format)
}

"work with incremental ingestion and normal transformer" in {
testOffsetOnlyIncrementalIngestionNormalTransformer(format)
}
@@ -18,6 +18,7 @@ package za.co.absa.pramen.core.integration

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.TimestampType
import org.scalatest.wordspec.AnyWordSpec
@@ -233,6 +234,110 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
succeed
}

def testOffsetOnlyUncommittedLateOffsets(metastoreFormat: String): Assertion = {
withTempDirectory("incremental1") { tempDir =>
val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir)

val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv"))
val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv"))
fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n")

val conf = getConfig(tempDir, metastoreFormat, useInfoDate = infoDate.minusDays(2))

// Running a job that ingests initial data for offsets 1..3 at 2021-02-16
val exitCode1 = AppRunner.runPipeline(conf)
assert(exitCode1 == 0)

// Adding an uncommitted offset for 2021-02-17
val om = new OffsetManagerJdbc(pramenDb.db, 123L)
om.startWriteOffsets("table1", infoDate.minusDays(1), OffsetType.IntegralType)
Thread.sleep(10)

val table1Path = new Path(tempDir, "table1")
val table2Path = new Path(tempDir, "table2")

fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n")

// Writing data for the metastore table for the uncommitted offset at 2021-02-17
val dfIn = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(path2.toString)
.withColumn(INFO_DATE_COLUMN, lit(Date.valueOf(infoDate.minusDays(1))))

dfIn.write
.mode(SaveMode.Append)
.partitionBy(INFO_DATE_COLUMN)
.format(metastoreFormat)
.save(table1Path.toString)

val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.minusDays(2)))
val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.minusDays(2)))
val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")

compareText(actualTable1Before, expectedOffsetOnly1)
compareText(actualTable2Before, expectedOffsetOnly1)

// Checking expected offset table
val offsetsBefore0 = om.getOffsets("table1", infoDate.minusDays(2))
val offsetsBefore1 = om.getOffsets("table1", infoDate.minusDays(1))
val offsetsBefore2 = om.getOffsets("table1", infoDate)

assert(offsetsBefore0.length == 1)
assert(offsetsBefore1.length == 1)
assert(offsetsBefore2.isEmpty)

assert(offsetsBefore0.head.isCommitted)
assert(!offsetsBefore1.head.isCommitted)
assert(offsetsBefore0.head.asInstanceOf[CommittedOffset].minOffset.valueString == "1")
assert(offsetsBefore0.head.asInstanceOf[CommittedOffset].maxOffset.valueString == "3")

// Running the job for 2021-02-18. It should reconcile uncommitted offsets and return with 'No data' since no data arrived since 2021-02-17
val conf2 = getConfig(tempDir, metastoreFormat, useInfoDate = infoDate)
val exitCode2 = AppRunner.runPipeline(conf2)
assert(exitCode2 == 2)

// Checking expected offset table
val offsetsAfter0 = om.getOffsets("table1", infoDate.minusDays(2))
val offsetsAfter1 = om.getOffsets("table1", infoDate.minusDays(1))
val offsetsAfter2 = om.getOffsets("table1", infoDate)

assert(offsetsAfter0.length == 1)
assert(offsetsAfter1.length == 1)
assert(offsetsAfter2.isEmpty)

assert(offsetsAfter0.head.isCommitted)
assert(offsetsAfter1.head.isCommitted)
assert(offsetsAfter0.head.asInstanceOf[CommittedOffset].minOffset.valueString == "1")
assert(offsetsAfter0.head.asInstanceOf[CommittedOffset].maxOffset.valueString == "3")
assert(offsetsAfter1.head.asInstanceOf[CommittedOffset].minOffset.valueString == "4")
assert(offsetsAfter1.head.asInstanceOf[CommittedOffset].maxOffset.valueString == "6")

// Checking expected data
val dfTable1After0 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.minusDays(2)))
val dfTable2After0 = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.minusDays(2)))
val dfTable1After1 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.minusDays(1)))
val dfTable2After1 = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.minusDays(1)))
val dfTable1After2 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate))
val dfTable2After2 = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate))
val actualTable1After0 = dfTable1After0.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
val actualTable2After0 = dfTable2After0.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
/* This will be enabled when incremental processing is implemented.
val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")*/

// Expecting empty records
compareText(actualTable1After0, expectedOffsetOnly1)
compareText(actualTable2After0, expectedOffsetOnly1)
compareText(actualTable1After1, expectedOffsetOnly2)
/*compareText(actualTable2After1, expectedOffsetOnly2) This will be enabled when incremental processing is implemented. */
assert(dfTable1After2.isEmpty)
assert(dfTable2After2.isEmpty)
}
succeed
}

def testOffsetOnlyIncrementalIngestionNormalTransformer(metastoreFormat: String): Assertion = {
withTempDirectory("incremental1") { tempDir =>
val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir)
@@ -28,6 +28,10 @@ class IncrementalPipelineParquetLongSuite extends IncrementalPipelineLongFixture {
testOffsetOnlyRunningOutOfOrderOffsets(format)
}

"work for uncommitted late offsets" in {
testOffsetOnlyUncommittedLateOffsets(format)
}

"work with incremental ingestion and normal transformer" in {
testOffsetOnlyIncrementalIngestionNormalTransformer(format)
}