#520 Fix handling of incremental ingestion for 'raw' format of metastore tables. #521

Merged: 1 commit, Nov 21, 2024. Changes shown from all commits.
@@ -77,9 +77,9 @@ object OffsetValue {
   }

   def fromString(dataType: String, value: String): Option[OffsetValue] = {
-    if (value.isEmpty)
+    if (value == null || value.isEmpty) {
       None
-    else
+    } else
       dataType match {
         case DATETIME_TYPE_STR => Some(DateTimeValue(Instant.ofEpochMilli(value.toLong)))
         case INTEGRAL_TYPE_STR => Some(IntegralValue(value.toLong))
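This change makes `OffsetValue.fromString` tolerate null input, which the min/max aggregation in `IncrementalIngestionJob` further down can produce (aggregating an empty or all-null column collects as null). A minimal standalone sketch of the patched behavior; the hierarchy and the type-name strings are simplified stand-ins for the real ones:

import java.time.Instant

object OffsetValueParsingSketch extends App {
  // Simplified stand-ins for the real OffsetValue hierarchy and its type-name constants.
  sealed trait OffsetValueSketch
  case class DateTimeValue(instant: Instant) extends OffsetValueSketch
  case class IntegralValue(value: Long) extends OffsetValueSketch

  def fromString(dataType: String, value: String): Option[OffsetValueSketch] = {
    // The null guard is the fix: `value.isEmpty` alone throws a NullPointerException when value is null.
    if (value == null || value.isEmpty) {
      None
    } else dataType match {
      case "datetime" => Some(DateTimeValue(Instant.ofEpochMilli(value.toLong)))
      case "integral" => Some(IntegralValue(value.toLong))
    }
  }

  println(fromString("integral", null))  // None (previously an NPE)
  println(fromString("integral", "42"))  // Some(IntegralValue(42))
}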
@@ -79,7 +79,7 @@ class MetastoreImpl(appConfig: Config,
   override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
     val mt = getTableDef(tableName)

-    MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).loadTable(infoDateFrom, infoDateTo)
+    MetastorePersistence.fromMetaTable(mt, appConfig, batchId).loadTable(infoDateFrom, infoDateTo)
   }

   override def getBatch(tableName: String, infoDate: LocalDate, batchIdOpt: Option[Long]): DataFrame = {
@@ -116,7 +116,7 @@ class MetastoreImpl(appConfig: Config,
     var stats = MetaTableStats(Some(0), None, None)

     withSparkConfig(mt.sparkConfig) {
-      stats = MetastorePersistence.fromMetaTable(mt, appConfig, saveModeOverride, batchId).saveTable(infoDate, df, inputRecordCount)
+      stats = MetastorePersistence.fromMetaTable(mt, appConfig, batchId, saveModeOverride).saveTable(infoDate, df, inputRecordCount)
     }

     val finish = Instant.now.getEpochSecond
@@ -43,7 +43,7 @@ trait MetastorePersistence {
 }

 object MetastorePersistence {
-  def fromMetaTable(metaTable: MetaTable, conf: Config, saveModeOverride: Option[SaveMode] = None, batchId: Long)(implicit spark: SparkSession): MetastorePersistence = {
+  def fromMetaTable(metaTable: MetaTable, conf: Config, batchId: Long, saveModeOverride: Option[SaveMode] = None)(implicit spark: SparkSession): MetastorePersistence = {
     val saveModeOpt = saveModeOverride.orElse(metaTable.saveModeOpt)

     metaTable.format match {
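The reorder is not cosmetic: in the old signature the defaulted `saveModeOverride` sat in front of the required `batchId`, so positional calls could not reach `batchId` without naming it (hence `batchId = batchId` at the `MetastoreImpl` call site above). Moving the defaulted parameter last restores plain positional calls. A small illustration with hypothetical signatures:

object DefaultArgOrderingSketch extends App {
  // Old shape: a defaulted parameter in front of a required one.
  def oldStyle(table: String, saveModeOverride: Option[String] = None, batchId: Long): String =
    s"$table @ $batchId"

  // New shape: the defaulted parameter trails.
  def newStyle(table: String, batchId: Long, saveModeOverride: Option[String] = None): String =
    s"$table @ $batchId"

  // oldStyle("users", 42L)                   // does not compile: 42L cannot fill saveModeOverride
  println(oldStyle("users", batchId = 42L))   // old signature forces a named argument
  println(newStyle("users", 42L))             // positional call now works
}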
@@ -17,10 +17,12 @@
 package za.co.absa.pramen.core.metastore.peristence

 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.slf4j.LoggerFactory
 import za.co.absa.pramen.core.metastore.MetaTableStats
 import za.co.absa.pramen.core.metastore.model.HiveConfig
+import za.co.absa.pramen.core.metastore.peristence.TransientTableManager.{RAW_OFFSET_FIELD_KEY, RAW_PATH_FIELD_KEY}
 import za.co.absa.pramen.core.utils.hive.QueryExecutor
 import za.co.absa.pramen.core.utils.{FsUtils, SparkUtils}
@@ -30,30 +32,30 @@ import scala.collection.mutable
 class MetastorePersistenceRaw(path: String,
                               infoDateColumn: String,
                               infoDateFormat: String,
-                              saveModeOpt: Option[SaveMode]
-                             )(implicit spark: SparkSession) extends MetastorePersistence {
+                              saveModeOpt: Option[SaveMode])
+                             (implicit spark: SparkSession) extends MetastorePersistence {
+
+  import spark.implicits._

   private val log = LoggerFactory.getLogger(this.getClass)

   override def loadTable(infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
-    import spark.implicits._
-
     (infoDateFrom, infoDateTo) match {
       case (Some(from), Some(to)) if from.isEqual(to) =>
-        getListOfFiles(from).map(_.getPath.toString).toDF("path")
+        listOfPathsToDf(getListOfFiles(from))
       case (Some(from), Some(to)) =>
-        getListOfFilesRange(from, to).map(_.getPath.toString).toDF("path")
+        listOfPathsToDf(getListOfFilesRange(from, to))
       case _ =>
         throw new IllegalArgumentException("Metastore 'raw' format requires info date for querying its contents.")
     }
   }

   override def saveTable(infoDate: LocalDate, df: DataFrame, numberOfRecordsEstimate: Option[Long]): MetaTableStats = {
-    if (!df.schema.exists(_.name == "path")) {
+    if (!df.schema.exists(_.name == RAW_PATH_FIELD_KEY)) {
       throw new IllegalArgumentException("The 'raw' persistent format data frame should have 'path' column.")
     }

-    val files = df.select("path").collect().map(_.getString(0))
+    val files = df.select(RAW_PATH_FIELD_KEY).collect().map(_.getString(0))

     val outputDir = SparkUtils.getPartitionPath(infoDate, infoDateColumn, infoDateFormat, path)
@@ -159,4 +161,22 @@ class MetastorePersistenceRaw(path: String,
       fsUtils.getHadoopFiles(subPath).toSeq
     }
   }
+
+  private def listOfPathsToDf(listOfPaths: Seq[FileStatus]): DataFrame = {
+    val list = listOfPaths.map { path =>
+      (path.getPath.toString, path.getPath.getName)
+    }
+    if (list.isEmpty)
+      getEmptyRawDf
+    else {
+      list.toDF(RAW_PATH_FIELD_KEY, RAW_OFFSET_FIELD_KEY)
+    }
+  }
+
+  private def getEmptyRawDf(implicit spark: SparkSession): DataFrame = {
+    val schema = StructType(Seq(StructField(RAW_PATH_FIELD_KEY, StringType), StructField(RAW_OFFSET_FIELD_KEY, StringType)))
+
+    val emptyRDD = spark.sparkContext.emptyRDD[Row]
+    spark.createDataFrame(emptyRDD, schema)
+  }
 }
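The new helpers guarantee that the raw format always returns the same two-column listing, even for a date range with no files. Building the empty frame from an explicit `StructType` (rather than relying on inference from data) keeps the columns and types stable, so callers can simply test `df.isEmpty`. A standalone sketch of the pattern, assuming a local SparkSession:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object EmptyRawDfSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("empty-raw-df").getOrCreate()

  // Mirrors RAW_PATH_FIELD_KEY and RAW_OFFSET_FIELD_KEY from TransientTableManager.
  val schema = StructType(Seq(
    StructField("path", StringType),
    StructField("file_name", StringType)
  ))

  // Zero rows, but a fully defined schema, so downstream selects and
  // emptiness checks behave the same as in the non-empty case.
  val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
  emptyDf.printSchema()
  println(emptyDf.isEmpty) // true
  spark.stop()
}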
@@ -32,6 +32,9 @@ import scala.util.Random
 object TransientTableManager {
   private val log = LoggerFactory.getLogger(this.getClass)

+  val RAW_PATH_FIELD_KEY = "path"
+  val RAW_OFFSET_FIELD_KEY = "file_name"
+
   private val rawDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
   private val cachedDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
   private val persistedLocations = new mutable.HashMap[MetastorePartition, String]()
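These constants name the two columns of the file-listing DataFrame the raw format now produces: the full path and the bare file name, with the file name serving as the offset column for incremental ingestion of raw tables. A sketch of what such a listing looks like; the paths and partition naming here are hypothetical:

import org.apache.spark.sql.SparkSession

object RawListingSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("raw-listing").getOrCreate()
  import spark.implicits._

  // Hypothetical listing; in Pramen these rows come from Hadoop FileStatus entries.
  val listing = Seq(
    ("s3://bucket/raw/info_date=2024-11-20/a.csv", "a.csv"),
    ("s3://bucket/raw/info_date=2024-11-20/b.csv", "b.csv")
  ).toDF("path", "file_name") // RAW_PATH_FIELD_KEY, RAW_OFFSET_FIELD_KEY

  // The bookkeeper can track min/max of "file_name" to know which files were already processed.
  listing.show(truncate = false)
  spark.stop()
}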
@@ -308,6 +308,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
     val errorMessage = ex.getMessage

     val errorMessageTruncated = maxReasonLength match {
+      case _ if errorMessage == null => "<null error message>"
       case Some(maxLength) if errorMessage.length > maxLength => StringUtils.escapeHTML(errorMessage.substring(0, maxLength)) + "..."
       case _ => StringUtils.escapeHTML(errorMessage)
     }
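Match cases are tried top to bottom, so the new null guard must come first: both later cases dereference `errorMessage` (via `.length` and `escapeHTML`) and would throw an NPE when `Throwable.getMessage` returns null. A minimal stand-in showing the same ordering:

object TruncateSketch extends App {
  // Simplified stand-in for the notification builder's truncation logic.
  def truncateError(maxReasonLength: Option[Int], errorMessage: String): String =
    maxReasonLength match {
      case _ if errorMessage == null => "<null error message>" // must come first: later cases dereference the message
      case Some(max) if errorMessage.length > max => errorMessage.substring(0, max) + "..."
      case _ => errorMessage
    }

  println(truncateError(Some(5), null))            // <null error message> (previously an NPE)
  println(truncateError(Some(5), "boom happened")) // boom ...
}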
@@ -24,8 +24,8 @@ import za.co.absa.pramen.api.jobdef.SourceTable
 import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
 import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue}
 import za.co.absa.pramen.api.sql.SqlGeneratorBase
-import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskDef, TaskRunReason}
-import za.co.absa.pramen.api.{Reason, Source}
+import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
+import za.co.absa.pramen.api.{DataFormat, Reason, Source}
 import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}
 import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager}
 import za.co.absa.pramen.core.metastore.Metastore
@@ -151,12 +151,15 @@ class IncrementalIngestionJob(operationDef: OperationDef,
       metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append))
     }

-    val updatedDf = metastore.getBatch(outputTable.name, infoDate, None)
+    val updatedDf = if (outputTable.format.isInstanceOf[DataFormat.Raw])
+      df
+    else
+      metastore.getBatch(outputTable.name, infoDate, None)

     if (updatedDf.isEmpty) {
       om.rollbackOffsets(req)
     } else {
-      val (minOffset, maxOffset) = getMinMaxOffsetFromDf(df, offsetInfo)
+      val (minOffset, maxOffset) = getMinMaxOffsetFromDf(updatedDf, offsetInfo)

       if (isRerun) {
         om.commitRerun(req, minOffset, maxOffset)
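This is the heart of the fix. For other formats the job re-reads the just-written batch from the metastore to decide whether to commit or roll back offsets, but a 'raw' table stores the files verbatim, so there is no per-record batch id column for `getBatch` to filter on; the just-saved file listing `df` is the batch. Computing min/max offsets from `updatedDf` rather than the pre-save `df` keeps the emptiness check and the committed offsets consistent. A stubbed-out, runnable sketch of that decision, with all names hypothetical:

object CommitFlowSketch extends App {
  sealed trait Format
  case object Raw extends Format
  case object Parquet extends Format

  // For raw tables the saved file listing *is* the batch; other formats re-read by batch id.
  def batchView(format: Format, savedListing: List[String], reRead: () => List[String]): List[String] =
    format match {
      case Raw => savedListing
      case _   => reRead()
    }

  val batch = batchView(Raw, List("a.csv", "b.csv"), () => sys.error("raw has no batch id to re-read by"))
  if (batch.isEmpty) println("rollback offsets")
  else println(s"commit offsets for ${batch.size} files: min=${batch.min}, max=${batch.max}")
}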
@@ -291,8 +294,9 @@ class IncrementalIngestionJob(operationDef: OperationDef,
     val row = df.agg(min(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn)).cast(StringType)),
       max(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn))).cast(StringType))
       .collect()(0)
-    val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).get
-    val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).get
+
+    val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}"))
+    val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}"))

     SqlGeneratorBase.validateOffsetValue(minValue)
     SqlGeneratorBase.validateOffsetValue(maxValue)
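Replacing `.get` with `getOrElse(throw ...)` turns an anonymous `NoSuchElementException: None.get` into an error that names the unparseable value, which matters precisely in the null case that the `fromString` change above now handles. A small illustration with a simplified stand-in parser:

object OffsetGetOrElseSketch extends App {
  // Simplified stand-in for OffsetValue.fromString.
  def fromString(value: String): Option[Long] =
    if (value == null || value.isEmpty) None else Some(value.toLong)

  val raw: String = null // e.g. min/max over an empty column collects as null

  val attempt = scala.util.Try(
    fromString(raw).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: $raw"))
  )
  println(attempt) // Failure(java.lang.IllegalArgumentException: Can't parse offset: null)
  // With .get instead: Failure(java.util.NoSuchElementException: None.get) -- no context at all.
}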