[SPARK-32741][SQL] Check if the same ExprId refers to the unique attribute in logical plans #29585
Conversation
```scala
def hasUniqueIdsForAttributes(plan: LogicalPlan): Boolean = {
```
Add some comment for this method?
Sure.
```scala
abstract class OrderPreservingUnaryNode extends UnaryNode {
  override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
}

trait LogicalPlanIntegrity {
```
If it's just for `hasUniqueIdsForAttributes`, a `trait` seems too much. Maybe just an `object LogicalPlanIntegrity`?
Test build #128045 has finished for PR 29585 at commit
I thought we might find some broken cases, but it looks better than I expected. This passes the unit tests.
Force-pushed from 5dab65a to 9d46a60.
```scala
  p.output.filter(_.resolved).map(_.canonicalized.asInstanceOf[Attribute])
}
val groupedAttrsByExprId = allOutputAttrs
  .flatten.groupBy(_.exprId).values.map(_.distinct)
```
what do we actually check? name, data type and nullability?
Since references are canonicalized in advance, the current code checks just the data type and the exprId:

spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala (lines 44 to 45 in a1e459e):

```scala
case a: AttributeReference =>
  AttributeReference("none", a.dataType.asNullable)(exprId = a.exprId)
```
This reminds me of the hacks in `FoldablePropagation`.
We do have attributes that share the same exprId but are actually different attributes, and I don't think there is an easy way to detect it automatically. For example, `a + 1 as a`: if I reuse the exprId of attribute `a` in the alias, how can we detect it? The name, data type and nullability are all the same.
Oh, good point. The current approach only detects it once the data type changes, but we generally reuse an exprId on another attribute with the same data type.
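For illustration, the undetectable reuse above can be constructed directly (a hypothetical snippet, not code from this PR):

```scala
// Hypothetical construction of the `a + 1 as a` case: the alias illegally
// reuses the ExprId of the attribute it references. Name, data type and
// nullability all match, so a canonicalization-based check cannot tell the
// alias's output apart from the original attribute.
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, AttributeReference, Literal, NamedExpression}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)(exprId = NamedExpression.newExprId)
val badAlias = Alias(Add(a, Literal(1)), "a")(exprId = a.exprId)
// The alias's output attribute is indistinguishable from `a` itself:
assert(badAlias.toAttribute.canonicalized == a.canonicalized)
```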
```scala
abstract class OrderPreservingUnaryNode extends UnaryNode {
  override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
}

object LogicalPlanIntegrity {
```
shall we check the physical plan as well?
Currently, `RuleExecutor` is not used for transforming physical plans:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala (lines 363 to 367 in a1e459e):

```scala
val preparedPlan = preparations.foldLeft(plan) { case (sp, rule) =>
  val result = rule.apply(sp)
  planChangeLogger.logRule(rule.ruleName, sp, result)
  result
}
```
Is it okay to replace the code above with `RuleExecutor` in this PR?
I think in general we won't change the output in a physical plan?
Ah, I see, nice suggestion. Having looked around the related code, I think it is not easy to catch all the ill-formed cases as you said, but IMO the most common pattern causing duplicate `ExprId`s is a reuse of the `ExprId`s of references, e.g., `a#1 + 1 as a#1` as you pointed out above. So, how about just checking for this reuse pattern in the plan integrity check? https://github.com/apache/spark/pull/29585/files#diff-27c76f96a7b2733ecfd6f46a1716e153R238
Test build #128057 has finished for PR 29585 at commit
Test build #128059 has finished for PR 29585 at commit
```scala
 * @return a rewritten plan and updated references related to a root node of
 *         the given `plan` for rewriting it.
 */
def rewritePlan(plan: LogicalPlan, rewritePlanMap: Map[LogicalPlan, LogicalPlan])
```
FYI: To update `ExprId`s in parent plan nodes, this method was just copied from https://github.com/apache/spark/pull/29485/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R137
```scala
if (keyExprIds.contains(attr.exprId)) {
  attr
} else {
  Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)
```
NOTE: Found by the integrity check: https://github.com/apache/spark/pull/29585/files#diff-27c76f96a7b2733ecfd6f46a1716e153R238
```scala
Alias(
  If(IsNull(alwaysTrueRef),
    resultWithZeroTups.get,
    aggValRef), origOutput.name)(exprId = origOutput.exprId),
```
NOTE: Found by the integrity check: https://github.com/apache/spark/pull/29585/files#diff-27c76f96a7b2733ecfd6f46a1716e153R238
```scala
    (IsNull(alwaysTrueRef), resultWithZeroTups.get),
    (Not(havingNode.get.condition), Literal.create(null, aggValRef.dataType))),
  aggValRef),
origOutput.name)(exprId = origOutput.exprId)
```
NOTE: Found by the integrity check: https://github.com/apache/spark/pull/29585/files#diff-27c76f96a7b2733ecfd6f46a1716e153R238
```diff
@@ -266,7 +266,7 @@ case class Union(
     firstAttr.withNullability(nullable)
   } else {
     AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
-      firstAttr.exprId, firstAttr.qualifier)
+      NamedExpression.newExprId, firstAttr.qualifier)
```
NOTE: Found by the integrity check: https://github.com/apache/spark/pull/29585/files#diff-27c76f96a7b2733ecfd6f46a1716e153R224
Is this the only place found by the check https://github.com/apache/spark/pull/29585/files#diff-27c76f96a7b2733ecfd6f46a1716e153R224 ?
And the bug in `StreamSuite`.
This change is risky. We are generating a new expr ID in `def output`, which means the output of this plan changes every time you call `output`.
Maybe `Union` should have a `val output` in the constructor, like `Generate`.
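For illustration, a hypothetical node showing why minting ids inside `def output` is dangerous (sketch only, not the `Union` code):

```scala
// Sketch: a node that mints fresh ExprIds on every `output` call disagrees
// with itself between calls, so references resolved against one call can
// never be matched against the next. Caching the attributes once -- with
// `lazy val output`, or a constructor field as in `Generate` -- keeps the
// ids stable.
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

case class UnstableOutput(child: LogicalPlan) extends UnaryNode {
  // BUG: newInstance() assigns a brand-new ExprId on every invocation.
  override def output: Seq[Attribute] = child.output.map(_.newInstance())
}
// UnstableOutput(t).output == UnstableOutput(t).output evaluates to false.
```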
Oh, I missed that case. Yea, I'll look into it and fix the issue.
```diff
@@ -1268,7 +1267,7 @@ class StreamSuite extends StreamTest {
 }

 abstract class FakeSource extends StreamSourceProvider {
-  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+  private val fakeSchema = StructType(StructField("a", LongType) :: Nil)
```
NOTE: This ill-formed definition was found by the integrity check: https://github.com/apache/spark/pull/29585/files#diff-27c76f96a7b2733ecfd6f46a1716e153R224
Test build #128215 has finished for PR 29585 at commit
```scala
plan.map { p =>
  p.expressions.filter(_.resolved).forall { e =>
    val namedExprs = e.collect {
      case ne: NamedExpression if !ne.isInstanceOf[LeafExpression] => ne
```
I think we can do a simple check: find all the `Alias`es in this plan, and check that the expr ID of the `Alias` doesn't exist in the `inputSet`.
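Something like the following, perhaps: a minimal sketch of that suggestion (the helper name is mine, and as the later discussion shows, this strict form had to be loosened):

```scala
// Sketch of the suggested check (assumed helper name, not the merged code):
// an Alias is supposed to mint a fresh ExprId, so its id must never collide
// with an attribute the node reads from its children (the `inputSet`).
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def aliasIdsAreFresh(plan: LogicalPlan): Boolean = {
  plan.collect { case p if p.resolved =>
    val inputIds = p.inputSet.map(_.exprId).toSet
    p.expressions
      .flatMap(_.collect { case a: Alias => a.exprId })
      .forall(id => !inputIds.contains(id))
  }.forall(identity)
}
```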
Yea, nice suggestion. I've pushed the commit to follow it.
Force-pushed from 759dda6 to a797d66.
```scala
  p -> attrMapping
} else {
  // Just passes through unresolved nodes
  plan.mapChildren {
```
NOTE: I applied this band-aid fix to solve the issue pointed out in #29643 (comment), but this issue should be fixed in #29643. So, after it is merged, I will remove this update.
Test build #128339 has finished for PR 29585 at commit
```scala
e.collect {
  // Only accepts the case of aliases renaming foldable expressions, e.g.,
  // `FoldablePropagation` generates this renaming pattern.
  case a: Alias if !a.child.foldable => a.exprId
```
I added this filter to avoid the failure below in `SQLWindowFunctionSuite`:
```
[info] - SPARK-7595: Window will cause resolve failed with self join *** FAILED *** (264 milliseconds)
[info] org.apache.spark.sql.catalyst.errors.package$TreeNodeException: After applying rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation in batch Operator Optimization before Inferring Filters, the structural integrity of the plan is broken., tree:
[info] GlobalLimit 1
[info] +- LocalLimit 1
[info]    +- Sort [0 ASC NULLS FIRST], true
[info]       +- Project [0 AS key#495, cnt_val#500L] --- (2)
[info]          +- Join Cross, (0 = 0)
[info]             :- Project [0 AS key#495] --- (1)
[info]             :  +- Window [0]
[info]             :     +- Project [0 AS key#495, 1 AS value#496]
[info]             :        +- OneRowRelation
[info]             +- Project [0 AS key#501, cnt_val#500L]
[info]                +- Window [count(1) windowspecdefinition(0, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt_val#500L], [0]
[info]                   +- Project [0 AS key#501, 1 AS value#502]
[info]                      +- OneRowRelation
```
I cannot find a way to fix this issue in `FoldablePropagation` because I think we cannot simply create plan remapping rules for `Analyzer.rewritePlan`. Let's say that we create the two simple rules below:
```
(1) Project [0 AS key#495] ==> Project [0 AS key#501]
(2) Project [0 AS key#495, cnt_val#500L] ==> Project [0 AS key#502, cnt_val#500L]
```
In this case, `attrMapping` in `Analyzer.rewritePlan` has duplicate entries `(key#495 -> {#501, #502})` in the `Sort` node, and the assertion below fails:
```
[info] java.lang.AssertionError: assertion failed: Found duplicate rewrite attributes
```

spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (lines 162 to 164 in 04f7f6d):

```scala
assert(!attrMapping.groupBy(_._1.exprId)
  .exists(_._2.map(_._2.exprId).distinct.length > 1),
  "Found duplicate rewrite attributes")
```
How about we just check:

```scala
case a: Alias => assert(!a.references.contains(a.toAttribute))
```
I missed this comment... your suggestion is like this (please check the latest commit)?
```scala
def checkIfSameExprIdNotReused(plan: LogicalPlan): Boolean = {
  plan.collect { case p if p.resolved =>
    p.expressions.forall {
      case a: Alias => !a.references.contains(a.toAttribute)
      case _ => true
    }
  }.forall(identity)
}
```
Force-pushed from a797d66 to acb2b80.
Test build #128357 has finished for PR 29585 at commit
Force-pushed from acb2b80 to 9bcc4e0.
```diff
 if (missingExpr.nonEmpty) {
   extractedExprBuffer += ne
 }
 // alias will be cleaned in the rule CleanupAliases
-ne
+attr
```
I updated this code to fix the test failure below:
```
[info] - grouping/grouping_id inside window function *** FAILED *** (75 milliseconds)
[info] org.apache.spark.sql.catalyst.errors.package$TreeNodeException: After applying rule org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions in batch Resolution, the structural integrity of the plan is broken., tree:
[info] Project [course#427, year#428, sum(earnings)#421, grouping_id(course, year)#422L, RANK() OVER (PARTITION BY grouping_id(course, year) ORDER BY sum(earnings) ASC NULLS FIRST unspecifiedframe$())#423]
[info] +- Project [course#427, year#428, sum(earnings)#421, grouping_id(course, year)#422L, _w0#434, grouping_id(course, year)#430L, _w2#438, spark_grouping_id#426L, RANK() OVER (PARTITION BY grouping_id(course, year) ORDER BY sum(earnings) ASC NULLS FIRST unspecifiedframe$())#423, RANK() OVER (PARTITION BY grouping_id(course, year) ORDER BY sum(earnings) ASC NULLS FIRST unspecifiedframe$())#423]
[info]    +- Window [rank(_w0#434) windowspecdefinition(spark_grouping_id#426L AS grouping_id(course, year)#430L, _w2#438 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY grouping_id(course, year) ORDER BY sum(earnings) ASC NULLS FIRST unspecifiedframe$())#423], [spark_grouping_id#426L AS grouping_id(course, year)#430L], [_w2#438 ASC NULLS FIRST]
[info]       +- Aggregate [course#427, year#428, spark_grouping_id#426L], [course#427, year#428, sum(earnings#258) AS sum(earnings)#421, spark_grouping_id#426L AS grouping_id(course, year)#429L AS grouping_id(course, year)#422L, sum(earnings#258) AS _w0#434, spark_grouping_id#426L AS grouping_id(course, year)#430L, sum(earnings#258) AS _w2#438, spark_grouping_id#426L]
[info]          +- Expand [List(course#256, year#257, earnings#258, course#424, year#425, 0), List(course#256, year#257, earnings#258, course#424, null, 1), List(course#256, year#257, earnings#258, null, year#425, 2), List(course#256, year#257, earnings#258, null, null, 3)], [course#256, year#257, earnings#258, course#427, year#428, spark_grouping_id#426L]
[info]             +- Project [course#256, year#257, earnings#258, course#256 AS course#424, year#257 AS year#425]
[info]                +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$CourseSales, true])).course, true, false) AS course#256, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$CourseSales, true])).year AS year#257, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$CourseSales, true])).earnings AS earnings#258]
[info]                   +- ExternalRDD [obj#255]
[info] at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:235)
[info] at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
```
The named expression `spark_grouping_id#426L AS grouping_id(course, year)#430L` was duplicated in the `Window` and `Aggregate` nodes.
Can you give a simpler example to explain this change? It seems like if the project list is `rank(a + b as c) over ...`, we should keep attribute `c` in the window operator, instead of `a + b as c`.
Ah, I noticed that we don't need this update now because I made the `checkIfSameExprIdNotReused` integrity check looser in the commit (this update comes from the comment: #29585 (comment)). Why we needed this update with the previous check is as follows:
```
// I used the current master:
sql("SET spark.sql.planChangeLog.level=WARN")

scala> val df = Seq(("a", 1)).toDF("k", "v")
scala> val aggDf = df.cube("k").agg(sum("v"), rank().over(Window.partitionBy(grouping_id("k")).orderBy(sum("v"))))

20/09/17 23:32:58 WARN PlanChangeLogger:
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions ===
!'Aggregate [k#191, spark_grouping_id#190L], [k#191, sum(cast(v#178 as bigint)) AS sum(v)#187L, rank(sum(cast(v#178 as bigint))) windowspecdefinition(spark_grouping_id#190L AS grouping_id(k)#192L, sum(cast(v#178 as bigint)) ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY grouping_id(k) ORDER BY sum(v) ASC NULLS FIRST unspecifiedframe$())#188] Project [k#191, sum(v)#187L, RANK() OVER (PARTITION BY grouping_id(k) ORDER BY sum(v) ASC NULLS FIRST unspecifiedframe$())#188]
!+- Expand [List(k#177, v#178, k#189, 0), List(k#177, v#178, null, 1)], [k#177, v#178, k#191, spark_grouping_id#190L]   +- Project [k#191, sum(v)#187L, _w0#196L, grouping_id(k)#192L, _w2#200L, spark_grouping_id#190L, RANK() OVER (PARTITION BY grouping_id(k) ORDER BY sum(v) ASC NULLS FIRST unspecifiedframe$())#188, RANK() OVER (PARTITION BY grouping_id(k) ORDER BY sum(v) ASC NULLS FIRST unspecifiedframe$())#188]
!   +- Project [k#177, v#178, k#177 AS k#189]                                                                              +- Window [rank(_w0#196L) windowspecdefinition(spark_grouping_id#190L AS grouping_id(k)#192L, _w2#200L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY grouping_id(k) ORDER BY sum(v) ASC NULLS FIRST unspecifiedframe$())#188], [spark_grouping_id#190L AS grouping_id(k)#192L], [_w2#200L ASC NULLS FIRST]
!      +- Project [_1#172 AS k#177, _2#173 AS v#178]                                                                          +- Aggregate [k#191, spark_grouping_id#190L], [k#191, sum(cast(v#178 as bigint)) AS sum(v)#187L, sum(cast(v#178 as bigint)) AS _w0#196L, spark_grouping_id#190L AS grouping_id(k)#192L, sum(cast(v#178 as bigint)) AS _w2#200L, spark_grouping_id#190L]
!         +- LocalRelation [_1#172, _2#173]                                                                                      +- Expand [List(k#177, v#178, k#189, 0), List(k#177, v#178, null, 1)], [k#177, v#178, k#191, spark_grouping_id#190L]
!                                                                                                                                   +- Project [k#177, v#178, k#177 AS k#189]
!                                                                                                                                      +- Project [_1#172 AS k#177, _2#173 AS v#178]
!                                                                                                                                         +- LocalRelation [_1#172, _2#173]
```
`ExtractWindowExpressions` replicates an alias `spark_grouping_id#190L AS grouping_id(k)#192L` (i.e., `Alias(AttributeReference(spark_grouping_id#190L), "grouping_id(k)")#192L`) in both `Window` and the child `Aggregate`. The parent `Window` has the `exprId=192L` in its `inputSet`, so the previous integrity check fails.
Anyway, I will revert this change.
Test build #128372 has finished for PR 29585 at commit
Force-pushed from 9bcc4e0 to 3f21635.
Test build #128391 has started for PR 29585 at commit
@cloud-fan @viirya could you check this again?
Kubernetes integration test starting
Kubernetes integration test status success
```scala
val t = LocalRelation('a.int, 'b.int)
assert(hasUniqueExprIdsForOutput(OutputTestPlan(t, t.output)))
assert(!hasUniqueExprIdsForOutput(OutputTestPlan(t, t.output.zipWithIndex.map {
  case (a, i) => AttributeReference(s"c$i", LongType)(a.exprId)
```
why does it break the check?
That's because the same `a.exprId` value has two different types: int and long. So the check fails.
`a.exprId` is from `t.output`, which should be two different expr ids?
This is the same case as with `Union`:

```
OutputTestPlan(output=[c0#1(long), c1#2(long)])
+- LogicalRelation [a#1(int), b#2(int)]
```

Then, the integrity check fails as follows:

```
exprId=1 => int, long
exprId=2 => int, long
```

In this case, we need to reassign these IDs like this:

```
OutputTestPlan(output=[c0#3(long), c1#4(long)])
+- LogicalRelation [a#1(int), b#2(int)]

exprId=1 => int
exprId=2 => int
exprId=3 => long
exprId=4 => long
```
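A standalone repro of that failure mode (illustrative values, not the suite's code):

```scala
// One ExprId bound to both an int and a long attribute: grouping the
// canonicalized outputs by ExprId then yields a group with two distinct
// entries, which is exactly what the uniqueness check flags.
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, NamedExpression}
import org.apache.spark.sql.types.{IntegerType, LongType}

val id = NamedExpression.newExprId
val childAttr = AttributeReference("a", IntegerType)(exprId = id)  // child output
val parentAttr = AttributeReference("c0", LongType)(exprId = id)   // reused id, new type
val conflicts = Seq[Attribute](childAttr, parentAttr)
  .map(_.canonicalized.asInstanceOf[Attribute])
  .groupBy(_.exprId).values
  .filter(_.distinct.length > 1)
assert(conflicts.nonEmpty)
```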
Kubernetes integration test starting
Kubernetes integration test status success
Thanks for the review, @cloud-fan !
Test build #129225 has finished for PR 29585 at commit
Test build #129236 has finished for PR 29585 at commit
Thanks! Merged to master.
Hi, @maropu . Could you check the Jenkins jobs? After this PR, Jenkins job execution time seems to have increased a lot (by about 30 minutes).
Oh, I see; I didn't notice that. Thanks, @dongjoon-hyun ! I will check it now. If it's better to revert it first, please let me know.
Thank you so much, @maropu . Since it's only
…ive plan changes

### What changes were proposed in this pull request?

(This is a followup PR of #29585.) The PR modified the `RuleExecutor#isPlanIntegral` code to check if a plan has globally-unique attribute IDs, but this check made Jenkins maven test jobs much longer (see [the Dongjoon comment](#29585 (comment)), and thanks, dongjoon-hyun!). To recover the running time for the Jenkins tests, this PR intends to update the code to run the plan integrity check only for effective plans.

### Why are the changes needed?

To recover running time for Jenkins tests.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29928 from maropu/PR29585-FOLLOWUP.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
…eplace exprIds in a bottom-up manner

### What changes were proposed in this pull request?

This PR intends to refactor code in `RewriteCorrelatedScalarSubquery` to replace `ExprId`s in a bottom-up manner instead of a top-down one. This PR comes from the talk with cloud-fan in #29585 (comment).

### Why are the changes needed?

To improve code.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29913 from maropu/RefactorRewriteCorrelatedScalarSubquery.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
What changes were proposed in this pull request?
Some plan transformations (e.g., `RemoveNoopOperators`) implicitly assume the same `ExprId` refers to the unique attribute. But `RuleExecutor` does not check this integrity between logical plan transformations. So, this PR intends to add this check in `isPlanIntegral` of `Analyzer`/`Optimizer`.

This PR comes from the talk with @cloud-fan @viirya in #29485 (comment)
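Roughly, the wiring looks like this: a simplified sketch of hooking the new checks into the rule executor's integrity hook (the merged override also keeps the pre-existing structural checks, so treat this as an outline rather than the exact diff):

```scala
// Simplified sketch inside Optimizer; the check names follow this PR's
// discussion, but the exact composition may differ from the merged code.
override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
  !Utils.isTesting || (
    LogicalPlanIntegrity.checkIfSameExprIdNotReused(plan) &&
    LogicalPlanIntegrity.hasUniqueExprIdsForOutput(plan))
}
```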
Why are the changes needed?
For better logical plan integrity checking.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests.