[SPARK-32638][SQL][FOLLOWUP] Move the plan rewriting methods to QueryPlan #29643
Conversation
p -> attrMapping
} else {
  // Just passes through unresolved nodes
  plan.mapChildren {
This means that we won't replace attributes in an unresolved plan, which is not sufficient. See the updated test: https://github.com/apache/spark/pull/29643/files#diff-01ecdd038c5c2f53f38118912210fef8R1425
Good catch! In this unresolved plan, there might be other resolved and replaced attributes.
Ah, I see. Nice catch.
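For illustration, a plan can be unresolved as a whole while still holding resolved attribute references that were rewritten in a child. A minimal sketch in Catalyst's test DSL (hypothetical data, not the updated test itself):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}

val rel = LocalRelation('a.int)
val a = rel.output.head // a resolved AttributeReference, say a#1
// The Filter is unresolved because of `b`, yet it still references a#1; if a#1
// was replaced by a#2 in a rewritten child, this reference must be updated too.
val plan = Filter(a === UnresolvedAttribute("b"), rel)
assert(!plan.resolved)
```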
 * The outer plan may have old references and the function below updates the
 * outer references to refer to the new attributes.
 *
 * For example (SQL):
The example here is not useful at all. The first sentence already explains the reason very well, while the query plan example is hard to read.
cc @maropu
 * This method also updates all the related references in this plan tree accordingly, in case
 * the replaced node has different output expr ID than the old node.
 */
def rewriteWithPlanMapping(
Actually, I think we may not have a chance to do such a complicated replacement at the physical plan level, but there's no harm in moving this here.
Test build #128267 has finished for PR 29643 at commit
// the `oldAttr` must be part of either `plan.references` (so that it can be used to
// replace attributes of the current `plan`) or `plan.outputSet` (so that it can be
// used by those parent plans).
(plan.outputSet ++ plan.references).contains(oldAttr)
Oh, we don't check if `plan` is resolved here, and `plan.outputSet` can cause an error.
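One possible guard, sketched under the assumption that `outputSet` is the unsafe call here (not necessarily what the final version of the PR does):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Only consult `outputSet` when the plan is resolved; computing the output of an
// unresolved node (e.g. an unresolved Union) can throw.
def isKnownAttr(plan: LogicalPlan, oldAttr: Attribute): Boolean = {
  val known = if (plan.resolved) plan.outputSet ++ plan.references else plan.references
  known.contains(oldAttr)
}
```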
    planMapping: Map[PlanType, PlanType],
    canGetOutput: PlanType => Boolean = _ => true): PlanType = {
  def internalRewrite(plan: PlanType): (PlanType, Seq[(Attribute, Attribute)]) = {
    if (planMapping.contains(plan)) {
IIUC this check cannot correctly handle nested cases in `planMapping`; for example:

Project
+- Union
   :- (1) Project
   :  +- Union
   :     :- (2) Project
   :     +- Project
   +- Project
      +- ...

If the two nested `Project`s above, (1) and (2), are stored in `planMapping`, I think only case (1) is matched by this condition and case (2) is just ignored. So, I rewrote the logic a bit so that plans are replaced in a bottom-up way in the previous PR:
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
Lines 140 to 144 in 1de272f
val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
val newChildren = plan.children.map { child =>
  // If not, we'd rewrite child plan recursively until we find the
  // conflict node or reach the leaf node.
  val (newChild, childAttrMapping) = rewritePlan(child, rewritePlanMap)
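Condensed, the bottom-up strategy reads roughly as follows (a simplified sketch of the previous PR's `rewritePlan`; the real code also rewrites the current node's own expressions using the collected mapping, omitted here):

```scala
import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def rewritePlan(
    plan: LogicalPlan,
    rewritePlanMap: Map[LogicalPlan, LogicalPlan])
  : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
  // Rewrite children first, so a nested entry like (2) is applied before the
  // enclosing entry (1) is even looked up.
  val attrMapping = new mutable.ArrayBuffer[(Attribute, Attribute)]()
  val newChildren = plan.children.map { child =>
    val (newChild, childMapping) = rewritePlan(child, rewritePlanMap)
    attrMapping ++= childMapping
    newChild
  }
  rewritePlanMap.get(plan) match {
    case Some(replacement) =>
      // Replace the current node and record its old -> new output attributes.
      val newPlan = replacement.withNewChildren(newChildren)
      attrMapping ++= plan.output.zip(newPlan.output)
      (newPlan, attrMapping.toSeq)
    case None =>
      (plan.withNewChildren(newChildren), attrMapping.toSeq)
  }
}
```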
Hmm, is this a real-world case? I think this is too complicated if we need to replace nodes in the values of `planMapping`.
Hm, yea, this is complicated, but I remember the existing tests failing for this reason. That might be the case below:
SQLQueryTestSuite.sql

org.scalatest.exceptions.TestFailedException: union.sql
Expected "struct<[c1:decimal(11,1),c2:string]>", but got "struct<[]>" Schema did not match for query #3
SELECT *
FROM (SELECT * FROM t1
      UNION ALL
      SELECT * FROM t2
      UNION ALL
      SELECT * FROM t2):

-- !query
SELECT *
FROM (SELECT * FROM t1
      UNION ALL
      SELECT * FROM t2
      UNION ALL
      SELECT * FROM t2)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.errors.package$TreeNodeException
After applying rule org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators in batch Operator Optimization before Inferring Filters, the structural integrity of the plan is broken., tree:
'Union false, false
You are right, this is a valid use case.
Test build #128282 has finished for PR 29643 at commit
Test build #128281 has finished for PR 29643 at commit
retest this please
Test build #128284 has finished for PR 29643 at commit
@maropu I think it's too tricky if rewriting plan with
Test build #128306 has finished for PR 29643 at commit
Test build #128307 has finished for PR 29643 at commit
Ah, good point. Yea, the current approach in this PR looks okay to me if all the existing tests pass. Btw, could we backport the previous PR and this PR into branch-3.0? The branch also has the issue described in SPARK-32638.
Yea, we can backport.
Test build #128343 has finished for PR 29643 at commit
GitHub Actions has passed.
e -> e
}.unzip
Project(casted._1, plan) -> Project(casted._2, plan)
Alias(Cast(e, dt, Some(SQLConf.get.sessionLocalTimeZone)), e.name)()
Out of curiosity: why do we need to set the timezone here?
Otherwise `WidenSetOperationTypes` will return an invalid attribute mapping (an unresolved `Alias` with an unresolved `Cast`) when calling `resolveOperatorsUpWithNewOutput`.
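A minimal illustration of that point, assuming Catalyst's behavior that timezone-aware casts stay unresolved until a `timeZoneId` is supplied ("UTC" is an arbitrary example value):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Alias, Cast}
import org.apache.spark.sql.types.StringType

val ts = 'ts.timestamp // an AttributeReference of TimestampType
// Without a timezone the Cast (and hence the Alias) stays unresolved, so an
// attribute mapping built from it would be invalid:
val noTz = Alias(Cast(ts, StringType, None), "ts")()
// With a timezone both resolve:
val withTz = Alias(Cast(ts, StringType, Some("UTC")), "ts")()
assert(!noTz.resolved && withTz.resolved)
```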
 * with a new one that has different output expr IDs, by updating the attribute references in
 * the parent nodes accordingly.
 *
 * @param rule the function to transform plan nodes, and return new nodes with attributes mapping
Just a question: why do we need to return the attribute mapping from old to new? Can we just detect whether the output of the new plan differs from the old plan, and then create the mapping inside `transformUpWithNewOutput`?
Because that's too hard. For example, `WidenSetOperationTypes` returns the attribute mapping according to the replaced children, not the node itself, because the node itself may not be resolved yet, while for self-join dedup we return the attribute mapping according to the current node.
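Schematically, the `WidenSetOperationTypes` side of that contrast might look like this (`widenUnion` and `widenChild` are hypothetical stand-ins for the rule's cast/alias logic):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}

// The Union itself may still be unresolved, so the (old -> new) pairs are
// collected from each replaced child rather than from the Union's own output.
def widenUnion(
    u: Union,
    widenChild: LogicalPlan => LogicalPlan)
  : (LogicalPlan, Seq[(Attribute, Attribute)]) = {
  val newChildren = u.children.map(widenChild)
  val attrMapping = u.children.zip(newChildren).flatMap {
    case (oldChild, newChild) => oldChild.output.zip(newChild.output)
  }
  (u.withNewChildren(newChildren), attrMapping)
}
```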
LGTM, just one question.
Thanks! Merged to master.
…Plan

### What changes were proposed in this pull request?

This is a followup of apache#29485

It moves the plan rewriting methods from `Analyzer` to `QueryPlan`, so that they can work with `SparkPlan` as well. This PR also does an improvement to support a corner case (the attribute to be replaced stays together with an unresolved attribute), and makes the methods more general, so that `WidenSetOperationTypes` can rewrite the plan in one shot like before.

### Why are the changes needed?

Code cleanup and generalization.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing test

Closes apache#29643 from cloud-fan/cleanup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
…denSetOperationTypes

### What changes were proposed in this pull request?

This PR intends to fix a bug where references can be missing when adding aliases to widen data types in `WidenSetOperationTypes`. For example,

```
CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
SELECT t.v FROM (
  SELECT v FROM t3
  UNION ALL
  SELECT v + v AS v FROM t3
) t;

org.apache.spark.sql.AnalysisException: Resolved attribute(s) v#1 missing from v#3 in operator !Project [v#1]. Attribute(s) with the same name appear in the operation: v. Please check if the right attribute(s) are used.;;
!Project [v#1]   <------ the reference got missing
+- SubqueryAlias t
   +- Union
      :- Project [cast(v#1 as decimal(11,0)) AS v#3]
      :  +- Project [v#1]
      :     +- SubqueryAlias t3
      :        +- SubqueryAlias tbl
      :           +- LocalRelation [v#1]
      +- Project [v#2]
         +- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
            +- SubqueryAlias t3
               +- SubqueryAlias tbl
                  +- LocalRelation [v#1]
```

In this case, `WidenSetOperationTypes` added the alias `cast(v#1 as decimal(11,0)) AS v#3`, and then the reference in the top `Project` went missing. This PR corrects the reference (`exprId` and widened `dataType`) after adding aliases in the rule.

This backport for 3.0 comes from #29485 and #29643

### Why are the changes needed?

bugfixes

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests

Closes #29680 from maropu/SPARK-32638-BRANCH3.0.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
…kingTransformsInAnalyzer

### What changes were proposed in this pull request?

In #29643, we moved the plan rewriting methods to QueryPlan. We need to override transformUpWithNewOutput to add allowInvokingTransformsInAnalyzer because it and resolveOperatorsUpWithNewOutput are called in the analyzer. For example, PaddingAndLengthCheckForCharVarchar could fail a query during resolveOperatorsUpWithNewOutput with

```
[info] - char/varchar resolution in sub query *** FAILED *** (367 milliseconds)
[info]   java.lang.RuntimeException: This method should not be called in the analyzer
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule(AnalysisHelper.scala:150)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule$(AnalysisHelper.scala:146)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.assertNotAnalysisRule(LogicalPlan.scala:29)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:161)
[info]   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:160)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
[info]   at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$updateOuterReferencesInSubquery(QueryPlan.scala:267)
```

### Why are the changes needed?

trivial bugfix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #31013 from yaooqinn/SPARK-33992.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
val attrMappingForCurrentPlan = attrMapping.filter {
  // The `attrMappingForCurrentPlan` is used to replace the attributes of the
  // current `plan`, so the `oldAttr` must be part of `plan.references`.
  case (oldAttr, _) => plan.references.contains(oldAttr)
Shall we skip if the child is not resolved? Although it would break the one-shot rewrite idea. The reason is, calling `.references` on an unresolved plan is dangerous: such a plan may use `child.outputSet` as its references.
Can we add a base trait for plans that override `references` with `child.outputSet`? Then we can match this trait here and skip calling `references`.
Ideally, a plan should determine its references from its own expressions, not from its child's output attributes.
Adding a new base trait sounds good.
I sent a PR, #40154, for it.
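A sketch of the idea (the trait and its name here are illustrative; #40154 defines the real version):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// A marker trait: plans mixing it in promise that their references are exactly
// the first child's output.
trait ReferencesChildOutput

// At the rewrite site, matching the trait avoids computing `references` on a
// possibly-unresolved node.
def safeReferences(plan: LogicalPlan): AttributeSet = plan match {
  case _: ReferencesChildOutput => plan.children.head.outputSet
  case _ => plan.references
}
```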
What changes were proposed in this pull request?

This is a followup of #29485

It moves the plan rewriting methods from `Analyzer` to `QueryPlan`, so that they can work with `SparkPlan` as well. This PR also does an improvement to support a corner case (the attribute to be replaced stays together with an unresolved attribute), and makes the methods more general, so that `WidenSetOperationTypes` can rewrite the plan in one shot like before.

Why are the changes needed?

Code cleanup and generalization.

Does this PR introduce any user-facing change?

no

How was this patch tested?

existing test
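As a closing illustration, here is a hedged, self-contained sketch of how a rule can call `transformUpWithNewOutput` now that it lives in `QueryPlan`; the re-aliasing scenario and names are hypothetical, not a rule from this PR:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}

val rel = LocalRelation('a.int, 'b.int)
val query: LogicalPlan = rel.select('a, 'b).where('a > 1).analyze

// Re-alias every Project output with fresh expr IDs; the returned
// (old -> new) mapping lets the framework rewrite references in the
// ancestors (here, the Filter on `a`) automatically.
val rewritten = query.transformUpWithNewOutput {
  case p @ Project(projectList, child) =>
    val newList = projectList.map(ne => Alias(ne, ne.name)())
    val newP = Project(newList, child)
    newP -> p.output.zip(newP.output)
}
```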