#520 Add unit tests for the incremental processing.
yruslan committed Nov 25, 2024
1 parent b8b85b1 commit 53acd43
Showing 7 changed files with 89 additions and 9 deletions.
@@ -327,8 +327,6 @@ class MetastoreImpl(appConfig: Config,
       throw new IllegalArgumentException(s"Table '$tableName' does not contain column '${tableDef.batchIdColumn}' needed for incremental processing.")
     }
 
-    // ToDo Handle uncommitted offsets
-
     val offsets = om.getMaxInfoDateAndOffset(trackingName, Option(infoDate))
 
     val df = offsets match {
@@ -58,8 +58,7 @@ class TransformationJob(operationDef: OperationDef,
   }
 
   override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = {
-    val isTransitive = outputTable.format.isTransient
-    val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = !isTransitive)
+    val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = false)
     val runResult = try {
       RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions))
     } finally {
@@ -357,7 +357,7 @@ abstract class TaskRunnerBase(conf: Config,
         dfWithTimestamp.withColumn(task.job.outputTable.infoDateColumn, lit(Date.valueOf(task.infoDate)))
       }
 
-      val needAddBatchId = runtimeConfig.alwaysAddBatchIdColumn || task.job.operation.schedule == Schedule.Incremental
+      val needAddBatchId = (runtimeConfig.alwaysAddBatchIdColumn || task.job.operation.schedule == Schedule.Incremental) && !task.job.outputTable.format.isInstanceOf[DataFormat.Raw]
 
       val dfWithBatchIdColumn = if (needAddBatchId) {
         val batchIdColumn = task.job.outputTable.batchIdColumn
@@ -366,16 +366,16 @@
         dfWithInfoDate
       }
 
+      val postProcessed = task.job.postProcessing(dfWithBatchIdColumn, task.infoDate, conf)
+
       val dfTransformed = applyFilters(
-        applyTransformations(dfWithBatchIdColumn, task.job.operation.schemaTransformations),
+        applyTransformations(postProcessed, task.job.operation.schemaTransformations),
         task.job.operation.filters,
         task.infoDate,
         task.infoDate,
         task.infoDate
       )
 
-      val postProcessed = task.job.postProcessing(dfTransformed, task.infoDate, conf)
-
       val schemaChangesAfterTransform = if (task.job.operation.schemaTransformations.nonEmpty) {
         val transformedTable = task.job.outputTable.copy(name = s"${task.job.outputTable.name}_transformed")
         handleSchemaChange(dfTransformed, transformedTable, task.infoDate)
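
Side note: the reworked guard above boils down to appending a literal batch id column, except for raw-format tables (which presumably keep source files as-is and have no DataFrame columns to extend). A minimal sketch of that pattern in Spark; the object, helper name, and the example column name and batch id value are illustrative placeholders, not the project's actual wiring:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

object BatchIdSketch {
  // Hypothetical helper: add the batch id column only when the target is not a raw-format table.
  def withBatchId(df: DataFrame, batchIdColumn: String, batchId: Long, isRawFormat: Boolean): DataFrame =
    if (isRawFormat) df                              // raw tables store files as-is, nothing to extend
    else df.withColumn(batchIdColumn, lit(batchId))  // e.g. withBatchId(df, "pramen_batchid", 123L, isRawFormat = false)
}
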
@@ -84,6 +84,7 @@ pramen.operations = [
   {
     name = "Running a transformer"
     type = "transformation"
+    disabled = ${transformer.disabled}
 
     class = "za.co.absa.pramen.core.transformers.IdentityTransformer"
     schedule.type = ${transformer.schedule}
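
The new `disabled` key is wired through a HOCON substitution (${transformer.disabled}), which is what lets the test fixture further below switch the transformer off for one pipeline run and back on for the next. A small standalone sketch of how such a substitution resolves with Typesafe Config; the object and key names are illustrative, not the project's own config loading code:

import com.typesafe.config.ConfigFactory

object DisabledFlagSketch extends App {
  // The fixture supplies transformer.disabled = "true" or "false"; resolve() substitutes the
  // value into the operation definition, and the string is coerced to a Boolean when read.
  val conf = ConfigFactory.parseString(
    """transformer.disabled = "true"
      |operation.disabled = ${transformer.disabled}
      |""".stripMargin
  ).resolve()

  println(conf.getBoolean("operation.disabled")) // true
}
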
@@ -109,5 +109,9 @@ class IncrementalPipelineDeltaLongSuite extends IncrementalPipelineLongFixture {
     "offsets cross info days" in {
       testOffsetCrossInfoDateEdgeCase(format)
     }
+
+    "transformer picks up doubly ingested offsets" in {
+      testTransformerPicksUpFromDoubleIngestedData(format)
+    }
   }
 }
@@ -828,7 +828,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
     val actualTable2 = dfTable2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
 
     compareText(actualTable1, expectedOffsetOnlyAll)
-    compareText(actualTable2, expectedOffsetOnly2) // ToDo This logic is to be changed when incremental transformations are supported
+    compareText(actualTable2, expectedOffsetOnlyAll)
 
     val batchIds = dfTable1.select(BATCH_ID_COLUMN).distinct().collect()
 
@@ -1207,11 +1207,84 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
     succeed
   }
 
+  def testTransformerPicksUpFromDoubleIngestedData(metastoreFormat: String): Assertion = {
+    val csv1DataStr = s"id,name,info_date\n1,John,$infoDate\n2,Jack,$infoDate\n"
+    val csv2DataStr = s"id,name,info_date\n3,Jill,$infoDate\n4,Mary,$infoDate\n"
+    val csv3DataStr = s"id,name,info_date\n5,Jane,$infoDate\n6,Kate,$infoDate\n"
+
+    val expectedStr1: String =
+      """{"id":1,"name":"John"}
+        |{"id":2,"name":"Jack"}
+        |""".stripMargin
+
+    val expectedStr2: String =
+      """{"id":1,"name":"John"}
+        |{"id":2,"name":"Jack"}
+        |{"id":3,"name":"Jill"}
+        |{"id":4,"name":"Mary"}
+        |""".stripMargin
+
+    withTempDirectory("incremental1") { tempDir =>
+      val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir)
+
+      val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv"))
+      val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv"))
+      val path3 = new Path(tempDir, new Path("landing", "landing_file3.csv"))
+
+      val table1Path = new Path(tempDir, "table1")
+      val table2Path = new Path(tempDir, "table2")
+
+      fsUtils.writeFile(path1, csv1DataStr)
+      val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)
+      val exitCode1 = AppRunner.runPipeline(conf1)
+      assert(exitCode1 == 0)
+
+      fsUtils.writeFile(path2, csv2DataStr)
+      val conf2 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema, isTransformerDisabled = true)
+      val exitCode2 = AppRunner.runPipeline(conf2)
+      assert(exitCode2 == 0)
+
+      val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate))
+      val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate))
+      val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
+      val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
+
+      compareText(actualTable1Before, expectedStr2)
+      compareText(actualTable2Before, expectedStr1)
+
+      fsUtils.writeFile(path3, csv3DataStr)
+
+      val conf3 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, inferSchema = false, csvSchema = csvWithInfoDateSchema)
+      val exitCode3 = AppRunner.runPipeline(conf3)
+      assert(exitCode3 == 0)
+      val dfTable1After = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate))
+      val dfTable2After = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate))
+
+      val batchIds = dfTable1After.select(BATCH_ID_COLUMN).distinct().collect()
+
+      assert(batchIds.length == 3)
+
+      val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
+      val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n")
+
+      compareText(actualTable1After, expectedWithInfoDateAll)
+      compareText(actualTable2After, expectedWithInfoDateAll)
+
+      val om = new OffsetManagerJdbc(pramenDb.db, 123L)
+
+      val offsets = om.getOffsets("table1->table2", infoDate).map(_.asInstanceOf[CommittedOffset])
+      assert(offsets.length == 2)
+    }
+    succeed
+  }
+
   def getConfig(basePath: String,
                 metastoreFormat: String,
                 isRerun: Boolean = false,
                 useDataFrame: Boolean = false,
                 isTransformerIncremental: Boolean = true,
+                isTransformerDisabled: Boolean = false,
                 isHistoricalRun: Boolean = false,
                 historyRunMode: String = "force",
                 inferSchema: Boolean = true,
@@ -1237,6 +1310,7 @@ class IncrementalPipelineLongFixture extends AnyWordSpec
       |pramen.runtime.is.rerun = $isRerun
       |pramen.current.date = "$useInfoDate"
       |transformer.schedule = "$transformerSchedule"
+      |transformer.disabled = "$isTransformerDisabled"
       |infer.schema = $inferSchema
       |$historicalConfigStr
       |has.information.date.column = $hasInfoDate
@@ -109,5 +109,9 @@ class IncrementalPipelineParquetLongSuite extends IncrementalPipelineLongFixture {
     "offsets cross info days" in {
       testOffsetCrossInfoDateEdgeCase(format)
     }
+
+    "transformer picks up doubly ingested offsets" in {
+      testTransformerPicksUpFromDoubleIngestedData(format)
+    }
   }
 }
