diff --git a/pramen/core/src/main/resources/reference.conf b/pramen/core/src/main/resources/reference.conf index bf79f7d6d..4a7112bb1 100644 --- a/pramen/core/src/main/resources/reference.conf +++ b/pramen/core/src/main/resources/reference.conf @@ -104,6 +104,13 @@ pramen { # If this is set the current date will overridden by the specified value. #current.date = + # Optionally, you can specify which dates to run historical pipeline for a date range: + #load.date.from = "2022-01-01" + #load.date.to = "2022-01-15" + + # Specify one of run modes for historical run: fill_gaps, check_updates (default), force. + #runtime.run.mode = force + #spark.conf = { # Pass arbitrary Spark Configuration when initializing Spark Session diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index 46bbaf185..0a6d06404 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -18,13 +18,13 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.StringType -import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} -import za.co.absa.pramen.api.offset.OffsetValue +import org.apache.spark.sql.types.{IntegerType, LongType, ShortType, StringType} +import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession} +import za.co.absa.pramen.api.offset.{DataOffset, OffsetInfo, OffsetValue} import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} import za.co.absa.pramen.api.{Reason, Source} -import za.co.absa.pramen.core.bookkeeper.Bookkeeper -import za.co.absa.pramen.core.bookkeeper.model.DataOffsetAggregated +import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest} +import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager} import za.co.absa.pramen.core.metastore.Metastore import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental} @@ -36,7 +36,7 @@ class IncrementalIngestionJob(operationDef: OperationDef, metastore: Metastore, bookkeeper: Bookkeeper, notificationTargets: Seq[JobNotificationTarget], - latestOffset: Option[DataOffsetAggregated], + latestOffsetIn: Option[DataOffsetAggregated], batchId: Long, sourceName: String, source: Source, @@ -46,28 +46,76 @@ class IncrementalIngestionJob(operationDef: OperationDef, (implicit spark: SparkSession) extends IngestionJob(operationDef, metastore, bookkeeper, notificationTargets, sourceName, source, sourceTable, outputTable, specialCharacters, None, false) { + private var latestOffset = latestOffsetIn + override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategyIncremental(latestOffset, source.hasInfoDateColumn(sourceTable.query)) override def trackDays: Int = 0 override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = { + val om = bookkeeper.getOffsetManager + + val uncommittedOffsets = om.getOffsets(outputTable.name, infoDate).filter(_.committedAt.isEmpty) + + if (uncommittedOffsets.nonEmpty) { + log.warn(s"Found uncommitted offsets for ${outputTable.name} at $infoDate. 
Fixing...") + handleUncommittedOffsets(om, metastore, infoDate, uncommittedOffsets) + } + val hasInfoDateColumn = source.hasInfoDateColumn(sourceTable.query) if (hasInfoDateColumn && runReason == TaskRunReason.Rerun) { super.preRunCheckJob(infoDate, runReason, jobConfig, dependencyWarnings) } else { - latestOffset match { - case Some(offset) => - if (offset.maximumInfoDate.isAfter(infoDate) && !hasInfoDateColumn) { - JobPreRunResult(JobPreRunStatus.Skip("Retrospective runs are not allowed yet"), None, dependencyWarnings, Nil) - } else { - JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil) - } - case None => - JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil) - } + JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Nil) } } + private def handleUncommittedOffsets(om: OffsetManager, mt: Metastore, infoDate: LocalDate, uncommittedOffsets: Array[DataOffset]): Unit = { + val minOffset = uncommittedOffsets.map(_.minOffset).min + + val offsetInfo = source.getOffsetInfo(sourceTable.query).getOrElse(throw new IllegalArgumentException(s"Offset column not defined for the ingestion job '${operationDef.name}', " + + s"query: '${sourceTable.query.query}''")) + + val df = try { + mt.getTable(outputTable.name, Option(infoDate), Option(infoDate)) + } catch { + case ex: AnalysisException => + log.warn(s"No data found for ${outputTable.name}. Rolling back uncommitted offsets...", ex) + + uncommittedOffsets.foreach { of => + log.warn(s"Cleaning uncommitted offset: $of...") + om.rollbackOffsets(DataOffsetRequest(outputTable.name, infoDate, of.minOffset, of.createdAt)) + } + + latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None) + return + } + + if (!df.schema.fields.exists(_.name.equalsIgnoreCase(offsetInfo.offsetColumn))) { + throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'. Cannot update uncommitted offsets.") + } + + val newMaxOffset = if (df.isEmpty) { + minOffset + } else { + val row = df.agg(max(col(offsetInfo.offsetColumn)).cast(StringType)).collect()(0) + OffsetValue.fromString(offsetInfo.minimalOffset.dataTypeString, row(0).asInstanceOf[String]) + } + + log.warn(s"Fixing uncommitted offsets. 
New offset to commit for ${outputTable.name} at $infoDate: " + + s"min offset: ${minOffset.valueString}, max offset: ${newMaxOffset.valueString}.") + + val req = om.startWriteOffsets(outputTable.name, infoDate, minOffset) + om.commitOffsets(req, newMaxOffset) + + uncommittedOffsets.foreach { of => + log.warn(s"Cleaning uncommitted offset: $of...") + om.rollbackOffsets(DataOffsetRequest(outputTable.name, infoDate, of.minOffset, of.createdAt)) + } + + latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None) + } + override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = { if (source.getOffsetInfo(sourceTable.query).nonEmpty) { Reason.Ready @@ -116,6 +164,8 @@ class IncrementalIngestionJob(operationDef: OperationDef, throw new IllegalArgumentException(s"Offset type is not configured for the source '$sourceName' outputting to '${outputTable.name}''") ) + validateOffsetColumn(df, offsetInfo) + val minimumOffset = latestOffset.map(_.maximumOffset).getOrElse(offsetInfo.minimalOffset) val req = om.startWriteOffsets(outputTable.name, infoDate, minimumOffset) @@ -171,4 +221,25 @@ class IncrementalIngestionJob(operationDef: OperationDef, SaveResult(stats, warnings = tooLongWarnings) } + + private[core] def validateOffsetColumn(df: DataFrame, offsetInfo: OffsetInfo): Unit = { + if (!df.schema.fields.exists(_.name.equalsIgnoreCase(offsetInfo.offsetColumn))) { + throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'.") + } + + val field = df.schema.fields.find(_.name.equalsIgnoreCase(offsetInfo.offsetColumn)).get + + offsetInfo.minimalOffset match { + case v: OffsetValue.LongType => + if (!field.dataType.isInstanceOf[ShortType] && !field.dataType.isInstanceOf[IntegerType] && !field.dataType.isInstanceOf[LongType]) { + throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' has type '${field.dataType}'. " + + s"But only integral types are supported for offset type '${v.dataTypeString}'.") + } + case v: OffsetValue.StringType => + if (!field.dataType.isInstanceOf[StringType]) { + throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' has type '${field.dataType}'. 
" + + s"But only string type is supported for offset type '${v.dataTypeString}'.") + } + } + } } diff --git a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf index 9f4443766..6ba0a5e40 100644 --- a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf +++ b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf @@ -58,6 +58,7 @@ pramen.sources.1 = [ option { header = true + inferSchema = ${infer.schema} } } ] diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongSuite.scala similarity index 50% rename from pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineSuite.scala rename to pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongSuite.scala index db53e8366..2378c0a9b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongSuite.scala @@ -18,9 +18,12 @@ package za.co.absa.pramen.core.integration import com.typesafe.config.{Config, ConfigFactory} import org.apache.hadoop.fs.Path +import org.apache.spark.sql.functions.lit import org.scalatest.wordspec.AnyWordSpec import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import za.co.absa.pramen.api.offset.OffsetValue import za.co.absa.pramen.core.base.SparkTestBase +import za.co.absa.pramen.core.bookkeeper.OffsetManagerJdbc import za.co.absa.pramen.core.fixtures.{RelationalDbFixture, TempDirFixture, TextComparisonFixture} import za.co.absa.pramen.core.rdb.PramenDb import za.co.absa.pramen.core.reader.JdbcUrlSelectorImpl @@ -30,7 +33,7 @@ import za.co.absa.pramen.core.utils.{FsUtils, JdbcNativeUtils, ResourceUtils} import java.time.LocalDate -class IncrementalPipelineSuite extends AnyWordSpec +class IncrementalPipelineLongSuite extends AnyWordSpec with SparkTestBase with RelationalDbFixture with BeforeAndAfter @@ -55,18 +58,24 @@ class IncrementalPipelineSuite extends AnyWordSpec "For inputs without information date the pipeline" should { val expected1 = - """{"id":"1","name":"John"} - |{"id":"2","name":"Jack"} - |{"id":"3","name":"Jill"} + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |{"id":3,"name":"Jill"} |""".stripMargin val expected2 = - """{"id":"1","name":"John"} - |{"id":"2","name":"Jack"} - |{"id":"3","name":"Jill"} - |{"id":"4","name":"Mary"} - |{"id":"5","name":"Jane"} - |{"id":"6","name":"Kate"} + """{"id":4,"name":"Mary"} + |{"id":5,"name":"Jane"} + |{"id":6,"name":"Kate"} + |""".stripMargin + + val expectedAll = + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |{"id":3,"name":"Jill"} + |{"id":4,"name":"Mary"} + |{"id":5,"name":"Jane"} + |{"id":6,"name":"Kate"} |""".stripMargin "work end to end as a normal run" in { @@ -108,8 +117,8 @@ class IncrementalPipelineSuite extends AnyWordSpec val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - compareText(actualTable1After, expected2) - compareText(actualTable2After, expected2) + compareText(actualTable1After, expectedAll) + compareText(actualTable2After, expectedAll) } } @@ -152,8 +161,8 @@ class IncrementalPipelineSuite extends AnyWordSpec val 
actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - compareText(actualTable1After, expected2) - compareText(actualTable2After, expected2) + compareText(actualTable1After, expectedAll) + compareText(actualTable2After, expectedAll) } } @@ -298,12 +307,263 @@ class IncrementalPipelineSuite extends AnyWordSpec } } - "fail to run for a historical date range" in { + "run for a historical date range with force update" in { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf1 = getConfig(tempDir) + + val exitCode1 = AppRunner.runPipeline(conf1) + assert(exitCode1 == 0) + + val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") + val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") + val dfTable1Before = spark.read.parquet(table1Path1.toString) + val dfTable2Before = spark.read.parquet(table2Path1.toString) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expected1) + compareText(actualTable2Before, expected1) + + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val conf2 = getConfig(tempDir, isHistoricalRun = true, useInfoDate = infoDate.plusDays(1)) + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val table1Path2 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.plusDays(1)}") + val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.plusDays(1)}") + val dfTable1After1 = spark.read.parquet(table1Path1.toString) + val dfTable2After1 = spark.read.parquet(table2Path1.toString) + val dfTable1After2 = spark.read.parquet(table1Path2.toString) + val dfTable2After2 = spark.read.parquet(table2Path2.toString) + val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + val batchIdsOld = dfTable1After1.select("pramen_batchid").distinct().collect().map(_.getLong(0)) + val batchIdsNew = dfTable1After2.select("pramen_batchid").distinct().collect().map(_.getLong(0)) + + assert(batchIdsOld.length == 1) + assert(batchIdsNew.length == 1) + + // The batch id for all info dates in range should be the same since they were re-ran + assert(batchIdsOld.head == batchIdsNew.head) + + // Expecting empty records + compareText(actualTable1After1, expected1) + compareText(actualTable2After1, expected1) + compareText(actualTable1After2, expected2) + compareText(actualTable2After2, expected2) + } + } + + "run for a historical date range with fill gaps update" in { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new 
FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf1 = getConfig(tempDir) + + val exitCode1 = AppRunner.runPipeline(conf1) + assert(exitCode1 == 0) + + val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") + val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") + val dfTable1Before = spark.read.parquet(table1Path1.toString) + val dfTable2Before = spark.read.parquet(table2Path1.toString) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expected1) + compareText(actualTable2Before, expected1) + + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val conf2 = getConfig(tempDir, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1)) + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val table1Path2 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.plusDays(1)}") + val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.plusDays(1)}") + val dfTable1After1 = spark.read.parquet(table1Path1.toString) + val dfTable2After1 = spark.read.parquet(table2Path1.toString) + val dfTable1After2 = spark.read.parquet(table1Path2.toString) + val dfTable2After2 = spark.read.parquet(table2Path2.toString) + val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + val batchIdsOld = dfTable1After1.select("pramen_batchid").distinct().collect().map(_.getLong(0)) + val batchIdsNew = dfTable1After2.select("pramen_batchid").distinct().collect().map(_.getLong(0)) + + assert(batchIdsOld.length == 1) + assert(batchIdsNew.length == 1) + + // The batch id for all info dates in range should not be the same since they we ran only for missing data + assert(batchIdsOld.head != batchIdsNew.head) + + // Expecting empty records + compareText(actualTable1After1, expected1) + compareText(actualTable2After1, expected1) + compareText(actualTable1After2, expected2) + compareText(actualTable2After2, expected2) + } + } + + "deal with uncommitted changes when no data" in { + val om = new OffsetManagerJdbc(pramenDb.db, 123L) + + om.startWriteOffsets("table1", infoDate, OffsetValue.fromString("long", "1")) + + Thread.sleep(10) + + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf = getConfig(tempDir) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") + val dfTable1 = spark.read.parquet(table1Path.toString) + val 
actualTable1 = dfTable1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1, expected1) + + val batchIds = dfTable1.select("pramen_batchid").distinct().collect() + + assert(batchIds.length == 1) + + val offsets = om.getOffsets("table1", infoDate).sortBy(_.createdAt) + + assert(offsets.length == 1) + + assert(offsets.head.minOffset.valueString.toLong == Long.MinValue) + assert(offsets.head.maxOffset.get.valueString.toLong == 3) + assert(offsets.head.committedAt.nonEmpty) + } + } + + "deal with uncommitted changes when there is data" in { + val om1 = new OffsetManagerJdbc(pramenDb.db, 123L) + om1.startWriteOffsets("table1", infoDate, OffsetValue.LongType(Long.MinValue)) + + Thread.sleep(10) + + val om2 = new OffsetManagerJdbc(pramenDb.db, 123L) + om2.startWriteOffsets("table1", infoDate, OffsetValue.LongType(2)) + + Thread.sleep(10) + + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") + val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") + + val df = spark.read + .option("header", "true") + .option("inferSchema", "true") + .csv(path1.toString) + .withColumn("pramen_batchid", lit(123L)) + + df.write.parquet(table1Path.toString) + + val conf = getConfig(tempDir) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val dfTable1 = spark.read.parquet(table1Path.toString) + val dfTable2 = spark.read.parquet(table2Path.toString) + val actualTable1 = dfTable1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2 = dfTable2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1, expectedAll) + compareText(actualTable2, expected2) // ToDo This logic is to be changed when incremental transformations are supported + + val batchIds = dfTable1.select("pramen_batchid").distinct().collect() + + assert(batchIds.length == 2) + + val offsets = om2.getOffsets("table1", infoDate).sortBy(_.createdAt) + + assert(offsets.length == 2) + + assert(offsets.head.minOffset.valueString.toLong == Long.MinValue) + assert(offsets.head.maxOffset.get.valueString.toLong == 3) + assert(offsets.head.committedAt.nonEmpty) + assert(offsets(1).minOffset.valueString.toLong == 3) + assert(offsets(1).maxOffset.get.valueString.toLong == 6) + assert(offsets(1).committedAt.nonEmpty) + } + } + + "fail is the input data type does not conform" in { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf = getConfig(tempDir, inferSchema = false) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 2) + } } - "deal with uncommitted changes" in { + "fail if the output table does not have the offset field" in { + val om1 = new OffsetManagerJdbc(pramenDb.db, 123L) + om1.startWriteOffsets("table1", infoDate, OffsetValue.LongType(Long.MinValue)) + + Thread.sleep(10) + + withTempDirectory("incremental1") { 
tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, "name\nJohn\nJack\nJill\n") + fsUtils.writeFile(path2, "name\nMary\nJane\nKate\n") + + val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") + + val df = spark.read + .option("header", "true") + .option("inferSchema", "true") + .csv(path1.toString) + .withColumn("pramen_batchid", lit(123L)) + + df.write.parquet(table1Path.toString) + + val conf = getConfig(tempDir) + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 2) + } } } @@ -312,24 +572,24 @@ class IncrementalPipelineSuite extends AnyWordSpec val csv2Str = s"id,name,info_date\n1,John,${infoDate.minusDays(1)}\n4,Mary,$infoDate\n5,Jane,$infoDate\n6,Kate,$infoDate\n999,New2,${infoDate.plusDays(1)}\n" val expected1 = - """{"id":"1","name":"John"} - |{"id":"2","name":"Jack"} - |{"id":"3","name":"Jill"} + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |{"id":3,"name":"Jill"} |""".stripMargin val expected2 = - """{"id":"4","name":"Mary"} - |{"id":"5","name":"Jane"} - |{"id":"6","name":"Kate"} + """{"id":4,"name":"Mary"} + |{"id":5,"name":"Jane"} + |{"id":6,"name":"Kate"} |""".stripMargin val expectedAll = - """{"id":"1","name":"John"} - |{"id":"2","name":"Jack"} - |{"id":"3","name":"Jill"} - |{"id":"4","name":"Mary"} - |{"id":"5","name":"Jane"} - |{"id":"6","name":"Kate"} + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |{"id":3,"name":"Jill"} + |{"id":4,"name":"Mary"} + |{"id":5,"name":"Jane"} + |{"id":6,"name":"Kate"} |""".stripMargin "work end to end as a normal run" in { @@ -467,16 +727,127 @@ class IncrementalPipelineSuite extends AnyWordSpec } "work for historical runs" in { + val csv1Str = s"id,name,info_date\n0,Old,${infoDate.minusDays(1)}\n1,John,$infoDate\n2,Jack,$infoDate\n3,Jill,$infoDate\n99,New,${infoDate.plusDays(2)}\n" + val csv2Str = s"id,name,info_date\n4,Mary,${infoDate.plusDays(1)}\n5,Jane,${infoDate.plusDays(1)}\n6,Kate,${infoDate.plusDays(1)}\n999,New2,${infoDate.plusDays(2)}\n" + + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + + fsUtils.writeFile(path1, csv1Str) + fsUtils.writeFile(path2, csv2Str) + + val conf1 = getConfig(tempDir, hasInfoDate = true) + + val exitCode1 = AppRunner.runPipeline(conf1) + assert(exitCode1 == 0) + + val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") + val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") + val dfTable1Before = spark.read.parquet(table1Path1.toString) + val dfTable2Before = spark.read.parquet(table2Path1.toString) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expected1) + compareText(actualTable2Before, expected1) + + + val conf2 = getConfig(tempDir, hasInfoDate = true, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1)) + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val table1Path2 = new 
Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.plusDays(1)}") + val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.plusDays(1)}") + val dfTable1After1 = spark.read.parquet(table1Path1.toString) + val dfTable2After1 = spark.read.parquet(table2Path1.toString) + val dfTable1After2 = spark.read.parquet(table1Path2.toString) + val dfTable2After2 = spark.read.parquet(table2Path2.toString) + val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + val batchIdsOld = dfTable1After1.select("pramen_batchid").distinct().collect().map(_.getLong(0)) + val batchIdsNew = dfTable1After2.select("pramen_batchid").distinct().collect().map(_.getLong(0)) + + assert(batchIdsOld.length == 1) + assert(batchIdsNew.length == 1) + + // The batch id for all info dates in range should not be the same since they we ran only for missing data + assert(batchIdsOld.head != batchIdsNew.head) + + // Expecting empty records + compareText(actualTable1After1, expected1) + compareText(actualTable2After1, expected1) + compareText(actualTable1After2, expected2) + compareText(actualTable2After2, expected2) + } } } "Edge cases" should { + val expected1 = + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |""".stripMargin + + val expected2 = + """{"id":3,"name":"Jill"} + |{"id":4,"name":"Mary"} + |""".stripMargin + "offsets cross info days" in { + val csv1Str = s"id,name,info_date\n0,Old,${infoDate.minusDays(2)}\n1,John,${infoDate.minusDays(1)}\n2,Jack,${infoDate.minusDays(1)}\n3,Jill,$infoDate\n4,Mary,$infoDate\n" - } + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + + fsUtils.writeFile(path1, csv1Str) - "recover from a failed transformer" in { + val conf1 = getConfig(tempDir, hasInfoDate = true) + val exitCode1 = AppRunner.runPipeline(conf1) + assert(exitCode1 == 0) + + val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.minusDays(1)}") + val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.minusDays(1)}") + val table1Path2 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") + val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") + val dfTable1_1 = spark.read.parquet(table1Path1.toString) + val dfTable2_1 = spark.read.parquet(table2Path1.toString) + val dfTable1_2 = spark.read.parquet(table1Path2.toString) + val dfTable2_2 = spark.read.parquet(table2Path2.toString) + + val actualTable1_1 = dfTable1_1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2_1 = dfTable2_1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable1_2 = dfTable1_2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2_2 = dfTable2_2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1_1, expected1) + compareText(actualTable2_1, expected1) + compareText(actualTable1_2, expected2) + compareText(actualTable2_2, expected2) + + val 
om = new OffsetManagerJdbc(pramenDb.db, 123L) + + val offsets1 = om.getOffsets("table1", infoDate.minusDays(1)) + assert(offsets1.length == 1) + assert(offsets1.head.minOffset.valueString.toLong == Long.MinValue ) + assert(offsets1.head.maxOffset.get.valueString.toLong == 2) + assert(offsets1.head.committedAt.nonEmpty) + + val offsets2 = om.getOffsets("table1", infoDate) + + assert(offsets2.length == 1) + assert(offsets2.head.minOffset.valueString.toLong == Long.MinValue) + assert(offsets2.head.maxOffset.get.valueString.toLong == 4) + assert(offsets2.head.committedAt.nonEmpty) + } } } @@ -484,11 +855,22 @@ class IncrementalPipelineSuite extends AnyWordSpec isRerun: Boolean = false, useDataFrame: Boolean = false, isTransformerIncremental: Boolean = true, + isHistoricalRun: Boolean = false, + historyRunMode: String = "force", + inferSchema: Boolean = true, hasInfoDate: Boolean = false, useInfoDate: LocalDate = infoDate): Config = { val configContents = ResourceUtils.getResourceString("/test/config/incremental_pipeline.conf") val basePathEscaped = basePath.replace("\\", "\\\\") val transformerSchedule = if (isTransformerIncremental) "incremental" else "daily" + val historicalConfigStr = if (isHistoricalRun) { + s"""pramen.load.date.from = "${useInfoDate.minusDays(1)}" + |pramen.load.date.to = "$useInfoDate" + |pramen.runtime.run.mode = "$historyRunMode" + |""".stripMargin + } else { + "" + } val conf = ConfigFactory.parseString( s"""base.path = "$basePathEscaped" @@ -496,7 +878,8 @@ class IncrementalPipelineSuite extends AnyWordSpec |pramen.runtime.is.rerun = $isRerun |pramen.current.date = "$useInfoDate" |transformer.schedule = "$transformerSchedule" - | + |infer.schema = $inferSchema + |$historicalConfigStr |has.information.date.column = $hasInfoDate | |pramen.bookkeeping.jdbc {
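
The new reference.conf options above can be combined as follows for a historical run. This is a minimal sketch with illustrative dates; the mode semantics are taken from the comment in reference.conf and from the integration tests in this change (force re-processes every date in the range, fill_gaps processes only dates that do not have data yet, check_updates is the default).

    pramen {
      load.date.from = "2022-01-01"
      load.date.to   = "2022-01-15"

      # One of: fill_gaps, check_updates (default), force
      runtime.run.mode = "fill_gaps"
    }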
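
The recovery path added in handleUncommittedOffsets() reduces to the following OffsetManager call sequence (a condensed sketch of the code in this diff, not additional behavior):

    val om = bookkeeper.getOffsetManager

    // 1. Re-derive the actual maximum offset from the data that did land in the metastore table,
    //    falling back to the minimum uncommitted offset if the table is empty.
    // 2. Record it as a single committed offset interval.
    val req = om.startWriteOffsets(outputTable.name, infoDate, minOffset)
    om.commitOffsets(req, newMaxOffset)

    // 3. Roll back the stale uncommitted entries.
    uncommittedOffsets.foreach { of =>
      om.rollbackOffsets(DataOffsetRequest(outputTable.name, infoDate, of.minOffset, of.createdAt))
    }

    // 4. Refresh the cached latest offset (the latestOffset var) so the subsequent save
    //    uses the corrected value as its starting offset.
    latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None)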
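
The validateOffsetColumn() check is also the reason the CSV test source now sets inferSchema = ${infer.schema}: without schema inference Spark reads every CSV column as StringType, and a string 'id' column cannot serve as an offset of type 'long' (only ShortType, IntegerType and LongType are accepted). Below is a standalone illustration; the file path, and the assumption that 'id' is the configured offset column of type 'long', are hypothetical:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{IntegerType, StringType}

    val spark = SparkSession.builder().master("local[*]").appName("offset-type-check-demo").getOrCreate()

    // Hypothetical landing file with the same layout as the test data: "id,name\n1,John\n2,Jack\n3,Jill\n"
    val path = "/tmp/landing/landing_file1.csv"

    val inferred = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
    assert(inferred.schema("id").dataType == IntegerType)   // integral type: accepted for offset type 'long'

    val notInferred = spark.read.option("header", "true").csv(path)
    assert(notInferred.schema("id").dataType == StringType) // would be rejected by validateOffsetColumn for offset type 'long'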