From fe94be884756c5c214f733884edea6ef70feb5e3 Mon Sep 17 00:00:00 2001 From: Ruslan Yushchenko Date: Thu, 21 Nov 2024 10:55:54 +0100 Subject: [PATCH] #520 Fix handling of incremental ingestion for 'raw' format of metastore tables. (#521) --- .../absa/pramen/api/offset/OffsetValue.scala | 4 +- .../pramen/core/metastore/MetastoreImpl.scala | 4 +- .../peristence/MetastorePersistence.scala | 2 +- .../peristence/MetastorePersistenceRaw.scala | 38 ++++++++++++++----- .../peristence/TransientTableManager.scala | 3 ++ .../PipelineNotificationBuilderHtml.scala | 1 + .../pipeline/IncrementalIngestionJob.scala | 16 +++++--- 7 files changed, 48 insertions(+), 20 deletions(-) diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala index dd3a3642a..b6d6b7ea7 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala @@ -77,9 +77,9 @@ object OffsetValue { } def fromString(dataType: String, value: String): Option[OffsetValue] = { - if (value.isEmpty) + if (value == null || value.isEmpty) { None - else + } else dataType match { case DATETIME_TYPE_STR => Some(DateTimeValue(Instant.ofEpochMilli(value.toLong))) case INTEGRAL_TYPE_STR => Some(IntegralValue(value.toLong)) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index 092e5dee6..ab2d3a6c9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -79,7 +79,7 @@ class MetastoreImpl(appConfig: Config, override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = { val mt = getTableDef(tableName) - MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).loadTable(infoDateFrom, infoDateTo) + MetastorePersistence.fromMetaTable(mt, appConfig, batchId).loadTable(infoDateFrom, infoDateTo) } override def getBatch(tableName: String, infoDate: LocalDate, batchIdOpt: Option[Long]): DataFrame = { @@ -116,7 +116,7 @@ class MetastoreImpl(appConfig: Config, var stats = MetaTableStats(Some(0), None, None) withSparkConfig(mt.sparkConfig) { - stats = MetastorePersistence.fromMetaTable(mt, appConfig, saveModeOverride, batchId).saveTable(infoDate, df, inputRecordCount) + stats = MetastorePersistence.fromMetaTable(mt, appConfig, batchId, saveModeOverride).saveTable(infoDate, df, inputRecordCount) } val finish = Instant.now.getEpochSecond diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala index 86a0d4777..93bde1b65 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala @@ -43,7 +43,7 @@ trait MetastorePersistence { } object MetastorePersistence { - def fromMetaTable(metaTable: MetaTable, conf: Config, saveModeOverride: Option[SaveMode] = None, batchId: Long)(implicit spark: SparkSession): MetastorePersistence = { + def fromMetaTable(metaTable: MetaTable, conf: Config, batchId: Long, saveModeOverride: Option[SaveMode] = None)(implicit spark: SparkSession): MetastorePersistence = { val saveModeOpt = saveModeOverride.orElse(metaTable.saveModeOpt) metaTable.format match { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala index ed95b41a8..b8ccbe095 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala @@ -17,10 +17,12 @@ package za.co.absa.pramen.core.metastore.peristence import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.core.metastore.MetaTableStats import za.co.absa.pramen.core.metastore.model.HiveConfig +import za.co.absa.pramen.core.metastore.peristence.TransientTableManager.{RAW_OFFSET_FIELD_KEY, RAW_PATH_FIELD_KEY} import za.co.absa.pramen.core.utils.hive.QueryExecutor import za.co.absa.pramen.core.utils.{FsUtils, SparkUtils} @@ -30,30 +32,30 @@ import scala.collection.mutable class MetastorePersistenceRaw(path: String, infoDateColumn: String, infoDateFormat: String, - saveModeOpt: Option[SaveMode] - )(implicit spark: SparkSession) extends MetastorePersistence { + saveModeOpt: Option[SaveMode]) + (implicit spark: SparkSession) extends MetastorePersistence { + + import spark.implicits._ private val log = LoggerFactory.getLogger(this.getClass) override def loadTable(infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = { - import spark.implicits._ - (infoDateFrom, infoDateTo) match { case (Some(from), Some(to)) if from.isEqual(to) => - getListOfFiles(from).map(_.getPath.toString).toDF("path") + listOfPathsToDf(getListOfFiles(from)) case (Some(from), Some(to)) => - getListOfFilesRange(from, to).map(_.getPath.toString).toDF("path") + listOfPathsToDf(getListOfFilesRange(from, to)) case _ => throw new IllegalArgumentException("Metastore 'raw' format requires info date for querying its contents.") } } override def saveTable(infoDate: LocalDate, df: DataFrame, numberOfRecordsEstimate: Option[Long]): MetaTableStats = { - if (!df.schema.exists(_.name == "path")) { + if (!df.schema.exists(_.name == RAW_PATH_FIELD_KEY)) { throw new IllegalArgumentException("The 'raw' persistent format data frame should have 'path' column.") } - val files = df.select("path").collect().map(_.getString(0)) + val files = df.select(RAW_PATH_FIELD_KEY).collect().map(_.getString(0)) val outputDir = SparkUtils.getPartitionPath(infoDate, infoDateColumn, infoDateFormat, path) @@ -159,4 +161,22 @@ class MetastorePersistenceRaw(path: String, fsUtils.getHadoopFiles(subPath).toSeq } } + + private def listOfPathsToDf(listOfPaths: Seq[FileStatus]): DataFrame = { + val list = listOfPaths.map { path => + (path.getPath.toString, path.getPath.getName) + } + if (list.isEmpty) + getEmptyRawDf + else { + list.toDF(RAW_PATH_FIELD_KEY, RAW_OFFSET_FIELD_KEY) + } + } + + private def getEmptyRawDf(implicit spark: SparkSession): DataFrame = { + val schema = StructType(Seq(StructField(RAW_PATH_FIELD_KEY, StringType), StructField(RAW_OFFSET_FIELD_KEY, StringType))) + + val emptyRDD = spark.sparkContext.emptyRDD[Row] + spark.createDataFrame(emptyRDD, schema) + } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala index 9c107ecfe..edb1ff9ad 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala @@ -32,6 +32,9 @@ import scala.util.Random object TransientTableManager { private val log = LoggerFactory.getLogger(this.getClass) + val RAW_PATH_FIELD_KEY = "path" + val RAW_OFFSET_FIELD_KEY = "file_name" + private val rawDataframes = new mutable.HashMap[MetastorePartition, DataFrame]() private val cachedDataframes = new mutable.HashMap[MetastorePartition, DataFrame]() private val persistedLocations = new mutable.HashMap[MetastorePartition, String]() diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index f051330df..ecbf834a3 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -308,6 +308,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot val errorMessage = ex.getMessage val errorMessageTruncated = maxReasonLength match { + case _ if errorMessage == null => "" case Some(maxLength) if errorMessage.length > maxLength => StringUtils.escapeHTML(errorMessage.substring(0, maxLength)) + "..." case _ => StringUtils.escapeHTML(errorMessage) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index ea816828c..502e56597 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -24,8 +24,8 @@ import za.co.absa.pramen.api.jobdef.SourceTable import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue} import za.co.absa.pramen.api.sql.SqlGeneratorBase -import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskDef, TaskRunReason} -import za.co.absa.pramen.api.{Reason, Source} +import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} +import za.co.absa.pramen.api.{DataFormat, Reason, Source} import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest} import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager} import za.co.absa.pramen.core.metastore.Metastore @@ -151,12 +151,15 @@ class IncrementalIngestionJob(operationDef: OperationDef, metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append)) } - val updatedDf = metastore.getBatch(outputTable.name, infoDate, None) + val updatedDf = if (outputTable.format.isInstanceOf[DataFormat.Raw]) + df + else + metastore.getBatch(outputTable.name, infoDate, None) if (updatedDf.isEmpty) { om.rollbackOffsets(req) } else { - val (minOffset, maxOffset) = getMinMaxOffsetFromDf(df, offsetInfo) + val (minOffset, maxOffset) = getMinMaxOffsetFromDf(updatedDf, offsetInfo) if (isRerun) { om.commitRerun(req, minOffset, maxOffset) @@ -291,8 +294,9 @@ class IncrementalIngestionJob(operationDef: OperationDef, val row = df.agg(min(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn)).cast(StringType)), max(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn))).cast(StringType)) .collect()(0) - val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).get - val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).get + + val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}")) + val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}")) SqlGeneratorBase.validateOffsetValue(minValue) SqlGeneratorBase.validateOffsetValue(maxValue)