diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala index 21f863f4e..b53247dea 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala @@ -16,9 +16,9 @@ package za.co.absa.pramen.api.offset +import org.apache.spark.sql.Column import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{LongType, StringType => SparkStringType} -import org.apache.spark.sql.{Column, functions} import java.time.Instant @@ -46,7 +46,7 @@ object OffsetValue { override def getSparkLit: Column = lit(t.toEpochMilli) - override def getSparkCol(c: Column): Column = functions.concat(unix_timestamp(c), date_format(c, "SSS")).cast(LongType) + override def getSparkCol(c: Column): Column = concat(unix_timestamp(c), date_format(c, "SSS")).cast(LongType) override def compareTo(o: OffsetValue): Int = { o match { diff --git a/pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala b/pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala index 641320313..a7ed2820c 100644 --- a/pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala +++ b/pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala @@ -16,7 +16,8 @@ package za.co.absa.pramen.api.offset -import org.apache.spark.sql.functions.{from_unixtime, lit} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{LongType, StringType} import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.offset.OffsetValue.MINIMUM_TIMESTAMP_EPOCH_MILLI @@ -24,11 +25,12 @@ import java.time.Instant class OffsetValueSuite extends AnyWordSpec { "OffsetValue" should { - "be able to create a TimestampType instance" in { + "be able to create a DateTimeType instance" in { val offsetValue = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L)) - assert(offsetValue.dataTypeString == "timestamp") + assert(offsetValue.dataTypeString == "datetime") assert(offsetValue.valueString == "1726564198000") - assert(offsetValue.getSparkLit == from_unixtime(lit(1726564198000L))) + assert(offsetValue.getSparkLit == lit(1726564198000L)) + assert(offsetValue.getSparkCol(col("a")) == concat(unix_timestamp(col("a")), date_format(col("a"), "SSS")).cast(LongType)) } "be able to create a IntegralType instance" in { @@ -36,6 +38,7 @@ class OffsetValueSuite extends AnyWordSpec { assert(offsetValue.dataTypeString == "integral") assert(offsetValue.valueString == "42") assert(offsetValue.getSparkLit == lit(42L)) + assert(offsetValue.getSparkCol(col("a")) == col("a").cast(LongType)) } "be able to create a StringType instance" in { @@ -43,13 +46,14 @@ class OffsetValueSuite extends AnyWordSpec { assert(offsetValue.dataTypeString == "string") assert(offsetValue.valueString == "foo") assert(offsetValue.getSparkLit == lit("foo")) + assert(offsetValue.getSparkCol(col("a")) == col("a").cast(StringType)) } } "getMinimumForType" should { - "be able to get minimum value for timestamp type" in { - val offsetValue = OffsetValue.getMinimumForType("timestamp") - assert(offsetValue.dataTypeString == "timestamp") + "be able to get minimum value for datetime type" in { + val offsetValue = OffsetValue.getMinimumForType("datetime") + assert(offsetValue.dataTypeString == "datetime") assert(offsetValue.valueString == MINIMUM_TIMESTAMP_EPOCH_MILLI.toString) } @@ -73,6 +77,12 @@ class 
OffsetValueSuite extends AnyWordSpec { } "fromString" should { + "be able to create a DateTimeType instance from a string" in { + val offsetValue = OffsetValue.fromString("datetime", "1726552310000") + assert(offsetValue.dataTypeString == "datetime") + assert(offsetValue.valueString == "1726552310000") + } + "be able to create a IntegralType instance from a string" in { val offsetValue = OffsetValue.fromString("integral", "42") assert(offsetValue.dataTypeString == "integral") @@ -92,4 +102,59 @@ class OffsetValueSuite extends AnyWordSpec { } } + "compareTo" should { + "compare 2 datetime values" in { + val offsetValue1 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L)) + val offsetValue2 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198001L)) + + assert(offsetValue1.compareTo(offsetValue2) < 0) + assert(offsetValue2.compareTo(offsetValue1) > 0) + assert(offsetValue2.compareTo(offsetValue2) == 0) + } + + "throw an exception when attempting to compare a datetime value with value of some other type" in { + val offsetValue1 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L)) + val offsetValue2 = OffsetValue.IntegralType(42) + + assertThrows[IllegalArgumentException] { + offsetValue1.compareTo(offsetValue2) + } + } + + "compare 2 integral values" in { + val offsetValue1 = OffsetValue.IntegralType(42) + val offsetValue2 = OffsetValue.IntegralType(43) + + assert(offsetValue1.compareTo(offsetValue2) < 0) + assert(offsetValue2.compareTo(offsetValue1) > 0) + assert(offsetValue2.compareTo(offsetValue2) == 0) + } + + "throw an exception when attempting to compare an integral value with value of some other type" in { + val offsetValue1 = OffsetValue.IntegralType(42) + val offsetValue2 = OffsetValue.StringType("foo") + + assertThrows[IllegalArgumentException] { + offsetValue1.compareTo(offsetValue2) + } + } + + "compare 2 string values" in { + val offsetValue1 = OffsetValue.StringType("bar") + val offsetValue2 = OffsetValue.StringType("foo") + + assert(offsetValue1.compareTo(offsetValue2) < 0) + assert(offsetValue2.compareTo(offsetValue1) > 0) + assert(offsetValue2.compareTo(offsetValue2) == 0) + } + + "throw an exception when attempting to compare a string value with value of some other type" in { + val offsetValue1 = OffsetValue.StringType("foo") + val offsetValue2 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L)) + + assertThrows[IllegalArgumentException] { + offsetValue1.compareTo(offsetValue2) + } + } + } } diff --git a/pramen/build.sbt b/pramen/build.sbt index 2eb1fc058..ddbbbb329 100644 --- a/pramen/build.sbt +++ b/pramen/build.sbt @@ -14,18 +14,13 @@ * limitations under the License. 
*/ -import Dependencies._ -import Versions._ -import BuildInfoTemplateSettings._ -import com.github.sbt.jacoco.report.JacocoReportSettings - val scala211 = "2.11.12" val scala212 = "2.12.19" val scala213 = "2.13.13" ThisBuild / organization := "za.co.absa.pramen" -ThisBuild / scalaVersion := scala211 +ThisBuild / scalaVersion := scala212 ThisBuild / crossScalaVersions := Seq(scala211, scala212, scala213) ThisBuild / scalacOptions := Seq("-unchecked", "-deprecation") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala index 11564ebe6..2feb18b66 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala @@ -53,7 +53,7 @@ object MetastorePersistence { ) case DataFormat.Delta(query, recordsPerPartition) => new MetastorePersistenceDelta( - query, metaTable.infoDateColumn, metaTable.infoDateFormat, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions + query, metaTable.infoDateColumn, metaTable.infoDateFormat, metaTable.batchIdColumn, batchId, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions ) case DataFormat.Raw(path) => new MetastorePersistenceRaw(path, metaTable.infoDateColumn, metaTable.infoDateFormat, saveModeOpt) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala index 5827dcb1b..672b6ee9a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala @@ -35,6 +35,8 @@ import scala.util.Try class MetastorePersistenceDelta(query: Query, infoDateColumn: String, infoDateFormat: String, + batchIdColumn: String, + batchId: Long, recordsPerPartition: Option[Long], saveModeOpt: Option[SaveMode], readOptions: Map[String, String], @@ -66,23 +68,22 @@ class MetastorePersistenceDelta(query: Query, val whereCondition = s"$infoDateColumn='$infoDateStr'" - val recordCount = numberOfRecordsEstimate match { - case Some(count) => count - case None => dfIn.count() - } + val dfRepartitioned = if (recordsPerPartition.nonEmpty) { + val recordCount = numberOfRecordsEstimate match { + case Some(count) => count + case None => dfIn.count() + } - val dfRepartitioned = applyRepartitioning(dfIn, recordCount) + applyRepartitioning(dfIn, recordCount) + } else { + dfIn + } val saveMode = saveModeOpt.getOrElse(SaveMode.Overwrite) - val operationStr = saveMode match { - case SaveMode.Append => "Appending to" - case _ => "Writing to" - } - - val isAppend = saveMode match { - case SaveMode.Append => true - case _ => false + val (isAppend, operationStr) = saveMode match { + case SaveMode.Append => (true, "Appending to") + case _ => (false, "Writing to") } if (log.isDebugEnabled) { @@ -116,8 +117,22 @@ class MetastorePersistenceDelta(query: Query, val stats = getStats(infoDate, isAppend) stats.dataSizeBytes match { - case Some(size) => log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records (${StringUtils.prettySize(size)}) to ${query.query}") - case None => log.info(s"$SUCCESS Successfully saved ${stats.recordCount} 
records to ${query.query}") + case Some(size) => + stats.recordCountAppended match { + case Some(recordsAppended) => + log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}, " + + s"new size: ${StringUtils.prettySize(size)}) to ${query.query}") + case None => + log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " + + s"(${StringUtils.prettySize(size)}) to ${query.query}") + } + case None => + stats.recordCountAppended match { + case Some(recordsAppended) => + log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}) to ${query.query}") + case None => + log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records to ${query.query}") + } } stats @@ -125,7 +140,6 @@ class MetastorePersistenceDelta(query: Query, override def getStats(infoDate: LocalDate, onlyForCurrentBatchId: Boolean): MetaTableStats = { val df = loadTable(Option(infoDate), Option(infoDate)) - val recordCount = df.count() val sizeOpt = query match { case Query.Path(path) => @@ -142,7 +156,16 @@ class MetastorePersistenceDelta(query: Query, None } - MetaTableStats(recordCount, None, sizeOpt) + if (onlyForCurrentBatchId && df.schema.exists(_.name.equalsIgnoreCase(batchIdColumn))) { + val batchCount = df.filter(col(batchIdColumn) === batchId).count() + val countAll = df.count() + + MetaTableStats(countAll, Option(batchCount), sizeOpt) + } else { + val countAll = df.count() + + MetaTableStats(countAll, None, sizeOpt) + } } override def createOrUpdateHiveTable(infoDate: LocalDate, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index c0cca6b4e..3ebbfe0cd 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -81,17 +81,15 @@ class IncrementalIngestionJob(operationDef: OperationDef, mt.getTable(outputTable.name, Option(infoDate), Option(infoDate)) } catch { case ex: AnalysisException => - log.warn(s"No data found for ${outputTable.name}. Rolling back uncommitted offsets...", ex) - - uncommittedOffsets.foreach { of => - log.warn(s"Cleaning uncommitted offset: $of...") - om.rollbackOffsets(DataOffsetRequest(outputTable.name, infoDate, of.minOffset, of.createdAt)) - } - - latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None) + rollbackOffsets(infoDate, om, uncommittedOffsets) return } + if (df.isEmpty) { + rollbackOffsets(infoDate, om, uncommittedOffsets) + return + } + if (!df.schema.fields.exists(_.name.equalsIgnoreCase(offsetInfo.offsetColumn))) { throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'. Cannot update uncommitted offsets.") } @@ -116,6 +114,17 @@ class IncrementalIngestionJob(operationDef: OperationDef, latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None) } + private[core] def rollbackOffsets(infoDate: LocalDate, om: OffsetManager, uncommittedOffsets: Array[DataOffset]): Unit = { + log.warn(s"No data found for ${outputTable.name}. 
Rolling back uncommitted offsets...") + + uncommittedOffsets.foreach { of => + log.warn(s"Cleaning uncommitted offset: $of...") + om.rollbackOffsets(DataOffsetRequest(outputTable.name, infoDate, of.minOffset, of.createdAt)) + } + + latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None) + } + override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = { if (source.getOffsetInfo(sourceTable.query).nonEmpty) { Reason.Ready diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderDelta.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderDelta.scala deleted file mode 100644 index 856dd2a5b..000000000 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderDelta.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2022 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.pramen.core.reader - -import org.apache.spark.sql.functions.{col, lit} -import org.apache.spark.sql.{DataFrame, SparkSession} -import org.slf4j.LoggerFactory -import za.co.absa.pramen.api.offset.OffsetValue -import za.co.absa.pramen.api.{Query, TableReader} - -import java.time.LocalDate -import java.time.format.DateTimeFormatter - -class TableReaderDelta(infoDateColumn: String, - infoDateFormat: String = "yyyy-MM-dd" - )(implicit spark: SparkSession) extends TableReader { - - private val log = LoggerFactory.getLogger(this.getClass) - private val dateFormatter = DateTimeFormatter.ofPattern(infoDateFormat) - - override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = { - if (infoDateBegin.equals(infoDateEnd)) { - log.info(s"Reading COUNT(*) FROM ${query.query} WHERE $infoDateColumn='${dateFormatter.format(infoDateBegin)}'") - } else { - log.info(s"Reading COUNT(*) FROM ${query.query} WHERE $infoDateColumn BETWEEN '${dateFormatter.format(infoDateBegin)}' AND '${dateFormatter.format(infoDateEnd)}'") - } - getFilteredDataFrame(query, infoDateBegin, infoDateEnd).count() - } - - override def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = { - if (infoDateBegin.equals(infoDateEnd)) { - log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='${dateFormatter.format(infoDateEnd)}'") - } else { - log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn BETWEEN '${dateFormatter.format(infoDateBegin)}' AND '${dateFormatter.format(infoDateEnd)}'") - } - getFilteredDataFrame(query, infoDateBegin, infoDateEnd) - } - - override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ??? - - override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ??? 
- - private def getFilteredDataFrame(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): DataFrame = { - val infoDateBeginStr = dateFormatter.format(infoDateBegin) - val infoDateEndStr = dateFormatter.format(infoDateEnd) - - val reader = query match { - case Query.Path(path) => - spark - .read - .format("delta") - .load(path) - case Query.Table(table) => - spark - .table(table) - case _ => throw new IllegalArgumentException(s"Arguments of type ${query.getClass} are not supported for Delta format.") - } - - if (infoDateBegin.equals(infoDateEnd)) { - reader.filter(col(s"$infoDateColumn") === lit(infoDateBeginStr)) - } else { - reader.filter(col(s"$infoDateColumn") >= lit(infoDateBeginStr) && col(s"$infoDateColumn") <= lit(infoDateEndStr)) - } - } -} diff --git a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf index aa662566b..8474a4f34 100644 --- a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf +++ b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf @@ -27,12 +27,12 @@ pramen.metastore { tables = [ { name = "table1" - format = "parquet" + format = ${metastore.format} path = ${base.path}/table1 }, { name = "table2" - format = "parquet" + format = ${metastore.format} path = ${base.path}/table2 } ] diff --git a/pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf b/pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf index b8b453ea6..eee4f9601 100644 --- a/pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf +++ b/pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf @@ -27,12 +27,12 @@ pramen.metastore { tables = [ { name = "table1" - format = "parquet" + format = ${metastore.format} path = ${base.path}/table1 }, { name = "table2" - format = "parquet" + format = ${metastore.format} path = ${base.path}/table2 } ] diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineDeltaLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineDeltaLongSuite.scala new file mode 100644 index 000000000..bf241ca24 --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineDeltaLongSuite.scala @@ -0,0 +1,101 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.pramen.core.integration + +class IncrementalPipelineDeltaLongSuite extends IncrementalPipelineLongFixture { + private val format = "delta" + + "For inputs without information date the pipeline" should { + "work end to end as a normal run" in { + testOffsetOnlyNormalRun(format) + } + + "work with incremental ingestion and normal transformer" in { + testOffsetOnlyIncrementalIngestionNormalTransformer(format) + } + + "work end to end as rerun" in { + testOffsetOnlyRerun(format) + } + + "work end to end as rerun with deletion of records" in { + testOffsetOnlyRerunWithRecordsDeletion(format) + } + + "work end to end as rerun with deletion of records with previous data present" in { + testOffsetOnlyRerunWithRecordsDeletionAndPreviousDataPresent(format) + } + + "run for a historical date range with force update" in { + testOffsetOnlyHistoricalDateRangeWithForceUpdate(format) + } + + "run for a historical date range with fill gaps update" in { + testOffsetOnlyHistoricalDateRangeWithFillGaps(format) + } + + "deal with uncommitted offsets when no path" in { + testOffsetOnlyDealWithUncommittedOffsetsWithNoPath(format) + } + + "deal with uncommitted offsets when no data" in { + testOffsetOnlyDealWithUncommittedOffsetsWithNoData(format) + } + + "deal with uncommitted changes when there is data" in { + testOffsetOnlyDealWithUncommittedOffsetsWithData(format) + } + + "fail if the input data type does not conform" in { + testOffsetOnlyFailWhenInputDataDoesNotConform(format) + } + + "fail if the output table does not have the offset field" in { + testOffsetOnlyFailWhenInputTableDoestHaveOffsetField(format) + } + } + + "For inputs with information date the pipeline" should { + "work end to end as a normal run" in { + testWithInfoDateNormalRun(format) + } + + "work with incremental ingestion and normal transformer" in { + testWithInfoDateIncrementalIngestionNormalTransformer(format) + } + + "work end to end as rerun" in { + testWithInfoDateRerun(format) + } + + "work for historical runs" in { + testWithInfoDateHistoricalDateRange(format) + } + } + + "When the input column is timestamp" should { + "work end to end as a normal run" in { + testWithTimestampNormalRun(format) + } + } + + "Edge cases" should { + "offsets cross info days" in { + testOffsetCrossInfoDateEdgeCase(format) + } + } +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala new file mode 100644 index 000000000..63b3dbd26 --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongFixture.scala @@ -0,0 +1,1079 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.pramen.core.integration + +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.types.TimestampType +import org.scalatest.wordspec.AnyWordSpec +import org.scalatest.{Assertion, BeforeAndAfter, BeforeAndAfterAll} +import za.co.absa.pramen.api.offset.OffsetValue +import za.co.absa.pramen.api.offset.OffsetValue.IntegralType +import za.co.absa.pramen.core.base.SparkTestBase +import za.co.absa.pramen.core.bookkeeper.OffsetManagerJdbc +import za.co.absa.pramen.core.fixtures.{RelationalDbFixture, TempDirFixture, TextComparisonFixture} +import za.co.absa.pramen.core.rdb.PramenDb +import za.co.absa.pramen.core.reader.JdbcUrlSelectorImpl +import za.co.absa.pramen.core.reader.model.JdbcConfig +import za.co.absa.pramen.core.runner.AppRunner +import za.co.absa.pramen.core.utils.{FsUtils, JdbcNativeUtils, ResourceUtils} + +import java.sql.Date +import java.time.LocalDate + +class IncrementalPipelineLongFixture extends AnyWordSpec + with SparkTestBase + with RelationalDbFixture + with BeforeAndAfter + with BeforeAndAfterAll + with TempDirFixture + with TextComparisonFixture { + + val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password)) + val pramenDb: PramenDb = PramenDb(jdbcConfig) + + before { + pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") + pramenDb.setupDatabase() + } + + override def afterAll(): Unit = { + pramenDb.close() + super.afterAll() + } + + private val infoDate = LocalDate.of(2021, 2, 18) + + private val BATCH_ID_COLUMN = "pramen_batchid" + private val INFO_DATE_COLUMN = "pramen_info_date" + + val expectedOffsetOnly1: String = + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |{"id":3,"name":"Jill"} + |""".stripMargin + + val expectedOffsetOnly2: String = + """{"id":4,"name":"Mary"} + |{"id":5,"name":"Jane"} + |{"id":6,"name":"Kate"} + |""".stripMargin + + val expectedOffsetOnlyAll: String = + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |{"id":3,"name":"Jill"} + |{"id":4,"name":"Mary"} + |{"id":5,"name":"Jane"} + |{"id":6,"name":"Kate"} + |""".stripMargin + + val csv1WithInfoDateStr = s"id,name,info_date\n0,Old,${infoDate.minusDays(1)}\n1,John,$infoDate\n2,Jack,$infoDate\n3,Jill,$infoDate\n99,New,${infoDate.plusDays(1)}\n" + val csv2WithInfoDateStr = s"id,name,info_date\n1,John,${infoDate.minusDays(1)}\n4,Mary,$infoDate\n5,Jane,$infoDate\n6,Kate,$infoDate\n999,New2,${infoDate.plusDays(1)}\n" + + val expectedWithInfoDate1: String = + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |{"id":3,"name":"Jill"} + |""".stripMargin + + val expectedWithInfoDate2: String = + """{"id":4,"name":"Mary"} + |{"id":5,"name":"Jane"} + |{"id":6,"name":"Kate"} + |""".stripMargin + + val expectedWithInfoDateAll: String = + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |{"id":3,"name":"Jill"} + |{"id":4,"name":"Mary"} + |{"id":5,"name":"Jane"} + |{"id":6,"name":"Kate"} + |""".stripMargin + + val csv1WithTimestampStr = s"tss,name\n1613563930000,Old\n1613639398123,John\n1613639398124,Jack\n1613639399123,Jill\n1613740330000,New\n" + + val expectedWithTimestamp1: String = + """{"ts":"2021-02-17T14:12:10.000+02:00","name":"Old","pramen_info_date":"2021-02-17"} + |{"ts":"2021-02-18T11:09:58.123+02:00","name":"John","pramen_info_date":"2021-02-18"} + |{"ts":"2021-02-18T11:09:58.124+02:00","name":"Jack","pramen_info_date":"2021-02-18"} + 
|{"ts":"2021-02-18T11:09:59.123+02:00","name":"Jill","pramen_info_date":"2021-02-18"} + |""".stripMargin + + val expectedWithTimestampAll: String = + """{"ts":"2021-02-17T14:12:10.000+02:00","name":"Old","pramen_info_date":"2021-02-17"} + |{"ts":"2021-02-18T11:09:58.123+02:00","name":"John","pramen_info_date":"2021-02-18"} + |{"ts":"2021-02-18T11:09:58.124+02:00","name":"Jack","pramen_info_date":"2021-02-18"} + |{"ts":"2021-02-18T11:09:59.123+02:00","name":"Jill","pramen_info_date":"2021-02-18"} + |{"ts":"2021-02-19T15:12:10.000+02:00","name":"New","pramen_info_date":"2021-02-19"} + |""".stripMargin + + def testOffsetOnlyNormalRun(metastoreFormat: String): Assertion = { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf = getConfig(tempDir, metastoreFormat) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedOffsetOnly1) + compareText(actualTable2Before, expectedOffsetOnly1) + + fsUtils.deleteFile(path1) + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val exitCode2 = AppRunner.runPipeline(conf) + assert(exitCode2 == 0) + + val dfTable1After = spark.read.format(metastoreFormat).load(table1Path.toString) + val dfTable2After = spark.read.format(metastoreFormat).load(table2Path.toString) + + val batchIds = dfTable1After.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.length == 2) + + val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1After, expectedOffsetOnlyAll) + compareText(actualTable2After, expectedOffsetOnlyAll) + } + succeed + } + + def testOffsetOnlyIncrementalIngestionNormalTransformer(metastoreFormat: String): Assertion = { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf = getConfig(tempDir, metastoreFormat, isTransformerIncremental = false) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = 
spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedOffsetOnly1) + compareText(actualTable2Before, expectedOffsetOnly1) + + fsUtils.deleteFile(path1) + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val exitCode2 = AppRunner.runPipeline(conf) + assert(exitCode2 == 0) + + val dfTable1After = spark.read.format(metastoreFormat).load(table1Path.toString) + val dfTable2After = spark.read.format(metastoreFormat).load(table2Path.toString) + + val batchIds = dfTable1After.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.length == 2) + + val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1After, expectedOffsetOnlyAll) + compareText(actualTable2After, expectedOffsetOnlyAll) + } + succeed + } + + def testOffsetOnlyRerun(metastoreFormat: String): Assertion = { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf = getConfig(tempDir, metastoreFormat) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedOffsetOnly1) + compareText(actualTable2Before, expectedOffsetOnly1) + + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val conf2 = getConfig(tempDir, metastoreFormat, isRerun = true) + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val dfTable1After = spark.read.format(metastoreFormat).load(table1Path.toString) + val dfTable2After = spark.read.format(metastoreFormat).load(table2Path.toString) + + val batchIds = dfTable1After.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.length == 1) + + val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + // Expecting original records + compareText(actualTable1After, expectedOffsetOnly1) + compareText(actualTable2After, expectedOffsetOnly1) + } + succeed + } + + def testOffsetOnlyRerunWithRecordsDeletion(metastoreFormat: String): Assertion = { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, 
tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf = getConfig(tempDir, metastoreFormat) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedOffsetOnly1) + compareText(actualTable2Before, expectedOffsetOnly1) + + fsUtils.deleteFile(path1) + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val conf2 = getConfig(tempDir, metastoreFormat, isRerun = true) + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val dfTable1After = spark.read.format(metastoreFormat).load(table1Path.toString) + val dfTable2After = spark.read.format(metastoreFormat).load(table2Path.toString) + + val batchIds = dfTable1After.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.isEmpty) + + // Expecting empty records + assert(dfTable1After.isEmpty) + assert(dfTable2After.isEmpty) + } + succeed + } + + def testOffsetOnlyRerunWithRecordsDeletionAndPreviousDataPresent(metastoreFormat: String): Assertion = { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path0 = new Path(tempDir, new Path("landing", "landing_file0.csv")) + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path0, "id,name\n0,Old\n") + + val conf0 = getConfig(tempDir, metastoreFormat, useInfoDate = infoDate.minusDays(1)) + + val exitCode0 = AppRunner.runPipeline(conf0) + fsUtils.deleteFile(path1) + + assert(exitCode0 == 0) + + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf1 = getConfig(tempDir, metastoreFormat) + + val exitCode1 = AppRunner.runPipeline(conf1) + assert(exitCode1 == 0) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedOffsetOnly1) + compareText(actualTable2Before, expectedOffsetOnly1) + + fsUtils.deleteFile(path1) + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val conf2 = getConfig(tempDir, metastoreFormat, isRerun = true) + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val dfTable1After = 
spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2After = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + + val batchIds = dfTable1After.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.isEmpty) + + // Expecting empty records + assert(dfTable1After.isEmpty) + assert(dfTable2After.isEmpty) + } + succeed + } + + def testOffsetOnlyHistoricalDateRangeWithForceUpdate(metastoreFormat: String): Assertion = { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf1 = getConfig(tempDir, metastoreFormat) + + val exitCode1 = AppRunner.runPipeline(conf1) + assert(exitCode1 == 0) + + val table1Path1 = new Path(tempDir, "table1") + val table2Path1 = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path1.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path1.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedOffsetOnly1) + compareText(actualTable2Before, expectedOffsetOnly1) + + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val conf2 = getConfig(tempDir, metastoreFormat, isHistoricalRun = true, useInfoDate = infoDate.plusDays(1)) + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val table1Path2 = new Path(tempDir, "table1") + val table2Path2 = new Path(tempDir, "table2") + val dfTable1After1 = spark.read.format(metastoreFormat).load(table1Path1.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2After1 = spark.read.format(metastoreFormat).load(table2Path1.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable1After2 = spark.read.format(metastoreFormat).load(table1Path2.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.plusDays(1))) + val dfTable2After2 = spark.read.format(metastoreFormat).load(table2Path2.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.plusDays(1))) + val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + val batchIdsOld = dfTable1After1.select(BATCH_ID_COLUMN).distinct().collect().map(_.getLong(0)) + val batchIdsNew = dfTable1After2.select(BATCH_ID_COLUMN).distinct().collect().map(_.getLong(0)) + + assert(batchIdsOld.length == 1) + assert(batchIdsNew.length == 1) + + // The batch id for all info dates in range should be the same since they were re-ran + assert(batchIdsOld.head == batchIdsNew.head) + + // Expecting 
records for both info dates + compareText(actualTable1After1, expectedOffsetOnly1) + compareText(actualTable2After1, expectedOffsetOnly1) + compareText(actualTable1After2, expectedOffsetOnly2) + compareText(actualTable2After2, expectedOffsetOnly2) + } + succeed + } + + def testOffsetOnlyHistoricalDateRangeWithFillGaps(metastoreFormat: String): Assertion = { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf1 = getConfig(tempDir, metastoreFormat) + + val exitCode1 = AppRunner.runPipeline(conf1) + assert(exitCode1 == 0) + + val table1Path1 = new Path(tempDir, "table1") + val table2Path1 = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path1.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path1.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedOffsetOnly1) + compareText(actualTable2Before, expectedOffsetOnly1) + + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val conf2 = getConfig(tempDir, metastoreFormat, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1)) + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val table1Path2 = new Path(tempDir, "table1") + val table2Path2 = new Path(tempDir, "table2") + val dfTable1After1 = spark.read.format(metastoreFormat).load(table1Path1.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2After1 = spark.read.format(metastoreFormat).load(table2Path1.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable1After2 = spark.read.format(metastoreFormat).load(table1Path2.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.plusDays(1))) + val dfTable2After2 = spark.read.format(metastoreFormat).load(table2Path2.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.plusDays(1))) + val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + val batchIdsOld = dfTable1After1.select(BATCH_ID_COLUMN).distinct().collect().map(_.getLong(0)) + val batchIdsNew = dfTable1After2.select(BATCH_ID_COLUMN).distinct().collect().map(_.getLong(0)) + + assert(batchIdsOld.length == 1) + assert(batchIdsNew.length == 1) + + // The batch id for all info dates in range should not be the same since we ran only for the missing info date + assert(batchIdsOld.head != batchIdsNew.head) + + // Expecting records for both info dates + compareText(actualTable1After1, expectedOffsetOnly1) + compareText(actualTable2After1, expectedOffsetOnly1) + compareText(actualTable1After2, expectedOffsetOnly2)
+ compareText(actualTable2After2, expectedOffsetOnly2) + } + succeed + } + + def testOffsetOnlyDealWithUncommittedOffsetsWithNoPath(metastoreFormat: String): Assertion = { + val om = new OffsetManagerJdbc(pramenDb.db, 123L) + + om.startWriteOffsets("table1", infoDate, IntegralType(1)) + + Thread.sleep(10) + + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val table1Path = new Path(tempDir, "table1") + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf = getConfig(tempDir, metastoreFormat) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val dfTable1 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1 = dfTable1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1, expectedOffsetOnly1) + + val batchIds = dfTable1.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.length == 1) + + val offsets = om.getOffsets("table1", infoDate).sortBy(_.createdAt) + + assert(offsets.length == 1) + + assert(offsets.head.minOffset.valueString.toLong == Long.MinValue) + assert(offsets.head.maxOffset.get.valueString.toLong == 3) + assert(offsets.head.committedAt.nonEmpty) + } + succeed + } + + def testOffsetOnlyDealWithUncommittedOffsetsWithNoData(metastoreFormat: String): Assertion = { + val om = new OffsetManagerJdbc(pramenDb.db, 123L) + + om.startWriteOffsets("table1", infoDate, IntegralType(1)) + + Thread.sleep(10) + + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val table1Path = new Path(tempDir, "table1") + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val df = spark.read + .option("header", "true") + .option("inferSchema", "true") + .csv(path1.toString) + .withColumn(BATCH_ID_COLUMN, lit(123L)) + .withColumn(INFO_DATE_COLUMN, lit(Date.valueOf(infoDate))) + + df.filter(col("id") < 0) + .write + .format(metastoreFormat) + .partitionBy(INFO_DATE_COLUMN) + .save(table1Path.toString) + + val conf = getConfig(tempDir, metastoreFormat) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val dfTable1 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1 = dfTable1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1, expectedOffsetOnly1) + + val batchIds = dfTable1.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.length == 1) + + val offsets = om.getOffsets("table1", infoDate).sortBy(_.createdAt) + + assert(offsets.length == 1) + + assert(offsets.head.minOffset.valueString.toLong == Long.MinValue) + assert(offsets.head.maxOffset.get.valueString.toLong == 3) + assert(offsets.head.committedAt.nonEmpty) + } + succeed + } + + def testOffsetOnlyDealWithUncommittedOffsetsWithData(metastoreFormat: String): Assertion = { + val om1 = new OffsetManagerJdbc(pramenDb.db, 123L) + om1.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(Long.MinValue)) + + Thread.sleep(10) + + val om2 = new OffsetManagerJdbc(pramenDb.db, 123L) + om2.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(2)) + + Thread.sleep(10) + + 
withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + + val df = spark.read + .option("header", "true") + .option("inferSchema", "true") + .csv(path1.toString) + .withColumn(BATCH_ID_COLUMN, lit(123L)) + .withColumn(INFO_DATE_COLUMN, lit(Date.valueOf(infoDate))) + + df.write + .format(metastoreFormat) + .partitionBy(INFO_DATE_COLUMN) + .save(table1Path.toString) + + val conf = getConfig(tempDir, metastoreFormat) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val dfTable1 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2 = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1 = dfTable1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2 = dfTable2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1, expectedOffsetOnlyAll) + compareText(actualTable2, expectedOffsetOnly2) // ToDo This logic is to be changed when incremental transformations are supported + + val batchIds = dfTable1.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.length == 2) + + val offsets = om2.getOffsets("table1", infoDate).sortBy(_.createdAt) + + assert(offsets.length == 2) + + assert(offsets.head.minOffset.valueString.toLong == Long.MinValue) + assert(offsets.head.maxOffset.get.valueString.toLong == 3) + assert(offsets.head.committedAt.nonEmpty) + assert(offsets(1).minOffset.valueString.toLong == 3) + assert(offsets(1).maxOffset.get.valueString.toLong == 6) + assert(offsets(1).committedAt.nonEmpty) + } + succeed + } + + def testOffsetOnlyFailWhenInputDataDoesNotConform(metastoreFormat: String): Assertion = { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") + + val conf = getConfig(tempDir, metastoreFormat, inferSchema = false) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 2) + } + succeed + } + + def testOffsetOnlyFailWhenInputTableDoestHaveOffsetField(metastoreFormat: String): Assertion = { + val om1 = new OffsetManagerJdbc(pramenDb.db, 123L) + om1.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(Long.MinValue)) + + Thread.sleep(10) + + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, "name\nJohn\nJack\nJill\n") + fsUtils.writeFile(path2, "name\nMary\nJane\nKate\n") + + val table1Path = new Path(tempDir, "table1") + + val df = spark.read + .option("header", "true") + .option("inferSchema", "true") + .csv(path1.toString) + .withColumn(BATCH_ID_COLUMN, lit(123L)) + .withColumn(INFO_DATE_COLUMN, 
lit(Date.valueOf(infoDate))) + + df.write.format(metastoreFormat) + .partitionBy(INFO_DATE_COLUMN) + .save(table1Path.toString) + + val conf = getConfig(tempDir, metastoreFormat) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 2) + } + succeed + } + + def testWithInfoDateNormalRun(metastoreFormat: String): Assertion = { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, csv1WithInfoDateStr) + + val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedWithInfoDate1) + compareText(actualTable2Before, expectedWithInfoDate1) + + fsUtils.deleteFile(path1) + fsUtils.writeFile(path2, csv2WithInfoDateStr) + + val exitCode2 = AppRunner.runPipeline(conf) + assert(exitCode2 == 0) + + val dfTable1After = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2After = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + + val batchIds = dfTable1After.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.length == 2) + + val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1After, expectedWithInfoDateAll) + compareText(actualTable2After, expectedWithInfoDateAll) + } + succeed + } + + def testWithInfoDateIncrementalIngestionNormalTransformer(metastoreFormat: String): Assertion = { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, csv1WithInfoDateStr) + + val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isTransformerIncremental = false) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", 
"name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedWithInfoDate1) + compareText(actualTable2Before, expectedWithInfoDate1) + + fsUtils.deleteFile(path1) + fsUtils.writeFile(path2, csv2WithInfoDateStr) + + val exitCode2 = AppRunner.runPipeline(conf) + assert(exitCode2 == 0) + + val dfTable1After = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2After = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + + val batchIds = dfTable1After.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.length == 2) + + val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1After, expectedWithInfoDateAll) + compareText(actualTable2After, expectedWithInfoDateAll) + } + succeed + } + + def testWithInfoDateRerun(metastoreFormat: String): Assertion = { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + fsUtils.writeFile(path1, csv1WithInfoDateStr) + + val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true) + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedWithInfoDate1) + compareText(actualTable2Before, expectedWithInfoDate1) + + fsUtils.deleteFile(path1) + fsUtils.writeFile(path2, csv2WithInfoDateStr) + + val conf2 = getConfig(tempDir, metastoreFormat, isRerun = true, hasInfoDate = true) + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val dfTable1After = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2After = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + + val batchIds = dfTable1After.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.length == 1) + + val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + // Expecting original records + compareText(actualTable1After, expectedWithInfoDate2) + compareText(actualTable2After, expectedWithInfoDate2) + } + succeed + } + + def testWithInfoDateHistoricalDateRange(metastoreFormat: String): Assertion = { + val csv1Str = 
s"id,name,info_date\n0,Old,${infoDate.minusDays(1)}\n1,John,$infoDate\n2,Jack,$infoDate\n3,Jill,$infoDate\n99,New,${infoDate.plusDays(2)}\n" + val csv2Str = s"id,name,info_date\n4,Mary,${infoDate.plusDays(1)}\n5,Jane,${infoDate.plusDays(1)}\n6,Kate,${infoDate.plusDays(1)}\n999,New2,${infoDate.plusDays(2)}\n" + + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) + + fsUtils.writeFile(path1, csv1Str) + fsUtils.writeFile(path2, csv2Str) + + val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true) + + val exitCode1 = AppRunner.runPipeline(conf1) + assert(exitCode1 == 0) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedWithInfoDate1) + compareText(actualTable2Before, expectedWithInfoDate1) + + val conf2 = getConfig(tempDir, metastoreFormat, hasInfoDate = true, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1)) + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val dfTable1After1 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2After1 = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable1After2 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.plusDays(1))) + val dfTable2After2 = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.plusDays(1))) + val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + val batchIdsOld = dfTable1After1.select(BATCH_ID_COLUMN).distinct().collect().map(_.getLong(0)) + val batchIdsNew = dfTable1After2.select(BATCH_ID_COLUMN).distinct().collect().map(_.getLong(0)) + + assert(batchIdsOld.length == 1) + assert(batchIdsNew.length == 1) + + // The batch id for all info dates in range should not be the same since they we ran only for missing data + assert(batchIdsOld.head != batchIdsNew.head) + + // Expecting empty records + compareText(actualTable1After1, expectedWithInfoDate1) + compareText(actualTable2After1, expectedWithInfoDate1) + compareText(actualTable1After2, expectedWithInfoDate2) + compareText(actualTable2After2, expectedWithInfoDate2) + } + succeed + } + + def testWithTimestampNormalRun(metastoreFormat: String): Assertion 
= { + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1Csv = new Path(tempDir, new Path("landing_csv", "landing_file1.csv")) + val path1Parquet = new Path(tempDir, "landing") + fsUtils.writeFile(path1Csv, csv1WithTimestampStr) + + val df = spark.read + .option("header", "true") + .option("inferSchema", "true") + .csv(path1Csv.toString) + .withColumn("ts", (col("tss") / 1000).cast(TimestampType)) + + df.write.parquet(path1Parquet.toString) + + val conf = getConfig(tempDir, metastoreFormat, hasInfoDate = true, resource = "/test/config/incremental_pipeline_ts.conf") + + val exitCode1 = AppRunner.runPipeline(conf) + assert(exitCode1 == 0) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + val dfTable1Before = spark.read.format(metastoreFormat).load(table1Path.toString) + val dfTable2Before = spark.read.format(metastoreFormat).load(table2Path.toString) + val actualTable1Before = dfTable1Before.select("ts", "name", INFO_DATE_COLUMN).orderBy("ts").toJSON.collect().mkString("\n") + val actualTable2Before = dfTable2Before.select("ts", "name", INFO_DATE_COLUMN).orderBy("ts").toJSON.collect().mkString("\n") + + compareText(actualTable1Before, expectedWithTimestamp1) + compareText(actualTable2Before, expectedWithTimestamp1) + + val conf2 = getConfig(tempDir, metastoreFormat, isTransformerIncremental = false, hasInfoDate = true, useInfoDate = infoDate.plusDays(1), resource = "/test/config/incremental_pipeline_ts.conf") + val exitCode2 = AppRunner.runPipeline(conf2) + assert(exitCode2 == 0) + + val dfTable1After = spark.read.format(metastoreFormat).load(table1Path.toString) + val dfTable2After = spark.read.format(metastoreFormat).load(table2Path.toString) + + val batchIds = dfTable1After.select(BATCH_ID_COLUMN).distinct().collect() + + assert(batchIds.length == 2) + + val actualTable1After = dfTable1After.select("ts", "name", INFO_DATE_COLUMN).orderBy("ts").toJSON.collect().mkString("\n") + val actualTable2After = dfTable2After.select("ts", "name", INFO_DATE_COLUMN).orderBy("ts").toJSON.collect().mkString("\n") + + compareText(actualTable1After, expectedWithTimestampAll) + compareText(actualTable2After, expectedWithTimestampAll) + + val om = new OffsetManagerJdbc(pramenDb.db, 123L) + + val offsets1 = om.getOffsets("table1", infoDate.minusDays(1)) + assert(offsets1.head.minOffset.valueString.toLong == -62135596800000L) + assert(offsets1.head.maxOffset.get.valueString.toLong == 1613563930000L) + assert(offsets1.head.committedAt.nonEmpty) + + + val offsets2 = om.getOffsets("table1", infoDate) + assert(offsets2.length == 1) + assert(offsets2.head.minOffset.valueString.toLong == 1613563930000L) + assert(offsets2.head.maxOffset.get.valueString.toLong == 1613639399123L) + assert(offsets2.head.committedAt.nonEmpty) + + val offsets3 = om.getOffsets("table1", infoDate.plusDays(1)) + assert(offsets3.length == 1) + assert(offsets3.head.minOffset.valueString.toLong == 1613639399123L) + assert(offsets3.head.maxOffset.get.valueString.toLong == 1613740330000L) + assert(offsets3.head.committedAt.nonEmpty) + } + succeed + } + + def testOffsetCrossInfoDateEdgeCase(metastoreFormat: String): Assertion = { + val expected1 = + """{"id":1,"name":"John"} + |{"id":2,"name":"Jack"} + |""".stripMargin + + val expected2 = + """{"id":3,"name":"Jill"} + |{"id":4,"name":"Mary"} + |""".stripMargin + + val csv1Str = 
s"id,name,info_date\n0,Old,${infoDate.minusDays(2)}\n1,John,${infoDate.minusDays(1)}\n2,Jack,${infoDate.minusDays(1)}\n3,Jill,$infoDate\n4,Mary,$infoDate\n" + + withTempDirectory("incremental1") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) + + fsUtils.writeFile(path1, csv1Str) + + val conf1 = getConfig(tempDir, metastoreFormat, hasInfoDate = true) + + val exitCode1 = AppRunner.runPipeline(conf1) + assert(exitCode1 == 0) + + val table1Path = new Path(tempDir, "table1") + val table2Path = new Path(tempDir, "table2") + val dfTable1_1 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.minusDays(1))) + val dfTable2_1 = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate.minusDays(1))) + val dfTable1_2 = spark.read.format(metastoreFormat).load(table1Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + val dfTable2_2 = spark.read.format(metastoreFormat).load(table2Path.toString).filter(col(INFO_DATE_COLUMN) === Date.valueOf(infoDate)) + + val actualTable1_1 = dfTable1_1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2_1 = dfTable2_1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable1_2 = dfTable1_2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + val actualTable2_2 = dfTable2_2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") + + compareText(actualTable1_1, expected1) + compareText(actualTable2_1, expected1) + compareText(actualTable1_2, expected2) + compareText(actualTable2_2, expected2) + + val om = new OffsetManagerJdbc(pramenDb.db, 123L) + + val offsets1 = om.getOffsets("table1", infoDate.minusDays(1)) + assert(offsets1.length == 1) + assert(offsets1.head.minOffset.valueString.toLong == Long.MinValue) + assert(offsets1.head.maxOffset.get.valueString.toLong == 2) + assert(offsets1.head.committedAt.nonEmpty) + + val offsets2 = om.getOffsets("table1", infoDate) + assert(offsets2.length == 1) + assert(offsets2.head.minOffset.valueString.toLong == 2) + assert(offsets2.head.maxOffset.get.valueString.toLong == 4) + assert(offsets2.head.committedAt.nonEmpty) + } + succeed + } + + def getConfig(basePath: String, + metastoreFormat: String, + isRerun: Boolean = false, + useDataFrame: Boolean = false, + isTransformerIncremental: Boolean = true, + isHistoricalRun: Boolean = false, + historyRunMode: String = "force", + inferSchema: Boolean = true, + hasInfoDate: Boolean = false, + useInfoDate: LocalDate = infoDate, + resource: String = "/test/config/incremental_pipeline.conf"): Config = { + val configContents = ResourceUtils.getResourceString(resource) + val basePathEscaped = basePath.replace("\\", "\\\\") + val transformerSchedule = if (isTransformerIncremental) "incremental" else "daily" + val historicalConfigStr = if (isHistoricalRun) { + s"""pramen.load.date.from = "${useInfoDate.minusDays(1)}" + |pramen.load.date.to = "$useInfoDate" + |pramen.runtime.run.mode = "$historyRunMode" + |""".stripMargin + } else { + "" + } + + val conf = ConfigFactory.parseString( + s"""base.path = "$basePathEscaped" + |use.dataframe = $useDataFrame + |pramen.runtime.is.rerun = $isRerun + |pramen.current.date = "$useInfoDate" + |transformer.schedule = "$transformerSchedule" + |infer.schema = $inferSchema + |$historicalConfigStr + 
|has.information.date.column = $hasInfoDate + |metastore.format = $metastoreFormat + | + |pramen.bookkeeping.jdbc { + | driver = "$driver" + | url = "$url" + | user = "$user" + | password = "$password" + |} + |$configContents + |""".stripMargin + ).withFallback(ConfigFactory.load()) + .resolve() + + conf + } + + /* This method is used to inspect offsets after operations */ + private def debugOffsets(): Unit = { + JdbcNativeUtils.withResultSet(new JdbcUrlSelectorImpl(jdbcConfig), "SELECT * FROM \"offsets\"", 1) { rs => + val mt = rs.getMetaData + + for (i <- 1 to mt.getColumnCount) { + print(mt.getColumnName(i) + "\t") + } + println("") + + while (rs.next()) { + for (i <- 1 to mt.getColumnCount) { + print(rs.getString(i) + "\t") + } + println("") + } + } + } +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongSuite.scala deleted file mode 100644 index ac7dc644b..000000000 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongSuite.scala +++ /dev/null @@ -1,1008 +0,0 @@ -/* - * Copyright 2022 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.pramen.core.integration - -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.functions.{col, lit} -import org.apache.spark.sql.types.TimestampType -import org.scalatest.wordspec.AnyWordSpec -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import za.co.absa.pramen.api.offset.OffsetValue -import za.co.absa.pramen.api.offset.OffsetValue.IntegralType -import za.co.absa.pramen.core.base.SparkTestBase -import za.co.absa.pramen.core.bookkeeper.OffsetManagerJdbc -import za.co.absa.pramen.core.fixtures.{RelationalDbFixture, TempDirFixture, TextComparisonFixture} -import za.co.absa.pramen.core.rdb.PramenDb -import za.co.absa.pramen.core.reader.JdbcUrlSelectorImpl -import za.co.absa.pramen.core.reader.model.JdbcConfig -import za.co.absa.pramen.core.runner.AppRunner -import za.co.absa.pramen.core.utils.{FsUtils, JdbcNativeUtils, ResourceUtils} - -import java.time.LocalDate - -class IncrementalPipelineLongSuite extends AnyWordSpec - with SparkTestBase - with RelationalDbFixture - with BeforeAndAfter - with BeforeAndAfterAll - with TempDirFixture - with TextComparisonFixture { - - val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password)) - val pramenDb: PramenDb = PramenDb(jdbcConfig) - - before { - pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - pramenDb.setupDatabase() - } - - override def afterAll(): Unit = { - pramenDb.close() - super.afterAll() - } - - private val infoDate = LocalDate.of(2021, 2, 18) - - "For inputs without information date the pipeline" should { - val expected1 = - """{"id":1,"name":"John"} - |{"id":2,"name":"Jack"} - |{"id":3,"name":"Jill"} - |""".stripMargin - - val expected2 = - 
"""{"id":4,"name":"Mary"} - |{"id":5,"name":"Jane"} - |{"id":6,"name":"Kate"} - |""".stripMargin - - val expectedAll = - """{"id":1,"name":"John"} - |{"id":2,"name":"Jack"} - |{"id":3,"name":"Jill"} - |{"id":4,"name":"Mary"} - |{"id":5,"name":"Jane"} - |{"id":6,"name":"Kate"} - |""".stripMargin - - "work end to end as a normal run" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val exitCode2 = AppRunner.runPipeline(conf) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 2) - - val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1After, expectedAll) - compareText(actualTable2After, expectedAll) - } - } - - "work with incremental ingestion and normal transformer" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir, isTransformerIncremental = false) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val exitCode2 = AppRunner.runPipeline(conf) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = 
spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 2) - - val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1After, expectedAll) - compareText(actualTable2After, expectedAll) - } - } - - "work end to end as rerun" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val conf2 = getConfig(tempDir, isRerun = true) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 1) - - val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - // Expecting original records - compareText(actualTable1After, expected1) - compareText(actualTable2After, expected1) - } - } - - "work end to end as rerun with deletion of records" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, 
"id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val conf2 = getConfig(tempDir, isRerun = true) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.isEmpty) - - // Expecting empty records - assert(dfTable1After.isEmpty) - assert(dfTable2After.isEmpty) - } - } - - "work end to end as rerun with deletion of records with previous data present" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path0 = new Path(tempDir, new Path("landing", "landing_file0.csv")) - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path0, "id,name\n0,Old\n") - - val conf0 = getConfig(tempDir, useInfoDate = infoDate.minusDays(1)) - - val exitCode0 = AppRunner.runPipeline(conf0) - fsUtils.deleteFile(path1) - - assert(exitCode0 == 0) - - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf1 = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf1) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val conf2 = getConfig(tempDir, isRerun = true) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.isEmpty) - - // Expecting empty records - assert(dfTable1After.isEmpty) - assert(dfTable2After.isEmpty) - } - } - - "run for a historical date range with force update" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf1 = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf1) - assert(exitCode1 == 0) - - val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path1.toString) - val dfTable2Before = spark.read.parquet(table2Path1.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - 
compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val conf2 = getConfig(tempDir, isHistoricalRun = true, useInfoDate = infoDate.plusDays(1)) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val table1Path2 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.plusDays(1)}") - val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.plusDays(1)}") - val dfTable1After1 = spark.read.parquet(table1Path1.toString) - val dfTable2After1 = spark.read.parquet(table2Path1.toString) - val dfTable1After2 = spark.read.parquet(table1Path2.toString) - val dfTable2After2 = spark.read.parquet(table2Path2.toString) - val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - val batchIdsOld = dfTable1After1.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - val batchIdsNew = dfTable1After2.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - - assert(batchIdsOld.length == 1) - assert(batchIdsNew.length == 1) - - // The batch id for all info dates in range should be the same since they were re-ran - assert(batchIdsOld.head == batchIdsNew.head) - - // Expecting empty records - compareText(actualTable1After1, expected1) - compareText(actualTable2After1, expected1) - compareText(actualTable1After2, expected2) - compareText(actualTable2After2, expected2) - } - } - - "run for a historical date range with fill gaps update" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf1 = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf1) - assert(exitCode1 == 0) - - val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path1.toString) - val dfTable2Before = spark.read.parquet(table2Path1.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val conf2 = getConfig(tempDir, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1)) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val table1Path2 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.plusDays(1)}") - val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.plusDays(1)}") - val dfTable1After1 = spark.read.parquet(table1Path1.toString) - val dfTable2After1 = 
spark.read.parquet(table2Path1.toString) - val dfTable1After2 = spark.read.parquet(table1Path2.toString) - val dfTable2After2 = spark.read.parquet(table2Path2.toString) - val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - val batchIdsOld = dfTable1After1.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - val batchIdsNew = dfTable1After2.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - - assert(batchIdsOld.length == 1) - assert(batchIdsNew.length == 1) - - // The batch id for all info dates in range should not be the same since they we ran only for missing data - assert(batchIdsOld.head != batchIdsNew.head) - - // Expecting empty records - compareText(actualTable1After1, expected1) - compareText(actualTable2After1, expected1) - compareText(actualTable1After2, expected2) - compareText(actualTable2After2, expected2) - } - } - - "deal with uncommitted changes when no data" in { - val om = new OffsetManagerJdbc(pramenDb.db, 123L) - - om.startWriteOffsets("table1", infoDate, IntegralType(1)) - - Thread.sleep(10) - - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val dfTable1 = spark.read.parquet(table1Path.toString) - val actualTable1 = dfTable1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1, expected1) - - val batchIds = dfTable1.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 1) - - val offsets = om.getOffsets("table1", infoDate).sortBy(_.createdAt) - - assert(offsets.length == 1) - - assert(offsets.head.minOffset.valueString.toLong == Long.MinValue) - assert(offsets.head.maxOffset.get.valueString.toLong == 3) - assert(offsets.head.committedAt.nonEmpty) - } - } - - "deal with uncommitted changes when there is data" in { - val om1 = new OffsetManagerJdbc(pramenDb.db, 123L) - om1.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(Long.MinValue)) - - Thread.sleep(10) - - val om2 = new OffsetManagerJdbc(pramenDb.db, 123L) - om2.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(2)) - - Thread.sleep(10) - - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - - val df = spark.read - .option("header", "true") - .option("inferSchema", "true") - 
.csv(path1.toString) - .withColumn("pramen_batchid", lit(123L)) - - df.write.parquet(table1Path.toString) - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val dfTable1 = spark.read.parquet(table1Path.toString) - val dfTable2 = spark.read.parquet(table2Path.toString) - val actualTable1 = dfTable1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2 = dfTable2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1, expectedAll) - compareText(actualTable2, expected2) // ToDo This logic is to be changed when incremental transformations are supported - - val batchIds = dfTable1.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 2) - - val offsets = om2.getOffsets("table1", infoDate).sortBy(_.createdAt) - - assert(offsets.length == 2) - - assert(offsets.head.minOffset.valueString.toLong == Long.MinValue) - assert(offsets.head.maxOffset.get.valueString.toLong == 3) - assert(offsets.head.committedAt.nonEmpty) - assert(offsets(1).minOffset.valueString.toLong == 3) - assert(offsets(1).maxOffset.get.valueString.toLong == 6) - assert(offsets(1).committedAt.nonEmpty) - } - } - - "fail is the input data type does not conform" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir, inferSchema = false) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 2) - } - } - - "fail if the output table does not have the offset field" in { - val om1 = new OffsetManagerJdbc(pramenDb.db, 123L) - om1.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(Long.MinValue)) - - Thread.sleep(10) - - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "name\nJohn\nJack\nJill\n") - fsUtils.writeFile(path2, "name\nMary\nJane\nKate\n") - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - - val df = spark.read - .option("header", "true") - .option("inferSchema", "true") - .csv(path1.toString) - .withColumn("pramen_batchid", lit(123L)) - - df.write.parquet(table1Path.toString) - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 2) - } - } - } - - "For inputs with information date the pipeline" should { - val csv1Str = s"id,name,info_date\n0,Old,${infoDate.minusDays(1)}\n1,John,$infoDate\n2,Jack,$infoDate\n3,Jill,$infoDate\n99,New,${infoDate.plusDays(1)}\n" - val csv2Str = s"id,name,info_date\n1,John,${infoDate.minusDays(1)}\n4,Mary,$infoDate\n5,Jane,$infoDate\n6,Kate,$infoDate\n999,New2,${infoDate.plusDays(1)}\n" - - val expected1 = - """{"id":1,"name":"John"} - |{"id":2,"name":"Jack"} - |{"id":3,"name":"Jill"} - |""".stripMargin - - val expected2 = - """{"id":4,"name":"Mary"} - |{"id":5,"name":"Jane"} - |{"id":6,"name":"Kate"} - |""".stripMargin - - val expectedAll = - """{"id":1,"name":"John"} - |{"id":2,"name":"Jack"} - |{"id":3,"name":"Jill"} - |{"id":4,"name":"Mary"} - |{"id":5,"name":"Jane"} - |{"id":6,"name":"Kate"} - |""".stripMargin - - "work end to end as a 
normal run" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, csv1Str) - - val conf = getConfig(tempDir, hasInfoDate = true) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, csv2Str) - - val exitCode2 = AppRunner.runPipeline(conf) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 2) - - val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1After, expectedAll) - compareText(actualTable2After, expectedAll) - } - } - - "work with incremental ingestion and normal transformer" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, csv1Str) - - val conf = getConfig(tempDir, hasInfoDate = true, isTransformerIncremental = false) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, csv2Str) - - val exitCode2 = AppRunner.runPipeline(conf) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 2) - - val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1After, 
expectedAll) - compareText(actualTable2After, expectedAll) - } - } - - "work end to end as rerun" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, csv1Str) - - val conf = getConfig(tempDir, hasInfoDate = true) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, csv2Str) - - val conf2 = getConfig(tempDir, isRerun = true, hasInfoDate = true) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 1) - - val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - // Expecting original records - compareText(actualTable1After, expected2) - compareText(actualTable2After, expected2) - } - } - - "work for historical runs" in { - val csv1Str = s"id,name,info_date\n0,Old,${infoDate.minusDays(1)}\n1,John,$infoDate\n2,Jack,$infoDate\n3,Jill,$infoDate\n99,New,${infoDate.plusDays(2)}\n" - val csv2Str = s"id,name,info_date\n4,Mary,${infoDate.plusDays(1)}\n5,Jane,${infoDate.plusDays(1)}\n6,Kate,${infoDate.plusDays(1)}\n999,New2,${infoDate.plusDays(2)}\n" - - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - - fsUtils.writeFile(path1, csv1Str) - fsUtils.writeFile(path2, csv2Str) - - val conf1 = getConfig(tempDir, hasInfoDate = true) - - val exitCode1 = AppRunner.runPipeline(conf1) - assert(exitCode1 == 0) - - val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path1.toString) - val dfTable2Before = spark.read.parquet(table2Path1.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - - val conf2 = getConfig(tempDir, hasInfoDate = true, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = 
infoDate.plusDays(1)) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val table1Path2 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.plusDays(1)}") - val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.plusDays(1)}") - val dfTable1After1 = spark.read.parquet(table1Path1.toString) - val dfTable2After1 = spark.read.parquet(table2Path1.toString) - val dfTable1After2 = spark.read.parquet(table1Path2.toString) - val dfTable2After2 = spark.read.parquet(table2Path2.toString) - val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - val batchIdsOld = dfTable1After1.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - val batchIdsNew = dfTable1After2.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - - assert(batchIdsOld.length == 1) - assert(batchIdsNew.length == 1) - - // The batch id for all info dates in range should not be the same since they we ran only for missing data - assert(batchIdsOld.head != batchIdsNew.head) - - // Expecting empty records - compareText(actualTable1After1, expected1) - compareText(actualTable2After1, expected1) - compareText(actualTable1After2, expected2) - compareText(actualTable2After2, expected2) - } - } - } - - "When the input column is timestamp" should { - val csv1Str = s"tss,name\n1613563930000,Old\n1613639398123,John\n1613639398124,Jack\n1613639399123,Jill\n1613740330000,New\n" - - val expected1 = - """{"ts":"2021-02-17T14:12:10.000+02:00","name":"Old","pramen_info_date":"2021-02-17"} - |{"ts":"2021-02-18T11:09:58.123+02:00","name":"John","pramen_info_date":"2021-02-18"} - |{"ts":"2021-02-18T11:09:58.124+02:00","name":"Jack","pramen_info_date":"2021-02-18"} - |{"ts":"2021-02-18T11:09:59.123+02:00","name":"Jill","pramen_info_date":"2021-02-18"} - |""".stripMargin - - val expectedAll = - """{"ts":"2021-02-17T14:12:10.000+02:00","name":"Old","pramen_info_date":"2021-02-17"} - |{"ts":"2021-02-18T11:09:58.123+02:00","name":"John","pramen_info_date":"2021-02-18"} - |{"ts":"2021-02-18T11:09:58.124+02:00","name":"Jack","pramen_info_date":"2021-02-18"} - |{"ts":"2021-02-18T11:09:59.123+02:00","name":"Jill","pramen_info_date":"2021-02-18"} - |{"ts":"2021-02-19T15:12:10.000+02:00","name":"New","pramen_info_date":"2021-02-19"} - |""".stripMargin - - "work end to end as a normal run" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1Csv = new Path(tempDir, new Path("landing_csv", "landing_file1.csv")) - val path1Parquet = new Path(tempDir, "landing") - fsUtils.writeFile(path1Csv, csv1Str) - - val df = spark.read - .option("header", "true") - .option("inferSchema", "true") - .csv(path1Csv.toString) - .withColumn("ts", (col("tss") / 1000).cast(TimestampType)) - - df.write.parquet(path1Parquet.toString) - - val conf = getConfig(tempDir, hasInfoDate = true, resource = "/test/config/incremental_pipeline_ts.conf") - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(tempDir, "table1") - val table2Path = new Path(tempDir, "table2") - val dfTable1Before = 
spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("ts", "name", "pramen_info_date").orderBy("ts").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("ts", "name", "pramen_info_date").orderBy("ts").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - val conf2 = getConfig(tempDir, isTransformerIncremental = false, hasInfoDate = true, useInfoDate = infoDate.plusDays(1), resource = "/test/config/incremental_pipeline_ts.conf") - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 2) - - val actualTable1After = dfTable1After.select("ts", "name", "pramen_info_date").orderBy("ts").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("ts", "name", "pramen_info_date").orderBy("ts").toJSON.collect().mkString("\n") - - compareText(actualTable1After, expectedAll) - compareText(actualTable2After, expectedAll) - - val om = new OffsetManagerJdbc(pramenDb.db, 123L) - - val offsets1 = om.getOffsets("table1", infoDate.minusDays(1)) - assert(offsets1.head.minOffset.valueString.toLong == -62135596800000L) - assert(offsets1.head.maxOffset.get.valueString.toLong == 1613563930000L) - assert(offsets1.head.committedAt.nonEmpty) - - - val offsets2 = om.getOffsets("table1", infoDate) - assert(offsets2.length == 1) - assert(offsets2.head.minOffset.valueString.toLong == 1613563930000L) - assert(offsets2.head.maxOffset.get.valueString.toLong == 1613639399123L) - assert(offsets2.head.committedAt.nonEmpty) - - val offsets3 = om.getOffsets("table1", infoDate.plusDays(1)) - assert(offsets3.length == 1) - assert(offsets3.head.minOffset.valueString.toLong == 1613639399123L) - assert(offsets3.head.maxOffset.get.valueString.toLong == 1613740330000L) - assert(offsets3.head.committedAt.nonEmpty) - } - } - } - - "Edge cases" should { - val expected1 = - """{"id":1,"name":"John"} - |{"id":2,"name":"Jack"} - |""".stripMargin - - val expected2 = - """{"id":3,"name":"Jill"} - |{"id":4,"name":"Mary"} - |""".stripMargin - - "offsets cross info days" in { - val csv1Str = s"id,name,info_date\n0,Old,${infoDate.minusDays(2)}\n1,John,${infoDate.minusDays(1)}\n2,Jack,${infoDate.minusDays(1)}\n3,Jill,$infoDate\n4,Mary,$infoDate\n" - - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - - fsUtils.writeFile(path1, csv1Str) - - val conf1 = getConfig(tempDir, hasInfoDate = true) - - val exitCode1 = AppRunner.runPipeline(conf1) - assert(exitCode1 == 0) - - val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.minusDays(1)}") - val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.minusDays(1)}") - val table1Path2 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1_1 = spark.read.parquet(table1Path1.toString) - val dfTable2_1 = spark.read.parquet(table2Path1.toString) - val dfTable1_2 = spark.read.parquet(table1Path2.toString) - val 
dfTable2_2 = spark.read.parquet(table2Path2.toString) - - val actualTable1_1 = dfTable1_1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2_1 = dfTable2_1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable1_2 = dfTable1_2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2_2 = dfTable2_2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1_1, expected1) - compareText(actualTable2_1, expected1) - compareText(actualTable1_2, expected2) - compareText(actualTable2_2, expected2) - - val om = new OffsetManagerJdbc(pramenDb.db, 123L) - - val offsets1 = om.getOffsets("table1", infoDate.minusDays(1)) - assert(offsets1.length == 1) - assert(offsets1.head.minOffset.valueString.toLong == Long.MinValue ) - assert(offsets1.head.maxOffset.get.valueString.toLong == 2) - assert(offsets1.head.committedAt.nonEmpty) - - val offsets2 = om.getOffsets("table1", infoDate) - assert(offsets2.length == 1) - assert(offsets2.head.minOffset.valueString.toLong == 2) - assert(offsets2.head.maxOffset.get.valueString.toLong == 4) - assert(offsets2.head.committedAt.nonEmpty) - } - } - } - - def getConfig(basePath: String, - isRerun: Boolean = false, - useDataFrame: Boolean = false, - isTransformerIncremental: Boolean = true, - isHistoricalRun: Boolean = false, - historyRunMode: String = "force", - inferSchema: Boolean = true, - hasInfoDate: Boolean = false, - useInfoDate: LocalDate = infoDate, - resource: String = "/test/config/incremental_pipeline.conf"): Config = { - val configContents = ResourceUtils.getResourceString(resource) - val basePathEscaped = basePath.replace("\\", "\\\\") - val transformerSchedule = if (isTransformerIncremental) "incremental" else "daily" - val historicalConfigStr = if (isHistoricalRun) { - s"""pramen.load.date.from = "${useInfoDate.minusDays(1)}" - |pramen.load.date.to = "$useInfoDate" - |pramen.runtime.run.mode = "$historyRunMode" - |""".stripMargin - } else { - "" - } - - val conf = ConfigFactory.parseString( - s"""base.path = "$basePathEscaped" - |use.dataframe = $useDataFrame - |pramen.runtime.is.rerun = $isRerun - |pramen.current.date = "$useInfoDate" - |transformer.schedule = "$transformerSchedule" - |infer.schema = $inferSchema - |$historicalConfigStr - |has.information.date.column = $hasInfoDate - | - |pramen.bookkeeping.jdbc { - | driver = "$driver" - | url = "$url" - | user = "$user" - | password = "$password" - |} - |$configContents - |""".stripMargin - ).withFallback(ConfigFactory.load()) - .resolve() - - conf - } - - /* This method is used to inspect offsets after operations */ - private def debugOffsets(): Unit = { - JdbcNativeUtils.withResultSet(new JdbcUrlSelectorImpl(jdbcConfig), "SELECT * FROM \"offsets\"", 1) { rs => - val mt = rs.getMetaData - - for (i <- 1 to mt.getColumnCount) { - print(mt.getColumnName(i) + "\t") - } - println("") - - while (rs.next()) { - for (i <- 1 to mt.getColumnCount) { - print(rs.getString(i) + "\t") - } - println("") - } - } - } -} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineParquetLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineParquetLongSuite.scala new file mode 100644 index 000000000..eac39826a --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineParquetLongSuite.scala @@ -0,0 +1,101 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under 
the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.integration + +class IncrementalPipelineParquetLongSuite extends IncrementalPipelineLongFixture { + private val format = "parquet" + + "For inputs without information date the pipeline" should { + "work end to end as a normal run" in { + testOffsetOnlyNormalRun(format) + } + + "work with incremental ingestion and normal transformer" in { + testOffsetOnlyIncrementalIngestionNormalTransformer(format) + } + + "work end to end as rerun" in { + testOffsetOnlyRerun(format) + } + + "work end to end as rerun with deletion of records" in { + testOffsetOnlyRerunWithRecordsDeletion(format) + } + + "work end to end as rerun with deletion of records with previous data present" in { + testOffsetOnlyRerunWithRecordsDeletionAndPreviousDataPresent(format) + } + + "run for a historical date range with force update" in { + testOffsetOnlyHistoricalDateRangeWithForceUpdate(format) + } + + "run for a historical date range with fill gaps update" in { + testOffsetOnlyHistoricalDateRangeWithFillGaps(format) + } + + "deal with uncommitted offsets when no path" in { + testOffsetOnlyDealWithUncommittedOffsetsWithNoPath(format) + } + + "deal with uncommitted offsets when no data" in { + testOffsetOnlyDealWithUncommittedOffsetsWithNoData(format) + } + + "deal with uncommitted offsets when there is data" in { + testOffsetOnlyDealWithUncommittedOffsetsWithData(format) + } + + "fail if the input data type does not conform" in { + testOffsetOnlyFailWhenInputDataDoesNotConform(format) + } + + "fail if the output table does not have the offset field" in { + testOffsetOnlyFailWhenInputTableDoestHaveOffsetField(format) + } + } + + "For inputs with information date the pipeline" should { + "work end to end as a normal run" in { + testWithInfoDateNormalRun(format) + } + + "work with incremental ingestion and normal transformer" in { + testWithInfoDateIncrementalIngestionNormalTransformer(format) + } + + "work end to end as rerun" in { + testWithInfoDateRerun(format) + } + + "work for historical runs" in { + testWithInfoDateHistoricalDateRange(format) + } + } + + "When the input column is timestamp" should { + "work end to end as a normal run" in { + testWithTimestampNormalRun(format) + } + } + + "Edge cases" should { + "offsets cross info days" in { + testOffsetCrossInfoDateEdgeCase(format) + } + } +}
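Note on the reuse pattern: each format-specific suite delegates every scenario to the shared fixture and passes only the metastore format string, so adding coverage for another format is a matter of creating one more thin suite. Below is a minimal sketch of what such a suite could look like; it is not part of this change set, the "delta" format value and the class name are assumptions for illustration, and the list of delegated scenarios is abbreviated.

package za.co.absa.pramen.core.integration

// Hypothetical format-specific suite reusing the shared fixture methods shown in this change.
// Assumes the metastore supports a "delta" format; the class name is illustrative only.
class IncrementalPipelineDeltaLongSuite extends IncrementalPipelineLongFixture {
  private val format = "delta"

  "For inputs without information date the pipeline" should {
    "work end to end as a normal run" in {
      testOffsetOnlyNormalRun(format)
    }
  }

  "For inputs with information date the pipeline" should {
    "work end to end as a normal run" in {
      testWithInfoDateNormalRun(format)
    }
  }

  "Edge cases" should {
    "offsets cross info days" in {
      testOffsetCrossInfoDateEdgeCase(format)
    }
  }
}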