#498 Implement the setting of the Spark Application description.
yruslan committed Oct 21, 2024
1 parent cb68ec6 commit d2fd7e0
Showing 7 changed files with 72 additions and 8 deletions.
21 changes: 21 additions & 0 deletions README.md
@@ -2454,6 +2454,27 @@ You can use any source/sink combination in transfer jobs.

Here we describe some more complicated use cases.

### Dynamically changing Spark Application description
You can set up a template for the Spark application description, and the description will be set dynamically each time a new job starts executing.

Example configuration:
```hocon
pramen.job.description.template = "Pramen - running @pipeline, job @jobName for @infoDate"
```

These variables are available:

| Variable     | Description                                                                   |
|--------------|-------------------------------------------------------------------------------|
| @pipeline    | The name of the pipeline (if defined at `pramen.pipeline.name`).              |
| @tenant      | The name of the tenant (if defined at `pramen.tenant`).                       |
| @environment | The environment (if defined at `pramen.environment.name`).                    |
| @jobName     | The name of the job as defined in the operation definition.                   |
| @infoDate    | The information date the job is running for.                                  |
| @outputTable | The output metastore table of the job.                                        |
| @dryRun      | Adds `(DRY RUN)` when running in dry run mode, an empty string otherwise.     |
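
For illustration, here is a minimal sketch of how the template above expands; the pipeline and job names are hypothetical, and the actual substitution is performed by `TaskRunnerBase.applyAppDescriptionTemplate` in the diff below:

```scala
// Hypothetical values for a single task run:
val template = "Pramen - running @pipeline, job @jobName for @infoDate"
val description = template
  .replaceAll("@pipeline", "Daily ETL")
  .replaceAll("@jobName", "Load users")
  .replaceAll("@infoDate", "2024-10-21")
// description == "Pramen - running Daily ETL, job Load users for 2024-10-21"
println(description)
```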


### Startup and shutdown hooks

Startup and shutdown hooks allow running custom code before and after the pipeline runs.
@@ -42,7 +42,8 @@ case class RuntimeConfig(
parallelTasks: Int,
stopSparkSession: Boolean,
allowEmptyPipeline: Boolean,
historicalRunMode: RunMode
historicalRunMode: RunMode,
sparkAppDescriptionTemplate: Option[String]
)

object RuntimeConfig {
@@ -66,6 +67,7 @@ object RuntimeConfig {
val STOP_SPARK_SESSION = "pramen.stop.spark.session"
val VERBOSE = "pramen.verbose"
val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline"
val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template"

def fromConfig(conf: Config): RuntimeConfig = {
val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP)
@@ -128,6 +130,7 @@
}

val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false)
val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE)

RuntimeConfig(
isDryRun = isDryRun,
Expand All @@ -144,7 +147,8 @@ object RuntimeConfig {
parallelTasks = parallelTasks,
stopSparkSession = conf.getBoolean(STOP_SPARK_SESSION),
allowEmptyPipeline,
runMode
runMode,
sparkAppDescriptionTemplate
)
}
}
@@ -99,6 +99,10 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,
completedJobsChannel.close()
}

private[core] def setSparkAppDescription(): Unit = synchronized {
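  // Placeholder in this commit: `???` throws scala.NotImplementedError if this method is called.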
???
}
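
A hedged sketch of what this placeholder might do, assuming the runner reuses the description-template mechanism implemented in `TaskRunnerBase` below. The `spark` and `job` parameters and the `SparkSession` import are hypothetical additions, not part of the committed code:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch only; the committed body is `???`.
// Applies the configured description template to the active Spark session.
private[core] def setSparkAppDescription(spark: SparkSession, job: Job): Unit = synchronized {
  runtimeConfig.sparkAppDescriptionTemplate.foreach { template =>
    // Only job-level fields are substituted here; the full variable set is
    // handled by TaskRunnerBase.applyAppDescriptionTemplate.
    val description = template
      .replaceAll("@jobName", job.name)
      .replaceAll("@outputTable", job.outputTable.name)
    spark.sparkContext.setJobDescription(description)
  }
}
```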

private[core] def onFatalException(ex: Throwable, job: Job, isTransient: Boolean): Unit = {
log.error(s"${Emoji.FAILURE} A FATAL error has been encountered.", ex)
val fatalEx = new FatalErrorWrapper(s"FATAL exception encountered, stopping the pipeline.", ex)
@@ -17,7 +17,7 @@
package za.co.absa.pramen.core.runner.task

import com.typesafe.config.Config
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
@@ -31,11 +31,12 @@ import za.co.absa.pramen.core.lock.TokenLockFactory
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.pipeline.JobPreRunStatus._
import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY}
import za.co.absa.pramen.core.pipeline._
import za.co.absa.pramen.core.state.PipelineState
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.SparkUtils._
import za.co.absa.pramen.core.utils.{ThreadUtils, TimeUtils}
import za.co.absa.pramen.core.utils.{ConfigUtils, ThreadUtils, TimeUtils}
import za.co.absa.pramen.core.utils.hive.HiveHelper

import java.sql.Date
@@ -53,6 +54,8 @@ abstract class TaskRunnerBase(conf: Config,
runtimeConfig: RuntimeConfig,
pipelineState: PipelineState,
applicationId: String) extends TaskRunner {
import TaskRunnerBase._

implicit private val ecDefault: ExecutionContext = ExecutionContext.global
implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)

@@ -123,6 +126,12 @@
@volatile var runStatus: RunStatus = null

try {
runtimeConfig.sparkAppDescriptionTemplate.foreach { template =>
val description = applyAppDescriptionTemplate(template, task, runtimeConfig, conf)
val spark = SparkSession.builder().getOrCreate()
spark.sparkContext.setJobDescription(description)
}

ThreadUtils.runWithTimeout(Duration(timeout, TimeUnit.SECONDS)) {
log.info(s"Running ${task.job.name} with the hard timeout = $timeout seconds.")
runStatus = doValidateAndRunTask(task)
@@ -604,3 +613,23 @@
}
}
}

object TaskRunnerBase {
def applyAppDescriptionTemplate(template: String, task: Task, runtimeConfig: RuntimeConfig, conf: Config): String = {
val job = task.job
val pipelineName = conf.getString(PIPELINE_NAME_KEY)
val environmentName = ConfigUtils.getOptionString(conf, ENVIRONMENT_NAME).getOrElse("UNKNOWN")
val tenant = ConfigUtils.getOptionString(conf, TENANT_KEY).getOrElse("UNKNOWN")
val dryRun = if (runtimeConfig.isDryRun) "(DRY RUN)" else ""

template.replaceAll("@jobName", job.name)
.replaceAll("@infoDate", task.infoDate.toString)
.replaceAll("@metastoreTable", job.outputTable.name)
.replaceAll("@outputTable", job.outputTable.name)
.replaceAll("@table", job.outputTable.name)
.replaceAll("@pipeline", pipelineName)
.replaceAll("@tenant", tenant)
.replaceAll("@environment", environmentName)
.replaceAll("@dryRun", dryRun)
}
}
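
One subtlety in the helper above: `String.replaceAll` treats its second argument as a regex replacement, so dynamic values containing `$` or `\` can throw or be misinterpreted as group references. A small illustration of the standard guard, not part of this commit:

```scala
import java.util.regex.Matcher

val template = "Pramen - running job @jobName"
// Matcher.quoteReplacement escapes `$` and `\` so the value is inserted literally:
val description = template.replaceAll("@jobName", Matcher.quoteReplacement("Load $users"))
// description == "Pramen - running job Load $users"
```
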
@@ -24,9 +24,10 @@ import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.exceptions.FatalErrorWrapper
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.lock.TokenLockFactory
import za.co.absa.pramen.core.pipeline.Task
import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY}
import za.co.absa.pramen.core.pipeline.{Job, Task}
import za.co.absa.pramen.core.state.PipelineState
import za.co.absa.pramen.core.utils.Emoji
import za.co.absa.pramen.core.utils.{ConfigUtils, Emoji}

import java.util.concurrent.Executors.newFixedThreadPool
import java.util.concurrent.{ExecutorService, Semaphore}
@@ -37,7 +37,8 @@ object RuntimeConfigFactory {
parallelTasks: Int = 1,
stopSparkSession: Boolean = false,
allowEmptyPipeline: Boolean = false,
historicalRunMode: RunMode = RunMode.CheckUpdates): RuntimeConfig = {
historicalRunMode: RunMode = RunMode.CheckUpdates,
sparkAppDescriptionTemplate: Option[String] = None): RuntimeConfig = {
RuntimeConfig(isDryRun,
isRerun,
runTables,
@@ -52,7 +53,8 @@
parallelTasks,
stopSparkSession,
allowEmptyPipeline,
historicalRunMode)
historicalRunMode,
sparkAppDescriptionTemplate)
}

}
@@ -43,6 +43,7 @@ class RuntimeConfigSuite extends AnyWordSpec {
| load.date.to = 2021-01-10
| parallel.tasks = 4
| stop.spark.session = true
| job.description.template = "Test template"
|}
|""".stripMargin

@@ -64,6 +65,7 @@
assert(runtimeConfig.runDateTo.get.toString == "2021-01-10")
assert(runtimeConfig.parallelTasks == 4)
assert(runtimeConfig.stopSparkSession)
assert(runtimeConfig.sparkAppDescriptionTemplate.contains("Test template"))
}

"have default values" in {
@@ -84,6 +86,7 @@
assert(runtimeConfig.runDateTo.isEmpty)
assert(runtimeConfig.parallelTasks == 1)
assert(!runtimeConfig.stopSparkSession)
assert(runtimeConfig.sparkAppDescriptionTemplate.isEmpty)
}
}
