#437 Implement Spark configuration application to metastore table writes.
yruslan committed Aug 2, 2024
1 parent 1740b66 commit 40d9a0a
Showing 9 changed files with 142 additions and 10 deletions.
README.md (11 changes: 8 additions & 3 deletions)
@@ -331,7 +331,7 @@ pramen.metastore {
# Optional Spark configuration that will be used when writing to the table
# Useful for applying Spark committers (partitioned, directory, magic) only to some of the tables.
spark.config {
spark.conf {
spark.sql.sources.commitProtocolClass = "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
spark.sql.parquet.output.committer.class = "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"
}
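
For context, here is a minimal sketch of a complete metastore table entry using the new per-table block. The table name, format, and path are illustrative placeholders and not part of this commit; only the `spark.conf` block is what this change is about:

```hocon
pramen.metastore {
  tables = [
    {
      # Illustrative table definition (name, format, path are placeholders)
      name = "my_table"
      format = "parquet"
      path = "s3a://my-bucket/metastore/my_table"

      # Applied to the Spark session only while this table is being written
      spark.conf {
        spark.sql.sources.commitProtocolClass = "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
        spark.sql.parquet.output.committer.class = "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"
      }
    }
  ]
}
```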
@@ -2115,7 +2115,7 @@ and 'python.class' to refer to the transformer.
# Arbitrary Spark configuration
# You can use any configuration option from the official documentation: https://spark.apache.org/docs/latest/configuration.html
spark.config {
spark.conf {
spark.executor.instances = 4
spark.executor.cores = 1
spark.executor.memory = "4g"
@@ -2823,11 +2823,16 @@ Hive configuration and query templates can be defined globally:
```hocon
pramen {
# Hive configuration for the Spark metastore
spark.conf.option = {
spark.conf = {
hive.metastore.uris = "thrift://host1:9083,thrift://host2:9083"
spark.sql.warehouse.dir = "/hive/warehouse"
}
hadoop.conf = {
# Any custom Hadoop options go here
fs.s3a.path.style.access = true
}
hive {
# The API to use to query Hive. Valid values are: "sql" (default), "spark_catalog"
hive.api = "sql"
@@ -31,8 +31,10 @@ object Keys {

val HADOOP_REDACT_TOKENS = "hadoop.redacted.tokens"
val HADOOP_OPTION_PREFIX = "hadoop.option"
val HADOOP_OPTION_PREFIX_V2 = "hadoop.conf"

val EXTRA_OPTIONS_PREFIX = "pramen.spark.conf.option"
val EXTRA_OPTIONS_PREFIX_V2 = "pramen.spark.conf"

val ENABLE_HIVE_SUPPORT = "pramen.enable.hive"

@@ -38,6 +38,8 @@ class MetastoreImpl(appConfig: Config,
bookkeeper: Bookkeeper,
metadata: MetadataManager,
skipBookKeepingUpdates: Boolean)(implicit spark: SparkSession) extends Metastore {
import MetastoreImpl._

private val log = LoggerFactory.getLogger(this.getClass)

override def getRegisteredTables: Seq[String] = tableDefs.map(_.name)
@@ -92,7 +94,13 @@ class MetastoreImpl(appConfig: Config,
val mt = getTableDef(tableName)
val isTransient = mt.format.isTransient
val start = Instant.now.getEpochSecond
val stats = MetastorePersistence.fromMetaTable(mt, appConfig).saveTable(infoDate, df, inputRecordCount)

var stats = MetaTableStats(0, None)

withSparkConfig(mt.sparkConfig) {
stats = MetastorePersistence.fromMetaTable(mt, appConfig).saveTable(infoDate, df, inputRecordCount)
}

val finish = Instant.now.getEpochSecond

if (!skipBookKeepingUpdates) {
@@ -236,6 +244,8 @@ }
}

object MetastoreImpl {
private val log = LoggerFactory.getLogger(this.getClass)

val METASTORE_KEY = "pramen.metastore.tables"
val DEFAULT_RECORDS_PER_PARTITION = 500000

@@ -249,5 +259,34 @@

new MetastoreImpl(conf, tableDefs, bookkeeper, metadataManager, isUndercover)
}

private[core] def withSparkConfig(sparkConfig: Map[String, String])
(action: => Unit)
(implicit spark: SparkSession): Unit = {
val savedConfig = sparkConfig.map {
case (k, _) => (k, spark.conf.getOption(k))
}

sparkConfig.foreach {
case (k, v) =>
log.info(s"Setting '$k' = '$v'...")
spark.conf.set(k, v)
}

try {
action
} finally {
savedConfig.foreach {
case (k, opt) => opt match {
case Some(v) =>
log.info(s"Restoring '$k' = '$v'...")
spark.conf.set(k, v)
case None =>
log.info(s"unsetting '$k'...")
spark.conf.unset(k)
}
}
}
}
}

@@ -21,7 +21,6 @@ import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{DataFormat, MetaTableDef}
import za.co.absa.pramen.core.app.config.InfoDateConfig
import za.co.absa.pramen.core.config.InfoDateOverride
import za.co.absa.pramen.core.pipeline.OperationDef.SPARK_CONFIG_PREFIX
import za.co.absa.pramen.core.utils.{AlgorithmUtils, ConfigUtils}

import java.time.LocalDate
@@ -79,6 +78,7 @@ object MetaTable {
val WRITE_OPTION_KEY = "write.option"
val TABLE_HIVE_CONFIG_PREFIX = "hive"
val DEFAULT_HIVE_CONFIG_PREFIX = "pramen.hive"
val SPARK_CONFIG_PREFIX = "spark.conf"

def fromConfig(conf: Config, infoDateConfig: InfoDateConfig, key: String): Seq[MetaTable] = {
val defaultInfoDateColumnName = infoDateConfig.columnName
@@ -69,6 +69,7 @@ object OperationDef {
val FILTERS_KEY = "filters"
val NOTIFICATION_TARGETS_KEY = "notification.targets"
val SPARK_CONFIG_PREFIX = "spark.config"
val SPARK_CONFIG_PREFIX_V2 = "spark.conf"
val EXTRA_OPTIONS_PREFIX = "option"

val DEFAULT_CONSUME_THREADS = 1
@@ -98,9 +99,13 @@ object OperationDef {
val schemaTransformations = TransformExpression.fromConfig(conf, SCHEMA_TRANSFORMATIONS_KEY, parent)
val filters = ConfigUtils.getOptListStrings(conf, FILTERS_KEY)
val notificationTargets = ConfigUtils.getOptListStrings(conf, NOTIFICATION_TARGETS_KEY)
val sparkConfigOptions = ConfigUtils.getExtraOptions(conf, SPARK_CONFIG_PREFIX)
val sparkConfigOptions = ConfigUtils.getExtraOptions(conf, SPARK_CONFIG_PREFIX) ++ ConfigUtils.getExtraOptions(conf, SPARK_CONFIG_PREFIX_V2)
val extraOptions = ConfigUtils.getExtraOptions(conf, EXTRA_OPTIONS_PREFIX)

if (conf.hasPath(SPARK_CONFIG_PREFIX)) {
log.warn(s"Using legacy '$SPARK_CONFIG_PREFIX' option. Please, use the new option: '$SPARK_CONFIG_PREFIX_V2'")
}

val outputInfoDateExpression = outputInfoDateExpressionOpt match {
case Some(expr) => expr
case None =>
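To illustrate the change above, here is a hedged sketch of an operation definition using the new per-operation `spark.conf` key, assuming the usual `pramen.operations` list layout; the other operation fields are omitted as placeholders. A legacy `spark.config` block is still read and merged, but now logs the deprecation warning:

```hocon
pramen.operations = [
  {
    # ... other operation fields (name, type, schedule, tables, etc.) omitted ...

    # Per-operation Spark options, merged with any legacy 'spark.config' block
    spark.conf {
      spark.executor.instances = 4
      spark.executor.cores = 1
      spark.executor.memory = "4g"
    }
  }
]
```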
@@ -34,7 +34,7 @@ object PipelineSparkSessionBuilder {
*
* Extra options can be passed as
* {{{
* pramen.spark.conf.option {
* pramen.spark.conf {
* spark.config.option = "value"
* }
* }}}
@@ -50,7 +50,12 @@
val isHiveEnabled = conf.getBoolean(ENABLE_HIVE_SUPPORT)
log.info(s"Hive support enabled = $isHiveEnabled")

val extraOptions = ConfigUtils.getExtraOptions(conf, EXTRA_OPTIONS_PREFIX)
val extraOptions = if (conf.hasPath(EXTRA_OPTIONS_PREFIX)) {
log.warn(s"Using legacy '$EXTRA_OPTIONS_PREFIX' option. Please, use the new option: '$EXTRA_OPTIONS_PREFIX_V2'")
ConfigUtils.getExtraOptions(conf, EXTRA_OPTIONS_PREFIX)
} else
ConfigUtils.getExtraOptions(conf, EXTRA_OPTIONS_PREFIX_V2)

log.info("Extra Spark Config:")
ConfigUtils.renderExtraOptions(extraOptions, KEYS_TO_REDACT)(s => log.info(s))

@@ -83,9 +88,12 @@ object PipelineSparkSessionBuilder {
def applyHadoopConfig(spark: SparkSession, conf: Config): SparkSession = {
val redactTokens = ConfigUtils.getOptListStrings(conf, HADOOP_REDACT_TOKENS).toSet

if (conf.hasPath(HADOOP_OPTION_PREFIX)) {
if (conf.hasPath(HADOOP_OPTION_PREFIX) || conf.hasPath(HADOOP_OPTION_PREFIX_V2)) {
if (conf.hasPath(HADOOP_OPTION_PREFIX)) {
log.warn(s"Using legacy '$HADOOP_OPTION_PREFIX' option. Please, use the new option: '$HADOOP_OPTION_PREFIX_V2'")
}
val sc = spark.sparkContext
val hadoopOptions = ConfigUtils.getExtraOptions(conf, HADOOP_OPTION_PREFIX)
val hadoopOptions = ConfigUtils.getExtraOptions(conf, HADOOP_OPTION_PREFIX) ++ ConfigUtils.getExtraOptions(conf, HADOOP_OPTION_PREFIX_V2)

hadoopOptions.foreach { case (key, value) =>
val redactedValue = ConfigUtils.getRedactedValue(key, value, redactTokens).toString
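Taken together with the `Keys` change above, `pramen.spark.conf` replaces the legacy `pramen.spark.conf.option` prefix and `hadoop.conf` replaces `hadoop.option`. The behaviors differ slightly: when the legacy Spark prefix is present it takes precedence and only triggers a warning, while Hadoop options from the legacy and new prefixes are merged. A hedged sketch of the new-style session configuration (values are illustrative):

```hocon
# Session-wide Spark options (previously 'pramen.spark.conf.option')
pramen.spark.conf {
  spark.sql.sources.commitProtocolClass = "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
}

# Custom Hadoop options (previously 'hadoop.option'); options whose keys contain
# a token listed in 'hadoop.redacted.tokens' are redacted in the logs
hadoop.conf {
  fs.s3a.path.style.access = true
}
```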
@@ -471,6 +471,37 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF
}
}

"withSparkConfig()" should {
"set the config at runtime, and restore the original config afterwards" in {
val sparkConfig = Map(
"spark.sql.sources.commitProtocolClass" -> "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
"spark.sql.parquet.output.committer.class" -> "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
"spark.pramen.test" -> "test"
)

var inner1: String = null
var inner2: String = null
var inner3: String = null

MetastoreImpl.withSparkConfig(sparkConfig) {
inner1 = spark.conf.get("spark.sql.sources.commitProtocolClass")
inner2 = spark.conf.get("spark.sql.parquet.output.committer.class")
inner3 = spark.conf.get("spark.pramen.test")
}

val outer1 = spark.conf.get("spark.sql.sources.commitProtocolClass")
val outer2 = spark.conf.get("spark.sql.parquet.output.committer.class")
val outer3 = spark.conf.getOption("spark.pramen.test")

assert(inner1 == "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
assert(inner2 == "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
assert(inner3 == "test")
assert(outer1 != "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
assert(outer2 != "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
assert(outer3.isEmpty)
}
}

def getDf: DataFrame = {
import spark.implicits._

@@ -194,6 +194,9 @@ class MetaTableSuite extends AnyWordSpec {
| x = test2
| y = 101
|}
|spark.conf = {
| key1 = value1
|}
|""".stripMargin)

val defaultHiveConfig = HiveDefaultConfig(HiveApi.Sql,
@@ -229,6 +232,7 @@
assert(metaTable.readOptions("some.option.b") == "12")
assert(metaTable.writeOptions("x") == "test2")
assert(metaTable.writeOptions("y") == "101")
assert(metaTable.sparkConfig("key1") == "value1")
}

"load a metatable definition with hive overrides" in {
@@ -74,6 +74,44 @@ class PipelineSparkSessionBuilderSuite extends AnyWordSpec {
assert(key1Set)
assert(key2Set)
}

"apply the given config to the spark session version 2" in {
val conf = ConfigFactory.parseString(
s"""$HADOOP_REDACT_TOKENS = [ secret ]
|$HADOOP_OPTION_PREFIX_V2 {
| fs.s3a.aws.credentials.provider = "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
| fs.s3a.secret.key = "acs123"
|}
|""".stripMargin
)

val sparkMock = mock(classOf[SparkSession])
val scMock = mock(classOf[SparkContext])
val hcMock = mock(classOf[Configuration])

whenMock(sparkMock.sparkContext).thenReturn(scMock)
whenMock(scMock.hadoopConfiguration).thenReturn(hcMock)

var key1Set = false
var key2Set = false

doAnswer(new Answer[Unit] {
override def answer(invocationOnMock: InvocationOnMock): Unit = {
key1Set = true
}
}).when(hcMock).set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

doAnswer(new Answer[Unit] {
override def answer(invocationOnMock: InvocationOnMock): Unit = {
key2Set = true
}
}).when(hcMock).set("fs.s3a.secret.key", "acs123")

PipelineSparkSessionBuilder.applyHadoopConfig(sparkMock, conf)

assert(key1Set)
assert(key2Set)
}
}

"getSparkAppName" should {