[SPARK-43979][SQL][FOLLOWUP] transformUpWithNewOutput should only be used with new outputs

This resubmits apache#42408 to 3.5

### What changes were proposed in this pull request?

This is a follow-up to apache#41475. It's risky to use `transformUpWithNewOutput` with existing attribute ids. If the plan contains duplicated attribute ids somewhere, we will hit conflicting attributes and `QueryPlan#transformUpWithNewOutput` will throw an assertion error.
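To make the failure mode concrete, here is a toy sketch of the conflict. It is not Spark's implementation: `RewriteSketch` and `buildMapping` are hypothetical names, and the assumption is that a rewrite collects (old id, new id) pairs bottom-up and requires every old id to map to exactly one new id.

```scala
object RewriteSketch {
  // Hypothetical helper: takes (oldId -> newId) rewrite pairs collected from a plan.
  // Returns Right(mapping) when every old id has a single replacement,
  // or Left(conflicting old ids) when one old id maps to several new ids.
  def buildMapping(pairs: Seq[(Long, Long)]): Either[Set[Long], Map[Long, Long]] = {
    val conflicts = pairs
      .groupBy(_._1)
      .collect { case (oldId, ps) if ps.map(_._2).distinct.size > 1 => oldId }
      .toSet
    if (conflicts.isEmpty) Right(pairs.toMap) else Left(conflicts)
  }
}
```

With unique old ids the mapping is well defined. If two different nodes happen to expose the same old id, the rewrite no longer knows which replacement to use, which is the kind of conflict the assertion guards against.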

This PR takes a different approach. We canonicalize the plan first and then remove alias-only `Project` nodes. `transformUpWithNewOutput` is then no longer needed, because all attribute ids are normalized in the canonicalized plan.
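The idea can be sketched on a toy plan model. Everything below is an illustration under stated assumptions, not Catalyst code: `Attr`, `Alias`, `Relation`, `Project`, `canonicalize` and `removeAliasOnlyProjects` are hypothetical stand-ins, and the toy canonicalization only mimics the property this change relies on (an attribute id becomes its ordinal in the child's output).

```scala
object CanonicalizeSketch {
  // Toy stand-ins for Catalyst expressions and plans; NOT Spark's real classes.
  sealed trait Expr
  final case class Attr(id: Long) extends Expr
  final case class Plus(left: Expr, right: Expr) extends Expr
  final case class Alias(child: Expr, id: Long)

  sealed trait Plan { def output: Seq[Attr] }
  final case class Relation(name: String, output: Seq[Attr]) extends Plan
  final case class Project(projectList: Seq[Alias], child: Plan) extends Plan {
    def output: Seq[Attr] = projectList.map(a => Attr(a.id))
  }

  // Assumed simplification of canonicalization: each attribute id becomes the
  // ordinal of that attribute in the child's output, and each alias id becomes
  // the alias's own position in the project list.
  def canonicalize(plan: Plan): Plan = plan match {
    case Relation(name, out) =>
      Relation(name, out.indices.map(i => Attr(i.toLong)))
    case Project(list, child) =>
      val ordinal: Map[Long, Int] = child.output.map(_.id).zipWithIndex.toMap
      def normalize(e: Expr): Expr = e match {
        case Attr(id)   => Attr(ordinal(id).toLong)
        case Plus(l, r) => Plus(normalize(l), normalize(r))
      }
      val newList = list.zipWithIndex.map {
        case (Alias(c, _), i) => Alias(normalize(c), i.toLong)
      }
      Project(newList, canonicalize(child))
  }

  // On a canonicalized plan, a Project is alias-only when every entry aliases
  // the child attribute sitting at the same position.
  def removeAliasOnlyProjects(plan: Plan): Plan = plan match {
    case r: Relation => r
    case Project(list, child) =>
      val newChild = removeAliasOnlyProjects(child)
      val aliasOnly = list.size == newChild.output.size && list.zipWithIndex.forall {
        case (Alias(Attr(id), _), index) => id == index
        case _ => false
      }
      if (aliasOnly) newChild else Project(list, newChild)
  }

  def simplify(plan: Plan): Plan = removeAliasOnlyProjects(canonicalize(plan))

  // A relation, the same relation re-aliased with fresh ids, and a column swap.
  val rel: Plan = Relation("t", Seq(Attr(10), Attr(11)))
  val realiased: Plan = Project(Seq(Alias(Attr(10), 20), Alias(Attr(11), 21)), rel)
  val swapped: Plan = Project(Seq(Alias(Attr(11), 30), Alias(Attr(10), 31)), rel)
}
```

In this sketch `simplify(rel)` and `simplify(realiased)` compare equal with plain `==`, while `simplify(swapped)` keeps its `Project` because the aliased ids no longer match their positions. No id rewriting is needed after canonicalization, which is why a plain equality check suffices.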

### Why are the changes needed?

To fix a potential bug: the assertion error described above can be hit when a plan contains duplicated attribute ids.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing tests

Closes apache#42449 from cloud-fan/minor.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Aug 11, 2023
1 parent b11022f commit dae1314
Showing 1 changed file with 12 additions and 9 deletions.
@@ -1071,13 +1071,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     def check(plan: LogicalPlan): Unit = plan.foreach { node =>
       node match {
         case metrics @ CollectMetrics(name, _, _) =>
-          val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics)
+          val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics.canonicalized)
           metricsMap.get(name) match {
             case Some(other) =>
-              val simplifiedOther = simplifyPlanForCollectedMetrics(other)
+              val simplifiedOther = simplifyPlanForCollectedMetrics(other.canonicalized)
               // Exact duplicates are allowed. They can be the result
               // of a CTE that is used multiple times or a self join.
-              if (!simplifiedMetrics.sameResult(simplifiedOther)) {
+              if (simplifiedMetrics != simplifiedOther) {
                 failAnalysis(
                   errorClass = "DUPLICATED_METRICS_NAME",
                   messageParameters = Map("metricName" -> name))
@@ -1102,17 +1102,20 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
    * duplicates metric definition.
    */
   private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan = {
-    plan.transformUpWithNewOutput {
+    plan.resolveOperators {
       case p: Project if p.projectList.size == p.child.output.size =>
-        val assignExprIdOnly = p.projectList.zip(p.child.output).forall {
-          case (left: Alias, right: Attribute) =>
-            left.child.semanticEquals(right) && right.name == left.name
+        val assignExprIdOnly = p.projectList.zipWithIndex.forall {
+          case (Alias(attr: AttributeReference, _), index) =>
+            // The input plan of this method is already canonicalized. The attribute id becomes the
+            // ordinal of this attribute in the child outputs. So an alias-only Project means that
+            // the id of the aliased attribute is the same as its index in the project list.
+            attr.exprId.id == index
           case _ => false
         }
         if (assignExprIdOnly) {
-          (p.child, p.output.zip(p.child.output))
+          p.child
         } else {
-          (p, Nil)
+          p
         }
     }
   }
