[SPARK-4879] Use the Spark driver to authorize Hadoop commits. #4155
Conversation
Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is turned on, sometimes this can result in multiple tasks committing their output to the same partition. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well. This can cause strange race conditions where multiple committing tasks interfere with each other, with the result being that some partition files are lost entirely. For more context on these kinds of scenarios, see the aforementioned JIRA ticket.

In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether such a commit is safe, and the application master does bookkeeping as tasks request commits. Duplicate tasks that would write to files already written by other tasks are prohibited from committing.

This patch emulates that functionality: the crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver, and the executors can obtain a reference to this actor and request its permission to commit. As tasks are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well, so it can update its internal state.
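For readers skimming the thread, here is a minimal standalone sketch of the bookkeeping described above; the class and method names are illustrative, not the patch's actual ones. The first attempt to request a given (stage, partition) is authorized, later attempts are denied, and a failure report frees the slot so a speculative copy can commit instead.

```scala
import scala.collection.mutable

// Illustrative sketch only (names are not the patch's exact ones): the
// driver-side arbiter remembers which task attempt was first authorized to
// commit each (stage, partition) pair; later attempts for the same output
// are denied unless the winner is reported as failed.
class CommitArbiter {
  private val authorized = mutable.Map.empty[(Int, Int), Long]

  def canCommit(stage: Int, partition: Int, attempt: Long): Boolean = synchronized {
    authorized.get((stage, partition)) match {
      case Some(winner) => winner == attempt      // only the already-authorized attempt may commit
      case None =>
        authorized((stage, partition)) = attempt  // first requester wins
        true
    }
  }

  // Called when the scheduler reports that the authorized attempt failed,
  // freeing the partition so a speculative copy can be allowed to commit.
  def attemptFailed(stage: Int, partition: Int, attempt: Long): Unit = synchronized {
    if (authorized.get((stage, partition)).contains(attempt)) {
      authorized.remove((stage, partition))
    }
  }
}
```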
Test build #25940 has started for PR 4155 at commit
Test build #25941 has started for PR 4155 at commit
Test build #25940 timed out for PR 4155 at commit
Test FAILed.
Test build #25941 timed out for PR 4155 at commit
Test FAILed.
The SparkHadoopWriter wasn't serializable because the commit coordinator was a class member. It doesn't need to be, so the coordinator is now obtained on demand when the task commits.
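The shape of that fix, sketched below with illustrative names; in the real patch the coordinator is obtained from the executor-side SparkEnv rather than the stand-in object used here.

```scala
// Sketch of the fix (all names here are illustrative, not the patch's exact
// ones): the writer no longer holds the coordinator as a field, so it stays
// serializable; instead, an executor-local environment is consulted only at
// the moment the task tries to commit.
object ExecutorEnvSketch {
  // Executor-local singleton standing in for how the environment exposes the coordinator.
  val commitCoordinator = new CommitArbiterSketch
}

class CommitArbiterSketch {
  def canCommit(stage: Int, partition: Int, attempt: Long): Boolean = true  // stub decision
}

class HadoopWriterSketch(stageId: Int) extends Serializable {
  // Note: no coordinator field here, so serializing the writer no longer fails.

  def commitTask(partitionId: Int, attemptId: Long): Unit = {
    val coordinator = ExecutorEnvSketch.commitCoordinator  // obtained on demand, never serialized
    if (coordinator.canCommit(stageId, partitionId, attemptId)) {
      // call the underlying Hadoop OutputCommitter.commitTask(...) here
    }
  }
}
```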
Test build #25971 has started for PR 4155 at commit
Test build #25971 timed out for PR 4155 at commit
Test FAILed.
But we don't care what's actually in the TaskInfo object: OutputCommitCoordinator is tested in OutputCommitCoordinatorSuite, which is the only code path that relies on the TaskInfo's fields. Nevertheless, the TaskInfo needs to be non-null so that code path does not throw an NPE.
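A hedged sketch of what such a dummy TaskInfo might look like in the suite; the constructor arguments below are an assumption and vary across Spark versions.

```scala
import org.apache.spark.scheduler.{TaskInfo, TaskLocality}

// Sketch of the workaround: instead of handing the scheduler a null TaskInfo
// (which NPEs) or a Mockito mock, build one whose fields are harmless dummies.
// The exact constructor arguments are an assumption and differ between versions.
object FakeTaskInfoSketch {
  def makeFakeTaskInfo(): TaskInfo = {
    new TaskInfo(0L, 0, 0, 0L, "fake-executor", "fake-host", TaskLocality.ANY, false)
  }
}
```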
Test build #25972 has started for PR 4155 at commit
Test build #25972 timed out for PR 4155 at commit
Test FAILed.
Looks like the tests timed out. This change could be a significant performance bottleneck, since communicating back to the driver on every task commit is expensive. @JoshRosen, what do you think of the performance implications here?
Test build #25983 has started for PR 4155 at commit
Actually, it just looks like one test is hanging, so it's likely that something isn't being shut down properly.
Before, a poison pill message was sent to the actor. That isn't the paradigm used by other actors in Spark, though, so this makes it behave more like, e.g., the MapOutputTracker actor.
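For context, that pattern looks roughly like the following Akka sketch (the message and class names are illustrative, not the patch's): the driver sends an explicit stop message, the actor acknowledges it, and only then stops itself, so the caller gets a definite reply instead of a dropped PoisonPill.

```scala
import akka.actor.Actor

// Illustrative stop message, standing in for whatever the coordinator actor uses.
case object StopCoordinator

class CoordinatorActorSketch extends Actor {
  override def receive: Receive = {
    case StopCoordinator =>
      sender ! true        // acknowledge the stop request before going away
      context.stop(self)   // stop cleanly rather than relying on a PoisonPill
  }
}
```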
Test build #25985 has started for PR 4155 at commit
Test build #25986 has started for PR 4155 at commit
Test build #25983 timed out for PR 4155 at commit
Test FAILed.
Test build #26347 has started for PR 4155 at commit
Test build #26347 has finished for PR 4155 at commit
Test FAILed.
Test build #26348 has started for PR 4155 at commit
Test build #26348 timed out for PR 4155 at commit
Test FAILed.
Jenkins, retest this please.
Test build #26365 has started for PR 4155 at commit
Test build #26365 timed out for PR 4155 at commit
Test FAILed.
Since we're nearing a working solution, I'd like to aim to include this patch in 1.3. Since this patch involves major changes to the output commit code, I'd like to propose that we "feature-flag" the output committer behind a new configuration option. I think (but please correct me if I'm wrong) that we can safely bypass the new driver coordination when speculation is disabled (this should also alleviate some of the performance concerns that have been raised here). When speculation is enabled, we should perform the new checks by default, but we should have an emergency "escape hatch" option to bypass the new checks in case they present problems or contain bugs. Therefore, I think the new setting's default value could be set to
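A sketch of that gating, using the configuration key documented in the final commit message further down this thread; the exact defaulting logic here is an assumption rather than the patch's code.

```scala
import org.apache.spark.SparkConf

// Sketch of the proposed feature-flagging (the defaulting logic is an
// assumption): coordinate commits through the driver only when speculation is
// on, unless the user explicitly flips the escape-hatch setting.
object CommitCoordinationFlag {
  def enabled(conf: SparkConf): Boolean = {
    val speculationEnabled = conf.getBoolean("spark.speculation", defaultValue = false)
    conf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = speculationEnabled)
  }
}
```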
@JoshRosen I'll be out of the country for two weeks on a business trip. I'll try to push this through the unit-test hangs that are coming up now, but I can't guarantee I'll be able to finish it soon; if this is needed for 1.3.0, might I suggest you finish off the work from here? Sorry for the inconvenience.
@mccheah I'm fine with picking this up and doing the final work to finish it. Just ping me once you've pushed your final changes and I'll pull these commits into my PR and continue work there.
Hm, the tests that seem to hang last don't hang on my machine. Maybe I just got really unlucky? I'm not sure which tests I should expect to be flaky versus the ones we expect to pass consistently.
Jenkins, retest this please.
Test build #26428 has started for PR 4155 at commit
I tried running the Streaming CheckpointSuite locally, and it broke because of the new CommitDeniedException logic I added. I don't have any ideas as to how this happens, except that streaming might not be using SparkHadoopWriter in a way that is compatible with this design. I don't think I'll be able to take this any further. Feel free to pick things up from here, @JoshRosen.
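For anyone picking this up, here is a standalone sketch of the failure path the CheckpointSuite appears to be hitting; the real CommitDeniedException lives in the patch, and the names below are only illustrative.

```scala
// Standalone sketch of the commit-denial control flow (this local exception
// class only illustrates the shape of the failure path, it is not the
// patch's CommitDeniedException).
case class CommitDeniedSketch(jobId: Int, partitionId: Int, attemptId: Long)
  extends Exception(s"attempt $attemptId denied commit for partition $partitionId of job $jobId")

object CommitPathSketch {
  def performCommit(
      canCommit: Boolean,
      jobId: Int,
      partitionId: Int,
      attemptId: Long)(doCommit: () => Unit, abortTask: () => Unit): Unit = {
    if (canCommit) {
      doCommit()    // the authorized attempt commits its output files
    } else {
      abortTask()   // clean up the denied attempt's output
      throw CommitDeniedSketch(jobId, partitionId, attemptId)  // surfaces as a task failure
    }
  }
}
```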
Test build #26428 timed out for PR 4155 at commit
Test FAILed.
I'm continuing work on this output commit coordination patch over at #4066, in case anyone would like to help with further review.
…eculative tasks

Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is enabled, sometimes this can result in multiple tasks committing their output to the same file. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well. This can cause strange race conditions where multiple committing tasks interfere with each other, with the result being that some partition files are actually lost entirely. For more context on these kinds of scenarios, see SPARK-4879.

In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether or not such a commit is safe, and the application master does bookkeeping as tasks are requesting commits. Duplicate tasks that would write to files that were already written to from other tasks are prohibited from committing.

This patch emulates that functionality - the crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver and the executors can obtain a reference to this actor and request its permission to commit. As tasks commit and are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well to update its internal state.

Future work includes more rigorous unit testing and extra optimizations should this patch cause a performance regression. It is unclear what the overall cost of communicating back to the driver on every Hadoop-committing task will be. It's also important for those hitting this issue to backport this onto previous versions of Spark because the bug has serious consequences, that is, data is lost.

Currently, the OutputCommitCoordinator is only used when `spark.speculation` is true. It can be disabled by setting `spark.hadoop.outputCommitCoordination.enabled=false` in SparkConf.

This patch is an updated version of #4155 (by mccheah), which in turn was an updated version of this PR.

Closes #4155.

Author: mcheah <mcheah@palantir.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #4066 from JoshRosen/SPARK-4879-sparkhadoopwriter-fix and squashes the following commits:

658116b [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
ed783b2 [Josh Rosen] Address Andrew's feedback.
e7be65a [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
14861ea [Josh Rosen] splitID -> partitionID in a few places
ed8b554 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
48d5c1c [Josh Rosen] Roll back copiesRunning change in TaskSetManager
3969f5f [Josh Rosen] Re-enable guarding of commit coordination with spark.speculation setting.
ede7590 [Josh Rosen] Add test to ensure that a job that denies all commits cannot complete successfully.
97da5fe [Josh Rosen] Use actor only for RPC; call methods directly in DAGScheduler.
f582574 [Josh Rosen] Some cleanup in OutputCommitCoordinatorSuite
a7c0e29 [Josh Rosen] Create fake TaskInfo using dummy fields instead of Mockito.
997b41b [Josh Rosen] Roll back unnecessary DAGSchedulerSingleThreadedProcessLoop refactoring:
459310a [Josh Rosen] Roll back TaskSetManager changes that broke other tests.
dd00b7c [Josh Rosen] Move CommitDeniedException to executors package; remove `@DeveloperAPI` annotation.
c79df98 [Josh Rosen] Some misc. code style + doc changes:
f7d69c5 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
92e6dc9 [Josh Rosen] Bug fix: use task ID instead of StageID to index into authorizedCommitters.
b344bad [Josh Rosen] (Temporarily) re-enable "always coordinate" for testing purposes.
0aec91e [Josh Rosen] Only coordinate when speculation is enabled; add configuration option to bypass new coordination.
594e41a [mcheah] Fixing a scalastyle error
60a47f4 [mcheah] Writing proper unit test for OutputCommitCoordinator and fixing bugs.
d63f63f [mcheah] Fixing compiler error
9fe6495 [mcheah] Fixing scalastyle
1df2a91 [mcheah] Throwing exception if SparkHadoopWriter commit denied
d431144 [mcheah] Using more concurrency to process OutputCommitCoordinator requests.
c334255 [mcheah] Properly handling messages that could be sent after actor shutdown.
8d5a091 [mcheah] Was mistakenly serializing the accumulator in test suite.
9c6a4fa [mcheah] More OutputCommitCoordinator cleanup on stop()
78eb1b5 [mcheah] Better OutputCommitCoordinatorActor stopping; simpler canCommit
83de900 [mcheah] Making the OutputCommitCoordinatorMessage serializable
abc7db4 [mcheah] TaskInfo can't be null in DAGSchedulerSuite
f135a8e [mcheah] Moving the output commit coordinator from class into method.
1c2b219 [mcheah] Renaming outdated names for test function classes
66a71cd [mcheah] Removing whitespace modifications
6b543ba [mcheah] Removing redundant accumulator in unit test
c9decc6 [mcheah] Scalastyle fixes
bc80770 [mcheah] Unit tests for OutputCommitCoordinator
6e6f748 [mcheah] [SPARK-4879] Use the Spark driver to authorize Hadoop commits.

(cherry picked from commit 1cb3770)
Signed-off-by: Andrew Or <andrew@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/SparkEnv.scala
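Based on the commit message above, user-facing configuration would look roughly like this; the app name and master are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CommitCoordinationConfigExample {
  def main(args: Array[String]): Unit = {
    // Enabling speculation turns the coordinator on; the second setting is the
    // escape hatch that turns it back off if the new checks cause problems.
    // App name and master are placeholders.
    val conf = new SparkConf()
      .setAppName("commit-coordination-example")
      .setMaster("local[4]")
      .set("spark.speculation", "true")
      .set("spark.hadoop.outputCommitCoordination.enabled", "false")  // bypass the new commit checks

    val sc = new SparkContext(conf)
    sc.stop()
  }
}
```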
This is a version of #4066 which is up to date with master and has unit tests.
Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is turned on, sometimes this can result in multiple tasks committing their output to the same partition. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well. This can cause strange race conditions where multiple tasks that commit interfere with each other, with the result being that some partition files are actually lost entirely. For more context on these kinds of scenarios, see the aforementioned JIRA ticket.
In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether or not such a commit is safe, and the application master does bookkeeping as tasks are requesting commits. Duplicate tasks that would write to files that were already written to from other tasks are prohibited from committing.
This patch emulates that functionality - the crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver and the executors can obtain a reference to this actor and request its permission to commit. As tasks commit and are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well to update its internal state.
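A rough sketch of that executor-to-driver request follows; the message type, its fields, and the blocking ask are assumptions about the patch's shape rather than quotes from it.

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

// Illustrative request message; field names and types are assumptions.
case class AskPermissionToCommitOutput(stage: Int, partition: Int, taskAttempt: Long)

object CommitPermissionClientSketch {
  def askDriverForPermission(
      coordinatorActor: ActorRef,
      stage: Int,
      partition: Int,
      taskAttempt: Long): Boolean = {
    implicit val timeout: Timeout = Timeout(30.seconds)
    val reply = coordinatorActor ? AskPermissionToCommitOutput(stage, partition, taskAttempt)
    // Block until the driver answers yes or no; a timeout surfaces as an exception.
    Await.result(reply, timeout.duration).asInstanceOf[Boolean]
  }
}
```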
Future work includes more rigorous unit testing and extra optimizations should this patch cause a performance regression. It is unclear what the overall cost of communicating back to the driver on every Hadoop-committing task will be. It's also important for those hitting this issue to backport this onto previous versions of Spark, because the bug has serious consequences; that is, data is lost.
cc @mingyukim @JoshRosen @ash211