Skip to content

Commit

Permalink
#520 Make the default batchid field for tables having 'raw' format to…
Browse files Browse the repository at this point in the history
… match raw file ingestion.
  • Loading branch information
yruslan committed Nov 22, 2024
1 parent 6f944bf commit d90bd7d
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{DataFormat, MetaTableDef}
import za.co.absa.pramen.core.app.config.InfoDateConfig
import za.co.absa.pramen.core.config.InfoDateOverride
import za.co.absa.pramen.core.metastore.peristence.MetastorePersistenceRaw.RAW_OFFSET_FIELD_KEY
import za.co.absa.pramen.core.utils.{AlgorithmUtils, ConfigUtils}

import java.time.LocalDate
Expand Down Expand Up @@ -137,7 +138,6 @@ object MetaTable {
val startDate = infoDateOverride.startDate.getOrElse(defaultStartDate)
val trackDays = ConfigUtils.getOptionInt(conf, TRACK_DAYS_KEY).getOrElse(defaultTrackDays)
val trackDaysExplicitlySet = conf.hasPath(TRACK_DAYS_KEY)
val batchIdColumn = ConfigUtils.getOptionString(conf, BATCH_ID_COLUMN_KEY).getOrElse(defaultBatchIdColumn)

val format = Try {
DataFormatParser.fromConfig(conf, appConf)
Expand All @@ -146,6 +146,12 @@ object MetaTable {
case Failure(ex) => throw new IllegalArgumentException(s"Unable to read data format from config for the metastore table: $name", ex)
}

val batchIdColumn = if (format.isInstanceOf[DataFormat.Raw]) {
ConfigUtils.getOptionString(conf, BATCH_ID_COLUMN_KEY).getOrElse(RAW_OFFSET_FIELD_KEY)
} else {
ConfigUtils.getOptionString(conf, BATCH_ID_COLUMN_KEY).getOrElse(defaultBatchIdColumn)
}

val hiveTable = ConfigUtils.getOptionString(conf, HIVE_TABLE_KEY)
val hivePath = ConfigUtils.getOptionString(conf, HIVE_PATH_KEY)
val hivePreferAddPartition = ConfigUtils.getOptionBoolean(conf, HIVE_PREFER_ADD_PARTITION_KEY).getOrElse(defaultPreferAddPartition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.metastore.model.HiveConfig
import za.co.absa.pramen.core.metastore.peristence.TransientTableManager.{RAW_OFFSET_FIELD_KEY, RAW_PATH_FIELD_KEY}
import za.co.absa.pramen.core.utils.hive.QueryExecutor
import za.co.absa.pramen.core.utils.{FsUtils, SparkUtils}

Expand All @@ -35,6 +34,7 @@ class MetastorePersistenceRaw(path: String,
saveModeOpt: Option[SaveMode])
(implicit spark: SparkSession) extends MetastorePersistence {

import MetastorePersistenceRaw._
import spark.implicits._

private val log = LoggerFactory.getLogger(this.getClass)
Expand Down Expand Up @@ -180,3 +180,8 @@ class MetastorePersistenceRaw(path: String,
spark.createDataFrame(emptyRDD, schema)
}
}

object MetastorePersistenceRaw {
val RAW_PATH_FIELD_KEY = "path"
val RAW_OFFSET_FIELD_KEY = "file_name"
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ import scala.util.Random
object TransientTableManager {
private val log = LoggerFactory.getLogger(this.getClass)

val RAW_PATH_FIELD_KEY = "path"
val RAW_OFFSET_FIELD_KEY = "file_name"

private val rawDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
private val cachedDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
private val persistedLocations = new mutable.HashMap[MetastorePartition, String]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package za.co.absa.pramen.core.source

import com.typesafe.config.Config
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
import za.co.absa.pramen.api.offset.OffsetValue
import za.co.absa.pramen.core.metastore.peristence.MetastorePersistenceRaw.{RAW_OFFSET_FIELD_KEY, RAW_PATH_FIELD_KEY}
import za.co.absa.pramen.core.utils.{ConfigUtils, FsUtils}

import java.io.FileNotFoundException
Expand Down Expand Up @@ -106,13 +108,38 @@ class RawFileSource(val sourceConfig: Config,
}

override def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): SourceResult = {
val files = getPaths(query, infoDateBegin, infoDateEnd)
val df = files.map(_.getPath.toString).toDF(PATH_FIELD)
val fileNames = files.map(_.getPath.getName).sorted
val filePaths = getPaths(query, infoDateBegin, infoDateEnd)
val fileNames = filePaths.map(_.getPath.getName).sorted
val list = filePaths.map { path =>
(path.getPath.toString, path.getPath.getName)
}

val df = listOfFilesToDataFrame(list)

SourceResult(df, fileNames)
}

override def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFromOpt: Option[OffsetValue], offsetToOpt: Option[OffsetValue], columns: Seq[String]): SourceResult = {
val filePaths = getPaths(query, onlyForInfoDate.get, onlyForInfoDate.get)
val fileNames = filePaths.map(_.getPath.getName).sorted
val list = filePaths.map { path =>
(path.getPath.toString, path.getPath.getName)
}.filter {
case (_, fileName) =>
(offsetFromOpt, offsetToOpt) match {
case (Some(offsetFrom), Some(offsetTo)) => fileName >= offsetFrom.valueString && fileName <= offsetTo.valueString
case (Some(offsetFrom), None) => fileName > offsetFrom.valueString
case (None, Some(offsetTo)) => fileName <= offsetTo.valueString
case _ => true
}
}

val df = listOfFilesToDataFrame(list)

SourceResult(df, fileNames)
}


@throws[FileNotFoundException]
private[source] def getPaths(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Seq[FileStatus] = {
query match {
Expand Down Expand Up @@ -140,13 +167,17 @@ class RawFileSource(val sourceConfig: Config,
}
}

override def getDataIncremental(query: Query, onlyForInfoDate: Option[LocalDate], offsetFrom: Option[OffsetValue], offsetTo: Option[OffsetValue], columns: Seq[String]): SourceResult = ???
private[source] def listOfFilesToDataFrame(list: Seq[(String, String)]): DataFrame = {
if (list.isEmpty)
getEmptyRawDf
else
list.toDF(RAW_PATH_FIELD_KEY, RAW_OFFSET_FIELD_KEY)
}
}

object RawFileSource extends ExternalChannelFactory[RawFileSource] {
private val log = LoggerFactory.getLogger(this.getClass)

val PATH_FIELD = "path"
val FILE_PREFIX = "file"
val FILE_PATTERN_CASE_SENSITIVE_KEY = "file.pattern.case.sensitive"

Expand Down Expand Up @@ -231,4 +262,11 @@ object RawFileSource extends ExternalChannelFactory[RawFileSource] {
filePattern
}
}

private[core] def getEmptyRawDf(implicit spark: SparkSession): DataFrame = {
val schema = StructType(Seq(StructField(RAW_PATH_FIELD_KEY, StringType), StructField(RAW_OFFSET_FIELD_KEY, StringType)))

val emptyRDD = spark.sparkContext.emptyRDD[Row]
spark.createDataFrame(emptyRDD, schema)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,33 @@ class MetaTableSuite extends AnyWordSpec {
assert(metaTable.infoDateColumn == "INFO_DATE")
assert(metaTable.infoDateFormat == "dd-MM-yyyy")
assert(metaTable.infoDateStart.toString == "2020-01-31")
assert(metaTable.batchIdColumn == "batchid")
assert(metaTable.sparkConfig("key1") == "value1")
assert(metaTable.saveModeOpt.contains(SaveMode.Append))
}

"load a metatable definition for raw format" in {
val conf = ConfigFactory.parseString(
"""
|name = my_table
|format = raw
|path = /a/b/c
|""".stripMargin)

val defaultHiveConfig = HiveDefaultConfig.getNullConfig

val metaTable = MetaTable.fromConfigSingleEntity(conf, conf, "INFO_DATE", "dd-MM-yyyy", defaultPartitionByInfoDate = true, LocalDate.parse("2020-01-31"), 0, defaultHiveConfig, defaultPreferAddPartition = true, "batchid")

assert(metaTable.name == "my_table")
assert(metaTable.format.name == "raw")
assert(metaTable.hiveTable.isEmpty)
assert(metaTable.hivePath.isEmpty)
assert(metaTable.infoDateColumn == "INFO_DATE")
assert(metaTable.infoDateFormat == "dd-MM-yyyy")
assert(metaTable.infoDateStart.toString == "2020-01-31")
assert(metaTable.batchIdColumn == "file_name")
}

"load a metatable definition with hive table defined" in {
val conf = ConfigFactory.parseString(
"""
Expand Down

0 comments on commit d90bd7d

Please sign in to comment.