Changes to accommodate backporting of SPARK-1582, SPARK-1601 and SPARK-1602
markhamstra committed Apr 24, 2014
1 parent f96d67d commit f8a68d7
Showing 7 changed files with 25 additions and 13 deletions.
core/src/main/scala/org/apache/spark/CacheManager.scala (2 changes: 1 addition & 1 deletion)
@@ -50,7 +50,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
             loading.wait()
           } catch {
             case e: Exception =>
-              logWarning(s"Got an exception while waiting for another thread to load $key", e)
+              logWarning("Got an exception while waiting for another thread to load " + key, e)
           }
         }
         logInfo("Finished waiting for %s".format(key))
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -962,13 +962,13 @@ class DAGScheduler(
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
+      val job = idToActiveJob(jobId)
       val independentStages = removeJobAndIndependentStages(jobId)
       val shouldInterruptThread =
         if (job.properties == null) false
         else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
       independentStages.foreach { taskSched.cancelTasks(_, shouldInterruptThread) }
       val error = new SparkException("Job %d cancelled".format(jobId))
-      val job = idToActiveJob(jobId)
       job.listener.jobFailed(error)
       jobIdToStageIds -= jobId
       activeJobs -= job
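The one real change in this hunk moves `val job = idToActiveJob(jobId)` above `removeJobAndIndependentStages(jobId)`: the lookup must happen while the job is still registered, and `job.properties` is consulted to compute `shouldInterruptThread` before any stage is cancelled. The flag is driven by a job property; a sketch of how a driver program might opt in, assuming the SPARK-1582 backport also provides the `interruptOnCancel` overload of `setJobGroup` and a live SparkContext `sc`:

```scala
// Opt this job group into thread interruption on cancel
// (overload assumed to come with the SPARK-1582 backport).
sc.setJobGroup("jobA", "an interruptible job group", true)

// Later, from another thread: tasks of this group now get Thread.interrupt()
// in addition to the cooperative kill flag.
sc.cancelJobGroup("jobA")
```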
core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -165,7 +165,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     backend.reviveOffers()
   }

-  override def cancelTasks(stageId: Int): Unit = synchronized {
+  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
     activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
       // There are two possible cases here:
@@ -178,7 +178,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       if (taskIds.size > 0) {
         taskIds.foreach { tid =>
           val execId = taskIdToExecutorId(tid)
-          backend.killTask(tid, execId)
+          backend.killTask(tid, execId, interruptThread)
         }
       }
       logInfo("Stage %d was cancelled".format(stageId))
core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
@@ -30,7 +30,8 @@ private[spark] trait SchedulerBackend {
   def reviveOffers(): Unit
   def defaultParallelism(): Int

-  def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException
+  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
+    throw new UnsupportedOperationException

   // Memory used by each executor (in megabytes)
   protected val executorMemory: Int = SparkContext.executorMemoryRequested
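`killTask` keeps its throwing default, so backends that never kill tasks are unaffected; backends that do must now accept the flag. On the executor side the flag presumably decides whether the task's thread is interrupted in addition to setting a cooperative kill marker. A self-contained sketch of that pattern (names are illustrative, not the real executor code):

```scala
// Sketch: a kill-aware runner combining a cooperative flag with an optional interrupt.
class SketchTaskRunner(body: => Unit) extends Runnable {
  @volatile private var runningThread: Thread = null
  @volatile var killed = false  // tasks can poll this between records

  override def run() {
    runningThread = Thread.currentThread()
    body
  }

  def kill(interruptThread: Boolean) {
    killed = true
    if (interruptThread && runningThread != null) {
      runningThread.interrupt()  // also unblocks sleep/wait and interrupt-aware IO
    }
  }
}
```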
core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -44,7 +44,7 @@ private[local]
 case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)

 private[local]
-case class KillTask(taskId: Long)
+case class KillTask(taskId: Long, interruptThread: Boolean)

 private[spark]
 class LocalActor(localScheduler: LocalScheduler, private var freeCores: Int)
@@ -62,8 +62,8 @@ class LocalActor(localScheduler: LocalScheduler, private var freeCores: Int)
       launchTask(localScheduler.resourceOffer(freeCores))
     }

-    case KillTask(taskId) =>
-      executor.killTask(taskId)
+    case KillTask(taskId, interruptThread) =>
+      executor.killTask(taskId, interruptThread)
   }

   private def launchTask(tasks: Seq[TaskDescription]) {
@@ -128,7 +128,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     }
   }

-  override def cancelTasks(stageId: Int): Unit = synchronized {
+  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
     logInfo("Cancelling stage " + activeTaskSets.map(_._2.stageId))
     activeTaskSets.find(_._2.stageId == stageId).foreach { case (_, tsm) =>
@@ -141,7 +141,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
       val taskIds = taskSetTaskIds(tsm.taskSet.id)
       if (taskIds.size > 0) {
         taskIds.foreach { tid =>
-          localActor ! KillTask(tid)
+          localActor ! KillTask(tid, interruptThread)
         }
       }
       logInfo("Stage %d was cancelled".format(stageId))
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala (10 changes: 8 additions & 2 deletions)
@@ -20,7 +20,7 @@ package org.apache.spark
 import java.util.concurrent.Semaphore

 import scala.concurrent.Await
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
 import scala.concurrent.future
 import scala.concurrent.ExecutionContext.Implicits.global

@@ -131,6 +131,9 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
       sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
     }

+    // Block until both tasks of job A have started and cancel job A.
+    sem.acquire(2)
+
     sc.clearJobGroup()
     val jobB = sc.parallelize(1 to 100, 2).countAsync()
     sc.cancelJobGroup("jobA")
@@ -160,8 +163,11 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf

+    // Block until both tasks of job A have started and cancel job A.
+    sem.acquire(2)
+
     sc.clearJobGroup()
     val jobB = sc.parallelize(1 to 100, 2).countAsync()
     sc.cancelJobGroup("jobA")
-    val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
+    val e = intercept[SparkException] { Await.result(jobA, 5.seconds) }
     assert(e.getMessage contains "cancel")

     // Once A is cancelled, job B should finish fairly quickly.
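Two hardening changes in this suite: `sem.acquire(2)` blocks until both of job A's tasks have actually started before the cancel is issued, closing the race where cancellation could land before any task launched, and bounding `Await.result` at `5.seconds` instead of `Duration.Inf` turns a hang into a quick failure (hence `duration._` replacing `duration.Duration` in the imports). The idiom in isolation, without Spark:

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{future, Await}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val sem = new Semaphore(0)
val work = future {
  sem.release(2)     // stand-in for two tasks signalling "started"
  Thread.sleep(100)  // stand-in for the cancellable work
}
sem.acquire(2)                 // proceed only once both "tasks" are running
Await.result(work, 5.seconds)  // bounded wait: a wedged job fails in 5s instead of forever
```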
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.scheduler

-import scala.collection.mutable.{Map, HashMap}
+import scala.collection.mutable.{Map, HashMap, HashSet}

 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
@@ -49,6 +49,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont

   /** Set of TaskSets the DAGScheduler has requested executed. */
   val taskSets = scala.collection.mutable.Buffer[TaskSet]()
+
+  /** Stages for which the DAGScheduler has called TaskScheduler.cancelTasks(). */
+  val cancelledStages = new HashSet[Int]()
+
   val taskScheduler = new TaskScheduler() {
     override def rootPool: Pool = null
     override def schedulingMode: SchedulingMode = SchedulingMode.NONE
@@ -99,6 +103,7 @@
   before {
     sc = new SparkContext("local", "DAGSchedulerSuite")
     taskSets.clear()
+    cancelledStages.clear()
     cacheLocations.clear()
     results.clear()
     mapOutputTracker = new MapOutputTracker()
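The new `cancelledStages` set lets the suite's stub `TaskScheduler` record which stages the DAGScheduler asked to cancel rather than cancelling anything; the truncated hunk presumably pairs it with a `cancelTasks(stageId, interruptThread)` override that just adds to the set. The record-only test-double pattern in isolation:

```scala
import scala.collection.mutable.HashSet

// Record cancellation requests so assertions can inspect them afterwards.
trait StageCanceller {
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
}

class RecordingCanceller extends StageCanceller {
  val cancelledStages = new HashSet[Int]()
  override def cancelTasks(stageId: Int, interruptThread: Boolean) {
    cancelledStages += stageId  // record the request; perform no real cancellation
  }
}
```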