Commit: refine code

luluorta committed Nov 25, 2020
1 parent 6a41103 commit 22b7dde
Showing 7 changed files with 71 additions and 79 deletions.
2 changes: 1 addition & 1 deletion docs/sql-migration-guide.md
@@ -52,7 +52,7 @@ license: |
 
 - In Spark 3.1, refreshing a table will trigger an uncache operation for all other caches that reference the table, even if the table itself is not cached. In Spark 3.0 the operation will only be triggered if the table itself is cached.
 
-- In Spark 3.1, creating or altering a view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.view.applySQLConfigs` to `false`.
+- In Spark 3.1, creating or altering a view will capture runtime SQL configs and store them as view properties. These configs will be applied during the parsing and analysis phases of the view resolution. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.useCurrentConfigsForView` to `true`.
 
 ## Upgrading from Spark SQL 3.0 to 3.0.1
 
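For readers of the migration guide, a minimal sketch of opting back into the pre-3.1 behavior; the session setup (master, app name) is illustrative, and any Spark 3.1+ session works the same way:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session for trying the flag out.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("legacy-view-configs")
  .getOrCreate()

// Opt back into the pre-3.1 behavior: resolve views with the current
// session's SQL configs rather than the configs captured at creation.
spark.conf.set("spark.sql.legacy.useCurrentConfigsForView", "true")
```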
@@ -25,7 +25,6 @@ import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringUtils
-import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
@@ -39,7 +38,6 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.JsonProtocol
 
 
 /**
@@ -324,19 +322,18 @@ case class CatalogTable(
   }
 
   /**
-   * Return the SQL configs of the query that creates a view, the configs are applied when parsing
-   * and analyzing the view, should be empty if the CatalogTable is not a View or created by older
+   * Return the SQL configs captured when the view was created; these configs are applied when
+   * parsing and analyzing the view, and should be empty if the CatalogTable is not a view or was created by older
    * versions of Spark(before 3.1.0).
    */
   def viewQuerySQLConfigs: Map[String, String] = {
     try {
-      properties.get(CatalogTable.VIEW_QUERY_SQL_CONFIGS)
-        .map(confJson => JsonProtocol.mapFromJson(JsonMethods.parse(confJson)).toMap)
-        .getOrElse(Map.empty)
+      for ((key, value) <- properties if key.startsWith(CatalogTable.VIEW_SQL_CONFIG_PREFIX))
+        yield (key.substring(CatalogTable.VIEW_SQL_CONFIG_PREFIX.length), value)
     } catch {
       case e: Exception =>
         throw new AnalysisException(
-          "Corrupted view query SQL configs in catalog", cause = Some(e))
+          "Corrupted view SQL configs in catalog", cause = Some(e))
     }
   }
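As a standalone sketch of the decoding above, using an illustrative property map rather than a real catalog entry (the prefix value assumes `VIEW_PREFIX` is `view.`, as the `view.query.out.` keys suggest):

```scala
// Each captured config is stored as one table property under a shared
// prefix; decoding strips the prefix and keeps the value.
val VIEW_SQL_CONFIG_PREFIX = "view.sqlConfig."
val properties = Map(
  "view.sqlConfig.spark.sql.ansi.enabled" -> "true",
  "view.query.out.numCols" -> "1")

val configs =
  for ((key, value) <- properties if key.startsWith(VIEW_SQL_CONFIG_PREFIX))
    yield (key.substring(VIEW_SQL_CONFIG_PREFIX.length), value)

assert(configs == Map("spark.sql.ansi.enabled" -> "true"))
```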

@@ -430,11 +427,11 @@ object CatalogTable {
     props.toMap
   }
 
+  val VIEW_SQL_CONFIG_PREFIX = VIEW_PREFIX + "sqlConfig."
+
   val VIEW_QUERY_OUTPUT_PREFIX = VIEW_PREFIX + "query.out."
   val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols"
   val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col."
-
-  val VIEW_QUERY_SQL_CONFIGS = VIEW_PREFIX + "query.sqlConfigs"
 }
 
 /**
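For concreteness, assuming `VIEW_PREFIX` is `view.`, the new constant yields one flat property key per captured config:

```scala
val VIEW_PREFIX = "view."
val VIEW_SQL_CONFIG_PREFIX = VIEW_PREFIX + "sqlConfig."

// e.g. the captured value of spark.sql.ansi.enabled is stored under:
val key = VIEW_SQL_CONFIG_PREFIX + "spark.sql.ansi.enabled"
assert(key == "view.sqlConfig.spark.sql.ansi.enabled")
```

Storing one property per config, instead of the earlier single JSON blob under `view.query.sqlConfigs`, is also what lets the json4s and JsonProtocol imports be dropped above.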
@@ -456,12 +456,14 @@ case class View(
 object View {
   def effectiveSQLConf(configs: Map[String, String]): SQLConf = {
     val activeConf = SQLConf.get
-    if (!activeConf.applyViewSQLConfigs) return activeConf
+    if (activeConf.useCurrentSQLConfigsForView) return activeConf
 
     val sqlConf = new SQLConf()
     for ((k, v) <- configs) {
       sqlConf.settings.put(k, v)
     }
+    // We should respect the current maxNestedViewDepth because view resolution is
+    // executed from top to bottom.
     sqlConf.setConf(SQLConf.MAX_NESTED_VIEW_DEPTH, activeConf.maxNestedViewDepth)
     sqlConf
   }
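Since `SQLConf.settings` is internal to Spark, here is a simplified map-based analogue of the logic above; the depth key assumes `MAX_NESTED_VIEW_DEPTH` resolves to `spark.sql.view.maxNestedViewDepth`:

```scala
// Build the configs used to resolve a view: start from the captured
// entries, but keep the session's current nested-view-depth limit so
// the depth check is governed by the active session.
def effectiveConfigs(
    captured: Map[String, String],
    current: Map[String, String]): Map[String, String] = {
  val depthKey = "spark.sql.view.maxNestedViewDepth"
  captured ++ current.filter { case (k, _) => k == depthKey }
}
```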
@@ -1448,14 +1448,14 @@
       "must be positive.")
     .createWithDefault(100)
 
-  val APPLY_VIEW_SQL_CONFIGS =
-    buildConf("spark.sql.legacy.view.applySQLConfigs")
+  val USE_CURRENT_SQL_CONFIGS_FOR_VIEW =
+    buildConf("spark.sql.legacy.useCurrentConfigsForView")
       .internal()
-      .doc("When true, captured SQL Configs will be applied during the parsing and analysis " +
-        "phases of the view resolution.")
+      .doc("When true, SQL Configs of the current active SparkSession instead of the captured " +
+        "ones will be applied during the parsing and analysis phases of the view resolution.")
       .version("3.1.0")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
 
   val STREAMING_FILE_COMMIT_PROTOCOL_CLASS =
     buildConf("spark.sql.streaming.commitProtocolClass")
@@ -3394,7 +3394,7 @@
 
   def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)
 
-  def applyViewSQLConfigs: Boolean = getConf(SQLConf.APPLY_VIEW_SQL_CONFIGS)
+  def useCurrentSQLConfigsForView: Boolean = getConf(SQLConf.USE_CURRENT_SQL_CONFIGS_FOR_VIEW)
 
   def starSchemaDetection: Boolean = getConf(STARSCHEMA_DETECTION)
 
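A sketch of the new default semantics (`createWithDefault(false)`), assuming an active `SparkSession` named `spark`; the view name and config values are illustrative:

```scala
// With the flag at its default (false), a view is parsed and analyzed
// with the configs captured when it was created:
spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("CREATE OR REPLACE VIEW v AS SELECT 1 AS id") // captures ansi.enabled=true
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("SELECT * FROM v") // still resolved with the captured ansi.enabled=true

// Reading the internal flag itself:
val useCurrent =
  spark.conf.get("spark.sql.legacy.useCurrentConfigsForView", "false").toBoolean
```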
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import scala.collection.mutable
 
-import org.json4s.jackson.JsonMethods._
-
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedFunction, UnresolvedRelation, ViewType}
@@ -32,7 +30,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
 import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.JsonProtocol
 
 /**
  * Create or replace a view with given query plan. This command will generate some view-specific
@@ -337,19 +334,16 @@ case class ShowViewsCommand(
 
 object ViewHelper {
 
-  private val configPrefixBlacklist = Seq(
+  private val configPrefixDenyList = Seq(
     SQLConf.MAX_NESTED_VIEW_DEPTH.key,
     "spark.sql.optimizer.",
     "spark.sql.codegen.",
     "spark.sql.execution.",
     "spark.sql.shuffle.",
     "spark.sql.adaptive.")
 
-  private def isConfigBlacklisted(key: String): Boolean = {
-    for (prefix <- configPrefixBlacklist if key.startsWith(prefix)) {
-      return true
-    }
-    false
+  private def shouldCaptureConfig(key: String): Boolean = {
+    !configPrefixDenyList.exists(prefix => key.startsWith(prefix))
   }
 
   import CatalogTable._
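A quick usage sketch with the predicate above in scope; the keys are illustrative, and `MAX_NESTED_VIEW_DEPTH.key` is assumed to resolve to `spark.sql.view.maxNestedViewDepth`:

```scala
// Execution-tuning knobs under denied prefixes are not frozen into the
// view definition; ordinary semantic configs are captured.
assert(shouldCaptureConfig("spark.sql.ansi.enabled"))        // captured
assert(!shouldCaptureConfig("spark.sql.adaptive.enabled"))   // denied prefix
assert(!shouldCaptureConfig("spark.sql.shuffle.partitions")) // denied prefix
```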
@@ -380,28 +374,27 @@
   }
 
   /**
-   * Convert the view query SQL configs in `properties`.
+   * Convert the view SQL configs to `properties`.
    */
-  private def generateQuerySQLConfigs(conf: SQLConf): Map[String, String] = {
+  private def sqlConfigsToProps(conf: SQLConf): Map[String, String] = {
     val modifiedConfs = conf.getAllConfs.filter { case (k, _) =>
-      conf.isModifiable(k) && !isConfigBlacklisted(k)
+      conf.isModifiable(k) && shouldCaptureConfig(k)
     }
     val props = new mutable.HashMap[String, String]
-    if (modifiedConfs.nonEmpty) {
-      val confJson = compact(render(JsonProtocol.mapToJson(modifiedConfs)))
-      props.put(VIEW_QUERY_SQL_CONFIGS, confJson)
+    for ((key, value) <- modifiedConfs) {
+      props.put(s"$VIEW_SQL_CONFIG_PREFIX$key", value)
     }
     props.toMap
   }
 
   /**
-   * Remove the view query SQL configs in `properties`.
+   * Remove the view SQL configs in `properties`.
    */
-  private def removeQuerySQLConfigs(properties: Map[String, String]): Map[String, String] = {
+  private def removeSQLConfigs(properties: Map[String, String]): Map[String, String] = {
     // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable,
     // while `CatalogTable` should be serializable.
     properties.filterNot { case (key, _) =>
-      key == VIEW_QUERY_SQL_CONFIGS
+      key.startsWith(VIEW_SQL_CONFIG_PREFIX)
     }
   }
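A self-contained round trip of the two helpers above, using plain maps and the assumed prefix value `view.sqlConfig.`:

```scala
val prefix = "view.sqlConfig."
val captured = Map("spark.sql.ansi.enabled" -> "true")

// sqlConfigsToProps: one table property per captured config.
val props = captured.map { case (key, value) => (prefix + key, value) }

// removeSQLConfigs: filterNot returns a strict, serializable Map, whereas
// filterKeys returns a lazy, non-serializable view in Scala 2.12.
val stripped = props.filterNot { case (key, _) => key.startsWith(prefix) }

assert(props == Map("view.sqlConfig.spark.sql.ansi.enabled" -> "true"))
assert(stripped.isEmpty)
```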

@@ -425,19 +418,19 @@
     // for createViewCommand queryOutput may be different from fieldNames
     val queryOutput = analyzedPlan.schema.fieldNames
 
-    val conf = SQLConf.get
+    val conf = session.sessionState.conf
 
     // Generate the query column names, throw an AnalysisException if there exists duplicate column
     // names.
     SchemaUtils.checkColumnNameDuplication(
       fieldNames, "in the view definition", conf.resolver)
 
-    // Generate the view default catalog and namespace.
+    // Generate the view default catalog and namespace, as well as captured SQL configs.
     val manager = session.sessionState.catalogManager
-    removeQuerySQLConfigs(removeQueryColumnNames(properties)) ++
+    removeSQLConfigs(removeQueryColumnNames(properties)) ++
       catalogAndNamespaceToProps(manager.currentCatalog.name, manager.currentNamespace) ++
-      generateQueryColumnNames(queryOutput) ++
-      generateQuerySQLConfigs(conf)
+      sqlConfigsToProps(conf) ++
+      generateQueryColumnNames(queryOutput)
   }
 
   /**
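The final expression removes stale view entries and then appends freshly generated ones with `++` (where later bindings win on duplicate keys); a tiny illustration with made-up properties:

```scala
// Stale view entries are removed first, then regenerated, so the rebuilt
// map reflects the session state at (re)creation time.
val stale = Map(
  "view.sqlConfig.spark.sql.ansi.enabled" -> "false",
  "owner" -> "alice")

val cleaned = stale.filterNot { case (k, _) => k.startsWith("view.sqlConfig.") }
val rebuilt = cleaned ++ Map("view.sqlConfig.spark.sql.ansi.enabled" -> "true")

assert(rebuilt("view.sqlConfig.spark.sql.ansi.enabled") == "true")
assert(rebuilt("owner") == "alice")
```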