[SPARK-23433][CORE] Late zombie task completions update all tasksets #21131
Changes from all commits:
@@ -73,6 +73,8 @@ private[spark] class TaskSetManager(
  val ser = env.closureSerializer.newInstance()

  val tasks = taskSet.tasks
  private[scheduler] val partitionToIndex = tasks.zipWithIndex
    .map { case (t, idx) => t.partitionId -> idx }.toMap
  val numTasks = tasks.length
  val copiesRunning = new Array[Int](numTasks)
@@ -153,7 +155,7 @@ private[spark] class TaskSetManager(
  private[scheduler] val speculatableTasks = new HashSet[Int]

  // Task index, start and finish time for each task attempt (indexed by task ID)
-  private val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]

  // Use a MedianHeap to record durations of successful tasks so we know when to launch
  // speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -754,6 +756,9 @@ private[spark] class TaskSetManager(
      logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
        " because task " + index + " has already completed successfully")
    }
    // There may be multiple tasksets for this stage -- we let all of them know that the partition
    // was completed. This may result in some of the tasksets getting completed.
    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -764,6 +769,19 @@ private[spark] class TaskSetManager(
    maybeFinishTaskSet()
  }

  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
    partitionToIndex.get(partitionId).foreach { index =>
      if (!successful(index)) {
        tasksSuccessful += 1
        successful(index) = true
        if (tasksSuccessful == numTasks) {
          isZombie = true
        }
        maybeFinishTaskSet()
      }
    }
  }

  /**
   * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
   * DAG Scheduler.

Review thread on the maybeFinishTaskSet() call inside markPartitionCompleted:

Reviewer: Is this line needed? We will call maybeFinishTaskSet from the other task-completion paths anyway.

Author: I think you're right, it's not needed: it's called when the tasks succeed, fail, or are aborted, and when this is called while that taskset still has running tasks, it's a no-op, as it would fail the check inside maybeFinishTaskSet. Do you think it's worth removing? I'm fine either way.

Reviewer: It's too minor. If we touch this file again, let's remove it. Otherwise maybe not bother about it.
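The matching change on the TaskSchedulerImpl side is not shown in this excerpt. As a rough sketch only -- it assumes the scheduler's existing taskSetsByStageIdAndAttempt map from stage id and attempt to TaskSetManager, and may not match the PR's exact code -- the helper called above would look roughly like this:

  // Sketch, not copied from the PR: forward a completed partition to every attempt
  // (zombie or not) still registered for the stage, so each TaskSetManager can mark
  // the corresponding task as successful via markPartitionCompleted above.
  private[scheduler] def markPartitionCompletedInAllTaskSets(
      stageId: Int,
      partitionId: Int): Unit = {
    // If the stage is no longer registered, there is simply nothing to update.
    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
      tsm.markPartitionCompleted(partitionId)
    }
  }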
@@ -917,4 +917,108 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
      taskScheduler.initialize(new FakeSchedulerBackend)
    }
  }

  test("Completions in zombie tasksets update status of non-zombie taskset") {
    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    val valueSer = SparkEnv.get.serializer.newInstance()

    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
      val indexInTsm = tsm.partitionToIndex(partition)
      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    }

    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    // two times, so we have three active task sets for one stage. (For this to really happen,
    // you'd need the previous stage to also get restarted, and then succeed, in between each
    // attempt, but that happens outside what we're mocking here.)
    val zombieAttempts = (0 until 2).map { stageAttempt =>
      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
      taskScheduler.submitTasks(attempt)
      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
      taskScheduler.resourceOffers(offers)
      assert(tsm.runningTasks === 10)
      // fail attempt
      tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
        FetchFailed(null, 0, 0, 0, "fetch failed"))
      // the attempt is a zombie, but the tasks are still running (this could be true even if
      // we actively killed those tasks, as killing is best-effort)
      assert(tsm.isZombie)
      assert(tsm.runningTasks === 9)
      tsm
    }

    // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
    // the stage, but this time with insufficient resources so not all tasks are active.

    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    taskScheduler.submitTasks(finalAttempt)
    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
      finalAttempt.tasks(task.index).partitionId
    }.toSet
    assert(finalTsm.runningTasks === 5)
    assert(!finalTsm.isZombie)

Review thread on the resourceOffers call above:

Reviewer: Yet the tasks launched here have nothing to do with the tasks still running in the other (zombie) tasksets. (But it seems we will always have running copies in the other tasksets at this point.)

Author: We've previously debated what to do with the tasks still running in a zombie attempt, and there hasn't been any definitive conclusion; I'm just trying to do a correctness fix here. Briefly, in general there is an expectation that those tasks are unlikely to succeed (because they won't be able to get their shuffle input, same as the original fetch failure), so we don't want to delay starting a new attempt of that task. And perhaps we should even actively kill those tasks (you'll see comments about that in various places). But if they do succeed, we need to handle them correctly. Note that even if we did try to actively kill them, you'd still need to handle a late completion, as killing would only be "best-effort".

Reviewer: Why? Why would they not be able to get their shuffle input?

Author: The assumption is that a fetch failure means that all data on that host is unavailable. As shuffles are all-to-all, it's very likely that every task is going to need some piece of data from that host. It's possible that they already grabbed all the data they need before the problem occurred with the host; we don't know. Also, there is no "partial progress" for a task -- tasks don't know how to grab all the shuffle output they can and then wait until the missing bit becomes available again. They fail as soon as the data they need is unavailable (with some retries, but there is no "pause", nor a check for the data on another source). Also, the DAGScheduler is a little confusing on this -- it does the unregister in two parts (I have no idea why anymore, to be honest).

Reviewer: The explanation is quite clear and I understand it now. Thank you very much! @squito
    // We simulate late completions from our zombie tasksets, corresponding to all the pending
    // partitions in our final attempt. This means we're only waiting on the tasks we've already
    // launched.
    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
    finalAttemptPendingPartitions.foreach { partition =>
      completeTaskSuccessfully(zombieAttempts(0), partition)
    }

    // If there is another resource offer, we shouldn't run anything. Though our final attempt
    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
    // remaining tasks to compute are already active in the non-zombie attempt.
    assert(
      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)

    val remainingTasks = finalAttemptLaunchedPartitions.toIndexedSeq.sorted

    // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
    // marked as zombie.
    // for each of the remaining tasks, find the tasksets with an active copy of the task, and
    // finish the task.
    remainingTasks.foreach { partition =>
      val tsm = if (partition == 0) {
        // we failed this task on both zombie attempts, this one is only present in the latest
        // taskset
        finalTsm
      } else {
        // should be active in every taskset. We choose a zombie taskset just to make sure that
        // we transition the active taskset correctly even if the final completion comes
        // from a zombie.
        zombieAttempts(partition % 2)
      }
      completeTaskSuccessfully(tsm, partition)
    }

Review thread on zombieAttempts(partition % 2):

Reviewer: Hmmm... I know it's a pretty minor nitpick, but since ...

Author: Not a nitpick at all, thanks for catching this! I'll update.
    assert(finalTsm.isZombie)

    // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())

    // finally, lets complete all the tasks. We simulate failures in attempt 1, but everything
    // else succeeds, to make sure we get the right updates to the blacklist in all cases.
    (zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
      val stageAttempt = tsm.taskSet.stageAttemptId
      tsm.runningTasksSet.foreach { index =>
        if (stageAttempt == 1) {
          tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost)
        } else {
          val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
          tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result)
        }
      }

      // we update the blacklist for the stage attempts with all successful tasks. Even though
      // some tasksets had failures, we still consider them all successful from a blacklisting
      // perspective, as the failures weren't from a problem w/ the tasks themselves.
      verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
    }
  }
}

Review thread on the (zombieAttempts ++ Seq(finalTsm)).foreach line:

Reviewer: Here you can reuse the val "allTaskSets".

Author: Thanks for pointing that out -- though actually I'm going to go the other direction, I realized ...

Review thread on the verify call that uses meq:

Reviewer: What is meq?

Author: It's mockito's eq matcher; it is imported under the name meq near the top of this file.

Reviewer: Oh, the code is folded, no wonder I didn't find it. Thank you.
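To make the meq discussion above concrete, here is a minimal, self-contained sketch of the rename-on-import pattern. The Tracker trait and MeqExample object are made-up names for illustration, and the import assumes Mockito 1.x-style org.mockito.Matchers (newer Mockito versions expose the same matchers via org.mockito.ArgumentMatchers); none of it is copied from this test file.

  // Mockito's eq matcher renamed to meq, because a bare `eq` would clash with Scala's AnyRef.eq.
  import org.mockito.Matchers.{anyObject, eq => meq}
  import org.mockito.Mockito.{mock, verify}

  // A made-up interface to mock, standing in for the blacklist tracker used in the test above.
  trait Tracker {
    def update(stageId: Int, stageAttemptId: Int, failures: AnyRef): Unit
  }

  object MeqExample {
    def main(args: Array[String]): Unit = {
      val tracker = mock(classOf[Tracker])
      tracker.update(0, 1, new Object)
      // Passes only if update() was called with exactly stageId = 0 and stageAttemptId = 1;
      // anyObject() lets the third argument be anything.
      verify(tracker).update(meq(0), meq(1), anyObject())
    }
  }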
Review comment (on the taskSetsByStageIdAndAttempt lookup):

Reviewer: Generally, it seems impossible for an unfinished TaskSet to get an emptyMap() in taskSetsByStageIdAndAttempt. But if it does, maybe we can tell the caller that the stage has already finished.
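Purely as an illustration of that suggestion (not part of this PR), the scheduler-side helper sketched earlier could report back whether any taskset was still registered for the stage:

  // Hypothetical variant, for illustration only: return whether any taskset was found,
  // so the caller could treat "no registered tasksets" as "the stage already finished".
  private[scheduler] def markPartitionCompletedInAllTaskSets(
      stageId: Int,
      partitionId: Int): Boolean = {
    val attempts = taskSetsByStageIdAndAttempt.getOrElse(stageId, Map())
    attempts.values.foreach(_.markPartitionCompleted(partitionId))
    attempts.nonEmpty
  }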