
[SPARK-4879] Use the Spark driver to authorize Hadoop commits. #4155

Closed
wants to merge 19 commits

Conversation

mccheah
Contributor

@mccheah mccheah commented Jan 22, 2015

This is a version of #4066 which is up to date with master and has unit tests.

Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is turned on, sometimes this can result in multiple tasks committing their output to the same
partition. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well.

This can cause strange race conditions where multiple tasks that commit interfere with each other, with the result being that some partition files are actually lost entirely. For more context on these kinds of scenarios, see the aforementioned JIRA ticket.

In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether or not such a commit is safe, and the application master does bookkeeping as tasks are requesting commits. Duplicate tasks that would write to files that were already written to from other tasks are prohibited from committing.

This patch emulates that functionality - the crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver and the executors can obtain a reference to this actor and request its permission to commit. As tasks commit and are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well to update its internal state.

Future work includes more rigorous unit testing and extra optimizations should this patch cause a performance regression. It is unclear what the overall cost of communicating back to the driver on every Hadoop-committing task will be. It's also important for those hitting this issue to backport this onto previous versions of Spark, because the bug has serious consequences: data is lost.
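To make the protocol concrete, here is a minimal Python sketch of the bookkeeping the coordinator performs: the first attempt to ask for a partition wins the right to commit, later attempts are denied, and a failure by the authorized attempt frees the slot for a retry. All names here are illustrative, not the actual Spark API.

```python
class OutputCommitCoordinator:
    """Tracks which task attempt, if any, is authorized to commit each partition."""

    def __init__(self):
        # (stage_id, partition_id) -> attempt_id of the authorized committer
        self._authorized = {}

    def can_commit(self, stage_id, partition_id, attempt_id):
        key = (stage_id, partition_id)
        winner = self._authorized.get(key)
        if winner is None:
            # First attempt to ask wins the right to commit this partition.
            self._authorized[key] = attempt_id
            return True
        # Only the previously authorized attempt may commit; duplicates are denied.
        return winner == attempt_id

    def task_failed(self, stage_id, partition_id, attempt_id):
        # If the authorized attempt failed, free the slot so a retry can commit.
        key = (stage_id, partition_id)
        if self._authorized.get(key) == attempt_id:
            del self._authorized[key]
```

In the real patch this state lives on the driver and executors reach it via an actor, so every decision is serialized through a single arbiter.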

cc @mingyukim @JoshRosen @ash211

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25940 has started for PR 4155 at commit 6b543ba.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25941 has started for PR 4155 at commit 1c2b219.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25940 timed out for PR 4155 at commit 6b543ba after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25940/
Test FAILed.

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25941 timed out for PR 4155 at commit 1c2b219 after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25941/
Test FAILed.

The SparkHadoopWriter wasn't serializable because the commit coordinator
was a class member. It doesn't need to be, so just get it on demand when
committing the task.
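The fix described above is the standard pattern for non-serializable helpers: look them up at the point of use instead of capturing them as a field, so the enclosing object stays serializable. A hedged Python analogue using pickle (all names are illustrative, not Spark's):

```python
import pickle


class Coordinator:
    """Stands in for a driver-side object holding live resources; refuses to serialize."""
    def __reduce__(self):
        raise TypeError("Coordinator is not serializable")


_GLOBAL_COORDINATOR = Coordinator()


class BadWriter:
    """Captures the coordinator as a member, so pickling the writer fails."""
    def __init__(self):
        self.coordinator = _GLOBAL_COORDINATOR


class GoodWriter:
    """Holds no reference; fetches the coordinator on demand when committing."""
    def commit_task(self):
        coordinator = _GLOBAL_COORDINATOR  # obtained from the environment at use time
        return coordinator
```

`pickle.dumps(BadWriter())` raises, while `GoodWriter` serializes cleanly, which mirrors why moving the coordinator lookup out of the class member fixed SparkHadoopWriter.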
@SparkQA

SparkQA commented Jan 22, 2015

Test build #25971 has started for PR 4155 at commit f135a8e.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25971 timed out for PR 4155 at commit f135a8e after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25971/
Test FAILed.

We don't care about what's actually in the TaskInfo object: OutputCommitCoordinator
is tested in OutputCommitCoordinatorSuite, which is the only code path that
relies on the TaskInfo's fields. Nevertheless, the TaskInfo must not be null
so that code path doesn't throw an NPE.
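This is the common test-stub pattern: give the untested code path a real object with placeholder fields rather than null. A small Python sketch of the idea (field names are hypothetical, not Spark's TaskInfo):

```python
from dataclasses import dataclass


@dataclass
class TaskInfo:
    task_id: int
    index: int
    host: str


def dummy_task_info():
    # Placeholder values: the suite under test never reads these fields,
    # but passing None instead of an object would blow up the code path
    # that merely dereferences the TaskInfo.
    return TaskInfo(task_id=0, index=0, host="localhost")
```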
@SparkQA

SparkQA commented Jan 22, 2015

Test build #25972 has started for PR 4155 at commit abc7db4.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 22, 2015

Test build #25972 timed out for PR 4155 at commit abc7db4 after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25972/
Test FAILed.

@mccheah
Contributor Author

mccheah commented Jan 22, 2015

Looks like the tests timed out. This change may be a large performance bottleneck, since communicating back to the driver on every task commit is expensive. @JoshRosen, what do you think of the performance implications here?

@SparkQA

SparkQA commented Jan 23, 2015

Test build #25983 has started for PR 4155 at commit 83de900.

  • This patch merges cleanly.

@mccheah
Contributor Author

mccheah commented Jan 23, 2015

Actually, it looks like just one test is hanging, so likely something is not being shut down properly.

Before, a poison pill message was sent to the actor. That is not the
paradigm used by other actors in Spark, though, so this makes it more
like, e.g., the MapOutputTracker actor.
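The difference between the two shutdown styles is whether the sender gets confirmation. A poison pill just drops into the mailbox and the sender hopes for the best; an explicit, acknowledged stop message lets the caller block until the loop has actually exited. A minimal Python sketch of the acknowledged style, using a thread with a queue in place of an actor (names illustrative):

```python
import queue
import threading


class CoordinatorEndpoint:
    """Message loop stopped by an explicit, acknowledged 'stop' request,
    rather than an unacknowledged poison pill."""

    def __init__(self):
        self._inbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            msg, reply = self._inbox.get()
            if msg == "stop":
                reply.put("stopped")  # acknowledge, then exit the loop
                return
            reply.put(("handled", msg))

    def ask(self, msg):
        # Send a message and block for the reply (bounded wait).
        reply = queue.Queue()
        self._inbox.put((msg, reply))
        return reply.get(timeout=5)

    def stop(self):
        # Caller knows the loop has terminated once this returns.
        return self.ask("stop")
```

Messages that arrive after shutdown would simply sit unanswered here; the actual patch also had to handle that case explicitly (see the "Properly handling messages that could be sent after actor shutdown" commit below).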
@SparkQA

SparkQA commented Jan 23, 2015

Test build #25985 has started for PR 4155 at commit 78eb1b5.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 23, 2015

Test build #25986 has started for PR 4155 at commit 9c6a4fa.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 23, 2015

Test build #25983 timed out for PR 4155 at commit 83de900 after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25983/
Test FAILed.

@SparkQA

SparkQA commented Jan 29, 2015

Test build #26347 has started for PR 4155 at commit 60a47f4.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 29, 2015

Test build #26347 has finished for PR 4155 at commit 60a47f4.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class CommitDeniedException(msg: String, jobID: Int, splitID: Int, attemptID: Int)
    • case class TaskCommitDenied(
    • class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26347/
Test FAILed.

@SparkQA

SparkQA commented Jan 29, 2015

Test build #26348 has started for PR 4155 at commit 594e41a.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26348 timed out for PR 4155 at commit 594e41a after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26348/
Test FAILed.

@mccheah
Contributor Author

mccheah commented Jan 30, 2015

Jenkins, retest this please.

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26365 has started for PR 4155 at commit 594e41a.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26365 timed out for PR 4155 at commit 594e41a after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26365/
Test FAILed.

@JoshRosen
Contributor

Since we're nearing a working solution, I'd like to aim to include this patch in 1.3. Since this patch involves major changes to the output commit code, I'd like to propose that we "feature-flag" the output committer behind a new configuration option. I think (but please correct me if I'm wrong) that we can safely bypass the new driver coordination when speculation is disabled (this should also alleviate some of the performance impact concerns that have been raised here). When speculation is enabled, we should perform the new checks by default, but should have an emergency "escape-hatch" option to bypass the new checks in case they present problems or contain bugs. Therefore, I think the new setting's default value could be set to spark.speculation.enabled's value. I'm open to name suggestions for the new configuration setting, but I was thinking that it could be something like spark.hadoop.outputCommitCoordination.enabled (maybe that's too verbose). It's a toss-up on whether we'd want to include this in configuration.md, since I can't imagine that users would want to disable it unless we found bugs in the new code.
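The proposed gating could look roughly like this: default the flag to the speculation setting, but let an explicit value override it as the escape hatch. This is only a sketch of the proposal as described here; the setting names follow the suggestion above and may not match what was finally merged.

```python
def commit_coordination_enabled(conf):
    """Decide whether to coordinate commits through the driver.

    `conf` is a plain dict of string settings standing in for SparkConf
    (illustrative). Coordination defaults to on iff speculation is on,
    but an explicit setting wins either way (the "escape hatch").
    """
    speculation = conf.get("spark.speculation", "false") == "true"
    default = "true" if speculation else "false"
    value = conf.get("spark.hadoop.outputCommitCoordination.enabled", default)
    return value == "true"
```

With this shape, jobs without speculation skip the per-commit round trip to the driver entirely, addressing the performance concern raised earlier in the thread.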

@mccheah
Contributor Author

mccheah commented Jan 30, 2015

@JoshRosen I'll be out of the country for two weeks on a business trip. I'll attempt to push this through the hanging unit tests that are coming up now, but I can't guarantee I'll be able to finish it soon; if this is needed for 1.3.0, might I suggest you finish off the work from here? Sorry for the inconvenience.

@JoshRosen
Contributor

@mccheah I'm fine with picking this up and doing the final work to finish it. Just ping me once you've pushed your final changes and I'll pull these commits into my PR and continue work there.

@mccheah
Contributor Author

mccheah commented Jan 30, 2015

Hm, the tests that seem to be running last don't hang on my machine. Maybe I just got really unlucky? I'm not sure which tests I should expect to be flaky vs. the ones we expect to pass consistently.

@mccheah
Contributor Author

mccheah commented Jan 30, 2015

Jenkins, retest this please.

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26428 has started for PR 4155 at commit 594e41a.

  • This patch merges cleanly.

@mccheah
Contributor Author

mccheah commented Jan 30, 2015

Tried running the Streaming CheckpointSuite locally, and it broke because of the new CommitDeniedException logic I added. I don't have any ideas as to how this happens, except that streaming might not be using SparkHadoopWriter in a way that is compatible with this design.

I don't think I'll be able to take this any further. Feel free to pick things up from here, @JoshRosen.

@SparkQA

SparkQA commented Jan 31, 2015

Test build #26428 timed out for PR 4155 at commit 594e41a after a configured wait of 120m.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26428/
Test FAILed.

@JoshRosen
Contributor

I'm continuing work on this output commit coordination patch over at #4066, in case anyone would like to help with further review.

@andrewor14
Contributor

@mccheah looks like this is being superseded by #4066. Shall we close this PR then?

@asfgit asfgit closed this in 1cb3770 Feb 11, 2015
asfgit pushed a commit that referenced this pull request Feb 11, 2015
…eculative tasks

Previously, SparkHadoopWriter always committed its tasks without question. The problem is that when speculation is enabled, this can sometimes result in multiple tasks committing their output to the same file. Even though an HDFS-writing task may be re-launched due to speculation, the original task is not killed and may eventually commit as well.

This can cause strange race conditions where multiple tasks that commit interfere with each other, with the result being that some partition files are actually lost entirely. For more context on these kinds of scenarios, see SPARK-4879.

In Hadoop MapReduce jobs, the application master is a central coordinator that authorizes whether or not any given task can commit. Before a task commits its output, it queries the application master as to whether or not such a commit is safe, and the application master does bookkeeping as tasks are requesting commits. Duplicate tasks that would write to files that were already written to from other tasks are prohibited from committing.

This patch emulates that functionality - the crucial missing component was a central arbitrator, which is now a module called the OutputCommitCoordinator. The coordinator lives on the driver and the executors can obtain a reference to this actor and request its permission to commit. As tasks commit and are reported as completed successfully or unsuccessfully by the DAGScheduler, the commit coordinator is informed of the task completion events as well to update its internal state.

Future work includes more rigorous unit testing and extra optimizations should this patch cause a performance regression. It is unclear what the overall cost of communicating back to the driver on every Hadoop-committing task will be. It's also important for those hitting this issue to backport this onto previous versions of Spark, because the bug has serious consequences: data is lost.

Currently, the OutputCommitCoordinator is only used when `spark.speculation` is true.  It can be disabled by setting `spark.hadoop.outputCommitCoordination.enabled=false` in SparkConf.

This patch is an updated version of #4155 (by mccheah), which in turn was an updated version of this PR.

Closes #4155.

Author: mcheah <mcheah@palantir.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #4066 from JoshRosen/SPARK-4879-sparkhadoopwriter-fix and squashes the following commits:

658116b [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
ed783b2 [Josh Rosen] Address Andrew’s feedback.
e7be65a [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
14861ea [Josh Rosen] splitID -> partitionID in a few places
ed8b554 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
48d5c1c [Josh Rosen] Roll back copiesRunning change in TaskSetManager
3969f5f [Josh Rosen] Re-enable guarding of commit coordination with spark.speculation setting.
ede7590 [Josh Rosen] Add test to ensure that a job that denies all commits cannot complete successfully.
97da5fe [Josh Rosen] Use actor only for RPC; call methods directly in DAGScheduler.
f582574 [Josh Rosen] Some cleanup in OutputCommitCoordinatorSuite
a7c0e29 [Josh Rosen] Create fake TaskInfo using dummy fields instead of Mockito.
997b41b [Josh Rosen] Roll back unnecessary DAGSchedulerSingleThreadedProcessLoop refactoring:
459310a [Josh Rosen] Roll back TaskSetManager changes that broke other tests.
dd00b7c [Josh Rosen] Move CommitDeniedException to executors package; remove `@DeveloperAPI` annotation.
c79df98 [Josh Rosen] Some misc. code style + doc changes:
f7d69c5 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-4879-sparkhadoopwriter-fix
92e6dc9 [Josh Rosen] Bug fix: use task ID instead of StageID to index into authorizedCommitters.
b344bad [Josh Rosen] (Temporarily) re-enable “always coordinate” for testing purposes.
0aec91e [Josh Rosen] Only coordinate when speculation is enabled; add configuration option to bypass new coordination.
594e41a [mcheah] Fixing a scalastyle error
60a47f4 [mcheah] Writing proper unit test for OutputCommitCoordinator and fixing bugs.
d63f63f [mcheah] Fixing compiler error
9fe6495 [mcheah] Fixing scalastyle
1df2a91 [mcheah] Throwing exception if SparkHadoopWriter commit denied
d431144 [mcheah] Using more concurrency to process OutputCommitCoordinator requests.
c334255 [mcheah] Properly handling messages that could be sent after actor shutdown.
8d5a091 [mcheah] Was mistakenly serializing the accumulator in test suite.
9c6a4fa [mcheah] More OutputCommitCoordinator cleanup on stop()
78eb1b5 [mcheah] Better OutputCommitCoordinatorActor stopping; simpler canCommit
83de900 [mcheah] Making the OutputCommitCoordinatorMessage serializable
abc7db4 [mcheah] TaskInfo can't be null in DAGSchedulerSuite
f135a8e [mcheah] Moving the output commit coordinator from class into method.
1c2b219 [mcheah] Renaming oudated names for test function classes
66a71cd [mcheah] Removing whitespace modifications
6b543ba [mcheah] Removing redundant accumulator in unit test
c9decc6 [mcheah] Scalastyle fixes
bc80770 [mcheah] Unit tests for OutputCommitCoordinator
6e6f748 [mcheah] [SPARK-4879] Use the Spark driver to authorize Hadoop commits.

(cherry picked from commit 1cb3770)
Signed-off-by: Andrew Or <andrew@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/SparkEnv.scala