diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index e198fd58953dd..848749f9e3b37 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1071,13 +1071,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     def check(plan: LogicalPlan): Unit = plan.foreach { node =>
       node match {
         case metrics @ CollectMetrics(name, _, _) =>
-          val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics)
+          val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics.canonicalized)
           metricsMap.get(name) match {
             case Some(other) =>
-              val simplifiedOther = simplifyPlanForCollectedMetrics(other)
+              val simplifiedOther = simplifyPlanForCollectedMetrics(other.canonicalized)
               // Exact duplicates are allowed. They can be the result
               // of a CTE that is used multiple times or a self join.
-              if (!simplifiedMetrics.sameResult(simplifiedOther)) {
+              if (simplifiedMetrics != simplifiedOther) {
                 failAnalysis(
                   errorClass = "DUPLICATED_METRICS_NAME",
                   messageParameters = Map("metricName" -> name))
@@ -1102,7 +1102,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
    * duplicates metric definition.
    */
   private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan = {
-    plan.transformUpWithNewOutput {
+    plan.resolveOperators {
       case p: Project if p.projectList.size == p.child.output.size =>
         val assignExprIdOnly = p.projectList.zip(p.child.output).forall {
           case (left: Alias, right: Attribute) =>
@@ -1110,9 +1110,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
           case _ => false
         }
         if (assignExprIdOnly) {
-          (p.child, p.output.zip(p.child.output))
+          p.child
         } else {
-          (p, Nil)
+          p
         }
     }
   }