From ec92fc84d48fbaf97b16de9446fe34a5992f6e19 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 29 Nov 2024 14:11:25 +0100 Subject: [PATCH] #520 Simplify the way metastore reader is created for various purposes. --- .../pramen/core/metastore/Metastore.scala | 4 +-- .../pramen/core/metastore/MetastoreImpl.scala | 9 +++--- .../core/metastore/model/ReaderMode.scala | 29 +++++++++++++++++++ .../pipeline/IncrementalIngestionJob.scala | 4 +-- .../pramen/core/pipeline/IngestionJob.scala | 4 +-- .../absa/pramen/core/pipeline/SinkJob.scala | 6 ++-- .../core/pipeline/TransformationJob.scala | 11 ++++--- .../core/metastore/MetastoreSuite.scala | 11 +++---- .../core/mocks/metastore/MetastoreSpy.scala | 4 +-- 9 files changed, 59 insertions(+), 23 deletions(-) create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/ReaderMode.scala diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala index 30bbc997..772c535d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SaveMode} import za.co.absa.pramen.api._ import za.co.absa.pramen.api.status.TaskRunReason -import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} import za.co.absa.pramen.core.utils.hive.HiveHelper import java.time.LocalDate @@ -54,7 +54,7 @@ trait Metastore { def getStats(tableName: String, infoDate: LocalDate): MetaTableStats - def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader + def getMetastoreReader(tables: Seq[String], 
outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, readMode: ReaderMode): MetastoreReader def commitIncrementalTables(): Unit diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index 044a8ddc..bdf626d5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -29,7 +29,7 @@ import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig} import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetCommitRequest, OffsetManagerUtils} import za.co.absa.pramen.core.config.Keys -import za.co.absa.pramen.core.metastore.model.{MetaTable, TrackingTable} +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode, TrackingTable} import za.co.absa.pramen.core.metastore.peristence.{MetastorePersistence, TransientJobManager} import za.co.absa.pramen.core.utils.ConfigUtils import za.co.absa.pramen.core.utils.hive.{HiveFormat, HiveHelper} @@ -204,7 +204,7 @@ class MetastoreImpl(appConfig: Config, MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).getStats(infoDate, onlyForCurrentBatchId = false) } - override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader = { + override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, readMode: ReaderMode): MetastoreReader = { val metastore = this new MetastoreReaderCore { @@ -219,9 +219,9 @@ class MetastoreImpl(appConfig: Config, override def getCurrentBatch(tableName: String): DataFrame = { validateTable(tableName) - if (isIncremental && !isRerun && 
isPostProcessing) { + if (readMode == ReaderMode.IncrementalPostProcessing && !isRerun) { metastore.getBatch(tableName, infoDate, None) - } else if (isIncremental && !isRerun) { + } else if ((readMode == ReaderMode.IncrementalValidation || readMode == ReaderMode.IncrementalRun) && !isRerun) { getIncremental(tableName, outputTable, infoDate) } else metastore.getTable(tableName, Option(infoDate), Option(infoDate)) @@ -281,6 +281,7 @@ class MetastoreImpl(appConfig: Config, } private def getIncremental(tableName: String, transformationOutputTable: String, infoDate: LocalDate): DataFrame = { + val commitChanges = readMode == ReaderMode.IncrementalRun val trackingName = s"$tableName->$transformationOutputTable" val tableDef = getTableDef(tableName) val offsetType = if (tableDef.format.isInstanceOf[DataFormat.Raw]) OffsetType.StringType else OffsetType.IntegralType diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/ReaderMode.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/ReaderMode.scala new file mode 100644 index 00000000..643d6e7b --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/ReaderMode.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.pramen.core.metastore.model + +sealed trait ReaderMode + +object ReaderMode { + case object Batch extends ReaderMode + + case object IncrementalValidation extends ReaderMode + + case object IncrementalRun extends ReaderMode + + case object IncrementalPostProcessing extends ReaderMode +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index 73b50e4e..cae62fc9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -28,7 +28,7 @@ import za.co.absa.pramen.api.{DataFormat, Reason, Source} import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest} import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager, OffsetManagerUtils} import za.co.absa.pramen.core.metastore.Metastore -import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental} import za.co.absa.pramen.core.utils.SparkUtils._ @@ -169,7 +169,7 @@ class IncrementalIngestionJob(operationDef: OperationDef, source.postProcess( sourceTable.query, outputTable.name, - metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = true, commitChanges = false, isPostProcessing = true), + metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, ReaderMode.IncrementalPostProcessing), infoDate, operationDef.extraOptions ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala index 5e370112..b7328537 100644 --- 
a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala @@ -25,7 +25,7 @@ import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult} import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.metastore.Metastore -import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} import za.co.absa.pramen.core.metastore.peristence.TransientTableManager import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing} import za.co.absa.pramen.core.utils.ConfigUtils @@ -173,7 +173,7 @@ class IngestionJob(operationDef: OperationDef, source.postProcess( sourceTable.query, outputTable.name, - metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = false, commitChanges = false, isPostProcessing = true), + metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, ReaderMode.Batch), infoDate, operationDef.extraOptions ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala index c53c1ca2..2de1f670 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala @@ -22,7 +22,7 @@ import za.co.absa.pramen.api.jobdef.SinkTable import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.api.{Reason, Sink} import za.co.absa.pramen.core.bookkeeper.Bookkeeper -import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, 
MetastoreReaderCore} import za.co.absa.pramen.core.pipeline.JobPreRunStatus.Ready import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing} @@ -122,7 +122,9 @@ class SinkJob(operationDef: OperationDef, case NonFatal(ex) => throw new IllegalStateException("Unable to connect to the sink.", ex) } - val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = true, isPostProcessing = false) + val readerMode = if (isIncremental) ReaderMode.IncrementalRun else ReaderMode.Batch + + val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, readerMode) try { val sinkResult = sink.send(df, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala index e09d365c..85f9cdf2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.api.{Reason, Transformer} import za.co.absa.pramen.core.bookkeeper.Bookkeeper -import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} import za.co.absa.pramen.core.metastore.{Metastore, MetastoreReaderCore} import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental, ScheduleStrategySourcing} @@ -54,11 +54,13 @@ class TransformationJob(operationDef: OperationDef, } override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = { - 
transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = false, isPostProcessing = false), infoDate, operationDef.extraOptions) + val readerMode = if (isIncremental) ReaderMode.IncrementalValidation else ReaderMode.Batch + transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, readerMode), infoDate, operationDef.extraOptions) } override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = { - val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, commitChanges = true, isPostProcessing = false) + val readerMode = if (isIncremental) ReaderMode.IncrementalRun else ReaderMode.Batch + val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, readerMode) val runResult = RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions)) metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncrementalStage() @@ -83,7 +85,8 @@ class TransformationJob(operationDef: OperationDef, else SaveResult(metastore.saveTable(outputTable.name, infoDate, df, None)) - val metastoreReader = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, isIncremental, commitChanges = false, isPostProcessing = true) + val readerMode = if (isIncremental) ReaderMode.IncrementalPostProcessing else ReaderMode.Batch + val metastoreReader = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, readerMode) try { transformer.postProcess( diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala index 1b3e832e..d120557e 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala +++ 
b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala @@ -28,6 +28,7 @@ import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture} import za.co.absa.pramen.core.metadata.MetadataManagerNull +import za.co.absa.pramen.core.metastore.model.ReaderMode import za.co.absa.pramen.core.metastore.peristence.TransientJobManager import za.co.absa.pramen.core.mocks.bookkeeper.SyncBookkeeperMock import za.co.absa.pramen.core.mocks.job.JobSpy @@ -390,7 +391,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false) + val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, ReaderMode.Batch) val df1 = reader.getTable("table1", Some(infoDate), Some(infoDate)) @@ -404,7 +405,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table2" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false) + val reader = m.getMetastoreReader("table2" :: Nil, "output_table", infoDate, TaskRunReason.New, ReaderMode.Batch) val ex = intercept[TableNotConfigured] { reader.getTable("table1", Some(infoDate), Some(infoDate)) @@ -420,7 +421,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false) + val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, 
TaskRunReason.New, ReaderMode.Batch) val runInfo1 = reader.getTableRunInfo("table1", infoDate) val runInfo2 = reader.getTableRunInfo("table1", infoDate.plusDays(1)) @@ -438,7 +439,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false) + val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, ReaderMode.Batch) val metadataManager = reader.metadataManager metadataManager.setMetadata("table1", infoDate, "key1", "value1") @@ -456,7 +457,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) m.saveTable("table1", infoDate.plusDays(1), getDf) - val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, "output_table", infoDate.plusDays(10), TaskRunReason.New, isIncremental = false, commitChanges = false, isPostProcessing = false) + val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, "output_table", infoDate.plusDays(10), TaskRunReason.New, ReaderMode.Batch) val date1 = reader.getLatestAvailableDate("table1") val date2 = reader.getLatestAvailableDate("table1", Some(infoDate)) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala index db32d380..faf4ab9c 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala @@ -22,7 +22,7 @@ import za.co.absa.pramen.api.offset.DataOffset import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.api.{MetaTableDef, MetaTableRunInfo, MetadataManager, MetastoreReader} import 
za.co.absa.pramen.core.metadata.MetadataManagerNull -import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode} import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderCore, TableNotConfigured} import za.co.absa.pramen.core.mocks.MetaTableFactory import za.co.absa.pramen.core.mocks.utils.hive.QueryExecutorMock @@ -105,7 +105,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), stats } - override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, taskRunReason: TaskRunReason, isIncremental: Boolean, commitChanges: Boolean, isPostProcessing: Boolean): MetastoreReader = { + override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, taskRunReason: TaskRunReason, readMode: ReaderMode): MetastoreReader = { val metastore = this new MetastoreReaderCore {