From 707307fd8e468dce82d45a713ce11c9ce3f96d45 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Tue, 24 Apr 2018 10:24:15 -0500
Subject: [PATCH] review feedback

---
 .../spark/scheduler/TaskSchedulerImpl.scala     |  2 +-
 .../scheduler/TaskSchedulerImplSuite.scala      | 19 ++++++++-----------
 2 files changed, 9 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 801a039c450cf..8e97b3da33820 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -692,7 +692,7 @@ private[spark] class TaskSchedulerImpl(
   /**
    * Marks the task has completed in all TaskSetManagers for the given stage.
    *
-   * After stage failure and retry, there may be multiple active TaskSetManagers for the stage.
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
    * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 5f4f40bbba5cb..2fa41fcc5bce8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -940,15 +940,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
       taskScheduler.resourceOffers(offers)
       assert(tsm.runningTasks === 10)
-      if (stageAttempt < 2) {
-        // fail attempt
-        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
-          FetchFailed(null, 0, 0, 0, "fetch failed"))
-        // the attempt is a zombie, but the tasks are still running (this could be true even if
-        // we actively killed those tasks, as killing is best-effort)
-        assert(tsm.isZombie)
-        assert(tsm.runningTasks === 9)
-      }
+      // fail attempt
+      tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
+        FetchFailed(null, 0, 0, 0, "fetch failed"))
+      // the attempt is a zombie, but the tasks are still running (this could be true even if
+      // we actively killed those tasks, as killing is best-effort)
+      assert(tsm.isZombie)
+      assert(tsm.runningTasks === 9)
       tsm
     }
 
@@ -979,8 +977,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(
       taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
 
-    val allTaskSets = zombieAttempts ++ Seq(finalTsm)
-    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)
+    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions).toIndexedSeq.sorted
     // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
     // marked as zombie.
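
---

Note for reviewers: the doc-comment change above is the crux -- after a fetch
failure the failed attempt becomes a zombie but may still have tasks running,
so a task completion from any attempt must be propagated to every
TaskSetManager registered for that stage. Below is a minimal, self-contained
Scala sketch of that idea. It is NOT Spark's actual implementation; all names
in it (SketchTaskSetManager, SchedulerSketch,
markPartitionCompletedInAllAttempts) are hypothetical stand-ins.

import scala.collection.mutable

// Hypothetical stand-in for a per-attempt TaskSetManager.
class SketchTaskSetManager(val stageId: Int, val stageAttemptId: Int, numPartitions: Int) {
  var isZombie: Boolean = false
  private val pending = mutable.Set(0 until numPartitions: _*)

  // Called when any attempt of this stage (even an earlier, zombie one)
  // finishes a partition.
  def markPartitionCompleted(partition: Int): Unit = {
    pending -= partition
    // Once nothing is pending, this attempt has no more work to schedule.
    if (pending.isEmpty) isZombie = true
  }

  def pendingPartitions: Set[Int] = pending.toSet
}

object SchedulerSketch {
  // stageId -> (stageAttemptId -> manager): mirrors the "multiple
  // TaskSetManagers per stage after retry" situation from the doc comment.
  private val managersByStage =
    mutable.Map.empty[Int, mutable.Map[Int, SketchTaskSetManager]]

  def register(tsm: SketchTaskSetManager): Unit =
    managersByStage.getOrElseUpdate(tsm.stageId, mutable.Map.empty)
      .update(tsm.stageAttemptId, tsm)

  // The core idea: apply a completion to *all* attempts of the stage, so a
  // later (active) attempt never re-submits a partition that an earlier
  // (zombie) attempt already finished.
  def markPartitionCompletedInAllAttempts(stageId: Int, partition: Int): Unit =
    managersByStage.get(stageId)
      .foreach(_.values.foreach(_.markPartitionCompleted(partition)))

  def main(args: Array[String]): Unit = {
    val zombie = new SketchTaskSetManager(stageId = 0, stageAttemptId = 0, numPartitions = 3)
    zombie.isZombie = true // failed attempt, but its tasks may still be running
    val retry = new SketchTaskSetManager(stageId = 0, stageAttemptId = 1, numPartitions = 3)
    Seq(zombie, retry).foreach(register)

    // A still-running task from the zombie attempt finishes partition 2:
    markPartitionCompletedInAllAttempts(stageId = 0, partition = 2)
    assert(retry.pendingPartitions == Set(0, 1)) // the retry skips partition 2
    println(s"retry still needs: ${retry.pendingPartitions.toIndexedSeq.sorted}")
  }
}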