Skip to content

Commit

Permalink
[SPARK-34701][SQL] Introduce AnalysisOnlyCommand that allows its chil…
Browse files Browse the repository at this point in the history
…dren to be removed once the command is marked as analyzed

### What changes were proposed in this pull request?

This PR proposes to introduce the `AnalysisOnlyCommand` trait such that a command that extends this trait can have its children only analyzed, but not optimized. There is a corresponding analysis rule `HandleAnalysisOnlyCommand` that marks the command as analyzed after all other analysis rules are run.

This can be useful if a logical plan has children where they need to be only analyzed, but not optimized - e.g., `CREATE VIEW` or `CACHE TABLE AS`. This also addresses the issue found in #31933.

This PR also updates `CreateViewCommand`, `CacheTableAsSelect`, and `AlterViewAsCommand` to use the new trait / rule such that their children are only analyzed.

### Why are the changes needed?

To address the issue where the plan is unnecessarily re-analyzed in `CreateViewCommand`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests should cover the changes.

Closes #32032 from imback82/skip_transform.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
imback82 authored and cloud-fan committed Apr 14, 2021
1 parent 816f6dd commit b5241c9
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,9 @@ class Analyzer(override val catalogManager: CatalogManager)
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
CleanupAliases)
CleanupAliases),
Batch("HandleAnalysisOnlyCommand", Once,
HandleAnalysisOnlyCommand)
)

/**
Expand Down Expand Up @@ -3543,6 +3545,18 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
}

/**
* A rule that marks a command as analyzed so that its children are removed to avoid
* being optimized. This rule should run after all other analysis rules are run.
*/
object HandleAnalysisOnlyCommand extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case c: AnalysisOnlyCommand if c.resolved =>
checkAnalysis(c)
c.markAsAnalyzed()
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,14 @@ trait Command extends LogicalPlan {
trait LeafCommand extends Command with LeafLike[LogicalPlan]
trait UnaryCommand extends Command with UnaryLike[LogicalPlan]
trait BinaryCommand extends Command with BinaryLike[LogicalPlan]

/**
* A logical node that can be used for a command that requires its children to be only analyzed,
* but not optimized.
*/
trait AnalysisOnlyCommand extends Command {
val isAnalyzed: Boolean
def childrenToAnalyze: Seq[LogicalPlan]
override final def children: Seq[LogicalPlan] = if (isAnalyzed) Nil else childrenToAnalyze
def markAsAnalyzed(): LogicalPlan
}
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,18 @@ case class CacheTableAsSelect(
plan: LogicalPlan,
originalText: String,
isLazy: Boolean,
options: Map[String, String]) extends LeafCommand
options: Map[String, String],
isAnalyzed: Boolean = false) extends AnalysisOnlyCommand {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect = {
assert(!isAnalyzed)
copy(plan = newChildren.head)
}

override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil

override def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)
}

/**
* The logical plan of the UNCACHE TABLE command.
Expand Down
5 changes: 3 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3370,10 +3370,11 @@ class Dataset[T] private[sql](
comment = None,
properties = Map.empty,
originalText = None,
child = logicalPlan,
plan = logicalPlan,
allowExisting = false,
replace = replace,
viewType = viewType)
viewType = viewType,
isAnalyzed = true)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,15 +474,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)

case AlterViewAs(ResolvedView(ident, _), originalText, query) if query.resolved =>
case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
AlterViewAsCommand(
ident.asTableIdentifier,
originalText,
query)

case CreateViewStatement(
tbl, userSpecifiedColumns, comment, properties,
originalText, child, allowExisting, replace, viewType) if child.resolved =>
originalText, child, allowExisting, replace, viewType) =>

val v1TableName = if (viewType != PersistedView) {
// temp view doesn't belong to any catalog and we shouldn't resolve catalog in the name.
Expand All @@ -491,15 +491,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
parseV1Table(tbl, "CREATE VIEW")
}
CreateViewCommand(
v1TableName.asTableIdentifier,
userSpecifiedColumns,
comment,
properties,
originalText,
child,
allowExisting,
replace,
viewType)
name = v1TableName.asTableIdentifier,
userSpecifiedColumns = userSpecifiedColumns,
comment = comment,
properties = properties,
originalText = originalText,
plan = child,
allowExisting = allowExisting,
replace = replace,
viewType = viewType)

case ShowViews(resolved: ResolvedNamespace, pattern, output) =>
resolved match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, UserDefinedExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
Expand All @@ -48,29 +48,41 @@ import org.apache.spark.sql.util.SchemaUtils
* @param properties the properties of this view.
* @param originalText the original SQL text of this view, can be None if this view is created via
* Dataset API.
* @param child the logical plan that represents the view; this is used to generate the logical
* plan for temporary view and the view schema.
* @param plan the logical plan that represents the view; this is used to generate the logical
* plan for temporary view and the view schema.
* @param allowExisting if true, and if the view already exists, noop; if false, and if the view
* already exists, throws analysis exception.
* @param replace if true, and if the view already exists, updates it; if false, and if the view
* already exists, throws analysis exception.
* @param viewType the expected view type to be created with this command.
* @param isAnalyzed whether this command is analyzed or not.
*/
case class CreateViewCommand(
name: TableIdentifier,
userSpecifiedColumns: Seq[(String, Option[String])],
comment: Option[String],
properties: Map[String, String],
originalText: Option[String],
child: LogicalPlan,
plan: LogicalPlan,
allowExisting: Boolean,
replace: Boolean,
viewType: ViewType)
extends LeafRunnableCommand {
viewType: ViewType,
isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {

import ViewHelper._

override def innerChildren: Seq[QueryPlan[_]] = Seq(child)
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): CreateViewCommand = {
assert(!isAnalyzed)
copy(plan = newChildren.head)
}

override def innerChildren: Seq[QueryPlan[_]] = Seq(plan)

// `plan` needs to be analyzed, but shouldn't be optimized so that caching works correctly.
override def childrenToAnalyze: Seq[LogicalPlan] = plan :: Nil

def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)

if (viewType == PersistedView) {
require(originalText.isDefined, "'originalText' must be provided to create permanent view")
Expand All @@ -96,10 +108,10 @@ case class CreateViewCommand(
}

override def run(sparkSession: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.
val qe = sparkSession.sessionState.executePlan(child)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
if (!isAnalyzed) {
throw new AnalysisException("The logical plan that represents the view is not analyzed.")
}
val analyzedPlan = plan

if (userSpecifiedColumns.nonEmpty &&
userSpecifiedColumns.length != analyzedPlan.output.length) {
Expand Down Expand Up @@ -233,12 +245,23 @@ case class CreateViewCommand(
case class AlterViewAsCommand(
name: TableIdentifier,
originalText: String,
query: LogicalPlan) extends LeafRunnableCommand {
query: LogicalPlan,
isAnalyzed: Boolean = false) extends RunnableCommand with AnalysisOnlyCommand {

import ViewHelper._

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): AlterViewAsCommand = {
assert(!isAnalyzed)
copy(query = newChildren.head)
}

override def innerChildren: Seq[QueryPlan[_]] = Seq(query)

override def childrenToAnalyze: Seq[LogicalPlan] = query :: Nil

def markAsAnalyzed(): LogicalPlan = copy(isAnalyzed = true)

override def run(session: SparkSession): Seq[Row] = {
if (session.sessionState.catalog.isTempView(name)) {
alterTemporaryView(session, query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,19 @@ case class CacheTableAsSelectExec(
override lazy val relationName: String = tempViewName

override lazy val planToCache: LogicalPlan = {
Dataset.ofRows(sparkSession,
CreateViewCommand(
name = TableIdentifier(tempViewName),
userSpecifiedColumns = Nil,
comment = None,
properties = Map.empty,
originalText = Some(originalText),
child = query,
allowExisting = false,
replace = false,
viewType = LocalTempView
)
)
CreateViewCommand(
name = TableIdentifier(tempViewName),
userSpecifiedColumns = Nil,
comment = None,
properties = Map.empty,
originalText = Some(originalText),
plan = query,
allowExisting = false,
replace = false,
viewType = LocalTempView,
isAnalyzed = true
).run(sparkSession)

dataFrameForCachedPlan.logicalPlan
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ Execute CreateViewCommand (1)
Output: []

(2) CreateViewCommand
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true

(3) LogicalRelation
Arguments: parquet, [key#x, val#x], CatalogTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ Execute CreateViewCommand (1)
Output: []

(2) CreateViewCommand
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView, true

(3) LogicalRelation
Arguments: parquet, [key#x, val#x], CatalogTable(
Expand Down

0 comments on commit b5241c9

Please sign in to comment.