Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-1712]: TaskDescription instance is too big causes Spark to hang #694

Closed
wants to merge 26 commits into from

Conversation

witgo
Copy link
Contributor

@witgo witgo commented May 8, 2014

No description provided.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@witgo
Copy link
Contributor Author

witgo commented May 13, 2014

There is another solution

@witgo witgo changed the title SPARK-1712: TaskDescription instance is too big causes Spark to hang [WIP][SPARK-1712]: TaskDescription instance is too big causes Spark to hang May 14, 2014
@mateiz
Copy link
Contributor

mateiz commented May 14, 2014

Jenkins, test this please

@mateiz
Copy link
Contributor

mateiz commented May 14, 2014

Hey @witgo , do you have some unit tests in mind for this? In particular what happens when it fails if you throw a SparkException inside that launchTasks method? We want the job to fail with a SparkException, and the SparkContext to remain usable.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@mateiz
Copy link
Contributor

mateiz commented May 14, 2014

BTW I kind of prefer this solution over the alternative one, at least for now. The very best solution would be to broadcast the part of the task description that's shared across tasks to all nodes. Otherwise putting these in the block store will lead to lots of replicated data. But it's better to warn for now and maybe tell them to use broadcast.

val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - 1024) {
var msg = "Serialized task %s:%d were %d bytes which " +
"exceeds spark.akka.frameSize (%d bytes)."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might add to the exception "Consider using broadcast variables for large values".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also it should say "was" instead of "were"

@witgo
Copy link
Contributor Author

witgo commented May 14, 2014

@mateiz
What do you think of this demo?

@mateiz
Copy link
Contributor

mateiz commented May 14, 2014

Yeah, that was the branch I was talking about above. I'm worried that if the user has some large variables, we end up with lots of blocks in the block store that are large and repetitive. So I'd leave that out for now. The best solution, which will be along those lines, will be to broadcast the body of the task using a broadcast variable and pass the per-instance pieces (Partition and such) separately.

This current fix is also smaller, which is good in the meanwhile.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14959/

@witgo
Copy link
Contributor Author

witgo commented May 14, 2014

@mateiz
Unit testing has been added

@witgo witgo changed the title [WIP][SPARK-1712]: TaskDescription instance is too big causes Spark to hang [SPARK-1712]: TaskDescription instance is too big causes Spark to hang May 14, 2014
@mateiz
Copy link
Contributor

mateiz commented May 14, 2014

Jenkins, test this please

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15009/

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15016/

}
// scheduler.error(msg)
// TODO: Need to throw an exception?
// throw new SparkException(msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this code in here if we're aborting the TaskSet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should be removed.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15062/

@mateiz
Copy link
Contributor

mateiz commented May 28, 2014

Jenkins, retest this please

@mateiz
Copy link
Contributor

mateiz commented May 28, 2014

This looks good to me, just going to do one more run through tests to make sure recent changes don't break with it. Sorry for taking a while to get back to it.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15265/

@asfgit asfgit closed this in 4dbb27b May 28, 2014
asfgit pushed a commit that referenced this pull request May 28, 2014
Author: witgo <witgo@qq.com>

Closes #694 from witgo/SPARK-1712_new and squashes the following commits:

0f52483 [witgo] review commit
83ce29b [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
52e6752 [witgo] reset test SparkContext
63636b6 [witgo] review commit
44a59ee [witgo] review commit
3b6d48c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
926bd6a [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
9a5cfad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
03cc562 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
b0930b0 [witgo] review commit
b1174bd [witgo] merge master
f76679b [witgo] merge master
689495d [witgo] fix scala style bug
1d35c3c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
062c182 [witgo] fix small bug for code style
0a428cf [witgo] add unit tests
158b2dc [witgo] review commit
4afe71d [witgo] review commit
9e4ffa7 [witgo] review commit
1d35c7d [witgo] fix hang
7965580 [witgo] fix Statement order
0e29eac [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
3ea1ca1 [witgo] remove duplicate serialize
743a7ad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
86e2048 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
2a89adc [witgo] SPARK-1712: TaskDescription instance is too big causes Spark to hang

(cherry picked from commit 4dbb27b)
Signed-off-by: Matei Zaharia <matei@databricks.com>
asfgit pushed a commit that referenced this pull request May 28, 2014
Author: witgo <witgo@qq.com>

Closes #694 from witgo/SPARK-1712_new and squashes the following commits:

0f52483 [witgo] review commit
83ce29b [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
52e6752 [witgo] reset test SparkContext
63636b6 [witgo] review commit
44a59ee [witgo] review commit
3b6d48c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
926bd6a [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
9a5cfad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
03cc562 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
b0930b0 [witgo] review commit
b1174bd [witgo] merge master
f76679b [witgo] merge master
689495d [witgo] fix scala style bug
1d35c3c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
062c182 [witgo] fix small bug for code style
0a428cf [witgo] add unit tests
158b2dc [witgo] review commit
4afe71d [witgo] review commit
9e4ffa7 [witgo] review commit
1d35c7d [witgo] fix hang
7965580 [witgo] fix Statement order
0e29eac [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
3ea1ca1 [witgo] remove duplicate serialize
743a7ad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
86e2048 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
2a89adc [witgo] SPARK-1712: TaskDescription instance is too big causes Spark to hang

Conflicts:
	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
	core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@witgo witgo deleted the SPARK-1712_new branch May 29, 2014 02:04
markhamstra pushed a commit to markhamstra/spark that referenced this pull request Jun 6, 2014
Author: witgo <witgo@qq.com>

Closes apache#694 from witgo/SPARK-1712_new and squashes the following commits:

0f52483 [witgo] review commit
83ce29b [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
52e6752 [witgo] reset test SparkContext
63636b6 [witgo] review commit
44a59ee [witgo] review commit
3b6d48c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
926bd6a [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
9a5cfad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
03cc562 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
b0930b0 [witgo] review commit
b1174bd [witgo] merge master
f76679b [witgo] merge master
689495d [witgo] fix scala style bug
1d35c3c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
062c182 [witgo] fix small bug for code style
0a428cf [witgo] add unit tests
158b2dc [witgo] review commit
4afe71d [witgo] review commit
9e4ffa7 [witgo] review commit
1d35c7d [witgo] fix hang
7965580 [witgo] fix Statement order
0e29eac [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
3ea1ca1 [witgo] remove duplicate serialize
743a7ad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
86e2048 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
2a89adc [witgo] SPARK-1712: TaskDescription instance is too big causes Spark to hang

Conflicts:
	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
	core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
Author: witgo <witgo@qq.com>

Closes apache#694 from witgo/SPARK-1712_new and squashes the following commits:

0f52483 [witgo] review commit
83ce29b [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
52e6752 [witgo] reset test SparkContext
63636b6 [witgo] review commit
44a59ee [witgo] review commit
3b6d48c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
926bd6a [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
9a5cfad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
03cc562 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
b0930b0 [witgo] review commit
b1174bd [witgo] merge master
f76679b [witgo] merge master
689495d [witgo] fix scala style bug
1d35c3c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
062c182 [witgo] fix small bug for code style
0a428cf [witgo] add unit tests
158b2dc [witgo] review commit
4afe71d [witgo] review commit
9e4ffa7 [witgo] review commit
1d35c7d [witgo] fix hang
7965580 [witgo] fix Statement order
0e29eac [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
3ea1ca1 [witgo] remove duplicate serialize
743a7ad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
86e2048 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
2a89adc [witgo] SPARK-1712: TaskDescription instance is too big causes Spark to hang
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
Author: witgo <witgo@qq.com>

Closes apache#694 from witgo/SPARK-1712_new and squashes the following commits:

0f52483 [witgo] review commit
83ce29b [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
52e6752 [witgo] reset test SparkContext
63636b6 [witgo] review commit
44a59ee [witgo] review commit
3b6d48c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
926bd6a [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
9a5cfad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
03cc562 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
b0930b0 [witgo] review commit
b1174bd [witgo] merge master
f76679b [witgo] merge master
689495d [witgo] fix scala style bug
1d35c3c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
062c182 [witgo] fix small bug for code style
0a428cf [witgo] add unit tests
158b2dc [witgo] review commit
4afe71d [witgo] review commit
9e4ffa7 [witgo] review commit
1d35c7d [witgo] fix hang
7965580 [witgo] fix Statement order
0e29eac [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
3ea1ca1 [witgo] remove duplicate serialize
743a7ad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
86e2048 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
2a89adc [witgo] SPARK-1712: TaskDescription instance is too big causes Spark to hang
maropu pushed a commit that referenced this pull request Mar 20, 2021
### What changes were proposed in this pull request?

Added optimizer rule `RemoveRedundantAggregates`. It removes redundant aggregates from a query plan. A redundant aggregate is an aggregate whose only goal is to keep distinct values, while its parent aggregate would ignore duplicate values.

The affected part of the query plan for TPCDS q87:

Before:
```
== Physical Plan ==
*(26) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, true, [id=#785]
   +- *(25) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
         +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
            +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
               +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                  +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                     +- Exchange hashpartitioning(c_last_name#61, c_first_name#60, d_date#26, 5), true, [id=#724]
                        +- *(24) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
                           +- SortMergeJoin [coalesce(c_last_name#61, ), isnull(c_last_name#61), coalesce(c_first_name#60, ), isnull(c_first_name#60), coalesce(d_date#26, 0), isnull(d_date#26)], [coalesce(c_last_name#221, ), isnull(c_last_name#221), coalesce(c_first_name#220, ), isnull(c_first_name#220), coalesce(d_date#186, 0), isnull(d_date#186)], LeftAnti
                              :- ...
```

After:
```
== Physical Plan ==
*(26) HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition, true, [id=#751]
   +- *(25) HashAggregate(keys=[], functions=[partial_count(1)])
      +- *(25) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
         +- Exchange hashpartitioning(c_last_name#61, c_first_name#60, d_date#26, 5), true, [id=#694]
            +- *(24) HashAggregate(keys=[c_last_name#61, c_first_name#60, d_date#26], functions=[])
               +- SortMergeJoin [coalesce(c_last_name#61, ), isnull(c_last_name#61), coalesce(c_first_name#60, ), isnull(c_first_name#60), coalesce(d_date#26, 0), isnull(d_date#26)], [coalesce(c_last_name#221, ), isnull(c_last_name#221), coalesce(c_first_name#220, ), isnull(c_first_name#220), coalesce(d_date#186, 0), isnull(d_date#186)], LeftAnti
                  :- ...
```

### Why are the changes needed?

Performance improvements - few TPCDS queries have these kinds of duplicate aggregates.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Benchmarks (sf=5):

OpenJDK 64-Bit Server VM 1.8.0_265-b01 on Linux 5.8.13-arch1-1
Intel(R) Core(TM) i5-6500 CPU  3.20GHz

| Query | Before  | After | Speedup |
| ------| ------- | ------| ------- |
| q14a | 44s | 44s | 1x |
| q14b | 41s | 41s | 1x |
| q38  | 6.5s | 5.9s | 1.1x |
| q87  | 7.2s | 6.8s | 1.1x |
| q14a-v2.7 | 55s | 53s | 1x |

Closes #30018 from tanelk/SPARK-33122.

Lead-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Co-authored-by: Tanel Kiis <tanel.kiis@reach-u.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
Agirish pushed a commit to HPEEzmeral/apache-spark that referenced this pull request May 5, 2022
udaynpusa pushed a commit to mapr/spark that referenced this pull request Jan 30, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants