[SPARK-26634] Do not allow task of FetchFailureStage commit in OutputCommitCoordinator #23563
Conversation
@HyukjinKwon Ok, thanks, I will try to explain it with examples in the description.
I think this makes sense, but for this code it's good to get more eyes. @squito @tgravescs @cloud-fan
```diff
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+  private[scheduler] def stageStart(
+      stage: Int, stageAttemptNumber: Int, maxPartitionId: Int): Unit = synchronized {
```
nit: one parameter per line, double indented
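For reference, a quick sketch of how the declaration might look in the suggested style (one parameter per line, double indented):

```scala
private[scheduler] def stageStart(
    stage: Int,
    stageAttemptNumber: Int,
    maxPartitionId: Int): Unit = synchronized {
  // ...
}
```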
ok to test
```diff
     stageStates.get(stage) match {
       case Some(state) =>
         require(state.authorizedCommitters.length == maxPartitionId + 1)
+        state.latestStageAttempt = stageAttemptNumber
         logInfo(s"Reusing state from previous attempt of stage $stage.")

       case _ =>
```
It is better to assign `stageAttemptNumber` to `latestStageAttempt` of the newly created `StageState` too.
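A sketch of what that could look like in the `case _ =>` branch (the `StageState` construction is paraphrased from context, not the PR's exact code):

```scala
case _ =>
  val state = new StageState(maxPartitionId + 1)
  // Record the attempt number for a brand-new state as well, so later
  // commit checks can compare against it.
  state.latestStageAttempt = stageAttemptNumber
  stageStates(stage) = state
```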
```diff
     stageStates.get(stage) match {
       case Some(state) =>
         require(state.authorizedCommitters.length == maxPartitionId + 1)
+        state.latestStageAttempt = stageAttemptNumber
```
Don't we need to check if the current `latestStageAttempt` is less than `stageAttemptNumber` or not?
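In other words, something like the following guard (a sketch of the reviewer's suggestion, not code from the PR):

```scala
case Some(state) =>
  require(state.authorizedCommitters.length == maxPartitionId + 1)
  // Only move the attempt number forward, in case a stale stageStart
  // from an older attempt arrives out of order.
  if (stageAttemptNumber > state.latestStageAttempt) {
    state.latestStageAttempt = stageAttemptNumber
  }
```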
Hmm, doesn't task 1.0 in stage 0.0 hit a FetchFailure? Or can it complete successfully after the FetchFailure? When does stage 0.1 get launched, after the FetchFailure in stage 0.0 but before task 1.0 completes?
No, some other task fails and causes the stage to be recomputed. That's what causes the scenario where task 1.0 commits after the new stage attempt starts.
Oh, I see. Thanks @vanzin. Won't task 1.0 in stage 0.0 release the commit lock later? Will it hold the lock forever?
No, once you commit, that's it. The partition stays "locked" until the stage finishes.
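That behavior follows from how the coordinator arbitrates commits. A minimal self-contained model of the idea, paraphrased from this discussion rather than the actual Spark source:

```scala
// Hypothetical model of the per-stage commit lock.
case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

class CommitLockModel(numPartitions: Int) {
  // One slot per partition. Once a slot is filled, it is never cleared
  // until the whole stage finishes and this state is discarded.
  private val authorizedCommitters =
    Array.fill[Option[TaskIdentifier]](numPartitions)(None)

  def canCommit(partition: Int, attempt: TaskIdentifier): Boolean = synchronized {
    authorizedCommitters(partition) match {
      case None =>
        authorizedCommitters(partition) = Some(attempt) // first asker wins the lock
        true
      case Some(existing) =>
        existing == attempt // every other attempt is denied
    }
  }
}
```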
Test build #102310 has finished for PR 23563 at commit
Partition 1 of stage 0 is marked as completed, so why does the scheduler need to schedule this task again?
@cloud-fan @vanzin @viirya @HyukjinKwon
Because it completes after the next stage attempt starts; the scheduler creates new tasks for all tasks that had not finished in the previous attempt up to that point. Anyway, if Imran's fix covers this case, that's good, no need to make this more complicated.
BTW, that fix also had a bug; see the follow-up in #22806 (which unfortunately still needs to be figured out). But that is needed for more than output committers, so that issue should still cover this.
Based on the above I'm closing this.
What changes were proposed in this pull request?
What's the problem?
`canCommit` of `OutputCommitCoordinator` allows a task from a stage attempt that hit a FetchFailure to commit, which results in `TaskCommitDenied` for the task in the retry stage that handles the same partition as the committed task. Because `TaskCommitDenied` does not count towards task failures, the scheduler keeps rescheduling that task and it keeps getting `TaskCommitDenied`, so the application hangs forever.
How does it happen?
A detailed explanation:
Let's say we have:
- Stage 0.0 (stage id 0, attempt 0)
- Stage 0.1 (stage id 0, attempt 1), started due to a fetch failure, for instance

Task 1.0 in stage 0.0 completes successfully after the launch of stage 0.1 and takes the commit lock for partition 1. (This is granted because no authorized committer exists for partition 1 yet and the task attempt has not failed.)
(See spark/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala, lines 180–190 at commit 4915cb3.)
Task 1.0 in stage 0.1 completes, but it cannot get the commit lock. (Naturally, since it is already held by task 1.0 in stage 0.0.)
(See spark/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala, lines 185–194 at commit 4915cb3.)
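A paraphrased sketch of the arbitration logic those line references point at, covering both the grant that task 1.0 in stage 0.0 gets and the denial that task 1.0 in stage 0.1 hits (`attemptFailed` and `TaskIdentifier` follow the coordinator's naming, but this is not the verbatim source):

```scala
// Inside OutputCommitCoordinator.handleAskPermissionToCommit (paraphrased):
stageStates.get(stage) match {
  case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>
    false // a known-failed task attempt may never commit
  case Some(state) =>
    if (state.authorizedCommitters(partition) == null) {
      // No committer yet: grant the lock. This is why task 1.0 of stage 0.0
      // is authorized even though its stage attempt hit a FetchFailure.
      state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)
      true
    } else {
      // Lock already held (by task 1.0 of stage 0.0), so task 1.0 of
      // stage 0.1 is denied here and fails with TaskCommitDenied.
      false
    }
  case None =>
    false // commits for unknown stages are rejected
}
```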
Because `TaskCommitDenied` does not count towards task failures, `TaskSetManager.handleFailedTask` does not abort the stage despite the consecutive failures of task 1.x for partition 1 in stage 0.1.
(See spark/core/src/main/scala/org/apache/spark/TaskEndReason.scala, line 242, and spark/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala, line 907, at commit 4915cb3.)
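Paraphrased sketches of the two pieces of behavior referenced above (names follow the Spark source, but the bodies are abbreviated):

```scala
// TaskEndReason.scala (paraphrased): a commit denial deliberately does not
// count as a task failure, since normally the authorized committer is about
// to succeed anyway.
case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptNumber: Int)
    extends TaskFailedReason {
  override def countTowardsTaskFailures: Boolean = false
}

// TaskSetManager.handleFailedTask (paraphrased): the failed task is re-queued,
// but the counter that can abort the task set only advances when the end
// reason counts towards failures -- so denials can loop forever.
addPendingTask(index)
if (reason.countTowardsTaskFailures) {
  numFailures(index) += 1
  if (numFailures(index) >= maxTaskFailures) {
    abort(s"Task $index in stage $stageId failed $maxTaskFailures times")
  }
}
```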
Task 1 is re-added to the pending task list, and the scheduler launches task 1.1 later.
(See spark/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala, line 927, at commit 4915cb3.)
Task 1.1 in stage 0.1 completes and also cannot get the commit lock, and so on, back and forth forever.
Logs:
How does this PR fix it?
This PR forbids tasks from a failed stage attempt from committing once a new stage attempt has started, thus solving the problem.
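A sketch of the intended check, building on the `latestStageAttempt` field added in the diff above (the surrounding `handleAskPermissionToCommit` code is paraphrased):

```scala
// Inside handleAskPermissionToCommit (paraphrased):
case Some(state) if stageAttempt < state.latestStageAttempt =>
  // The asking task belongs to a superseded (e.g. fetch-failed) stage
  // attempt, so it is no longer allowed to take the commit lock; only
  // tasks of the latest attempt can commit.
  false
case Some(state) =>
  if (state.authorizedCommitters(partition) == null) {
    state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber)
    true
  } else {
    false
  }
```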
How was this patch tested?
Unit tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.