[SPARK-1712]: TaskDescription instance is too big causes Spark to hang #694
Conversation
Can one of the admins verify this patch?
There is another solution.
Jenkins, test this please
Hey @witgo, do you have some unit tests in mind for this? In particular, what happens when it fails if you throw a SparkException inside that launchTasks method? We want the job to fail with a SparkException, and the SparkContext to remain usable.
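To make the ask concrete, here is a rough sketch of the kind of regression test being described, not the test this PR actually adds. The master string, the `spark.akka.frameSize` value, and the data sizes are illustrative assumptions, and `local-cluster` mode only runs inside Spark's own build environment.

```scala
// Sketch of a regression test: an oversized task should fail the job with a
// SparkException, and the same SparkContext should keep working afterwards.
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object OversizedTaskCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local-cluster[2,1,512]")   // coarse-grained backend, so the frame-size check applies
      .setAppName("oversized-task-check")
      .set("spark.akka.frameSize", "1")      // 1 MB frame, easy to exceed
    val sc = new SparkContext(conf)
    try {
      // Capture ~2 MB in the closure so the serialized task exceeds the frame size.
      val bigClosureData = Array.fill(2 * 1024 * 1024)(0.toByte)
      val failed =
        try {
          sc.parallelize(1 to 10).map(i => bigClosureData.length + i).count()
          false
        } catch {
          case _: SparkException => true     // with this fix, the job fails instead of hanging
        }
      assert(failed, "expected the oversized task to fail with a SparkException")
      // The same SparkContext should still be usable afterwards.
      assert(sc.parallelize(1 to 10).count() == 10)
    } finally {
      sc.stop()
    }
  }
}
```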
Merged build triggered.
Merged build started.
BTW I kind of prefer this solution over the alternative one, at least for now. The very best solution would be to broadcast the part of the task description that's shared across tasks to all nodes. Otherwise putting these in the block store will lead to lots of replicated data. But it's better to warn for now and maybe tell them to use broadcast.
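For reference, a minimal sketch of the user-side workaround such a warning would point at: ship a large value with a broadcast variable instead of capturing it in every task's closure. The `bigLookup` value and app settings are made up for illustration.

```scala
// Broadcast a large value once instead of serializing it into every task.
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastWorkaround {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("broadcast-workaround"))

    val bigLookup: Map[Int, String] = (1 to 1000000).map(i => i -> ("value-" + i)).toMap

    // Capturing bigLookup directly in the closure would serialize it into every
    // task and could blow past spark.akka.frameSize; with broadcast, each task
    // only carries a small handle and executors fetch the data once.
    val lookupBc = sc.broadcast(bigLookup)
    val resolved = sc.parallelize(1 to 10).map(i => lookupBc.value.getOrElse(i, "missing")).collect()

    resolved.foreach(println)
    sc.stop()
  }
}
```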
val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - 1024) {
  var msg = "Serialized task %s:%d were %d bytes which " +
    "exceeds spark.akka.frameSize (%d bytes)."
You might add to the exception "Consider using broadcast variables for large values".
Also it should say "was" instead of "were"
Yeah, that was the branch I was talking about above. I'm worried that if the user has some large variables, we end up with lots of blocks in the block store that are large and repetitive. So I'd leave that out for now. The best solution, which will be along those lines, will be to broadcast the body of the task using a broadcast variable and pass the per-instance pieces (Partition and such) separately. This current fix is also smaller, which is good in the meanwhile.
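A conceptual sketch of that longer-term idea, purely for illustration; none of the types or method names below are Spark's actual internal API. The point is that the stage-wide body travels once as a broadcast, while each launch message carries only a tiny per-task piece.

```scala
// Broadcast the shared task body once; send only small per-task metadata per launch.
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Hypothetical types for illustration.
case class SharedTaskBody(serializedRddAndFunc: Array[Byte])    // identical for every task in the stage
case class PerTaskPiece(partitionId: Int, attemptNumber: Int)   // small, sent per task

class StageLauncher(sc: SparkContext, body: SharedTaskBody) {
  // Executors fetch the shared body out-of-band via the broadcast mechanism,
  // instead of receiving it inside every (potentially oversized) launch message.
  private val bodyBc: Broadcast[SharedTaskBody] = sc.broadcast(body)

  def launchMessageFor(partitionId: Int, attempt: Int): (Long, PerTaskPiece) =
    (bodyBc.id, PerTaskPiece(partitionId, attempt))  // only a broadcast id plus tiny metadata crosses the wire
}
```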
Merged build finished. All automated tests passed.
All automated tests passed.
@mateiz
Jenkins, test this please
Merged build triggered.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15009/
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
}
// scheduler.error(msg) | ||
// TODO: Need to throw an exception? | ||
// throw new SparkException(msg) |
Do we still need this code in here if we're aborting the TaskSet?
Yes, it should be removed.
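Pulling the review comments together ("was" instead of "were", mention broadcast variables in the message, abort the TaskSet, and drop the commented-out error handling), here is a hypothetical, self-contained sketch of the resulting shape of the check. The `TaskSetLike` trait and the surrounding names are stand-ins, not Spark's actual internals.

```scala
// If the serialized task exceeds the frame size, abort the task set with a
// clear, actionable message instead of silently dropping the task (the hang).
import java.nio.ByteBuffer

trait TaskSetLike { def abort(message: String): Unit }   // stand-in for the real task set manager

object LaunchTaskCheck {
  val reservedBytes = 1024   // head-room for message framing, mirroring the snippet above

  def launchOrAbort(taskId: Long, index: Int, serializedTask: ByteBuffer,
                    akkaFrameSize: Int, taskSet: TaskSetLike)
                   (send: ByteBuffer => Unit): Unit = {
    if (serializedTask.limit >= akkaFrameSize - reservedBytes) {
      val msg = ("Serialized task %s:%d was %d bytes which exceeds " +
        "spark.akka.frameSize (%d bytes). Consider using broadcast variables " +
        "for large values.").format(taskId, index, serializedTask.limit, akkaFrameSize)
      taskSet.abort(msg)          // fail the job fast with a useful error
    } else {
      send(serializedTask)        // small enough: launch the task normally
    }
  }
}
```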
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
Jenkins, retest this please
This looks good to me, just going to do one more run through tests to make sure recent changes don't break with it. Sorry for taking a while to get back to it.
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
Author: witgo <witgo@qq.com>

Closes #694 from witgo/SPARK-1712_new and squashes the following commits:

0f52483 [witgo] review commit
83ce29b [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
52e6752 [witgo] reset test SparkContext
63636b6 [witgo] review commit
44a59ee [witgo] review commit
3b6d48c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
926bd6a [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
9a5cfad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
03cc562 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
b0930b0 [witgo] review commit
b1174bd [witgo] merge master
f76679b [witgo] merge master
689495d [witgo] fix scala style bug
1d35c3c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
062c182 [witgo] fix small bug for code style
0a428cf [witgo] add unit tests
158b2dc [witgo] review commit
4afe71d [witgo] review commit
9e4ffa7 [witgo] review commit
1d35c7d [witgo] fix hang
7965580 [witgo] fix Statement order
0e29eac [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
3ea1ca1 [witgo] remove duplicate serialize
743a7ad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
86e2048 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
2a89adc [witgo] SPARK-1712: TaskDescription instance is too big causes Spark to hang

(cherry picked from commit 4dbb27b)
Signed-off-by: Matei Zaharia <matei@databricks.com>
### What changes were proposed in this pull request?

Added optimizer rule `RemoveRedundantAggregates`. It removes redundant aggregates from a query plan. A redundant aggregate is an aggregate whose only goal is to keep distinct values, while its parent aggregate would ignore duplicate values.

The affected part of the query plan for TPCDS q87:

Before:
```
== Physical Plan ==
*(26) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, true, [id=#785]
   +- *(25) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
         +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
            +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
               +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                  +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                     +- Exchange hashpartitioning(c_last_name#61, c_first_name#60, d_date#26, 5), true, [id=#724]
                        +- *(24) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                           +- SortMergeJoin [coalesce(c_last_name#61, ), isnull(c_last_name#61), coalesce(c_first_name#60, ), isnull(c_first_name#60), coalesce(d_date#26, 0), isnull(d_date#26)], [coalesce(c_last_name#221, ), isnull(c_last_name#221), coalesce(c_first_name#220, ), isnull(c_first_name#220), coalesce(d_date#186, 0), isnull(d_date#186)], LeftAnti
                              :- ...
```

After:
```
== Physical Plan ==
*(26) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, true, [id=#751]
   +- *(25) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
         +- Exchange hashpartitioning(c_last_name#61, c_first_name#60, d_date#26, 5), true, [id=#694]
            +- *(24) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
               +- SortMergeJoin [coalesce(c_last_name#61, ), isnull(c_last_name#61), coalesce(c_first_name#60, ), isnull(c_first_name#60), coalesce(d_date#26, 0), isnull(d_date#26)], [coalesce(c_last_name#221, ), isnull(c_last_name#221), coalesce(c_first_name#220, ), isnull(c_first_name#220), coalesce(d_date#186, 0), isnull(d_date#186)], LeftAnti
                  :- ...
```

### Why are the changes needed?

Performance improvements: a few TPCDS queries have these kinds of duplicate aggregates.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Benchmarks (sf=5), OpenJDK 64-Bit Server VM 1.8.0_265-b01 on Linux 5.8.13-arch1-1, Intel(R) Core(TM) i5-6500 CPU 3.20GHz:

| Query | Before | After | Speedup |
| ------| ------ | ----- | ------- |
| q14a | 44s | 44s | 1x |
| q14b | 41s | 41s | 1x |
| q38 | 6.5s | 5.9s | 1.1x |
| q87 | 7.2s | 6.8s | 1.1x |
| q14a-v2.7 | 55s | 53s | 1x |

Closes #30018 from tanelk/SPARK-33122.

Lead-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Co-authored-by: Tanel Kiis <tanel.kiis@reach-u.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
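An illustrative example of the kind of redundancy the rule targets, with made-up table and column names; whether `RemoveRedundantAggregates` fires on this exact shape depends on its conditions, so treat this as a sketch of the concept rather than a guaranteed demonstration.

```scala
// The inner DISTINCT only removes duplicate rows, and the outer DISTINCT over a
// subset of those columns would ignore such duplicates anyway, so the inner
// aggregate is redundant.
import org.apache.spark.sql.SparkSession

object RedundantAggregateExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("redundant-agg").getOrCreate()
    import spark.implicits._

    Seq(("a", "x", 1), ("a", "x", 1), ("a", "y", 2))
      .toDF("c_last_name", "c_first_name", "d_date")
      .createOrReplaceTempView("sales_dates")

    val df = spark.sql(
      """SELECT DISTINCT c_last_name, c_first_name
        |FROM (SELECT DISTINCT c_last_name, c_first_name, d_date FROM sales_dates) t
        |""".stripMargin)

    df.explain()   // with the rule, the plan should keep a single distinct-keeping aggregate
    spark.stop()
  }
}
```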