SPARK-1582 Invoke Thread.interrupt() when cancelling jobs #498
Conversation
Sometimes executor threads are blocked waiting for IO or monitors, and the current implementation of job killing may never recover these threads. By simply invoking Thread.interrupt() during cancellation, we can often safely unblock the threads and use them for subsequent work. Note that this feature must remain optional for now because of a bug in HDFS where Thread.interrupt() may cause nodes to be marked as permanently dead (as the InterruptedException is reinterpreted as an IOException during communication with some node).
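To make the unblocking mechanism concrete, here is a minimal sketch in plain Scala (illustrative only, not Spark code): a thread parked in an interruptible wait cannot make progress on its own, but interrupt() forces the blocking call to throw InterruptedException, returning the thread to useful work. The HDFS caveat above is exactly the failure mode where intermediate code catches that exception and resurfaces it as an IOException.

```scala
object InterruptDemo {
  def main(args: Array[String]): Unit = {
    val lock = new Object
    val worker = new Thread(new Runnable {
      override def run(): Unit =
        try {
          // Blocks indefinitely, standing in for a task stuck on IO or a monitor.
          lock.synchronized { lock.wait() }
        } catch {
          case _: InterruptedException =>
            // interrupt() makes the blocking call throw, freeing the thread
            // for subsequent work instead of leaking it.
            println("worker unblocked by interrupt")
        }
    })
    worker.start()
    Thread.sleep(100)   // give the worker time to reach the wait()
    worker.interrupt()  // what cancellation does when the new flag is enabled
    worker.join()
  }
}
```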
Merged build triggered.
Merged build started.
@pwendell if possible this should go into 1.0 ...
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14357/
Jenkins, retest this please.
Merged build triggered.
Merged build started.
 */
-def setJobGroup(groupId: String, description: String) {
+def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
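For reference, a hedged usage sketch of the changed API (the group id, description, and workload are made up for illustration; the job runs on a separate thread only so the caller is free to cancel it):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CancelDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cancel-demo").setMaster("local[2]"))

    val runner = new Thread(new Runnable {
      override def run(): Unit = {
        // interruptOnCancel = true opts this group's tasks into Thread.interrupt()
        // on cancellation; the default of false preserves the old behavior.
        sc.setJobGroup("slow-group", "interruptible demo job", interruptOnCancel = true)
        try {
          sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
        } catch {
          case e: Exception => println(s"job terminated: ${e.getMessage}")
        }
      }
    })
    runner.start()

    Thread.sleep(2000)
    sc.cancelJobGroup("slow-group") // task threads are now interrupted, not just flagged
    runner.join()
    sc.stop()
  }
}
```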
Note that this is not the ideal way to set this property, as this API is mainly just for initializing the job group name. However, it avoids changing a number of internal and external APIs (there are 4 calls in this function itself that reach the cancellation API through different routes down to DAGScheduler#failJobAndIndependentStages). Additionally, it provides the unique benefit that if the job is cancelled by another source (e.g., Spark fails the job, or the user uses the recently added cancel-job feature in the JobProgressTab), then we can still set the flag based on this property.
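A simplified sketch of that mechanism (the helper name is hypothetical, not the actual DAGScheduler code; the property key shown matches the SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL constant this patch introduces): because the flag rides along in the job's properties, every cancellation route sees the same setting without further API changes.

```scala
import java.util.Properties

// Hypothetical helper: decide whether to interrupt task threads based on
// the properties attached to the job.
def shouldInterruptOnCancel(jobProperties: Properties): Boolean =
  // Guard against null properties (cf. the "Don't use job.properties
  // when null" commit below), then read the flag set by setJobGroup.
  Option(jobProperties)
    .flatMap(p => Option(p.getProperty("spark.job.interruptOnCancel")))
    .exists(_.toBoolean)
```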
Thanks, Aaron. Can we put the above reasoning as inline comments? Helps when we revisit this in the future.
Added.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14361/
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14390/
FYI - there is a compilation error in core:test for this: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14390/consoleFull
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14395/
The tests hung inside of FileSuite, so I killed them.
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
Thanks. I've merged this.
Sometimes executor threads are blocked waiting for IO or monitors, and the current implementation of job cancellation may never recover these threads. By simply invoking Thread.interrupt() during cancellation, we can often safely unblock the threads and use them for subsequent work. Note that this feature must remain optional for now because of a bug in HDFS where Thread.interrupt() may cause nodes to be marked as permanently dead (as the InterruptedException is reinterpreted as an IOException during communication with some node).

Author: Aaron Davidson <aaron@databricks.com>

Closes #498 from aarondav/cancel and squashes the following commits:

e52b829 [Aaron Davidson] Don't use job.properties when null
82f78bb [Aaron Davidson] Update DAGSchedulerSuite
b67f472 [Aaron Davidson] Add comment on why interruptOnCancel is in setJobGroup
4cb9fd6 [Aaron Davidson] SPARK-1582 Invoke Thread.interrupt() when cancelling jobs

(cherry picked from commit 432201c)
Signed-off-by: Reynold Xin <rxin@apache.org>
Sometimes executor threads are blocked waiting for IO or monitors, and the current implementation of job cancellation may never recover these threads. By simply invoking Thread.interrupt() during cancellation, we can often safely unblock the threads and use them for subsequent work. Note that this feature must remain optional for now because of a bug in HDFS where Thread.interrupt() may cause nodes to be marked as permanently dead (as the InterruptedException is reinterpreted as an IOException during communication with some node).

Author: Aaron Davidson <aaron@databricks.com>

Closes apache#498 from aarondav/cancel and squashes the following commits:

e52b829 [Aaron Davidson] Don't use job.properties when null
82f78bb [Aaron Davidson] Update DAGSchedulerSuite
b67f472 [Aaron Davidson] Add comment on why interruptOnCancel is in setJobGroup
4cb9fd6 [Aaron Davidson] SPARK-1582 Invoke Thread.interrupt() when cancelling jobs

Conflicts:
	core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
	core/src/main/scala/org/apache/spark/executor/Executor.scala
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
	core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
	core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
	core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
	core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala