Commit b8b85b1
#520 Allow gracefully adding 'pramen_batchid' field to metastore tables consistently with the old behavior.
yruslan committed Nov 25, 2024
1 parent e5755e7 commit b8b85b1
Showing 4 changed files with 19 additions and 7 deletions.
3 changes: 3 additions & 0 deletions pramen/core/src/main/resources/reference.conf
@@ -147,6 +147,9 @@ pramen {
initial.sourcing.date.weekly.expr = "@runDate - 6"
initial.sourcing.date.monthly.expr = "beginOfMonth(@runDate)"

+ # If true, Pramen always adds the 'pramen_batchid' column, even for non-incremental pipelines
+ always.add.batchid.column = false
+
# Pramen can stop the Spark session at the end of execution. This can help cleanly finalize running
# jobs started from 'spark-submit'. But when running on Databricks this results in job failure.
# Use it with caution.
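
The new flag defaults to false, so pipelines that do not set it keep the old behavior. Below is a minimal sketch of how the key resolves, assuming an illustrative object name and using the Typesafe Config library that reference.conf is parsed with; it mirrors the getOrElse(false) fallback in RuntimeConfig.fromConfig further down.

import com.typesafe.config.{Config, ConfigFactory}

object BatchIdFlagSketch {
  // Full path of the new setting under the 'pramen' root.
  val Key = "pramen.always.add.batchid.column"

  // Absent key -> false (old behavior), present key -> the configured value.
  def alwaysAddBatchId(conf: Config): Boolean =
    if (conf.hasPath(Key)) conf.getBoolean(Key) else false

  def main(args: Array[String]): Unit = {
    println(alwaysAddBatchId(ConfigFactory.parseString("")))             // false: old behavior preserved
    println(alwaysAddBatchId(ConfigFactory.parseString(s"$Key = true"))) // true: column always added
  }
}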
5 changes: 5 additions & 0 deletions RuntimeConfig.scala
@@ -42,6 +42,7 @@ case class RuntimeConfig(
parallelTasks: Int,
stopSparkSession: Boolean,
allowEmptyPipeline: Boolean,
+ alwaysAddBatchIdColumn: Boolean,
historicalRunMode: RunMode,
sparkAppDescriptionTemplate: Option[String]
)
@@ -67,6 +68,7 @@ object RuntimeConfig {
val STOP_SPARK_SESSION = "pramen.stop.spark.session"
val VERBOSE = "pramen.verbose"
val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline"
+ val ALWAYS_ADD_BATCHID_COLUMN = "pramen.always.add.batchid.column"
val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template"

def fromConfig(conf: Config): RuntimeConfig = {
@@ -130,6 +132,7 @@ object RuntimeConfig {
}

val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false)
+ val alwaysAddBatchIdColumn = ConfigUtils.getOptionBoolean(conf, ALWAYS_ADD_BATCHID_COLUMN).getOrElse(false)
val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE)

RuntimeConfig(
@@ -147,6 +150,7 @@ object RuntimeConfig {
parallelTasks = parallelTasks,
stopSparkSession = conf.getBoolean(STOP_SPARK_SESSION),
allowEmptyPipeline,
+ alwaysAddBatchIdColumn,
runMode,
sparkAppDescriptionTemplate
)
@@ -168,6 +172,7 @@ object RuntimeConfig {
parallelTasks = 1,
stopSparkSession = true,
allowEmptyPipeline = false,
+ alwaysAddBatchIdColumn = false,
historicalRunMode = RunMode.CheckUpdates,
sparkAppDescriptionTemplate = None
)
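
ConfigUtils.getOptionBoolean itself is not part of this diff; assuming it is the usual hasPath/getBoolean wrapper (a sketch, not the commit's code), the parsing line above returns None for an absent key, so getOrElse(false) preserves the old default.

import com.typesafe.config.Config

// Hypothetical stand-in for ConfigUtils.getOptionBoolean: None when the key is
// absent, Some(value) when present, so .getOrElse(false) yields the default.
def getOptionBoolean(conf: Config, path: String): Option[Boolean] =
  if (conf.hasPath(path)) Some(conf.getBoolean(path)) else None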
16 changes: 9 additions & 7 deletions TaskRunnerBase.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
+ import za.co.absa.pramen.api.jobdef.Schedule
import za.co.absa.pramen.api.status._
import za.co.absa.pramen.core.app.config.RuntimeConfig
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
@@ -356,24 +357,25 @@ abstract class TaskRunnerBase(conf: Config,
dfWithTimestamp.withColumn(task.job.outputTable.infoDateColumn, lit(Date.valueOf(task.infoDate)))
}

- val batchIdColumn = task.job.outputTable.batchIdColumn
+ val needAddBatchId = runtimeConfig.alwaysAddBatchIdColumn || task.job.operation.schedule == Schedule.Incremental

- val dfWithBatchIdColumn = if (dfWithInfoDate.schema.exists(f => f.name == batchIdColumn)) {
- dfWithInfoDate
- } else {
+ val dfWithBatchIdColumn = if (needAddBatchId) {
+ val batchIdColumn = task.job.outputTable.batchIdColumn
dfWithInfoDate.withColumn(batchIdColumn, lit(pipelineState.getBatchId))
+ } else {
+ dfWithInfoDate
}

- val postProcessed = task.job.postProcessing(dfWithBatchIdColumn, task.infoDate, conf)

val dfTransformed = applyFilters(
- applyTransformations(postProcessed, task.job.operation.schemaTransformations),
+ applyTransformations(dfWithBatchIdColumn, task.job.operation.schemaTransformations),
task.job.operation.filters,
task.infoDate,
task.infoDate,
task.infoDate
)

+ val postProcessed = task.job.postProcessing(dfTransformed, task.infoDate, conf)

val schemaChangesAfterTransform = if (task.job.operation.schemaTransformations.nonEmpty) {
val transformedTable = task.job.outputTable.copy(name = s"${task.job.outputTable.name}_transformed")
handleSchemaChange(dfTransformed, transformedTable, task.infoDate)
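
Two behavioral changes land in the hunk above: the batch id column, previously added whenever it was missing from the schema, is now added only when always.add.batchid.column is set or the operation's schedule is incremental, and postProcessing now runs after schema transformations and filters rather than before them. The following self-contained Spark sketch shows the new decision; the column name, the Long batch id type, and the object name are assumptions for illustration.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object BatchIdColumnSketch {
  // Adds the batch id column only when the flag or an incremental schedule asks for it.
  def addBatchIdIfNeeded(df: DataFrame,
                         batchIdColumn: String,
                         batchId: Long,
                         alwaysAdd: Boolean,
                         isIncremental: Boolean): DataFrame =
    if (alwaysAdd || isIncremental) df.withColumn(batchIdColumn, lit(batchId)) else df

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("batchid-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")
    // Flag off, non-incremental schedule: the frame is returned unchanged.
    addBatchIdIfNeeded(df, "pramen_batchid", 123L, alwaysAdd = false, isIncremental = false).printSchema()
    // Flag on: every row carries the same batch id literal.
    addBatchIdIfNeeded(df, "pramen_batchid", 123L, alwaysAdd = true, isIncremental = false).show()
    spark.stop()
  }
}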
2 changes: 2 additions & 0 deletions RuntimeConfigFactory.scala
@@ -37,6 +37,7 @@ object RuntimeConfigFactory {
parallelTasks: Int = 1,
stopSparkSession: Boolean = false,
allowEmptyPipeline: Boolean = false,
+ alwaysAddBatchIdColumn: Boolean = false,
historicalRunMode: RunMode = RunMode.CheckUpdates,
sparkAppDescriptionTemplate: Option[String] = None): RuntimeConfig = {
RuntimeConfig(isDryRun,
@@ -53,6 +54,7 @@ object RuntimeConfigFactory {
parallelTasks,
stopSparkSession,
allowEmptyPipeline,
+ alwaysAddBatchIdColumn,
historicalRunMode,
sparkAppDescriptionTemplate)
}
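
Because the factory mirrors the false default, existing tests compile unchanged, and a test exercising the new flag overrides only that one argument. The factory method's name is outside the diff context, so getDummyRuntimeConfig below is a hypothetical name.

// Inside a test; hypothetical method name, only the new argument is overridden.
val runtimeConfig = RuntimeConfigFactory.getDummyRuntimeConfig(alwaysAddBatchIdColumn = true)
assert(runtimeConfig.alwaysAddBatchIdColumn)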
