[SPARK-32638][SQL] Corrects references when adding aliases in WidenSetOperationTypes #29485
Conversation
Test build #127678 has finished for PR 29485 at commit
retest this please
Test build #127679 has finished for PR 29485 at commit
val (casted, newExprIds) = plan.output.zip(targetTypes).map {
  case (e, dt) if e.dataType != dt =>
    val alias = Alias(Cast(e, dt), e.name)()
    (alias, Some(e.exprId -> (alias.exprId, dt)))
Can we just store the attribute of the `Alias`, i.e., `alias.toAttribute`?
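As a rough sketch of that suggestion (assuming the surrounding `plan.output.zip(targetTypes)` map from the diff above; variable names here are illustrative, not the final code):

```scala
// Sketch: keep the whole new attribute instead of the (exprId, dataType) pair,
// so later rewriting can read both the new exprId and dataType from one value.
val (casted, newAttrs) = plan.output.zip(targetTypes).map {
  case (e, dt) if e.dataType != dt =>
    val alias = Alias(Cast(e, dt), e.name)()
    (alias, Some(e.exprId -> alias.toAttribute))
  case (e, _) =>
    (e, None)
}.unzip
```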
Ah, I see. I'll update.
Test build #127710 has finished for PR 29485 at commit
Looks right to me too
Thanks for the reviews, @viirya and @HyukjinKwon! Also cc: @cloud-fan
}
// Re-maps existing references to the new ones (exprId and dataType)
// for aliases added when widening columns' data types.
Another common way to solve this issue is to create an `Alias` with the existing `exprId`, so that we don't need to rewrite the parent nodes. I think it's safer than rewriting them: we rewrite parent nodes in `ResolveReferences.dedupRight`, where we hit bugs and ended up with a complicated solution.
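The alternative in a nutshell, as a hedged sketch against the Catalyst `Alias` constructor (not the final code; `e` and `dt` are the attribute and target type from the diff above):

```scala
// Reuse the child's exprId so existing references in parent nodes stay valid,
// avoiding any parent-node rewrite.
val alias = Alias(Cast(e, dt), e.name)(exprId = e.exprId)
```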
I thought about it too, but I'm not sure if a duplicate `exprId` is okay. If this is a common way, it sounds simple and safe.
You mean re-aliasing with `exprId=1` in the example above, like this?
org.apache.spark.sql.AnalysisException: Resolved attribute(s) v#1 missing from v#3 in operator !Project [v#1]. Attribute(s) with the same name appear in the operation: v. Please check if the right attribute(s) are used.;;
!Project [v#1] <------ the reference got missing
+- SubqueryAlias t
+- Union
:- Project [cast(v#1 as decimal(11,0)) AS v#3] <----- !!!! re-alias with exprId=#1 ?!!!!!
: +- Project [v#1]
: +- SubqueryAlias t3
: +- SubqueryAlias tbl
: +- LocalRelation [v#1]
+- Project [v#2]
+- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
+- SubqueryAlias t3
+- SubqueryAlias tbl
+- LocalRelation [v#1]
Yes, like re-aliasing with `exprId=1`. I just did a quick search: the rules `TimeWindowing` and `Aggregation` do it. AFAIK it's common when we need to change the plan in the middle and don't want to affect the parent nodes.
Yea, I tried it first, but `RemoveNoopOperators` will remove a `Project` with a rewritten alias: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L480
That's because it assumes projects sharing the same exprIds have semantically equal output. There may be a way to avoid that case, and I'll check `TimeWindowing`.
In the case of `TimeWindowing`, it seems `RemoveNoopOperators` cannot remove a project having the same exprIds because the number of output attributes differs before/after the transformation (L3622).
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
Lines 3610 to 3623 in 11c6a23
val windowStruct = Alias(getWindow(0, 1), WINDOW_COL_NAME)(
  exprId = windowAttr.exprId, explicitMetadata = Some(metadata))
val replacedPlan = p transformExpressions {
  case t: TimeWindow => windowAttr
}
// For backwards compatibility we add a filter to filter out nulls
val filterExpr = IsNotNull(window.timeColumn)
replacedPlan.withNewChildren(
  Filter(filterExpr,
    Project(windowStruct +: child.output, child)) :: Nil)
} else {
I looked around the related code, but couldn't find a way to avoid the case. Any suggestions?
Doesn't `RemoveNoopOperators` check whether the outputs are semantically equal? Is `cast(v#1 as decimal(11,0)) AS v#1` semantically equal to `v#1`? `Canonicalize` should keep the `cast` and the `alias`.
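A sketch of the check being discussed, assuming Catalyst's `semanticEquals` (which compares canonicalized expressions); `attr` here is an illustrative `decimal(10,0)` attribute, not code from this PR:

```scala
// If Canonicalize preserves the Cast and the Alias, the widened projection is
// not semantically equal to the bare attribute, so RemoveNoopOperators should
// not treat the enclosing Project as a no-op.
val widened = Alias(Cast(attr, DecimalType(11, 0)), attr.name)(exprId = attr.exprId)
val isNoop = widened.semanticEquals(attr) // expected false if cast/alias survive canonicalization
```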
hm I see. I'll check that approach.
On second thought, we still need to update the parent nodes even if we re-alias. For example, in the example from the PR description:
!Project [v#1] <------ this project already has `AttributeReference(v, decimal(10, 0))#1`, so we need to update the data type, too
+- SubqueryAlias t
+- Union
:- Project [cast(v#1 as decimal(11,0)) AS v#1] <----- re-alias with exprId=#1
: +- Project [v#1] <----- dataType=decimal(10, 0)
: +- SubqueryAlias t3
: +- SubqueryAlias tbl
: +- LocalRelation [v#1]
+- Project [v#2] <----- dataType=decimal(11, 0)
+- ...
The parent `Project` has an attribute reference with `exprId=1` and `dataType=decimal(10, 0)`. So, IIUC, we need to update the data type, too. If we don't, plan integrity can break, e.g., in `PushProjectionThroughUnion`.
-- !query
CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v)
-- !query schema
struct<>
-- !query output
-- !query
SELECT t.v FROM (
SELECT v FROM t3
UNION ALL
SELECT v + v AS v FROM t3
) t
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.errors.package$TreeNodeException
After applying rule org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughUnion in batch Operator Optimization before Inferring Filters, the structural integrity of the plan is broken., tree:
'Union false, false
:- Project [v#183]
: +- Project [cast(v#183 as decimal(11,0)) AS v#183]
: +- Project [v#183]
: +- LocalRelation [v#183]
+- Project [v#184]
+- Project [v#184]
+- Project [CheckOverflow((promote_precision(cast(v#183 as decimal(11,0))) + promote_precision(cast(v#183 as decimal(11,0)))), DecimalType(11,0), true) AS v#184]
+- LocalRelation [v#183]
This is weird. All the type coercion rules extend `TypeCoercionRule`, which updates parent nodes with data type changes automatically.
@@ -328,27 +328,46 @@ object TypeCoercion {
 */
object WidenSetOperationTypes extends Rule[LogicalPlan] {
I think this rule should extend TypeCoercionRule
Oh, I see. I missed that. I'll replace it with `TypeCoercionRule`, then check the re-alias approach again.
case Project(projList, child) if projList.length == child.output.length &&
  projList.zip(child.output).forall { case (e1, e2) => e1.semanticEquals(e2) } => child
Is this for Alias?
Yea, yes. I modified the code to handle the case below:
:- Project [v#183]
: +- Project [cast(v#183 as decimal(11,0)) AS v#183]
: +- Project [v#183]
Can you add a comment for this?
I'm not sure if we considered this case when we added `RemoveNoopOperators`, but it seems this change didn't cause any test failures.
Test build #127893 has finished for PR 29485 at commit
@@ -85,14 +84,19 @@ case class RemoveRedundantProjects(conf: SQLConf) extends Rule[SparkPlan] {
    // to convert the rows to UnsafeRow. See DataSourceV2Strategy for more details.
    case d: DataSourceV2ScanExecBase if !d.supportsColumnar => false
    case _ =>
      def semanticEquals(exprs1: Seq[Expression], exprs2: Seq[Expression]): Boolean = {
Maybe also a comment here?
Sure!
If we agree that a `Project` aliasing an existing exprId shouldn't be removed by `RemoveNoopOperators` or `RemoveRedundantProjects`, this change is okay.
Thanks for the review, @viirya! cc: @cloud-fan
It's a bit frustrating to see the issue in … Maybe we should go with the other direction: create new attributes when necessary, and rewrite the parent nodes. We need to follow … I'll check other places that create …
NVM, @cloud-fan. Okay, I'll update it to follow …
 * @return a rewritten plan and updated references related to a root node of
 *         the given `plan` for rewriting it.
 */
def rewritePlan(plan: LogicalPlan, rewritePlanMap: Map[LogicalPlan, LogicalPlan])
I rewrote the existing `rewritePlan` a bit, then just reused it for `WidenSetOperationTypes`. Does this update satisfy your intention? #29485 (comment)
Test build #127971 has finished for PR 29485 at commit
retest this please
Test build #128216 has finished for PR 29485 at commit
case (e, _) =>
  e -> e
}.unzip
Project(casted._1, plan) -> Project(casted._2, plan)
what are we doing here?
This generates a rewrite map used by `Analyzer.rewritePlan`. `rewritePlan` assumes the plan structure is the same before/after the rewrite, so this `WidenSetOperationTypes` rule now does a two-phase transformation, as follows:
### Input Plan (Query described in the PR description) ###
Project [v#1]
+- SubqueryAlias t
+- Union
:- Project [v#1]
: +- SubqueryAlias t3
: ...
+- Project [v#2]
+- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
+- SubqueryAlias t3
...
### Phase-1 (Adds Project, but does not update ExprId) ###
Project [v#1]
+- SubqueryAlias t
+- Union
:- Project [cast(v#1 as decimal(11,0)) AS v#1] <--- !!!Adds Project to widen a type!!!
: +- Project [v#1]
: +- SubqueryAlias t3
: ...
+- Project [v#2]
+- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
...
### Phase-2 ###
// Analyzer.rewritePlan updates ExprIds based on a rewrite map:
// `Project [cast(v#1 as decimal(11,0)) AS v#1]` => Project [cast(v#1 as decimal(11,0)) AS v#3]
Project [v#3] <--- !!!Updates ExprId!!!
+- SubqueryAlias t
+- Union
:- Project [cast(v#1 as decimal(11,0)) AS v#3] <--- !!!Updates ExprId!!!
: +- Project [v#1]
: +- SubqueryAlias t3
: ...
+- Project [v#2]
+- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
...
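A hedged sketch of how the two phases fit together (names like `castedOld`, `castedNew`, `child`, and `unionPlan` are illustrative, not the actual code):

```scala
// Phase 1 builds, per Union child needing widening, a (before, after) pair:
// the same added Project with the old exprIds on the left and fresh exprIds
// on the right. Phase 2 (Analyzer.rewritePlan) swaps each left subtree for
// its right counterpart and updates attribute references (exprId and
// dataType) in every parent node in one pass.
val rewritePlanMap: Map[LogicalPlan, LogicalPlan] =
  Map(Project(castedOld, child) -> Project(castedNew, child))
val (newPlan, _) = rewritePlan(unionPlan, rewritePlanMap)
```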
thanks, merging to master!
Thanks for the reviews, @cloud-fan @viirya!
…Plan

### What changes were proposed in this pull request?

This is a followup of #29485. It moves the plan rewriting methods from `Analyzer` to `QueryPlan`, so that they can work with `SparkPlan` as well. This PR also adds an improvement to support a corner case (the attribute to be replaced stays together with an unresolved attribute) and makes the rewriting more general, so that `WidenSetOperationTypes` can rewrite the plan in one shot like before.

### Why are the changes needed?

Code cleanup and generalization.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29643 from cloud-fan/cleanup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
…denSetOperationTypes

### What changes were proposed in this pull request?

This PR intends to fix a bug where references can be missing when adding aliases to widen data types in `WidenSetOperationTypes`. For example,

```
CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
SELECT t.v FROM (
  SELECT v FROM t3
  UNION ALL
  SELECT v + v AS v FROM t3
) t;

org.apache.spark.sql.AnalysisException: Resolved attribute(s) v#1 missing from v#3 in operator !Project [v#1]. Attribute(s) with the same name appear in the operation: v. Please check if the right attribute(s) are used.;;
!Project [v#1] <------ the reference got missing
+- SubqueryAlias t
   +- Union
      :- Project [cast(v#1 as decimal(11,0)) AS v#3]
      :  +- Project [v#1]
      :     +- SubqueryAlias t3
      :        +- SubqueryAlias tbl
      :           +- LocalRelation [v#1]
      +- Project [v#2]
         +- Project [CheckOverflow((promote_precision(cast(v#1 as decimal(11,0))) + promote_precision(cast(v#1 as decimal(11,0)))), DecimalType(11,0), true) AS v#2]
            +- SubqueryAlias t3
               +- SubqueryAlias tbl
                  +- LocalRelation [v#1]
```

In this case, `WidenSetOperationTypes` added the alias `cast(v#1 as decimal(11,0)) AS v#3`, then the reference in the top `Project` got missing. This PR corrects the reference (`exprId` and widened `dataType`) after adding aliases in the rule. This backport for 3.0 comes from #29485 and #29643.

### Why are the changes needed?

Bugfixes.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests.

Closes #29680 from maropu/SPARK-32638-BRANCH3.0.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
…ibute in logical plans

### What changes were proposed in this pull request?

Some plan transformations (e.g., `RemoveNoopOperators`) implicitly assume that the same `ExprId` refers to a unique attribute, but `RuleExecutor` does not check this integrity between logical plan transformations. So, this PR intends to add this check in `isPlanIntegral` of `Analyzer`/`Optimizer`. This PR comes from the talk with cloud-fan and viirya in #29485 (comment).

### Why are the changes needed?

For better logical plan integrity checking.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29585 from maropu/PlanIntegrityTest.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
What changes were proposed in this pull request?

This PR intends to fix a bug where references can be missing when adding aliases to widen data types in `WidenSetOperationTypes`. For example:

```
CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
SELECT t.v FROM (
  SELECT v FROM t3
  UNION ALL
  SELECT v + v AS v FROM t3
) t;
```

In this case, `WidenSetOperationTypes` added the alias `cast(v#1 as decimal(11,0)) AS v#3`, then the reference in the top `Project` got missing. This PR corrects the reference (`exprId` and widened `dataType`) after adding aliases in the rule.

Why are the changes needed?

bugfixes

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests