Skip to content

Commit

Permalink
[SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput` should only be…
Browse files Browse the repository at this point in the history
… used with new outputs

### What changes were proposed in this pull request?

This is a followup of apache#41475 . It's risky to use `transformUpWithNewOutput` with existing attribute ids. If the plan contains duplicated attribute ids somewhere, then we will hit conflicting attributes and an assertion error will be thrown by `QueryPlan#transformUpWithNewOutput`.

This PR takes a different approach. We canonicalize the plan first and then remove the alias-only project. Then we don't need `transformUpWithNewOutput` anymore as all attribute ids are normalized in the canonicalized plan.

### Why are the changes needed?

fix potential bugs

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes apache#42408 from cloud-fan/collect-metrics.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
cloud-fan authored and yaooqinn committed Aug 9, 2023
1 parent c73660c commit 4f25df1
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1080,13 +1080,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
def check(plan: LogicalPlan): Unit = plan.foreach { node =>
node match {
case metrics @ CollectMetrics(name, _, _) =>
val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics)
val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics.canonicalized)
metricsMap.get(name) match {
case Some(other) =>
val simplifiedOther = simplifyPlanForCollectedMetrics(other)
val simplifiedOther = simplifyPlanForCollectedMetrics(other.canonicalized)
// Exact duplicates are allowed. They can be the result
// of a CTE that is used multiple times or a self join.
if (!simplifiedMetrics.sameResult(simplifiedOther)) {
if (simplifiedMetrics != simplifiedOther) {
failAnalysis(
errorClass = "DUPLICATED_METRICS_NAME",
messageParameters = Map("metricName" -> name))
Expand All @@ -1111,17 +1111,17 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
* duplicates metric definition.
*/
private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan = {
plan.transformUpWithNewOutput {
plan.resolveOperators {
case p: Project if p.projectList.size == p.child.output.size =>
val assignExprIdOnly = p.projectList.zip(p.child.output).forall {
case (left: Alias, right: Attribute) =>
left.child.semanticEquals(right) && right.name == left.name
case _ => false
}
if (assignExprIdOnly) {
(p.child, p.output.zip(p.child.output))
p.child
} else {
(p, Nil)
p
}
}
}
Expand Down

0 comments on commit 4f25df1

Please sign in to comment.