From 4f25df1dc25cc4f002107821cc67e35c1fe0e42c Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 9 Aug 2023 19:16:37 +0800 Subject: [PATCH] [SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be used with new outputs ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/41475 . It's risky to use `transformUpWithNewOutput` with existing attribute ids. If the plan contains duplicated attribute ids somewhere, then we will hit conflicting attributes and an assertion error will be thrown by `QueryPlan#transformUpWithNewOutput`. This PR takes a different approach. We canonicalize the plan first and then remove the alias-only project. Then we don't need `transformUpWithNewOutput` anymore as all attribute ids are normalized in the canonicalized plan. ### Why are the changes needed? fix potential bugs ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests Closes #42408 from cloud-fan/collect-metrics. Authored-by: Wenchen Fan Signed-off-by: Kent Yao --- .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index fee5660017c7d..0b953fc2b61f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -1080,13 +1080,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB def check(plan: LogicalPlan): Unit = plan.foreach { node => node match { case metrics @ CollectMetrics(name, _, _) => - val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics) + val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics.canonicalized) metricsMap.get(name) match { case Some(other) => - val simplifiedOther = simplifyPlanForCollectedMetrics(other) + val simplifiedOther = simplifyPlanForCollectedMetrics(other.canonicalized) // Exact duplicates are allowed. They can be the result // of a CTE that is used multiple times or a self join. - if (!simplifiedMetrics.sameResult(simplifiedOther)) { + if (simplifiedMetrics != simplifiedOther) { failAnalysis( errorClass = "DUPLICATED_METRICS_NAME", messageParameters = Map("metricName" -> name)) @@ -1111,7 +1111,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB * duplicates metric definition. */ private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan = { - plan.transformUpWithNewOutput { + plan.resolveOperators { case p: Project if p.projectList.size == p.child.output.size => val assignExprIdOnly = p.projectList.zip(p.child.output).forall { case (left: Alias, right: Attribute) => @@ -1119,9 +1119,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB case _ => false } if (assignExprIdOnly) { - (p.child, p.output.zip(p.child.output)) + p.child } else { - (p, Nil) + p } } }