From db86ccb11821231d85b727fb889dec1d58b39e4d Mon Sep 17 00:00:00 2001 From: wuyi Date: Wed, 6 Mar 2019 11:53:07 -0600 Subject: [PATCH] [SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions ## What changes were proposed in this pull request? This is an optional solution for #22806 . #21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen. But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why #22806 hit the issue. This pr extends #21131 's behavior by adding `stageIdToFinishedPartitions` into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into `stageIdToFinishedPartitions ` and won't launch any duplicate tasks. ## How was this patch tested? Add. Closes #23871 from Ngone51/dev-23433-25250. Lead-authored-by: wuyi Co-authored-by: Ngone51 Signed-off-by: Imran Rashid (cherry picked from commit e5c61436a5720f13eb6d530ebf80635522bd64c6) Signed-off-by: Imran Rashid --- .../spark/scheduler/TaskSchedulerImpl.scala | 36 +++++++++++++-- .../spark/scheduler/TaskSetManager.scala | 19 +++++--- .../scheduler/TaskSchedulerImplSuite.scala | 44 ++++++++++++++----- 3 files changed, 79 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 24d77f88db982..4d45fd268d219 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong import scala.collection.Set -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet} import scala.util.Random import org.apache.spark._ @@ -94,6 +94,9 @@ private[spark] class TaskSchedulerImpl( private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager] val taskIdToExecutorId = new HashMap[Long, String] + // Protected by `this` + private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet] + @volatile private var hasReceivedTask = false @volatile private var hasLaunchedTask = false private val starvationTimer = new Timer(true) @@ -236,7 +239,20 @@ private[spark] class TaskSchedulerImpl( private[scheduler] def createTaskSetManager( taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt) + // only create a BitSet once for a certain stage since we only remove + // that stage when an active TaskSetManager succeed. + stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet) + val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt) + // TaskSet got submitted by DAGScheduler may have some already completed + // tasks since DAGScheduler does not always know all the tasks that have + // been completed by other tasksets when completing a stage, so we mark + // those tasks as finished here to avoid launching duplicate tasks, while + // holding the TaskSchedulerImpl lock. + // See SPARK-25250 and `markPartitionCompletedInAllTaskSets()` + stageIdToFinishedPartitions.get(taskSet.stageId).foreach { + finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None)) + } + tsm } override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized { @@ -833,19 +849,31 @@ private[spark] class TaskSchedulerImpl( } /** - * Marks the task has completed in all TaskSetManagers for the given stage. + * Marks the task has completed in all TaskSetManagers(active / zombie) for the given stage. * * After stage failure and retry, there may be multiple TaskSetManagers for the stage. * If an earlier attempt of a stage completes a task, we should ensure that the later attempts * do not also submit those same tasks. That also means that a task completion from an earlier * attempt can lead to the entire stage getting marked as successful. + * And there is also the possibility that the DAGScheduler submits another taskset at the same + * time as we're marking a task completed here -- that taskset would have a task for a partition + * that was already completed. We maintain the set of finished partitions in + * stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset + * is submitted. See SPARK-25250 for more details. + * + * note: this method must be called with a lock on this. */ private[scheduler] def markPartitionCompletedInAllTaskSets( stageId: Int, partitionId: Int, taskInfo: TaskInfo) = { + // if we do not find a BitSet for this stage, which means an active TaskSetManager + // has already succeeded and removed the stage. + stageIdToFinishedPartitions.get(stageId).foreach{ + finishedPartitions => finishedPartitions += partitionId + } taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => - tsm.markPartitionCompleted(partitionId, taskInfo) + tsm.markPartitionCompleted(partitionId, Some(taskInfo)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 6bf60dd8e9dfa..71940ee7d803e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -21,7 +21,7 @@ import java.io.NotSerializableException import java.nio.ByteBuffer import java.util.concurrent.ConcurrentLinkedQueue -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet} import scala.math.max import scala.util.control.NonFatal @@ -777,7 +777,11 @@ private[spark] class TaskSetManager( // Mark successful and stop if all the tasks have succeeded. successful(index) = true if (tasksSuccessful == numTasks) { - isZombie = true + // clean up finished partitions for the stage when the active TaskSetManager succeed + if (!isZombie) { + sched.stageIdToFinishedPartitions -= stageId + isZombie = true + } } } else { logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + @@ -796,16 +800,21 @@ private[spark] class TaskSetManager( maybeFinishTaskSet() } - private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = { + private[scheduler] def markPartitionCompleted( + partitionId: Int, + taskInfo: Option[TaskInfo]): Unit = { partitionToIndex.get(partitionId).foreach { index => if (!successful(index)) { if (speculationEnabled && !isZombie) { - successfulTaskDurations.insert(taskInfo.duration) + taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) } } tasksSuccessful += 1 successful(index) = true if (tasksSuccessful == numTasks) { - isZombie = true + if (!isZombie) { + sched.stageIdToFinishedPartitions -= stageId + isZombie = true + } } maybeFinishTaskSet() } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 29172b4664e32..9ad3b7fdcf40e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1102,7 +1102,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } } - test("Completions in zombie tasksets update status of non-zombie taskset") { + test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") { val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() val valueSer = SparkEnv.get.serializer.newInstance() @@ -1114,9 +1114,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, - // two times, so we have three active task sets for one stage. (For this to really happen, - // you'd need the previous stage to also get restarted, and then succeed, in between each - // attempt, but that happens outside what we're mocking here.) + // two times, so we have three TaskSetManagers(2 zombie, 1 active) for one stage. (For this + // to really happen, you'd need the previous stage to also get restarted, and then succeed, + // in between each attempt, but that happens outside what we're mocking here.) val zombieAttempts = (0 until 2).map { stageAttempt => val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) taskScheduler.submitTasks(attempt) @@ -1133,13 +1133,33 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(tsm.runningTasks === 9) tsm } + // we've now got 2 zombie attempts, each with 9 tasks still running. And there's no active + // attempt exists in taskScheduler by now. + + // finish partition 1,2 by completing the tasks before a new attempt for the same stage submit. + // This is possible since the behaviour of submitting new attempt and handling successful task + // is from two different threads, which are "task-result-getter" and "dag-scheduler-event-loop" + // separately. + (0 until 2).foreach { i => + completeTaskSuccessfully(zombieAttempts(i), i + 1) + assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1)) + } - // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for - // the stage, but this time with insufficient resources so not all tasks are active. - + // Submit the 3rd attempt still with 10 tasks, this happens due to the race between thread + // "task-result-getter" and "dag-scheduler-event-loop", where a TaskSet gets submitted with + // already completed tasks. And this time with insufficient resources so not all tasks are + // active. val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) taskScheduler.submitTasks(finalAttempt) val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get + // Though finalTSM gets submitted with 10 tasks, the call to taskScheduler.submitTasks should + // realize that 2 tasks have already completed, and mark them appropriately, so it won't launch + // any duplicate tasks later (SPARK-25250). + (0 until 2).map(_ + 1).foreach { partitionId => + val index = finalTsm.partitionToIndex(partitionId) + assert(finalTsm.successful(index)) + } + val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => finalAttempt.tasks(task.index).partitionId @@ -1147,16 +1167,17 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(finalTsm.runningTasks === 5) assert(!finalTsm.isZombie) - // We simulate late completions from our zombie tasksets, corresponding to all the pending - // partitions in our final attempt. This means we're only waiting on the tasks we've already - // launched. + // We continually simulate late completions from our zombie tasksets(but this time, there's one + // active attempt exists in taskScheduler), corresponding to all the pending partitions in our + // final attempt. This means we're only waiting on the tasks we've already launched. val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) finalAttemptPendingPartitions.foreach { partition => completeTaskSuccessfully(zombieAttempts(0), partition) + assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition)) } // If there is another resource offer, we shouldn't run anything. Though our final attempt - // used to have pending tasks, now those tasks have been completed by zombie attempts. The + // used to have pending tasks, now those tasks have been completed by zombie attempts. The // remaining tasks to compute are already active in the non-zombie attempt. assert( taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) @@ -1204,6 +1225,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B // perspective, as the failures weren't from a problem w/ the tasks themselves. verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject()) } + assert(taskScheduler.stageIdToFinishedPartitions.isEmpty) } test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {