
[SPARK-32741][SQL] Check if the same ExprId refers to the unique attribute in logical plans #29585

Closed · wants to merge 16 commits

Conversation

maropu (Member) commented on Aug 30, 2020:

What changes were proposed in this pull request?

Some plan transformations (e.g., RemoveNoopOperators) implicitly assume that the same ExprId refers to a unique attribute, but RuleExecutor does not check this integrity between logical plan transformations. So, this PR intends to add this check to isPlanIntegral in Analyzer/Optimizer.

This PR comes from the discussion with @cloud-fan and @viirya in #29485 (comment).
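For reference, a minimal sketch of how such a check could be hooked in (a sketch only; the override shape and the final check method name are assumptions based on this PR's discussion, not the exact merged code):

// Hypothetical sketch: RuleExecutor calls isPlanIntegral between rule
// applications, so the Analyzer/Optimizer can delegate to the new check.
override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
  // Run the (potentially costly) whole-plan traversal only while testing.
  !Utils.isTesting || LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(plan)
}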

Why are the changes needed?

For better logical plan integrity checking.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing tests.

}
}

def hasUniqueIdsForAttributes(plan: LogicalPlan): Boolean = {
Member:

Add some comment for this method?

maropu (Member Author):

Sure.

@@ -203,3 +203,23 @@ abstract class BinaryNode extends LogicalPlan {
abstract class OrderPreservingUnaryNode extends UnaryNode {
override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
}

trait LogicalPlanIntegrity {
Member:

If it's just for hasUniqueIdsForAttributes, a trait seems too much. Maybe just an object LogicalPlanIntegrity?

SparkQA commented on Aug 30, 2020:

Test build #128045 has finished for PR 29585 at commit f24e82d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • trait LogicalPlanIntegrity

viirya (Member) left a comment:

I thought we might find some broken cases, but it looks better than I expected. This passes the unit tests.

maropu force-pushed the PlanIntegrityTest branch from 5dab65a to 9d46a60 on August 31, 2020 at 00:22.
p.output.filter(_.resolved).map(_.canonicalized.asInstanceOf[Attribute])
}
val groupedAttrsByExprId = allOutputAttrs
.flatten.groupBy(_.exprId).values.map(_.distinct)
Contributor:

what do we actually check? name, data type and nullability?

maropu (Member Author):

Since references are canonicalized in advance, the current code just checks the data type and the exprId:

case a: AttributeReference =>
AttributeReference("none", a.dataType.asNullable)(exprId = a.exprId)

Contributor:

This reminds me of the hacks in FoldablePropagation.

We do have attributes that share the same exprId but are actually different attributes, and I don't think there is an easy way to detect this automatically. For example, with a + 1 as a, if I reuse the exprId of attribute a in the alias, how can we detect it? The name, data type, and nullability are all the same.
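To make this concrete, here is a small hedged sketch of that undetectable reuse, built directly with catalyst constructors (the construction is for illustration only):

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// `a + 1 AS a`, deliberately reusing the exprId of the referenced attribute.
val a = AttributeReference("a", IntegerType)()
val aliased = Alias(Add(a, Literal(1)), "a")(exprId = a.exprId)
// The alias's output attribute is indistinguishable from `a`: same name,
// data type, nullability, and exprId, so a canonicalization-based check
// cannot flag it.
assert(aliased.toAttribute.semanticEquals(a))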

Member:

Oh, good point. The current approach only detects the reuse once the data type changes, but we generally reuse an exprId on another attribute with the same data type.


@@ -203,3 +203,28 @@ abstract class BinaryNode extends LogicalPlan {
abstract class OrderPreservingUnaryNode extends UnaryNode {
override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
}

object LogicalPlanIntegrity {
Contributor:

shall we check the physical plan as well?

maropu (Member Author):

Currently, RuleExecutor is not used for transforming physical plans:

val preparedPlan = preparations.foldLeft(plan) { case (sp, rule) =>
val result = rule.apply(sp)
planChangeLogger.logRule(rule.ruleName, sp, result)
result
}

Is it okay to replace the code above with RuleExecutor in this PR?

Member:

I think we generally won't change the output in a physical plan?

maropu (Member Author):

Ah, I see. Nice suggestion. Having looked around the related code, I think it is not easy to catch all the ill-formed cases, as you said. But IMO the most common pattern that causes duplicate ExprIds is a reuse of the ExprIds of references, e.g., a#1 + 1 as a#1 as you pointed out above. So, how about just checking for this reuse pattern in the plan integrity check? https://github.com/apache/spark/pull/29585/files#diff-27c76f96a7b2733ecfd6f46a1716e153R238

SparkQA commented on Aug 31, 2020:

Test build #128057 has finished for PR 29585 at commit 5dab65a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented on Aug 31, 2020:

Test build #128059 has finished for PR 29585 at commit 9d46a60.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* @return a rewritten plan and updated references related to a root node of
* the given `plan` for rewriting it.
*/
def rewritePlan(plan: LogicalPlan, rewritePlanMap: Map[LogicalPlan, LogicalPlan])
maropu (Member Author):

FYI: to update ExprIds in parent plan nodes, this method was just copied from https://github.com/apache/spark/pull/29485/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R137

if (keyExprIds.contains(attr.exprId)) {
attr
} else {
Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId)

Alias(
If(IsNull(alwaysTrueRef),
resultWithZeroTups.get,
aggValRef), origOutput.name)(exprId = origOutput.exprId),

(IsNull(alwaysTrueRef), resultWithZeroTups.get),
(Not(havingNode.get.condition), Literal.create(null, aggValRef.dataType))),
aggValRef),
origOutput.name)(exprId = origOutput.exprId)

@@ -266,7 +266,7 @@ case class Union(
   firstAttr.withNullability(nullable)
 } else {
   AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
-    firstAttr.exprId, firstAttr.qualifier)
+    NamedExpression.newExprId, firstAttr.qualifier)

maropu (Member Author):

And it found the bug in StreamSuite.

Contributor:

This change is risky. We are generating a new expr ID in def output, which means the output of this plan changes every time you call output.

Contributor:

Maybe Union should have a val output in the constructor, like Generate.
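A small self-contained sketch of the risk and the suggested shape of the fix (the object and member names here are illustrative, not the merged change):

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.LongType

object OutputStabilityDemo {
  // Risky: a `def` that mints a fresh ExprId yields a different attribute
  // on every call, so the plan's output is not stable.
  def freshOutput: Seq[Attribute] =
    Seq(AttributeReference("c", LongType)(NamedExpression.newExprId))

  // Suggested shape: compute the attributes once (a lazy val here, or a
  // constructor value as in Generate) so the ExprIds stay fixed.
  lazy val stableOutput: Seq[Attribute] =
    Seq(AttributeReference("c", LongType)(NamedExpression.newExprId))

  def main(args: Array[String]): Unit = {
    assert(freshOutput.head.exprId != freshOutput.head.exprId)   // fresh each call
    assert(stableOutput.head.exprId == stableOutput.head.exprId) // memoized once
  }
}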

maropu (Member Author) on Sep 18, 2020:

Oh, I missed that case. Yeah, I'll look into it and fix the issue.

@@ -1268,7 +1267,7 @@ class StreamSuite extends StreamTest {
 }

 abstract class FakeSource extends StreamSourceProvider {
-  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+  private val fakeSchema = StructType(StructField("a", LongType) :: Nil)
maropu (Member Author):

NOTE: This ill-formed definition was found by the integrity check: https://github.com/apache/spark/pull/29585/files#diff-27c76f96a7b2733ecfd6f46a1716e153R224

SparkQA commented on Sep 3, 2020:

Test build #128215 has finished for PR 29585 at commit 759dda6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

plan.map { p =>
p.expressions.filter(_.resolved).forall { e =>
val namedExprs = e.collect {
case ne: NamedExpression if !ne.isInstanceOf[LeafExpression] => ne
Contributor:

I think we can do a simple check: find all the Aliases in this plan, and check that the expr ID of each Alias doesn't already exist in the inputSet.
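A hedged sketch of that suggestion (the method name here is made up for illustration; the variant that actually landed in this PR appears further below):

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def checkAliasIdsNotInInputSet(plan: LogicalPlan): Boolean = {
  plan.collect { case p if p.resolved =>
    // An Alias must mint a fresh ExprId; it must not shadow an input attribute.
    val aliasIds = p.expressions.flatMap(_.collect { case a: Alias => a.exprId })
    aliasIds.forall(id => !p.inputSet.exists(_.exprId == id))
  }.forall(identity)
}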

maropu (Member Author):

Yeah, nice suggestion. I've pushed a commit to follow it.

p -> attrMapping
} else {
// Just passes through unresolved nodes
plan.mapChildren {
maropu (Member Author):

NOTE: I applied this band-aid fix to solve the issue pointed out in #29643 (comment), but the issue should be fixed in #29643 itself. So, after that is merged, I will remove this update.

SparkQA commented on Sep 7, 2020:

Test build #128339 has finished for PR 29585 at commit a797d66.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

e.collect {
  // Skip aliases that merely rename foldable expressions, e.g., the
  // renaming pattern that `FoldablePropagation` generates.
  case a: Alias if !a.child.foldable => a.exprId
maropu (Member Author):

I added this filter to avoid the failure below:

// SQLWindowFunctionSuite"
[info] - SPARK-7595: Window will cause resolve failed with self join *** FAILED *** (264 milliseconds)
[info]   org.apache.spark.sql.catalyst.errors.package$TreeNodeException: After applying rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation in batch Operator Optimization before Inferring Filters, the structural integrity of the plan is broken., tree:
[info] GlobalLimit 1
[info] +- LocalLimit 1
[info]    +- Sort [0 ASC NULLS FIRST], true
[info]       +- Project [0 AS key#495, cnt_val#500L]               --- (2)
[info]          +- Join Cross, (0 = 0)
[info]             :- Project [0 AS key#495]                       --- (1)
[info]             :  +- Window [0]
[info]             :     +- Project [0 AS key#495, 1 AS value#496]
[info]             :        +- OneRowRelation
[info]             +- Project [0 AS key#501, cnt_val#500L]
[info]                +- Window [count(1) windowspecdefinition(0, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt_val#500L], [0]
[info]                   +- Project [0 AS key#501, 1 AS value#502]
[info]                      +- OneRowRelation

I cannot find a way to fix this issue in FoldablePropagation because I think we cannot simply create plan remapping rules for Analyzer.rewritePlan.

Let's say we create the two simple rules below:

(1) Project [0 AS key#495]                 ==> Project [0 AS key#501]
(2) Project [0 AS key#495, cnt_val#500L]   ==> Project [0 AS key#502, cnt_val#500L]

In this case, attrMapping in Analyzer.rewritePlan has duplicate entries (key#495 -> {#501, #502})
in the Sort node and the assertion below fails:

[info]   java.lang.AssertionError: assertion failed: Found duplicate rewrite attributes

assert(!attrMapping.groupBy(_._1.exprId)
.exists(_._2.map(_._2.exprId).distinct.length > 1),
"Found duplicate rewrite attributes")

Contributor:

How about we just check:

case a: Alias =>  assert(!a.references.contains(a.toAttribute))

maropu (Member Author):

I missed this comment... is your suggestion like this (please check the latest commit)?

  def checkIfSameExprIdNotReused(plan: LogicalPlan): Boolean = {
    plan.collect { case p if p.resolved =>
      p.expressions.forall {
        case a: Alias => !a.references.contains(a.toAttribute)
        case _ => true
      }
    }.forall(identity)
  }
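For illustration, a hedged usage sketch of this check against the reuse pattern discussed above, assuming the checkIfSameExprIdNotReused definition above is in scope (the plan is constructed with the catalyst DSL purely as an example):

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._

val t = LocalRelation('a.int)
val a = t.output.head
// `a + 1 AS a`, reusing the child's ExprId: the alias references the very
// attribute it shadows, so the check should reject this plan.
val bad = t.select(Alias(a + 1, "a")(exprId = a.exprId))
assert(!checkIfSameExprIdNotReused(bad))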

SparkQA commented on Sep 7, 2020:

Test build #128357 has finished for PR 29585 at commit acb2b80.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

 if (missingExpr.nonEmpty) {
   extractedExprBuffer += ne
 }
 // alias will be cleaned in the rule CleanupAliases
-ne
+attr
maropu (Member Author):

I updated this code to fix the test failure below:

[info] - grouping/grouping_id inside window function *** FAILED *** (75 milliseconds)
[info]   org.apache.spark.sql.catalyst.errors.package$TreeNodeException: After applying rule org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions in batch Resolution, the structural integrity of the plan is broken., tree:
[info] Project [course#427, year#428, sum(earnings)#421, grouping_id(course, year)#422L, RANK() OVER (PARTITION BY grouping_id(course, year) ORDER BY sum(earnings) ASC NULLS FIRST unspecifiedframe$())#423]
[info] +- Project [course#427, year#428, sum(earnings)#421, grouping_id(course, year)#422L, _w0#434, grouping_id(course, year)#430L, _w2#438, spark_grouping_id#426L, RANK() OVER (PARTITION BY grouping_id(course, year) ORDER BY sum(earnings) ASC NULLS FIRST unspecifiedframe$())#423, RANK() OVER (PARTITION BY grouping_id(course, year) ORDER BY sum(earnings) ASC NULLS FIRST unspecifiedframe$())#423]
[info]    +- Window [rank(_w0#434) windowspecdefinition(spark_grouping_id#426L AS grouping_id(course, year)#430L, _w2#438 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY grouping_id(course, year) ORDER BY sum(earnings) ASC NULLS FIRST unspecifiedframe$())#423], [spark_grouping_id#426L AS grouping_id(course, year)#430L], [_w2#438 ASC NULLS FIRST]
[info]       +- Aggregate [course#427, year#428, spark_grouping_id#426L], [course#427, year#428, sum(earnings#258) AS sum(earnings)#421, spark_grouping_id#426L AS grouping_id(course, year)#429L AS grouping_id(course, year)#422L, sum(earnings#258) AS _w0#434, spark_grouping_id#426L AS grouping_id(course, year)#430L, sum(earnings#258) AS _w2#438, spark_grouping_id#426L]
[info]          +- Expand [List(course#256, year#257, earnings#258, course#424, year#425, 0), List(course#256, year#257, earnings#258, course#424, null, 1), List(course#256, year#257, earnings#258, null, year#425, 2), List(course#256, year#257, earnings#258, null, null, 3)], [course#256, year#257, earnings#258, course#427, year#428, spark_grouping_id#426L]
[info]             +- Project [course#256, year#257, earnings#258, course#256 AS course#424, year#257 AS year#425]
[info]                +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$CourseSales, true])).course, true, false) AS course#256, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$CourseSales, true])).year AS year#257, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$CourseSales, true])).earnings AS earnings#258]
[info]                   +- ExternalRDD [obj#255]
[info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:235)
[info]   at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)

The named expression spark_grouping_id#426L AS grouping_id(course, year)#430L was duplicated in the Window and Aggregate nodes.

Contributor:

Can you give a simpler example to explain this change? It seems like if the project list is rank(a + b as c) over ..., we should keep attribute c in the window operator, instead of a + b as c.

maropu (Member Author):

Ah, I've noticed that we don't need this update anymore, because I made the checkIfSameExprIdNotReused integrity check looser in the latest commit (this update comes from the comment: #29585 (comment)).

The reason we needed this update under the previous check is as follows:

// I used the current master:
scala> sql("SET spark.sql.planChangeLog.level=WARN")
scala> val df = Seq(("a", 1)).toDF("k", "v")
scala> val aggDf = df.cube("k").agg(sum("v"), rank().over(Window.partitionBy(grouping_id("k")).orderBy(sum("v"))))

20/09/17 23:32:58 WARN PlanChangeLogger: 
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions ===
!'Aggregate [k#191, spark_grouping_id#190L], [k#191, sum(cast(v#178 as bigint)) AS sum(v)#187L, rank(sum(cast(v#178 as bigint))) windowspecdefinition(spark_grouping_id#190L AS grouping_id(k)#192L, sum(cast(v#178 as bigint)) ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY grouping_id(k) ORDER BY sum(v) ASC NULLS FIRST unspecifiedframe$())#188]   Project [k#191, sum(v)#187L, RANK() OVER (PARTITION BY grouping_id(k) ORDER BY sum(v) ASC NULLS FIRST unspecifiedframe$())#188]
!+- Expand [List(k#177, v#178, k#189, 0), List(k#177, v#178, null, 1)], [k#177, v#178, k#191, spark_grouping_id#190L]                                                                                                                                                                                                                                                                                                          +- Project [k#191, sum(v)#187L, _w0#196L, grouping_id(k)#192L, _w2#200L, spark_grouping_id#190L, RANK() OVER (PARTITION BY grouping_id(k) ORDER BY sum(v) ASC NULLS FIRST unspecifiedframe$())#188, RANK() OVER (PARTITION BY grouping_id(k) ORDER BY sum(v) ASC NULLS FIRST unspecifiedframe$())#188]
!   +- Project [k#177, v#178, k#177 AS k#189]                                                                                                                                                                                                                                                                                                                                                                                     +- Window [rank(_w0#196L) windowspecdefinition(spark_grouping_id#190L AS grouping_id(k)#192L, _w2#200L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY grouping_id(k) ORDER BY sum(v) ASC NULLS FIRST unspecifiedframe$())#188], [spark_grouping_id#190L AS grouping_id(k)#192L], [_w2#200L ASC NULLS FIRST]
!      +- Project [_1#172 AS k#177, _2#173 AS v#178]                                                                                                                                                                                                                                                                                                                                                                                 +- Aggregate [k#191, spark_grouping_id#190L], [k#191, sum(cast(v#178 as bigint)) AS sum(v)#187L, sum(cast(v#178 as bigint)) AS _w0#196L, spark_grouping_id#190L AS grouping_id(k)#192L, sum(cast(v#178 as bigint)) AS _w2#200L, spark_grouping_id#190L]
!         +- LocalRelation [_1#172, _2#173]                                                                                                                                                                                                                                                                                                                                                                                             +- Expand [List(k#177, v#178, k#189, 0), List(k#177, v#178, null, 1)], [k#177, v#178, k#191, spark_grouping_id#190L]
!                                                                                                                                                                                                                                                                                                                                                                                                                                          +- Project [k#177, v#178, k#177 AS k#189]
!                                                                                                                                                                                                                                                                                                                                                                                                                                             +- Project [_1#172 AS k#177, _2#173 AS v#178]
!                                                                                                                                                                                                                                                                                                                                                                                                                                                +- LocalRelation [_1#172, _2#173]

ExtractWindowExpressions replicates the alias spark_grouping_id#190L AS grouping_id(k)#192L (Alias(AttributeReference(spark_grouping_id#190L), "grouping_id(k)"#192L)) in both Window and the child Aggregate. The parent Window has exprId=192L in its inputSet, so the previous integrity check fails.

Anyway, I will revert this change.

SparkQA commented on Sep 8, 2020:

Test build #128372 has finished for PR 29585 at commit 9bcc4e0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented on Sep 8, 2020:

Test build #128391 has started for PR 29585 at commit 3f21635.

maropu (Member Author) commented on Sep 29, 2020:

@cloud-fan @viirya could you check this again?

SparkQA commented on Sep 29, 2020:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33841/

SparkQA commented on Sep 29, 2020:

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33841/

This reverts commit 4c8a81e.
This reverts commit 8b86bcf.
This reverts commit c78e517.
val t = LocalRelation('a.int, 'b.int)
assert(hasUniqueExprIdsForOutput(OutputTestPlan(t, t.output)))
assert(!hasUniqueExprIdsForOutput(OutputTestPlan(t, t.output.zipWithIndex.map {
case (a, i) => AttributeReference(s"c$i", LongType)(a.exprId)
Contributor:

why does it break the check?

maropu (Member Author):

That's because the same a.exprId value has two different types: int and long. So, the check fails.

Contributor:

a.exprId is from t.output, which should be two different expr ids?

maropu (Member Author) on Sep 29, 2020:

This is the same case as with Union:

OutputTestPlan(output=[c0#1(long), c1#2(long)])
+- LogicalRelation [a#1(int), b#2(int)]

Then, the integrity check fails as follows:

exprId=1 => int, long
exprId=2 => int, long

In this case, we need to reassign these IDs like this:

OutputTestPlan(output=[c0#3(long), c1#4(long)])
+- LogicalRelation [a#1(int), b#2(int)]

exprId=1 => int
exprId=2 => int
exprId=3 => long
exprId=4 => long
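For reference, a hedged sketch of the whole check, mirroring the diff fragment quoted earlier (the method name hasUniqueExprIdsForOutput comes from the test above; details may differ from the merged code):

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def hasUniqueExprIdsForOutput(plan: LogicalPlan): Boolean = {
  // Canonicalize resolved output attributes so that only the data type and
  // the exprId matter for the comparison.
  val allOutputAttrs = plan.map { p =>
    p.output.filter(_.resolved).map(_.canonicalized.asInstanceOf[Attribute])
  }
  // Every ExprId must map to exactly one distinct (canonicalized) attribute;
  // e.g. `exprId=1 => {int, long}` from the example above fails this.
  allOutputAttrs.flatten.groupBy(_.exprId).values.map(_.distinct).forall(_.size == 1)
}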

SparkQA commented on Sep 29, 2020:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33853/

SparkQA commented on Sep 29, 2020:

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33853/

maropu (Member Author) commented on Sep 29, 2020:

Thanks for the review, @cloud-fan !

SparkQA commented on Sep 29, 2020:

Test build #129225 has finished for PR 29585 at commit 454821a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented on Sep 29, 2020:

Test build #129236 has finished for PR 29585 at commit e3c8742.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

maropu closed this in 3a299aa on Sep 30, 2020.
maropu (Member Author) commented on Sep 30, 2020:

Thanks! Merged to master.

dongjoon-hyun (Member) commented on Oct 2, 2020:

maropu (Member Author) commented on Oct 2, 2020:

Oh, I see; I didn't notice that. Thanks, @dongjoon-hyun! I will check it now. If it's better to revert it first, please let me know.

dongjoon-hyun (Member):

Thank you so much, @maropu. Since it's only in master and the PRBuilder is still okay, it's not urgent. Let's see.

maropu added a commit that referenced this pull request Oct 2, 2020
…ive plan changes

### What changes were proposed in this pull request?

(This is a followup PR of #29585) The PR modified `RuleExecutor#isPlanIntegral` code for checking if a plan has globally-unique attribute IDs, but this check made Jenkins maven test jobs much longer (See [the Dongjoon comment](#29585 (comment)) and thanks, dongjoon-hyun !). To recover running time for the Jenkins tests, this PR intends to update the code to run plan integrity check only for effective plans.

### Why are the changes needed?

To recover running time for Jenkins tests.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29928 from maropu/PR29585-FOLLOWUP.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
maropu added a commit that referenced this pull request Oct 7, 2020
…eplace exprIds in a bottom-up manner

### What changes were proposed in this pull request?

This PR intends to refactor code in `RewriteCorrelatedScalarSubquery` for replacing `ExprId`s in a bottom-up manner instead of doing in a top-down one.

This PR comes from the talk with cloud-fan in #29585 (comment).

### Why are the changes needed?

To improve code.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29913 from maropu/RefactorRewriteCorrelatedScalarSubquery.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>