Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23433][CORE] Late zombie task completions update all tasksets #21131

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
}
}

/**
* Marks the task has completed in all TaskSetManagers for the given stage.
*
* After stage failure and retry, there may be multiple TaskSetManagers for the stage.
* If an earlier attempt of a stage completes a task, we should ensure that the later attempts
* do not also submit those same tasks. That also means that a task completion from an earlier
* attempt can lead to the entire stage getting marked as successful.
*/
private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, it seems impossible for a unfinished TaskSet to get an empty Map() in taskSetsByStageIdAndAttempt . But, if it does, maybe, we can tell the caller the stage has already finished.

tsm.markPartitionCompleted(partitionId)
}
}

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ private[spark] class TaskSetManager(
val ser = env.closureSerializer.newInstance()

val tasks = taskSet.tasks
private[scheduler] val partitionToIndex = tasks.zipWithIndex
.map { case (t, idx) => t.partitionId -> idx }.toMap
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)

Expand Down Expand Up @@ -153,7 +155,7 @@ private[spark] class TaskSetManager(
private[scheduler] val speculatableTasks = new HashSet[Int]

// Task index, start and finish time for each task attempt (indexed by task ID)
private val taskInfos = new HashMap[Long, TaskInfo]
private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]

// Use a MedianHeap to record durations of successful tasks so we know when to launch
// speculative tasks. This is only used when speculation is enabled, to avoid the overhead
Expand Down Expand Up @@ -754,6 +756,9 @@ private[spark] class TaskSetManager(
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already completed successfully")
}
// There may be multiple tasksets for this stage -- we let all of them know that the partition
// was completed. This may result in some of the tasksets getting completed.
sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
// "deserialize" the value when holding a lock to avoid blocking other threads. So we call
Expand All @@ -764,6 +769,19 @@ private[spark] class TaskSetManager(
maybeFinishTaskSet()
}

private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
partitionToIndex.get(partitionId).foreach { index =>
if (!successful(index)) {
tasksSuccessful += 1
successful(index) = true
if (tasksSuccessful == numTasks) {
isZombie = true
}
maybeFinishTaskSet()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this line needed? We will call maybeFinishTaskSet() at the end of handleSuccessfulTask

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right, its not needed, its called when the tasks succeed, fail, or are aborted, and when this called while that taskset still has running tasks, then its a no-op, as it would fail the runningTasks == 0 check inside maybeFinishTaskSet().

do you think its worth removing? I'm fine either way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's too minor. If we touch this file again, let's remove it. Otherwise maybe not bother about it.

}
}
}

/**
* Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
* DAG Scheduler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,4 +917,108 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
taskScheduler.initialize(new FakeSchedulerBackend)
}
}

test("Completions in zombie tasksets update status of non-zombie taskset") {
val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
val valueSer = SparkEnv.get.serializer.newInstance()

def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
val indexInTsm = tsm.partitionToIndex(partition)
val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
}

// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
// two times, so we have three active task sets for one stage. (For this to really happen,
// you'd need the previous stage to also get restarted, and then succeed, in between each
// attempt, but that happens outside what we're mocking here.)
val zombieAttempts = (0 until 2).map { stageAttempt =>
val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
taskScheduler.submitTasks(attempt)
val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
taskScheduler.resourceOffers(offers)
assert(tsm.runningTasks === 10)
// fail attempt
tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
FetchFailed(null, 0, 0, 0, "fetch failed"))
// the attempt is a zombie, but the tasks are still running (this could be true even if
// we actively killed those tasks, as killing is best-effort)
assert(tsm.isZombie)
assert(tsm.runningTasks === 9)
tsm
}

// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
// the stage, but this time with insufficient resources so not all tasks are active.

val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
taskScheduler.submitTasks(finalAttempt)
val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yet, launched tasks has nothing to do with other running tasks in other TaskSets. But, is it possible to take those running tasks into consideration when launch a new task (in source code) ? For example, launching FetchFailed task or tasks who do not have a running copy across TaskSets firstly ?

(But, it seems we will always have running copies in other TaskSets for our final TaskSet, except FetchFailed task, right? It's more like we are not talking about resubmitting a stage, but resubmitting tasks who do not have running copies across previous TaskSets.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we've previously debated about what to do with the tasks still running in a zombie attempt, and there hasn't been any definitive conclusion. I'm just trying to do a correctness fix here. Briefly, in general there is an expectation that those tasks are unlikely to succeed (because they won't be able to get their shuffle input, same as the original fetch failure), so we don't want to delay starting a new attempt of that task. And perhaps we should even actively kill those tasks (you'll see comments about that in various places). But if they do succeed, we need to handle them correctly. Note that even if we did try to actively kill them, you'd still need to handle a late-completion, as killing would only be "best-effort".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because they won't be able to get their shuffle input, same as the original fetch failure

why? In DAGScheduler, we only unregister one MapStatus of parent stage, so other running tasks within the failed (child) stage (caused by a fetch fail task) may still get MapOutputs from MapOutputTrackerMaster, and fetch data from other Executors. So, they can success normally.
Do I miss something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the assumption is that a fetchfailure means that all data on that host is unavailable. As shuffles are all-to-all, its very likely that every task is going to need some piece of data from that host. Its possible that they already grabbed all the data they need, before the problem occurred with the host, we don't know. Also, there is no "partial progress" for a task -- tasks don't know how to grab all the shuffle output they can, then just wait until the missing bit becomes available again. They fail as soon as the data they need is unavailable (with some retries, but there is no "pause" nor a check for data on another source).

Also the dagscheduler is a little confusing on this -- it does the unregister in two parts (I have no idea why anymore, to be honest):

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1391

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1406

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The explanation is quite clear and I get understand now. Thank you very mush! @squito

finalAttempt.tasks(task.index).partitionId
}.toSet
assert(finalTsm.runningTasks === 5)
assert(!finalTsm.isZombie)

// We simulate late completions from our zombie tasksets, corresponding to all the pending
// partitions in our final attempt. This means we're only waiting on the tasks we've already
// launched.
val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
finalAttemptPendingPartitions.foreach { partition =>
completeTaskSuccessfully(zombieAttempts(0), partition)
}

// If there is another resource offer, we shouldn't run anything. Though our final attempt
// used to have pending tasks, now those tasks have been completed by zombie attempts. The
// remaining tasks to compute are already active in the non-zombie attempt.
assert(
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)

val remainingTasks = finalAttemptLaunchedPartitions.toIndexedSeq.sorted

// finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
// marked as zombie.
// for each of the remaining tasks, find the tasksets with an active copy of the task, and
// finish the task.
remainingTasks.foreach { partition =>
val tsm = if (partition == 0) {
// we failed this task on both zombie attempts, this one is only present in the latest
// taskset
finalTsm
} else {
// should be active in every taskset. We choose a zombie taskset just to make sure that
// we transition the active taskset correctly even if the final completion comes
// from a zombie.
zombieAttempts(partition % 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... I know it's pretty nitpick but since remainingTasks is a set, you can't guarantee the final completion comes from a zombie. It's fine to keep this, or we can finish the partition 0 first instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a nitpick at all, thanks for catching this! I'll update

}
completeTaskSuccessfully(tsm, partition)
}

assert(finalTsm.isZombie)

// no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())

// finally, lets complete all the tasks. We simulate failures in attempt 1, but everything
// else succeeds, to make sure we get the right updates to the blacklist in all cases.
(zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you can reuse the val "allTaskSets".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for pointing that out -- though actually I'm going to go the other direction, I realized allTaskSets is not necessary at all.

val stageAttempt = tsm.taskSet.stageAttemptId
tsm.runningTasksSet.foreach { index =>
if (stageAttempt == 1) {
tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost)
} else {
val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result)
}
}

// we update the blacklist for the stage attempts with all successful tasks. Even though
// some tasksets had failures, we still consider them all successful from a blacklisting
// perspective, as the failures weren't from a problem w/ the tasks themselves.
verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is meq() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is mockito's eq matcher which is renamed to avoid clashing with scala's eq, this is a standard rename we use in the codebase:

https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala#L24

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, the code is folded, no wonder I didn't find it. Thank you.

}
}
}