
[SPARK-30873][CORE][YARN] Handling Node Decommissioning for Yarn cluster manager in Spark #27636

Closed
wants to merge 11 commits
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -61,6 +61,14 @@ sealed trait TaskFailedReason extends TaskEndReason {
* on was killed.
*/
def countTowardsTaskFailures: Boolean = true

/**
* Whether this task failure should be counted towards the maximum number of times the stage is
* allowed to fail before the stage is aborted. Set to false in cases where the failure
* was unrelated to the task itself; for example, if the task failed because of a fetch
* failed exception from a decommissioned node.
*/
var countTowardsDecommissionStageFailures: Boolean = true
}
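For context, here is a minimal sketch (not part of this diff; the helper and its wiring are hypothetical) of how a scheduler component could clear this flag when a fetch failure is served from a host already known to be decommissioned:

import org.apache.spark.{FetchFailed, TaskFailedReason}

// Hypothetical helper: clear the per-failure flag when the serving block manager's
// host is tracked as decommissioned, so the failure is not counted against the
// stage's decommission-related attempt budget.
def markIfFromDecommissionedHost(
    reason: TaskFailedReason,
    decommissionedHosts: Set[String]): Unit = reason match {
  case f: FetchFailed if f.bmAddress != null && decommissionedHosts.contains(f.bmAddress.host) =>
    f.countTowardsDecommissionStageFailures = false
  case _ => // other failure reasons keep the default (true)
}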

63 changes: 63 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1873,4 +1873,67 @@ package object config {
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val GRACEFUL_DECOMMISSION_ENABLE =
Contributor:

Would it make sense to handle this with spark.worker.decommission.enabled or not? I'm not sure; just raising it for discussion.

Contributor:

Is this specific to YARN, or has it been tied in with previous work for all cluster managers?

Contributor:

Also, all the configs need to be documented in configuration.md.

ConfigBuilder("spark.graceful.decommission.enable")
.doc("Whether to enable the node graceful decommissioning handling")
.version("3.1.0")
.booleanConf
.createWithDefault(false)
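As a usage illustration only (DecommissionTracker itself is not shown in this hunk, so the helper below is an assumption modeled on the DAGScheduler call to DecommissionTracker.isDecommissionEnabled later in this diff):

import org.apache.spark.SparkConf
import org.apache.spark.internal.config

// Hypothetical gate: skip all decommission handling unless the new flag is set.
def isDecommissionEnabled(conf: SparkConf): Boolean =
  conf.get(config.GRACEFUL_DECOMMISSION_ENABLE)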

private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD =
Contributor:

From the documentation, I don't know how I would configure this value. Can we provide guidance? Also, why would we or would we not want to skip fetch failures? In my mind, if a node is decommissioning and the fetch fails, that's expected, but retrying the same fetch is very unlikely to succeed. Or am I misunderstanding the purpose of this?

Contributor Author:

I have tried to explain this here
#27636 (comment)

Contributor:

I assume this is a total number of fetch failures, or is it per stage? I'm sure I'll see more in the code later, but if I can't understand it from reading the config text, then the user won't either.
Without having read the rest of the code, this seems like kind of a weird setting to me. If we are handling decommissioned nodes, then why do we need this? Is it in case the entire cluster is being decommissioned?

If we keep this, we may want to combine the names here; there was a thread on dev about creating too many .component.-style names.

ConfigBuilder("spark.graceful.decommission.fetchfailed.ignore.threshold")
.doc("Threshold of number of times fetchfailed ignored due to node" +
"decommission.This is configurable as per the need of the user and" +
"depending upon type of the cloud. If we keep this a large value and " +
"there is continuous decommission of nodes, in those scenarios stage" +
"will never abort and keeps on retrying in an unbounded manner.")
.version("3.1.0")
.intConf
.createWithDefault(8)
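On the per-stage question above: judging from the DAGScheduler change later in this diff, the ignored count is tracked per stage, and the abort condition reduces to roughly the sketch below (a simplification under this PR's defaults, not the authoritative implementation):

// Simplified per-stage abort predicate implied by the DAGScheduler change,
// assuming maxConsecutiveStageAttempts = 4 and this config at its default of 8.
def shouldAbort(totalFailedAttempts: Int, ignoredDecommissionAttempts: Int): Boolean =
  totalFailedAttempts >= 4 + ignoredDecommissionAttempts ||
    ignoredDecommissionAttempts > 8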

private[spark] val GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT =
ConfigBuilder("spark.graceful.decommission.executor.leasetimePct")
.doc("Percentage of time to expiry after which executors are killed " +
Contributor:

I don't know what this means. Percentage of what time to expiry? We get a notification that a node is being decommissioned, and I assume this is how long to let the executor run before killing it, but it doesn't say what this is a percentage of.

"(if enabled) on the node. Value ranges between (0-100)")
.version("3.1.0")
.intConf
.checkValue(v => v >= 0 && v < 100, "The percentage should be positive.")
.createWithDefault(50) // Pulled out of thin air.

private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT =
Contributor:

Why would we exit at a different time? I know we probably want to keep the blocks as long as possible, but if any process is running the node can't gracefully shut down, so it seems strange to have these occur separately.

Contributor Author:

This is for minimising the recompute of shuffle data and maximising the use of already-generated shuffle data. I also tried to explain this in this comment: #27636 (comment)

ConfigBuilder("spark.graceful.decommission.shuffedata.leasetimePct")
.doc("Percentage of time to expiry after which shuffle data " +
Contributor:

Similar here: we should describe what this is a percentage of.
I assume this has to be >= the executor lease time so that executors don't write more shuffle data.
It might be better to just fail if this is incorrectly set, assuming we can check it early enough. I guess I'll see more of the code later.

"cleaned up (if enabled) on the node. Value ranges between (0-100)" +
"This value is always greater than or equal to executor" +
"leaseTime (is set to be equal if incorrectly configured)." +
"Near 0% would mean generated data is marked as lost too early." +
"Too close to 100 would shuffle data may not get cleared proactively" +
"leading to tasks going into fetchFail scenarios")
.version("3.1.0")
.intConf
.checkValue(v => v >= 0 && v < 100, "The percentage should be positive.")
.createWithDefault(90) // Pulled out of thin air.
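For illustration, a rough sketch of how the executor and shuffle-data percentages could translate into separate deadlines. This assumes both percentages apply to the window between the decommission notice and the expected node termination, which matches the rationale in the comment above; all names are hypothetical, and the real computation lives in DecommissionTracker:

// Hypothetical deadline computation for a node with about 120s until termination.
val decommissionNoticeMs = 0L           // when the decommission notice arrived
val nodeTerminationMs = 120L * 1000     // expected termination time (e.g. spot loss)
val windowMs = nodeTerminationMs - decommissionNoticeMs

val executorLeasePct = 50   // spark.graceful.decommission.executor.leasetimePct
val shuffleLeasePct = 90    // spark.graceful.decommission.shuffledata.leasetimePct

// Executors are killed earlier so no new shuffle data is produced, while the
// already-written shuffle data stays serveable until closer to termination.
val killExecutorsAtMs = decommissionNoticeMs + windowMs * executorLeasePct / 100   // 60s in
val cleanShuffleAtMs = decommissionNoticeMs + windowMs * shuffleLeasePct / 100     // 108s in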

private[spark] val GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC =
Contributor:

Is this config used in the PR? What's its intended relationship with ABOUT_TO_BE_LOST_NODE_INTERVAL?

Contributor Author:

I have added the information about this conf in the design doc.

Contributor (@tgravescs, Mar 9, 2020):

I haven't looked at the design doc, but same question - I only see this used in tests? If that is the case, I think we at least need a comment about it. I have the same concern here as above: I don't really know what this config means either. What is the min time to termination?

Contributor Author:

This is used in DecommissionTracker.scala:

private val minDecommissionTime =
    conf.get(config.GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC)

Contributor:

ah ok, the file in the diffs must have been minimized at the time

ConfigBuilder("spark.graceful.decommission.min.termination.time")
.doc("Minimum time to termination below which node decommissioning is performed " +
"immediately. If decommissioning time is less than the " +
"configured time(spark.graceful.decommission.min.termination.time)," +
"than in that scenario the executor decommissioning and shuffle data clean up will " +
"take place immediately.First the executor decommission than the " +
"shuffle data clean up.")
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("60s")

private[spark] val GRACEFUL_DECOMMISSION_NODE_TIMEOUT =
ConfigBuilder("spark.graceful.decommission.node.timeout")
Contributor:

I'm not sure I get the spot-loss interaction here; I don't think the AM/RM will know, and we'd see something more like the K8s case happen.

Contributor Author:

We can get the decommission timeout for Hadoop 3.1 and later versions of Hadoop, so we can use that value to decide when the node is decommissioned.
For lower versions of Hadoop (e.g., Hadoop 2.8) there is no decommissioning timeout for decommissioning nodes; in those scenarios we already know from experience that AWS spot-loss nodes stay for 2 minutes and GCP preemptible VMs stay for 30 seconds after receiving the node decommissioning notification from the Hadoop end.

This config is added here to make decommissioning of nodes work with multiple versions of Hadoop.

Please find below the logic used in YarnAllocator.scala to decide the timeout of the node:

if (x.getNodeState.toString.equals(NodeState.DECOMMISSIONING.toString)) {
  // Hadoop 2.7 does not support getDecommissioningTimeout, whereas Hadoop 3.1
  // and later versions do. So the method call is made using reflection to
  // update nodeTerminationTime, and for lower versions of Hadoop (2.7) we use
  // the config spark.graceful.decommission.node.timeout, which is specific to the cloud.
  var nodeTerminationTime = clock.getTimeMillis() + nodeLossInterval * 1000
  try {
    val decommissioningTimeout = x.getClass.getMethod(
      "getDecommissioningTimeout").invoke(x).asInstanceOf[Integer]
    if (decommissioningTimeout != null) {
      nodeTerminationTime = clock.getTimeMillis() + decommissioningTimeout * 1000
    }
  } catch {
    case e: NoSuchMethodException => logDebug(e.toString)
  }

Contributor:

So what happens with this in Hadoop 3.1 and greater? Is it ignored if a timeout is specified by YARN?

Contributor Author:

For Hadoop 3.1 and later versions of Hadoop there is an interface to get the value of the decommissioning timeout (method name: getDecommissioningTimeout). So we have used that here in the code:

val decommissioningTimeout = x.getClass.getMethod(
  "getDecommissioningTimeout").invoke(x).asInstanceOf[Integer]
if (decommissioningTimeout != null) {
  nodeTerminationTime = clock.getTimeMillis() + decommissioningTimeout * 1000
}

Since we get the value of decommissioningTimeout from the RM in that scenario, we use that value; otherwise we use the value specified in the config.

Also, if someone has backported the Hadoop 3.1 change to a lower version (Hadoop 2.8, etc.), they can likewise use decommissioningTimeout instead of the config GRACEFUL_DECOMMISSION_NODE_TIMEOUT.

Contributor:

OK, so we need to update the description to say when it applies.

Contributor Author:

Updated the description

.doc("Interval in seconds after which the node is decommissioned in case aws spotloss" +
"the time is approximately 110s and in case of GCP preemptible VMs this is around 30s" +
"this config can be changed according to node type in the public cloud. This will" +
"be applied if the decommission timeout is not sent by the Resource Manager")
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("110s")
}
63 changes: 57 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -222,6 +222,13 @@ private[spark] class DAGScheduler(
private val maxFailureNumTasksCheck = sc.getConf
.get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)

/**
* Threshold for the number of times a fetch failure is ignored due to
* the decommissioning of nodes.
*/
private val maxIgnoredFailedStageAttempts = sc.getConf
.get(config.GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD)

private val messageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

@@ -289,6 +296,13 @@ private[spark] class DAGScheduler(
eventProcessLoop.post(WorkerRemoved(workerId, host, message))
}

/**
* Called by DecommissionTracker when a node is decommissioned.
*/
def nodeDecommissioned(host: String): Unit = {
eventProcessLoop.post(NodeDecommissioned(host))
}

/**
* Called by TaskScheduler implementation when a host is added.
*/
@@ -1617,10 +1631,27 @@
s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
} else {
// Gracefully handle the stage abort due to fetch failures from
// decommissioned nodes
if (!event.reason.asInstanceOf[FetchFailed].countTowardsDecommissionStageFailures) {
// For decommission tracking, ignore fetch-failed stage attempts only
Contributor:

I think the comment should specifically say this is for decommission tracking.

Contributor Author:

done

// once per attempt, due to the node decommissioning event
if (!failedStage.failedAttemptIds.contains(task.stageAttemptId)) {
failedStage.ignoredDecommissionFailedStage += 1
DecommissionTracker.incrFetchFailIgnoreCnt()

logInfo(s"""Ignoring stage failure due to fetch failed from the decommissioned""" +
s""" node : {"stage":"$failedStage","attempt":"${task.stageAttemptId}",""" +
s""""totalIgnoredAttempts":"${failedStage.ignoredDecommissionFailedStage}",""" +
s""""node":"$bmAddress"}""")
}
}
failedStage.failedAttemptIds.add(task.stageAttemptId)
val shouldAbortStage =
failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
disallowStageRetryForTest
val shouldAbortStage = failedStage.failedAttemptIds.size >=
(maxConsecutiveStageAttempts + failedStage.ignoredDecommissionFailedStage) ||
disallowStageRetryForTest ||
failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts


// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is
@@ -1661,6 +1692,10 @@
}

if (shouldAbortStage) {
if (failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts
&& DecommissionTracker.isDecommissionEnabled(sc.getConf)) {
DecommissionTracker.setFetchFailIgnoreCntThresholdFlag(true)
}
val abortMessage = if (disallowStageRetryForTest) {
"Fetch failure will not retry stage due to testing config"
} else {
@@ -1823,9 +1858,10 @@
failedStage.failedAttemptIds.add(task.stageAttemptId)
// TODO Refactor the failure handling logic to combine similar code with that of
// FetchFailed.
val shouldAbortStage =
failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
disallowStageRetryForTest
val shouldAbortStage = failedStage.failedAttemptIds.size >=
(maxConsecutiveStageAttempts + failedStage.ignoredDecommissionFailedStage) ||
disallowStageRetryForTest ||
failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts

if (shouldAbortStage) {
val abortMessage = if (disallowStageRetryForTest) {
@@ -1980,6 +2016,18 @@
clearCacheLocs()
}

/**
* Remove shuffle data mappings when a node is decommissioned.
*
* @param host host of the node that is decommissioned
*/
private[scheduler] def handleNodeDecommissioned(host: String): Unit = {
Contributor:

So if we remove the shuffle files on decommissioning, then we're essentially forcing a recompute even if the files are still present and the node hasn't gone away, which doesn't seem ideal.

Contributor Author:

We have tried to answer this query in this comment #27636 (comment)

logInfo(s"Marking shuffle files lost on the decommissioning host $host")
mapOutputTracker.removeOutputsOnHost(host)
clearCacheLocs()
}


private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = {
// remove from failedEpoch(execId) ?
if (failedEpoch.contains(execId)) {
Expand Down Expand Up @@ -2281,6 +2329,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case WorkerRemoved(workerId, host, message) =>
dagScheduler.handleWorkerRemoved(workerId, host, message)

case NodeDecommissioned(host) =>
dagScheduler.handleNodeDecommissioned(host)

case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)

@@ -88,6 +88,9 @@ private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossR
private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String)
extends DAGSchedulerEvent

private[scheduler] case class NodeDecommissioned(host: String)
extends DAGSchedulerEvent

private[scheduler]
case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])
extends DAGSchedulerEvent