
[SPARK-32638][SQL][FOLLOWUP] Move the plan rewriting methods to QueryPlan #29643

Closed

Conversation

@cloud-fan (Contributor) commented Sep 3, 2020

What changes were proposed in this pull request?

This is a followup of #29485

It moves the plan rewriting methods from `Analyzer` to `QueryPlan`, so that they can work with `SparkPlan` as well. This PR also improves them to support a corner case (the attribute to be replaced appears together with an unresolved attribute) and makes them more general, so that `WidenSetOperationTypes` can rewrite the plan in one shot, as before.

Why are the changes needed?

Code cleanup and generalization.

Does this PR introduce _any_ user-facing change?

No.

How was this patch tested?

Existing tests.
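
For context, here is a minimal sketch of how a rule plugs into the moved method. The signature is paraphrased from this PR's diff, and `shouldReplace`/`replacement` are hypothetical placeholders, so treat it as an illustration rather than the actual code:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def rewrite(plan: LogicalPlan): LogicalPlan = {
  def shouldReplace(p: LogicalPlan): Boolean = false // placeholder predicate
  def replacement(p: LogicalPlan): LogicalPlan = p   // placeholder rewrite

  plan.transformUpWithNewOutput {
    case p if shouldReplace(p) =>
      val newP = replacement(p)
      // The rule returns the new node together with the old -> new attribute
      // pairs; QueryPlan then patches references in parent nodes on the way up.
      newP -> p.output.zip(newP.output)
  }
}
```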

```scala
  p -> attrMapping
} else {
  // Just passes through unresolved nodes
  plan.mapChildren {
```
Contributor Author

This means that we won't replace attributes in an unresolved plan, which is not sufficient. See the updated test: https://github.com/apache/spark/pull/29643/files#diff-01ecdd038c5c2f53f38118912210fef8R1425
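
To make the corner case concrete, here is a toy plan in the same spirit as the updated test (an illustration only, not the test itself):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.types.IntegerType

// This Project is unresolved because of `UnresolvedAttribute("c")`, yet it
// still carries the resolved attribute `a`; simply passing through unresolved
// plans would skip rewriting `a` when its expr ID changes further down.
val a = AttributeReference("a", IntegerType)()
val plan = Project(Seq(UnresolvedAttribute("c"), a), LocalRelation(a))
assert(!plan.resolved)
```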

Member

Good catch! In this unresolved plan, there might be other resolved and replaced attributes.

Member

Ah, I see. Nice catch.

```scala
 * The outer plan may have old references and the function below updates the
 * outer references to refer to the new attributes.
 *
 * For example (SQL):
```
Contributor Author

The example here is not useful at all. The first sentence already explains the reason very well, while the query plan example is hard to read.

@cloud-fan (Contributor Author)

cc @maropu

```scala
 * This method also updates all the related references in this plan tree accordingly, in case
 * the replaced node has different output expr ID than the old node.
 */
def rewriteWithPlanMapping(
```
Member

Actually, I think we may not have a chance to do such complicated replacements at the physical plan level, but there is no harm in moving this here.

@SparkQA commented Sep 3, 2020

Test build #128267 has finished for PR 29643 at commit 76cf567.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
// the `oldAttr` must be part of either `plan.references` (so that it can be used to
// replace attributes of the current `plan`) or `plan.outputSet` (so that it can be
// used by those parent plans).
(plan.outputSet ++ plan.references).contains(oldAttr)
```
Member

Oh, we don't check whether `plan` is resolved here, so `plan.outputSet` can throw an error.
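
For illustration, one way `outputSet` can blow up on an unresolved plan (a toy example assuming Catalyst's `Union.output` computation, which transposes child outputs; not taken from this PR):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Union}
import org.apache.spark.sql.types.IntegerType

// While the two children disagree on arity, the Union is unresolved and its
// `output` cannot be computed, so calling `outputSet` here throws.
val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val u = Union(Seq(LocalRelation(a), LocalRelation(a, b)))
assert(!u.resolved)
// u.outputSet // throws: child outputs cannot be transposed into one schema
```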

@HyukjinKwon changed the title from [SPARK-32638][SQL][FOLLOWUP] move the plan rewriting methods to QueryPlan to [SPARK-32638][SQL][FOLLOWUP] Move the plan rewriting methods to QueryPlan on Sep 4, 2020
```scala
    planMapping: Map[PlanType, PlanType],
    canGetOutput: PlanType => Boolean = _ => true): PlanType = {
  def internalRewrite(plan: PlanType): (PlanType, Seq[(Attribute, Attribute)]) = {
    if (planMapping.contains(plan)) {
```
Member

IIUC this check cannot correctly handle nested cases in `planMapping`; for example:

```
Project
 +- Union
    :+- (1) Project
    :   +- Union
    :   :  :+- (2) Project
    :   :
    :   +- Project
    :
    +- Project
       +- ...
```

If the two nested Projects above, (1) and (2), are both stored in `planMapping`, I think only case (1) is matched by this condition and case (2) is simply ignored. That is why I rewrote the logic a bit so that plans are replaced in a bottom-up way in the previous PR:

```scala
val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
val newChildren = plan.children.map { child =>
  // If not, we'd rewrite child plan recursively until we find the
  // conflict node or reach the leaf node.
  val (newChild, childAttrMapping) = rewritePlan(child, rewritePlanMap)
```

Contributor Author

Hmm, is this a real-world case? I think it is too complicated if we need to replace nodes in the values of `planMapping`.

@maropu (Member) Sep 4, 2020

Hm, yea, this is complicated, but I remember the existing tests failing for this reason. It might be the case below:

```
SQLQueryTestSuite.sql
org.scalatest.exceptions.TestFailedException: union.sql
Expected "struct<[c1:decimal(11,1),c2:string]>", but got "struct<[]>" Schema did not match for query #3
SELECT *
FROM   (SELECT * FROM t1
        UNION ALL
        SELECT * FROM t2
        UNION ALL
        SELECT * FROM t2): -- !query
SELECT *
FROM   (SELECT * FROM t1
        UNION ALL
        SELECT * FROM t2
        UNION ALL
        SELECT * FROM t2)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.errors.package$TreeNodeException
After applying rule org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators in batch Operator Optimization before Inferring Filters, the structural integrity of the plan is broken., tree:
'Union false, false
```

Contributor Author

You are right, this is a valid use case.

@cloud-fan force-pushed the cleanup branch 2 times, most recently from e796ea7 to 30e6c4a on September 4, 2020 06:48
@SparkQA commented Sep 4, 2020

Test build #128282 has finished for PR 29643 at commit 30e6c4a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 4, 2020

Test build #128281 has finished for PR 29643 at commit e796ea7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author)

retest this please

@SparkQA commented Sep 4, 2020

Test build #128284 has finished for PR 29643 at commit 30e6c4a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author)

@maropu I think it's too tricky if rewriting the plan with `planMapping` is recursive. The reason is that `WidenSetOperationTypes` does the work by traversing the plan tree twice. I made the plan rewriting method more general, so that `WidenSetOperationTypes` only needs to traverse the plan tree once, and now the logic is simpler. Please take a look, thanks!
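
For a sense of the one-shot shape, a heavily simplified sketch (the `widenedTypes` parameter stands in for the real type-coercion computation, and the actual rule only casts the columns whose types change):

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType

def widenSetOperations(plan: LogicalPlan, widenedTypes: Seq[DataType]): LogicalPlan =
  plan.resolveOperatorsUpWithNewOutput {
    case u: Union if u.childrenResolved && !u.resolved =>
      // One bottom-up pass both inserts the widening Projects and reports the
      // old -> new attribute pairs, so parent plans are patched in the same
      // traversal instead of a second one.
      val newChildren = u.children.map { child =>
        val casts = child.output.zip(widenedTypes).map { case (e, dt) =>
          Alias(Cast(e, dt, Some(SQLConf.get.sessionLocalTimeZone)), e.name)()
        }
        Project(casts, child)
      }
      val attrMapping: Seq[(Attribute, Attribute)] =
        u.children.zip(newChildren).flatMap { case (oldChild, newChild) =>
          oldChild.output.zip(newChild.output)
        }
      u.withNewChildren(newChildren) -> attrMapping
  }
```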

@SparkQA commented Sep 4, 2020

Test build #128306 has finished for PR 29643 at commit 5cce482.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 4, 2020

Test build #128307 has finished for PR 29643 at commit be7e864.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member) commented Sep 7, 2020

> The reason is that `WidenSetOperationTypes` does the work by traversing the plan tree twice.

Ah, good point. Yea, the current approach in this PR looks okay to me if all the existing tests pass. Btw, could we backport the previous PR and this PR into branch-3.0? The branch also has the issue described in SPARK-32638.

@cloud-fan (Contributor Author)

Yea, we can backport.

@SparkQA commented Sep 7, 2020

Test build #128343 has finished for PR 29643 at commit 0857791.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor Author)

GitHub Actions has passed.

```scala
    e -> e
}.unzip
Project(casted._1, plan) -> Project(casted._2, plan)

Alias(Cast(e, dt, Some(SQLConf.get.sessionLocalTimeZone)), e.name)()
```
Member

Out of curiosity: why do we need to set the timezone here?

Contributor Author

Otherwise `WidenSetOperationTypes` will return an invalid attribute mapping (an unresolved `Alias` with an unresolved `Cast`) when calling `resolveOperatorsUpWithNewOutput`.
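
A small illustration of that resolution behavior (assuming Catalyst's rule that timezone-aware casts only resolve once `timeZoneId` is set):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.TimestampType

// A string-to-timestamp cast is timezone-aware: without a `timeZoneId` it is
// not `resolved`, so any Alias built on top of it stays unresolved as well.
val withoutTz = Cast(Literal("2020-09-03"), TimestampType)
val withTz    = Cast(Literal("2020-09-03"), TimestampType, Some("UTC"))
assert(!withoutTz.resolved && withTz.resolved)
```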

@maropu (Member) left a comment

LGTM cc: @viirya @Ngone51

```scala
 * with a new one that has different output expr IDs, by updating the attribute references in
 * the parent nodes accordingly.
 *
 * @param rule the function to transform plan nodes, and return new nodes with attributes mapping
```
Member

Just a question: why do we need to return the attribute mapping from old to new? Can we just detect whether the output of the new plan differs from the old plan, and then create the mapping inside `transformUpWithNewOutput`?

Contributor Author

Because that's too hard. For example, `WidenSetOperationTypes` returns the attribute mapping according to the replaced children, not the node itself, because the node itself may not be resolved yet. For self-join dedup, on the other hand, we return the attribute mapping according to the current node.

@viirya (Member) left a comment

LGTM, just one question.

@maropu closed this in 117a6f1 on Sep 8, 2020
@maropu (Member) commented Sep 8, 2020

Thanks! Merged to master.

maropu pushed a commit to maropu/spark that referenced this pull request Sep 8, 2020
…Plan

### What changes were proposed in this pull request?

This is a followup of apache#29485

It moves the plan rewriting methods from `Analyzer` to `QueryPlan`, so that they can work with `SparkPlan` as well. This PR also improves them to support a corner case (the attribute to be replaced appears together with an unresolved attribute) and makes them more general, so that `WidenSetOperationTypes` can rewrite the plan in one shot, as before.

### Why are the changes needed?

Code cleanup and generalization.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes apache#29643 from cloud-fan/cleanup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
maropu added a commit that referenced this pull request Sep 8, 2020
…denSetOperationTypes

### What changes were proposed in this pull request?

This PR intends to fix a bug where references can go missing when adding aliases to widen data types in `WidenSetOperationTypes`. For example:
```
CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
SELECT t.v FROM (
  SELECT v FROM t3
  UNION ALL
  SELECT v + v AS v FROM t3
) t;

org.apache.spark.sql.AnalysisException: Resolved attribute(s) v#1 missing from v#3 in operator !Project [v#1]. Attribute(s) with the same name appear in the operation: v. Please check if the right attribute(s) are used.;;
!Project [v#1]  <------ the reference got missing
+- SubqueryAlias t
   +- Union
      :- Project [cast(v#1 as decimal(11,0)) AS v#3]
      :  +- Project [v#1]
      :     +- SubqueryAlias t3
      :        +- SubqueryAlias tbl
      :           +- LocalRelation [v#1]
      +- Project [v#2]
         +- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
            +- SubqueryAlias t3
               +- SubqueryAlias tbl
                  +- LocalRelation [v#1]
```
In this case, `WidenSetOperationTypes` added the alias `cast(v#1 as decimal(11,0)) AS v#3`, and the reference in the top `Project` then went missing. This PR corrects the reference (the `exprId`, and widens the `dataType`) after adding aliases in the rule.

This backport for 3.0 comes from #29485 and #29643

### Why are the changes needed?

Bug fixes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests

Closes #29680 from maropu/SPARK-32638-BRANCH3.0.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
…denSetOperationTypes

cloud-fan pushed a commit that referenced this pull request Jan 5, 2021
…kingTransformsInAnalyzer

### What changes were proposed in this pull request?

In #29643, we moved the plan rewriting methods to `QueryPlan`. We need to override `transformUpWithNewOutput` to add `allowInvokingTransformsInAnalyzer`, because it and `resolveOperatorsUpWithNewOutput` are called in the analyzer.
For example, `PaddingAndLengthCheckForCharVarchar` could fail a query when calling `resolveOperatorsUpWithNewOutput`:
```
[info] - char/varchar resolution in sub query  *** FAILED *** (367 milliseconds)
[info]   java.lang.RuntimeException: This method should not be called in the analyzer
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule(AnalysisHelper.scala:150)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule$(AnalysisHelper.scala:146)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.assertNotAnalysisRule(LogicalPlan.scala:29)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:161)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:160)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$updateOuterReferencesInSubquery(QueryPlan.scala:267)
```
### Why are the changes needed?

A trivial bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #31013 from yaooqinn/SPARK-33992.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
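
The shape of that fix, roughly (paraphrased; the exact override lives in `LogicalPlan` and may differ in detail from #31013):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LogicalPlan}

// Inside LogicalPlan: re-wrap the inherited method so the analyzer's
// "no raw transforms" assertion is not tripped by transformDown calls
// made on its behalf.
override def transformUpWithNewOutput(
    rule: PartialFunction[LogicalPlan, (LogicalPlan, Seq[(Attribute, Attribute)])],
    skipCond: LogicalPlan => Boolean,
    canGetOutput: LogicalPlan => Boolean): LogicalPlan =
  AnalysisHelper.allowInvokingTransformsInAnalyzer {
    super.transformUpWithNewOutput(rule, skipCond, canGetOutput)
  }
```
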
cloud-fan pushed a commit that referenced this pull request Jan 5, 2021
…kingTransformsInAnalyzer

(cherry picked from commit f0ffe0c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
```scala
val attrMappingForCurrentPlan = attrMapping.filter {
  // The `attrMappingForCurrentPlan` is used to replace the attributes of the
  // current `plan`, so the `oldAttr` must be part of `plan.references`.
  case (oldAttr, _) => plan.references.contains(oldAttr)
```
Contributor

Shall we skip if the child is not resolved? Although that would break the one-shot rewrite idea. The reason is that calling `.references` on an unresolved plan is dangerous: the plan may use `child.outputSet` as its references.

Contributor Author

Can we add a base trait for plans that override `references` with `child.outputSet`? Then we can match on this trait here and skip calling `references`.

Contributor Author

Ideally, a plan should determine its references from its expressions, not from its child's output attributes.

Contributor

Adding a new base trait sounds good.

Contributor

I sent a PR for it: #40154.
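
A hypothetical shape for such a trait (illustrative only; the name and details of the real change belong to #40154):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Plans whose `references` are, by definition, the whole child output can mix
// in this marker; the attribute-rewriting code can then match on the trait and
// skip computing `references` on not-yet-resolved trees.
trait ReferencesChildOutput { self: LogicalPlan =>
  override def references: AttributeSet = children.head.outputSet
}
```

A rewrite could then pattern-match `case p: ReferencesChildOutput =>` and skip the dangerous call.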
