[SPARK-30873][CORE][YARN] Handling Node Decommissioning for Yarn cluster manager in Spark #27636
Conversation
@SaurabhChawla100 Will start looking into this PR... thanks.
.createWithDefaultString("60s")

private[spark] val ABOUT_TO_BE_LOST_NODE_INTERVAL =
  ConfigBuilder("spark.abouttobelost.node.interval")
As I mentioned in the design doc, you could consider naming this spark.graceful.decommission.timeout, or at least something under spark.graceful, to keep these properties grouped together.
fixed
.checkValue(v => v >= 0 && v < 100, "The percentage should be positive.")
.createWithDefault(90) // Pulled out of thin air.

private[spark] val GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC =
Is this config used in the PR? What's its intended relationship with ABOUT_TO_BE_LOST_NODE_INTERVAL?
I have added the information about this conf in the design doc.
haven't looked at the design doc, but same question - I only see this used in a test? If that is the case I think we at least need a comment about it. I think I have the same concern here as above: I don't really know what this config means either. What is the min time to termination?
This is used in DecommissionTracker.scala:
private val minDecommissionTime =
conf.get(config.GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC)
ah ok, the file in the diffs must have been minimized at the time
private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD =
  ConfigBuilder("spark.graceful.decommission.fetchfailed.ignore.threshold")
    .doc("Threshold of number of times fetchfailed ignored due to node decommission")
The comment here should clarify that it's the number of stage failures ignored, not number of task failures (I was a bit confused by that at first)
added the description
I'll take a look at this sometime this week (hopefully by Saturday) :)
Thanks for working on this PR. I'm not super familiar with the YARN code path, so we should get some folks with more YARN background to look at this. That being said, I'm a little confused by some elements of the design:
- Why do we exit the executors before the shuffle service? I know we want to keep the blocks, but leaving the shuffle service up probably blocks the maintenance or decom task, so it seems not ideal.
- Is there a way we could just track the executors for decom, like we already have basic infra for?
- The task fetch retry logic: I don't think I understand what the expected case is here and how it is going to help.
* was unrelated to the task; for example, if the task failed because fetch failed exception
* from the decommissioned node.
*/
var countTowardsStageFailures: Boolean = true
Why is this a var?
We update the value of countTowardsStageFailures when there is node decommissioning:
// Account for fetch failure exceptions raised by decommissioned
// nodes when checking stage failure. The logic here is:
// if the task failed due to a fetchFailed from a decommissioned node, then
// don't count it towards the stageFailure. The countTowardsStageFailures
// variable of TaskEndReason can be used in the DAG scheduler to account for
// such fetch failures while checking the stage abort.
decommissionTracker match {
  case Some(decommissionTracker) =>
    if (decommissionTracker.isNodeDecommissioned(fetchFailed.bmAddress.host)) {
      logInfo(s"Do not count fetch failure from decommissioned" +
        s" node ${fetchFailed.bmAddress.host}")
      fetchFailed.countTowardsStageFailures = false
    }
  case _ =>
    // No action
}
@@ -1542,4 +1542,50 @@ package object config {
  .bytesConf(ByteUnit.BYTE)
  .createOptional

private[spark] val GRACEFUL_DECOMMISSION_ENABLE =
Would it make sense to handle this with spark.worker.decommission.enabled or not? I'm not sure, just for discussion.
is this specific to yarn or is it tied in with previous work for all cluster managers?
also all the configs need to be documented in configuration.md
.booleanConf
.createWithDefault(false)

private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD =
From the documentation, I don't know how I would configure this value. Can we provide guidance? Also, why would we or wouldn't we want to skip fetch failed? In my mind, if a node is decommissioning and the fetch fails that's expected, but retrying the same fetch is very unlikely to succeed. Or am I misunderstanding the purpose of this?
I have tried to explain this here
#27636 (comment)
I assume this is a total number of fetch failures, or is it per stage? I'm sure I'll see it more in the code later, but if I don't understand from reading the config text then the user won't either.
Without having read the rest of the code, this seems like kind of a weird setting to me. If we are handling decommissioned nodes then why do we need this? Is it in case the entire cluster is being decommissioned?
If we keep this we may want to combine the names here; there was a thread on dev about creating too many .component. type names.
.createWithDefaultString("60s")

private[spark] val GRACEFUL_DECOMMISSION_NODE_TIMEOUT =
  ConfigBuilder("spark.graceful.decommission.node.timeout")
I'm not sure I get the spot loss interaction here; the AM/RM won't know, I think, and we'd see something more like the K8s case happen.
We can get the decommission timeout for hadoop-3.1 and later versions of Hadoop, so we can use that value to decide when the node is decommissioned.
For lower versions of Hadoop (hadoop-2.8) there is no decommissionTimeout for decommissioning nodes; in those scenarios we already know from experience that AWS spot-loss nodes will stay for 2 min and GCP preemptible VMs will stay for 30 sec after Hadoop reports the node as decommissioning.
This config is added here to make decommissioning of nodes work with multiple versions of Hadoop.
Please find the logic used in YarnAllocator.scala to decide the timeout of the node:
if (x.getNodeState.toString.equals(NodeState.DECOMMISSIONING.toString)) {
  // Hadoop 2.7 has no support for getDecommissioningTimeout, whereas
  // Hadoop 3.1 and later versions do. So the method call is made using reflection
  // to update the value of nodeTerminationTime; for lower versions of Hadoop (2.7)
  // we use the config spark.graceful.decommission.node.timeout, which is specific to the cloud.
  var nodeTerminationTime = clock.getTimeMillis() + nodeLossInterval * 1000
  try {
    val decommiossioningTimeout = x.getClass.getMethod(
      "getDecommissioningTimeout").invoke(x).asInstanceOf[Integer]
    if (decommiossioningTimeout != null) {
      nodeTerminationTime = clock.getTimeMillis() + decommiossioningTimeout * 1000
    }
  } catch {
    case e: NoSuchMethodException => logDebug(e.toString)
  }
so what happens with this in hadoop 3.1 and greater? is it ignored if a timeout is specified from yarn?
For hadoop-3.1 and later versions of Hadoop there is an interface to get the value of the decommissioning timeout (method name getDecommissioningTimeout), so we have used that here in the code:
val decommiossioningTimeout = x.getClass.getMethod(
  "getDecommissioningTimeout").invoke(x).asInstanceOf[Integer]
if (decommiossioningTimeout != null) {
  nodeTerminationTime = clock.getTimeMillis() + decommiossioningTimeout * 1000
}
Since we are getting the value of decommiossioningTimeout from the RM in that scenario, we will use that value; otherwise we use the value specified in the config.
Also, if someone has backported the hadoop-3.1 change to a lower version of Hadoop (hadoop-2.8 etc.), they can also use decommiossioningTimeout instead of the config GRACEFUL_DECOMMISSION_NODE_TIMEOUT.
ok so we need to update description to say when it applies
Updated the description
@@ -1525,10 +1539,25 @@ private[spark] class DAGScheduler(
  s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
  s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
} else {
  if (!event.reason.asInstanceOf[FetchFailed].countTowardsStageFailures) {
Can we have a comment describing what the intended behaviour is? Back to the questions around retrying: I'm not sure this is the right thing to do, but maybe I just don't understand yet.
I have tried to explain this here
#27636 (comment)
the countTowardsStageFailures=false can be for other things than decommissioned nodes: killed tasks, blacklisted, etc.
countTowardsStageFailures is introduced as part of this PR, so this is for the decommissioned nodes only. Below is the code where we are setting countTowardsStageFailures = false:
// Account for fetch failure exceptions raised by decommissioned
// nodes when checking stage failure. The logic here is:
// if the task failed due to a fetchFailed from a decommissioned node, then
// don't count it towards the stageFailure. The countTowardsStageFailures
// variable of TaskEndReason can be used in the DAG scheduler to account for
// such fetch failures while checking the stage abort.
decommissionTracker match {
  case Some(decommissionTracker) =>
    if (decommissionTracker.isNodeDecommissioned(fetchFailed.bmAddress.host)) {
      logInfo(s"Do not count fetch failure from decommissioned" +
        s" node ${fetchFailed.bmAddress.host}")
      fetchFailed.countTowardsStageFailures = false
    }
  case _ =>
    // No action
}
oh sorry, I misread it as countTowardsTaskFailures which is pre-existing. Right now the logic for this is very specific to decommission, so I think it should be renamed to indicate so. If there are other cases where this variable could be useful, then the question is whether you can differentiate those from the decommission case so you can still do this. If not, just rename it to be specific to decommission.
updated the name of the variable here
shuffleDataDecommissionTimeMs = curTimeMs + 1000
} else {
  reason match {
    case SpotRotationLoss | NodeLoss =>
What is spotrotationloss?
Not required, removed it.
.checkValue(v => v >= 0 && v < 100, "The percentage should be positive.")
.createWithDefault(50) // Pulled out of thin air.

private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT =
Why would we exit at a different time? I know we probably want to keep the blocks as long as possible, but if any process is running the node can't gracefully shut down, so it seems strange to have these occur separately.
This is for minimising the recompute of the shuffle data and maximising the use of already generated shuffle data. I also tried to explain this in this comment: #27636 (comment)
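To illustrate the intent, here is a minimal sketch (with made-up names, not the PR's actual implementation) of how the two percentages split the window between the decommission notification and the node's termination time:

object DecommissionTimeline {
  // executorLeasePct ~ spark.graceful.decommission.executor.leasetimePct (default 50)
  // shuffleLeasePct  ~ spark.graceful.decommission.shuffedata.leasetimePct (default 90)
  def planTimes(
      decommissionStartMs: Long,
      nodeTerminationMs: Long,
      executorLeasePct: Int,
      shuffleLeasePct: Int): (Long, Long) = {
    require(executorLeasePct <= shuffleLeasePct,
      "shuffle data should outlive the executors so reducers can still fetch it")
    val window = nodeTerminationMs - decommissionStartMs
    // Kill executors early so no new tasks (and no new shuffle output) land on the node,
    // but keep serving the already written shuffle data for most of the remaining window.
    val executorDecommissionMs = decommissionStartMs + window * executorLeasePct / 100
    val shuffleDecommissionMs = decommissionStartMs + window * shuffleLeasePct / 100
    (executorDecommissionMs, shuffleDecommissionMs)
  }
}

With a 2-minute spot-loss window and the defaults above (50 and 90), executors would stop after roughly 60s while the shuffle data stays registered for roughly 108s, maximising reuse of map output that has already been produced.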
// In hadoop-2.7 there is no support for node state DECOMMISSIONING.
// In hadoop-2.8, hadoop-3.1 and later versions of Hadoop there is support
// for the node state DECOMMISSIONING.
// In order to build Spark using hadoop2 and hadoop3, we compare the value of
// DECOMMISSIONING here, and for the other states we are matching
// the state and assigning the node state at the Spark end.
I think I get the first part of this comment, but the second part I'm not understanding, could you try and clarify?
Explained it in a better way.
@@ -798,6 +799,19 @@ private[spark] class TaskSetManager(
  if (fetchFailed.bmAddress != null) {
    blacklistTracker.foreach(_.updateBlacklistForFetchFailure(
      fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))

    // Do account fetch failure exception raised by decommissioned
can you clarify?
added the description in the code
* getYarnNodeState is added to create the node state for Decommission Tracker
*/
private[spark] object NodeState extends Enumeration {
  val RUNNING, DECOMMISSIONED, GRACEFUL_DECOMMISSIONING, DECOMMISSIONING, LOST, OTHER = Value
Is there a reason we aren't adding this to an existing place where we track state?
This is for the node states used in the decommission tracker, whereas in other places it was for executor State
I wonder the same thing. We have an executor monitor; this is for node state, but I wonder if they shouldn't be linked better. Also, why is this file called ClusterInfo? Couldn't there also be other states like terminating? Perhaps we need a node monitor, but I'm not sure if that overlaps too much with the resource-manager pieces.
But I'll have to think about it more
document differences between GRACEFUL_DECOMMISSIONING and DECOMMISSIONING
- The yarn states above are the ones needed right now for graceful decommission of nodes; if there are some other states they can be considered as OTHER.
- Document differences between GRACEFUL_DECOMMISSIONING and DECOMMISSIONING - DECOMMISSIONING is the yarn node state and GRACEFUL_DECOMMISSIONING is what we used for the decommission tracker. Will keep only one state here to avoid confusion, i.e. DECOMMISSIONING.
Thanks for reviewing this PR. Can you kindly add other experts whom you consider valuable in reviewing this PR? Below I have tried to answer your queries.
If the user is not using the External Shuffle Service, then we have to keep spark.graceful.decommission.executor.leasetimePct and spark.graceful.decommission.shuffedata.leasetimePct at the same value to prevent fetch failure exceptions.
There are chances that some of the reasons for a stage failure are fetch failed exceptions due to the decommissioned nodes, so we need to handle the stage abort gracefully here by discounting, to some extent, the shuffle fetch failures due to decommissioned nodes.
This ignoredFailedStageAttempts will be considered if there is a fetch failure due to node decommissioning. This way we are making the Spark application more reliable towards such failures, which cannot be controlled.
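A simplified sketch of the accounting being described (a hypothetical class for illustration; the real logic lives in DAGScheduler/DecommissionTracker and may differ in detail):

class StageFailureAccounting(maxConsecutiveAttempts: Int, fetchFailedIgnoreThreshold: Int) {
  private var countedAttempts = 0  // attempts that count towards the abort decision
  private var ignoredAttempts = 0  // attempts discounted because the fetch failure
                                   // came from a decommissioned node

  // Returns true if the stage should be aborted.
  def onStageAttemptFailed(fetchFailedFromDecommissionedNode: Boolean): Boolean = {
    if (fetchFailedFromDecommissionedNode && ignoredAttempts < fetchFailedIgnoreThreshold) {
      ignoredAttempts += 1  // expected failure, don't blame the application (up to the threshold)
    } else {
      countedAttempts += 1
    }
    countedAttempts >= maxConsecutiveAttempts
  }
}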
Maybe @vanzin @tgravescs have some thoughts on the YARN side?
private[spark] val GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT =
  ConfigBuilder("spark.graceful.decommission.executor.leasetimePct")
    .doc("Percentage of time to expiry after which executors are killed " +
I don't know what this means. Percentage of what time to expiry? We get a notification that a node is being decommissioned and I assume this is how long to let the executor run before killing it, but it doesn't say what this is a percentage of.
private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT =
  ConfigBuilder("spark.graceful.decommission.shuffedata.leasetimePct")
    .doc("Percentage of time to expiry after which shuffle data " +
similar here, we should describe what this is a percentage of.
I assume this has to be >= the executor lease-time percentage so that executors don't write more shuffle data.
It might be better just to fail if this is incorrectly set, assuming we can check it early enough. I guess I'll see more code later
// Gracefully handling the stage abort due to fetch failure in the
// decommission nodes
if (!event.reason.asInstanceOf[FetchFailed].countTowardsStageFailures) {
  // Ignore stage attempts due to fetch failed only
I think comment should specifically say for decommission tracking.
done
DecommissionTracker.incrFetchFailIgnoreCnt()
failedStage.latestInfo.stageFailureIgnored(true)
}
logInfo(s"""Ignoring stage failure due to fetch failed from the decommissioned""" +
should this log message be inside the if statement above? you are just inside a conditional for fetch failure !countTowardsStageFailures. That doesn't mean it's from decommissioning. I haven't seen anything specified here that marks it for the decommissioning case
Yes, moved the log line inside the if condition
test this please
Test build #119569 has finished for PR 27636 at commit
took one pass through, definitely need to understand some things as a whole and how this fits in with the stuff @holdenk did, as I didn't have time to look at the last version of that
@@ -91,8 +91,15 @@ private[scheduler] abstract class Stage(
  */
val failedAttemptIds = new HashSet[Int]

/**
 * Number of times the stage failure needs to be ignored. e.g failed due to fetch failed
I think comment needs updating, this is the number of times it was ignored (not needs to be ignored).
Also I think we should rename to have Decommissioned in the name of it. Although I'm not sure we really know that.
done
// node against stage failure. Here the logic is to specify,
// if the task failed due to fetchFailed of decommission nodes than
// don't count towards the stageFailure. countTowardsStageFailures
// variable of TaskEndReason, that can be used in DAG scheduler to account
nit DAGScheduler
Done
if (decommissionTracker.isNodeDecommissioned(fetchFailed.bmAddress.host)) {
  logInfo(s"Do not count fetch failure from decommissioned" +
    s" node ${fetchFailed.bmAddress.host}")
  fetchFailed.countTowardsStageFailures = false
We have a lot of logic in the DAGScheduler doing accounting and just assuming this is decommission. If we know it's being decommissioned perhaps we should add something to indicate it so we aren't guessing.
clock: Clock = new SystemClock()) extends Logging {

def this(sc: SparkContext,
client: Option[ExecutorAllocationClient],
indentation is off
type NodeState = Value

// Helper method to get NodeState of the Yarn.
def getYarnNodeState(state: YarnNodeState): NodeState.Value = {
I'd prefer to see this in a yarn-specific file
moved the code
* Node information used by CoarseGrainedSchedulerBackend.
*
* @param nodeState node state
* @param terminationTime time at which node will terminate
update the param to be optional and assume it's only set when the node state is decommissioning?
done
}

/**
 * Cluster status information used by CoarseGrainedSchedulerBackend.
this seems to be only used by yarn, is the intention to expand and use for other resource managers?
Right now it's for YARN, but in the future other resource managers can also use this code.
@SaurabhChawla100 just checking, is this ready for another review? Wasn't sure if you had time to address all my comments
Yes, the code is ready for another review. I have replied to most of your comments related to the design and also refactored the code as suggested in the review comments.
Note, I want to look more at #26440 to make sure this fits in with that and we aren't duplicating functionality. If we can commonize things we should.
@tgravescs @SaurabhChawla100 .. I am wondering if you guys are still pushing for this PR to get into Spark master? Are you also thinking of targeting the Spark 3.0 branch? I like the shuffle block handling bit in this PR and I was wondering when this might go in. Thanks!
sorry I haven't had time to look at this more. It will not go into spark 3.0 as that is very close to shipping.
@agrawaldevesh - We are planning to merge this in the master branch of Spark. @tgravescs @holdenk - I have handled all the review comments; can we start a second round of review on this PR?
@SaurabhChawla100, can you briefly update the PR description to reflect how this work relates to the recently merged #27864? Perhaps you can leverage or enhance the abstractions added in that PR a bit? I also don't fully understand the relationship b/w this PR and the original decommissioning PR #26440. I am trying to get a sense of the end state with all these multiple decommissioning PRs trying to stretch the framework in different ways. @holdenk or @prakharjain09, you recently (greatly) enhanced Spark's decommissioning story and I am curious about your thoughts on this PR and how you see it fitting in with the work that you have done. From what I can tell:
Thank you for working on this.
@agrawaldevesh - Thanks for checking this PR. Sure, I will check PR #27864 and try to use it in the current PR.
Regarding this: #26440 is based on executor decommission, where in k8s/standalone mode the pods/workers receive SIGPWR, stop scheduling new tasks on executors on the same worker, and create new executors on other workers. It's more about decommissioning of executors. Whereas in this current PR we are using the node information from Yarn and using the information related to the node (spot nodes, preemptible VMs in GCP) to take an action. So on receiving the information about node decommissioning we take the actions described in the PR description #27636 (comment). It's more about node decommissioning, where the driver is responsible for executor decommission and shuffle decommission. In this PR we maintain a life cycle of how a decommissioned node is handled. This is the life cycle of the nodes which are decommissioned.
Also, once this PR is merged, we can create another PR for making the common parts shared between the current PR and #27864.
Made the code change to make use of #27864 in this PR.
IMHO this PR can be reworked to just live on top of the decommission executor framework with some enhancements. You don't need the parallel machinery for decommissioning 'nodes' in the CoarseGrainedSchedulerBackend. This is redundant and duplicates the logic that already exists pertaining to decommissioning executors. Please allow me to explain more.
The current framework for K8s and Standalone (Master) scheduler works as follows:
- CoarseGrainedSchedulerBackend receives DecommissionExecutor from a variety of sources:
  - The executor can send DecommissionExecutor to CoarseGrainedSchedulerBackend when the executor receives a SIGPWR (in the case of k8s).
  - In the standalone case, when the Worker node is being decommissioned: the Master sends ExecutorUpdated (with state ExecutorState.DECOMMISSIONED) for each executor in the decommissioned worker. The driver (StandaloneAppClient) then relays the DecommissionExecutor to the CoarseGrainedSchedulerBackend for each of the executors on those workers.
- CoarseGrainedSchedulerBackend.DriverEndpoint#decommissionExecutor then handles all of the decommissioning logic
The standalone scheduler (Master)'s concept of Worker is the same as YARN's concept of Node. The YARN node DECOMMISSIONING case is conceptually similar to the Master worker DECOMMISSIONED case -- both mean that the decommission is about to happen.
You should be able to do this mapping from host/node to executors in the YarnAllocator: containerIdToExecutorIdAndResourceProfileId maps a container id to an executor id, and allocatedContainerToHostMap maps a container id to a host. YarnAllocator#runAllocatedContainers knows the executor id, container id and host when it is actually spawning the executors. So this mapping of NodeId / Host to executorId can be rebuilt in the YarnAllocator. You might want to build some other reverse maps for efficiency.
If you prefer not to do this mapping in the YarnAllocator, you can do it in the YarnSchedulerBackend using scheduler.getExecutorsAliveOnHost.
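For illustration, a rough sketch of that reverse mapping with simplified, hypothetical types (the parameter names only loosely mirror the YarnAllocator fields):

object YarnDecommissionMapping {
  // Rebuild host -> executorIds from the two per-container maps kept by the allocator.
  def hostToExecutors[ContainerId](
      containerToExecutorAndRpId: Map[ContainerId, (String, Int)],
      containerToHost: Map[ContainerId, String]): Map[String, Set[String]] = {
    containerToHost.toSeq
      .flatMap { case (containerId, host) =>
        containerToExecutorAndRpId.get(containerId).map { case (executorId, _) => host -> executorId }
      }
      .groupBy { case (host, _) => host }
      .map { case (host, pairs) => host -> pairs.map { case (_, executorId) => executorId }.toSet }
  }
}

When YARN reports a host as DECOMMISSIONING, the host could then be looked up in this map and the resulting executor ids fed into the existing DecommissionExecutor handling.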
You may need to enhance the existing executor decommission framework: For example, you would want to extend the DecommissionExecutor message with some timeout to signify how much more time the executor has. (Although I didn't follow if you strictly need these semantics)
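One possible shape for such an extension (a hypothetical sketch only; the actual message classes in Spark may differ):

// Carry a termination deadline along with the decommission notification.
case class ExecutorDecommissionInfo(
    message: String,
    terminationDeadlineMs: Option[Long] = None)  // when the node is expected to go away

case class DecommissionExecutor(executorId: String, info: ExecutorDecommissionInfo)

The YARN side could fill in the deadline from getDecommissioningTimeout (or the configured fallback) before relaying the message to the driver endpoint.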
This approach will allow you to piggy back on the work being done in https://issues.apache.org/jira/browse/SPARK-20624:
- They are adding support for example for eager task speculation out of the decommissioned executor.
- They already have support for re-replicating cached blocks (which is redundantly redone in this PR)
- They are adding support to offload shuffle blocks
- They will add support to have the executor early exit when all the offloading etc is done.
- It already has good testing framework and the like
Having said that, this PR does differ from #28708 mainly around its better handling of fetch failures. I think job failures caused by fetch failures can still happen in PR 28708, since shuffle block migration is somewhat best effort. Your PR goes a good distance to be more robust here. I have commented on this potential shortcoming in #28708 (comment).
Decommissioning is really important to reduce cost and play well in cloud environments. Having one battle tested framework that covers all use cases would allow such a feature to reach maturity quicker.
cc: @holdenk
Thanks.
@@ -436,6 +449,72 @@ private[yarn] class YarnAllocator(
  logDebug("Finished processing %d completed containers. Current running executor count: %d."
    .format(completedContainers.size, getNumExecutorsRunning))
}

// If the flags is enabled than GRACEFUL_DECOMMISSION_ENABLE
This comment is superfluous.
than -> then
val getUpdatedNodes = allocateResponse.getUpdatedNodes()
if (getUpdatedNodes != null) {
  val updatedNodes = getUpdatedNodes.asScala
  for (x <- updatedNodes) {
How about a better variable name than 'x'.
// use the config spark.graceful.decommission.node.timeout which is specific to cloud
var nodeTerminationTime = clock.getTimeMillis() + nodeLossInterval * 1000
try {
  val decommiossioningTimeout = x.getClass.getMethod(
typo
… highest hierarchy

### What changes were proposed in this pull request?
Rename `spark.worker.decommission.enabled` to `spark.decommission.enabled` and move it from `org.apache.spark.internal.config.Worker` to `org.apache.spark.internal.config.package`.

### Why are the changes needed?
Decommission has been supported in Standalone and k8s yet and may be supported in Yarn (#27636) in the future. Therefore, the switch configuration should have the highest hierarchy rather than belongs to Standalone's Worker. In other words, it should be independent of the cluster managers.

### Does this PR introduce _any_ user-facing change?
No, as the decommission feature hasn't been released.

### How was this patch tested?
Pass existed tests.

Closes #29466 from Ngone51/fix-decom-conf.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
In many public cloud environments, node loss (in the case of AWS Spot loss, Spot blocks and GCP preemptible VMs) is a planned and informed activity.
The cloud provider informs the cluster manager about the possible loss of a node ahead of time. A few examples are listed here:
a) Spot loss in AWS (2 min before the event)
b) GCP preemptible VM loss (30 seconds before the event)
c) AWS Spot block loss with info on termination time (generally a few tens of minutes before decommission, as configured in Yarn)
Design Doc of this PR https://docs.google.com/document/d/1urypkQVZY6oMNZXLuEPifNPxo_gqJng2osKTDRfWi08/edit#heading=h.y1m32a7uhh1z
This PR tries to make Spark leverage the knowledge of upcoming node loss and adjust the scheduling of tasks to minimise the impact on the application.
It is well known that when a host is lost, its executors, their running tasks, their caches and also the shuffle data are lost. This could result in wastage of compute and other resources.
The focus here is to build a framework for YARN that can be extended for other cluster managers to handle such scenarios.
The framework must handle one or more of the following:
Main components of change:
a) No new tasks on executors of the node being decommissioned
b) Remove shuffle data mapping info for the node to be decommissioned from the mapOutputTracker
c) Do not count fetchFailure from decommissioned nodes towards stage failure
On receiving the info that a node is to be decommissioned, the below actions need to be performed by the DecommissionTracker on the driver:
This is the life cycle of the nodes which are decommissioned:
DECOMMISSIONING -> EXECUTOR_DECOMMISSIONED -> SHUFFLEDATA_DECOMMISSIONED -> TERMINATED.
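A minimal sketch of that lifecycle as a forward-only state machine (the state names come from the description above; the transition helper itself is hypothetical):

object NodeDecommissionLifecycle {
  object NodeDecommissionState extends Enumeration {
    val DECOMMISSIONING, EXECUTOR_DECOMMISSIONED, SHUFFLEDATA_DECOMMISSIONED, TERMINATED = Value
  }
  import NodeDecommissionState._

  // Forward-only transitions, matching the lifecycle above.
  def nextState(current: NodeDecommissionState.Value): Option[NodeDecommissionState.Value] =
    current match {
      case DECOMMISSIONING            => Some(EXECUTOR_DECOMMISSIONED)    // executors stopped first
      case EXECUTOR_DECOMMISSIONED    => Some(SHUFFLEDATA_DECOMMISSIONED) // then shuffle data unregistered
      case SHUFFLEDATA_DECOMMISSIONED => Some(TERMINATED)                 // finally the node goes away
      case TERMINATED                 => None
    }
}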
Why are the changes needed?
Add the support to handle Node Decommissioning for the Yarn cluster manager in Spark.
Does this PR introduce any user-facing change?
NO
How was this patch tested?
Added unit tests and ran manual tests for AWS spot nodes on the cluster.