-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-1712]: TaskDescription instance is too big causes Spark to hang #694
Changes from 7 commits
2a89adc
86e2048
743a7ad
3ea1ca1
0e29eac
7965580
1d35c7d
9e4ffa7
4afe71d
158b2dc
0a428cf
062c182
1d35c3c
689495d
f76679b
b1174bd
b0930b0
03cc562
9a5cfad
926bd6a
3b6d48c
44a59ee
63636b6
52e6752
83ce29b
0f52483
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,10 +27,10 @@ import akka.actor._ | |
import akka.pattern.ask | ||
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} | ||
|
||
import org.apache.spark.{Logging, SparkException, TaskState} | ||
import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState} | ||
import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer} | ||
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ | ||
import org.apache.spark.util.{AkkaUtils, Utils} | ||
import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils} | ||
|
||
/** | ||
* A scheduler backend that waits for coarse grained executors to connect to it through Akka. | ||
|
@@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A | |
var totalCoreCount = new AtomicInteger(0) | ||
val conf = scheduler.sc.conf | ||
private val timeout = AkkaUtils.askTimeout(conf) | ||
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) | ||
|
||
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { | ||
private val executorActor = new HashMap[String, ActorRef] | ||
|
@@ -140,8 +141,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A | |
// Launch tasks returned by a set of resource offers | ||
def launchTasks(tasks: Seq[Seq[TaskDescription]]) { | ||
for (task <- tasks.flatten) { | ||
val ser = SparkEnv.get.closureSerializer.newInstance() | ||
val serializedTask = ser.serialize(task) | ||
if (serializedTask.limit >= akkaFrameSize - 1024) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We try serializing a task here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L767 to make sure everything is serializable. Can we do this check there as well -- or does that not work because of task size variability within a stage? |
||
var msg = "Serialized task %s:%d were %d bytes which " + | ||
"exceeds spark.akka.frameSize (%d bytes)." | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You might add to the exception "Consider using broadcast variables for large values". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also it should say "was" instead of "were" |
||
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) | ||
scheduler.error(msg) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This kills the entire Spark application, which seems too severe -- I think we should just abort the task set with the too-large tasks |
||
// TODO: Need to throw an exception? | ||
throw new SparkException(msg) | ||
} | ||
freeCores(task.executorId) -= scheduler.CPUS_PER_TASK | ||
executorActor(task.executorId) ! LaunchTask(task) | ||
executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask)) | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing space after `=`