
Commit 707307f: review feedback
squito committed Apr 24, 2018 (1 parent: 0720a7c)
Showing 2 changed files with 9 additions and 12 deletions.
@@ -692,7 +692,7 @@ private[spark] class TaskSchedulerImpl(
   /**
    * Marks the task has completed in all TaskSetManagers for the given stage.
    *
-   * After stage failure and retry, there may be multiple active TaskSetManagers for the stage.
+   * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
    * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
    * do not also submit those same tasks. That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
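The doc comment above captures the invariant this change documents: a stage can have several TaskSetManagers (one per attempt), and a partition finished by any attempt must be marked completed in every attempt so the others do not re-submit it. A minimal sketch of that bookkeeping, using hypothetical names (StageAttempts, markPartitionCompleted are illustrative, not Spark's actual implementation):

// Hypothetical sketch: track pending partitions per stage attempt, and let a
// completion from any attempt drain the pending set of every other attempt.
import scala.collection.mutable

class StageAttempts {
  // attempt id -> partitions that attempt still needs to run
  private val pendingByAttempt = mutable.Map.empty[Int, mutable.Set[Int]]

  def addAttempt(attempt: Int, partitions: Set[Int]): Unit =
    pendingByAttempt(attempt) = mutable.Set(partitions.toSeq: _*)

  // Remove the partition from all attempts; return the attempts that are now
  // complete, even if they never ran the finishing task themselves.
  def markPartitionCompleted(partition: Int): Seq[Int] = {
    pendingByAttempt.values.foreach(_ -= partition)
    pendingByAttempt.collect { case (id, pending) if pending.isEmpty => id }.toSeq
  }
}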
@@ -940,15 +940,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
       taskScheduler.resourceOffers(offers)
       assert(tsm.runningTasks === 10)
-      if (stageAttempt < 2) {
-        // fail attempt
-        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
-          FetchFailed(null, 0, 0, 0, "fetch failed"))
-        // the attempt is a zombie, but the tasks are still running (this could be true even if
-        // we actively killed those tasks, as killing is best-effort)
-        assert(tsm.isZombie)
-        assert(tsm.runningTasks === 9)
-      }
+      // fail attempt
+      tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
+        FetchFailed(null, 0, 0, 0, "fetch failed"))
+      // the attempt is a zombie, but the tasks are still running (this could be true even if
+      // we actively killed those tasks, as killing is best-effort)
+      assert(tsm.isZombie)
+      assert(tsm.runningTasks === 9)
       tsm
     }
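The comments in the added lines explain the zombie semantics the asserts rely on: a fetch failure makes the whole attempt a zombie (it launches no new tasks), but the other nine tasks keep counting as running because killing them is only best-effort. A toy model of that state, with hypothetical names (AttemptState is illustrative, not the real TaskSetManager):

// Hypothetical model: zombie means "launch nothing new", not "nothing is running".
class AttemptState(numTasks: Int) {
  var isZombie: Boolean = false
  var runningTasks: Int = numTasks

  // A fetch failure retires the attempt, but only the failed task stops
  // running; the rest continue until they individually finish or fail.
  def handleFetchFailedTask(): Unit = {
    runningTasks -= 1
    isZombie = true
  }
}

val state = new AttemptState(10)
state.handleFetchFailedTask()
assert(state.isZombie && state.runningTasks == 9) // mirrors the asserts above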

@@ -979,8 +977,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(
       taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)

-    val allTaskSets = zombieAttempts ++ Seq(finalTsm)
-    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)
+    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions).toIndexedSeq.sorted

     // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
     // marked as zombie.
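The .toIndexedSeq.sorted added here pins the iteration order of the remaining partitions; iterating a Set directly gives no ordering guarantee, which can make a test that finishes partitions "from a mix of tasksets" nondeterministic. A small illustration (the concrete values are hypothetical; the point is only the ordering):

// Hypothetical example values, run as a Scala script.
val remaining = (0 until 10).toSet.diff(Set(2, 5))
remaining.foreach(println)                     // unspecified hash-set order
remaining.toIndexedSeq.sorted.foreach(println) // always 0, 1, 3, 4, 6, 7, 8, 9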
