
SPARK-1582 Invoke Thread.interrupt() when cancelling jobs #498

Closed
aarondav wants to merge 4 commits from aarondav/cancel

Conversation

@aarondav (Contributor)

Sometimes executor threads are blocked waiting for IO or monitors, and the current implementation of job cancellation may never recover these threads. By simply invoking Thread.interrupt() during cancellation, we can often safely unblock the threads and use them for subsequent work.

Note that this feature must remain optional for now because of a bug in HDFS where Thread.interrupt() may cause nodes to be marked as permanently dead (the InterruptedException is reinterpreted as an IOException during communication with a node).
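
To make the mechanism concrete, here is a minimal, self-contained JVM sketch (plain Scala, not Spark code): a thread parked in Object.wait(), or in interruptible IO, is unblocked by Thread.interrupt() with an InterruptedException and can then be reused.

object InterruptDemo extends App {
  val lock = new Object
  val worker = new Thread(new Runnable {
    def run(): Unit =
      try {
        // Blocked "forever" on a monitor, like a stuck task thread.
        lock.synchronized { lock.wait() }
      } catch {
        case _: InterruptedException =>
          println("unblocked by interrupt; thread is free for new work")
      }
  })
  worker.start()
  Thread.sleep(100)   // give the worker time to reach wait()
  worker.interrupt()  // what cancellation now does to executor task threads
  worker.join()
}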

@AmplabJenkins
Merged build triggered.

@AmplabJenkins
Merged build started.

@rxin (Contributor) commented Apr 23, 2014

@pwendell if possible this should go into 1.0 ...

@AmplabJenkins
Merged build finished.

@AmplabJenkins
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14357/

@pwendell (Contributor)

Jenkins, retest this please.

@AmplabJenkins
Merged build triggered.

@AmplabJenkins
Merged build started.

-  def setJobGroup(groupId: String, description: String) {
+  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
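
For illustration, a rough driver-side sketch of the new flag in use (the group name and app config are made up; setJobGroup and cancelJobGroup are existing SparkContext methods, with the third parameter added by this patch):

import org.apache.spark.{SparkConf, SparkContext}

object CancelGroupDemo extends App {
  val sc = new SparkContext(
    new SparkConf().setAppName("cancel-demo").setMaster("local[2]"))

  // Jobs submitted from this thread join the group; interruptOnCancel = true
  // opts in to Thread.interrupt() on their task threads upon cancellation.
  sc.setJobGroup("demo-group", "interruptible work", interruptOnCancel = true)

  // From another thread (or the web UI), cancel everything in the group:
  sc.cancelJobGroup("demo-group")

  sc.stop()
}

Since the parameter defaults to false, existing callers keep the old behavior, which matters while the HDFS interrupt bug described above remains unfixed.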
@aarondav (Contributor, Author)
Note that this is not the ideal way to set this property, as this API is mainly just for initializing the job group name. However, it avoids changing a number of internal and external APIs (there are 4 calls in this function itself that call into the cancellation API through different routes to the DAGScheduler#failJobAndIndependentStages). Additionally, it provides the unique benefit that if the job is cancelled by another source (e.g., Spark fails the job, or the user uses the recently added cancel job feature in the JobProgressTab), then we can still set the flag based on this property.

Contributor
Thanks, Aaron. Can we put the above reasoning as inline comments? Helps when we revisit this in the future.

@aarondav (Contributor, Author)
Added.
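
For reference, a hedged sketch of the property plumbing the thread above describes; this is not the verbatim patch, and the spark.job.* keys are shown for illustration:

import java.util.Properties

object JobGroupPlumbingSketch {
  // Stand-in for SparkContext's thread-local job properties.
  private val localProperties = new Properties()

  private def setLocalProperty(key: String, value: String): Unit =
    localProperties.setProperty(key, value)

  // The flag rides along with the job group as a property, so any cancellation
  // route (user API, web UI, or Spark failing the job) can read it back later.
  def setJobGroup(groupId: String, description: String,
                  interruptOnCancel: Boolean = false): Unit = {
    setLocalProperty("spark.jobGroup.id", groupId)
    setLocalProperty("spark.job.description", description)
    setLocalProperty("spark.job.interruptOnCancel", interruptOnCancel.toString)
  }

  // Null-safe lookup (cf. commit e52b829, "Don't use job.properties when null").
  def shouldInterruptThread(jobProperties: Properties): Boolean =
    Option(jobProperties)
      .map(_.getProperty("spark.job.interruptOnCancel", "false").toBoolean)
      .getOrElse(false)
}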

@AmplabJenkins
Merged build finished.

@AmplabJenkins
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14361/

@AmplabJenkins
Merged build triggered.

@AmplabJenkins
Merged build started.

@AmplabJenkins
Merged build finished.

@AmplabJenkins
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14390/

@shivaram (Contributor)

FYI - there is a compilation error in core:test for this:

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14390/consoleFull

[error] /root/workspace/SparkPullRequestBuilder/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:51: object creation impossible, since method cancelTasks in trait TaskScheduler of type (stageId: Int, interruptThread: Boolean)Unit is not defined
[error]   val taskScheduler = new TaskScheduler() {
[error]                           ^
[error] /root/workspace/SparkPullRequestBuilder/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:61: method cancelTasks overrides nothing.
[error] Note: the super classes of anonymous class $anon contain the following, non final members named cancelTasks:
[error] def cancelTasks(stageId: Int,interruptThread: Boolean): Unit
[error]     override def cancelTasks(stageId: Int) {
[error]                  ^
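
The fix implied by the log is mechanical: the suite's anonymous TaskScheduler must implement the new two-argument signature. A self-contained illustration (Canceller is a stand-in trait, not Spark's TaskScheduler):

// Stand-in for the updated trait whose signature changed.
trait Canceller {
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
}

object SignatureFixDemo extends App {
  // An old one-argument override would now "override nothing" and leave the
  // abstract two-argument method unimplemented (the two errors above).
  // Matching the new signature fixes both:
  val scheduler = new Canceller {
    override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = ()
  }
  scheduler.cancelTasks(stageId = 1, interruptThread = true)
}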

@AmplabJenkins
Merged build triggered.

@AmplabJenkins
Merged build started.

@AmplabJenkins
Merged build finished.

@AmplabJenkins
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14395/

@pwendell (Contributor)

The tests hung inside FileSuite, so I killed them.

@AmplabJenkins
Merged build triggered.

@AmplabJenkins
Merged build started.

@AmplabJenkins
Merged build finished. All automated tests passed.

@AmplabJenkins
All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14404/

@rxin (Contributor) commented Apr 23, 2014

Thanks. I've merged this.

asfgit closed this in 432201c on Apr 24, 2014
asfgit pushed a commit that referenced this pull request Apr 24, 2014
Sometimes executor threads are blocked waiting for IO or monitors, and the current implementation of job cancellation may never recover these threads. By simply invoking Thread.interrupt() during cancellation, we can often safely unblock the threads and use them for subsequent work.

Note that this feature must remain optional for now because of a bug in HDFS where Thread.interrupt() may cause nodes to be marked as permanently dead (as the InterruptedException is reinterpreted as an IOException during communication with some node).

Author: Aaron Davidson <aaron@databricks.com>

Closes #498 from aarondav/cancel and squashes the following commits:

e52b829 [Aaron Davidson] Don't use job.properties when null
82f78bb [Aaron Davidson] Update DAGSchedulerSuite
b67f472 [Aaron Davidson] Add comment on why interruptOnCancel is in setJobGroup
4cb9fd6 [Aaron Davidson] SPARK-1582 Invoke Thread.interrupt() when cancelling jobs

(cherry picked from commit 432201c)
Signed-off-by: Reynold Xin <rxin@apache.org>
markhamstra pushed a commit to markhamstra/spark that referenced this pull request Apr 24, 2014
Same squashed commit message as the merged commit above.

Conflicts:
	core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
	core/src/main/scala/org/apache/spark/executor/Executor.scala
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
	core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
	core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
	core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
	core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
pwendell pushed a commit to pwendell/spark that referenced this pull request May 12, 2014
…he#498.

Python api additions

Author: Prashant Sharma <prashant.s@imaginea.com>

== Merge branch commits ==

commit 8b51591f1a7a79a62c13ee66ff8d83040f7eccd8
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Fri Jan 24 11:50:29 2014 +0530

    Josh's and Patricks review comments.

commit d37f9677838e43bef6c18ef61fbf08055ba6d1ca
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 17:27:17 2014 +0530

    fixed doc tests

commit 27cb54bf5c99b1ea38a73858c291d0a1c43d8b7c
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 16:48:43 2014 +0530

    Added keys and values methods for PairFunctions in python

commit 4ce76b396fbaefef2386d7a36d611572bdef9b5d
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 13:51:26 2014 +0530

    Added foreachPartition

commit 05f05341a187cba829ac0e6c2bdf30be49948c89
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 13:02:59 2014 +0530

    Added coalesce fucntion to python API

commit 6568d2c2fa14845dc56322c0f39ba2e13b3b26dd
Author: Prashant Sharma <prashant.s@imaginea.com>
Date:   Thu Jan 23 12:52:44 2014 +0530

    added repartition function to python API.
markhamstra pushed a commit to markhamstra/spark that referenced this pull request May 28, 2014
Same squashed commit message and conflicts as the markhamstra commit above.
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
Same squashed commit message as the merged commit above.
yifeih added a commit to yifeih/spark that referenced this pull request Feb 27, 2019
yifeih added a commit to yifeih/spark that referenced this pull request Mar 11, 2019
yifeih added a commit to yifeih/spark that referenced this pull request May 8, 2019
…ache#498)

* add initial bypass merge sort shuffle writer benchmarks

* dd unsafe shuffle writer benchmarks

* changes in bypassmergesort benchmarks

* cleanup

* add circle script

* add this branch for testing

* fix circle attempt 1

* checkout code

* add some caches?

* why is it not pull caches...

* save as artifact instead of publishing

* mkdir

* typo

* try uploading artifacts again

* try print per iteration to avoid circle erroring out on idle

* blah (apache#495)

* make a PR comment

* actually delete files

* run benchmarks on test build branch

* oops forgot to enable upload

* add sort shuffle writer benchmarks

* add stdev

* cleanup sort a bit

* fix stdev text

* fix sort shuffle

* initial code for read side

* format

* use times and sample stdev

* add assert for at least one iteration

* cleanup shuffle write to use fewer mocks and single base interface

* shuffle read works with transport client... needs lots of cleaning

* test running in cicle

* scalastyle

* dont publish results yet

* cleanup writer code

* get only git message

* fix command to get PR number

* add SortshuffleWriterBenchmark

* writer code

* cleanup

* fix benchmark script

* use ArgumentMatchers

* also in shufflewriterbenchmarkbase

* scalastyle

* add apache license

* fix some scale stuff

* fix up tests

* only copy benchmarks we care about

* increase size for reader again

* delete two writers and reader for PR

* SPARK-25299: Add shuffle reader benchmarks (apache#506)

* Revert "SPARK-25299: Add shuffle reader benchmarks (apache#506)"

This reverts commit 9d46fae.

* add -e to bash script

* blah

* enable upload as a PR comment and prevent running benchmarks on this branch

* Revert "enable upload as a PR comment and prevent running benchmarks on this branch"

This reverts commit 13703fa.

* try machine execution

* try uploading benchmarks (apache#498)

* only upload results when merging into the feature branch

* lock down machine image

* don't write input data to disk

* run benchmark test

* stop creating file cleanup threads for every block manager

* use alphanumeric again

* use a new random everytime

* close the writers -__________-

* delete branch and publish results as comment

* close in finally
arjunshroff pushed a commit to arjunshroff/spark that referenced this pull request Nov 24, 2020