From e2034c17de9e669b6df7e8871b19642cc2710af5 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 21 Nov 2024 14:56:47 +0100 Subject: [PATCH] #520 Prepare interfaces for incremental transformer processing. --- .../absa/pramen/core/app/AppContextImpl.scala | 7 ++-- .../core/app/config/RuntimeConfig.scala | 21 ++++++++++ .../pramen/core/metastore/Metastore.scala | 2 +- .../pramen/core/metastore/MetastoreImpl.scala | 42 +++++++++++++++---- .../core/metastore/MetastoreReaderCore.scala | 23 ++++++++++ .../core/metastore/model/TrackingTable.scala | 27 ++++++++++++ .../pipeline/IncrementalIngestionJob.scala | 2 +- .../pramen/core/pipeline/IngestionJob.scala | 2 +- .../absa/pramen/core/pipeline/SinkJob.scala | 7 +++- .../core/pipeline/TransformationJob.scala | 17 ++++++-- .../jobrunner/ConcurrentJobRunnerImpl.scala | 10 +++-- .../core/metastore/MetastoreSuite.scala | 15 +++---- .../core/mocks/metastore/MetastoreSpy.scala | 8 ++-- 13 files changed, 148 insertions(+), 35 deletions(-) create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala index 91970eed0..f659e71b2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContextImpl.scala @@ -20,7 +20,7 @@ import com.typesafe.config.Config import org.apache.spark.sql.SparkSession import za.co.absa.pramen.api.MetadataManager import za.co.absa.pramen.core.PramenImpl -import za.co.absa.pramen.core.app.config.InfoDateConfig +import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.journal.Journal import za.co.absa.pramen.core.lock.{TokenLockFactory, TokenLockFactoryAllow} @@ -57,7 +57,7 @@ object AppContextImpl { val (bookkeeper, tokenLockFactory, journal, metadataManager, closable) = Bookkeeper.fromConfig(appConfig.bookkeepingConfig, appConfig.runtimeConfig, batchId) - val metastore: Metastore = MetastoreImpl.fromConfig(conf, appConfig.infoDateDefaults, bookkeeper, metadataManager, batchId) + val metastore: Metastore = MetastoreImpl.fromConfig(conf, appConfig.runtimeConfig, appConfig.infoDateDefaults, bookkeeper, metadataManager, batchId) PramenImpl.instance.asInstanceOf[PramenImpl].setMetadataManager(metadataManager) PramenImpl.instance.asInstanceOf[PramenImpl].setWorkflowConfig(conf) @@ -83,8 +83,9 @@ object AppContextImpl { val appConfig = AppConfig.fromConfig(conf) val metadataManager = new MetadataManagerNull(isPersistenceEnabled = false) + val runtimeConfig = RuntimeConfig.default - val metastore: Metastore = MetastoreImpl.fromConfig(conf, infoDateConfig, bookkeeper, metadataManager, 0L) + val metastore: Metastore = MetastoreImpl.fromConfig(conf, runtimeConfig, infoDateConfig, bookkeeper, metadataManager, 0L) val appContext = new AppContextImpl( appConfig, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala index 655b8edf1..c56fc33d4 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala @@ -151,4 +151,25 @@ object RuntimeConfig { sparkAppDescriptionTemplate ) } + + def default: RuntimeConfig = { + RuntimeConfig( + isDryRun = false, + isRerun = false, + runTables = Seq.empty, + isUndercover = false, + useLocks = true, + checkOnlyLateData = false, + checkOnlyNewData = true, + emailIfNoChanges = false, + runDate = LocalDate.now(), + runDateTo = None, + isInverseOrder = false, + parallelTasks = 1, + stopSparkSession = true, + allowEmptyPipeline = false, + historicalRunMode = RunMode.CheckUpdates, + sparkAppDescriptionTemplate = None + ) + } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala index 1209c882f..1d94d4463 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/Metastore.scala @@ -54,5 +54,5 @@ trait Metastore { def getStats(tableName: String, infoDate: LocalDate): MetaTableStats - def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean): MetastoreReader + def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index ab2d3a6c9..732eaff6f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -25,9 +25,8 @@ import org.slf4j.LoggerFactory import za.co.absa.pramen.api._ import za.co.absa.pramen.api.offset.DataOffset import za.co.absa.pramen.api.status.TaskRunReason -import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT -import za.co.absa.pramen.core.app.config.RuntimeConfig.UNDERCOVER +import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.config.Keys import za.co.absa.pramen.core.metastore.model.MetaTable @@ -36,12 +35,14 @@ import za.co.absa.pramen.core.utils.ConfigUtils import za.co.absa.pramen.core.utils.hive.{HiveFormat, HiveHelper} import java.time.{Instant, LocalDate} +import scala.collection.mutable.ListBuffer class MetastoreImpl(appConfig: Config, tableDefs: Seq[MetaTable], bookkeeper: Bookkeeper, metadata: MetadataManager, batchId: Long, + isRerun: Boolean, skipBookKeepingUpdates: Boolean)(implicit spark: SparkSession) extends Metastore { import MetastoreImpl._ @@ -201,10 +202,12 @@ class MetastoreImpl(appConfig: Config, MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).getStats(infoDate, onlyForCurrentBatchId = false) } - override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean): MetastoreReader = { + override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader = { val metastore = this - new MetastoreReader { + new MetastoreReaderCore { + private val incrementalInputTables = new ListBuffer[String] + override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = { validateTable(tableName) val from = infoDateFrom.orElse(Option(infoDate)) @@ -214,9 +217,12 @@ class MetastoreImpl(appConfig: Config, override def getCurrentBatch(tableName: String): DataFrame = { validateTable(tableName) - if (isIncremental) + if (isPostProcessing && isIncremental) { metastore.getBatch(tableName, infoDate, None) - else + } else if (isIncremental && !isRerun && !isPostProcessing) { + incrementalInputTables += tableName + getIncremental(tableName, outputTable, infoDate) + } else metastore.getTable(tableName, Option(infoDate), Option(infoDate)) } @@ -262,11 +268,24 @@ class MetastoreImpl(appConfig: Config, override def metadataManager: MetadataManager = metadata + override def commitIncremental(): Unit = { + // ToDo Replace this with proper offset management implementation + } + private def validateTable(tableName: String): Unit = { if (!tables.contains(tableName)) { throw new TableNotConfigured(s"Attempt accessing non-dependent table: $tableName") } } + + private def getIncremental(tableName: String, transformationOutputTable: String, infoDate: LocalDate): DataFrame = { + // Don't forget to use incrementalDryRun to decide if we need to commit + val needsToCommit = !isPostProcessing && !incrementalDryRun + val om = bookkeeper.getOffsetManager + + // ToDo Replace this with proper offset management implementation + metastore.getBatch(tableName, infoDate, None) + } } } @@ -288,15 +307,20 @@ object MetastoreImpl { val DEFAULT_RECORDS_PER_PARTITION = 500000 def fromConfig(conf: Config, + runtimeConfig: RuntimeConfig, infoDateConfig: InfoDateConfig, bookkeeper: Bookkeeper, metadataManager: MetadataManager, batchId: Long)(implicit spark: SparkSession): MetastoreImpl = { val tableDefs = MetaTable.fromConfig(conf, infoDateConfig, METASTORE_KEY) - val isUndercover = ConfigUtils.getOptionBoolean(conf, UNDERCOVER).getOrElse(false) - - new MetastoreImpl(conf, tableDefs, bookkeeper, metadataManager, batchId, isUndercover) + new MetastoreImpl(conf, + tableDefs, + bookkeeper, + metadataManager, + batchId, + runtimeConfig.isRerun, + runtimeConfig.isUndercover) } private[core] def withSparkConfig(sparkConfig: Map[String, String]) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala new file mode 100644 index 000000000..6dec4ccd3 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderCore.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.metastore + +import za.co.absa.pramen.api.MetastoreReader + +trait MetastoreReaderCore extends MetastoreReader { + def commitIncremental(): Unit +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala new file mode 100644 index 000000000..5f345ba45 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/TrackingTable.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.metastore.model + +import org.apache.spark.sql.DataFrame +import za.co.absa.pramen.core.bookkeeper.model.DataOffsetRequest + +case class TrackingTable( + inputTable: String, + outputTable: String, + commitRequest: DataOffsetRequest, + data: DataFrame + ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index 502e56597..e6aae1dae 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -178,7 +178,7 @@ class IncrementalIngestionJob(operationDef: OperationDef, source.postProcess( sourceTable.query, outputTable.name, - metastore.getMetastoreReader(Seq(outputTable.name), infoDate, runReason, isIncremental = true), + metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = true, incrementalDryRun = false, isPostProcessing = true), infoDate, operationDef.extraOptions ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala index f84f09f6c..a02963162 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala @@ -173,7 +173,7 @@ class IngestionJob(operationDef: OperationDef, source.postProcess( sourceTable.query, outputTable.name, - metastore.getMetastoreReader(Seq(outputTable.name), infoDate, runReason, isIncremental = false), + metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = false, incrementalDryRun = false, isPostProcessing = true), infoDate, operationDef.extraOptions ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala index 8e2672ee2..0a42cacb8 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala @@ -23,7 +23,7 @@ import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.api.{Reason, Sink} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.metastore.model.MetaTable -import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore} +import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderCore} import za.co.absa.pramen.core.pipeline.JobPreRunStatus.Ready import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing} import za.co.absa.pramen.core.utils.ConfigUtils @@ -122,10 +122,12 @@ class SinkJob(operationDef: OperationDef, case NonFatal(ex) => throw new IllegalStateException("Unable to connect to the sink.", ex) } + val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = true, isPostProcessing = false) + try { val sinkResult = sink.send(df, sinkTable.metaTableName, - metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, infoDate, runReason, isIncremental), + metastoreReader, infoDate, sinkTable.options ) @@ -153,6 +155,7 @@ class SinkJob(operationDef: OperationDef, } finally { Try { sink.close() + metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncremental() } } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala index 0cd611e3f..8af4d2c6e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala @@ -21,8 +21,8 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.api.{Reason, Transformer} import za.co.absa.pramen.core.bookkeeper.Bookkeeper -import za.co.absa.pramen.core.metastore.Metastore import za.co.absa.pramen.core.metastore.model.MetaTable +import za.co.absa.pramen.core.metastore.{Metastore, MetastoreReaderCore} import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental, ScheduleStrategySourcing} import java.time.{Instant, LocalDate} @@ -53,11 +53,18 @@ class TransformationJob(operationDef: OperationDef, } override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = { - transformer.validate(metastore.getMetastoreReader(inputTables, infoDate, runReason, isIncremental), infoDate, operationDef.extraOptions) + transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = false), infoDate, operationDef.extraOptions) } override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = { - RunResult(transformer.run(metastore.getMetastoreReader(inputTables, infoDate, runReason, isIncremental), infoDate, operationDef.extraOptions)) + val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = true, isPostProcessing = false) + val runResult = try { + RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions)) + } finally { + metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncremental() + } + + runResult } def postProcessing(df: DataFrame, @@ -77,10 +84,12 @@ class TransformationJob(operationDef: OperationDef, else SaveResult(metastore.saveTable(outputTable.name, infoDate, df, None)) + val metastoreReader = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = true) + try { transformer.postProcess( outputTable.name, - metastore.getMetastoreReader(inputTables :+ outputTable.name, infoDate, runReason, isIncremental), + metastoreReader, infoDate, operationDef.extraOptions ) } catch { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala index ffb3ab79e..839bf9ae9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala @@ -23,7 +23,6 @@ import za.co.absa.pramen.api.status.{RunStatus, TaskResult} import za.co.absa.pramen.core.app.config.RuntimeConfig import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.exceptions.FatalErrorWrapper -import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.metastore.peristence.TransientJobManager import za.co.absa.pramen.core.pipeline.Job import za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunner.JobRunResults @@ -91,9 +90,12 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, completedJobsChannel.send((job, Nil, isSucceeded)) } catch { - case ex: FatalErrorWrapper if ex.cause != null => onFatalException(ex.cause, job, isTransient) - case NonFatal(ex) => onNonFatalException(ex, job, isTransient) - case ex: Throwable => onFatalException(ex, job, isTransient) + case ex: FatalErrorWrapper if ex.cause != null => + onFatalException(ex.cause, job, isTransient) + case NonFatal(ex) => + onNonFatalException(ex, job, isTransient) + case ex: Throwable => + onFatalException(ex, job, isTransient) } } completedJobsChannel.close() diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala index 20a88c039..179bba18c 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala @@ -24,7 +24,6 @@ import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.api.{CachePolicy, DataFormat} -import za.co.absa.pramen.core.OperationDefFactory import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture} @@ -35,6 +34,7 @@ import za.co.absa.pramen.core.mocks.job.JobSpy import za.co.absa.pramen.core.mocks.utils.hive.QueryExecutorMock import za.co.absa.pramen.core.utils.SparkUtils import za.co.absa.pramen.core.utils.hive.{HiveHelperSql, HiveQueryTemplates, QueryExecutorSpark} +import za.co.absa.pramen.core.{OperationDefFactory, RuntimeConfigFactory} import java.time.LocalDate @@ -390,7 +390,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) + val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false) val df1 = reader.getTable("table1", Some(infoDate), Some(infoDate)) @@ -404,7 +404,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table2" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) + val reader = m.getMetastoreReader("table2" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false) val ex = intercept[TableNotConfigured] { reader.getTable("table1", Some(infoDate), Some(infoDate)) @@ -420,7 +420,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) + val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false) val runInfo1 = reader.getTableRunInfo("table1", infoDate) val runInfo2 = reader.getTableRunInfo("table1", infoDate.plusDays(1)) @@ -438,7 +438,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) - val reader = m.getMetastoreReader("table1" :: Nil, infoDate, TaskRunReason.New, isIncremental = false) + val reader = m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false) val metadataManager = reader.metadataManager metadataManager.setMetadata("table1", infoDate, "key1", "value1") @@ -456,7 +456,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) m.saveTable("table1", infoDate.plusDays(1), getDf) - val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, infoDate.plusDays(10), TaskRunReason.New, isIncremental = false) + val reader = m.getMetastoreReader("table1" :: "table2" :: Nil, "output_table", infoDate.plusDays(10), TaskRunReason.New, isIncremental = false, incrementalDryRun = false, isPostProcessing = false) val date1 = reader.getLatestAvailableDate("table1") val date2 = reader.getLatestAvailableDate("table1", Some(infoDate)) @@ -579,9 +579,10 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF operationDef = OperationDefFactory.getDummyOperationDef(schedule = schedule)) ) + val runtimeConfig = RuntimeConfigFactory.getDummyRuntimeConfig().copy(isUndercover = undercover) val infoDateConfig = InfoDateConfig.fromConfig(conf) val bk = new SyncBookkeeperMock val mm = new MetadataManagerNull(isPersistenceEnabled = false) - (MetastoreImpl.fromConfig(conf, infoDateConfig, bk, mm, 0L), bk) + (MetastoreImpl.fromConfig(conf, runtimeConfig, infoDateConfig, bk, mm, 0L), bk) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala index 88e0d117c..db710c560 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala @@ -23,7 +23,7 @@ import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.api.{MetaTableDef, MetaTableRunInfo, MetadataManager, MetastoreReader} import za.co.absa.pramen.core.metadata.MetadataManagerNull import za.co.absa.pramen.core.metastore.model.MetaTable -import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, TableNotConfigured} +import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderCore, TableNotConfigured} import za.co.absa.pramen.core.mocks.MetaTableFactory import za.co.absa.pramen.core.mocks.utils.hive.QueryExecutorMock import za.co.absa.pramen.core.utils.hive.{HiveHelper, HiveHelperSql, HiveQueryTemplates} @@ -105,10 +105,10 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), stats } - override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, taskRunReason: TaskRunReason, isIncremental: Boolean): MetastoreReader = { + override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, taskRunReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader = { val metastore = this - new MetastoreReader { + new MetastoreReaderCore { override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = { validateTable(tableName) val from = infoDateFrom.orElse(Option(infoDate)) @@ -170,6 +170,8 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), throw new TableNotConfigured(s"Attempt accessing non-dependent table: $tableName") } } + + override def commitIncremental(): Unit = {} } } }