Skip to content

Commit

Permalink
[SPARK-1712]: TaskDescription instance is too big causes Spark to hang
Browse files Browse the repository at this point in the history
Author: witgo <witgo@qq.com>

Closes apache#694 from witgo/SPARK-1712_new and squashes the following commits:

0f52483 [witgo] review commit
83ce29b [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
52e6752 [witgo] reset test SparkContext
63636b6 [witgo] review commit
44a59ee [witgo] review commit
3b6d48c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
926bd6a [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
9a5cfad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
03cc562 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
b0930b0 [witgo] review commit
b1174bd [witgo] merge master
f76679b [witgo] merge master
689495d [witgo] fix scala style bug
1d35c3c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
062c182 [witgo] fix small bug for code style
0a428cf [witgo] add unit tests
158b2dc [witgo] review commit
4afe71d [witgo] review commit
9e4ffa7 [witgo] review commit
1d35c7d [witgo] fix hang
7965580 [witgo] fix Statement order
0e29eac [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
3ea1ca1 [witgo] remove duplicate serialize
743a7ad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
86e2048 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
2a89adc [witgo] SPARK-1712: TaskDescription instance is too big causes Spark to hang
  • Loading branch information
witgo authored and pdeyhim committed Jun 25, 2014
1 parent 369b6dc commit be53829
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.util.{AkkaUtils, Utils}

private[spark] class CoarseGrainedExecutorBackend(
Expand Down Expand Up @@ -61,12 +62,14 @@ private[spark] class CoarseGrainedExecutorBackend(
logError("Slave registration failed: " + message)
System.exit(1)

case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId)
case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
val ser = SparkEnv.get.closureSerializer.newInstance()
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
private[spark] object CoarseGrainedClusterMessages {

// Driver to executors
case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
extends CoarseGrainedClusterMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

import org.apache.spark.{Logging, SparkException, TaskState}
import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}

/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
Expand All @@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
var totalCoreCount = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
Expand Down Expand Up @@ -140,8 +141,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
executorActor(task.executorId) ! LaunchTask(task)
val ser = SparkEnv.get.closureSerializer.newInstance()
val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - 1024) {
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
try {
var msg = "Serialized task %s:%d was %d bytes which " +
"exceeds spark.akka.frameSize (%d bytes). " +
"Consider using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
taskSet.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext}
import org.apache.spark.util.{SerializableBuffer, AkkaUtils}

import org.scalatest.FunSuite

class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext {

test("serialized task larger than akka frame size") {
val conf = new SparkConf
conf.set("spark.akka.frameSize","1")
conf.set("spark.default.parallelism","1")
sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
val larger = sc.parallelize(Seq(buffer))
val thrown = intercept[SparkException] {
larger.collect()
}
assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
val smaller = sc.parallelize(1 to 4).collect()
assert(smaller.size === 4)
}

}

0 comments on commit be53829

Please sign in to comment.