diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 42e0a0f20133e..78c23f2ef6b7d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -144,11 +144,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A val ser = SparkEnv.get.closureSerializer.newInstance() val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - 1024) { - var msg = "Serialized task %s:%d was %d bytes which " + - "exceeds spark.akka.frameSize (%d bytes). " + - "Consider using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) - scheduler.error(msg) + val taskSetId = scheduler.taskIdToTaskSetId(task.taskId) + scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => + try { + var msg = "Serialized task %s:%d was %d bytes which " + + "exceeds spark.akka.frameSize (%d bytes). " + + "Consider using broadcast variables for large values." + msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) + taskSet.abort(msg) + } catch { + case e: Exception => logError("Exception in error callback", e) + } + } + // scheduler.error(msg) // TODO: Need to throw an exception? // throw new SparkException(msg) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 79bfb8d0dfe78..9cd7c85698732 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler import org.apache.spark.{SparkConf, SparkException, SparkContext} import org.apache.spark.util.{SerializableBuffer, AkkaUtils} import org.apache.spark.SparkContext._ -import java.nio.ByteBuffer import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} @@ -41,7 +40,7 @@ class CoarseGrainedSchedulerBackendSuite extends FunSuite with val conf = new SparkConf val sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf) val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf) - val buffer = new SerializableBuffer(ByteBuffer.allocate(2 * frameSize)) + val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize)) val larger = sc.parallelize(Seq(buffer)) val thrown = intercept[SparkException] { larger.collect()