#374 Add the notion of 'batchId' and 'getCurrentBatch' for the metastore.

yruslan committed Sep 10, 2024
1 parent 392003d commit 9264c6b
Showing 30 changed files with 286 additions and 45 deletions.
@@ -26,6 +26,7 @@ import java.time.LocalDate
* @param format The format of the table.
* @param infoDateColumn The name of the column that contains the information date (partitioned by).
* @param infoDateFormat The format of the information date.
* @param batchIdColumn The name of the column that contains the batch id.
* @param hiveTable The name of the Hive table.
* @param hivePath The path of the Hive table (if it differs from the path in the underlying format).
* @param infoDateStart The start date of the information date.
@@ -38,6 +39,7 @@ case class MetaTableDef(
format: DataFormat,
infoDateColumn: String,
infoDateFormat: String,
batchIdColumn: String,
hiveTable: Option[String],
hivePath: Option[String],
infoDateStart: LocalDate,
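
MetaTableDef is the API-facing table definition, so downstream code can discover the batch id column from it. A minimal sketch (the helper is hypothetical, and it assumes MetaTableDef lives in za.co.absa.pramen.api):

import za.co.absa.pramen.api.MetaTableDef

// Hypothetical helper: reads the batch id column name off an API-level
// table definition (the field added by this commit).
def batchIdColumnOf(tableDef: MetaTableDef): String = tableDef.batchIdColumn
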
@@ -27,7 +27,7 @@ import java.time.LocalDate
trait MetastoreReader {

/**
- * Reads a table given th range of information dates, and returns back the dataframe.
+ * Reads a table given the range of information dates, and returns back the dataframe.
*
* In order to read a table, it is not sufficient for the table to be registered in the metastore. It also
* needs to be defined as an input table of the job. Otherwise, a runtime exception will be thrown.
@@ -41,6 +41,28 @@ trait MetastoreReader {
infoDateFrom: Option[LocalDate] = None,
infoDateTo: Option[LocalDate] = None): DataFrame

/**
* Reads the 'current batch' of the table.
*
* For incremental processing this method returns the current chunk being processed.
*
* For non-incremental processing the call to this method is equivalent to:
* {{{
* val df = getTable(tableName)
* }}}
*
* which returns all data for the current information date being processed.
*
* This method is the one to use for transformers that support incremental processing.
*
* In order to read a table, it is not sufficient for the table to be registered in the metastore. It also
* needs to be defined as an input table of the job. Otherwise, a runtime exception will be thrown.
*
* @param tableName The name of the table to read.
* @return The dataframe containing data from the table.
*/
def getCurrentBatch(tableName: String): DataFrame

/**
* Reads the latest partition of a given table.
*
@@ -66,7 +88,6 @@ trait MetastoreReader {
*/
def getLatestAvailableDate(tableName: String, until: Option[LocalDate] = None): Option[LocalDate]


/**
* Returns true if data for the specified table is available for the specified range.
*
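
To illustrate the intended use of getCurrentBatch, here is a minimal transformer-style sketch. The class and table name are hypothetical; only the MetastoreReader calls come from the trait above:

import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.api.MetastoreReader

import java.time.LocalDate

// Hypothetical incremental transformer. For incremental runs getCurrentBatch
// returns only the chunk written by the current batch; for non-incremental
// runs it behaves like getTable("my_table1") for the current information date.
class DeduplicateCurrentBatch {
  def run(metastore: MetastoreReader, infoDate: LocalDate): DataFrame = {
    val batchDf = metastore.getCurrentBatch("my_table1")
    batchDf.dropDuplicates()
  }
}
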
@@ -20,6 +20,7 @@ import za.co.absa.pramen.api.PipelineInfo

case class PipelineStateSnapshot(
pipelineInfo: PipelineInfo,
batchId: Long,
isFinished: Boolean,
warningFlag: Boolean,
exitedNormally: Boolean,
@@ -51,13 +51,13 @@ class AppContextImpl(val appConfig: AppConfig,
}

object AppContextImpl {
- def apply(conf: Config)(implicit spark: SparkSession): AppContextImpl = {
+ def apply(conf: Config, batchId: Long)(implicit spark: SparkSession): AppContextImpl = {

val appConfig = AppConfig.fromConfig(conf)

val (bookkeeper, tokenLockFactory, journal, metadataManager, closable) = Bookkeeper.fromConfig(appConfig.bookkeepingConfig, appConfig.runtimeConfig)

- val metastore: Metastore = MetastoreImpl.fromConfig(conf, appConfig.infoDateDefaults, bookkeeper, metadataManager)
+ val metastore: Metastore = MetastoreImpl.fromConfig(conf, appConfig.infoDateDefaults, bookkeeper, metadataManager, batchId)

PramenImpl.instance.asInstanceOf[PramenImpl].setMetadataManager(metadataManager)
PramenImpl.instance.asInstanceOf[PramenImpl].setWorkflowConfig(conf)
@@ -84,7 +84,7 @@

val metadataManager = new MetadataManagerNull(isPersistenceEnabled = false)

- val metastore: Metastore = MetastoreImpl.fromConfig(conf, infoDateConfig, bookkeeper, metadataManager)
+ val metastore: Metastore = MetastoreImpl.fromConfig(conf, infoDateConfig, bookkeeper, metadataManager, 0L)

val appContext = new AppContextImpl(
appConfig,
@@ -59,7 +59,7 @@ trait Bookkeeper {

private[pramen] def saveSchema(table: String, infoDate: LocalDate, schema: StructType): Unit

- private[pramen] def getOffsetManager: OffsetManager = ???
+ private[pramen] def getOffsetManager: OffsetManager
}

object Bookkeeper {
@@ -112,6 +112,11 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean) extends Bookkeeper
}
}

private[pramen] override def getOffsetManager: OffsetManager = {
throw new IllegalArgumentException(s"This implementation of bookkeeping does not support offset management and incremental pipelines. " +
"Please use JDBC for bookkeeping to enable this.")
}

private def getLatestTransientDate(table: String, from: Option[LocalDate], until: Option[LocalDate]): Option[LocalDate] = {
val chunks = getTransientDataChunks(table, from, until)

@@ -31,10 +31,10 @@ import java.time.LocalDate
import scala.util.control.NonFatal

class BookkeeperJdbc(db: Database) extends BookkeeperBase(true) {

import za.co.absa.pramen.core.utils.FutureImplicits._

private val log = LoggerFactory.getLogger(this.getClass)
private val offsetManagement = new OffsetManagerJdbc(db)

override val bookkeepingEnabled: Boolean = true

@@ -121,6 +121,10 @@ class BookkeeperJdbc(db: Database) extends BookkeeperBase(true) {
}
}

private[pramen] override def getOffsetManager: OffsetManager = {
offsetManagement
}

private def toChunk(r: BookkeepingRecord): DataChunk = {
DataChunk(
r.pramenTableName, r.infoDate, r.infoDateBegin, r.infoDateEnd, r.inputRecordCount, r.outputRecordCount, r.jobStarted, r.jobFinished)
@@ -16,7 +16,7 @@

package za.co.absa.pramen.core.metastore

- import org.apache.spark.sql.DataFrame
+ import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.types.StructType
import za.co.absa.pramen.api._
import za.co.absa.pramen.core.metastore.model.MetaTable
@@ -37,9 +37,11 @@ trait Metastore {

def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame

def getCurrentBatch(tableName: String, infoDate: LocalDate): DataFrame

def getLatest(tableName: String, until: Option[LocalDate]): DataFrame

- def saveTable(tableName: String, infoDate: LocalDate, df: DataFrame, inputRecordCount: Option[Long] = None): MetaTableStats
+ def saveTable(tableName: String, infoDate: LocalDate, df: DataFrame, inputRecordCount: Option[Long] = None, saveModeOverride: Option[SaveMode] = None): MetaTableStats

def getHiveHelper(tableName: String): HiveHelper

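
A call-site sketch of the new saveModeOverride parameter (the surrounding method and table name are illustrative; the saveTable signature is the one from the trait above):

import org.apache.spark.sql.{DataFrame, SaveMode}
import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore}

import java.time.LocalDate

// Force an append for this particular write, regardless of the save mode
// configured on the table itself.
def appendBatch(metastore: Metastore, infoDate: LocalDate, df: DataFrame): MetaTableStats =
  metastore.saveTable("my_table1", infoDate, df, inputRecordCount = None,
    saveModeOverride = Some(SaveMode.Append))
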
@@ -18,8 +18,9 @@ package za.co.absa.pramen.core.metastore

import com.typesafe.config.Config
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
- import org.apache.spark.sql.{DataFrame, SparkSession}
+ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
import za.co.absa.pramen.core.app.config.InfoDateConfig
@@ -38,6 +39,7 @@ class MetastoreImpl(appConfig: Config,
tableDefs: Seq[MetaTable],
bookkeeper: Bookkeeper,
metadata: MetadataManager,
batchId: Long,
skipBookKeepingUpdates: Boolean)(implicit spark: SparkSession) extends Metastore {
import MetastoreImpl._

@@ -78,6 +80,18 @@ class MetastoreImpl(appConfig: Config,
MetastorePersistence.fromMetaTable(mt, appConfig).loadTable(infoDateFrom, infoDateTo)
}

override def getCurrentBatch(tableName: String, infoDate: LocalDate): DataFrame = {
val mt = getTableDef(tableName)

val df = MetastorePersistence.fromMetaTable(mt, appConfig).loadTable(Option(infoDate), Option(infoDate))

if (df.schema.fields.exists(_.name.equalsIgnoreCase(mt.batchIdColumn))) {
df.filter(col(mt.batchIdColumn) === lit(batchId))
} else {
df
}
}

override def getLatest(tableName: String, until: Option[LocalDate]): DataFrame = {
val mt = getTableDef(tableName)
val isLazy = mt.format.isLazy
@@ -91,15 +105,15 @@
}
}

- override def saveTable(tableName: String, infoDate: LocalDate, df: DataFrame, inputRecordCount: Option[Long]): MetaTableStats = {
+ override def saveTable(tableName: String, infoDate: LocalDate, df: DataFrame, inputRecordCount: Option[Long], saveModeOverride: Option[SaveMode]): MetaTableStats = {
val mt = getTableDef(tableName)
val isTransient = mt.format.isTransient
val start = Instant.now.getEpochSecond

var stats = MetaTableStats(0, None)

withSparkConfig(mt.sparkConfig) {
- stats = MetastorePersistence.fromMetaTable(mt, appConfig).saveTable(infoDate, df, inputRecordCount)
+ stats = MetastorePersistence.fromMetaTable(mt, appConfig, saveModeOverride).saveTable(infoDate, df, inputRecordCount)
}

val finish = Instant.now.getEpochSecond
@@ -191,6 +205,11 @@ class MetastoreImpl(appConfig: Config,
metastore.getTable(tableName, from, to)
}

override def getCurrentBatch(tableName: String): DataFrame = {
validateTable(tableName)
metastore.getCurrentBatch(tableName, infoDate)
}

override def getLatest(tableName: String, until: Option[LocalDate] = None): DataFrame = {
validateTable(tableName)
val untilDate = until.orElse(Option(infoDate))
@@ -253,12 +272,13 @@ object MetastoreImpl {
def fromConfig(conf: Config,
infoDateConfig: InfoDateConfig,
bookkeeper: Bookkeeper,
- metadataManager: MetadataManager)(implicit spark: SparkSession): MetastoreImpl = {
+ metadataManager: MetadataManager,
+ batchId: Long)(implicit spark: SparkSession): MetastoreImpl = {
val tableDefs = MetaTable.fromConfig(conf, infoDateConfig, METASTORE_KEY)

val isUndercover = ConfigUtils.getOptionBoolean(conf, UNDERCOVER).getOrElse(false)

- new MetastoreImpl(conf, tableDefs, bookkeeper, metadataManager, isUndercover)
+ new MetastoreImpl(conf, tableDefs, bookkeeper, metadataManager, batchId, isUndercover)
}

private[core] def withSparkConfig(sparkConfig: Map[String, String])
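
The filtering rule in getCurrentBatch above is small but central: rows are narrowed to the current batch only when the table actually carries the batch id column. A self-contained sketch of that behavior (toy data; the column name is the default introduced by this commit):

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{DataFrame, SparkSession}

object CurrentBatchFilterDemo {
  // Same rule as MetastoreImpl.getCurrentBatch: filter by batch id if the
  // column exists, otherwise return the partition unchanged.
  def currentBatchOnly(df: DataFrame, batchIdColumn: String, batchId: Long): DataFrame =
    if (df.schema.fields.exists(_.name.equalsIgnoreCase(batchIdColumn)))
      df.filter(col(batchIdColumn) === lit(batchId))
    else
      df

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 100L), (2, 100L), (3, 99L)).toDF("id", "pramen_batchid")
    currentBatchOnly(df, "pramen_batchid", 100L).show() // keeps ids 1 and 2 only
  }
}
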
@@ -35,6 +35,7 @@ import scala.util.{Failure, Success, Try}
* @param format The format of the table.
* @param infoDateColumn The name of the column that contains the information date (partitioned by).
* @param infoDateFormat The format of the information date.
* @param batchIdColumn The name of the column that contains the batch id.
* @param hiveConfig The effective Hive configuration to use for Hive operations.
* @param hiveTable The name of the Hive table.
* @param hivePath The path of the Hive table (if it differs from the path in the underlying format).
@@ -53,6 +54,7 @@ case class MetaTable(
format: DataFormat,
infoDateColumn: String,
infoDateFormat: String,
batchIdColumn: String,
hiveConfig: HiveConfig,
hiveTable: Option[String],
hivePath: Option[String],
@@ -82,6 +84,9 @@ object MetaTable {
val TABLE_HIVE_CONFIG_PREFIX = "hive"
val DEFAULT_HIVE_CONFIG_PREFIX = "pramen.hive"
val SPARK_CONFIG_PREFIX = "spark.conf"
val BATCH_ID_COLUMN_KEY = "batchid.column"
val DEFAULT_BATCH_ID_COLUMN_KEY = "pramen.batchid.column.default"
val DEFAULT_BATCH_ID_COLUMN_NAME = "pramen_batchid"

def fromConfig(conf: Config, infoDateConfig: InfoDateConfig, key: String): Seq[MetaTable] = {
val defaultInfoDateColumnName = infoDateConfig.columnName
@@ -90,6 +95,7 @@ object MetaTable {
val defaultTrackDays = infoDateConfig.defaultTrackDays
val defaultHiveConfig = HiveDefaultConfig.fromConfig(ConfigUtils.getOptionConfig(conf, DEFAULT_HIVE_CONFIG_PREFIX))
val defaultPreferAddPartition = conf.getBoolean(s"pramen.$HIVE_PREFER_ADD_PARTITION_KEY")
val defaultBatchIdColumnName = ConfigUtils.getOptionString(conf, DEFAULT_BATCH_ID_COLUMN_KEY).getOrElse(DEFAULT_BATCH_ID_COLUMN_NAME)

val tableConfigs = ConfigUtils.getOptionConfigList(conf, key)

@@ -98,7 +104,7 @@
}

val metatables = tableConfigs
- .map(tableConfig => fromConfigSingleEntity(tableConfig, conf, defaultInfoDateColumnName, defaultInfoDateFormat, defaultStartDate, defaultTrackDays, defaultHiveConfig, defaultPreferAddPartition))
+ .map(tableConfig => fromConfigSingleEntity(tableConfig, conf, defaultInfoDateColumnName, defaultInfoDateFormat, defaultStartDate, defaultTrackDays, defaultHiveConfig, defaultPreferAddPartition, defaultBatchIdColumnName))
.toSeq

val duplicates = AlgorithmUtils.findDuplicates(metatables.map(_.name))
@@ -115,7 +121,8 @@
defaultStartDate: LocalDate,
defaultTrackDays: Int,
defaultHiveConfig: HiveDefaultConfig,
- defaultPreferAddPartition: Boolean): MetaTable = {
+ defaultPreferAddPartition: Boolean,
+ defaultBatchIdColumn: String): MetaTable = {
val name = ConfigUtils.getOptionString(conf, NAME_KEY).getOrElse(throw new IllegalArgumentException(s"Mandatory option missing: $NAME_KEY"))
val description = ConfigUtils.getOptionString(conf, NAME_DESCRIPTION).getOrElse("")
val infoDateOverride = InfoDateOverride.fromConfig(conf)
@@ -125,6 +132,7 @@
val startDate = infoDateOverride.startDate.getOrElse(defaultStartDate)
val trackDays = ConfigUtils.getOptionInt(conf, TRACK_DAYS_KEY).getOrElse(defaultTrackDays)
val trackDaysExplicitlySet = conf.hasPath(TRACK_DAYS_KEY)
val batchIdColumn = ConfigUtils.getOptionString(conf, BATCH_ID_COLUMN_KEY).getOrElse(defaultBatchIdColumn)

val format = Try {
DataFormatParser.fromConfig(conf, appConf)
@@ -153,6 +161,7 @@
format,
infoDateColumn,
infoDateFormat,
batchIdColumn,
hiveConfig,
hiveTable,
hivePath,
@@ -174,6 +183,7 @@
table.format,
table.infoDateColumn,
table.infoDateFormat,
table.batchIdColumn,
table.hiveTable,
table.hivePath,
table.infoDateStart,
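
The new keys provide a global default plus a per-table override. A configuration sketch in HOCON (the key names are from this commit; the table entry and the pramen.metastore.tables layout are illustrative):

# Global default; when absent, "pramen_batchid" is used.
pramen.batchid.column.default = "pramen_batchid"

pramen.metastore.tables = [
  {
    name = "my_table1"
    format = "parquet"
    path = "/data/metastore/my_table1"
    # Per-table override of the batch id column name.
    batchid.column = "batch_id"
  }
]
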
@@ -17,7 +17,7 @@
package za.co.absa.pramen.core.metastore.peristence

import com.typesafe.config.Config
- import org.apache.spark.sql.{DataFrame, SparkSession}
+ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.DataFormat
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.metastore.model.{HiveConfig, MetaTable}
@@ -43,18 +43,20 @@ trait MetastorePersistence {
}

object MetastorePersistence {
- def fromMetaTable(metaTable: MetaTable, conf: Config)(implicit spark: SparkSession): MetastorePersistence = {
+ def fromMetaTable(metaTable: MetaTable, conf: Config, saveModeOverride: Option[SaveMode] = None)(implicit spark: SparkSession): MetastorePersistence = {
val saveModeOpt = saveModeOverride.orElse(metaTable.saveModeOpt)

metaTable.format match {
case DataFormat.Parquet(path, recordsPerPartition) =>
new MetastorePersistenceParquet(
- path, metaTable.infoDateColumn, metaTable.infoDateFormat, recordsPerPartition, metaTable.saveModeOpt, metaTable.readOptions, metaTable.writeOptions
+ path, metaTable.infoDateColumn, metaTable.infoDateFormat, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions
)
case DataFormat.Delta(query, recordsPerPartition) =>
new MetastorePersistenceDelta(
- query, metaTable.infoDateColumn, metaTable.infoDateFormat, recordsPerPartition, metaTable.saveModeOpt, metaTable.readOptions, metaTable.writeOptions
+ query, metaTable.infoDateColumn, metaTable.infoDateFormat, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions
)
case DataFormat.Raw(path) =>
- new MetastorePersistenceRaw(path, metaTable.infoDateColumn, metaTable.infoDateFormat, metaTable.saveModeOpt)
+ new MetastorePersistenceRaw(path, metaTable.infoDateColumn, metaTable.infoDateFormat, saveModeOpt)
case DataFormat.TransientEager(cachePolicy) =>
new MetastorePersistenceTransientEager(TransientTableManager.getTempDirectory(cachePolicy, conf), metaTable.name, cachePolicy)
case DataFormat.Transient(cachePolicy) =>
(The remaining changed files are not shown.)