From be538299481c2f1ada2e2896bea15559a9b5f825 Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 28 May 2014 15:57:05 -0700 Subject: [PATCH] [SPARK-1712]: TaskDescription instance is too big causes Spark to hang Author: witgo Closes #694 from witgo/SPARK-1712_new and squashes the following commits: 0f52483 [witgo] review commit 83ce29b [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new 52e6752 [witgo] reset test SparkContext 63636b6 [witgo] review commit 44a59ee [witgo] review commit 3b6d48c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new 926bd6a [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new 9a5cfad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new 03cc562 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new b0930b0 [witgo] review commit b1174bd [witgo] merge master f76679b [witgo] merge master 689495d [witgo] fix scala style bug 1d35c3c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new 062c182 [witgo] fix small bug for code style 0a428cf [witgo] add unit tests 158b2dc [witgo] review commit 4afe71d [witgo] review commit 9e4ffa7 [witgo] review commit 1d35c7d [witgo] fix hang 7965580 [witgo] fix Statement order 0e29eac [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new 3ea1ca1 [witgo] remove duplicate serialize 743a7ad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new 86e2048 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new 2a89adc [witgo] SPARK-1712: TaskDescription instance is too big causes Spark to hang --- .../CoarseGrainedExecutorBackend.scala | 9 ++-- .../cluster/CoarseGrainedClusterMessage.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 27 ++++++++++-- .../CoarseGrainedSchedulerBackendSuite.scala | 43 +++++++++++++++++++ 4 files changed, 73 insertions(+), 8 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 84aec65b7765d..2279d77c91c89 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -22,11 +22,12 @@ import java.nio.ByteBuffer import akka.actor._ import akka.remote._ -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf} import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.scheduler.TaskDescription import org.apache.spark.util.{AkkaUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( @@ -61,12 +62,14 @@ private[spark] class CoarseGrainedExecutorBackend( logError("Slave registration failed: " + message) System.exit(1) - case LaunchTask(taskDesc) => - logInfo("Got assigned task " + taskDesc.taskId) + case LaunchTask(data) => if (executor == null) { logError("Received LaunchTask command but executor was null") System.exit(1) } else { + val ser = SparkEnv.get.closureSerializer.newInstance() + val taskDesc = ser.deserialize[TaskDescription](data.value) + logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index ddbc74e82ac49..ca74069ef885c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable private[spark] object CoarseGrainedClusterMessages { // Driver to executors - case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage + case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage case class KillTask(taskId: Long, executor: String, interruptThread: Boolean) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index a6d6b3d26a3c6..e47a060683a2d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -27,10 +27,10 @@ import akka.actor._ import akka.pattern.ask import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.spark.{Logging, SparkException, TaskState} +import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState} import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils} /** * A scheduler backend that waits for coarse grained executors to connect to it through Akka. @@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A var totalCoreCount = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) + private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] @@ -140,8 +141,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { - freeCores(task.executorId) -= scheduler.CPUS_PER_TASK - executorActor(task.executorId) ! LaunchTask(task) + val ser = SparkEnv.get.closureSerializer.newInstance() + val serializedTask = ser.serialize(task) + if (serializedTask.limit >= akkaFrameSize - 1024) { + val taskSetId = scheduler.taskIdToTaskSetId(task.taskId) + scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => + try { + var msg = "Serialized task %s:%d was %d bytes which " + + "exceeds spark.akka.frameSize (%d bytes). " + + "Consider using broadcast variables for large values." + msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) + taskSet.abort(msg) + } catch { + case e: Exception => logError("Exception in error callback", e) + } + } + } + else { + freeCores(task.executorId) -= scheduler.CPUS_PER_TASK + executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask)) + } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala new file mode 100644 index 0000000000000..efef9d26dadca --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext} +import org.apache.spark.util.{SerializableBuffer, AkkaUtils} + +import org.scalatest.FunSuite + +class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext { + + test("serialized task larger than akka frame size") { + val conf = new SparkConf + conf.set("spark.akka.frameSize","1") + conf.set("spark.default.parallelism","1") + sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf) + val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf) + val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize)) + val larger = sc.parallelize(Seq(buffer)) + val thrown = intercept[SparkException] { + larger.collect() + } + assert(thrown.getMessage.contains("Consider using broadcast variables for large values")) + val smaller = sc.parallelize(1 to 4).collect() + assert(smaller.size === 4) + } + +}