From a8a77dc2c250aa55aa6172a65c1c7541541436e7 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 17 Sep 2024 14:19:15 +0200 Subject: [PATCH] #374 Implement the offset type: 'datetime' for incremental ingestion. --- .../absa/pramen/api/offset/OffsetValue.scala | 45 +- .../pramen/api/offset/OffsetValueSuite.scala | 101 +- pramen/build.sbt | 2 +- .../peristence/MetastorePersistence.scala | 2 +- .../MetastorePersistenceDelta.scala | 57 +- .../pipeline/IncrementalIngestionJob.scala | 48 +- .../pramen/core/reader/TableReaderDelta.scala | 79 -- .../pramen/core/reader/TableReaderSpark.scala | 41 +- .../reader/model/TableReaderJdbcConfig.scala | 2 +- .../jobrunner/ConcurrentJobRunnerImpl.scala | 7 +- .../ScheduleStrategyIncremental.scala | 20 +- .../absa/pramen/core/source/SparkSource.scala | 18 +- .../test/config/incremental_pipeline.conf | 6 +- .../test/config/incremental_pipeline_ts.conf | 97 ++ .../absa/pramen/core/base/SparkTestBase.scala | 1 + .../IncrementalPipelineLongSuite.scala | 917 ------------------ .../reader/TableReaderSparkFactory.scala | 4 +- .../bookkeeper/OffsetManagerJdbcSuite.scala | 116 +-- .../tests/reader/TableReaderSparkSuite.scala | 3 +- 19 files changed, 423 insertions(+), 1143 deletions(-) delete mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderDelta.scala create mode 100644 pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf delete mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongSuite.scala diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala index 289e82166..b53247dea 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala @@ -17,7 +17,10 @@ package za.co.absa.pramen.api.offset import org.apache.spark.sql.Column -import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{LongType, StringType => SparkStringType} + +import java.time.Instant sealed trait OffsetValue extends Comparable[OffsetValue] { def dataTypeString: String @@ -25,22 +28,46 @@ sealed trait OffsetValue extends Comparable[OffsetValue] { def valueString: String def getSparkLit: Column + + def getSparkCol(col: Column): Column } object OffsetValue { - val LONG_TYPE_STR = "long" + val DATETIME_TYPE_STR = "datetime" + val INTEGRAL_TYPE_STR = "integral" val STRING_TYPE_STR = "string" - case class LongType(value: Long) extends OffsetValue { - override val dataTypeString: String = LONG_TYPE_STR + val MINIMUM_TIMESTAMP_EPOCH_MILLI: Long = -62135596800000L + + case class DateTimeType(t: Instant) extends OffsetValue { + override val dataTypeString: String = DATETIME_TYPE_STR + + override def valueString: String = t.toEpochMilli.toString + + override def getSparkLit: Column = lit(t.toEpochMilli) + + override def getSparkCol(c: Column): Column = concat(unix_timestamp(c), date_format(c, "SSS")).cast(LongType) + + override def compareTo(o: OffsetValue): Int = { + o match { + case DateTimeType(otherValue) => t.compareTo(otherValue) + case _ => throw new IllegalArgumentException(s"Cannot compare $dataTypeString with ${o.dataTypeString}") + } + } + } + + case class IntegralType(value: Long) extends OffsetValue { + override val dataTypeString: String = INTEGRAL_TYPE_STR override def valueString: String = value.toString override def getSparkLit: 
Column = lit(value) + override def getSparkCol(c: Column): Column = c.cast(LongType) + override def compareTo(o: OffsetValue): Int = { o match { - case LongType(otherValue) => value.compareTo(otherValue) + case IntegralType(otherValue) => value.compareTo(otherValue) case _ => throw new IllegalArgumentException(s"Cannot compare $dataTypeString with ${o.dataTypeString}") } } @@ -53,6 +80,8 @@ object OffsetValue { override def getSparkLit: Column = lit(s) + override def getSparkCol(c: Column): Column = c.cast(SparkStringType) + override def compareTo(o: OffsetValue): Int = { o match { case StringType(otherValue) => s.compareTo(otherValue) @@ -63,14 +92,16 @@ object OffsetValue { def getMinimumForType(dataType: String): OffsetValue = { dataType match { - case LONG_TYPE_STR => LongType(Long.MinValue) + case DATETIME_TYPE_STR => DateTimeType(Instant.ofEpochMilli(MINIMUM_TIMESTAMP_EPOCH_MILLI)) // LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli + case INTEGRAL_TYPE_STR => IntegralType(Long.MinValue) case STRING_TYPE_STR => StringType("") case _ => throw new IllegalArgumentException(s"Unknown offset data type: $dataType") } } def fromString(dataType: String, value: String): OffsetValue = dataType match { - case LONG_TYPE_STR => LongType(value.toLong) + case DATETIME_TYPE_STR => DateTimeType(Instant.ofEpochMilli(value.toLong)) + case INTEGRAL_TYPE_STR => IntegralType(value.toLong) case STRING_TYPE_STR => StringType(value) case _ => throw new IllegalArgumentException(s"Unknown offset data type: $dataType") } diff --git a/pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala b/pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala index 9cc34e262..a7ed2820c 100644 --- a/pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala +++ b/pramen/api/src/test/scala/za/co/absa/pramen/api/offset/OffsetValueSuite.scala @@ -16,16 +16,29 @@ package za.co.absa.pramen.api.offset -import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{LongType, StringType} import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.offset.OffsetValue.MINIMUM_TIMESTAMP_EPOCH_MILLI + +import java.time.Instant class OffsetValueSuite extends AnyWordSpec { "OffsetValue" should { - "be able to create a LongType instance" in { - val offsetValue = OffsetValue.LongType(42) - assert(offsetValue.dataTypeString == "long") + "be able to create a DateTimeType instance" in { + val offsetValue = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L)) + assert(offsetValue.dataTypeString == "datetime") + assert(offsetValue.valueString == "1726564198000") + assert(offsetValue.getSparkLit == lit(1726564198000L)) + assert(offsetValue.getSparkCol(col("a")) == concat(unix_timestamp(col("a")), date_format(col("a"), "SSS")).cast(LongType)) + } + + "be able to create a IntegralType instance" in { + val offsetValue = OffsetValue.IntegralType(42) + assert(offsetValue.dataTypeString == "integral") assert(offsetValue.valueString == "42") assert(offsetValue.getSparkLit == lit(42L)) + assert(offsetValue.getSparkCol(col("a")) == col("a").cast(LongType)) } "be able to create a StringType instance" in { @@ -33,13 +46,20 @@ class OffsetValueSuite extends AnyWordSpec { assert(offsetValue.dataTypeString == "string") assert(offsetValue.valueString == "foo") assert(offsetValue.getSparkLit == lit("foo")) + assert(offsetValue.getSparkCol(col("a")) == col("a").cast(StringType)) } } "getMinimumForType" 
should { - "be able to get minimum value for long type" in { - val offsetValue = OffsetValue.getMinimumForType("long") - assert(offsetValue.dataTypeString == "long") + "be able to get minimum value for datetime type" in { + val offsetValue = OffsetValue.getMinimumForType("datetime") + assert(offsetValue.dataTypeString == "datetime") + assert(offsetValue.valueString == MINIMUM_TIMESTAMP_EPOCH_MILLI.toString) + } + + "be able to get minimum value for integral type" in { + val offsetValue = OffsetValue.getMinimumForType("integral") + assert(offsetValue.dataTypeString == "integral") assert(offsetValue.valueString == Long.MinValue.toString) } @@ -57,9 +77,15 @@ class OffsetValueSuite extends AnyWordSpec { } "fromString" should { - "be able to create a LongType instance from a string" in { - val offsetValue = OffsetValue.fromString("long", "42") - assert(offsetValue.dataTypeString == "long") + "be able to create a DateTimeType instance from a string" in { + val offsetValue = OffsetValue.fromString("datetime", "1726552310000") + assert(offsetValue.dataTypeString == "datetime") + assert(offsetValue.valueString == "1726552310000") + } + + "be able to create a IntegralType instance from a string" in { + val offsetValue = OffsetValue.fromString("integral", "42") + assert(offsetValue.dataTypeString == "integral") assert(offsetValue.valueString == "42") } @@ -76,4 +102,59 @@ class OffsetValueSuite extends AnyWordSpec { } } + "compareTo" should { + "compare 2 datetime values" in { + val offsetValue1 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L)) + val offsetValue2 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198001L)) + + assert(offsetValue1.compareTo(offsetValue2) < 0) + assert(offsetValue2.compareTo(offsetValue1) > 0) + assert(offsetValue2.compareTo(offsetValue2) == 0) + } + + "throw an exception when attempting to compare a datetime value with value of some other type" in { + val offsetValue1 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L)) + val offsetValue2 = OffsetValue.IntegralType(42) + + assertThrows[IllegalArgumentException] { + offsetValue1.compareTo(offsetValue2) + } + } + + "compare 2 integral values" in { + val offsetValue1 = OffsetValue.IntegralType(42) + val offsetValue2 = OffsetValue.IntegralType(43) + + assert(offsetValue1.compareTo(offsetValue2) < 0) + assert(offsetValue2.compareTo(offsetValue1) > 0) + assert(offsetValue2.compareTo(offsetValue2) == 0) + } + + "throw an exception when attempting to compare an integral value with value of some other type" in { + val offsetValue1 = OffsetValue.IntegralType(42) + val offsetValue2 = OffsetValue.StringType("foo") + + assertThrows[IllegalArgumentException] { + offsetValue1.compareTo(offsetValue2) + } + } + + "compare 2 string values" in { + val offsetValue1 = OffsetValue.StringType("bar") + val offsetValue2 = OffsetValue.StringType("foo") + + assert(offsetValue1.compareTo(offsetValue2) < 0) + assert(offsetValue2.compareTo(offsetValue1) > 0) + assert(offsetValue2.compareTo(offsetValue2) == 0) + } + + "throw an exception when attempting to compare a string value with value of some other type" in { + val offsetValue1 = OffsetValue.StringType("foo") + val offsetValue2 = OffsetValue.DateTimeType(Instant.ofEpochMilli(1726564198000L)) + + assertThrows[IllegalArgumentException] { + offsetValue1.compareTo(offsetValue2) + } + } + } } diff --git a/pramen/build.sbt b/pramen/build.sbt index 2eb1fc058..17e987852 100644 --- a/pramen/build.sbt +++ b/pramen/build.sbt @@ -25,7 +25,7 @@ val scala213 = "2.13.13" 
ThisBuild / organization := "za.co.absa.pramen" -ThisBuild / scalaVersion := scala211 +ThisBuild / scalaVersion := scala212 ThisBuild / crossScalaVersions := Seq(scala211, scala212, scala213) ThisBuild / scalacOptions := Seq("-unchecked", "-deprecation") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala index 11564ebe6..2feb18b66 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala @@ -53,7 +53,7 @@ object MetastorePersistence { ) case DataFormat.Delta(query, recordsPerPartition) => new MetastorePersistenceDelta( - query, metaTable.infoDateColumn, metaTable.infoDateFormat, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions + query, metaTable.infoDateColumn, metaTable.infoDateFormat, metaTable.batchIdColumn, batchId, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions ) case DataFormat.Raw(path) => new MetastorePersistenceRaw(path, metaTable.infoDateColumn, metaTable.infoDateFormat, saveModeOpt) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala index 5827dcb1b..672b6ee9a 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala @@ -35,6 +35,8 @@ import scala.util.Try class MetastorePersistenceDelta(query: Query, infoDateColumn: String, infoDateFormat: String, + batchIdColumn: String, + batchId: Long, recordsPerPartition: Option[Long], saveModeOpt: Option[SaveMode], readOptions: Map[String, String], @@ -66,23 +68,22 @@ class MetastorePersistenceDelta(query: Query, val whereCondition = s"$infoDateColumn='$infoDateStr'" - val recordCount = numberOfRecordsEstimate match { - case Some(count) => count - case None => dfIn.count() - } + val dfRepartitioned = if (recordsPerPartition.nonEmpty) { + val recordCount = numberOfRecordsEstimate match { + case Some(count) => count + case None => dfIn.count() + } - val dfRepartitioned = applyRepartitioning(dfIn, recordCount) + applyRepartitioning(dfIn, recordCount) + } else { + dfIn + } val saveMode = saveModeOpt.getOrElse(SaveMode.Overwrite) - val operationStr = saveMode match { - case SaveMode.Append => "Appending to" - case _ => "Writing to" - } - - val isAppend = saveMode match { - case SaveMode.Append => true - case _ => false + val (isAppend, operationStr) = saveMode match { + case SaveMode.Append => (true, "Appending to") + case _ => (false, "Writing to") } if (log.isDebugEnabled) { @@ -116,8 +117,22 @@ class MetastorePersistenceDelta(query: Query, val stats = getStats(infoDate, isAppend) stats.dataSizeBytes match { - case Some(size) => log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records (${StringUtils.prettySize(size)}) to ${query.query}") - case None => log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records to ${query.query}") + case Some(size) => + stats.recordCountAppended match { + case Some(recordsAppended) => + log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}, " + 
+ s"new size: ${StringUtils.prettySize(size)}) to ${query.query}") + case None => + log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " + + s"(${StringUtils.prettySize(size)}) to ${query.query}") + } + case None => + stats.recordCountAppended match { + case Some(recordsAppended) => + log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount} to ${query.query}") + case None => + log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records to ${query.query}") + } } stats @@ -125,7 +140,6 @@ class MetastorePersistenceDelta(query: Query, override def getStats(infoDate: LocalDate, onlyForCurrentBatchId: Boolean): MetaTableStats = { val df = loadTable(Option(infoDate), Option(infoDate)) - val recordCount = df.count() val sizeOpt = query match { case Query.Path(path) => @@ -142,7 +156,16 @@ class MetastorePersistenceDelta(query: Query, None } - MetaTableStats(recordCount, None, sizeOpt) + if (onlyForCurrentBatchId && df.schema.exists(_.name.equalsIgnoreCase(batchIdColumn))) { + val batchCount = df.filter(col(batchIdColumn) === batchId).count() + val countAll = df.count() + + MetaTableStats(countAll, Option(batchCount), sizeOpt) + } else { + val countAll = df.count() + + MetaTableStats(countAll, None, sizeOpt) + } } override def createOrUpdateHiveTable(infoDate: LocalDate, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index 0a6d06404..3ebbfe0cd 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{IntegerType, LongType, ShortType, StringType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession} import za.co.absa.pramen.api.offset.{DataOffset, OffsetInfo, OffsetValue} import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} @@ -54,6 +54,7 @@ class IncrementalIngestionJob(operationDef: OperationDef, override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = { val om = bookkeeper.getOffsetManager + latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None) val uncommittedOffsets = om.getOffsets(outputTable.name, infoDate).filter(_.committedAt.isEmpty) @@ -80,17 +81,15 @@ class IncrementalIngestionJob(operationDef: OperationDef, mt.getTable(outputTable.name, Option(infoDate), Option(infoDate)) } catch { case ex: AnalysisException => - log.warn(s"No data found for ${outputTable.name}. 
Rolling back uncommitted offsets...", ex) - - uncommittedOffsets.foreach { of => - log.warn(s"Cleaning uncommitted offset: $of...") - om.rollbackOffsets(DataOffsetRequest(outputTable.name, infoDate, of.minOffset, of.createdAt)) - } - - latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None) + rollbackOffsets(infoDate, om, uncommittedOffsets) return } + if (df.isEmpty) { + rollbackOffsets(infoDate, om, uncommittedOffsets) + return + } + if (!df.schema.fields.exists(_.name.equalsIgnoreCase(offsetInfo.offsetColumn))) { throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'. Cannot update uncommitted offsets.") } @@ -98,8 +97,7 @@ class IncrementalIngestionJob(operationDef: OperationDef, val newMaxOffset = if (df.isEmpty) { minOffset } else { - val row = df.agg(max(col(offsetInfo.offsetColumn)).cast(StringType)).collect()(0) - OffsetValue.fromString(offsetInfo.minimalOffset.dataTypeString, row(0).asInstanceOf[String]) + getMaximumOffsetFromDf(df, offsetInfo) } log.warn(s"Fixing uncommitted offsets. New offset to commit for ${outputTable.name} at $infoDate: " + @@ -116,6 +114,17 @@ class IncrementalIngestionJob(operationDef: OperationDef, latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None) } + private[core] def rollbackOffsets(infoDate: LocalDate, om: OffsetManager, uncommittedOffsets: Array[DataOffset]): Unit = { + log.warn(s"No data found for ${outputTable.name}. Rolling back uncommitted offsets...") + + uncommittedOffsets.foreach { of => + log.warn(s"Cleaning uncommitted offset: $of...") + om.rollbackOffsets(DataOffsetRequest(outputTable.name, infoDate, of.minOffset, of.createdAt)) + } + + latestOffset = om.getMaxInfoDateAndOffset(outputTable.name, None) + } + override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = { if (source.getOffsetInfo(sourceTable.query).nonEmpty) { Reason.Ready @@ -186,8 +195,7 @@ class IncrementalIngestionJob(operationDef: OperationDef, om.rollbackOffsets(req) } } else { - val row = updatedDf.agg(max(col(offsetInfo.offsetColumn)).cast(StringType)).collect()(0) - val maxOffset = OffsetValue.fromString(offsetInfo.minimalOffset.dataTypeString, row(0).asInstanceOf[String]) + val maxOffset = getMaximumOffsetFromDf(df, offsetInfo) if (isRerun) { om.commitRerun(req, maxOffset) @@ -222,6 +230,13 @@ class IncrementalIngestionJob(operationDef: OperationDef, SaveResult(stats, warnings = tooLongWarnings) } + private[core] def getMaximumOffsetFromDf(df: DataFrame, offsetInfo: OffsetInfo): OffsetValue = { + val row = df.agg(max(offsetInfo.minimalOffset.getSparkCol(col(offsetInfo.offsetColumn))) + .cast(StringType)) + .collect()(0) + OffsetValue.fromString(offsetInfo.minimalOffset.dataTypeString, row(0).asInstanceOf[String]) + } + private[core] def validateOffsetColumn(df: DataFrame, offsetInfo: OffsetInfo): Unit = { if (!df.schema.fields.exists(_.name.equalsIgnoreCase(offsetInfo.offsetColumn))) { throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' not found in the output table '${outputTable.name}'.") @@ -230,7 +245,12 @@ class IncrementalIngestionJob(operationDef: OperationDef, val field = df.schema.fields.find(_.name.equalsIgnoreCase(offsetInfo.offsetColumn)).get offsetInfo.minimalOffset match { - case v: OffsetValue.LongType => + case v: OffsetValue.DateTimeType => + if (!field.dataType.isInstanceOf[TimestampType]) { + throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' has type 
'${field.dataType}'. " + + s"But only '${TimestampType.typeName}' is supported for offset type '${v.dataTypeString}'.") + } + case v: OffsetValue.IntegralType => if (!field.dataType.isInstanceOf[ShortType] && !field.dataType.isInstanceOf[IntegerType] && !field.dataType.isInstanceOf[LongType]) { throw new IllegalArgumentException(s"Offset column '${offsetInfo.offsetColumn}' has type '${field.dataType}'. " + s"But only integral types are supported for offset type '${v.dataTypeString}'.") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderDelta.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderDelta.scala deleted file mode 100644 index 856dd2a5b..000000000 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderDelta.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2022 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.pramen.core.reader - -import org.apache.spark.sql.functions.{col, lit} -import org.apache.spark.sql.{DataFrame, SparkSession} -import org.slf4j.LoggerFactory -import za.co.absa.pramen.api.offset.OffsetValue -import za.co.absa.pramen.api.{Query, TableReader} - -import java.time.LocalDate -import java.time.format.DateTimeFormatter - -class TableReaderDelta(infoDateColumn: String, - infoDateFormat: String = "yyyy-MM-dd" - )(implicit spark: SparkSession) extends TableReader { - - private val log = LoggerFactory.getLogger(this.getClass) - private val dateFormatter = DateTimeFormatter.ofPattern(infoDateFormat) - - override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = { - if (infoDateBegin.equals(infoDateEnd)) { - log.info(s"Reading COUNT(*) FROM ${query.query} WHERE $infoDateColumn='${dateFormatter.format(infoDateBegin)}'") - } else { - log.info(s"Reading COUNT(*) FROM ${query.query} WHERE $infoDateColumn BETWEEN '${dateFormatter.format(infoDateBegin)}' AND '${dateFormatter.format(infoDateEnd)}'") - } - getFilteredDataFrame(query, infoDateBegin, infoDateEnd).count() - } - - override def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = { - if (infoDateBegin.equals(infoDateEnd)) { - log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='${dateFormatter.format(infoDateEnd)}'") - } else { - log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn BETWEEN '${dateFormatter.format(infoDateBegin)}' AND '${dateFormatter.format(infoDateEnd)}'") - } - getFilteredDataFrame(query, infoDateBegin, infoDateEnd) - } - - override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ??? - - override def getIncrementalDataRange(query: Query, minOffset: OffsetValue, maxOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = ??? 
- - private def getFilteredDataFrame(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): DataFrame = { - val infoDateBeginStr = dateFormatter.format(infoDateBegin) - val infoDateEndStr = dateFormatter.format(infoDateEnd) - - val reader = query match { - case Query.Path(path) => - spark - .read - .format("delta") - .load(path) - case Query.Table(table) => - spark - .table(table) - case _ => throw new IllegalArgumentException(s"Arguments of type ${query.getClass} are not supported for Delta format.") - } - - if (infoDateBegin.equals(infoDateEnd)) { - reader.filter(col(s"$infoDateColumn") === lit(infoDateBeginStr)) - } else { - reader.filter(col(s"$infoDateColumn") >= lit(infoDateBeginStr) && col(s"$infoDateColumn") <= lit(infoDateEndStr)) - } - } -} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala index 9137d1ef7..d18d25ca9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/TableReaderSpark.scala @@ -18,11 +18,14 @@ package za.co.absa.pramen.core.reader import org.apache.hadoop.fs.Path import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.types.DateType import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue} +import za.co.absa.pramen.api.sql.SqlColumnType import za.co.absa.pramen.api.{Query, TableReader} +import java.sql.Date import java.time.LocalDate import java.time.format.DateTimeFormatter @@ -30,6 +33,7 @@ class TableReaderSpark(formatOpt: Option[String], schemaOpt: Option[String], hasInfoDateColumn: Boolean, infoDateColumn: String, + infoDateDataType: SqlColumnType, infoDateFormat: String = "yyyy-MM-dd", offsetInfoOpt: Option[OffsetInfo], options: Map[String, String] = Map.empty[String, String] @@ -68,15 +72,16 @@ class TableReaderSpark(formatOpt: Option[String], override def getIncrementalData(query: Query, minOffset: OffsetValue, infoDateOpt: Option[LocalDate], columns: Seq[String]): DataFrame = { val offsetInfo = offsetInfoOpt.getOrElse(throw new IllegalArgumentException(s"Offset column and type is not defined for ${query.query}.")) + val offsetCol = offsetInfo.minimalOffset.getSparkCol(col(offsetInfo.offsetColumn)) infoDateOpt match { case Some(infoDate) if hasInfoDateColumn => log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate' AND ${offsetInfo.offsetColumn} > ${minOffset.valueString}") getData(query, infoDate, infoDate, columns) - .filter(col(offsetInfo.offsetColumn) > minOffset.getSparkLit) + .filter(offsetCol > minOffset.getSparkLit) case _ => log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${minOffset.valueString}") getBaseDataFrame(query) - .filter(col(offsetInfo.offsetColumn) > minOffset.getSparkLit) + .filter(offsetCol > minOffset.getSparkLit) } } @@ -87,9 +92,10 @@ class TableReaderSpark(formatOpt: Option[String], log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='$infoDate'") getData(query, infoDate, infoDate, columns) case _ => + val offsetCol = offsetInfo.minimalOffset.getSparkCol(col(offsetInfo.offsetColumn)) log.info(s"Reading * FROM ${query.query} WHERE ${offsetInfo.offsetColumn} > ${minOffset.valueString} AND ${offsetInfo.offsetColumn} <= ${maxOffset.valueString}") getBaseDataFrame(query) - .filter(col(offsetInfo.offsetColumn) > 
minOffset.getSparkLit && col(offsetInfo.offsetColumn) <= maxOffset.getSparkLit) + .filter(offsetCol > minOffset.getSparkLit && offsetCol <= maxOffset.getSparkLit) } } @@ -128,15 +134,26 @@ class TableReaderSpark(formatOpt: Option[String], } private[core] def getFilteredDataFrame(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): DataFrame = { - val infoDateBeginStr = dateFormatter.format(infoDateBegin) - val infoDateEndStr = dateFormatter.format(infoDateEnd) - - if (infoDateBegin.equals(infoDateEnd)) { - getBaseDataFrame(query) - .filter(col(s"$infoDateColumn") === lit(infoDateBeginStr)) - } else { - getBaseDataFrame(query) - .filter(col(s"$infoDateColumn") >= lit(infoDateBeginStr) && col(s"$infoDateColumn") <= lit(infoDateEndStr)) + infoDateDataType match { + case SqlColumnType.DATETIME => + if (infoDateBegin.equals(infoDateEnd)) { + getBaseDataFrame(query) + .filter(col(infoDateColumn).cast(DateType) === lit(Date.valueOf(infoDateBegin))) + } else { + getBaseDataFrame(query) + .filter(col(infoDateColumn).cast(DateType) >= lit(Date.valueOf(infoDateBegin)) && col(infoDateColumn).cast(DateType) <= lit(Date.valueOf(infoDateEnd))) + } + case _ => + val infoDateBeginStr = dateFormatter.format(infoDateBegin) + val infoDateEndStr = dateFormatter.format(infoDateEnd) + + if (infoDateBegin.equals(infoDateEnd)) { + getBaseDataFrame(query) + .filter(col(infoDateColumn) === lit(infoDateBeginStr)) + } else { + getBaseDataFrame(query) + .filter(col(infoDateColumn) >= lit(infoDateBeginStr) && col(infoDateColumn) <= lit(infoDateEndStr)) + } } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala index e3f5d8b9e..db49337d9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/reader/model/TableReaderJdbcConfig.scala @@ -101,7 +101,7 @@ object TableReaderJdbcConfig { infoDateFormat, hasOffsetColumn, ConfigUtils.getOptionString(conf, OFFSET_COLUMN_NAME_KEY).getOrElse(""), - ConfigUtils.getOptionString(conf, OFFSET_COLUMN_TYPE_KEY).getOrElse("long"), + ConfigUtils.getOptionString(conf, OFFSET_COLUMN_TYPE_KEY).getOrElse("integral"), limitRecords = ConfigUtils.getOptionInt(conf, JDBC_SYNC_LIMIT_RECORDS), saveTimestampsAsDates, correctDecimalsInSchema = ConfigUtils.getOptionBoolean(conf, CORRECT_DECIMALS_IN_SCHEMA).getOrElse(false), diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala index 4122bea5c..8703c7295 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala @@ -92,7 +92,7 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, completedJobsChannel.send((job, Nil, isSucceeded)) } catch { case ex: FatalErrorWrapper if ex.cause != null => onFatalException(ex.cause, job, isTransient) - case NonFatal(ex) => sendFailure(ex, job, isTransient) + case NonFatal(ex) => onNonFatalException(ex, job, isTransient) case ex: Throwable => onFatalException(ex, job, isTransient) } } @@ -105,6 +105,11 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, sendFailure(fatalEx, job, isTransient) } + private[core] def onNonFatalException(ex: 
Throwable, job: Job, isTransient: Boolean): Unit = { + log.error(s"${Emoji.FAILURE} Job '${job.name}' outputting to '${job.outputTable.name}' has thrown an error", ex) + sendFailure(ex, job, isTransient) + } + private[core] def sendFailure(ex: Throwable, job: Job, isTransient: Boolean): Unit = { completedJobsChannel.send((job, TaskResult(job.name, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala index 9bbccfcb2..0af717648 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala @@ -43,19 +43,13 @@ class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated], has val infoDate = evaluateRunDate(runDate, infoDateExpression) log.info(s"Normal run strategy: runDate=$runDate, infoDate=$infoDate") - val runInfoDays = lastOffsets match { - case Some(offset) => - if (offset.maximumInfoDate.isAfter(infoDate)) { - Seq.empty - } else { - Seq(infoDate) - .map(d => TaskPreDef(d, TaskRunReason.New)) - } - case None => - if (hasInfoDateColumn) - Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New)) - else - Seq(TaskPreDef(infoDate, TaskRunReason.New)) + val runInfoDays = if (hasInfoDateColumn) + Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New)) + else { + lastOffsets match { + case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => Seq.empty + case _ => Seq(TaskPreDef(infoDate, TaskRunReason.New)) + } } log.info(s"Days to run: ${runInfoDays.map(_.infoDate).mkString(", ")}") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala index b5277a65a..757d6d111 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.api._ import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue} +import za.co.absa.pramen.api.sql.SqlColumnType import za.co.absa.pramen.core.config.Keys.KEYS_TO_REDACT import za.co.absa.pramen.core.reader.TableReaderSpark import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig._ @@ -33,6 +34,7 @@ class SparkSource(val format: Option[String], val schema: Option[String], val hasInfoDateCol: Boolean, val infoDateColumn: String, + val infoDateType: SqlColumnType, val infoDateFormat: String, val hasOffsetColumn: Boolean, val offsetColumn: String, @@ -74,13 +76,13 @@ class SparkSource(val format: Option[String], val tableReader = query match { case Query.Table(table) => log.info(s"Using TableReaderSpark to read table: $table") - new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateFormat, offsetInfoOpt, options) + new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateType, infoDateFormat, offsetInfoOpt, options) case Query.Sql(sql) => log.info(s"Using TableReaderSpark to read SQL for: $sql") - new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateFormat, offsetInfoOpt, options) + new TableReaderSpark(format, schema, hasInfoDateCol, 
infoDateColumn, infoDateType, infoDateFormat, offsetInfoOpt, options) case Query.Path(path) => log.info(s"Using TableReaderSpark to read '$format' from: $path") - new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateFormat, offsetInfoOpt, options) + new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateType, infoDateFormat, offsetInfoOpt, options) case other => throw new IllegalArgumentException(s"'${other.name}' is not supported by the Spark source. Use 'path', 'table' or 'sql' instead.") } @@ -134,10 +136,12 @@ object SparkSource extends ExternalChannelFactory[SparkSource] { val schema = ConfigUtils.getOptionString(conf, SCHEMA) val hasInfoDate = conf.hasPath(HAS_INFO_DATE) && conf.getBoolean(HAS_INFO_DATE) - val (infoDateColumn, infoDateFormat) = if (hasInfoDate) { - (conf.getString(INFORMATION_DATE_COLUMN), getInfoDateFormat(conf)) + val (infoDateColumn, infoDateType, infoDateFormat) = if (hasInfoDate) { + val infoDateTypeStr = ConfigUtils.getOptionString(conf, INFORMATION_DATE_TYPE).getOrElse("date") + val infoDateType = SqlColumnType.fromString(infoDateTypeStr).getOrElse(throw new IllegalArgumentException(s"Unknown info date type: $infoDateTypeStr")) + (conf.getString(INFORMATION_DATE_COLUMN), infoDateType, getInfoDateFormat(conf)) } else { - ("", "") + ("", SqlColumnType.DATE, "") } val hasOffsetColumn = ConfigUtils.getOptionBoolean(conf, OFFSET_COLUMN_ENABLED_KEY).getOrElse(false) @@ -150,6 +154,6 @@ object SparkSource extends ExternalChannelFactory[SparkSource] { val options = ConfigUtils.getExtraOptions(conf, "option") - new SparkSource(format, schema, hasInfoDate, infoDateColumn, infoDateFormat, hasOffsetColumn, offsetColumn, offsetDataType, conf, options)(spark) + new SparkSource(format, schema, hasInfoDate, infoDateColumn, infoDateType, infoDateFormat, hasOffsetColumn, offsetColumn, offsetDataType, conf, options)(spark) } } diff --git a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf index 6ba0a5e40..8474a4f34 100644 --- a/pramen/core/src/test/resources/test/config/incremental_pipeline.conf +++ b/pramen/core/src/test/resources/test/config/incremental_pipeline.conf @@ -27,12 +27,12 @@ pramen.metastore { tables = [ { name = "table1" - format = "parquet" + format = ${metastore.format} path = ${base.path}/table1 }, { name = "table2" - format = "parquet" + format = ${metastore.format} path = ${base.path}/table2 } ] @@ -51,7 +51,7 @@ pramen.sources.1 = [ offset.column { enabled = true name = "id" - type = "long" + type = "integral" } format = "csv" diff --git a/pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf b/pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf new file mode 100644 index 000000000..eee4f9601 --- /dev/null +++ b/pramen/core/src/test/resources/test/config/incremental_pipeline_ts.conf @@ -0,0 +1,97 @@ +# Copyright 2022 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# This variable is expected to be set up by the test suite +#base.path = "/tmp" + +pramen { + pipeline.name = "Integration test with a file-based source" + + bookkeeping.enabled = true + stop.spark.session = false + + track.days = 2 # Previous day and today +} + +pramen.metastore { + tables = [ + { + name = "table1" + format = ${metastore.format} + path = ${base.path}/table1 + }, + { + name = "table2" + format = ${metastore.format} + path = ${base.path}/table2 + } + ] +} + +pramen.sources.1 = [ + { + name = "spark_source" + factory.class = "za.co.absa.pramen.core.source.SparkSource" + + has.information.date.column = ${has.information.date.column} + information.date.column = "ts" + information.date.type = "datetime" + + offset.column { + enabled = true + name = "ts" + type = "datetime" + } + + format = "parquet" + } +] + +pramen.operations = [ + { + name = "Sourcing from a folder" + type = "ingestion" + schedule.type = "incremental" + + source = "spark_source" + + info.date.expr = "@runDate" + + tables = [ + { + input.path = ${base.path}/landing + output.metastore.table = table1 + } + ] + }, + { + name = "Running a transformer" + type = "transformation" + + class = "za.co.absa.pramen.core.transformers.IdentityTransformer" + schedule.type = ${transformer.schedule} + + output.table = "table2" + + dependencies = [ + { + tables = [ table1 ] + date.from = "@infoDate" + } + ] + + option { + input.table = "table1" + } + } +] diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/base/SparkTestBase.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/base/SparkTestBase.scala index 892d62a31..34114145c 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/base/SparkTestBase.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/base/SparkTestBase.scala @@ -33,6 +33,7 @@ trait SparkTestBase { .config("spark.sql.session.timeZone", "Africa/Johannesburg") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") + .config("spark.sql.legacy.timeParserPolicy", "CORRECTED") .config("spark.sql.shuffle.partitions", "1") .getOrCreate() diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongSuite.scala deleted file mode 100644 index 2378c0a9b..000000000 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/IncrementalPipelineLongSuite.scala +++ /dev/null @@ -1,917 +0,0 @@ -/* - * Copyright 2022 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package za.co.absa.pramen.core.integration - -import com.typesafe.config.{Config, ConfigFactory} -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.functions.lit -import org.scalatest.wordspec.AnyWordSpec -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import za.co.absa.pramen.api.offset.OffsetValue -import za.co.absa.pramen.core.base.SparkTestBase -import za.co.absa.pramen.core.bookkeeper.OffsetManagerJdbc -import za.co.absa.pramen.core.fixtures.{RelationalDbFixture, TempDirFixture, TextComparisonFixture} -import za.co.absa.pramen.core.rdb.PramenDb -import za.co.absa.pramen.core.reader.JdbcUrlSelectorImpl -import za.co.absa.pramen.core.reader.model.JdbcConfig -import za.co.absa.pramen.core.runner.AppRunner -import za.co.absa.pramen.core.utils.{FsUtils, JdbcNativeUtils, ResourceUtils} - -import java.time.LocalDate - -class IncrementalPipelineLongSuite extends AnyWordSpec - with SparkTestBase - with RelationalDbFixture - with BeforeAndAfter - with BeforeAndAfterAll - with TempDirFixture - with TextComparisonFixture { - - val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password)) - val pramenDb: PramenDb = PramenDb(jdbcConfig) - - before { - pramenDb.rdb.executeDDL("DROP SCHEMA PUBLIC CASCADE;") - pramenDb.setupDatabase() - } - - override def afterAll(): Unit = { - pramenDb.close() - super.afterAll() - } - - private val infoDate = LocalDate.of(2021, 2, 18) - - "For inputs without information date the pipeline" should { - val expected1 = - """{"id":1,"name":"John"} - |{"id":2,"name":"Jack"} - |{"id":3,"name":"Jill"} - |""".stripMargin - - val expected2 = - """{"id":4,"name":"Mary"} - |{"id":5,"name":"Jane"} - |{"id":6,"name":"Kate"} - |""".stripMargin - - val expectedAll = - """{"id":1,"name":"John"} - |{"id":2,"name":"Jack"} - |{"id":3,"name":"Jill"} - |{"id":4,"name":"Mary"} - |{"id":5,"name":"Jane"} - |{"id":6,"name":"Kate"} - |""".stripMargin - - "work end to end as a normal run" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val exitCode2 = AppRunner.runPipeline(conf) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 2) - - val actualTable1After = dfTable1After.select("id", 
"name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1After, expectedAll) - compareText(actualTable2After, expectedAll) - } - } - - "work with incremental ingestion and normal transformer" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir, isTransformerIncremental = false) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val exitCode2 = AppRunner.runPipeline(conf) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 2) - - val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1After, expectedAll) - compareText(actualTable2After, expectedAll) - } - } - - "work end to end as rerun" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val conf2 = getConfig(tempDir, isRerun = true) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = 
spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 1) - - val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - // Expecting original records - compareText(actualTable1After, expected1) - compareText(actualTable2After, expected1) - } - } - - "work end to end as rerun with deletion of records" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val conf2 = getConfig(tempDir, isRerun = true) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.isEmpty) - - // Expecting empty records - assert(dfTable1After.isEmpty) - assert(dfTable2After.isEmpty) - } - } - - "work end to end as rerun with deletion of records with previous data present" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path0 = new Path(tempDir, new Path("landing", "landing_file0.csv")) - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path0, "id,name\n0,Old\n") - - val conf0 = getConfig(tempDir, useInfoDate = infoDate.minusDays(1)) - - val exitCode0 = AppRunner.runPipeline(conf0) - fsUtils.deleteFile(path1) - - assert(exitCode0 == 0) - - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf1 = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf1) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - 
compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val conf2 = getConfig(tempDir, isRerun = true) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.isEmpty) - - // Expecting empty records - assert(dfTable1After.isEmpty) - assert(dfTable2After.isEmpty) - } - } - - "run for a historical date range with force update" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf1 = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf1) - assert(exitCode1 == 0) - - val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path1.toString) - val dfTable2Before = spark.read.parquet(table2Path1.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val conf2 = getConfig(tempDir, isHistoricalRun = true, useInfoDate = infoDate.plusDays(1)) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val table1Path2 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.plusDays(1)}") - val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.plusDays(1)}") - val dfTable1After1 = spark.read.parquet(table1Path1.toString) - val dfTable2After1 = spark.read.parquet(table2Path1.toString) - val dfTable1After2 = spark.read.parquet(table1Path2.toString) - val dfTable2After2 = spark.read.parquet(table2Path2.toString) - val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - val batchIdsOld = dfTable1After1.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - val batchIdsNew = dfTable1After2.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - - assert(batchIdsOld.length == 1) - assert(batchIdsNew.length == 1) - - // The batch id for all info dates in range should be the same since they were re-ran - assert(batchIdsOld.head == batchIdsNew.head) - - // Expecting empty records - compareText(actualTable1After1, expected1) - compareText(actualTable2After1, expected1) - compareText(actualTable1After2, expected2) - compareText(actualTable2After2, expected2) - } - } - - "run for a 
historical date range with fill gaps update" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf1 = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf1) - assert(exitCode1 == 0) - - val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path1.toString) - val dfTable2Before = spark.read.parquet(table2Path1.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val conf2 = getConfig(tempDir, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1)) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val table1Path2 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.plusDays(1)}") - val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.plusDays(1)}") - val dfTable1After1 = spark.read.parquet(table1Path1.toString) - val dfTable2After1 = spark.read.parquet(table2Path1.toString) - val dfTable1After2 = spark.read.parquet(table1Path2.toString) - val dfTable2After2 = spark.read.parquet(table2Path2.toString) - val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - val batchIdsOld = dfTable1After1.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - val batchIdsNew = dfTable1After2.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - - assert(batchIdsOld.length == 1) - assert(batchIdsNew.length == 1) - - // The batch id for all info dates in range should not be the same since they we ran only for missing data - assert(batchIdsOld.head != batchIdsNew.head) - - // Expecting empty records - compareText(actualTable1After1, expected1) - compareText(actualTable2After1, expected1) - compareText(actualTable1After2, expected2) - compareText(actualTable2After2, expected2) - } - } - - "deal with uncommitted changes when no data" in { - val om = new OffsetManagerJdbc(pramenDb.db, 123L) - - om.startWriteOffsets("table1", infoDate, OffsetValue.fromString("long", "1")) - - Thread.sleep(10) - - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new 
Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val dfTable1 = spark.read.parquet(table1Path.toString) - val actualTable1 = dfTable1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1, expected1) - - val batchIds = dfTable1.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 1) - - val offsets = om.getOffsets("table1", infoDate).sortBy(_.createdAt) - - assert(offsets.length == 1) - - assert(offsets.head.minOffset.valueString.toLong == Long.MinValue) - assert(offsets.head.maxOffset.get.valueString.toLong == 3) - assert(offsets.head.committedAt.nonEmpty) - } - } - - "deal with uncommitted changes when there is data" in { - val om1 = new OffsetManagerJdbc(pramenDb.db, 123L) - om1.startWriteOffsets("table1", infoDate, OffsetValue.LongType(Long.MinValue)) - - Thread.sleep(10) - - val om2 = new OffsetManagerJdbc(pramenDb.db, 123L) - om2.startWriteOffsets("table1", infoDate, OffsetValue.LongType(2)) - - Thread.sleep(10) - - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - fsUtils.writeFile(path2, "id,name\n4,Mary\n5,Jane\n6,Kate\n") - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - - val df = spark.read - .option("header", "true") - .option("inferSchema", "true") - .csv(path1.toString) - .withColumn("pramen_batchid", lit(123L)) - - df.write.parquet(table1Path.toString) - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val dfTable1 = spark.read.parquet(table1Path.toString) - val dfTable2 = spark.read.parquet(table2Path.toString) - val actualTable1 = dfTable1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2 = dfTable2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1, expectedAll) - compareText(actualTable2, expected2) // ToDo This logic is to be changed when incremental transformations are supported - - val batchIds = dfTable1.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 2) - - val offsets = om2.getOffsets("table1", infoDate).sortBy(_.createdAt) - - assert(offsets.length == 2) - - assert(offsets.head.minOffset.valueString.toLong == Long.MinValue) - assert(offsets.head.maxOffset.get.valueString.toLong == 3) - assert(offsets.head.committedAt.nonEmpty) - assert(offsets(1).minOffset.valueString.toLong == 3) - assert(offsets(1).maxOffset.get.valueString.toLong == 6) - assert(offsets(1).committedAt.nonEmpty) - } - } - - "fail if the input data type does not conform" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - fsUtils.writeFile(path1, "id,name\n1,John\n2,Jack\n3,Jill\n") - - val conf = getConfig(tempDir, inferSchema = false) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 2) - } - } - - "fail if the output table does not have the offset field" in { - val om1 = new OffsetManagerJdbc(pramenDb.db, 123L) - om1.startWriteOffsets("table1", infoDate, 
OffsetValue.LongType(Long.MinValue)) - - Thread.sleep(10) - - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, "name\nJohn\nJack\nJill\n") - fsUtils.writeFile(path2, "name\nMary\nJane\nKate\n") - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - - val df = spark.read - .option("header", "true") - .option("inferSchema", "true") - .csv(path1.toString) - .withColumn("pramen_batchid", lit(123L)) - - df.write.parquet(table1Path.toString) - - val conf = getConfig(tempDir) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 2) - } - } - } - - "For inputs with information date the pipeline" should { - val csv1Str = s"id,name,info_date\n0,Old,${infoDate.minusDays(1)}\n1,John,$infoDate\n2,Jack,$infoDate\n3,Jill,$infoDate\n99,New,${infoDate.plusDays(1)}\n" - val csv2Str = s"id,name,info_date\n1,John,${infoDate.minusDays(1)}\n4,Mary,$infoDate\n5,Jane,$infoDate\n6,Kate,$infoDate\n999,New2,${infoDate.plusDays(1)}\n" - - val expected1 = - """{"id":1,"name":"John"} - |{"id":2,"name":"Jack"} - |{"id":3,"name":"Jill"} - |""".stripMargin - - val expected2 = - """{"id":4,"name":"Mary"} - |{"id":5,"name":"Jane"} - |{"id":6,"name":"Kate"} - |""".stripMargin - - val expectedAll = - """{"id":1,"name":"John"} - |{"id":2,"name":"Jack"} - |{"id":3,"name":"Jill"} - |{"id":4,"name":"Mary"} - |{"id":5,"name":"Jane"} - |{"id":6,"name":"Kate"} - |""".stripMargin - - "work end to end as a normal run" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, csv1Str) - - val conf = getConfig(tempDir, hasInfoDate = true) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, csv2Str) - - val exitCode2 = AppRunner.runPipeline(conf) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 2) - - val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1After, expectedAll) - compareText(actualTable2After, expectedAll) - } - } - - "work with incremental ingestion and normal transformer" in { - 
withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, csv1Str) - - val conf = getConfig(tempDir, hasInfoDate = true, isTransformerIncremental = false) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, csv2Str) - - val exitCode2 = AppRunner.runPipeline(conf) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 2) - - val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1After, expectedAll) - compareText(actualTable2After, expectedAll) - } - } - - "work end to end as rerun" in { - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - fsUtils.writeFile(path1, csv1Str) - - val conf = getConfig(tempDir, hasInfoDate = true) - - val exitCode1 = AppRunner.runPipeline(conf) - assert(exitCode1 == 0) - - val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path.toString) - val dfTable2Before = spark.read.parquet(table2Path.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - fsUtils.deleteFile(path1) - fsUtils.writeFile(path2, csv2Str) - - val conf2 = getConfig(tempDir, isRerun = true, hasInfoDate = true) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val dfTable1After = spark.read.parquet(table1Path.toString) - val dfTable2After = spark.read.parquet(table2Path.toString) - - val batchIds = dfTable1After.select("pramen_batchid").distinct().collect() - - assert(batchIds.length == 1) - - val actualTable1After = dfTable1After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After = dfTable2After.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - // Expecting 
only the records from the second file, since the rerun replaces data for the info date - compareText(actualTable1After, expected2) - compareText(actualTable2After, expected2) - } - } - - "work for historical runs" in { - val csv1Str = s"id,name,info_date\n0,Old,${infoDate.minusDays(1)}\n1,John,$infoDate\n2,Jack,$infoDate\n3,Jill,$infoDate\n99,New,${infoDate.plusDays(2)}\n" - val csv2Str = s"id,name,info_date\n4,Mary,${infoDate.plusDays(1)}\n5,Jane,${infoDate.plusDays(1)}\n6,Kate,${infoDate.plusDays(1)}\n999,New2,${infoDate.plusDays(2)}\n" - - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - val path2 = new Path(tempDir, new Path("landing", "landing_file2.csv")) - - fsUtils.writeFile(path1, csv1Str) - fsUtils.writeFile(path2, csv2Str) - - val conf1 = getConfig(tempDir, hasInfoDate = true) - - val exitCode1 = AppRunner.runPipeline(conf1) - assert(exitCode1 == 0) - - val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1Before = spark.read.parquet(table1Path1.toString) - val dfTable2Before = spark.read.parquet(table2Path1.toString) - val actualTable1Before = dfTable1Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2Before = dfTable2Before.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1Before, expected1) - compareText(actualTable2Before, expected1) - - - val conf2 = getConfig(tempDir, hasInfoDate = true, isHistoricalRun = true, historyRunMode = "fill_gaps", useInfoDate = infoDate.plusDays(1)) - val exitCode2 = AppRunner.runPipeline(conf2) - assert(exitCode2 == 0) - - val table1Path2 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.plusDays(1)}") - val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.plusDays(1)}") - val dfTable1After1 = spark.read.parquet(table1Path1.toString) - val dfTable2After1 = spark.read.parquet(table2Path1.toString) - val dfTable1After2 = spark.read.parquet(table1Path2.toString) - val dfTable2After2 = spark.read.parquet(table2Path2.toString) - val actualTable1After1 = dfTable1After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After1 = dfTable2After1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable1After2 = dfTable1After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2After2 = dfTable2After2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - val batchIdsOld = dfTable1After1.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - val batchIdsNew = dfTable1After2.select("pramen_batchid").distinct().collect().map(_.getLong(0)) - - assert(batchIdsOld.length == 1) - assert(batchIdsNew.length == 1) - - // The batch id for all info dates in range should not be the same since we ran only for the missing data - assert(batchIdsOld.head != batchIdsNew.head) - - // Expecting data for both info dates - compareText(actualTable1After1, expected1) - compareText(actualTable2After1, expected1) - compareText(actualTable1After2, expected2) - compareText(actualTable2After2, expected2) - } - } - } - - "Edge cases" should { - val expected1 = - """{"id":1,"name":"John"} - |{"id":2,"name":"Jack"} - |""".stripMargin - - val expected2 = - """{"id":3,"name":"Jill"} - |{"id":4,"name":"Mary"} - |""".stripMargin 
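// Editor's note (illustrative sketch, not part of the patch): the "offsets cross info days"
// test that follows relies on the offset manager recording one committed offset interval per
// info date when a single incremental batch spans several dates. Assuming the same
// OffsetManagerJdbc handle and fixtures used throughout this suite (pramenDb, infoDate, table1),
// that bookkeeping could be inspected roughly like this:
val om = new OffsetManagerJdbc(pramenDb.db, 123L)
Seq(infoDate.minusDays(1), infoDate).foreach { day =>
  om.getOffsets("table1", day).foreach { o =>
    // each info date touched by the batch is expected to get its own (minOffset, maxOffset] interval
    println(s"$day: ${o.minOffset.valueString} -> ${o.maxOffset.map(_.valueString).getOrElse("uncommitted")}")
  }
}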
- - "offsets cross info days" in { - val csv1Str = s"id,name,info_date\n0,Old,${infoDate.minusDays(2)}\n1,John,${infoDate.minusDays(1)}\n2,Jack,${infoDate.minusDays(1)}\n3,Jill,$infoDate\n4,Mary,$infoDate\n" - - withTempDirectory("incremental1") { tempDir => - val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) - - val path1 = new Path(tempDir, new Path("landing", "landing_file1.csv")) - - fsUtils.writeFile(path1, csv1Str) - - val conf1 = getConfig(tempDir, hasInfoDate = true) - - val exitCode1 = AppRunner.runPipeline(conf1) - assert(exitCode1 == 0) - - val table1Path1 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=${infoDate.minusDays(1)}") - val table2Path1 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=${infoDate.minusDays(1)}") - val table1Path2 = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate") - val table2Path2 = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") - val dfTable1_1 = spark.read.parquet(table1Path1.toString) - val dfTable2_1 = spark.read.parquet(table2Path1.toString) - val dfTable1_2 = spark.read.parquet(table1Path2.toString) - val dfTable2_2 = spark.read.parquet(table2Path2.toString) - - val actualTable1_1 = dfTable1_1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2_1 = dfTable2_1.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable1_2 = dfTable1_2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - val actualTable2_2 = dfTable2_2.select("id", "name").orderBy("id").toJSON.collect().mkString("\n") - - compareText(actualTable1_1, expected1) - compareText(actualTable2_1, expected1) - compareText(actualTable1_2, expected2) - compareText(actualTable2_2, expected2) - - val om = new OffsetManagerJdbc(pramenDb.db, 123L) - - val offsets1 = om.getOffsets("table1", infoDate.minusDays(1)) - assert(offsets1.length == 1) - assert(offsets1.head.minOffset.valueString.toLong == Long.MinValue ) - assert(offsets1.head.maxOffset.get.valueString.toLong == 2) - assert(offsets1.head.committedAt.nonEmpty) - - val offsets2 = om.getOffsets("table1", infoDate) - - assert(offsets2.length == 1) - assert(offsets2.head.minOffset.valueString.toLong == Long.MinValue) - assert(offsets2.head.maxOffset.get.valueString.toLong == 4) - assert(offsets2.head.committedAt.nonEmpty) - } - } - } - - def getConfig(basePath: String, - isRerun: Boolean = false, - useDataFrame: Boolean = false, - isTransformerIncremental: Boolean = true, - isHistoricalRun: Boolean = false, - historyRunMode: String = "force", - inferSchema: Boolean = true, - hasInfoDate: Boolean = false, - useInfoDate: LocalDate = infoDate): Config = { - val configContents = ResourceUtils.getResourceString("/test/config/incremental_pipeline.conf") - val basePathEscaped = basePath.replace("\\", "\\\\") - val transformerSchedule = if (isTransformerIncremental) "incremental" else "daily" - val historicalConfigStr = if (isHistoricalRun) { - s"""pramen.load.date.from = "${useInfoDate.minusDays(1)}" - |pramen.load.date.to = "$useInfoDate" - |pramen.runtime.run.mode = "$historyRunMode" - |""".stripMargin - } else { - "" - } - - val conf = ConfigFactory.parseString( - s"""base.path = "$basePathEscaped" - |use.dataframe = $useDataFrame - |pramen.runtime.is.rerun = $isRerun - |pramen.current.date = "$useInfoDate" - |transformer.schedule = "$transformerSchedule" - |infer.schema = $inferSchema - |$historicalConfigStr - |has.information.date.column = $hasInfoDate - | - |pramen.bookkeeping.jdbc { 
- | driver = "$driver" - | url = "$url" - | user = "$user" - | password = "$password" - |} - |$configContents - |""".stripMargin - ).withFallback(ConfigFactory.load()) - .resolve() - - conf - } - - /* This method is used to inspect offsets after operations */ - private def debugOffsets(): Unit = { - JdbcNativeUtils.withResultSet(new JdbcUrlSelectorImpl(jdbcConfig), "SELECT * FROM \"offsets\"", 1) { rs => - val mt = rs.getMetaData - - for (i <- 1 to mt.getColumnCount) { - print(mt.getColumnName(i) + "\t") - } - println("") - - while (rs.next()) { - for (i <- 1 to mt.getColumnCount) { - print(rs.getString(i) + "\t") - } - println("") - } - } - } -} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/TableReaderSparkFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/TableReaderSparkFactory.scala index d0c7e912f..728a6d5a6 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/TableReaderSparkFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/reader/TableReaderSparkFactory.scala @@ -18,6 +18,7 @@ package za.co.absa.pramen.core.mocks.reader import org.apache.spark.sql.SparkSession import za.co.absa.pramen.api.offset.OffsetInfo +import za.co.absa.pramen.api.sql.SqlColumnType import za.co.absa.pramen.core.reader.TableReaderSpark object TableReaderSparkFactory { @@ -25,10 +26,11 @@ object TableReaderSparkFactory { schemaOpt: Option[String] = None, hasInfoDateColumn: Boolean = false, infoDateColumn: String = "info_date", + infoDateType: SqlColumnType = SqlColumnType.DATE, infoDateFormat: String = "yyyy-MM-dd", offsetInfoOpt: Option[OffsetInfo] = None, options: Map[String, String] = Map.empty[String, String] )(implicit spark: SparkSession): TableReaderSpark = { - new TableReaderSpark(formatOpt, schemaOpt, hasInfoDateColumn, infoDateColumn, infoDateFormat, offsetInfoOpt, options) + new TableReaderSpark(formatOpt, schemaOpt, hasInfoDateColumn, infoDateColumn, infoDateType, infoDateFormat, offsetInfoOpt, options) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala index 195a46516..640b9aa5d 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerJdbcSuite.scala @@ -60,7 +60,7 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B val nextHour = now + 3600000 val om = getOffsetManager - om.startWriteOffsets("table1", infoDate, OffsetValue.getMinimumForType("long")) + om.startWriteOffsets("table1", infoDate, OffsetValue.getMinimumForType(OffsetValue.INTEGRAL_TYPE_STR)) val actualNonEmpty = om.getOffsets("table1", infoDate) val actualEmpty1 = om.getOffsets("table1", infoDate.plusDays(1)) @@ -78,7 +78,7 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B assert(offset.createdAt >= now) assert(offset.createdAt < nextHour) assert(offset.committedAt.isEmpty) - assert(offset.minOffset == OffsetValue.getMinimumForType("long")) + assert(offset.minOffset == OffsetValue.getMinimumForType(OffsetValue.INTEGRAL_TYPE_STR)) assert(offset.maxOffset.isEmpty) } @@ -87,9 +87,9 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B val nextHour = now + 3600000 val om = getOffsetManager - val transactionReference = om.startWriteOffsets("table1", 
infoDate, OffsetValue.getMinimumForType("long")) + val transactionReference = om.startWriteOffsets("table1", infoDate, OffsetValue.getMinimumForType(OffsetValue.INTEGRAL_TYPE_STR)) Thread.sleep(10) - om.commitOffsets(transactionReference, OffsetValue.LongType(100)) + om.commitOffsets(transactionReference, OffsetValue.IntegralType(100)) val actualNonEmpty = om.getOffsets("table1", infoDate) val actualEmpty1 = om.getOffsets("table1", infoDate.plusDays(1)) @@ -109,8 +109,8 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B assert(offset.committedAt.get >= now) assert(offset.committedAt.get < nextHour) assert(offset.committedAt.get > actualNonEmpty.head.createdAt) - assert(offset.minOffset == OffsetValue.getMinimumForType("long")) - assert(offset.maxOffset.get == OffsetValue.fromString("long", "100")) + assert(offset.minOffset == OffsetValue.getMinimumForType(OffsetValue.INTEGRAL_TYPE_STR)) + assert(offset.maxOffset.get == OffsetValue.IntegralType(100)) } } @@ -134,16 +134,16 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B "return the maximum info date and offsets " in { val om = getOffsetManager - val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.getMinimumForType("long")) - om.commitOffsets(t1, OffsetValue.LongType(100)) + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.getMinimumForType(OffsetValue.INTEGRAL_TYPE_STR)) + om.commitOffsets(t1, OffsetValue.IntegralType(100)) Thread.sleep(10) - val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(100)) - om.commitOffsets(t2, OffsetValue.LongType(200)) + val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(100)) + om.commitOffsets(t2, OffsetValue.IntegralType(200)) Thread.sleep(10) - val t3 = om.startWriteOffsets("table1", infoDate.plusDays(1), OffsetValue.LongType(200)) - om.commitOffsets(t3, OffsetValue.LongType(300)) + val t3 = om.startWriteOffsets("table1", infoDate.plusDays(1), OffsetValue.IntegralType(200)) + om.commitOffsets(t3, OffsetValue.IntegralType(300)) val summaryMultiDay = om.getMaxInfoDateAndOffset("table1", None) val summarySingleDay = om.getMaxInfoDateAndOffset("table1", Some(infoDate)) @@ -156,14 +156,14 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B assert(actualM.tableName == "table1") assert(actualM.maximumInfoDate == infoDate.plusDays(1)) - assert(actualM.minimumOffset == OffsetValue.LongType(200)) - assert(actualM.maximumOffset == OffsetValue.LongType(300)) + assert(actualM.minimumOffset == OffsetValue.IntegralType(200)) + assert(actualM.maximumOffset == OffsetValue.IntegralType(300)) assert(actualM.offsetsForTheDay.length == 1) assert(actual1.tableName == "table1") assert(actual1.maximumInfoDate == infoDate) - assert(actual1.minimumOffset == OffsetValue.getMinimumForType("long")) - assert(actual1.maximumOffset == OffsetValue.LongType(200)) + assert(actual1.minimumOffset == OffsetValue.getMinimumForType(OffsetValue.INTEGRAL_TYPE_STR)) + assert(actual1.maximumOffset == OffsetValue.IntegralType(200)) assert(actual1.offsetsForTheDay.length == 2) } } @@ -172,8 +172,8 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B "work for empty offsets" in { val om = getOffsetManager - val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(0)) - om.commitOffsets(t1, OffsetValue.LongType(100)) + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(0)) + om.commitOffsets(t1, 
OffsetValue.IntegralType(100)) val offsets = om.getOffsets("table1", infoDate) @@ -182,19 +182,19 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B val offset = offsets.head assert(offset.tableName == "table1") assert(offset.infoDate == infoDate) - assert(offset.minOffset == OffsetValue.LongType(0)) - assert(offset.maxOffset.get == OffsetValue.LongType(100)) + assert(offset.minOffset == OffsetValue.IntegralType(0)) + assert(offset.maxOffset.get == OffsetValue.IntegralType(100)) } "work for non-empty offsets" in { val om = getOffsetManager - val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(0)) - om.commitOffsets(t1, OffsetValue.LongType(100)) + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(0)) + om.commitOffsets(t1, OffsetValue.IntegralType(100)) Thread.sleep(10) - val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(100)) - om.commitOffsets(t2, OffsetValue.LongType(200)) + val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(100)) + om.commitOffsets(t2, OffsetValue.IntegralType(200)) val offsets = om.getOffsets("table1", infoDate).sortBy(_.minOffset.valueString.toLong) @@ -203,14 +203,14 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B val offset1 = offsets.head assert(offset1.tableName == "table1") assert(offset1.infoDate == infoDate) - assert(offset1.minOffset == OffsetValue.LongType(0)) - assert(offset1.maxOffset.get == OffsetValue.LongType(100)) + assert(offset1.minOffset == OffsetValue.IntegralType(0)) + assert(offset1.maxOffset.get == OffsetValue.IntegralType(100)) val offset2 = offsets(1) assert(offset2.tableName == "table1") assert(offset2.infoDate == infoDate) - assert(offset2.minOffset == OffsetValue.LongType(100)) - assert(offset2.maxOffset.get == OffsetValue.LongType(200)) + assert(offset2.minOffset == OffsetValue.IntegralType(100)) + assert(offset2.maxOffset.get == OffsetValue.IntegralType(200)) } } @@ -218,16 +218,16 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B "work with reruns that do not change offsets" in { val om = getOffsetManager - val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(0)) - om.commitOffsets(t1, OffsetValue.LongType(100)) + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(0)) + om.commitOffsets(t1, OffsetValue.IntegralType(100)) Thread.sleep(10) - val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(100)) - om.commitOffsets(t2, OffsetValue.LongType(200)) + val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(100)) + om.commitOffsets(t2, OffsetValue.IntegralType(200)) Thread.sleep(10) - val t3 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(200)) - om.commitRerun(t3, OffsetValue.LongType(200)) + val t3 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(200)) + om.commitRerun(t3, OffsetValue.IntegralType(200)) val offsets = om.getOffsets("table1", infoDate).sortBy(_.minOffset.valueString.toLong) @@ -244,16 +244,16 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B "work with reruns that do change offsets" in { val om = getOffsetManager - val t1 = om.startWriteOffsets("table1", infoDate.minusDays(1), OffsetValue.LongType(0)) - om.commitOffsets(t1, OffsetValue.LongType(100)) + val t1 = om.startWriteOffsets("table1", infoDate.minusDays(1), OffsetValue.IntegralType(0)) + om.commitOffsets(t1, 
OffsetValue.IntegralType(100)) Thread.sleep(10) - val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(100)) - om.commitOffsets(t2, OffsetValue.LongType(200)) + val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(100)) + om.commitOffsets(t2, OffsetValue.IntegralType(200)) Thread.sleep(10) - val t3 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(200)) - om.commitRerun(t3, OffsetValue.LongType(300)) + val t3 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(200)) + om.commitRerun(t3, OffsetValue.IntegralType(300)) val offsets = om.getOffsets("table1", infoDate).sortBy(_.minOffset.valueString.toLong) @@ -269,16 +269,16 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B "work with reruns that deletes all data and no previous offsets" in { val om = getOffsetManager - val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(Long.MinValue)) - om.commitOffsets(t1, OffsetValue.LongType(100)) + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(Long.MinValue)) + om.commitOffsets(t1, OffsetValue.IntegralType(100)) Thread.sleep(10) - val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(100)) - om.commitOffsets(t2, OffsetValue.LongType(200)) + val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(100)) + om.commitOffsets(t2, OffsetValue.IntegralType(200)) Thread.sleep(10) - val t3 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(200)) - om.commitRerun(t3, OffsetValue.LongType(-1)) + val t3 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(200)) + om.commitRerun(t3, OffsetValue.IntegralType(-1)) val offsets = om.getOffsets("table1", infoDate).sortBy(_.minOffset.valueString.toLong) @@ -294,16 +294,16 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B "work with reruns that deletes all data and there are previous offsets" in { val om = getOffsetManager - val t1 = om.startWriteOffsets("table1", infoDate.minusDays(1), OffsetValue.LongType(Long.MinValue)) - om.commitOffsets(t1, OffsetValue.LongType(0)) + val t1 = om.startWriteOffsets("table1", infoDate.minusDays(1), OffsetValue.IntegralType(Long.MinValue)) + om.commitOffsets(t1, OffsetValue.IntegralType(0)) Thread.sleep(10) - val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(0)) - om.commitOffsets(t2, OffsetValue.LongType(100)) + val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(0)) + om.commitOffsets(t2, OffsetValue.IntegralType(100)) Thread.sleep(10) - val t3 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(100)) - om.commitRerun(t3, OffsetValue.LongType(Long.MinValue)) + val t3 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(100)) + om.commitRerun(t3, OffsetValue.IntegralType(Long.MinValue)) val offsets = om.getOffsets("table1", infoDate).sortBy(_.minOffset.valueString.toLong) @@ -321,7 +321,7 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B "work for empty offsets" in { val om = getOffsetManager - val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(0)) + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(0)) om.rollbackOffsets(t1) val offsets = om.getOffsets("table1", infoDate) @@ -332,11 +332,11 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B "work for non-empty offsets" in { val om = getOffsetManager - val t1 = 
om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(0)) - om.commitOffsets(t1, OffsetValue.LongType(100)) + val t1 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(0)) + om.commitOffsets(t1, OffsetValue.IntegralType(100)) Thread.sleep(10) - val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.LongType(100)) + val t2 = om.startWriteOffsets("table1", infoDate, OffsetValue.IntegralType(100)) om.rollbackOffsets(t2) val offsets = om.getOffsets("table1", infoDate).sortBy(_.minOffset.valueString.toLong) @@ -346,8 +346,8 @@ class OffsetManagerJdbcSuite extends AnyWordSpec with RelationalDbFixture with B val offset1 = offsets.head assert(offset1.tableName == "table1") assert(offset1.infoDate == infoDate) - assert(offset1.minOffset == OffsetValue.LongType(0)) - assert(offset1.maxOffset.get == OffsetValue.LongType(100)) + assert(offset1.minOffset == OffsetValue.IntegralType(0)) + assert(offset1.maxOffset.get == OffsetValue.IntegralType(100)) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderSparkSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderSparkSuite.scala index f22456b4c..74b592b47 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderSparkSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/reader/TableReaderSparkSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{DateType, IntegerType, StringType} import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.sql.SqlColumnType import za.co.absa.pramen.api.{Query, TableReader} import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.TempDirFixture @@ -327,7 +328,7 @@ class TableReaderSparkSuite extends AnyWordSpec with SparkTestBase with TempDirF val query = Query.Path(pathBase.toString) - (new TableReaderSpark(formatOpt, schemaOpt, hasInfoDate, "info_date", offsetInfoOpt = None, options = effectiveOptions), query) + (new TableReaderSpark(formatOpt, schemaOpt, hasInfoDate, "info_date", SqlColumnType.DATE, offsetInfoOpt = None, options = effectiveOptions), query) } }
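Editor's note: as a hedged illustration of the constructor change exercised above, where TableReaderSpark now takes the info date column's SqlColumnType right after the column name, a reader over a DATE-typed info date column could be built roughly as follows. This is a sketch, not part of the patch; the format and option values are made up for the example, and the infoDateFormat parameter is assumed to keep its default, as in the suite call above.

import org.apache.spark.sql.SparkSession
import za.co.absa.pramen.api.sql.SqlColumnType
import za.co.absa.pramen.core.reader.TableReaderSpark

// Minimal local session for the sketch; real pipelines provide their own.
implicit val spark: SparkSession = SparkSession.builder()
  .appName("table-reader-sketch")
  .master("local[1]")
  .getOrCreate()

// Mirrors the call shape in TableReaderSparkSuite: the new SqlColumnType argument
// sits between the info date column name and the remaining (named) parameters.
val reader = new TableReaderSpark(
  Some("parquet"),        // formatOpt (illustrative value)
  None,                   // schemaOpt
  true,                   // hasInfoDateColumn
  "info_date",            // info date column name
  SqlColumnType.DATE,     // the parameter introduced by this patch
  offsetInfoOpt = None,
  options = Map.empty[String, String]
)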