Skip to content

Commit

Permalink
#520 Prepare interfaces for incremental transformer processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Nov 21, 2024
1 parent fe94be8 commit e2034c1
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import za.co.absa.pramen.api.MetadataManager
import za.co.absa.pramen.core.PramenImpl
import za.co.absa.pramen.core.app.config.InfoDateConfig
import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.lock.{TokenLockFactory, TokenLockFactoryAllow}
Expand Down Expand Up @@ -57,7 +57,7 @@ object AppContextImpl {

val (bookkeeper, tokenLockFactory, journal, metadataManager, closable) = Bookkeeper.fromConfig(appConfig.bookkeepingConfig, appConfig.runtimeConfig, batchId)

val metastore: Metastore = MetastoreImpl.fromConfig(conf, appConfig.infoDateDefaults, bookkeeper, metadataManager, batchId)
val metastore: Metastore = MetastoreImpl.fromConfig(conf, appConfig.runtimeConfig, appConfig.infoDateDefaults, bookkeeper, metadataManager, batchId)

PramenImpl.instance.asInstanceOf[PramenImpl].setMetadataManager(metadataManager)
PramenImpl.instance.asInstanceOf[PramenImpl].setWorkflowConfig(conf)
Expand All @@ -83,8 +83,9 @@ object AppContextImpl {
val appConfig = AppConfig.fromConfig(conf)

val metadataManager = new MetadataManagerNull(isPersistenceEnabled = false)
val runtimeConfig = RuntimeConfig.default

val metastore: Metastore = MetastoreImpl.fromConfig(conf, infoDateConfig, bookkeeper, metadataManager, 0L)
val metastore: Metastore = MetastoreImpl.fromConfig(conf, runtimeConfig, infoDateConfig, bookkeeper, metadataManager, 0L)

val appContext = new AppContextImpl(
appConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,25 @@ object RuntimeConfig {
sparkAppDescriptionTemplate
)
}

def default: RuntimeConfig = {
RuntimeConfig(
isDryRun = false,
isRerun = false,
runTables = Seq.empty,
isUndercover = false,
useLocks = true,
checkOnlyLateData = false,
checkOnlyNewData = true,
emailIfNoChanges = false,
runDate = LocalDate.now(),
runDateTo = None,
isInverseOrder = false,
parallelTasks = 1,
stopSparkSession = true,
allowEmptyPipeline = false,
historicalRunMode = RunMode.CheckUpdates,
sparkAppDescriptionTemplate = None
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,5 @@ trait Metastore {

def getStats(tableName: String, infoDate: LocalDate): MetaTableStats

def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean): MetastoreReader
def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
import za.co.absa.pramen.api.offset.DataOffset
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.core.app.config.InfoDateConfig
import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT
import za.co.absa.pramen.core.app.config.RuntimeConfig.UNDERCOVER
import za.co.absa.pramen.core.app.config.{InfoDateConfig, RuntimeConfig}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.metastore.model.MetaTable
Expand All @@ -36,12 +35,14 @@ import za.co.absa.pramen.core.utils.ConfigUtils
import za.co.absa.pramen.core.utils.hive.{HiveFormat, HiveHelper}

import java.time.{Instant, LocalDate}
import scala.collection.mutable.ListBuffer

class MetastoreImpl(appConfig: Config,
tableDefs: Seq[MetaTable],
bookkeeper: Bookkeeper,
metadata: MetadataManager,
batchId: Long,
isRerun: Boolean,
skipBookKeepingUpdates: Boolean)(implicit spark: SparkSession) extends Metastore {
import MetastoreImpl._

Expand Down Expand Up @@ -201,10 +202,12 @@ class MetastoreImpl(appConfig: Config,
MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).getStats(infoDate, onlyForCurrentBatchId = false)
}

override def getMetastoreReader(tables: Seq[String], infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean): MetastoreReader = {
override def getMetastoreReader(tables: Seq[String], outputTable: String, infoDate: LocalDate, runReason: TaskRunReason, isIncremental: Boolean, incrementalDryRun: Boolean, isPostProcessing: Boolean): MetastoreReader = {
val metastore = this

new MetastoreReader {
new MetastoreReaderCore {
private val incrementalInputTables = new ListBuffer[String]

override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
validateTable(tableName)
val from = infoDateFrom.orElse(Option(infoDate))
Expand All @@ -214,9 +217,12 @@ class MetastoreImpl(appConfig: Config,

override def getCurrentBatch(tableName: String): DataFrame = {
validateTable(tableName)
if (isIncremental)
if (isPostProcessing && isIncremental) {
metastore.getBatch(tableName, infoDate, None)
else
} else if (isIncremental && !isRerun && !isPostProcessing) {
incrementalInputTables += tableName
getIncremental(tableName, outputTable, infoDate)
} else
metastore.getTable(tableName, Option(infoDate), Option(infoDate))
}

Expand Down Expand Up @@ -262,11 +268,24 @@ class MetastoreImpl(appConfig: Config,

override def metadataManager: MetadataManager = metadata

override def commitIncremental(): Unit = {
// ToDo Replace this with proper offset management implementation
}

private def validateTable(tableName: String): Unit = {
if (!tables.contains(tableName)) {
throw new TableNotConfigured(s"Attempt accessing non-dependent table: $tableName")
}
}

private def getIncremental(tableName: String, transformationOutputTable: String, infoDate: LocalDate): DataFrame = {
// Don't forget to use incrementalDryRun to decide if we need to commit
val needsToCommit = !isPostProcessing && !incrementalDryRun
val om = bookkeeper.getOffsetManager

// ToDo Replace this with proper offset management implementation
metastore.getBatch(tableName, infoDate, None)
}
}
}

Expand All @@ -288,15 +307,20 @@ object MetastoreImpl {
val DEFAULT_RECORDS_PER_PARTITION = 500000

def fromConfig(conf: Config,
runtimeConfig: RuntimeConfig,
infoDateConfig: InfoDateConfig,
bookkeeper: Bookkeeper,
metadataManager: MetadataManager,
batchId: Long)(implicit spark: SparkSession): MetastoreImpl = {
val tableDefs = MetaTable.fromConfig(conf, infoDateConfig, METASTORE_KEY)

val isUndercover = ConfigUtils.getOptionBoolean(conf, UNDERCOVER).getOrElse(false)

new MetastoreImpl(conf, tableDefs, bookkeeper, metadataManager, batchId, isUndercover)
new MetastoreImpl(conf,
tableDefs,
bookkeeper,
metadataManager,
batchId,
runtimeConfig.isRerun,
runtimeConfig.isUndercover)
}

private[core] def withSparkConfig(sparkConfig: Map[String, String])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.metastore

import za.co.absa.pramen.api.MetastoreReader

trait MetastoreReaderCore extends MetastoreReader {
def commitIncremental(): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.metastore.model

import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.core.bookkeeper.model.DataOffsetRequest

case class TrackingTable(
inputTable: String,
outputTable: String,
commitRequest: DataOffsetRequest,
data: DataFrame
)
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,
source.postProcess(
sourceTable.query,
outputTable.name,
metastore.getMetastoreReader(Seq(outputTable.name), infoDate, runReason, isIncremental = true),
metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = true, incrementalDryRun = false, isPostProcessing = true),
infoDate,
operationDef.extraOptions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class IngestionJob(operationDef: OperationDef,
source.postProcess(
sourceTable.query,
outputTable.name,
metastore.getMetastoreReader(Seq(outputTable.name), infoDate, runReason, isIncremental = false),
metastore.getMetastoreReader(Seq(outputTable.name), outputTable.name, infoDate, runReason, isIncremental = false, incrementalDryRun = false, isPostProcessing = true),
infoDate,
operationDef.extraOptions
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason}
import za.co.absa.pramen.api.{Reason, Sink}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore}
import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderCore}
import za.co.absa.pramen.core.pipeline.JobPreRunStatus.Ready
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing}
import za.co.absa.pramen.core.utils.ConfigUtils
Expand Down Expand Up @@ -122,10 +122,12 @@ class SinkJob(operationDef: OperationDef,
case NonFatal(ex) => throw new IllegalStateException("Unable to connect to the sink.", ex)
}

val metastoreReader = metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = true, isPostProcessing = false)

try {
val sinkResult = sink.send(df,
sinkTable.metaTableName,
metastore.getMetastoreReader(List(sinkTable.metaTableName) ++ inputTables, infoDate, runReason, isIncremental),
metastoreReader,
infoDate,
sinkTable.options
)
Expand Down Expand Up @@ -153,6 +155,7 @@ class SinkJob(operationDef: OperationDef,
} finally {
Try {
sink.close()
metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncremental()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason}
import za.co.absa.pramen.api.{Reason, Transformer}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.{Metastore, MetastoreReaderCore}
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental, ScheduleStrategySourcing}

import java.time.{Instant, LocalDate}
Expand Down Expand Up @@ -53,11 +53,18 @@ class TransformationJob(operationDef: OperationDef,
}

override def validate(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config): Reason = {
transformer.validate(metastore.getMetastoreReader(inputTables, infoDate, runReason, isIncremental), infoDate, operationDef.extraOptions)
transformer.validate(metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = false), infoDate, operationDef.extraOptions)
}

override def run(infoDate: LocalDate, runReason: TaskRunReason, conf: Config): RunResult = {
RunResult(transformer.run(metastore.getMetastoreReader(inputTables, infoDate, runReason, isIncremental), infoDate, operationDef.extraOptions))
val metastoreReader = metastore.getMetastoreReader(inputTables, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = true, isPostProcessing = false)
val runResult = try {
RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions))
} finally {
metastoreReader.asInstanceOf[MetastoreReaderCore].commitIncremental()
}

runResult
}

def postProcessing(df: DataFrame,
Expand All @@ -77,10 +84,12 @@ class TransformationJob(operationDef: OperationDef,
else
SaveResult(metastore.saveTable(outputTable.name, infoDate, df, None))

val metastoreReader = metastore.getMetastoreReader(inputTables :+ outputTable.name, outputTable.name, infoDate, runReason, isIncremental, incrementalDryRun = false, isPostProcessing = true)

try {
transformer.postProcess(
outputTable.name,
metastore.getMetastoreReader(inputTables :+ outputTable.name, infoDate, runReason, isIncremental),
metastoreReader,
infoDate, operationDef.extraOptions
)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import za.co.absa.pramen.api.status.{RunStatus, TaskResult}
import za.co.absa.pramen.core.app.config.RuntimeConfig
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.exceptions.FatalErrorWrapper
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.peristence.TransientJobManager
import za.co.absa.pramen.core.pipeline.Job
import za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunner.JobRunResults
Expand Down Expand Up @@ -91,9 +90,12 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,

completedJobsChannel.send((job, Nil, isSucceeded))
} catch {
case ex: FatalErrorWrapper if ex.cause != null => onFatalException(ex.cause, job, isTransient)
case NonFatal(ex) => onNonFatalException(ex, job, isTransient)
case ex: Throwable => onFatalException(ex, job, isTransient)
case ex: FatalErrorWrapper if ex.cause != null =>
onFatalException(ex.cause, job, isTransient)
case NonFatal(ex) =>
onNonFatalException(ex, job, isTransient)
case ex: Throwable =>
onFatalException(ex, job, isTransient)
}
}
completedJobsChannel.close()
Expand Down
Loading

0 comments on commit e2034c1

Please sign in to comment.