From d2fd7e0f4982adc904830937183c3e285691a8d4 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Thu, 17 Oct 2024 10:33:08 +0200
Subject: [PATCH] #498 Implement the setting of the Spark Application description.

---
 README.md                                     | 21 ++++++++++++
 .../core/app/config/RuntimeConfig.scala       |  8 +++--
 .../jobrunner/ConcurrentJobRunnerImpl.scala   |  8 ++++++++
 .../core/runner/task/TaskRunnerBase.scala     | 33 +++++++++++++++++--
 .../runner/task/TaskRunnerMultithreaded.scala |  5 +--
 .../pramen/core/RuntimeConfigFactory.scala    |  6 ++--
 .../core/app/config/RuntimeConfigSuite.scala  |  3 ++
 7 files changed, 76 insertions(+), 8 deletions(-)

diff --git a/README.md b/README.md
index e4bf267ba..4f5889e83 100644
--- a/README.md
+++ b/README.md
@@ -2454,6 +2454,27 @@ You can use any source/sink combination in transfer jobs.
 
 Here we describe more complicated use cases.
 
+### Dynamically changing Spark Application description
+You can set up a template for the Spark application description, and it will be applied dynamically each time a new job starts executing.
+
+Example configuration:
+```hocon
+pramen.job.description.template = "Pramen - running @pipeline, job @jobName for @infoDate"
+```
+
+These variables are available:
+
+| Variable     | Description                                                                                |
+|--------------|--------------------------------------------------------------------------------------------|
+| @pipeline    | The name of the pipeline (if defined at `pramen.pipeline.name`).                           |
+| @tenant      | The name of the tenant (if defined at `pramen.tenant`).                                    |
+| @environment | The name of the environment (if defined at `pramen.environment.name`).                     |
+| @jobName     | The name of the job as defined in the operation definition.                                |
+| @infoDate    | The information date the job is running for.                                               |
+| @outputTable | The output metastore table of the job (also available as `@metastoreTable` and `@table`).  |
+| @dryRun      | Adds `(DRY RUN)` when running in the dry run mode, an empty string otherwise.              |
+
+
 ### Startup and shutdown hooks
 Startup and shutdown hooks allow running custom code before and after the pipeline runs.
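To make the substitution concrete, here is a small, self-contained sketch that mirrors the `replaceAll` chain the patch adds in `TaskRunnerBase.applyAppDescriptionTemplate`; `DescriptionTemplateDemo`, `render`, and the sample pipeline and job names are illustrative only, not part of the patch:

```scala
// Illustrative sketch only: mirrors the substitution performed by
// TaskRunnerBase.applyAppDescriptionTemplate for three of the variables.
object DescriptionTemplateDemo {
  def render(template: String, pipeline: String, jobName: String, infoDate: String): String =
    template
      .replaceAll("@pipeline", pipeline)
      .replaceAll("@jobName", jobName)
      .replaceAll("@infoDate", infoDate)

  def main(args: Array[String]): Unit = {
    val template = "Pramen - running @pipeline, job @jobName for @infoDate"
    // Prints: Pramen - running My pipeline, job Users ingestion for 2024-10-17
    println(render(template, "My pipeline", "Users ingestion", "2024-10-17"))
  }
}
```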
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
index edb4d9ce3..655b8edf1 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
@@ -42,7 +42,8 @@ case class RuntimeConfig(
   parallelTasks: Int,
   stopSparkSession: Boolean,
   allowEmptyPipeline: Boolean,
-  historicalRunMode: RunMode
+  historicalRunMode: RunMode,
+  sparkAppDescriptionTemplate: Option[String]
 )
 
 object RuntimeConfig {
@@ -66,6 +67,7 @@
   val STOP_SPARK_SESSION = "pramen.stop.spark.session"
   val VERBOSE = "pramen.verbose"
   val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline"
+  val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template"
 
   def fromConfig(conf: Config): RuntimeConfig = {
     val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP)
@@ -128,6 +130,7 @@
     }
 
     val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false)
+    val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE)
 
     RuntimeConfig(
       isDryRun = isDryRun,
@@ -144,7 +147,8 @@
       parallelTasks = parallelTasks,
       stopSparkSession = conf.getBoolean(STOP_SPARK_SESSION),
       allowEmptyPipeline,
-      runMode
+      runMode,
+      sparkAppDescriptionTemplate
     )
   }
 }
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala
index 4122bea5c..b29ffc7a3 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala
@@ -99,6 +99,14 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,
     completedJobsChannel.close()
   }
 
+  private[core] def setSparkAppDescription(): Unit = synchronized {
+    // A minimal sketch: sets the raw template, if configured, as the Spark job
+    // description; per-task variable substitution happens in TaskRunnerBase.
+    runtimeConfig.sparkAppDescriptionTemplate.foreach { template =>
+      org.apache.spark.sql.SparkSession.builder().getOrCreate().sparkContext.setJobDescription(template)
+    }
+  }
+
   private[core] def onFatalException(ex: Throwable, job: Job, isTransient: Boolean): Unit = {
     log.error(s"${Emoji.FAILURE} A FATAL error has been encountered.", ex)
     val fatalEx = new FatalErrorWrapper(s"FATAL exception encountered, stopping the pipeline.", ex)
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
index de2683a82..07ce3e557 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
@@ -17,7 +17,7 @@
 package za.co.absa.pramen.core.runner.task
 
 import com.typesafe.config.Config
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.functions.lit
 import org.slf4j.LoggerFactory
 import za.co.absa.pramen.api._
@@ -31,11 +31,12 @@ import za.co.absa.pramen.core.lock.TokenLockFactory
 import za.co.absa.pramen.core.metastore.MetaTableStats
 import za.co.absa.pramen.core.metastore.model.MetaTable
 import za.co.absa.pramen.core.pipeline.JobPreRunStatus._
+import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY}
 import za.co.absa.pramen.core.pipeline._
 import za.co.absa.pramen.core.state.PipelineState
 import za.co.absa.pramen.core.utils.Emoji._
 import za.co.absa.pramen.core.utils.SparkUtils._
-import za.co.absa.pramen.core.utils.{ThreadUtils, TimeUtils}
+import za.co.absa.pramen.core.utils.{ConfigUtils, ThreadUtils, TimeUtils}
 import za.co.absa.pramen.core.utils.hive.HiveHelper
 
 import java.sql.Date
@@ -53,6 +54,8 @@ abstract class TaskRunnerBase(conf: Config,
                               runtimeConfig: RuntimeConfig,
                               pipelineState: PipelineState,
                               applicationId: String) extends TaskRunner {
+  import TaskRunnerBase._
+
   implicit private val ecDefault: ExecutionContext = ExecutionContext.global
   implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)
 
@@ -123,6 +126,12 @@ abstract class TaskRunnerBase(conf: Config,
     @volatile var runStatus: RunStatus = null
 
     try {
+      runtimeConfig.sparkAppDescriptionTemplate.foreach { template =>
+        val description = applyAppDescriptionTemplate(template, task, runtimeConfig, conf)
+        val spark = SparkSession.builder().getOrCreate()
+        spark.sparkContext.setJobDescription(description)
+      }
+
       ThreadUtils.runWithTimeout(Duration(timeout, TimeUnit.SECONDS)) {
         log.info(s"Running ${task.job.name} with the hard timeout = $timeout seconds.")
         runStatus = doValidateAndRunTask(task)
       }
@@ -604,3 +613,23 @@ abstract class TaskRunnerBase(conf: Config,
     }
   }
 }
+
+object TaskRunnerBase {
+  def applyAppDescriptionTemplate(template: String, task: Task, runtimeConfig: RuntimeConfig, conf: Config): String = {
+    val job = task.job
+    val pipelineName = conf.getString(PIPELINE_NAME_KEY)
+    val environmentName = ConfigUtils.getOptionString(conf, ENVIRONMENT_NAME).getOrElse("UNKNOWN")
+    val tenant = ConfigUtils.getOptionString(conf, TENANT_KEY).getOrElse("UNKNOWN")
+    val dryRun = if (runtimeConfig.isDryRun) "(DRY RUN)" else ""
+
+    template.replaceAll("@jobName", job.name)
+      .replaceAll("@infoDate", task.infoDate.toString)
+      .replaceAll("@metastoreTable", job.outputTable.name)
+      .replaceAll("@outputTable", job.outputTable.name)
+      .replaceAll("@table", job.outputTable.name)
+      .replaceAll("@pipeline", pipelineName)
+      .replaceAll("@tenant", tenant)
+      .replaceAll("@environment", environmentName)
+      .replaceAll("@dryRun", dryRun)
+  }
+}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala
index 279abf770..8f378f105 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerMultithreaded.scala
@@ -24,9 +24,10 @@ import za.co.absa.pramen.core.bookkeeper.Bookkeeper
 import za.co.absa.pramen.core.exceptions.FatalErrorWrapper
 import za.co.absa.pramen.core.journal.Journal
 import za.co.absa.pramen.core.lock.TokenLockFactory
-import za.co.absa.pramen.core.pipeline.Task
+import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY}
+import za.co.absa.pramen.core.pipeline.{Job, Task}
 import za.co.absa.pramen.core.state.PipelineState
-import za.co.absa.pramen.core.utils.Emoji
+import za.co.absa.pramen.core.utils.{ConfigUtils, Emoji}
 
 import java.util.concurrent.Executors.newFixedThreadPool
 import java.util.concurrent.{ExecutorService, Semaphore}
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala
index c07b6f540..542321914 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala
@@ -37,7 +37,8 @@ object RuntimeConfigFactory {
                    parallelTasks: Int = 1,
                    stopSparkSession: Boolean = false,
                    allowEmptyPipeline: Boolean = false,
-                   historicalRunMode: RunMode = RunMode.CheckUpdates): RuntimeConfig = {
+                   historicalRunMode: RunMode = RunMode.CheckUpdates,
+                   sparkAppDescriptionTemplate: Option[String] = None): RuntimeConfig = {
     RuntimeConfig(isDryRun,
       isRerun,
       runTables,
@@ -52,7 +53,8 @@
       parallelTasks,
       stopSparkSession,
       allowEmptyPipeline,
-      historicalRunMode)
+      historicalRunMode,
+      sparkAppDescriptionTemplate)
   }
 }
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala
index 7e7cf21f6..22c1535d1 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/app/config/RuntimeConfigSuite.scala
@@ -43,6 +43,7 @@ class RuntimeConfigSuite extends AnyWordSpec {
       | load.date.to = 2021-01-10
       | parallel.tasks = 4
       | stop.spark.session = true
+      | job.description.template = "Test template"
       |}
       |""".stripMargin
 
@@ -64,6 +65,7 @@
     assert(runtimeConfig.runDateTo.get.toString == "2021-01-10")
     assert(runtimeConfig.parallelTasks == 4)
     assert(runtimeConfig.stopSparkSession)
+    assert(runtimeConfig.sparkAppDescriptionTemplate.contains("Test template"))
   }
 
   "have default values" in {
@@ -84,6 +86,7 @@
     assert(runtimeConfig.runDateTo.isEmpty)
     assert(runtimeConfig.parallelTasks == 1)
     assert(!runtimeConfig.stopSparkSession)
+    assert(runtimeConfig.sparkAppDescriptionTemplate.isEmpty)
   }
 }
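At runtime, the resolved description is applied through `SparkContext.setJobDescription`, which labels the jobs that follow in the Spark UI. Below is a minimal, standalone sketch of that effect; the application name, local master, and sample description are for demonstration only:

```scala
import org.apache.spark.sql.SparkSession

// Standalone demo of SparkContext.setJobDescription(), the call the patch uses:
// Spark jobs triggered after it carry the description in the Spark UI.
object SetJobDescriptionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("description-demo") // demo-only application name
      .master("local[*]")          // local master, for demonstration only
      .getOrCreate()

    spark.sparkContext.setJobDescription("Pramen - running My pipeline, job Users ingestion for 2024-10-17")

    spark.range(10).count() // this job shows the description above in the Spark UI
    spark.stop()
  }
}
```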