Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
maropu committed Aug 31, 2020
1 parent f24e82d commit 9d46a60
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,12 @@ class Analyzer(
override val catalogManager: CatalogManager,
conf: SQLConf,
maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog
with LogicalPlanIntegrity {
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {

private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog

override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
!Utils.isTesting || hasUniqueIdsForAttributes(plan)
!Utils.isTesting || LogicalPlanIntegrity.hasUniqueExprIdsForAttributes(plan)
}

override def isView(nameParts: Seq[String]): Boolean = v1SessionCatalog.isView(nameParts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.spark.util.Utils
* Optimizers can override this.
*/
abstract class Optimizer(catalogManager: CatalogManager)
extends RuleExecutor[LogicalPlan] with LogicalPlanIntegrity {
extends RuleExecutor[LogicalPlan] {

// Check for structural integrity of the plan in test mode.
// Currently we check after the execution of each rule if a plan:
Expand All @@ -46,7 +46,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
!Utils.isTesting || (plan.resolved &&
plan.find(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty).isEmpty) &&
hasUniqueIdsForAttributes(plan)
LogicalPlanIntegrity.hasUniqueExprIdsForAttributes(plan)
}

override protected val excludedOnceBatches: Set[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ abstract class OrderPreservingUnaryNode extends UnaryNode {
override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
}

trait LogicalPlanIntegrity {
object LogicalPlanIntegrity {

private def canGetOutputAttrs(p: LogicalPlan): Boolean = {
p.resolved && !p.expressions.exists { e =>
Expand All @@ -214,7 +214,12 @@ trait LogicalPlanIntegrity {
}
}

def hasUniqueIdsForAttributes(plan: LogicalPlan): Boolean = {
/**
* This method checks if the same expression ID, `ExprId`, refer to an unique attribute.
* Some plan transformers (e.g., `RemoveNoopOperators`) rewrite logical
* plans based on this assumption.
*/
def hasUniqueExprIdsForAttributes(plan: LogicalPlan): Boolean = {
val allOutputAttrs = plan.collect { case p if canGetOutputAttrs(p) =>
p.output.filter(_.resolved).map(_.canonicalized.asInstanceOf[Attribute])
}
Expand Down

0 comments on commit 9d46a60

Please sign in to comment.