[SPARK-30873][CORE][YARN] Handling Node Decommissioning for Yarn cluster manager in Spark #27636
@@ -1873,4 +1873,67 @@ package object config {
.version("3.1.0") | ||
.booleanConf | ||
.createWithDefault(false) | ||
|
||
private[spark] val GRACEFUL_DECOMMISSION_ENABLE = | ||
ConfigBuilder("spark.graceful.decommission.enable") | ||
.doc("Whether to enable the node graceful decommissioning handling") | ||
.version("3.1.0") | ||
.booleanConf | ||
.createWithDefault(false) | ||
|
||
  private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD =

From the documentation, I don't know how I would configure this value. Can we provide guidance? Also, why would we or would we not want to skip a failed fetch? In my mind, if a node is decommissioning and the fetch fails, that's expected, but retrying the same fetch is very unlikely to succeed. Or am I misunderstanding the purpose of this?

I have tried to explain this here.

I assume this is a total number of fetch failures, or is it per stage? I'm sure I'll see it more in the code later, but if I don't understand it from reading the config text then the user won't either. If we keep this we may want to combine the names here; there was a thread on dev about making too many .component.-style names.

    ConfigBuilder("spark.graceful.decommission.fetchfailed.ignore.threshold")
      .doc("Threshold on the number of times a fetch failure is ignored because it was " +
        "caused by node decommissioning. This is configurable per the needs of the user " +
        "and the type of cloud. If this is kept at a large value and nodes are being " +
        "decommissioned continuously, the stage will never abort and will keep " +
        "retrying in an unbounded manner.")
      .version("3.1.0")
      .intConf
      .createWithDefault(8)
  private[spark] val GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT =
    ConfigBuilder("spark.graceful.decommission.executor.leasetimePct")
      .doc("Percentage of time to expiry after which executors are killed " +

I don't know what this means. Percentage of what time to expiry? We get a notification that a node is being decommissioned, and I assume this is how long to let the executor run before killing it, but it doesn't say what this is a percentage of.

        "(if enabled) on the node. Value ranges between (0-100).")
      .version("3.1.0")
      .intConf
      .checkValue(v => v >= 0 && v < 100, "The percentage must be in the range [0, 100).")
      .createWithDefault(50) // Pulled out of thin air.
  private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT =

Why would we exit at a different time? I know we probably want to keep the blocks as long as possible, but if any process is running the node can't gracefully shut down, so it seems strange to have these occur separately.

This is for minimising the recompute of the shuffle data and maximising the use of already generated shuffle data. Also tried to explain this in this comment #27636 (comment)

    ConfigBuilder("spark.graceful.decommission.shuffedata.leasetimePct")
      .doc("Percentage of time to expiry after which shuffle data is " +

Similar here, we should describe what this is a percentage of.

        "cleaned up (if enabled) on the node. Value ranges between (0-100). " +
        "This value is always greater than or equal to the executor " +
        "lease time (it is set to be equal if incorrectly configured). " +
        "Near 0% would mean generated data is marked as lost too early. " +
        "Too close to 100% and the shuffle data may not get cleared proactively, " +
        "leading to tasks going into fetch-failed scenarios.")
      .version("3.1.0")
      .intConf
      .checkValue(v => v >= 0 && v < 100, "The percentage must be in the range [0, 100).")
      .createWithDefault(90) // Pulled out of thin air.
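For intuition, an illustration only (not code from the PR), assuming both percentages are applied to the node decommission timeout configured further below: with a 110s timeout, the defaults of 50 and 90 mean executors are killed roughly 55s after the decommission notice, while the node's shuffle data is kept until roughly 99s, so already generated shuffle output stays fetchable longer than the executors keep running.

  // Assumed relationship, for illustration only:
  //   killExecutorsAfter = nodeTimeout * executorLeasePct / 100 = 110 * 50 / 100 = 55s
  //   cleanShuffleAfter  = nodeTimeout * shuffleLeasePct  / 100 = 110 * 90 / 100 = 99s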
  private[spark] val GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC =

Is this config used in the PR? What's its intended relationship with …?

I have added the information about this conf in the design doc.

I haven't looked at the design doc, but same question - I only see this used in tests? If that is the case I think we at least need a comment about it. I think I have the same concern here as above: I don't really know what this config means either. What is the min time to termination?

This is used in DecommissionTracker.scala.

Ah ok, the file in the diffs must have been minimized at the time.

    ConfigBuilder("spark.graceful.decommission.min.termination.time")
      .doc("Minimum time to termination below which node decommissioning is performed " +
        "immediately. If the decommissioning time is less than the configured time " +
        "(spark.graceful.decommission.min.termination.time), then the executor " +
        "decommissioning and shuffle data clean-up take place immediately: first the " +
        "executor decommission, then the shuffle data clean-up.")
      .version("3.1.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("60s")
  private[spark] val GRACEFUL_DECOMMISSION_NODE_TIMEOUT =
    ConfigBuilder("spark.graceful.decommission.node.timeout")

I'm not sure I get the spot loss interaction here; the AM/RM won't know, I think, and we'd see more of something like the K8s case happen.

We can get the decommission timeout for hadoop-3.1 and later versions of hadoop, so we can use that value to decide when the node is decommissioned. This config is added here to make decommissioning of nodes work with multiple versions of hadoop. Please find the logic used in YarnAllocator.scala to decide the timeout of the node.

So what happens with this in hadoop 3.1 and greater? Is it ignored if a timeout is specified from yarn?

For hadoop 3.1 and later versions of hadoop there is an interface to get the value of the decommissioning timeout, method name - getDecommissioningTimeout. So we have used that here in the code. Since we are getting the value of decommissioningTimeout from the RM, in that scenario we will be using that value; otherwise we use the value specified in the config. Also, if someone has backported the hadoop 3.1 change to a lower version of hadoop (2.8 etc.), they can use decommissioningTimeout instead of the config GRACEFUL_DECOMMISSION_NODE_TIMEOUT.

Ok, so we need to update the description to say when it applies.

Updated the description.

      .doc("Interval in seconds after which the node is decommissioned. In case of AWS " +
        "spot loss the time is approximately 110s, and in case of GCP preemptible VMs it is " +
        "around 30s; this config can be changed according to the node type in the public " +
        "cloud. This is applied only if the decommission timeout is not sent by the " +
        "Resource Manager.")
      .version("3.1.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefaultString("110s")
}
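For reference, one way the proposed settings could be supplied from application code; the keys are exactly as spelled in the diff above, and all values except the enable flag are the documented defaults:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.graceful.decommission.enable", "true")
    .set("spark.graceful.decommission.fetchfailed.ignore.threshold", "8")
    .set("spark.graceful.decommission.executor.leasetimePct", "50")
    .set("spark.graceful.decommission.shuffedata.leasetimePct", "90")  // key as spelled in the diff
    .set("spark.graceful.decommission.min.termination.time", "60s")
    .set("spark.graceful.decommission.node.timeout", "110s")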
@@ -222,6 +222,13 @@ private[spark] class DAGScheduler(
  private val maxFailureNumTasksCheck = sc.getConf
    .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)

  /**
   * Threshold on the number of times a fetch failure is ignored because it was
   * caused by decommissioning of nodes.
   */
  private val maxIgnoredFailedStageAttempts = sc.getConf
    .get(config.GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD)

  private val messageScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
@@ -289,6 +296,13 @@ private[spark] class DAGScheduler(
    eventProcessLoop.post(WorkerRemoved(workerId, host, message))
  }

  /**
   * Called by DecommissionTracker when a node is decommissioned.
   */
  def nodeDecommissioned(host: String): Unit = {
    eventProcessLoop.post(NodeDecommissioned(host))
  }
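A hedged sketch of the intended caller (hypothetical names; the actual wiring is in DecommissionTracker.scala, which is not part of this excerpt):

  // Hypothetical: once the shuffle-data lease for a decommissioning host expires,
  // the tracker asks the DAGScheduler to treat that host's map outputs as lost.
  def onShuffleDataLeaseExpired(dagScheduler: DAGScheduler, host: String): Unit =
    dagScheduler.nodeDecommissioned(host)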
  /**
   * Called by TaskScheduler implementation when a host is added.
   */
@@ -1617,10 +1631,27 @@ private[spark] class DAGScheduler(
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + | ||
s"(attempt ${failedStage.latestInfo.attemptNumber}) running") | ||
} else { | ||
// Gracefully handling the stage abort due to fetch failure in the | ||
// decommission nodes | ||
if (!event.reason.asInstanceOf[FetchFailed].countTowardsDecommissionStageFailures) { | ||
// Ignore stage attempts due to fetch failed only | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think comment should specifically say for decommission tracking. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
// once per attempt due to nodes decommissioning event | ||
if (!failedStage.failedAttemptIds.contains(task.stageAttemptId)) { | ||
failedStage.ignoredDecommissionFailedStage += 1 | ||
DecommissionTracker.incrFetchFailIgnoreCnt() | ||
|
||
logInfo(s"""Ignoring stage failure due to fetch failed from the decommissioned""" + | ||
s""" node : {"stage":"$failedStage","attempt":"${task.stageAttemptId}",""" + | ||
s""""totalIgnoredAttempts":"${failedStage.ignoredDecommissionFailedStage}",""" + | ||
s""""node":"$bmAddress"}""") | ||
} | ||
} | ||
failedStage.failedAttemptIds.add(task.stageAttemptId) | ||
val shouldAbortStage = | ||
failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || | ||
disallowStageRetryForTest | ||
val shouldAbortStage = failedStage.failedAttemptIds.size >= | ||
(maxConsecutiveStageAttempts + failedStage.ignoredDecommissionFailedStage) || | ||
disallowStageRetryForTest || | ||
failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts | ||
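As a worked example of the new predicate, assuming the default maxConsecutiveStageAttempts of 4 and the default ignore threshold of 8: if two of a stage's failed attempts were ignored because their fetch failures came from decommissioned nodes, the stage is now allowed 4 + 2 = 6 failed attempts before aborting; but once more than 8 attempts have been ignored this way, the stage aborts regardless, so retries stay bounded even under continuous decommissioning.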
          // It is likely that we receive multiple FetchFailed for a single stage (because we have
          // multiple tasks running concurrently on different executors). In that case, it is
@@ -1661,6 +1692,10 @@ private[spark] class DAGScheduler(
        }

        if (shouldAbortStage) {
          if (failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts
            && DecommissionTracker.isDecommissionEnabled(sc.getConf)) {
            DecommissionTracker.setFetchFailIgnoreCntThresholdFlag(true)
          }
          val abortMessage = if (disallowStageRetryForTest) {
            "Fetch failure will not retry stage due to testing config"
          } else {
@@ -1823,9 +1858,10 @@ private[spark] class DAGScheduler(
          failedStage.failedAttemptIds.add(task.stageAttemptId)
          // TODO Refactor the failure handling logic to combine similar code with that of
          // FetchFailed.
          val shouldAbortStage =
            failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
              disallowStageRetryForTest
          val shouldAbortStage = failedStage.failedAttemptIds.size >=
            (maxConsecutiveStageAttempts + failedStage.ignoredDecommissionFailedStage) ||
            disallowStageRetryForTest ||
            failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts

          if (shouldAbortStage) {
            val abortMessage = if (disallowStageRetryForTest) {
@@ -1980,6 +2016,18 @@ private[spark] class DAGScheduler(
    clearCacheLocs()
  }

  /**
   * Remove shuffle data mapping when a node is decommissioned.
   *
   * @param host host of the node that is decommissioned
   */
  private[scheduler] def handleNodeDecommissioned(host: String) {

So if we remove the shuffle files on decommissioning then we're essentially forcing a recompute even if the files are still present and the node hasn't gone away, which doesn't seem ideal.

We have tried to answer this query in this comment #27636 (comment)

    logInfo(s"Marking shuffle files lost on the decommissioning host $host")
    mapOutputTracker.removeOutputsOnHost(host)
    clearCacheLocs()
  }

  private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = {
    // remove from failedEpoch(execId) ?
    if (failedEpoch.contains(execId)) {
@@ -2281,6 +2329,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
    case WorkerRemoved(workerId, host, message) =>
      dagScheduler.handleWorkerRemoved(workerId, host, message)

    case NodeDecommissioned(host) =>
      dagScheduler.handleNodeDecommissioned(host)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)
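The NodeDecommissioned event handled above would need to be declared alongside the other DAGScheduler events; a sketch of what that declaration might look like (the actual DAGSchedulerEvent.scala change is not shown in this excerpt):

  // Sketch only; mirrors the shape of existing events such as WorkerRemoved.
  private[scheduler] case class NodeDecommissioned(host: String) extends DAGSchedulerEvent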
Would it make sense to handle this with spark.worker.decommission.enabled or not? I'm not sure; just for discussion.

Is this specific to YARN, or has it been tied in with the previous work for all cluster managers?

Also, all the configs need to be documented in configuration.md.