
[SPARK-30873][CORE][YARN]Handling Node Decommissioning for Yarn cluster manger in Spark #27636

Closed
wants to merge 11 commits

Conversation

SaurabhChawla100
Contributor

@SaurabhChawla100 SaurabhChawla100 commented Feb 19, 2020

What changes were proposed in this pull request?

In many public cloud environments, node loss (for example AWS Spot Instances, Spot Blocks, and GCP preemptible VMs) is a planned and announced activity.
The cloud provider notifies the cluster manager about the upcoming loss of a node ahead of time. A few examples are listed here:
a) Spot loss in AWS (2 minutes before the event)
b) GCP preemptible VM loss (30 seconds before the event)
c) AWS Spot Block loss with info on termination time (generally a few tens of minutes before decommission, as configured in YARN)

Design Doc of this PR https://docs.google.com/document/d/1urypkQVZY6oMNZXLuEPifNPxo_gqJng2osKTDRfWi08/edit#heading=h.y1m32a7uhh1z

This PR tries to make Spark leverage the advance knowledge of node loss and adjust the scheduling of tasks to minimise the impact on the application.
It is well known that when a host is lost, its executors, their running tasks, their caches and also their shuffle data are lost. This can result in wasted compute and other resources.

The focus here is to build a framework for YARN that can be extended to other cluster managers to handle such scenarios.

The framework must handle one or more of the following:

  1. Prevent new tasks from starting on any executors on decommissioning nodes.
  2. Decide whether to kill the running tasks so that they can be restarted elsewhere (assuming they will not complete within the deadline), or allow them to continue in the hope that they will finish within the deadline.
  3. Clear the shuffle data entries of the decommissioning node's hostname from MapOutputTracker to prevent shuffle FetchFailed exceptions. The most significant advantage of unregistering shuffle outputs is that when Spark schedules the first re-attempt to compute the missing blocks, it notices all of the missing blocks from decommissioned nodes and recovers them in a single attempt. This speeds up recovery significantly over stock Spark behaviour, where stages might be rescheduled multiple times to recompute missing shuffle output from all nodes, and it prevents jobs from being stuck for hours failing and recomputing. (A minimal sketch of this unregistration follows the list.)
  4. Prevent the stage from being aborted due to FetchFailed exceptions caused by decommissioning of nodes. Spark allows a number of consecutive stage attempts before a stage is aborted; this is controlled by the config spark.stage.maxConsecutiveAttempts. Not counting fetch failures caused by decommissioning of nodes towards stage failure improves the reliability of the system.
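
A minimal sketch of the unregistration in item 3, assuming Spark's existing MapOutputTrackerMaster.removeOutputsOnHost helper (the wiring through the PR's DecommissionTracker may differ):

// Sketch only: lives inside the Spark source tree because MapOutputTrackerMaster
// is private[spark].
package org.apache.spark.scheduler

import org.apache.spark.MapOutputTrackerMaster

private[spark] object ShuffleDecommissionSketch {
  // Drop every map output registered on the decommissioning host so the first
  // stage re-attempt sees all missing partitions at once.
  def unregisterShuffleOnHost(tracker: MapOutputTrackerMaster, host: String): Unit = {
    tracker.removeOutputsOnHost(host) // unregister all map outputs on this host
    tracker.incrementEpoch()          // make executors refetch map output statuses
  }
}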

Main components of change

  1. Get the ClusterInfo update from the Resource Manager -> Application Master -> Spark driver.
  2. DecommissionTracker, which resides inside the driver, tracks all the decommissioned nodes and performs the necessary actions and state transitions.
  3. Based on the decommissioned node list, add hooks in the code to achieve (a minimal sketch of hook (a) follows this list):
    a) No new tasks on executors on the node
    b) Remove the shuffle data mapping info for the node to be decommissioned from the MapOutputTracker
    c) Do not count fetch failures from decommissioned nodes towards stage failure
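
A minimal sketch of hook (a); only DecommissionTracker.isNodeDecommissioned (quoted later in this PR) is taken from the PR, everything else is illustrative:

object DecommissionOfferFilter {
  // Stand-in for the PR's DecommissionTracker; only the method used here is modelled.
  trait DecommissionTrackerLike {
    def isNodeDecommissioned(hostname: String): Boolean
  }

  // The scheduler consults the tracker before using a resource offer from a host.
  def isOfferUsable(offerHost: String, tracker: Option[DecommissionTrackerLike]): Boolean =
    !tracker.exists(_.isNodeDecommissioned(offerHost))
}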

On receiving the info that a node is to be decommissioned, the below actions are performed by the DecommissionTracker on the driver:

  • Add an entry for the node in the DecommissionTracker with its termination time and the node state "DECOMMISSIONING".
  • Stop assigning any new tasks to executors on the nodes which are candidates for decommission. This makes sure that, as tasks finish, the usage of these nodes slowly dies down.
  • Kill all the executors on the decommissioning nodes after a configurable period of time, controlled by "spark.graceful.decommission.executor.leasetimePct". This killing ensures two things: first, the task failures are attributed in the job failure count; second, it avoids generating more shuffle data on a node that will eventually be lost. The node state is set to "EXECUTOR_DECOMMISSIONED".
  • Mark the shuffle data on the node as unavailable after the "spark.graceful.decommission.shuffedata.leasetimePct" period. This ensures that recomputation of the missing shuffle partitions is done early, rather than reducers failing later with a time-consuming FetchFailure. The node state is set to "SHUFFLEDATA_DECOMMISSIONED".
  • Mark the node as terminated after the termination time. The node state is now "TERMINATED".
  • Remove the node entry from the DecommissionTracker if the same hostname is reused (this is not uncommon in many public cloud environments).

This is the life cycle of a node which is decommissioned (a minimal sketch of this life cycle and the lease-time arithmetic follows):
DECOMMISSIONING -> EXECUTOR_DECOMMISSIONED -> SHUFFLEDATA_DECOMMISSIONED -> TERMINATED.
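
A minimal sketch of this state machine and of how the two lease-time percentages could translate into absolute deadlines; the names and timing logic are assumptions for illustration, not the PR's exact implementation:

object NodeDecommissionState extends Enumeration {
  val DECOMMISSIONING, EXECUTOR_DECOMMISSIONED, SHUFFLEDATA_DECOMMISSIONED, TERMINATED = Value
}

object DecommissionTimeline {
  case class DecommissionSchedule(killExecutorsAtMs: Long, removeShuffleAtMs: Long)

  // noticeTimeMs: when the decommission notice arrives; terminationTimeMs: when the
  // node is expected to be lost; the percentages correspond to the
  // executor/shuffledata leasetimePct configs described above.
  def schedule(noticeTimeMs: Long, terminationTimeMs: Long,
               executorLeasePct: Int, shuffleLeasePct: Int): DecommissionSchedule = {
    require(executorLeasePct <= shuffleLeasePct, "shuffle data must outlive the executors")
    val window = terminationTimeMs - noticeTimeMs
    DecommissionSchedule(
      killExecutorsAtMs = noticeTimeMs + window * executorLeasePct / 100,
      removeShuffleAtMs = noticeTimeMs + window * shuffleLeasePct / 100)
  }
}

For example, with a 120-second window, executorLeasePct = 50 and shuffleLeasePct = 90 give an executor kill at notice + 60 s and removal of the shuffle data mapping at notice + 108 s.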

Why are the changes needed?

Add support for handling node decommissioning for the YARN cluster manager in Spark.

Does this PR introduce any user-facing change?

NO

How was this patch tested?

Added unit tests and ran manual tests with AWS Spot nodes on the cluster.

@itskals

itskals commented Feb 19, 2020

@SaurabhChawla100 Will start looking into this PR... thanks.

@SaurabhChawla100
Contributor Author

SaurabhChawla100 commented Feb 19, 2020

This PR is inspired by the work done by @holdenk in #26440 and by @juanrh (Juan Rodriguez Hortala) in PR #19267.

@SaurabhChawla100 SaurabhChawla100 force-pushed the SPARK-30873 branch 2 times, most recently from d68bb4d to aafe423 Compare February 20, 2020 11:52
.createWithDefaultString("60s")

private[spark] val ABOUT_TO_BE_LOST_NODE_INTERVAL =
ConfigBuilder("spark.abouttobelost.node.interval")
Contributor

As I mentioned in the design doc, you could consider naming this spark.graceful.decommission.timeout or at least something under spark.graceful to keep these properties grouped together.

Contributor Author

fixed

.checkValue(v => v >= 0 && v < 100, "The percentage should be positive.")
.createWithDefault(90) // Pulled out of thin air.

private[spark] val GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC =
Contributor

Is this config used in the PR? What's its intended relationship with ABOUT_TO_BE_LOST_NODE_INTERVAL?

Contributor Author

I have added the information about this conf in the design doc.

Contributor

@tgravescs tgravescs Mar 9, 2020

Haven't looked at the design doc, but same question - I only see this used in tests? If that is the case I think we at least need a comment about it. I think I have the same concern here as above; I don't really know what this config means either. What is the min time to termination?

Contributor Author

This is used in DecommissionTracker.scala:

private val minDecommissionTime =
    conf.get(config.GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC)

Contributor

ah ok, the file in the diffs must have been minimized at the time


private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD =
ConfigBuilder("spark.graceful.decommission.fetchfailed.ignore.threshold")
.doc("Threshold of number of times fetchfailed ignored due to node decommission")
Contributor

The comment here should clarify that it's the number of stage failures ignored, not number of task failures (I was a bit confused by that at first)

Contributor Author

added the description

@SaurabhChawla100 SaurabhChawla100 force-pushed the SPARK-30873 branch 2 times, most recently from b13eeb3 to 1141ae0 Compare February 26, 2020 03:09
@SaurabhChawla100 SaurabhChawla100 changed the title [WIP][SPARK-30873][CORE][YARN]Handling Node Decommissioning for Yarn cluster manger in Spark [SPARK-30873][CORE][YARN]Handling Node Decommissioning for Yarn cluster manger in Spark Feb 26, 2020
@holdenk
Contributor

holdenk commented Feb 26, 2020

I'll take a look at this sometime this week (hopefully by Saturday) :)

Contributor

@holdenk holdenk left a comment

Thanks for working on this PR. I'm not super familiar with the YARN code path, so we should get some folks with more YARN background to look at this. That being said, I'm a little confused by some elements of the design:

  1. Why do we exit the executors before the shuffle service? I know we want to keep the blocks, but leaving the shuffle service up probably blocks the maintenance or decom task, so it seems not ideal.
  2. Is there a way we could just track the executors for decom, like we have basic infra for?
  3. The task fetch retry logic: I don't think I understand what the expected case is here and how it is going to help.

* was unrelated to the task; for example, if the task failed because fetch failed exception
* from the decommissioned node.
*/
var countTowardsStageFailures: Boolean = true
Contributor

Why is this a var?

Contributor Author

@SaurabhChawla100 SaurabhChawla100 Mar 15, 2020

We update the value of countTowardsStageFailures when there is node decommissioning:

// Do account fetch failure exception raised by decommissioned
          // node against stage failure. Here the logic is to specify,
          // if the task failed due to fetchFailed of decommission nodes than
          // don't count towards the stageFailure. countTowardsStageFailures
          // variable of TaskEndReason, that can be used in DAG scheduler to account
          // fetch failure while checking the stage abort
          decommissionTracker match {
            case Some(decommissionTracker) =>
              if (decommissionTracker.isNodeDecommissioned(fetchFailed.bmAddress.host)) {
                logInfo(s"Do not count fetch failure from decommissioned" +
                  s" node ${fetchFailed.bmAddress.host}")
                fetchFailed.countTowardsStageFailures = false
              }
            case _ =>
            // No action
          }

@@ -1542,4 +1542,50 @@ package object config {
.bytesConf(ByteUnit.BYTE)
.createOptional

private[spark] val GRACEFUL_DECOMMISSION_ENABLE =
Contributor

Would it make sense to handle this with spark.worker.decommission.enabled or not? I'm not sure; just for discussion.

Contributor

Is this specific to YARN, or has it been tied in with the previous work for all cluster managers?

Contributor

Also, all the configs need to be documented in configuration.md.

.booleanConf
.createWithDefault(false)

private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD =
Contributor

From the documentation, I don't know how I would configure this value. Can we provide guidance? Also, why would we or why wouldn't we want to skip fetch failures? In my mind, if a node is decommissioning and the fetch fails, that's expected, but retrying the same fetch is very unlikely to succeed. Or am I misunderstanding the purpose of this?

Contributor Author

I have tried to explain this here
#27636 (comment)

Contributor

I assume this is a total number of fetch failures, or is it per stage? I'm sure I'll see it more in the code later, but if I don't understand it from reading the config text then the user won't either.
Without having read the rest of the code, this seems like kind of a weird setting to me. If we are handling decommissioned nodes then why do we need this? Is it in case the entire cluster is being decommissioned?

If we keep this we may want to combine the names here; there was a thread on dev about making too many .component. type names.

.createWithDefaultString("60s")

private[spark] val GRACEFUL_DECOMMISSION_NODE_TIMEOUT =
ConfigBuilder("spark.graceful.decommission.node.timeout")
Contributor

I'm not sure I get the spot-loss interaction here; I think the AM/RM won't know, and we'd see something more like the K8s case happen.

Contributor Author

We can get the decommission timeout from Hadoop 3.1 and later versions of Hadoop, so we can use that value to decide when the node is decommissioned.
For lower versions of Hadoop (Hadoop 2.8) there is no decommission timeout for decommissioning nodes; in those scenarios we already know from experience that in AWS, spot-loss nodes will stay for 2 minutes and GCP preemptible VMs will stay for 30 seconds after the node decommissioning notice is received from the Hadoop end.

This config is added here to make decommissioning of nodes work with multiple versions of Hadoop.

Please find below the logic used in YarnAllocator.scala to decide the timeout of the node:

if (x.getNodeState.toString.equals(NodeState.DECOMMISSIONING.toString)) {
          // In hadoop 2.7 there is no support getDecommissioningTimeout whereas
          // In hadoop 3.1 and later version of hadoop there is support
          // of getDecommissioningTimeout So the method call made using reflection
          // to update the value nodeTerminationTime and for lower version of hadoop2.7
          // use the config spark.graceful.decommission.node.timeout which is specific to cloud
          var nodeTerminationTime = clock.getTimeMillis() + nodeLossInterval * 1000
          try {
              val decommiossioningTimeout = x.getClass.getMethod(
                "getDecommissioningTimeout").invoke(x).asInstanceOf[Integer]
              if (decommiossioningTimeout != null) {
                nodeTerminationTime = clock.getTimeMillis() + decommiossioningTimeout * 1000
              }
          } catch {
            case e: NoSuchMethodException => logDebug(e.toString)
          }

Contributor

So what happens with this in Hadoop 3.1 and greater? Is it ignored if a timeout is specified from YARN?

Contributor Author

For Hadoop 3.1 and later versions of Hadoop there is an interface to get the value of the decommissioning timeout (method name getDecommissioningTimeout), so we have used that here in the code:

val decommiossioningTimeout = x.getClass.getMethod(
                "getDecommissioningTimeout").invoke(x).asInstanceOf[Integer]
              if (decommiossioningTimeout != null) {
                nodeTerminationTime = clock.getTimeMillis() + decommiossioningTimeout * 1000
              }

Since we get the value of decommiossioningTimeout from the RM in that scenario, we use that value; otherwise we use the value specified in the config.

Also, if someone has backported the Hadoop 3.1 change to a lower version of Hadoop (2.8 etc.), they too can rely on decommiossioningTimeout instead of using the config GRACEFUL_DECOMMISSION_NODE_TIMEOUT.

Contributor

OK, so we need to update the description to say when it applies.

Contributor Author

Updated the description

@@ -1525,10 +1539,25 @@ private[spark] class DAGScheduler(
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
} else {
if (!event.reason.asInstanceOf[FetchFailed].countTowardsStageFailures) {
Contributor

Can we have a comment describing what the intended behaviour is? Back to the questions around retrying: I'm not sure this is the right thing to do, but maybe I just don't understand yet.

Contributor Author

I have tried to explain this here
#27636 (comment)

Contributor

countTowardsStageFailures=false can be set for things other than decommissioned nodes: killed tasks, blacklisting, etc.

Contributor Author

countTowardsStageFailures is introduced as part of this PR, so this is for decommissioned nodes only. Below is the code where we set countTowardsStageFailures=false:

// Do account fetch failure exception raised by decommissioned
          // node against stage failure. Here the logic is to specify,
          // if the task failed due to fetchFailed of decommission nodes than
          // don't count towards the stageFailure. countTowardsStageFailures
          // variable of TaskEndReason, that can be used in DAG scheduler to account
          // fetch failure while checking the stage abort
          decommissionTracker match {
            case Some(decommissionTracker) =>
              if (decommissionTracker.isNodeDecommissioned(fetchFailed.bmAddress.host)) {
                logInfo(s"Do not count fetch failure from decommissioned" +
                  s" node ${fetchFailed.bmAddress.host}")
                fetchFailed.countTowardsStageFailures = false
              }
            case _ =>
            // No action
          }

Contributor

Oh sorry, I misread it as countTowardsTaskFailures, which is pre-existing. Right now the logic for this is very specific to decommission, so I think it should be renamed to indicate so. If there are other cases where this variable could be useful, then the question is whether you can differentiate those from the decommission case so you can still do this. If not, just rename it to be specific to decommission.

Contributor Author

updated the name of the variable here

shuffleDataDecommissionTimeMs = curTimeMs + 1000
} else {
reason match {
case SpotRotationLoss | NodeLoss =>
Contributor

What is SpotRotationLoss?

Contributor Author

Not required; removed it.

.checkValue(v => v >= 0 && v < 100, "The percentage should be positive.")
.createWithDefault(50) // Pulled out of thin air.

private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT =
Contributor

Why would we exit at a different time? I know we probably want to keep the blocks as long as possible, but if any process is running the node can't gracefully shut down, so it seems strange to have these occur separately.

Contributor Author

This is for minimising the recomputation of the shuffle data and maximising the use of already generated shuffle data. I also tried to explain this in this comment: #27636 (comment)

Comment on lines 37 to 42
// In hadoop-2.7 there is no support for node state DECOMMISSIONING
// In Hadoop-2.8, hadoop3.1 and later version of spark there is a support
// to node state DECOMMISSIONING.
// Inorder to build the spark using hadoop2 and hadoop3, comparing the value of
// DECOMMISSIONING here and for other state we are matching
// the state and assigning the node state at spark end
Contributor

I think I get the first part of this comment, but the second part I'm not understanding; could you try to clarify?

Contributor Author

Explained it in a better way.

@@ -798,6 +799,19 @@ private[spark] class TaskSetManager(
if (fetchFailed.bmAddress != null) {
blacklistTracker.foreach(_.updateBlacklistForFetchFailure(
fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))

// Do account fetch failure exception raised by decommissioned
Contributor

can you clarify?

Contributor Author

added the description in the code

* getYarnNodeState is added to create the node state for Decommission Tracker
*/
private[spark] object NodeState extends Enumeration {
val RUNNING, DECOMMISSIONED, GRACEFUL_DECOMMISSIONING, DECOMMISSIONING, LOST, OTHER = Value
Contributor

Is there a reason we aren't adding this to an existing place where we're tracking state?

Contributor Author

This is for the node states used in the decommission tracker, whereas in other places it is for executor state.

Contributor

I wonder the same thing. We have an executor monitor; this is for node state, but I wonder if they shouldn't be linked better. Also, why is this file called ClusterInfo? Couldn't there also be other states like terminating? Perhaps we need a node monitor, but I'm not sure if that overlaps too much with the resource-manager pieces.
But I'll have to think about it more.

Contributor

document differences between GRACEFUL_DECOMMISSIONING and DECOMMISSIONING

Contributor Author

  1. The YARN states above are the ones needed here for the graceful decommission of nodes; any other states can be treated as OTHER.
  2. Document differences between GRACEFUL_DECOMMISSIONING and DECOMMISSIONING - DECOMMISSIONING is the YARN node state, and GRACEFUL_DECOMMISSIONING is what we used for the decommission tracker. I will keep only one state here to avoid confusion, i.e. DECOMMISSIONING. (A hedged sketch of the resulting mapping follows.)
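
A hedged sketch of what a version-tolerant getYarnNodeState mapping could look like, using the NodeState values quoted in the diff and keeping only DECOMMISSIONING as discussed (the PR's final code may differ):

import org.apache.hadoop.yarn.api.records.{NodeState => YarnNodeState}

object NodeState extends Enumeration {
  val RUNNING, DECOMMISSIONED, DECOMMISSIONING, LOST, OTHER = Value
}

object NodeStateMapping {
  // DECOMMISSIONING is matched by name because the enum constant does not exist
  // in Hadoop 2.7; the other states exist in all supported Hadoop versions.
  def getYarnNodeState(state: YarnNodeState): NodeState.Value = {
    if (state.toString == "DECOMMISSIONING") {
      NodeState.DECOMMISSIONING
    } else {
      state match {
        case YarnNodeState.RUNNING => NodeState.RUNNING
        case YarnNodeState.DECOMMISSIONED => NodeState.DECOMMISSIONED
        case YarnNodeState.LOST => NodeState.LOST
        case _ => NodeState.OTHER
      }
    }
  }
}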

@SaurabhChawla100
Contributor Author

Thanks for working on this PR. I'm not super familiar with the YARN code path, so we should get some folks with more YARN background to look at this. That being said, I'm a little confused by some elements of the design:

  1. Why do we exit the executors before the shuffle service? I know we want to keep the blocks, but leaving the shuffle service up probably blocks the maintenance or decom task, so it seems not ideal.
  2. Is there a way we could just track the executors for decom, like we have basic infra for?
  3. The task fetch retry logic: I don't think I understand what the expected case is here and how it is going to help.

Thanks for reviewing this PR. Can you kindly add other experts whom you consider valuable in reviewing this PR? Below I have tried to answer your queries.

  1. Why do we exit the executors before the shuffle service? I know we want to keep the blocks, but leaving the shuffle service up probably blocks the maintenance or decom task, so it seems not ideal. - This is for Spark using the External Shuffle Service. There are two reasons why we exit the executors before the shuffle service:
    a) As per the current logic, whenever we receive the node decommissioning notice we stop assigning new tasks to the executors running on that node. We give the tasks already running on those executors some time to complete before killing the executors. If we kept the executors running until the end, there would be a chance of generating more shuffle data which would eventually be lost, triggering a recompute in the future. This approach minimises the recomputation of the shuffle data and maximises the usage of the shuffle data on the node by keeping it available until the end.
    b) We want to keep the shuffle data until the node is about to be lost, so that any tasks that depend on that shuffle data can complete, and we do not have to recompute the shuffle data if no task needs it.

If the user is not using the External Shuffle Service, then spark.graceful.decommission.executor.leasetimePct and spark.graceful.decommission.shuffedata.leasetimePct have to be kept at the same value to prevent fetch-failure exceptions (a configuration sketch follows this reply).

  2. Is there a way we could just track the executors for decom, like we have basic infra for? - The entire logic is for the decommissioning of nodes, so the decommission tracker holds the information about the decommissioned nodes.

  3. The task fetch retry logic: I don't think I understand what the expected case is here and how it is going to help. - As per the existing code, this is the logic used to check for stage abort:

val shouldAbortStage = failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || disallowStageRetryForTest

There is a chance that some of the stage failures are caused by fetch-failed exceptions due to the decommissioned nodes, so we need to handle the stage abort gracefully here by discounting, to some extent, the shuffle fetch failures due to decommissioned nodes:

val shouldAbortStage = failedStage.failedAttemptIds.size >=
              (maxConsecutiveStageAttempts + failedStage.ignoredFailedStageAttempts) ||
              disallowStageRetryForTest ||
              failedStage.ignoredFailedStageAttempts > maxIgnoredFailedStageAttempts

ignoredFailedStageAttempts is only incremented when there is a fetch failure due to node decommissioning. In this way we make the Spark application more reliable against failures which cannot be controlled.
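
A hedged configuration sketch for the External Shuffle Service point above; the keys are the ones defined in this PR, and the values are illustrative only:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Without the external shuffle service, keep both lease percentages equal,
  // since the shuffle files disappear with the executors.
  .set("spark.graceful.decommission.executor.leasetimePct", "50")
  .set("spark.graceful.decommission.shuffedata.leasetimePct", "50")
  // Fallback termination window used when YARN does not report a
  // decommissioning timeout (pre-Hadoop-3.1 clusters).
  .set("spark.graceful.decommission.node.timeout", "120s")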

@holdenk
Contributor

holdenk commented Mar 7, 2020

Maybe @vanzin @tgravescs have some thoughts on the YARN side?


private[spark] val GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT =
ConfigBuilder("spark.graceful.decommission.executor.leasetimePct")
.doc("Percentage of time to expiry after which executors are killed " +
Contributor

I don't know what this means. Percentage of what time to expiry? We get a notification that a node is being decommissioned, and I assume this is how long to let the executor run before killing it, but it doesn't say what this is a percentage of.


private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT =
ConfigBuilder("spark.graceful.decommission.shuffedata.leasetimePct")
.doc("Percentage of time to expiry after which shuffle data " +
Contributor

Similar here: we should describe what this is a percentage of.
I assume this has to be >= so that executors don't write more shuffle data.
It might be better just to fail if this is incorrectly set, assuming we can check it early enough. I guess I'll see more code later.

// Gracefully handling the stage abort due to fetch failure in the
// decommission nodes
if (!event.reason.asInstanceOf[FetchFailed].countTowardsStageFailures) {
// Ignore stage attempts due to fetch failed only
Contributor

I think the comment should specifically say it's for decommission tracking.

Contributor Author

done

DecommissionTracker.incrFetchFailIgnoreCnt()
failedStage.latestInfo.stageFailureIgnored(true)
}
logInfo(s"""Ignoring stage failure due to fetch failed from the decommissioned""" +
Contributor

Should this log message be inside the if statement above? You are just inside a conditional for fetch failure with !countTowardsStageFailures; that doesn't mean it's from decommissioning. I haven't seen anything specified here that marks it as the decommissioning case.

Contributor Author

Yes, moved the log line inside the if condition

@tgravescs
Contributor

test this please

@SparkQA

SparkQA commented Mar 9, 2020

Test build #119569 has finished for PR 27636 at commit 5072edb.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@tgravescs tgravescs left a comment

Took one pass through; I definitely need to understand some things as a whole and how this fits in with the stuff @holdenk did, as I didn't have time to look at the last version of that.

@@ -91,8 +91,15 @@ private[scheduler] abstract class Stage(
*/
val failedAttemptIds = new HashSet[Int]

/**
* Number of times the stage failure needs to be ignored. e.g failed due to fetch failed
Contributor

I think the comment needs updating: this is the number of times it was ignored (not the number of times it needs to be ignored).

Also, I think we should rename it to have Decommissioned in the name. Although I'm not sure we really know that.

Contributor Author

done

// node against stage failure. Here the logic is to specify,
// if the task failed due to fetchFailed of decommission nodes than
// don't count towards the stageFailure. countTowardsStageFailures
// variable of TaskEndReason, that can be used in DAG scheduler to account
Contributor

nit DAGScheduler

Contributor Author

Done

@@ -1525,10 +1539,25 @@ private[spark] class DAGScheduler(
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
} else {
if (!event.reason.asInstanceOf[FetchFailed].countTowardsStageFailures) {
Contributor

countTowardsStageFailures=false can be set for things other than decommissioned nodes: killed tasks, blacklisting, etc.

if (decommissionTracker.isNodeDecommissioned(fetchFailed.bmAddress.host)) {
logInfo(s"Do not count fetch failure from decommissioned" +
s" node ${fetchFailed.bmAddress.host}")
fetchFailed.countTowardsStageFailures = false
Contributor

We have a lot of logic in the DAGScheduler doing accounting and just assuming this is decommission. If we know it's being decommissioned, perhaps we should add something to indicate that so we aren't guessing.

clock: Clock = new SystemClock()) extends Logging {

def this(sc: SparkContext,
client: Option[ExecutorAllocationClient],
Contributor

indentation is off

* getYarnNodeState is added to create the node state for Decommission Tracker
*/
private[spark] object NodeState extends Enumeration {
val RUNNING, DECOMMISSIONED, GRACEFUL_DECOMMISSIONING, DECOMMISSIONING, LOST, OTHER = Value
Contributor

I wonder the same thing. We have an executor monitor; this is for node state, but I wonder if they shouldn't be linked better. Also, why is this file called ClusterInfo? Couldn't there also be other states like terminating? Perhaps we need a node monitor, but I'm not sure if that overlaps too much with the resource-manager pieces.
But I'll have to think about it more.

type NodeState = Value

// Helper method to get NodeState of the Yarn.
def getYarnNodeState(state: YarnNodeState): NodeState.Value = {
Contributor

I'd prefer to see this in a YARN-specific file.

Contributor Author

moved the code

* getYarnNodeState is added to create the node state for Decommission Tracker
*/
private[spark] object NodeState extends Enumeration {
val RUNNING, DECOMMISSIONED, GRACEFUL_DECOMMISSIONING, DECOMMISSIONING, LOST, OTHER = Value
Contributor

document differences between GRACEFUL_DECOMMISSIONING and DECOMMISSIONING

* Node information used by CoarseGrainedSchedulerBackend.
*
* @param nodeState node state
* @param terminationTime time at which node will terminate
Contributor

Update the param to be optional and assume it's only set when the node state is DECOMMISSIONING?

Contributor Author

done

}

/**
* Cluster status information used by CoarseGrainedSchedulerBackend.
Contributor

This seems to be used only by YARN; is the intention to expand it and use it for other resource managers?

Contributor Author

Right now it's for YARN, but in the future other resource managers can also use this code.

@SaurabhChawla100 SaurabhChawla100 force-pushed the SPARK-30873 branch 2 times, most recently from 8ba08ba to dc19b9f Compare March 15, 2020 18:00
@tgravescs
Contributor

@SaurabhChawla100 just checking, is this ready for another review? Wasn't sure if you had time to address all my comments

@SaurabhChawla100
Contributor Author

SaurabhChawla100 commented Mar 31, 2020

@SaurabhChawla100 just checking, is this ready for another review? Wasn't sure if you had time to address all my comments

Yes, the code is ready for another review. I have replied to most of your comments related to the design and also refactored the code as suggested in the review comments.

@tgravescs
Contributor

Note, I want to look more at #26440 to make sure this fits in with that and we aren't duplicating functionality. If we can commonize things we should.

@agrawaldevesh
Contributor

@tgravescs @SaurabhChawla100 .. I am wondering if you guys are still pushing for this PR to get into Spark master? Are you also thinking of targeting the Spark 3.0 branch?

I like the shuffle block handling bit in this PR and I was wondering when this might go in. Thanks !

@tgravescs
Contributor

Sorry, I haven't had time to look at this more. It will not go into Spark 3.0 as that is very close to shipping.

@SaurabhChawla100
Contributor Author

@tgravescs @SaurabhChawla100 .. I am wondering if you guys are still pushing for this PR to get into Spark master? Are you also thinking of targeting the Spark 3.0 branch?

I like the shuffle block handling bit in this PR and I was wondering when this might go in. Thanks !

@agrawaldevesh - We are planning to merge this into the master branch of Spark.

@tgravescs @holdenk - I have handled all the review comments. Can we start a second round of review on this PR?

@agrawaldevesh
Contributor

@SaurabhChawla100, can you briefly update the PR description to reflect how this work relates to the recently merged #27864? Perhaps you can leverage or enhance the abstractions added in that PR a bit?

I also don't fully understand the relationship b/w this PR and the original decommissioning PR #26440.

I am trying to get a sense of the end state with all these multiple decommissioning PRs trying to stretch the framework in different ways.

@holdenk or @prakharjain09, you recently (greatly) enhanced Spark's decommissioning story and I am curious on your thoughts on this PR and how you see it fitting it in with the work that you have done.

From what I can tell:

Thank you for working on this.

@SaurabhChawla100
Contributor Author

SaurabhChawla100 commented Jun 10, 2020

@agrawaldevesh - Thanks for checking this PR. Sure, I will check PR #27864 and try to use it in the current PR.

Regarding this

I also don't fully understand the relationship b/w this PR and the original decommissioning PR https://github.com/apache/spark/pull/26440. I am trying to get a sense of the end state with all these multiple decommissioning PRs trying to stretch the framework in different ways.

So #26440 is based on executor decommission, where in k8s/standalone mode the pods/workers receive SIGPWR and Spark stops scheduling new tasks on executors on the same worker and stops creating new executors on that worker. It's more about the decommissioning of executors.

Whereas in this current PR we use the node information from YARN (spot nodes, preemptible VMs in GCP) to take action. On receiving the information related to node decommissioning we take the actions described in the PR description (#27636 (comment)). It's more about node decommissioning, where the driver is responsible for executor decommission and shuffle decommission. In this PR we maintain a life cycle of how a decommissioned node is handled.

This is the life cycle of a node which is decommissioned:
DECOMMISSIONING -> EXECUTOR_DECOMMISSIONED -> SHUFFLEDATA_DECOMMISSIONED -> TERMINATED.

Also, once this PR is merged, we can create another PR to factor out the parts common to the current PR and #27864.

@SaurabhChawla100
Contributor Author

Made the code change to make use of #27864 in this PR.

Contributor

@agrawaldevesh agrawaldevesh left a comment

IMHO this PR can be reworked to just live on top of the decommission executor framework with some enhancements. You don't need the parallel machinery for decommissioning 'nodes' in the CoarseGrainedSchedulerBackend. This is redundant and duplicates the logic that already exists pertaining to decommissioning executors. Please allow me to explain more.

The current framework for K8s and Standalone (Master) scheduler works as follows:

  • CoarseGrainedSchedulerBackend received DecommissionExecutor from a variety of sources:
    • The executor can send DecommissionExecutor to CoarseGrainedSchedulerBackend when the executor receives a SIGPWR (in the case of k8s)
    • In the standalone case, when the Worker node is being decommissioned: the Master sends ExecutorUpdated (with state ExecutorState.DECOMMISSIONED) for each executor on the decommissioned worker. The driver (StandaloneAppClient) then relays DecommissionExecutor to the CoarseGrainedSchedulerBackend for each of the executors on those workers.
  • CoarseGrainedSchedulerBackend.DriverEndpoint#decommissionExecutor then handles all of the decommissioning logic

The standalone scheduler (Master)'s concept of Worker is same as YARN concept of Node. The YARN node DECOMMISSIONING case is conceptually similar to the Master worker DECOMMISSIONED case -- both mean that the decommission is about to happen.

You should be able to do this mapping from host/node to executors in the YarnAllocator: containerIdToExecutorIdAndResourceProfileId maps a container id to executor id. allocatedContainerToHostMap maps a container id to a host map. YarnAllocator#runAllocatedContainers knows of the executor id, container id and host id when it is actually spawning the executors. So this mapping of NodeId / Host to executorId can be rebuilt in the YarnAllocator. You might want to build some other reverse maps for efficiency.

If you prefer not to do this mapping in the YarnAllocator, you can do it in the YarnSchedulerBackend using scheduler.getExecutorsAliveOnHost. (A hedged sketch of the reverse mapping follows.)
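
A hedged sketch of that reverse mapping, using the two YarnAllocator maps named above (exact field types are assumptions):

import org.apache.hadoop.yarn.api.records.ContainerId

object YarnHostExecutors {
  // Resolve the executor ids currently running on a given host.
  def executorsOnHost(
      host: String,
      containerIdToExecutorIdAndResourceProfileId: Map[ContainerId, (String, Int)],
      allocatedContainerToHostMap: Map[ContainerId, String]): Seq[String] = {
    allocatedContainerToHostMap
      .collect { case (containerId, containerHost) if containerHost == host => containerId }
      .flatMap(id => containerIdToExecutorIdAndResourceProfileId.get(id).map(_._1))
      .toSeq
  }
}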

You may need to enhance the existing executor decommission framework: For example, you would want to extend the DecommissionExecutor message with some timeout to signify how much more time the executor has. (Although I didn't follow if you strictly need these semantics)
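
A hedged sketch of that suggested message extension (the message name matches the existing one; the extra field and its shape are assumptions):

case class DecommissionExecutor(
    executorId: String,
    terminationTimeMs: Option[Long] = None) // None when no deadline is known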

This approach will allow you to piggy back on the work being done in https://issues.apache.org/jira/browse/SPARK-20624:

  • They are adding support for example for eager task speculation out of the decommissioned executor.
  • They already have support for re-replicating cached blocks (which is redundantly redone in this PR)
  • They are adding support to offload shuffle blocks
  • They will add support to have the executor early exit when all the offloading etc is done.
  • It already has a good testing framework and the like

Having said that, this PR does differ from #28708 mainly around its better handling of fetch failures. I think job failures caused by fetch failures can still happen in PR 28708, since shuffle block migration is somewhat best effort. Your PR goes a good distance to be more robust here. I have commented on this potential shortcoming in #28708 (comment).

Decommissioning is really important to reduce cost and play well in cloud environments. Having one battle tested framework that covers all use cases would allow such a feature to reach maturity quicker.

cc: @holdenk

Thanks.

@@ -436,6 +449,72 @@ private[yarn] class YarnAllocator(
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, getNumExecutorsRunning))
}

// If the flags is enabled than GRACEFUL_DECOMMISSION_ENABLE
Contributor

This comment is superfluous.

than -> then

val getUpdatedNodes = allocateResponse.getUpdatedNodes()
if (getUpdatedNodes != null) {
val updatedNodes = getUpdatedNodes.asScala
for (x <- updatedNodes) {
Contributor

How about a better variable name than 'x'?

// use the config spark.graceful.decommission.node.timeout which is specific to cloud
var nodeTerminationTime = clock.getTimeMillis() + nodeLossInterval * 1000
try {
val decommiossioningTimeout = x.getClass.getMethod(
Contributor

typo

cloud-fan pushed a commit that referenced this pull request Aug 19, 2020
… highest hierarchy

### What changes were proposed in this pull request?

Rename `spark.worker.decommission.enabled` to `spark.decommission.enabled` and move it from `org.apache.spark.internal.config.Worker` to `org.apache.spark.internal.config.package`.

### Why are the changes needed?

Decommission has been supported in Standalone and k8s yet and may be supported in Yarn(#27636) in the future. Therefore, the switch configuration should have the highest hierarchy rather than belongs to Standalone's Worker. In other words, it should be independent of the cluster managers.

### Does this PR introduce _any_ user-facing change?

No, as the decommission feature hasn't been released.

### How was this patch tested?

Pass existed tests.

Closes #29466 from Ngone51/fix-decom-conf.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Sep 22, 2020
@github-actions github-actions bot closed this Sep 23, 2020
holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
… highest hierarchy

Rename `spark.worker.decommission.enabled` to `spark.decommission.enabled` and move it from `org.apache.spark.internal.config.Worker` to `org.apache.spark.internal.config.package`.

Decommission has been supported in Standalone and k8s yet and may be supported in Yarn(apache#27636) in the future. Therefore, the switch configuration should have the highest hierarchy rather than belongs to Standalone's Worker. In other words, it should be independent of the cluster managers.

No, as the decommission feature hasn't been released.

Pass existed tests.

Closes apache#29466 from Ngone51/fix-decom-conf.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>