diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index de698e73d3b7a..52436a9b8801d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -132,13 +132,12 @@ class Analyzer(
     override val catalogManager: CatalogManager,
     conf: SQLConf,
     maxIterations: Int)
-  extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog
-  with LogicalPlanIntegrity {
+  extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {
 
   private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog
 
   override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
-    !Utils.isTesting || hasUniqueIdsForAttributes(plan)
+    !Utils.isTesting || LogicalPlanIntegrity.hasUniqueExprIdsForAttributes(plan)
   }
 
   override def isView(nameParts: Seq[String]): Boolean = v1SessionCatalog.isView(nameParts)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7934785e66929..d736589bc0997 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -37,7 +37,7 @@ import org.apache.spark.util.Utils
  * Optimizers can override this.
  */
 abstract class Optimizer(catalogManager: CatalogManager)
-  extends RuleExecutor[LogicalPlan] with LogicalPlanIntegrity {
+  extends RuleExecutor[LogicalPlan] {
 
   // Check for structural integrity of the plan in test mode.
   // Currently we check after the execution of each rule if a plan:
@@ -46,7 +46,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
   override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
     !Utils.isTesting || (plan.resolved &&
       plan.find(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty).isEmpty) &&
-      hasUniqueIdsForAttributes(plan)
+      LogicalPlanIntegrity.hasUniqueExprIdsForAttributes(plan)
   }
 
   override protected val excludedOnceBatches: Set[String] =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index d25e24eff75b3..5addda746e865 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -204,7 +204,7 @@ abstract class OrderPreservingUnaryNode extends UnaryNode {
   override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
 }
 
-trait LogicalPlanIntegrity {
+object LogicalPlanIntegrity {
 
   private def canGetOutputAttrs(p: LogicalPlan): Boolean = {
     p.resolved && !p.expressions.exists { e =>
@@ -214,7 +214,12 @@ trait LogicalPlanIntegrity {
     }
   }
 
-  def hasUniqueIdsForAttributes(plan: LogicalPlan): Boolean = {
+  /**
+   * This method checks if expression IDs, `ExprId`s, refer to unique attributes.
+   * Some plan transformers (e.g., `RemoveNoopOperators`) rewrite logical
+   * plans based on this assumption.
+   */
+  def hasUniqueExprIdsForAttributes(plan: LogicalPlan): Boolean = {
    val allOutputAttrs = plan.collect { case p if canGetOutputAttrs(p) =>
      p.output.filter(_.resolved).map(_.canonicalized.asInstanceOf[Attribute])
    }
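
For context on what the renamed `LogicalPlanIntegrity.hasUniqueExprIdsForAttributes` check enforces, the invariant can be modeled in isolation as follows. This is a minimal standalone sketch, not Catalyst code: the `Attr` case class and `hasUniqueExprIds` helper are hypothetical stand-ins for Catalyst's `Attribute`/`ExprId` types, assuming the check amounts to "every `ExprId` seen across plan outputs refers to a single canonical attribute".

// Standalone sketch of the ExprId-uniqueness invariant (hypothetical types,
// not the Catalyst API).
object ExprIdUniquenessSketch {
  // Toy attribute: an expression ID paired with its canonicalized data type.
  final case class Attr(exprId: Long, dataType: String)

  // An ExprId must always denote the same attribute: if grouping all output
  // attributes by ExprId yields a group with more than one distinct data type,
  // some rule reused an existing ID for a new, different attribute.
  def hasUniqueExprIds(outputsPerOperator: Seq[Seq[Attr]]): Boolean =
    outputsPerOperator.flatten
      .groupBy(_.exprId)
      .values
      .forall(attrs => attrs.map(_.dataType).distinct.size == 1)

  def main(args: Array[String]): Unit = {
    // ExprId 1 consistently means an int attribute in both operators: OK.
    val ok = Seq(Seq(Attr(1, "int"), Attr(2, "string")), Seq(Attr(1, "int")))
    // ExprId 1 reused for a string attribute elsewhere: integrity violation.
    val bad = Seq(Seq(Attr(1, "int")), Seq(Attr(1, "string")))
    println(hasUniqueExprIds(ok))  // true
    println(hasUniqueExprIds(bad)) // false
  }
}

Rewrites such as `RemoveNoopOperators` substitute attributes by `ExprId`, which is only sound under this invariant, which is presumably why both `Analyzer.isPlanIntegral` and `Optimizer.isPlanIntegral` above assert it after each rule in test mode.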