SPARK-1601 & SPARK-1602: two bug fixes related to cancellation
This should go into 1.0, since the bug returns wrong data when it occurs (which is pretty likely if cancellation is used). A test case is attached.

1. Do not put partially executed partitions into the cache when a task is killed.

2. The iterator returned by CacheManager#getOrCompute was not an InterruptibleIterator, which led to uninterruptible jobs (a minimal illustrative sketch of the wrapping idea follows below).
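
As a rough illustration of the second fix, here is a minimal, self-contained sketch. It is not Spark's API: CancellableIterator, TaskKilledError, and the cancelled flag are hypothetical stand-ins for InterruptibleIterator, TaskKilledException, and TaskContext.interrupted. Wrapping the delegate iterator means every hasNext call re-checks the cancellation flag, so a killed task stops producing records instead of quietly computing the rest of the partition.

import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for TaskKilledException.
class TaskKilledError extends RuntimeException

// Hypothetical stand-in for InterruptibleIterator: hasNext observes a shared
// cancellation flag before delegating, so a kill stops iteration promptly.
class CancellableIterator[T](cancelled: AtomicBoolean, delegate: Iterator[T]) extends Iterator[T] {
  def hasNext: Boolean = {
    if (cancelled.get()) throw new TaskKilledError
    delegate.hasNext
  }
  def next(): T = delegate.next()
}

object CancellableIteratorDemo extends App {
  val cancelled = new AtomicBoolean(false)
  val it = new CancellableIterator(cancelled, (1 to 1000).iterator)
  println(it.next())   // 1
  println(it.next())   // 2
  cancelled.set(true)  // simulate job cancellation
  try { it.hasNext } catch {
    case _: TaskKilledError => println("task stopped early; no partial result kept")
  }
}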

Thanks @aarondav and @ahirreddy for reporting and helping debug.

Author: Reynold Xin <rxin@apache.org>

Closes apache#521 from rxin/kill and squashes the following commits:

401033f [Reynold Xin] Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/spark into kill
7a7bdd2 [Reynold Xin] Add a new line in the end of JobCancellationSuite.scala.
35cd9f7 [Reynold Xin] Fixed a bug that partially executed partitions can be put into cache (in task killing).

Conflicts:
	core/src/main/scala/org/apache/spark/CacheManager.scala
	core/src/main/scala/org/apache/spark/executor/Executor.scala
	core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
rxin authored and markhamstra committed Apr 24, 2014
1 parent c2bf02b commit e794d53
Showing 5 changed files with 110 additions and 12 deletions.
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -46,7 +46,12 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
if (loading.contains(key)) {
logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
while (loading.contains(key)) {
try {loading.wait()} catch {case _ : Throwable =>}
try {
loading.wait()
} catch {
case e: Exception =>
logWarning(s"Got an exception while waiting for another thread to load $key", e)
}
}
logInfo("Finished waiting for %s".format(key))
// See whether someone else has successfully loaded it. The main way this would fail
@@ -74,7 +79,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
return elements.iterator.asInstanceOf[Iterator[T]]
val returnValue: Iterator[T] = elements.iterator.asInstanceOf[Iterator[T]]

new InterruptibleIterator(context, returnValue)

} finally {
loading.synchronized {
loading.remove(key)
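
To make the first fix concrete, here is a minimal sketch of the idea (hypothetical names throughout; PartitionStore and computeAndCache stand in for BlockManager and CacheManager#getOrCompute). Because the computed values are drained into a buffer before anything is handed to the store, a kill that aborts the computation partway through propagates before put is reached, so a truncated partition can never be cached.

import scala.collection.mutable

// Hypothetical stand-in for BlockManager.
class PartitionStore {
  private val blocks = mutable.Map.empty[String, Seq[Any]]
  def put(key: String, values: Seq[Any]): Unit = blocks(key) = values
  def contains(key: String): Boolean = blocks.contains(key)
}

object PartialCacheDemo extends App {
  val store = new PartitionStore

  // Drain the whole partition first; only a fully computed partition reaches put().
  def computeAndCache(key: String, values: Iterator[Int]): Iterator[Int] = {
    val buffer = mutable.ArrayBuffer.empty[Int]
    buffer ++= values        // throws here if the task is killed mid-computation
    store.put(key, buffer)   // never reached for a partially executed partition
    buffer.iterator
  }

  // Simulate a task that is killed after producing ten records.
  val killedMidway = (1 to 1000).iterator.map { x =>
    if (x > 10) throw new RuntimeException("task killed")
    x
  }

  try computeAndCache("rdd_0_0", killedMidway)
  catch { case _: RuntimeException => () }

  println(store.contains("rdd_0_0"))  // false: the partial result was not cached
}
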
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -24,7 +24,17 @@ package org.apache.spark
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {

def hasNext: Boolean = !context.interrupted && delegate.hasNext
def hasNext: Boolean = {
// TODO(aarondav/rxin): Check Thread.interrupted instead of context.interrupted if interrupt
// is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
// (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
// introduces an expensive read fence.
if (context.interrupted) {
throw new TaskKilledException
} else {
delegate.hasNext
}
}

def next(): T = delegate.next()
}
23 changes: 23 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskKilledException.scala
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

/**
* Exception for a task getting killed.
*/
private[spark] class TaskKilledException extends RuntimeException
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -163,8 +163,6 @@ private[spark] class Executor(
class TaskRunner(execBackend: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
extends Runnable {

object TaskKilledException extends Exception

@volatile private var killed = false
@volatile private var task: Task[Any] = _

@@ -202,7 +200,7 @@ private[spark] class Executor(
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw TaskKilledException
throw new TaskKilledException
}

attemptedTask = Some(task)
@@ -216,7 +214,7 @@

// If the task has been killed, let's fail it.
if (task.killed) {
throw TaskKilledException
throw new TaskKilledException
}

for (m <- task.metrics) {
@@ -254,7 +252,7 @@
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
}

case TaskKilledException => {
case _: TaskKilledException | _: InterruptedException if task.killed => {
logInfo("Executor killed task " + taskId)
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
}
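
A note on the Executor change, with a minimal Spark-free sketch (stand-in names; this is not the Executor code itself): the old singleton object TaskKilledException was matched by reference, which cannot also cover the InterruptedException raised when an interruptOnCancel kill interrupts the task thread. Matching both exception types, guarded by the killed flag, lets the runner report the task as KILLED rather than FAILED in either case.

// Local stand-in for org.apache.spark.TaskKilledException, for this sketch only.
class TaskKilledException extends RuntimeException

object KillReportingDemo extends App {
  @volatile var killed = false

  def runTask(body: => Unit): Unit = {
    try body
    catch {
      // Either exception counts as a kill only if the task was actually asked to die.
      case _: TaskKilledException | _: InterruptedException if killed =>
        println("status: KILLED")
      case e: Throwable =>
        println("status: FAILED (" + e + ")")
    }
  }

  val worker = new Thread(() => runTask { Thread.sleep(60000) })
  worker.start()
  killed = true       // mark the task as killed first...
  worker.interrupt()  // ...then interrupt its thread (interruptOnCancel behaviour)
  worker.join()       // prints "status: KILLED", not FAILED
}
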
67 changes: 63 additions & 4 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -85,6 +85,35 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
assert(sc.parallelize(1 to 10, 2).count === 10)
}

test("do not put partially executed partitions into cache") {
// In this test case, we create a scenario in which a partition is only partially executed,
// and make sure CacheManager does not put that partially executed partition into the
// BlockManager.
import JobCancellationSuite._
sc = new SparkContext("local", "test")

// Run from 1 to 10, and then block and wait for the task to be killed.
val rdd = sc.parallelize(1 to 1000, 2).map { x =>
if (x > 10) {
taskStartedSemaphore.release()
taskCancelledSemaphore.acquire()
}
x
}.cache()

val rdd1 = rdd.map(x => x)

future {
taskStartedSemaphore.acquire()
sc.cancelAllJobs()
taskCancelledSemaphore.release(100000)
}

intercept[SparkException] { rdd1.count() }
// If the partial block is put into cache, rdd.count() would return a number less than 1000.
assert(rdd.count() === 1000)
}

test("job group") {
sc = new SparkContext("local[2]", "test")

@@ -104,6 +133,30 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf

sc.clearJobGroup()
val jobB = sc.parallelize(1 to 100, 2).countAsync()
sc.cancelJobGroup("jobA")
val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
assert(e.getMessage contains "cancel")

// Once A is cancelled, job B should finish fairly quickly.
assert(jobB.get() === 100)
}

test("job group with interruption") {
sc = new SparkContext("local[2]", "test")

// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
sem.release()
}
})

// jobA is the one to be cancelled.
val jobA = future {
sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
}

// Block until both tasks of job A have started and cancel job A.
sem.acquire(2)
@@ -114,15 +167,15 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
// Once A is cancelled, job B should finish fairly quickly.
assert(jobB.get() === 100)
}
/*
test("two jobs sharing the same stage") {

ignore("two jobs sharing the same stage") {
// sem1: make sure cancel is issued after some tasks are launched
// sem2: make sure the first stage is not finished until cancel is issued
val sem1 = new Semaphore(0)
val sem2 = new Semaphore(0)

sc = new SparkContext("local[2]", "test")
sc.dagScheduler.addSparkListener(new SparkListener {
sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
sem1.release()
}
Expand All @@ -148,7 +201,7 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
intercept[SparkException] { f1.get() }
intercept[SparkException] { f2.get() }
}
*/

def testCount() {
// Cancel before launching any tasks
{
@@ -207,3 +260,9 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
}
}
}


object JobCancellationSuite {
val taskStartedSemaphore = new Semaphore(0)
val taskCancelledSemaphore = new Semaphore(0)
}
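
The new cache test relies on a two-semaphore handshake, with the semaphores kept in this companion object so the closures shipped to the (local-mode) tasks and the test body share the same instances. Below is a distilled, Spark-free sketch of that handshake, with hypothetical names, showing why the cancel is guaranteed to arrive while the partition is only partially computed.

import java.util.concurrent.Semaphore

object HandshakeDemo extends App {
  val taskStarted   = new Semaphore(0)  // released by the "task" once ten records are produced
  val taskCancelled = new Semaphore(0)  // released by the "driver" after issuing the cancel

  val task = new Thread(() => {
    (1 to 1000).foreach { x =>
      if (x > 10) {
        taskStarted.release()    // signal: the partition is now partially computed
        taskCancelled.acquire()  // block until the driver has cancelled the job
      }
    }
  })
  task.start()

  taskStarted.acquire()          // wait until the task is stuck mid-partition
  println("cancelling now")      // the real test calls sc.cancelAllJobs() here
  taskCancelled.release(100000)  // unblock any pending tasks so they can observe the kill
  task.join()
}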
