
[SPARK-1712]: TaskDescription instance is too big causes Spark to hang #694

Closed · wants to merge 26 commits
Changes from 7 commits

Commits (26)
2a89adc
SPARK-1712: TaskDescription instance is too big causes Spark to hang
witgo May 7, 2014
86e2048
Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
witgo May 8, 2014
743a7ad
Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
witgo May 12, 2014
3ea1ca1
remove duplicate serialize
witgo May 12, 2014
0e29eac
Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
witgo May 14, 2014
7965580
fix Statement order
witgo May 14, 2014
1d35c7d
fix hang
witgo May 14, 2014
9e4ffa7
review commit
witgo May 14, 2014
4afe71d
review commit
witgo May 14, 2014
158b2dc
review commit
witgo May 14, 2014
0a428cf
add unit tests
witgo May 14, 2014
062c182
fix small bug for code style
witgo May 14, 2014
1d35c3c
Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
witgo May 14, 2014
689495d
fix scala style bug
witgo May 14, 2014
f76679b
merge master
witgo May 14, 2014
b1174bd
merge master
witgo May 14, 2014
b0930b0
review commit
witgo May 14, 2014
03cc562
Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
witgo May 14, 2014
9a5cfad
Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
witgo May 14, 2014
926bd6a
Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
witgo May 14, 2014
3b6d48c
Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
witgo May 15, 2014
44a59ee
review commit
witgo May 15, 2014
63636b6
review commit
witgo May 15, 2014
52e6752
reset test SparkContext
witgo May 15, 2014
83ce29b
Merge branch 'master' of https://github.com/apache/spark into SPARK-1…
witgo May 15, 2014
0f52483
review commit
witgo May 17, 2014
CoarseGrainedExecutorBackend.scala

@@ -22,11 +22,12 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(

@@ -61,12 +62,14 @@ private[spark] class CoarseGrainedExecutorBackend(
       logError("Slave registration failed: " + message)
       System.exit(1)
 
-    case LaunchTask(taskDesc) =>
-      logInfo("Got assigned task " + taskDesc.taskId)
+    case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
         System.exit(1)
       } else {
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val taskDesc =ser.deserialize[TaskDescription](data.value)

Contributor: missing space after =

+        logInfo("Got assigned task " + taskDesc.taskId)
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
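To make the new flow concrete: the driver serializes the TaskDescription, wraps the resulting buffer, and the executor unwraps and deserializes it. A self-contained round-trip sketch using Spark's JavaSerializer; the Seq payload stands in for a real TaskDescription, which is private[spark]:

    import java.nio.ByteBuffer
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.JavaSerializer
    import org.apache.spark.util.SerializableBuffer

    // Round-trip sketch mirroring driver -> executor: serialize a payload,
    // wrap the ByteBuffer so it can travel in an Akka message, then unwrap
    // and deserialize it on the other side.
    val ser = new JavaSerializer(new SparkConf()).newInstance()
    val payload = Seq("some", "task", "state")        // stand-in for a TaskDescription
    val serialized: ByteBuffer = ser.serialize(payload)
    val wrapped = new SerializableBuffer(serialized)  // what LaunchTask now carries
    val restored = ser.deserialize[Seq[String]](wrapped.value)
    assert(restored == payload)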
CoarseGrainedClusterMessage.scala

@@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 private[spark] object CoarseGrainedClusterMessages {
 
   // Driver to executors
-  case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
+  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
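Aside: LaunchTask now carries a SerializableBuffer because the task is pre-serialized into a java.nio.ByteBuffer, and ByteBuffer is not java.io.Serializable, so it cannot ride in an Akka message directly. Spark's org.apache.spark.util.SerializableBuffer wraps the buffer with custom serialization hooks. A minimal sketch of the wrapper idea; the name WrappedBuffer and this simplified body are illustrative, not Spark's actual implementation:

    import java.io.{ObjectInputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    // Make a ByteBuffer survive Java serialization by writing its
    // length and contents by hand.
    class WrappedBuffer(@transient var buffer: ByteBuffer) extends Serializable {
      def value: ByteBuffer = buffer

      private def writeObject(out: ObjectOutputStream): Unit = {
        val bytes = new Array[Byte](buffer.remaining())
        buffer.duplicate().get(bytes)  // duplicate() leaves the original position untouched
        out.writeInt(bytes.length)
        out.write(bytes)
      }

      private def readObject(in: ObjectInputStream): Unit = {
        val bytes = new Array[Byte](in.readInt())
        in.readFully(bytes)
        buffer = ByteBuffer.wrap(bytes)
      }
    }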
CoarseGrainedSchedulerBackend.scala

@@ -27,10 +27,10 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{Logging, SparkException, TaskState}
+import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.

@@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   var totalCoreCount = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]

@@ -140,8 +141,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     // Launch tasks returned by a set of resource offers
     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val serializedTask = ser.serialize(task)
+        if (serializedTask.limit >= akkaFrameSize - 1024) {
Contributor: We try serializing a task here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L767 to make sure everything is serializable. Can we do this check there as well -- or does that not work because of task size variability within a stage?
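If the same guard were added at the DAGScheduler serialization point linked above, it could presumably only sample one task per stage, which is the variability concern the comment raises. A hypothetical free-standing helper to illustrate the shape of such an early check; checkSampleTaskSize is not an existing Spark method:

    import org.apache.spark.SparkException
    import org.apache.spark.serializer.SerializerInstance

    // Hypothetical fail-fast check at stage submission: serialize one
    // representative task and reject the stage if it already exceeds the
    // frame size. Task sizes vary within a stage, so the per-task check
    // in launchTasks would still be the backstop.
    def checkSampleTaskSize(sampleTask: AnyRef, ser: SerializerInstance, akkaFrameSize: Int): Unit = {
      val size = ser.serialize(sampleTask).limit
      if (size >= akkaFrameSize - 1024) {
        throw new SparkException(
          s"Serialized task is $size bytes, which exceeds spark.akka.frameSize ($akkaFrameSize bytes)")
      }
    }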

+          var msg = "Serialized task %s:%d were %d bytes which " +
+            "exceeds spark.akka.frameSize (%d bytes)."
Contributor: You might add to the exception "Consider using broadcast variables for large values".
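For reference, the workaround the comment alludes to: large read-only data captured in a closure is re-serialized into every task, while a broadcast variable ships it to each executor once. A hedged sketch; sc, rdd, and loadLookupTable are illustrative placeholders:

    // Anti-pattern: lookupTable is captured in the closure and serialized
    // into every task, which can blow past spark.akka.frameSize.
    //   rdd.map(x => lookupTable.getOrElse(x, 0))

    // Better: broadcast it once per executor; tasks stay small.
    val lookupTable: Map[String, Int] = loadLookupTable()  // hypothetical loader
    val broadcastTable = sc.broadcast(lookupTable)
    val counts = rdd.map(x => broadcastTable.value.getOrElse(x, 0))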

Contributor: Also it should say "was" instead of "were".

+          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+          scheduler.error(msg)
Contributor: This kills the entire Spark application, which seems too severe -- I think we should just abort the task set with the too-large tasks.
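A hedged sketch of the milder alternative: look up the TaskSetManager that owns the oversized task and abort just that task set, leaving the SparkContext alive. This assumes the TaskSchedulerImpl bookkeeping maps of this era (taskIdToTaskSetId and activeTaskSets); treat the exact names as an assumption:

    // Instead of scheduler.error(msg), which shuts down the whole
    // application, fail only the task set containing the oversized task.
    val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)  // assumed bookkeeping map
    scheduler.activeTaskSets.get(taskSetId).foreach { taskSetMgr =>
      taskSetMgr.abort(msg)  // fails this job; other jobs keep running
    }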

+          // TODO: Need to throw an exception?
+          throw new SparkException(msg)
+        }
         freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-        executorActor(task.executorId) ! LaunchTask(task)
+        executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
       }
     }
 
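Finally, a user-facing note on the limit being enforced: AkkaUtils.maxFrameSizeBytes derives the byte limit from the spark.akka.frameSize setting, which is given in MB (the default was 10 MB in this era). Applications whose tasks are legitimately large can raise it. A sketch, with the app name and the value 64 chosen for illustration:

    import org.apache.spark.{SparkConf, SparkContext}

    // Raise the Akka frame size so moderately large serialized tasks are
    // accepted instead of triggering the new SparkException.
    val conf = new SparkConf()
      .setAppName("large-task-example")   // illustrative
      .set("spark.akka.frameSize", "64")  // in MB; default was 10
    val sc = new SparkContext(conf)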