Skip to content

Commit

Permalink
#520 Simplify the way metastore reader is created for various purposes.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Nov 29, 2024
1 parent ffa27ae commit ec92fc8
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SaveMode}
import za.co.absa.pramen.api._
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode}
import za.co.absa.pramen.core.utils.hive.HiveHelper

import java.time.LocalDate
Expand Down Expand Up @@ -54,7 +54,7 @@ trait Metastore {

def getStats(tableName: String, infoDate: LocalDate): MetaTableStats

def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader
def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, readMode: ReaderMode): MetastoreReader

def commitIncrementalTables(): Unit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT
import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig}
import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetCommitRequest, OffsetManagerUtils}
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.metastore.model.{MetaTable, TrackingTable}
import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode, TrackingTable}
import za.co.absa.pramen.core.metastore.peristence.{MetastorePersistence, TransientJobManager}
import za.co.absa.pramen.core.utils.ConfigUtils
import za.co.absa.pramen.core.utils.hive.{HiveFormat, HiveHelper}
Expand Down Expand Up @@ -204,7 +204,7 @@ class MetastoreImpl(appConfig: Config,
MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).getStats(infoDate, onlyForCurrentBatchId = false)
}

override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader = {
override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, readMode: ReaderMode): MetastoreReader = {
val metastore = this

new MetastoreReaderCore {
Expand All @@ -219,9 +219,9 @@ class MetastoreImpl(appConfig: Config,

override def getCurrentBatch(tableName: String): DataFrame = {
validateTable(tableName)
if (isIncremental && !isRerun && isPostProcessing) {
if (readMode == ReaderMode.IncrementalPostProcessing && !isRerun) {
metastore.getBatch(tableName, infoDate, None)
} else if (isIncremental && !isRerun) {
} else if ((readMode == ReaderMode.IncrementalValidation || readMode == ReaderMode.IncrementalRun) && !isRerun) {
getIncremental(tableName, outputTable, infoDate)
} else
metastore.getTable(tableName, Option(infoDate), Option(infoDate))
Expand Down Expand Up @@ -281,6 +281,7 @@ class MetastoreImpl(appConfig: Config,
}

private def getIncremental(tableName: String, transformationOutputTable: String, infoDate: LocalDate): DataFrame = {
val commitChanges = readMode == ReaderMode.IncrementalRun
val trackingName = s"$tableName->$transformationOutputTable"
val tableDef = getTableDef(tableName)
val offsetType = if (tableDef.format.isInstanceOf[DataFormat.Raw]) OffsetType.StringType else OffsetType.IntegralType
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.metastore.model

trait ReaderMode

object ReaderMode {
case object Batch extends ReaderMode

case object IncrementalValidation extends ReaderMode

case object IncrementalRun extends ReaderMode

case object IncrementalPostProcessing extends ReaderMode
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import za.co.absa.pramen.api.{DataFormat, Reason, Source}
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}
import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager, OffsetManagerUtils}
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode}
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental}
import za.co.absa.pramen.core.utils.SparkUtils._

Expand Down Expand Up @@ -169,7 +169,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
source.postProcess(
sourceTable.query,
outputTable.name,
metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = true, commitChanges = false, isPostProcessing = true),
metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, ReaderMode.IncrementalPostProcessing),
infoDate,
operationDef.extraOptions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult}
import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode}
import za.co.absa.pramen.core.metastore.peristence.TransientTableManager
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing}
import za.co.absa.pramen.core.utils.ConfigUtils
Expand Down Expand Up @@ -173,7 +173,7 @@ class IngestionJob(operationDef: OperationDef,
source.postProcess(
sourceTable.query,
outputTable.name,
metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = false, commitChanges = false, isPostProcessing = true),
metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, ReaderMode.Batch),
infoDate,
operationDef.extraOptions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import za.co.absa.pramen.api.jobdef.SinkTable
import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason}
import za.co.absa.pramen.api.{Reason, Sink}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode}
import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderCore}
import za.co.absa.pramen.core.pipeline.JobPreRunStatus.Ready
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing}
Expand Down Expand Up @@ -122,7 +122,9 @@ class SinkJob(operationDef: OperationDef,
case NonFatal(ex) => throw new IllegalStateException("Unable to connect to the sink.", ex)
}

val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = true, isPostProcessing = false)
val readerMode = if (isIncremental) ReaderMode.IncrementalPostProcessing else ReaderMode.Batch

val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode)

try {
val sinkResult = sink.send(df,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason}
import za.co.absa.pramen.api.{Reason, Transformer}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode}
import za.co.absa.pramen.core.metastore.{Metastore, MetastoreReaderCore}
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental, ScheduleStrategySourcing}

Expand Down Expand Up @@ -54,11 +54,13 @@ class TransformationJob(operationDef: OperationDef,
}

override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = false, isPostProcessing = false), infoDate, operationDef.extraOptions)
val readerMode = if (isIncremental) ReaderMode.IncrementalValidation else ReaderMode.Batch
transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, readerMode), infoDate, operationDef.extraOptions)
}

override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = {
val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = true, isPostProcessing = false)
val readerMode = if (isIncremental) ReaderMode.IncrementalRun else ReaderMode.Batch
val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, readerMode)
val runResult = RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions))

metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncrementalStage()
Expand All @@ -83,7 +85,8 @@ class TransformationJob(operationDef: OperationDef,
else
SaveResult(metastore.saveTable(outputTable.name, infoDate, df, None))

val metastoreReader = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, isIncremental, commitChanges = false, isPostProcessing = true)
val readerMode = if (isIncremental) ReaderMode.IncrementalPostProcessing else ReaderMode.Batch
val metastoreReader = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, readerMode)

try {
transformer.postProcess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import za.co.absa.pramen.core.app.config.InfoDateConfig
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture}
import za.co.absa.pramen.core.metadata.MetadataManagerNull
import za.co.absa.pramen.core.metastore.model.ReaderMode
import za.co.absa.pramen.core.metastore.peristence.TransientJobManager
import za.co.absa.pramen.core.mocks.bookkeeper.SyncBookkeeperMock
import za.co.absa.pramen.core.mocks.job.JobSpy
Expand Down Expand Up @@ -390,7 +391,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF

m.saveTable("table1", infoDate, getDf)

val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, ReaderMode.Batch)

val df1 = reader.getTable("table1", Some(infoDate), Some(infoDate))

Expand All @@ -404,7 +405,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF

m.saveTable("table1", infoDate, getDf)

val reader = m.getMetastoreReader("table2" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
val reader = m.getMetastoreReader("table2" :: Nil, "output_table", infoDate, TaskRunReason.New, ReaderMode.Batch)

val ex = intercept[TableNotConfigured] {
reader.getTable("table1", Some(infoDate), Some(infoDate))
Expand All @@ -420,7 +421,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF

m.saveTable("table1", infoDate, getDf)

val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, ReaderMode.Batch)
val runInfo1 = reader.getTableRunInfo("table1", infoDate)
val runInfo2 = reader.getTableRunInfo("table1", infoDate.plusDays(1))

Expand All @@ -438,7 +439,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF

m.saveTable("table1", infoDate, getDf)

val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, ReaderMode.Batch)
val metadataManager = reader.metadataManager

metadataManager.setMetadata("table1", infoDate, "key1", "value1")
Expand All @@ -456,7 +457,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF
m.saveTable("table1", infoDate, getDf)
m.saveTable("table1", infoDate.plusDays(1), getDf)

val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, "output_table", infoDate.plusDays(10), TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false)
val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, "output_table", infoDate.plusDays(10), TaskRunReason.New, ReaderMode.Batch)

val date1 = reader.getLatestAvailableDate("table1")
val date2 = reader.getLatestAvailableDate("table1", Some(infoDate))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import za.co.absa.pramen.api.offset.DataOffset
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.api.{MetaTableDef, MetaTableRunInfo, MetadataManager, MetastoreReader}
import za.co.absa.pramen.core.metadata.MetadataManagerNull
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode}
import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderCore, TableNotConfigured}
import za.co.absa.pramen.core.mocks.MetaTableFactory
import za.co.absa.pramen.core.mocks.utils.hive.QueryExecutorMock
Expand Down Expand Up @@ -105,7 +105,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"),
stats
}

override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, taskRunReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader = {
override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, taskRunReason: TaskRunReason, readMode: ReaderMode): MetastoreReader = {
val metastore = this

new MetastoreReaderCore {
Expand Down

0 comments on commit ec92fc8

Please sign in to comment.