Skip to content

Commit

Permalink
SPARK-1712: A TaskDescription instance that is too big causes Spark to hang
Browse files Browse the repository at this point in the history
  • Loading branch information
witgo committed May 7, 2014
1 parent 3eb53bd commit 2a89adc
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.util.{AkkaUtils, Utils}

private[spark] class CoarseGrainedExecutorBackend(
Expand Down Expand Up @@ -61,12 +62,14 @@ private[spark] class CoarseGrainedExecutorBackend(
logError("Slave registration failed: " + message)
System.exit(1)

case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId)
case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
val ser = SparkEnv.get.closureSerializer.newInstance()
val taskDesc =ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
private[spark] object CoarseGrainedClusterMessages {

// Driver to executors
case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
extends CoarseGrainedClusterMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

import org.apache.spark.{Logging, SparkException, TaskState}
import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}

/**
* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
Expand All @@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
var totalCoreCount = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
Expand Down Expand Up @@ -141,7 +142,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
/**
 * Serializes each task description and sends it to the executor the task was assigned to.
 *
 * A task whose serialized size reaches `akkaFrameSize - 1024` bytes is rejected rather than
 * sent (the 1024 bytes are presumably headroom for the message envelope -- TODO confirm).
 * Cores are only reserved for tasks that are actually dispatched.
 *
 * @param tasks task descriptions to launch; the nested sequences are flattened here
 * @throws SparkException if a serialized task description is too large for one Akka frame
 */
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // Serialize the description exactly once: the executor backend deserializes the payload
    // directly into a TaskDescription, so a second serialization pass over the raw bytes
    // would hand it a byte array instead.
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - 1024) {
      val msg = ("Serialized task %s:%d were %d bytes which " +
        "exceeds spark.akka.frameSize (%d bytes).")
        .format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
      val exception = new SparkException(msg)
      logError(msg, exception)
      throw exception
    }
    // Reserve the cores only once the task is known to fit, so a rejected task does not
    // leave the executor's free-core count permanently reduced.
    freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
    executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
  }
}

Expand Down

0 comments on commit 2a89adc

Please sign in to comment.