From 0ffdcbbb75e85646d165b8a9355fbe784eae6b20 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Tue, 18 Feb 2020 14:53:26 +0530 Subject: [PATCH 01/11] Adding the code change for the Decommission tracker of the node (cherry picked from commit a6ff57a9772afb8d6b078184a785117e36ba75d9) --- .../org/apache/spark/TaskEndReason.scala | 8 + .../spark/internal/config/package.scala | 52 +++ .../apache/spark/scheduler/DAGScheduler.scala | 61 ++- .../spark/scheduler/DAGSchedulerEvent.scala | 3 + .../spark/scheduler/DecommissionTracker.scala | 412 ++++++++++++++++++ .../org/apache/spark/scheduler/Stage.scala | 7 + .../apache/spark/scheduler/StageInfo.scala | 6 + .../spark/scheduler/TaskSchedulerImpl.scala | 28 +- .../spark/scheduler/TaskSetManager.scala | 14 + .../spark/scheduler/cluster/ClusterInfo.scala | 94 ++++ .../cluster/CoarseGrainedClusterMessage.scala | 18 +- .../CoarseGrainedSchedulerBackend.scala | 62 ++- .../apache/spark/HeartbeatReceiverSuite.scala | 1 + .../scheduler/DecommissionTrackerSuite.scala | 266 +++++++++++ .../scheduler/TaskSchedulerImplSuite.scala | 4 +- .../spark/scheduler/TaskSetManagerSuite.scala | 3 +- .../spark/deploy/yarn/YarnAllocator.scala | 78 +++- .../cluster/YarnSchedulerBackend.scala | 73 +++- 18 files changed, 1172 insertions(+), 18 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala create mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala create mode 100644 core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index b13028f868072..c69f1d7ae94c3 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -61,6 +61,14 @@ sealed trait TaskFailedReason extends TaskEndReason { * on was killed. */ def countTowardsTaskFailures: Boolean = true + + /** + * Whether this task failure should be counted towards the maximum number of times the stage is + * allowed to fail before the stage is aborted. Set to false in cases where the task's failure + * was unrelated to the task; for example, if the task failed because fetch failed exception + * from the decommissioned node. + */ + var countTowardsStageFailures: Boolean = true } /** diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index ee437c696b47e..f0c61ca6ff49e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1873,4 +1873,56 @@ package object config { .version("3.1.0") .booleanConf .createWithDefault(false) + + private[spark] val GRACEFUL_DECOMMISSION_ENABLE = + ConfigBuilder("spark.graceful.decommission.enable") + .doc("Whether to enable the node graceful decommissioning handling") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD = + ConfigBuilder("spark.graceful.decommission.fetchfailed.ignore.threshold") + .doc("Threshold of number of times fetchfailed ignored due to node" + + " decommission.This is configurable as per the need of the user and" + + " depending upon type of the cloud. 
If this is kept at a large value and" +
+        " nodes are being decommissioned continuously, the stage will never" +
+        " abort and will keep retrying in an unbounded manner.")
+      .version("3.1.0")
+      .intConf
+      .createWithDefault(8)
+
+  private[spark] val GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT =
+    ConfigBuilder("spark.graceful.decommission.executor.leasetimePct")
+      .doc("Percentage of the time to node expiry after which the executors on the" +
+        " node are killed (if enabled). Valid values are in the range [0, 100).")
+      .version("3.1.0")
+      .intConf
+      .checkValue(v => v >= 0 && v < 100, "The percentage should be in the range [0, 100).")
+      .createWithDefault(50) // Pulled out of thin air.
+
+  private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT =
+    ConfigBuilder("spark.graceful.decommission.shuffedata.leasetimePct")
+      .doc("Percentage of the time to node expiry after which the shuffle data on the" +
+        " node is cleaned up (if enabled). Valid values are in the range [0, 100).")
+      .version("3.1.0")
+      .intConf
+      .checkValue(v => v >= 0 && v < 100, "The percentage should be in the range [0, 100).")
+      .createWithDefault(90) // Pulled out of thin air.
+
+  private[spark] val GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC =
+    ConfigBuilder("spark.graceful.decommission.min.termination.time")
+      .doc("Minimum time to termination below which node decommissioning is performed immediately.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("60s")
+
+  private[spark] val GRACEFUL_DECOMMISSION_NODE_TIMEOUT =
+    ConfigBuilder("spark.graceful.decommission.node.timeout")
+      .doc("Interval after which the node is decommissioned. For AWS spot loss this is" +
+        " approximately 110s and for GCP preemptible VMs around 30s; this config can be" +
+        " changed according to the node type in the public cloud.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("110s")
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 37f9e0bb483c2..9c49b8bc252b0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -222,6 +222,13 @@ private[spark] class DAGScheduler(
   private val maxFailureNumTasksCheck = sc.getConf
     .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)
 
+  /**
+   * Maximum number of failed stage attempts that can be ignored when the
+   * failure is a fetch failure caused by node decommissioning.
+   */
+  private val maxIgnoredFailedStageAttempts = sc.getConf
+    .get(config.GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD)
+
   private val messageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
 
@@ -289,6 +296,13 @@ private[spark] class DAGScheduler(
     eventProcessLoop.post(WorkerRemoved(workerId, host, message))
   }
 
+  /**
+   * Called by the DecommissionTracker when a node is decommissioned.
+   */
+  def nodeDecommissioned(host: String): Unit = {
+    eventProcessLoop.post(NodeDecommissioned(host))
+  }
+
   /**
    * Called by TaskScheduler implementation when a host is added.
*/ @@ -1617,10 +1631,25 @@ private[spark] class DAGScheduler( s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + s"(attempt ${failedStage.latestInfo.attemptNumber}) running") } else { + if (!event.reason.asInstanceOf[FetchFailed].countTowardsStageFailures) { + // Ignore stage attempts due to fetch failed only + // once per attempt + if (!failedStage.failedAttemptIds.contains(task.stageAttemptId)) { + failedStage.ignoredFailedStageAttempts += 1 + DecommissionTracker.incrFetchFailIgnoreCnt() + failedStage.latestInfo.stageFailureIgnored(true) + } + logInfo(s"""Ignoring stage failure due to fetch failed from the decommissioned""" + + s""" node : {"stage":"$failedStage","attempt":"${task.stageAttemptId}",""" + + s""""totalIgnoredAttempts":"${failedStage.ignoredFailedStageAttempts}",""" + + s""""node":"$bmAddress"}""") + } failedStage.failedAttemptIds.add(task.stageAttemptId) - val shouldAbortStage = - failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest + val shouldAbortStage = failedStage.failedAttemptIds.size >= + (maxConsecutiveStageAttempts + failedStage.ignoredFailedStageAttempts) || + disallowStageRetryForTest || + failedStage.ignoredFailedStageAttempts > maxIgnoredFailedStageAttempts + // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is @@ -1661,6 +1690,10 @@ private[spark] class DAGScheduler( } if (shouldAbortStage) { + if (failedStage.ignoredFailedStageAttempts > maxIgnoredFailedStageAttempts + && DecommissionTracker.isDecommissionEnabled(sc.getConf)) { + DecommissionTracker.setFetchFailIgnoreCntThresholdFlag(true) + } val abortMessage = if (disallowStageRetryForTest) { "Fetch failure will not retry stage due to testing config" } else { @@ -1823,9 +1856,10 @@ private[spark] class DAGScheduler( failedStage.failedAttemptIds.add(task.stageAttemptId) // TODO Refactor the failure handling logic to combine similar code with that of // FetchFailed. - val shouldAbortStage = - failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest + val shouldAbortStage = failedStage.failedAttemptIds.size >= + (maxConsecutiveStageAttempts + failedStage.ignoredFailedStageAttempts) || + disallowStageRetryForTest || + failedStage.ignoredFailedStageAttempts > maxIgnoredFailedStageAttempts if (shouldAbortStage) { val abortMessage = if (disallowStageRetryForTest) { @@ -1980,6 +2014,18 @@ private[spark] class DAGScheduler( clearCacheLocs() } + /** + * Remove shuffle data mapping when node is decomissioned. + * + * @param host host of the node that is decommissioned + */ + private[scheduler] def handleNodeDecommissioned(host: String) { + logInfo(s"Marking shuffle files lost on the decommissioning host $host") + mapOutputTracker.removeOutputsOnHost(host) + clearCacheLocs() + } + + private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = { // remove from failedEpoch(execId) ? 
if (failedEpoch.contains(execId)) { @@ -2281,6 +2327,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case WorkerRemoved(workerId, host, message) => dagScheduler.handleWorkerRemoved(workerId, host, message) + case NodeDecommissioned(host) => + dagScheduler.handleNodeDecommissioned(host) + case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 78d458338e8fb..faa3ef3142363 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -88,6 +88,9 @@ private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossR private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String) extends DAGSchedulerEvent +private[scheduler] case class NodeDecommissioned(host: String) + extends DAGSchedulerEvent + private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]) extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala new file mode 100644 index 0000000000000..b76df9ad346c2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.text.SimpleDateFormat +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + +import scala.collection.mutable.HashMap + +import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} + +/** + * DecommissionTracker tracks the list of decommissioned nodes. + * + */ +private[scheduler] class DecommissionTracker ( + conf: SparkConf, + executorAllocClient: Option[ExecutorAllocationClient], + dagScheduler: Option[DAGScheduler], + clock: Clock = new SystemClock()) extends Logging { + + def this(sc: SparkContext, + client: Option[ExecutorAllocationClient], + dagScheduler: Option[DAGScheduler]) = { + this(sc.conf, client, dagScheduler) + } + + // Decommission thread of node decommissioning!! + private val decommissionThread = + ThreadUtils.newDaemonThreadPoolScheduledExecutor("node-decommissioning-thread", 20) + + // Contains workers hostname which are decommissioning. 
Added when a spot-loss or
+  // graceful decommissioning event arrives from the AM, and removed when the
+  // node (identified by nodeId) is running again.
+  private val decommissionHostnameMap = new HashMap[String, NodeDecommissionInfo]
+
+  private val minDecommissionTime =
+    conf.get(config.GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC)
+
+  private val executorDecommissionLeasePct =
+    conf.get(config.GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT)
+
+  private val shuffleDataDecommissionLeasePct =
+    conf.get(config.GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT)
+
+  /*
+   * Is the node decommissioned, i.e. from the driver's point of
+   * view the node is considered decommissioned.
+   */
+  def isNodeDecommissioned(hostname: String): Boolean = synchronized {
+    decommissionHostnameMap.get(hostname) match {
+      case None => false
+      case Some(info) =>
+        return info.state == NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED ||
+          info.state == NodeDecommissionState.TERMINATED
+    }
+  }
+
+  /*
+   * Is the node decommissioning, i.e. from the driver's point of
+   * view the node is a candidate for decommissioning, but not
+   * necessarily decommissioned or terminated yet.
+   */
+  def isNodeDecommissioning(hostname: String): Boolean = synchronized {
+    decommissionHostnameMap.contains(hostname)
+  }
+
+  /**
+   * Visible only for unit tests.
+   */
+  def getDecommissionedNodeState(
+      hostname: String): Option[NodeDecommissionState.Value] = synchronized {
+    decommissionHostnameMap.get(hostname) match {
+      case Some(info) => Some(info.state)
+      case _ => None
+    }
+  }
+
+  def addNodeToDecommission(hostname: String, terminationTimeMs: Long,
+      reason: NodeDecommissionReason): Unit = synchronized {
+
+    val df: SimpleDateFormat = new SimpleDateFormat("YY/MM/dd HH:mm:ss")
+    val tDateTime = df.format(terminationTimeMs)
+    val curTimeMs = clock.getTimeMillis()
+
+    if (terminationTimeMs <= curTimeMs) {
+      // Ignore the decommission request if the termination
+      // time is less than or the same as the current time.
+      logWarning(s"Ignoring decommissioning request for host $hostname as" +
+        s" terminationTimeMs ${terminationTimeMs} is less" +
+        s" than current time ${curTimeMs}")
+      return
+    }
+
+    // A node picked for node rotation is marked for graceful decommission with a
+    // termination time of -1, but it may subsequently suffer a genuine node loss.
+    // In that case the existing entry needs to be overridden: update
+    // decommissionHostnameMap when the new termination time is less than the
+    // existing termination time in decommissionHostnameMap.
+    if (decommissionHostnameMap.contains(hostname)) {
+      val nodeDecommissionInfo = decommissionHostnameMap(hostname)
+      // There will be no duplicate entry of terminationTimeMs in decommissionHostnameMap,
+      // since the terminationTime is updated only when it is less than the existing
+      // termination time in decommissionHostnameMap.
+      if (decommissionHostnameMap(hostname).terminationTime <= terminationTimeMs) {
+        logDebug(
+          s"""Ignoring decommissioning """ +
+            s""" request : {"node":"$hostname","reason":"${reason.message}",terminationTime"""" +
+            s""":"$tDateTime"} current : {"node":"$hostname",$nodeDecommissionInfo}""")
+        return
+      } else {
+        logInfo(s"Updating the termination time to ${terminationTimeMs} in" +
+          s" decommission tracker for the hostname ${hostname}")
+      }
+    }
+
+    val delay = terminationTimeMs - curTimeMs
+
+    var executorDecommissionTimeMs = terminationTimeMs
+    var shuffleDataDecommissionTimeMs = terminationTimeMs
+
+    // If the delay is less than minDecommissionTime then decommission immediately.
+    if (terminationTimeMs - curTimeMs < minDecommissionTime * 1000) {
+      executorDecommissionTimeMs = curTimeMs
+      // Add a delay of 1 second for the shuffle data in this case, since we want
+      // the executors to be decommissioned first and the shuffle data only after that.
+      shuffleDataDecommissionTimeMs = curTimeMs + 1000
+    } else {
+      reason match {
+        case SpotRotationLoss | NodeLoss =>
+          // In the spot block rotation loss and spot loss cases, adjust the termination
+          // times so that there is enough buffer before the real termination for the job
+          // to finish consuming the shuffle data.
+          executorDecommissionTimeMs = (delay * executorDecommissionLeasePct) / 100 + curTimeMs
+          shuffleDataDecommissionTimeMs =
+            (delay * shuffleDataDecommissionLeasePct) / 100 + curTimeMs
+        case _ =>
+        // No action
+      }
+
+      if (executorDecommissionTimeMs > shuffleDataDecommissionTimeMs) {
+        executorDecommissionTimeMs = shuffleDataDecommissionTimeMs
+        logInfo(s"""Executor decommission time $executorDecommissionTimeMs needs to be less""" +
+          s""" than shuffle data decommission time $shuffleDataDecommissionTimeMs. Setting it""" +
+          s""" to the shuffle data decommission time.""")
+      }
+    }
+
+    // Count of executors/workers which went to the decommissioning
+    // state over the lifetime of the application.
+    DecommissionTracker.incrNodeDecommissionCnt()
+    val nodeDecommissionInfo = new NodeDecommissionInfo(
+      executorDecommissionTime = executorDecommissionTimeMs,
+      shuffleDataDecommissionTime = shuffleDataDecommissionTimeMs,
+      terminationTime = terminationTimeMs,
+      reason = reason,
+      state = NodeDecommissionState.DECOMMISSIONING)
+
+    logInfo(s"""Adding decommissioning""" +
+      s""" request : {"node":"$hostname",$nodeDecommissionInfo} """)
+
+    // Add the node to the list of decommissioning nodes.
+ decommissionHostnameMap.put(hostname, nodeDecommissionInfo) + + // Schedule executor decommission + decommissionThread.schedule(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + executorDecommission(hostname, nodeDecommissionInfo) + } + }, executorDecommissionTimeMs - curTimeMs, TimeUnit.MILLISECONDS) + + // Schedule shuffle decommission + decommissionThread.schedule(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + removeShuffleData(hostname, nodeDecommissionInfo) + } + }, shuffleDataDecommissionTimeMs - curTimeMs, TimeUnit.MILLISECONDS) + } + + def removeNodeToDecommission(hostname: String): Unit = synchronized { + if (!decommissionHostnameMap.contains(hostname)) { + return + } + + val nodeDecommissionInfo = decommissionHostnameMap(hostname) + logInfo(s"""Removing decommissioning""" + + s""" request : {"node":"$hostname",$nodeDecommissionInfo}""") + decommissionHostnameMap -= hostname + } + + def updateNodeToDecommissionSetTerminate(hostname: String): Unit = synchronized { + terminate(hostname) + } + + private def executorDecommission(hostname: String, + nodeDecommissionInfo: NodeDecommissionInfo): Unit = { + // Not found, only valid scenario is the nodes + // has moved back to running state + // Scenario where nodeLoss terminated the node + // for the Graceful Decommission node. + // If the node is already terminated and hostname is re-used in that scenario + // no need to kill the executor on that host + if (! decommissionHostnameMap.contains(hostname)) { + logInfo(s"""Node $hostname not found in decommisssionTrackerList while""" + + """performing executor decommission""") + return + } + // if the terminationTime in the thread is not equal to + // terminationTime in decommissionHostnameMap for that + // host than Ignore the ExecutorDecommission + if (decommissionHostnameMap(hostname).terminationTime + != nodeDecommissionInfo.terminationTime) { + logInfo(s"Ignoring ExecutorDecommission for hostname ${hostname}," + + s" since node is already terminated") + return + } + + // Kill executor if there still are some running. This call is + // async does not wait for response. Otherwise it may cause + // deadlock between schedulerBacked (ExecutorAllocationManager) + // and this. + executorAllocClient.map(_.killExecutorsOnHost(hostname)) + + decommissionHostnameMap(hostname).state = NodeDecommissionState.EXECUTOR_DECOMMISSIONED + + logInfo(s"Node $hostname decommissioned") + + return + } + + private def removeShuffleData(hostname: String, + nodeDecommissionInfo: NodeDecommissionInfo): Unit = { + // Not found, only valid scenario is the nodes + // has moved back to running state + // This for scenario where about_to_be_lost terminated the node + // for the Graceful Decommission node. + // If the node is already terminated and hostname is reused in that scenario + // no need to remove the shuffle entry from map-output tracker + if (! decommissionHostnameMap.contains(hostname)) { + logInfo(s"""Node $hostname not found in decommisssionTrackerList while """ + + """performing shuffle data decommission""") + return + } + // if the terminationTime in the thread is not equal to + // terminationTime in decommissionHostnameMap for that + // host than Ignore the removeShuffleData + if (decommissionHostnameMap(hostname).terminationTime + != nodeDecommissionInfo.terminationTime) { + logInfo(s"Ignoring removeShuffleData for hostname ${hostname}," + + s" since node is already terminated") + return + } + + // Unregister shuffle data. 
+ dagScheduler.map(_.nodeDecommissioned(hostname)) + + decommissionHostnameMap(hostname).state = NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED + + logInfo(s"Node $hostname Shuffle data decommissioned") + + return + } + + private def terminate(hostname: String): Unit = { + // Not found, only valid scenario is the nodes + // has moved back to running state + if (!decommissionHostnameMap.contains(hostname)) { + logWarning(s"Node $hostname not found in decommisssionTrackerList") + return + } + + // Remove all the shuffle data of all the executors for the terminated node + dagScheduler.map(_.nodeDecommissioned(hostname)) + + decommissionHostnameMap(hostname).state = NodeDecommissionState.TERMINATED + + logInfo(s"Node $hostname terminated") + } + + def stop (): Unit = { + val decommissionNodeCnt = DecommissionTracker.getNodeDecommissionCnt() + val fetchFailIgnoreCnt = DecommissionTracker.getFetchFailIgnoreCnt() + val fetchFailIgnoreThresholdExceed = DecommissionTracker.getFetchFailIgnoreCntThresholdFlag() + val fetchFailedIgnoreThreshold = + conf.get(config.GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD) + decommissionThread.shutdown() + var message = s"Decommission metrics: ${decommissionNodeCnt} nodes decommissioned" + + s" (either due to node loss or node block rotation) while running ${conf.getAppId}." + + s" Ignored ${fetchFailIgnoreCnt} fetch failed exception caused by decommissioned node." + + if (fetchFailIgnoreThresholdExceed) { + message += s"Fetch fail ignore exceeded threshold ${fetchFailedIgnoreThreshold}" + + s" causing stage abort" + } else { + message += s"Fetch fail ignored under allowed threshold ${fetchFailedIgnoreThreshold}" + } + // logging the metrics related to graceful decommission from decommission tracker + logDebug(message) + } + +} + +private[spark] object DecommissionTracker extends Logging { + val infiniteTime = Long.MaxValue + + // Stats + val decommissionNodeCnt = new AtomicInteger(0) + val fetchFailIgnoreCnt = new AtomicInteger(0) + val fetchFailIgnoreCntThresholdExceeded = new AtomicBoolean(false) + val abortStage = new AtomicBoolean(false) + + def incrNodeDecommissionCnt(): Unit = { + decommissionNodeCnt.getAndIncrement() + } + + def incrFetchFailIgnoreCnt(): Unit = { + fetchFailIgnoreCnt.getAndIncrement() + } + + def setFetchFailIgnoreCntThresholdFlag(thresholdExceedFlag: Boolean): Unit = { + fetchFailIgnoreCntThresholdExceeded.set(thresholdExceedFlag) + } + + def setAbortStageFlag(AbortStageFlag: Boolean): Unit = { + abortStage.set(AbortStageFlag) + } + + def getNodeDecommissionCnt(): Int = { + decommissionNodeCnt.get() + } + + def getFetchFailIgnoreCnt(): Int = { + fetchFailIgnoreCnt.get() + } + + def getFetchFailIgnoreCntThresholdFlag(): Boolean = { + fetchFailIgnoreCntThresholdExceeded.get() + } + + def getAbortStageFlag(): Boolean = { + abortStage.get() + } + + def isDecommissionEnabled(conf: SparkConf): Boolean = { + conf.get(config.GRACEFUL_DECOMMISSION_ENABLE) + } +} + +private class NodeDecommissionInfo( + var terminationTime: Long, + var executorDecommissionTime: Long, + var shuffleDataDecommissionTime: Long, + var state: NodeDecommissionState.Value, + var reason: NodeDecommissionReason) { + override def toString(): String = { + val df: SimpleDateFormat = new SimpleDateFormat("YY/MM/dd HH:mm:ss") + val tDateTime = df.format(terminationTime) + val edDateTime = df.format(executorDecommissionTime) + val sdDateTime = df.format(shuffleDataDecommissionTime) + 
s""""terminationTime":"$tDateTime","reason":"${reason.message}","executorDecommissionTime"""" + + s""":"$edDateTime","shuffleDataDecommissionTime":"$sdDateTime","state":"$state"""" + } +} + +/* + * NB: exposed for testing + */ +private[scheduler] object NodeDecommissionState extends Enumeration { + val DECOMMISSIONING, EXECUTOR_DECOMMISSIONED, SHUFFLEDATA_DECOMMISSIONED, TERMINATED = Value + type NodeDecommissionState = Value +} + +/** + * Represents an explanation for a Node being decommissioned. + * NB: exposed for testing + */ +@DeveloperApi +private[spark] sealed trait NodeDecommissionReason extends Serializable { + def message: String +} + +@DeveloperApi +private[spark] case object NodeLoss extends NodeDecommissionReason { + override def message: String = "nodeLoss" +} + +@DeveloperApi +private[spark] case object SpotRotationLoss extends NodeDecommissionReason { + override def message: String = "nodeRotationLoss" +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index ae7924d66a301..9650159329612 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -91,8 +91,15 @@ private[scheduler] abstract class Stage( */ val failedAttemptIds = new HashSet[Int] + /** + * Number of times the stage failure needs to be ignored. e.g failed due to fetch failed + * exception caused by node decommissioning. + */ + var ignoredFailedStageAttempts = 0 + private[scheduler] def clearFailures() : Unit = { failedAttemptIds.clear() + ignoredFailedStageAttempts = 0 } /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 556478d83cf39..30555bc5dd3d2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -46,6 +46,8 @@ class StageInfo( var completionTime: Option[Long] = None /** If the stage failed, the reason why. */ var failureReason: Option[String] = None + /** if stage failure ignored */ + var ignoredFailure: Option[Boolean] = None /** * Terminal values of accumulables updated during this stage, including all the user-defined @@ -58,6 +60,10 @@ class StageInfo( completionTime = Some(System.currentTimeMillis) } + def stageFailureIgnored(ignored: Boolean) { + ignoredFailure = Some(ignored) + } + // This would just be the second constructor arg, except we need to maintain this method // with parentheses for compatibility def attemptNumber(): Int = attemptId diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2c37fec271766..01cd6ecfb37c3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -92,6 +92,10 @@ private[spark] class TaskSchedulerImpl( // because ExecutorAllocationClient is created after this TaskSchedulerImpl. 
private[scheduler] lazy val blacklistTrackerOpt = maybeCreateBlacklistTracker(sc) + // initializing decommissionTracker + // NB: Public for testing + lazy val decommissionTrackerOpt = maybeCreateDecommissionTracker(sc) + val conf = sc.conf // How often to check for speculative tasks @@ -274,7 +278,7 @@ private[spark] class TaskSchedulerImpl( private[scheduler] def createTaskSetManager( taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt) + new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, decommissionTrackerOpt) } override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized { @@ -896,6 +900,7 @@ private[spark] class TaskSchedulerImpl( } starvationTimer.cancel() abortTimer.cancel() + decommissionTrackerOpt.map(_.stop) } override def defaultParallelism(): Int = backend.defaultParallelism() @@ -1156,4 +1161,25 @@ private[spark] object TaskSchedulerImpl { } } + private def maybeCreateDecommissionTracker(sc: SparkContext): Option[DecommissionTracker] = { + if (DecommissionTracker.isDecommissionEnabled(sc.conf)) { + val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match { + case b: ExecutorAllocationClient => Some(b) + case _ => None + } + + val dagScheduler: Option[DAGScheduler] = sc.dagScheduler match { + case b: DAGScheduler => Some(b) + case _ => None + } + if (executorAllocClient.isDefined && dagScheduler.isDefined) { + Some(new DecommissionTracker(sc, executorAllocClient, dagScheduler)) + } else { + None + } + } else { + None + } + } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a302f680a272e..305ee0ea24d40 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -56,6 +56,7 @@ private[spark] class TaskSetManager( val taskSet: TaskSet, val maxTaskFailures: Int, blacklistTracker: Option[BlacklistTracker] = None, + decommissionTracker: Option[DecommissionTracker] = None, clock: Clock = new SystemClock()) extends Schedulable with Logging { private val conf = sched.sc.conf @@ -813,6 +814,19 @@ private[spark] class TaskSetManager( if (fetchFailed.bmAddress != null) { blacklistTracker.foreach(_.updateBlacklistForFetchFailure( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) + + // Do account fetch failure exception raised by decommissioned + // node against stage failure. + decommissionTracker match { + case Some(decommissionTracker) => + if (decommissionTracker.isNodeDecommissioned(fetchFailed.bmAddress.host)) { + logInfo(s"Do not count fetch failure from decommissioned" + + s" node ${fetchFailed.bmAddress.host}") + fetchFailed.countTowardsStageFailures = false + } + case _ => + // No action + } } None diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala new file mode 100644 index 0000000000000..c1b4ea38f5776 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import java.text.SimpleDateFormat + +import scala.collection.mutable.{HashMap} + +import org.apache.hadoop.yarn.api.records.{NodeState => YarnNodeState} + +/** + * State of the node. + * Add the node state depending upon the cluster manager, For Yarn + * getYarnNodeState is added to create the node state for Decommission Tracker + */ +private[spark] object NodeState extends Enumeration { + val RUNNING, DECOMMISSIONED, GRACEFUL_DECOMMISSIONING, DECOMMISSIONING, LOST, OTHER = Value + type NodeState = Value + + // Helper method to get NodeState of the Yarn. + def getYarnNodeState(state: YarnNodeState): NodeState.Value = { + // In hadoop-2.7 there is no support for node state DECOMMISSIONING + // In Hadoop-2.8, hadoop3.1 and later version of spark there is a support + // to node state DECOMMISSIONING. + // Inorder to build the spark using hadoop2 and hadoop3, comparing the value of + // DECOMMISSIONING here and for other state we are matching + // the state and assigning the node state at spark end + if (state.toString.equals(NodeState.DECOMMISSIONING.toString)) { + NodeState.GRACEFUL_DECOMMISSIONING + } else { + state match { + case YarnNodeState.RUNNING => NodeState.RUNNING + case YarnNodeState.DECOMMISSIONED => NodeState.DECOMMISSIONED + case YarnNodeState.LOST => NodeState.LOST + case YarnNodeState.UNHEALTHY => NodeState.LOST + case _ => NodeState.OTHER + } + } + } +} + +/** + * Node information used by CoarseGrainedSchedulerBackend. + * + * @param nodeState node state + * @param terminationTime time at which node will terminate + */ +private[spark] case class NodeInfo(nodeState: NodeState.Value, terminationTime: Option[Long]) + extends Serializable { + override def toString(): String = { + val df: SimpleDateFormat = new SimpleDateFormat("YY/MM/dd HH:mm:ss") + val tDateTime = df.format(terminationTime.getOrElse(0L)) + s""""terminationTime":"$tDateTime","state":"$nodeState"""" + } +} + +/** + * Cluster status information used by CoarseGrainedSchedulerBackend. + * + * @param nodeInfos Information about node about to be decommissioned + * resource in the cluster. 
+ */ +private[spark] case class ClusterInfo( + nodeInfos: HashMap[String, NodeInfo]) + extends Serializable { + override def toString(): String = { + + var nodeInfoStr = "" + nodeInfos.foreach { + case (k, v) => + nodeInfoStr = nodeInfoStr + s"""{"node":"$k",$v}""" + } + if (nodeInfoStr == "") { + s"""{"nodeInfos":{}}""" + } else { + s"""{"nodeInfos":$nodeInfoStr}""" + } + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 465c0d20de481..8eb5ac4d9fab9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState import org.apache.spark.resource.{ResourceInformation, ResourceProfile} import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.ExecutorLossReason +import org.apache.spark.scheduler.{ExecutorLossReason, NodeDecommissionReason} import org.apache.spark.util.SerializableBuffer private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable @@ -96,6 +96,17 @@ private[spark] object CoarseGrainedClusterMessages { case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage + case class NodeLossNotification(nodeIds: Seq[String]) + + case class AddNodeToDecommission(hostname: String, decommissionTimeInSeconds: Long, + reason: NodeDecommissionReason) extends CoarseGrainedClusterMessage + + case class RemoveNodeToDecommission(hostname: String) + extends CoarseGrainedClusterMessage + + case class UpdateNodeToDecommissionSetTerminate(hostname: String) + extends CoarseGrainedClusterMessage + case class RemoveWorker(workerId: String, host: String, message: String) extends CoarseGrainedClusterMessage @@ -106,6 +117,11 @@ private[spark] object CoarseGrainedClusterMessages { filterName: String, filterParams: Map[String, String], proxyBase: String) extends CoarseGrainedClusterMessage + // cluster info update notification + case class ClusterInfoUpdate(clusterInfo: ClusterInfo) + extends CoarseGrainedClusterMessage + + // Messages exchanged between the driver and the cluster manager for executor allocation // In Yarn mode, these are exchanged between the driver and the AM diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 67638a5f9593c..9584db8556c62 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -57,6 +57,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Total number of executors that are currently registered protected val totalRegisteredExecutors = new AtomicInteger(0) protected val conf = scheduler.sc.conf + // Invoked lazily to prevent creating the DecommissionTracker before + // ExecutorAllocationClient and DAGScheduler + protected lazy val decommissionTracker = scheduler.decommissionTrackerOpt private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf) // Submit tasks only after (registered resources / total expected resources) @@ -203,6 +206,17 @@ class 
CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp data.freeCores = data.totalCores } makeOffers(executorId) + + case AddNodeToDecommission(hostname, terminationTimeMs, nodeDecommissionReason) => + decommissionTracker.foreach( + _.addNodeToDecommission(hostname, terminationTimeMs, nodeDecommissionReason)) + + case RemoveNodeToDecommission(hostname) => + decommissionTracker.foreach(_.removeNodeToDecommission(hostname)) + + case UpdateNodeToDecommissionSetTerminate(hostname) => + decommissionTracker.foreach(_.updateNodeToDecommissionSetTerminate(hostname)) + case e => logError(s"Received unexpected message. ${e}") } @@ -220,6 +234,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // or if it ignored our blacklist), then we reject that executor immediately. logInfo(s"Rejecting $executorId as it has been blacklisted.") context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId")) + } else if (isNodeDecommissioning(hostname)) { + // Refuse any new executor registered from the decommissioning worker. Can only happen + // in case of spot loss nodes about to be lost. For nodes gracefuly decommissioning this + // won't happen. + logInfo(s"Rejecting executor:$executorId registration on decommissioning node:$hostname") + context.sendFailure(new IllegalStateException( + s"Executor host is decommissioning: $executorId")) } else { // If the executor's rpc env is not listening for incoming connections, `hostPort` // will be null, and the client connection should be used to contact the executor. @@ -295,6 +316,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val taskDescs = withLock { // Filter out executors under killing val activeExecutors = executorDataMap.filterKeys(isExecutorActive) + .filter(x => !isNodeDecommissioning(x._2.executorHost)) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores, @@ -322,9 +344,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private def makeOffers(executorId: String): Unit = { // Make sure no executor is killed while some task is launching on it val taskDescs = withLock { + val executorData = executorDataMap(executorId) // Filter out executors under killing - if (isExecutorActive(executorId)) { - val executorData = executorDataMap(executorId) + // add the filter to check executor doesnot belong to + // decommission nodes + if (isExecutorActive(executorId) && !isNodeDecommissioning(executorData.executorHost)) { val workOffers = IndexedSeq( new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores, Some(executorData.executorAddress.hostPort), @@ -851,6 +875,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ protected def isBlacklisted(executorId: String, hostname: String): Boolean = false + /** + * Add nodes to decommission list, time in ms at which node will be decommissioned. + * NB: Public for testing + * + * @param hostname hostname of worker to be decommissioned. + * @param terminationTimeMs time after which node will be forcefully terminated. 
+ */ + protected def addNodeToDecommission(hostname: String, terminationTimeMs: Long, + reason: NodeDecommissionReason): Unit = { None } + // SPARK-27112: We need to ensure that there is ordering of lock acquisition // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix // the deadlock issue exposed in SPARK-27112 @@ -858,6 +892,30 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp CoarseGrainedSchedulerBackend.this.synchronized { fn } } + /** + * Remover worker from the decommission map. + * NB: Public for testing + * + * @hostname hostname of worker to be removed from decommissioned list. + */ + def removeNodeToDecommission(hostname: String): Unit = { None } + + /** + * Mark node as terminated. + * NB: Public for testing + * + * @hostname hostname of worker to be marked as terminated. + */ + def updateNodeToDecommissionSetTerminate(hostname: String): Unit = { None } + + + private def isNodeDecommissioning(hostname: String): Boolean = { + decommissionTracker match { + case None => return false + case Some(decommissionTracker) => return decommissionTracker.isNodeDecommissioning(hostname) + } + } + } private[spark] object CoarseGrainedSchedulerBackend { diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 312691302b064..945a74fabc87f 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -77,6 +77,7 @@ class HeartbeatReceiverSuite scheduler = mock(classOf[TaskSchedulerImpl]) when(sc.taskScheduler).thenReturn(scheduler) when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]()) + when(scheduler.decommissionTrackerOpt).thenReturn(None) when(scheduler.sc).thenReturn(sc) heartbeatReceiverClock = new ManualClock heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala new file mode 100644 index 0000000000000..2eb8157fde47e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mockito.when +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach} +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class DecommissionTrackerSuite extends SparkFunSuite + with BeforeAndAfter with BeforeAndAfterEach with MockitoSugar + with LocalSparkContext { + + private var decommissionTracker: DecommissionTracker = _ + private var dagScheduler : DAGScheduler = _ + private var executorAllocationClient : ExecutorAllocationClient = _ + private var conf: SparkConf = _ + private var currentDecommissionNode = "" + private var currentShuffleDecommissionNode = "" + + override def beforeAll(): Unit = { + conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.GRACEFUL_DECOMMISSION_ENABLE.key, "true") + .set(config.GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC.key, "8s") + + executorAllocationClient = mock[ExecutorAllocationClient] + dagScheduler = mock[DAGScheduler] + } + + override def afterAll(): Unit = { + if (decommissionTracker != null) { + decommissionTracker = null + } + } + + before { + val clock = new ManualClock(0) + clock.setTime(0) + decommissionTracker = new DecommissionTracker(conf, + Some(executorAllocationClient), Some(dagScheduler), clock = clock) + + when(executorAllocationClient.killExecutorsOnHost(anyString())).thenAnswer(new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + currentDecommissionNode = invocation.getArgument(0, classOf[String]) + true + } + }) + when(dagScheduler.nodeDecommissioned(anyString())).thenAnswer(new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + currentShuffleDecommissionNode = invocation.getArgument(0, classOf[String]) + true + } + }) + } + + after { + decommissionTracker.removeNodeToDecommission("hostA") + decommissionTracker.removeNodeToDecommission("hostB") + super.afterEach() + } + + test("Check Node Decommission state") { + val clock = new ManualClock(0) + decommissionTracker = new DecommissionTracker(conf, + Some(executorAllocationClient), Some(dagScheduler), clock = clock) + + when(executorAllocationClient.killExecutorsOnHost(anyString())).thenAnswer(new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + currentDecommissionNode = invocation.getArgument(0, classOf[String]) + true + } + }) + when(dagScheduler.nodeDecommissioned(anyString())).thenAnswer(new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + currentShuffleDecommissionNode = invocation.getArgument(0, classOf[String]) + true + } + }) + + decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) + assert(decommissionTracker.isNodeDecommissioning("hostA")) + + // Wait for executor decommission give 100ms over 50% + Thread.sleep(5100) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + assert(currentDecommissionNode === "hostA") + + // Wait for shuffle data decommission + Thread.sleep(4000) + assert(decommissionTracker.isNodeDecommissioned("hostA")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + assert(currentShuffleDecommissionNode === "hostA") + + // 
Wait for and terminate + Thread.sleep(2000) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostA") + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.TERMINATED)) + + // Recomission + decommissionTracker.removeNodeToDecommission("hostA") + assert(!decommissionTracker.isNodeDecommissioning("hostA")) + } + + test("Check the Multiple Node Decommission state") { + decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) + decommissionTracker.addNodeToDecommission("hostB", 30000, SpotRotationLoss) + assert(decommissionTracker.isNodeDecommissioning("hostA")) + assert(decommissionTracker.isNodeDecommissioning("hostB")) + + // Wait for hostA executor decommission + Thread.sleep(5100) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + assert(currentDecommissionNode === "hostA") + + // Wait for hostA shuffle data decommission + Thread.sleep(4000) + assert(decommissionTracker.isNodeDecommissioned("hostA")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + assert(currentShuffleDecommissionNode === "hostA") + + // Wait for hostA termination and trigger termination + Thread.sleep(1000) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostA") + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.TERMINATED)) + + // Recommission hostA + decommissionTracker.removeNodeToDecommission("hostA") + assert(!decommissionTracker.isNodeDecommissioning("hostA")) + + // Wait for hostB executor decommission + Thread.sleep(5000) + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + assert(currentDecommissionNode === "hostB") + + // Wait for hostB shuffledata decommission + Thread.sleep(12000) + assert(decommissionTracker.isNodeDecommissioned("hostB")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + assert(currentShuffleDecommissionNode === "hostB") + + // Wait for hostB termination and trigger termination + Thread.sleep(3000) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostB") + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.TERMINATED)) + + // Recommission hostB + decommissionTracker.removeNodeToDecommission("hostB") + assert(!decommissionTracker.isNodeDecommissioning("hostB")) + } + + test("Check Multiple Node Decommission state at same time") { + decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) + decommissionTracker.addNodeToDecommission("hostB", 10000, NodeLoss) + assert(decommissionTracker.isNodeDecommissioning("hostA")) + assert(decommissionTracker.isNodeDecommissioning("hostB")) + + // Wait for both hostA hostB executor decommission + Thread.sleep(5100) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + + // Wait for both hostA hostB shuffle data decommission + Thread.sleep(4000) + assert(decommissionTracker.isNodeDecommissioned("hostA")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + 
assert(decommissionTracker.isNodeDecommissioned("hostB")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + + // Wait for both node termination and trigger termination + Thread.sleep(2000) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostA") + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.TERMINATED)) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostB") + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.TERMINATED)) + + // Recommission both + decommissionTracker.removeNodeToDecommission("hostA") + assert(!decommissionTracker.isNodeDecommissioning("hostA")) + decommissionTracker.removeNodeToDecommission("hostB") + assert(!decommissionTracker.isNodeDecommissioning("hostB")) + } + + test("Check Node decommissioning with minimum termination time config") { + // Less than MIN_TERMINATION_TIME + decommissionTracker.addNodeToDecommission("hostA", 5000, NodeLoss) + // sleep for 1 sec which is lag between executor and shuffle + // decommission. + Thread.sleep(1100) + assert(decommissionTracker.isNodeDecommissioned("hostA")) + } + + test("Check Node Decommissioning with non-default executor and shuffle data lease time config") { + + val clock = new ManualClock(0) + clock.setTime(0) + // override and recreated + conf = conf + .set(config.GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT.key, "20") + .set(config.GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT.key, "50") + + decommissionTracker = new DecommissionTracker(conf, + Some(executorAllocationClient), Some(dagScheduler), clock = clock) + + decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) + assert(decommissionTracker.isNodeDecommissioning("hostA")) + + // Wait for hostA executor decommission at 20% + Thread.sleep(2100) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + assert(currentDecommissionNode === "hostA") + + // Wait for hostA shuffle data decommission at 50% + Thread.sleep(3000) + assert(decommissionTracker.isNodeDecommissioned("hostA")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + assert(currentShuffleDecommissionNode === "hostA") + + // Wait for terminate and trigger termination + Thread.sleep(5000) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostA") + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.TERMINATED)) + + // Recommission + decommissionTracker.removeNodeToDecommission("hostA") + assert(!decommissionTracker.isNodeDecommissioning("hostA")) + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index a75bae56229b4..f16ac07bf9f95 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1337,7 +1337,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B offers } override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock) + new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock = clock) } } // 
Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. @@ -1377,7 +1377,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val clock = new ManualClock() val taskScheduler = new TaskSchedulerImpl(sc) { override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock) + new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock = clock) } } // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index e4aad58d25064..777b6d6ec3234 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -493,7 +493,8 @@ class TaskSetManagerSuite // within the taskset. val mockListenerBus = mock(classOf[LiveListenerBus]) val blacklistTrackerOpt = Some(new BlacklistTracker(mockListenerBus, conf, None, clock)) - val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock) + val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, + clock = clock) { val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1 diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index cd0e7d5c87bc8..024c13217b661 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -24,8 +24,10 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.util.{Failure, Success} import scala.util.control.NonFatal +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest @@ -42,9 +44,8 @@ import org.apache.spark.resource.ResourceProfile import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason} -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId -import org.apache.spark.scheduler.cluster.SchedulerBackendUtils +import org.apache.spark.scheduler.cluster.{ClusterInfo, NodeInfo, NodeState, SchedulerBackendUtils} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{ClusterInfoUpdate, NodeLossNotification, RemoveExecutor, RetrieveLastAllocatedExecutorId} import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} /** @@ -162,6 +163,17 @@ private[yarn] class YarnAllocator( private val allocatorBlacklistTracker = new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker) + // Cluster info is information about cluster passed from AM -> Driver. 
+ // we keep it cached on the AM side till it is sent to driver and is + // cleared once successfully sent. On failure, the failed message is + // sent in next cycle. + // Visible for testing + private val currentClusterInfo = ClusterInfo(new HashMap[String, NodeInfo]) + + // Interval in seconds after which the node is decommissioned after this time node + // is not available to use. + private val nodeLossInterval = sparkConf.get(GRACEFUL_DECOMMISSION_NODE_TIMEOUT) + // Executor memory in MiB. protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt // Executor offHeap memory in MiB. @@ -436,6 +448,50 @@ private[yarn] class YarnAllocator( logDebug("Finished processing %d completed containers. Current running executor count: %d." .format(completedContainers.size, getNumExecutorsRunning)) } + + // If the flags is enabled than GRACEFUL_DECOMMISSION_ENABLE + // than handling the Node loss scenario using the decommission tracker. + if (sparkConf.get(GRACEFUL_DECOMMISSION_ENABLE)) { + processGracefulDecommission(allocateResponse) + } + } + + def processGracefulDecommission(allocateResponse: AllocateResponse): Unit = { + // Create a consolidated node decommission info report. + val nodeInfos = new HashMap[String, NodeInfo] + + // node with updated information. + val getUpdatedNodes = allocateResponse.getUpdatedNodes() + if (getUpdatedNodes != null) { + val updatedNodes = getUpdatedNodes.asScala + for (x <- updatedNodes) { + if (x.getNodeState.toString.equals(NodeState.DECOMMISSIONING.toString)) { + // In hadoop 2.7 there is no support getDecommissioningTimeout whereas + // In hadoop 3.1 and later version of hadoop there is support + // of getDecommissioningTimeout So the method call made using reflection + // to update the value nodeTerminationTime and for lower version of hadoop2.7 + // use the config spark.graceful.decommission.node.timeout which is specific to cloud + var nodeTerminationTime = clock.getTimeMillis() + nodeLossInterval * 1000 + try { + val decommiossioningTimeout = x.getClass.getMethod( + "getDecommissioningTimeout").invoke(x).asInstanceOf[Integer] + if (decommiossioningTimeout != null) { + nodeTerminationTime = clock.getTimeMillis() + decommiossioningTimeout * 1000 + } + } catch { + case e: NoSuchMethodException => logDebug(e.toString) + } + nodeInfos(x.getNodeId().getHost()) + = NodeInfo(terminationTime = Some(nodeTerminationTime), + nodeState = NodeState.getYarnNodeState(x.getNodeState())) + } else { + nodeInfos(x.getNodeId().getHost()) + = NodeInfo(terminationTime = None, + nodeState = NodeState.getYarnNodeState(x.getNodeState())) + } + } + } + processClusterInfo(ClusterInfo(nodeInfos = nodeInfos)) } /** @@ -896,6 +952,22 @@ private[yarn] class YarnAllocator( } } + private[yarn] def processClusterInfo(clusterInfo: ClusterInfo): Unit = { + + // Update cached currentClusterInfo. + clusterInfo.nodeInfos.foreach{case(k, v) => currentClusterInfo.nodeInfos(k) = v} + + logDebug(s"clusterInfoUpdate: full sync $currentClusterInfo") + + driverRef.ask[Boolean](ClusterInfoUpdate(currentClusterInfo)).andThen { + case Success(b) => currentClusterInfo.nodeInfos.clear() // Clear cached data + case Failure(f) => logInfo(s"clusterInfoUpdate: sync failed ($f)." + + s" Will be synced in next cycle") + }(ThreadUtils.sameThread) + // Since the time taken to complete is small , so used the single thread here. + } + + /** * Register that some RpcCallContext has asked the AM why the executor was lost. 
Note that * we can only find the loss reason to send back in the next call to allocateResources(). diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index e428bab4f96f3..1cc9e24ff7834 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster import java.util.EnumSet -import java.util.concurrent.atomic.{AtomicBoolean} +import java.util.concurrent.atomic.AtomicBoolean import javax.servlet.DispatcherType import scala.concurrent.ExecutionContext.Implicits.global @@ -36,8 +36,9 @@ import org.apache.spark.internal.config.UI._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.NodeDecommissionReason import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.{RpcUtils, ThreadUtils} +import org.apache.spark.util.{Clock, RpcUtils, SystemClock, ThreadUtils} /** * Abstract Yarn scheduler backend that contains common logic @@ -210,6 +211,70 @@ private[spark] abstract class YarnSchedulerBackend( Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, driverEndpoint)) } + /** + * Process cluster info update recieved from AM + * + */ + protected def processClusterInfoUpdate(clusterInfo: ClusterInfo): Boolean = { + + // NB: In case of clusterInfoFullSync nodes decommissioning info may be + // received multiple times but that is not an issue as long as information + // is correct. Actions here are idempotent. + + clusterInfo.nodeInfos.foreach { case (node, nodeInfo) => + nodeInfo.nodeState match { + + case NodeState.GRACEFUL_DECOMMISSIONING => + if (nodeInfo.terminationTime.isDefined && nodeInfo.terminationTime.get > 0) { + // eg. Spot Loss/ preemptible VMs loss case + addNodeToDecommission(node, nodeInfo.terminationTime.get, NodeLoss) + } + + case NodeState.RUNNING => + // If node reused (identified by hostname), + // This is the scenario of hostname reused + removeNodeToDecommission(node) + + case NodeState.DECOMMISSIONED | NodeState.LOST => + updateNodeToDecommissionSetTerminate(node) + + // NodeState.OTHER + case _ => + // ignore other states + } + } + true + } + + /** + * Add nodes to decommission list, time at which node will be terminated. + */ + override def addNodeToDecommission(hostname: String, terminationTimeMs: Long, + reason: NodeDecommissionReason): Unit = { + if (driverEndpoint != null) { + driverEndpoint.send(AddNodeToDecommission(hostname, terminationTimeMs, reason)) + } + } + + /** + * Remove node from the decommission map. + */ + override def removeNodeToDecommission(hostname: String): Unit = { + if (driverEndpoint != null) { + driverEndpoint.send(RemoveNodeToDecommission(hostname)) + } + } + + /** + * Update node to mark it terminated + */ + override def updateNodeToDecommissionSetTerminate(hostname: String): Unit = { + if (driverEndpoint != null) { + driverEndpoint.send(UpdateNodeToDecommissionSetTerminate(hostname)) + } + } + + /** * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected. 
* This endpoint communicates with the executors and queries the AM for an executor's exit @@ -319,6 +384,10 @@ private[spark] abstract class YarnSchedulerBackend( case RetrieveDelegationTokens => context.reply(currentDelegationTokens) + + case ClusterInfoUpdate(clusterInfo) => + logDebug(s"clusterInfoUpdate received at YarnSchedulerEndpoint : $clusterInfo") + context.reply(processClusterInfoUpdate(clusterInfo)) } override def onDisconnected(remoteAddress: RpcAddress): Unit = { From c745519875263794008f47184fa4ce6f416e2b82 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Mon, 2 Mar 2020 19:10:36 +0530 Subject: [PATCH 02/11] Adding the code change to update some of the comments in the code --- .../spark/internal/config/package.scala | 7 ++++++- .../apache/spark/scheduler/DAGScheduler.scala | 2 ++ .../spark/scheduler/DecommissionTracker.scala | 19 ++++++------------- .../spark/scheduler/TaskSetManager.scala | 6 +++++- .../spark/scheduler/cluster/ClusterInfo.scala | 7 ++++--- .../scheduler/DecommissionTrackerSuite.scala | 2 +- 6 files changed, 24 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f0c61ca6ff49e..5e89994388ef6 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1904,7 +1904,12 @@ package object config { private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT = ConfigBuilder("spark.graceful.decommission.shuffedata.leasetimePct") .doc("Percentage of time to expiry after which shuffle data " + - "cleaned up (if enabled) on the node. Value ranges between (0-100)") + "cleaned up (if enabled) on the node. Value ranges between (0-100)" + + " This value is always greater than or equal to executor" + + " leaseTime (is set to be equal if incorrectly configured)." + + " Near 0% would mean generated data is marked as lost too early." + + " Too close to 100 would shuffle data may not get cleared proactively" + + " leading to tasks going into fetchFail scenarios") .version("3.1.0") .intConf .checkValue(v => v >= 0 && v < 100, "The percentage should be positive.") diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 9c49b8bc252b0..17a28868cfd5b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1631,6 +1631,8 @@ private[spark] class DAGScheduler( s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + s"(attempt ${failedStage.latestInfo.attemptNumber}) running") } else { + // Gracefully handling the stage abort due to fetch failure in the + // decommission nodes if (!event.reason.asInstanceOf[FetchFailed].countTowardsStageFailures) { // Ignore stage attempts due to fetch failed only // once per attempt diff --git a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala index b76df9ad346c2..a3d73673ef543 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala @@ -31,7 +31,9 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} /** * DecommissionTracker tracks the list of decommissioned nodes. 
- * + * This decommission trackers maintains the decommissioned nodes state. + * Decommission tracker schedules the executor decommission and shuffle + * decommission for that node. */ private[scheduler] class DecommissionTracker ( conf: SparkConf, @@ -112,10 +114,6 @@ private[scheduler] class DecommissionTracker ( return } - // Consider node is picked up for nodeRotation it will be marked for - // Graceful Decommission with termination time of -1. - // But it is possible that it may then be genuinely nodeLoss. - // In those case override needs to be allowed. // Override decommissionHostnameMap in case termination time is less than // existing the terminationTime in decommissionHostnameMap. if (decommissionHostnameMap.contains(hostname)) { @@ -149,8 +147,8 @@ private[scheduler] class DecommissionTracker ( shuffleDataDecommissionTimeMs = curTimeMs + 1000 } else { reason match { - case SpotRotationLoss | NodeLoss => - // In Spot block Rotation loss and SpotLoss case adjust termination time so + case NodeLoss => + // In Nodeloss(Spotloss in aws/ preemptible VMs i GCP) case adjust termination time so // that enough buffer to real termination is available for job to finish // consuming shuffle data. executorDecommissionTimeMs = (delay * executorDecommissionLeasePct) / 100 + curTimeMs @@ -404,9 +402,4 @@ private[spark] sealed trait NodeDecommissionReason extends Serializable { @DeveloperApi private[spark] case object NodeLoss extends NodeDecommissionReason { override def message: String = "nodeLoss" -} - -@DeveloperApi -private[spark] case object SpotRotationLoss extends NodeDecommissionReason { - override def message: String = "nodeRotationLoss" -} +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 305ee0ea24d40..fec9016ec1ced 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -816,7 +816,11 @@ private[spark] class TaskSetManager( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) // Do account fetch failure exception raised by decommissioned - // node against stage failure. + // node against stage failure. Here the logic is to specify, + // if the task failed due to fetchFailed of decommission nodes than + // don't count towards the stageFailure. countTowardsStageFailures + // variable of TaskEndReason, that can be used in DAG scheduler to account + // fetch failure while checking the stage abort decommissionTracker match { case Some(decommissionTracker) => if (decommissionTracker.isNodeDecommissioned(fetchFailed.bmAddress.host)) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala index c1b4ea38f5776..30e960f6989be 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala @@ -37,9 +37,10 @@ private[spark] object NodeState extends Enumeration { // In hadoop-2.7 there is no support for node state DECOMMISSIONING // In Hadoop-2.8, hadoop3.1 and later version of spark there is a support // to node state DECOMMISSIONING. 
- // Inorder to build the spark using hadoop2 and hadoop3, comparing the value of - // DECOMMISSIONING here and for other state we are matching - // the state and assigning the node state at spark end + // Inorder to build the spark using hadoop2 and hadoop3, not + // using YarnNodeState for the node state DECOMMISSIONING here and + // and for other state we are matching the YarnNodeState and assigning + // the node state at spark end if (state.toString.equals(NodeState.DECOMMISSIONING.toString)) { NodeState.GRACEFUL_DECOMMISSIONING } else { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala index 2eb8157fde47e..6f065d8be444e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala @@ -127,7 +127,7 @@ class DecommissionTrackerSuite extends SparkFunSuite test("Check the Multiple Node Decommission state") { decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) - decommissionTracker.addNodeToDecommission("hostB", 30000, SpotRotationLoss) + decommissionTracker.addNodeToDecommission("hostB", 30000, NodeLoss) assert(decommissionTracker.isNodeDecommissioning("hostA")) assert(decommissionTracker.isNodeDecommissioning("hostB")) From f69c21713ee03075bd45671f236a04cc9da823d9 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Mon, 2 Mar 2020 22:33:54 +0530 Subject: [PATCH 03/11] fix scala s[Ctyle issue in decomission tracker --- .../scala/org/apache/spark/scheduler/DecommissionTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala index a3d73673ef543..1729915d7a07b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala @@ -402,4 +402,4 @@ private[spark] sealed trait NodeDecommissionReason extends Serializable { @DeveloperApi private[spark] case object NodeLoss extends NodeDecommissionReason { override def message: String = "nodeLoss" -} \ No newline at end of file +} From c5ce9a7d645a403efe6bd937221e65b45be29278 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Sun, 15 Mar 2020 20:34:40 +0530 Subject: [PATCH 04/11] Refactor the code as per the review comments --- .../org/apache/spark/TaskEndReason.scala | 2 +- .../apache/spark/scheduler/DAGScheduler.scala | 11 +- .../spark/scheduler/DecommissionTracker.scala | 140 +++++++++++------- .../spark/scheduler/TaskSchedulerImpl.scala | 7 +- .../spark/scheduler/TaskSetManager.scala | 2 +- .../scheduler/DecommissionTrackerSuite.scala | 6 +- 6 files changed, 98 insertions(+), 70 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index c69f1d7ae94c3..9d3bf8d1c1b3c 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -68,7 +68,7 @@ sealed trait TaskFailedReason extends TaskEndReason { * was unrelated to the task; for example, if the task failed because fetch failed exception * from the decommissioned node. 
*/ - var countTowardsStageFailures: Boolean = true + var countTowardsDecommissionStageFailures: Boolean = true } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 17a28868cfd5b..21ee3a1238bcc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1633,18 +1633,19 @@ private[spark] class DAGScheduler( } else { // Gracefully handling the stage abort due to fetch failure in the // decommission nodes - if (!event.reason.asInstanceOf[FetchFailed].countTowardsStageFailures) { + if (!event.reason.asInstanceOf[FetchFailed].countTowardsDecommissionStageFailures) { // Ignore stage attempts due to fetch failed only // once per attempt if (!failedStage.failedAttemptIds.contains(task.stageAttemptId)) { failedStage.ignoredFailedStageAttempts += 1 DecommissionTracker.incrFetchFailIgnoreCnt() failedStage.latestInfo.stageFailureIgnored(true) + + logInfo(s"""Ignoring stage failure due to fetch failed from the decommissioned""" + + s""" node : {"stage":"$failedStage","attempt":"${task.stageAttemptId}",""" + + s""""totalIgnoredAttempts":"${failedStage.ignoredFailedStageAttempts}",""" + + s""""node":"$bmAddress"}""") } - logInfo(s"""Ignoring stage failure due to fetch failed from the decommissioned""" + - s""" node : {"stage":"$failedStage","attempt":"${task.stageAttemptId}",""" + - s""""totalIgnoredAttempts":"${failedStage.ignoredFailedStageAttempts}",""" + - s""""node":"$bmAddress"}""") } failedStage.failedAttemptIds.add(task.stageAttemptId) val shouldAbortStage = failedStage.failedAttemptIds.size >= diff --git a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala index 1729915d7a07b..213ffdfe41b01 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala @@ -36,14 +36,14 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} * decommission for that node. */ private[scheduler] class DecommissionTracker ( - conf: SparkConf, - executorAllocClient: Option[ExecutorAllocationClient], - dagScheduler: Option[DAGScheduler], - clock: Clock = new SystemClock()) extends Logging { + conf: SparkConf, + executorAllocClient: Option[ExecutorAllocationClient], + dagScheduler: DAGScheduler, + clock: Clock = new SystemClock()) extends Logging { def this(sc: SparkContext, - client: Option[ExecutorAllocationClient], - dagScheduler: Option[DAGScheduler]) = { + client: Option[ExecutorAllocationClient], + dagScheduler: DAGScheduler) = { this(sc.conf, client, dagScheduler) } @@ -51,10 +51,10 @@ private[scheduler] class DecommissionTracker ( private val decommissionThread = ThreadUtils.newDaemonThreadPoolScheduledExecutor("node-decommissioning-thread", 20) - // Contains workers hostname which are decommissioning. Added when spot-loss or - // graceful decommissioning event arrives from the AM. And is removed when the + // Contains workers hostname which are decommissioning. Added when + // decommissioning event arrives from the AM. And is removed when the // last node (identified by nodeId) is running again. 
- private val decommissionHostnameMap = new HashMap[String, NodeDecommissionInfo] + private val decommissionHostNameMap = new HashMap[String, NodeDecommissionInfo] private val minDecommissionTime = conf.get(config.GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC) @@ -70,7 +70,7 @@ private[scheduler] class DecommissionTracker ( * view the node is considered decommissioned. */ def isNodeDecommissioned(hostname: String): Boolean = synchronized { - decommissionHostnameMap.get(hostname) match { + decommissionHostNameMap.get(hostname) match { case None => false case Some(info) => return info.state == NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED || @@ -84,22 +84,54 @@ private[scheduler] class DecommissionTracker ( * Not necessarily decommissioned or terminated */ def isNodeDecommissioning(hostname: String): Boolean = synchronized { - decommissionHostnameMap.contains(hostname) + decommissionHostNameMap.contains(hostname) } /** * visible only for Unit Test */ - def getDecommissionedNodeState(hostname: - String): Option[NodeDecommissionState.Value] = synchronized { - decommissionHostnameMap.get(hostname) match { + def getDecommissionedNodeState(hostname: String): Option[NodeDecommissionState.Value] = + synchronized { + decommissionHostNameMap.get(hostname) match { case Some(info) => Some(info.state) case _ => None } } - def addNodeToDecommission(hostname: String, terminationTimeMs: Long, - reason: NodeDecommissionReason): Unit = synchronized { + /** + * @param delayTime - time after which the node will be terminated + * @param currentTimeMs - Current time in milliseconds + * @return executorDecommissionTimeMs and shuffleDataDecommissionTimeMs + */ + private def getDecommissionTimeOut( + delayTime: Long, + currentTimeMs: Long): (Long, Long) = { + val executorDecommissionTimeMs = + if (executorDecommissionLeasePct > shuffleDataDecommissionLeasePct) { + // if executorDecommissionLeasePct is greater than + // shuffleDataDecommissionLeasePct. In that scenario calculate + // executorDecommissionTimeMs using shuffleDataDecommissionLeasePct + (delayTime * shuffleDataDecommissionLeasePct) / 100 + currentTimeMs + } else { + (delayTime * executorDecommissionLeasePct) / 100 + currentTimeMs + } + val shuffleDataDecommissionTimeMs = + if (executorDecommissionLeasePct <= shuffleDataDecommissionLeasePct) { + // Add a delay of one second in shuffleDataDecommissionTimeMs if + // executorDecommissionLeasePct equals shuffleDataDecommissionLeasePct + // Since we want executor to be decommissioned first + // than after that shuffleDataDecommission + (delayTime * shuffleDataDecommissionLeasePct) / 100 + currentTimeMs + 1000 + } else { + (delayTime * shuffleDataDecommissionLeasePct) / 100 + currentTimeMs + } + (executorDecommissionTimeMs, shuffleDataDecommissionTimeMs) + } + + def addNodeToDecommission( + hostname: String, + terminationTimeMs: Long, + reason: NodeDecommissionReason): Unit = synchronized { val df: SimpleDateFormat = new SimpleDateFormat("YY/MM/dd HH:mm:ss") val tDateTime = df.format(terminationTimeMs) @@ -114,14 +146,14 @@ private[scheduler] class DecommissionTracker ( return } - // Override decommissionHostnameMap in case termination time is less than - // existing the terminationTime in decommissionHostnameMap. 
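An illustrative aside, not part of the patch: the standalone sketch below mirrors the lease-percentage arithmetic of getDecommissionTimeOut in its final form in this series, using an assumed 110-second termination delay and assumed executor/shuffle lease percentages of 50 and 90, to show the resulting decommission timeline.

object DecommissionTimelineSketch {
  // Executors are decommissioned at the smaller of the two percentages of the
  // remaining lease; shuffle data at the shuffle percentage, with a one-second
  // gap added when the percentages would not otherwise keep the executor step first.
  def timeline(delayMs: Long, nowMs: Long, execPct: Int, shufflePct: Int): (Long, Long) = {
    val executorAtMs = nowMs + (delayMs * math.min(execPct, shufflePct)) / 100
    val shuffleAtMs = nowMs + (delayMs * shufflePct) / 100 +
      (if (execPct >= shufflePct) 1000L else 0L)
    (executorAtMs, shuffleAtMs)
  }

  def main(args: Array[String]): Unit = {
    // Assumed example: node terminates in 110s, lease percentages 50 and 90.
    // Prints (55000,99000): executors killed at ~55s, shuffle data unregistered
    // at ~99s, leaving a buffer before the node is actually lost.
    println(timeline(delayMs = 110000L, nowMs = 0L, execPct = 50, shufflePct = 90))
  }
}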
- if (decommissionHostnameMap.contains(hostname)) { - val nodeDecommissionInfo = decommissionHostnameMap(hostname) - // There will be no duplicate entry of terminationTimeMs in decommissionHostnameMap + // Override decommissionHostNameMap in case termination time is less than + // existing the terminationTime in decommissionHostNameMap. + if (decommissionHostNameMap.contains(hostname)) { + val nodeDecommissionInfo = decommissionHostNameMap(hostname) + // There will be no duplicate entry of terminationTimeMs in decommissionHostNameMap // since the terminationTime is updated only when it is less than the existing termination - // time in decommissionHostnameMap - if (decommissionHostnameMap(hostname).terminationTime <= terminationTimeMs) { + // time in decommissionHostNameMap + if (decommissionHostNameMap(hostname).terminationTime <= terminationTimeMs) { logDebug( s"""Ignoring decommissioning """ + s""" request : {"node":"$hostname","reason":"${reason.message}",terminationTime"""" + @@ -139,7 +171,7 @@ private[scheduler] class DecommissionTracker ( var shuffleDataDecommissionTimeMs = terminationTimeMs // if delay is less than a minDecommissionTime than decommission immediately - if (terminationTimeMs - curTimeMs < minDecommissionTime * 1000) { + if (delay < minDecommissionTime * 1000) { executorDecommissionTimeMs = curTimeMs // Added the delay of 1 second in case of delay is less than a minute // Since we want executor to be decommissioned first @@ -148,12 +180,13 @@ private[scheduler] class DecommissionTracker ( } else { reason match { case NodeLoss => - // In Nodeloss(Spotloss in aws/ preemptible VMs i GCP) case adjust termination time so - // that enough buffer to real termination is available for job to finish + // In Nodeloss case adjust termination time so that enough + // buffer to real termination is available for job to finish // consuming shuffle data. - executorDecommissionTimeMs = (delay * executorDecommissionLeasePct) / 100 + curTimeMs - shuffleDataDecommissionTimeMs = - (delay * shuffleDataDecommissionLeasePct) / 100 + curTimeMs + var (executorDecommissionTime, shuffleDataDecommissionTime) = getDecommissionTimeOut( + delay, curTimeMs) + executorDecommissionTimeMs = executorDecommissionTime + shuffleDataDecommissionTimeMs = shuffleDataDecommissionTime case _ => // No action } @@ -180,7 +213,7 @@ private[scheduler] class DecommissionTracker ( s""" request : {"node":"$hostname",$nodeDecommissionInfo} """) // Add node to the list of decommissioning nodes. 
- decommissionHostnameMap.put(hostname, nodeDecommissionInfo) + decommissionHostNameMap.put(hostname, nodeDecommissionInfo) // Schedule executor decommission decommissionThread.schedule(new Runnable { @@ -198,14 +231,14 @@ private[scheduler] class DecommissionTracker ( } def removeNodeToDecommission(hostname: String): Unit = synchronized { - if (!decommissionHostnameMap.contains(hostname)) { + if (!decommissionHostNameMap.contains(hostname)) { return } - val nodeDecommissionInfo = decommissionHostnameMap(hostname) + val nodeDecommissionInfo = decommissionHostNameMap(hostname) logInfo(s"""Removing decommissioning""" + s""" request : {"node":"$hostname",$nodeDecommissionInfo}""") - decommissionHostnameMap -= hostname + decommissionHostNameMap -= hostname } def updateNodeToDecommissionSetTerminate(hostname: String): Unit = synchronized { @@ -213,22 +246,22 @@ private[scheduler] class DecommissionTracker ( } private def executorDecommission(hostname: String, - nodeDecommissionInfo: NodeDecommissionInfo): Unit = { + nodeDecommissionInfo: NodeDecommissionInfo): Unit = { // Not found, only valid scenario is the nodes // has moved back to running state // Scenario where nodeLoss terminated the node // for the Graceful Decommission node. // If the node is already terminated and hostname is re-used in that scenario // no need to kill the executor on that host - if (! decommissionHostnameMap.contains(hostname)) { + if (! decommissionHostNameMap.contains(hostname)) { logInfo(s"""Node $hostname not found in decommisssionTrackerList while""" + """performing executor decommission""") return } // if the terminationTime in the thread is not equal to - // terminationTime in decommissionHostnameMap for that + // terminationTime in decommissionHostNameMap for that // host than Ignore the ExecutorDecommission - if (decommissionHostnameMap(hostname).terminationTime + if (decommissionHostNameMap(hostname).terminationTime != nodeDecommissionInfo.terminationTime) { logInfo(s"Ignoring ExecutorDecommission for hostname ${hostname}," + s" since node is already terminated") @@ -240,31 +273,28 @@ private[scheduler] class DecommissionTracker ( // deadlock between schedulerBacked (ExecutorAllocationManager) // and this. executorAllocClient.map(_.killExecutorsOnHost(hostname)) - - decommissionHostnameMap(hostname).state = NodeDecommissionState.EXECUTOR_DECOMMISSIONED - + decommissionHostNameMap(hostname).state = NodeDecommissionState.EXECUTOR_DECOMMISSIONED logInfo(s"Node $hostname decommissioned") - return } private def removeShuffleData(hostname: String, - nodeDecommissionInfo: NodeDecommissionInfo): Unit = { + nodeDecommissionInfo: NodeDecommissionInfo): Unit = { // Not found, only valid scenario is the nodes // has moved back to running state // This for scenario where about_to_be_lost terminated the node // for the Graceful Decommission node. // If the node is already terminated and hostname is reused in that scenario // no need to remove the shuffle entry from map-output tracker - if (! decommissionHostnameMap.contains(hostname)) { + if (! 
decommissionHostNameMap.contains(hostname)) { logInfo(s"""Node $hostname not found in decommisssionTrackerList while """ + """performing shuffle data decommission""") return } // if the terminationTime in the thread is not equal to - // terminationTime in decommissionHostnameMap for that + // terminationTime in decommissionHostNameMap for that // host than Ignore the removeShuffleData - if (decommissionHostnameMap(hostname).terminationTime + if (decommissionHostNameMap(hostname).terminationTime != nodeDecommissionInfo.terminationTime) { logInfo(s"Ignoring removeShuffleData for hostname ${hostname}," + s" since node is already terminated") @@ -272,9 +302,9 @@ private[scheduler] class DecommissionTracker ( } // Unregister shuffle data. - dagScheduler.map(_.nodeDecommissioned(hostname)) + dagScheduler.nodeDecommissioned(hostname) - decommissionHostnameMap(hostname).state = NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED + decommissionHostNameMap(hostname).state = NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED logInfo(s"Node $hostname Shuffle data decommissioned") @@ -284,15 +314,15 @@ private[scheduler] class DecommissionTracker ( private def terminate(hostname: String): Unit = { // Not found, only valid scenario is the nodes // has moved back to running state - if (!decommissionHostnameMap.contains(hostname)) { + if (!decommissionHostNameMap.contains(hostname)) { logWarning(s"Node $hostname not found in decommisssionTrackerList") return } // Remove all the shuffle data of all the executors for the terminated node - dagScheduler.map(_.nodeDecommissioned(hostname)) + dagScheduler.nodeDecommissioned(hostname) - decommissionHostnameMap(hostname).state = NodeDecommissionState.TERMINATED + decommissionHostNameMap(hostname).state = NodeDecommissionState.TERMINATED logInfo(s"Node $hostname terminated") } @@ -367,11 +397,11 @@ private[spark] object DecommissionTracker extends Logging { } private class NodeDecommissionInfo( - var terminationTime: Long, - var executorDecommissionTime: Long, - var shuffleDataDecommissionTime: Long, - var state: NodeDecommissionState.Value, - var reason: NodeDecommissionReason) { + var terminationTime: Long, + var executorDecommissionTime: Long, + var shuffleDataDecommissionTime: Long, + var state: NodeDecommissionState.Value, + var reason: NodeDecommissionReason) { override def toString(): String = { val df: SimpleDateFormat = new SimpleDateFormat("YY/MM/dd HH:mm:ss") val tDateTime = df.format(terminationTime) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 01cd6ecfb37c3..b2b41ad01dd7a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -1168,11 +1168,8 @@ private[spark] object TaskSchedulerImpl { case _ => None } - val dagScheduler: Option[DAGScheduler] = sc.dagScheduler match { - case b: DAGScheduler => Some(b) - case _ => None - } - if (executorAllocClient.isDefined && dagScheduler.isDefined) { + val dagScheduler = sc.dagScheduler + if (executorAllocClient.isDefined && dagScheduler != null) { Some(new DecommissionTracker(sc, executorAllocClient, dagScheduler)) } else { None diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index fec9016ec1ced..3269d7cf6a33e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -826,7 +826,7 @@ private[spark] class TaskSetManager( if (decommissionTracker.isNodeDecommissioned(fetchFailed.bmAddress.host)) { logInfo(s"Do not count fetch failure from decommissioned" + s" node ${fetchFailed.bmAddress.host}") - fetchFailed.countTowardsStageFailures = false + fetchFailed.countTowardsDecommissionStageFailures = false } case _ => // No action diff --git a/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala index 6f065d8be444e..a55234277aea3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala @@ -58,7 +58,7 @@ class DecommissionTrackerSuite extends SparkFunSuite val clock = new ManualClock(0) clock.setTime(0) decommissionTracker = new DecommissionTracker(conf, - Some(executorAllocationClient), Some(dagScheduler), clock = clock) + Some(executorAllocationClient), dagScheduler, clock = clock) when(executorAllocationClient.killExecutorsOnHost(anyString())).thenAnswer(new Answer[Boolean] { override def answer(invocation: InvocationOnMock): Boolean = { @@ -83,7 +83,7 @@ class DecommissionTrackerSuite extends SparkFunSuite test("Check Node Decommission state") { val clock = new ManualClock(0) decommissionTracker = new DecommissionTracker(conf, - Some(executorAllocationClient), Some(dagScheduler), clock = clock) + Some(executorAllocationClient), dagScheduler, clock = clock) when(executorAllocationClient.killExecutorsOnHost(anyString())).thenAnswer(new Answer[Boolean] { override def answer(invocation: InvocationOnMock): Boolean = { @@ -235,7 +235,7 @@ class DecommissionTrackerSuite extends SparkFunSuite .set(config.GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT.key, "50") decommissionTracker = new DecommissionTracker(conf, - Some(executorAllocationClient), Some(dagScheduler), clock = clock) + Some(executorAllocationClient), dagScheduler, clock = clock) decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) assert(decommissionTracker.isNodeDecommissioning("hostA")) From e56ac5582f08ebee35134d1c911acc4c44140218 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Sun, 15 Mar 2020 20:51:49 +0530 Subject: [PATCH 05/11] updatecd the code as per the review comments --- .../spark/scheduler/DecommissionTracker.scala | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala index 213ffdfe41b01..63d1b381b38e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala @@ -56,8 +56,8 @@ private[scheduler] class DecommissionTracker ( // last node (identified by nodeId) is running again. 
private val decommissionHostNameMap = new HashMap[String, NodeDecommissionInfo] - private val minDecommissionTime = - conf.get(config.GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC) + private val minDecommissionTimeMs = + conf.get(config.GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC)*1000 private val executorDecommissionLeasePct = conf.get(config.GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT) @@ -88,7 +88,7 @@ private[scheduler] class DecommissionTracker ( } /** - * visible only for Unit Test + * Used for Unit Test */ def getDecommissionedNodeState(hostname: String): Option[NodeDecommissionState.Value] = synchronized { @@ -108,7 +108,7 @@ private[scheduler] class DecommissionTracker ( currentTimeMs: Long): (Long, Long) = { val executorDecommissionTimeMs = if (executorDecommissionLeasePct > shuffleDataDecommissionLeasePct) { - // if executorDecommissionLeasePct is greater than + // if executorDecommissionLeasePct is greater than // shuffleDataDecommissionLeasePct. In that scenario calculate // executorDecommissionTimeMs using shuffleDataDecommissionLeasePct (delayTime * shuffleDataDecommissionLeasePct) / 100 + currentTimeMs @@ -116,9 +116,9 @@ private[scheduler] class DecommissionTracker ( (delayTime * executorDecommissionLeasePct) / 100 + currentTimeMs } val shuffleDataDecommissionTimeMs = - if (executorDecommissionLeasePct <= shuffleDataDecommissionLeasePct) { + if (executorDecommissionLeasePct >= shuffleDataDecommissionLeasePct) { // Add a delay of one second in shuffleDataDecommissionTimeMs if - // executorDecommissionLeasePct equals shuffleDataDecommissionLeasePct + // executorDecommissionLeasePct greater than equals to shuffleDataDecommissionLeasePct // Since we want executor to be decommissioned first // than after that shuffleDataDecommission (delayTime * shuffleDataDecommissionLeasePct) / 100 + currentTimeMs + 1000 @@ -171,9 +171,9 @@ private[scheduler] class DecommissionTracker ( var shuffleDataDecommissionTimeMs = terminationTimeMs // if delay is less than a minDecommissionTime than decommission immediately - if (delay < minDecommissionTime * 1000) { + if (delay < minDecommissionTimeMs) { executorDecommissionTimeMs = curTimeMs - // Added the delay of 1 second in case of delay is less than a minute + // Added the delay of 1 second in case of delay is less than minDecommissionTime // Since we want executor to be decommissioned first // than after that shuffleDataDecommission shuffleDataDecommissionTimeMs = curTimeMs + 1000 @@ -190,13 +190,6 @@ private[scheduler] class DecommissionTracker ( case _ => // No action } - - if (executorDecommissionTimeMs > shuffleDataDecommissionTimeMs) { - executorDecommissionTimeMs = shuffleDataDecommissionTimeMs - logInfo(s"""Executor decommission time $executorDecommissionTimeMs needs to be less""" + - s""" than shuffle data decommission time $shuffleDataDecommissionTimeMs. 
Setting it """ + - s""" to shuffle data decommission time.""") - } } // Count of executors/worker which went to decommissioning From d77248e78cb4e7586e94311d0493260076e431f4 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Tue, 17 Mar 2020 00:10:34 +0530 Subject: [PATCH 06/11] add the code change as per the discussion in the PR --- .../apache/spark/scheduler/DAGScheduler.scala | 17 +++++++------ .../spark/scheduler/DecommissionTracker.scala | 11 +++++---- .../org/apache/spark/scheduler/Stage.scala | 6 ++--- .../apache/spark/scheduler/StageInfo.scala | 6 ----- .../spark/scheduler/TaskSetManager.scala | 2 +- .../spark/scheduler/cluster/ClusterInfo.scala | 3 ++- .../CoarseGrainedSchedulerBackend.scala | 12 ++++++---- .../scheduler/DecommissionTrackerSuite.scala | 24 +++++++++---------- 8 files changed, 39 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 21ee3a1238bcc..0cfc02256abe4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1635,23 +1635,22 @@ private[spark] class DAGScheduler( // decommission nodes if (!event.reason.asInstanceOf[FetchFailed].countTowardsDecommissionStageFailures) { // Ignore stage attempts due to fetch failed only - // once per attempt + // once per attempt due to nodes decommissioning event if (!failedStage.failedAttemptIds.contains(task.stageAttemptId)) { - failedStage.ignoredFailedStageAttempts += 1 + failedStage.ignoredDecommissionFailedStage += 1 DecommissionTracker.incrFetchFailIgnoreCnt() - failedStage.latestInfo.stageFailureIgnored(true) logInfo(s"""Ignoring stage failure due to fetch failed from the decommissioned""" + s""" node : {"stage":"$failedStage","attempt":"${task.stageAttemptId}",""" + - s""""totalIgnoredAttempts":"${failedStage.ignoredFailedStageAttempts}",""" + + s""""totalIgnoredAttempts":"${failedStage.ignoredDecommissionFailedStage}",""" + s""""node":"$bmAddress"}""") } } failedStage.failedAttemptIds.add(task.stageAttemptId) val shouldAbortStage = failedStage.failedAttemptIds.size >= - (maxConsecutiveStageAttempts + failedStage.ignoredFailedStageAttempts) || + (maxConsecutiveStageAttempts + failedStage.ignoredDecommissionFailedStage) || disallowStageRetryForTest || - failedStage.ignoredFailedStageAttempts > maxIgnoredFailedStageAttempts + failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts // It is likely that we receive multiple FetchFailed for a single stage (because we have @@ -1693,7 +1692,7 @@ private[spark] class DAGScheduler( } if (shouldAbortStage) { - if (failedStage.ignoredFailedStageAttempts > maxIgnoredFailedStageAttempts + if (failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts && DecommissionTracker.isDecommissionEnabled(sc.getConf)) { DecommissionTracker.setFetchFailIgnoreCntThresholdFlag(true) } @@ -1860,9 +1859,9 @@ private[spark] class DAGScheduler( // TODO Refactor the failure handling logic to combine similar code with that of // FetchFailed. 
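An illustrative aside, not part of the patch: the sketch below restates the stage-abort condition used here, with assumed values of 4 for maxConsecutiveStageAttempts and 8 for the fetch-failed ignore threshold (both are assumptions for the example; disallowStageRetryForTest is omitted).

object StageAbortSketch {
  // Mirrors: failedAttemptIds.size >= (maxConsecutiveStageAttempts + ignored) ||
  //          ignored > maxIgnoredFailedStageAttempts
  def shouldAbort(
      failedAttempts: Int,
      ignoredDecommissionFailures: Int,
      maxConsecutiveStageAttempts: Int = 4,
      maxIgnoredFailedStageAttempts: Int = 8): Boolean = {
    failedAttempts >= (maxConsecutiveStageAttempts + ignoredDecommissionFailures) ||
      ignoredDecommissionFailures > maxIgnoredFailedStageAttempts
  }

  def main(args: Array[String]): Unit = {
    // Six failed attempts, three ignored because the shuffle host was
    // decommissioned: the stage may still retry (prints false).
    println(shouldAbort(failedAttempts = 6, ignoredDecommissionFailures = 3))
    // Once the ignored count exceeds the threshold the stage aborts (prints true).
    println(shouldAbort(failedAttempts = 5, ignoredDecommissionFailures = 9))
  }
}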
val shouldAbortStage = failedStage.failedAttemptIds.size >= - (maxConsecutiveStageAttempts + failedStage.ignoredFailedStageAttempts) || + (maxConsecutiveStageAttempts + failedStage.ignoredDecommissionFailedStage) || disallowStageRetryForTest || - failedStage.ignoredFailedStageAttempts > maxIgnoredFailedStageAttempts + failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts if (shouldAbortStage) { val abortMessage = if (disallowStageRetryForTest) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala index 63d1b381b38e8..d3bd230e4059b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala @@ -79,11 +79,12 @@ private[scheduler] class DecommissionTracker ( } /* - * Is the node decommissioning i.e from driver point of - * view the node is candidate for decommissioning. - * Not necessarily decommissioned or terminated + * Is the node present in decommission Tracker + * This is used by driver for not registering any new executor + * on that node which is present in decommission Tracker and + * neither to assign any new task to that existing executor on that node. */ - def isNodeDecommissioning(hostname: String): Boolean = synchronized { + def isNodePresentInDecommissionTracker(hostname: String): Boolean = synchronized { decommissionHostNameMap.contains(hostname) } @@ -133,7 +134,7 @@ private[scheduler] class DecommissionTracker ( terminationTimeMs: Long, reason: NodeDecommissionReason): Unit = synchronized { - val df: SimpleDateFormat = new SimpleDateFormat("YY/MM/dd HH:mm:ss") + val df: SimpleDateFormat = new SimpleDateFormat("yy/MM/dd HH:mm:ss") val tDateTime = df.format(terminationTimeMs) val curTimeMs = clock.getTimeMillis() diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 9650159329612..c892daab67e85 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -92,14 +92,14 @@ private[scheduler] abstract class Stage( val failedAttemptIds = new HashSet[Int] /** - * Number of times the stage failure needs to be ignored. e.g failed due to fetch failed + * Number of times the stage failure is ignored. e.g failed due to fetch failed * exception caused by node decommissioning. */ - var ignoredFailedStageAttempts = 0 + var ignoredDecommissionFailedStage = 0 private[scheduler] def clearFailures() : Unit = { failedAttemptIds.clear() - ignoredFailedStageAttempts = 0 + ignoredDecommissionFailedStage = 0 } /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 30555bc5dd3d2..556478d83cf39 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -46,8 +46,6 @@ class StageInfo( var completionTime: Option[Long] = None /** If the stage failed, the reason why. 
*/ var failureReason: Option[String] = None - /** if stage failure ignored */ - var ignoredFailure: Option[Boolean] = None /** * Terminal values of accumulables updated during this stage, including all the user-defined @@ -60,10 +58,6 @@ class StageInfo( completionTime = Some(System.currentTimeMillis) } - def stageFailureIgnored(ignored: Boolean) { - ignoredFailure = Some(ignored) - } - // This would just be the second constructor arg, except we need to maintain this method // with parentheses for compatibility def attemptNumber(): Int = attemptId diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3269d7cf6a33e..df22544759ad0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -819,7 +819,7 @@ private[spark] class TaskSetManager( // node against stage failure. Here the logic is to specify, // if the task failed due to fetchFailed of decommission nodes than // don't count towards the stageFailure. countTowardsStageFailures - // variable of TaskEndReason, that can be used in DAG scheduler to account + // variable of TaskEndReason, that can be used in DAGScheduler to account // fetch failure while checking the stage abort decommissionTracker match { case Some(decommissionTracker) => diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala index 30e960f6989be..4ac1da04bb8ed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala @@ -59,7 +59,8 @@ private[spark] object NodeState extends Enumeration { * Node information used by CoarseGrainedSchedulerBackend. * * @param nodeState node state - * @param terminationTime time at which node will terminate + * @param terminationTime Time at which node will terminate. + * This optional and will be set only for node state DECOMMISSIONING */ private[spark] case class NodeInfo(nodeState: NodeState.Value, terminationTime: Option[Long]) extends Serializable { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9584db8556c62..531eac7f08f9e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -234,7 +234,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // or if it ignored our blacklist), then we reject that executor immediately. logInfo(s"Rejecting $executorId as it has been blacklisted.") context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId")) - } else if (isNodeDecommissioning(hostname)) { + } else if (isNodePresentInDecommissionTracker(hostname)) { // Refuse any new executor registered from the decommissioning worker. Can only happen // in case of spot loss nodes about to be lost. For nodes gracefuly decommissioning this // won't happen. 
@@ -316,7 +316,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val taskDescs = withLock { // Filter out executors under killing val activeExecutors = executorDataMap.filterKeys(isExecutorActive) - .filter(x => !isNodeDecommissioning(x._2.executorHost)) + .filter(x => !isNodePresentInDecommissionTracker(x._2.executorHost)) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores, @@ -348,7 +348,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Filter out executors under killing // add the filter to check executor doesnot belong to // decommission nodes - if (isExecutorActive(executorId) && !isNodeDecommissioning(executorData.executorHost)) { + if (isExecutorActive(executorId) && + !isNodePresentInDecommissionTracker(executorData.executorHost)) { val workOffers = IndexedSeq( new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores, Some(executorData.executorAddress.hostPort), @@ -909,10 +910,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp def updateNodeToDecommissionSetTerminate(hostname: String): Unit = { None } - private def isNodeDecommissioning(hostname: String): Boolean = { + private def isNodePresentInDecommissionTracker(hostname: String): Boolean = { decommissionTracker match { case None => return false - case Some(decommissionTracker) => return decommissionTracker.isNodeDecommissioning(hostname) + case Some(decommissionTracker) => + return decommissionTracker.isNodePresentInDecommissionTracker(hostname) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala index a55234277aea3..4c0a5b5dc1e7d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala @@ -99,7 +99,7 @@ class DecommissionTrackerSuite extends SparkFunSuite }) decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) - assert(decommissionTracker.isNodeDecommissioning("hostA")) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostA")) // Wait for executor decommission give 100ms over 50% Thread.sleep(5100) @@ -122,14 +122,14 @@ class DecommissionTrackerSuite extends SparkFunSuite // Recomission decommissionTracker.removeNodeToDecommission("hostA") - assert(!decommissionTracker.isNodeDecommissioning("hostA")) + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostA")) } test("Check the Multiple Node Decommission state") { decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) decommissionTracker.addNodeToDecommission("hostB", 30000, NodeLoss) - assert(decommissionTracker.isNodeDecommissioning("hostA")) - assert(decommissionTracker.isNodeDecommissioning("hostB")) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostA")) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostB")) // Wait for hostA executor decommission Thread.sleep(5100) @@ -152,7 +152,7 @@ class DecommissionTrackerSuite extends SparkFunSuite // Recommission hostA decommissionTracker.removeNodeToDecommission("hostA") - assert(!decommissionTracker.isNodeDecommissioning("hostA")) + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostA")) // Wait for hostB executor decommission Thread.sleep(5000) @@ -175,14 +175,14 @@ 
class DecommissionTrackerSuite extends SparkFunSuite // Recommission hostB decommissionTracker.removeNodeToDecommission("hostB") - assert(!decommissionTracker.isNodeDecommissioning("hostB")) + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostB")) } test("Check Multiple Node Decommission state at same time") { decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) decommissionTracker.addNodeToDecommission("hostB", 10000, NodeLoss) - assert(decommissionTracker.isNodeDecommissioning("hostA")) - assert(decommissionTracker.isNodeDecommissioning("hostB")) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostA")) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostB")) // Wait for both hostA hostB executor decommission Thread.sleep(5100) @@ -211,9 +211,9 @@ class DecommissionTrackerSuite extends SparkFunSuite // Recommission both decommissionTracker.removeNodeToDecommission("hostA") - assert(!decommissionTracker.isNodeDecommissioning("hostA")) + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostA")) decommissionTracker.removeNodeToDecommission("hostB") - assert(!decommissionTracker.isNodeDecommissioning("hostB")) + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostB")) } test("Check Node decommissioning with minimum termination time config") { @@ -238,7 +238,7 @@ class DecommissionTrackerSuite extends SparkFunSuite Some(executorAllocationClient), dagScheduler, clock = clock) decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) - assert(decommissionTracker.isNodeDecommissioning("hostA")) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostA")) // Wait for hostA executor decommission at 20% Thread.sleep(2100) @@ -261,6 +261,6 @@ class DecommissionTrackerSuite extends SparkFunSuite // Recommission decommissionTracker.removeNodeToDecommission("hostA") - assert(!decommissionTracker.isNodeDecommissioning("hostA")) + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostA")) } } From 5dd1dceb1b7018b5b74ad1a7c7f8452def85ff88 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Tue, 31 Mar 2020 16:32:08 +0530 Subject: [PATCH 07/11] Refactoring the code --- .../spark/internal/config/package.scala | 21 ++++++++------- .../spark/scheduler/cluster/ClusterInfo.scala | 26 +----------------- .../spark/deploy/yarn/YarnAllocator.scala | 27 +++++++++++++++++-- .../cluster/YarnSchedulerBackend.scala | 2 +- 4 files changed, 38 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 5e89994388ef6..7e1e9f692533a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1884,10 +1884,10 @@ package object config { private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD = ConfigBuilder("spark.graceful.decommission.fetchfailed.ignore.threshold") .doc("Threshold of number of times fetchfailed ignored due to node" + - " decommission.This is configurable as per the need of the user and" + - " depending upon type of the cloud. If we keep this a large value and " + - " there is continuous decommission of nodes, in those scenarios stage" + - " will never abort and keeps on retrying in an unbounded manner.") + "decommission.This is configurable as per the need of the user and" + + "depending upon type of the cloud. 
If we keep this a large value and " + + "there is continuous decommission of nodes, in those scenarios stage" + + "will never abort and keeps on retrying in an unbounded manner.") .version("3.1.0") .intConf .createWithDefault(8) @@ -1905,11 +1905,11 @@ package object config { ConfigBuilder("spark.graceful.decommission.shuffedata.leasetimePct") .doc("Percentage of time to expiry after which shuffle data " + "cleaned up (if enabled) on the node. Value ranges between (0-100)" + - " This value is always greater than or equal to executor" + - " leaseTime (is set to be equal if incorrectly configured)." + - " Near 0% would mean generated data is marked as lost too early." + - " Too close to 100 would shuffle data may not get cleared proactively" + - " leading to tasks going into fetchFail scenarios") + "This value is always greater than or equal to executor" + + "leaseTime (is set to be equal if incorrectly configured)." + + "Near 0% would mean generated data is marked as lost too early." + + "Too close to 100 would shuffle data may not get cleared proactively" + + "leading to tasks going into fetchFail scenarios") .version("3.1.0") .intConf .checkValue(v => v >= 0 && v < 100, "The percentage should be positive.") @@ -1926,7 +1926,8 @@ package object config { ConfigBuilder("spark.graceful.decommission.node.timeout") .doc("Interval in seconds after which the node is decommissioned in case aws spotloss" + "the time is approximately 110s and in case of GCP preemptible VMs this is around 30s" + - "this config can be changed according to node type in the public cloud") + "this config can be changed according to node type in the public cloud. This will" + + "be applied if the decommission timeout is not sent by the Resource Manager") .version("3.1.0") .timeConf(TimeUnit.SECONDS) .createWithDefaultString("110s") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala index 4ac1da04bb8ed..942810d9fce90 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala @@ -21,38 +21,14 @@ import java.text.SimpleDateFormat import scala.collection.mutable.{HashMap} -import org.apache.hadoop.yarn.api.records.{NodeState => YarnNodeState} - /** * State of the node. * Add the node state depending upon the cluster manager, For Yarn * getYarnNodeState is added to create the node state for Decommission Tracker */ private[spark] object NodeState extends Enumeration { - val RUNNING, DECOMMISSIONED, GRACEFUL_DECOMMISSIONING, DECOMMISSIONING, LOST, OTHER = Value + val RUNNING, DECOMMISSIONED, DECOMMISSIONING, LOST, OTHER = Value type NodeState = Value - - // Helper method to get NodeState of the Yarn. - def getYarnNodeState(state: YarnNodeState): NodeState.Value = { - // In hadoop-2.7 there is no support for node state DECOMMISSIONING - // In Hadoop-2.8, hadoop3.1 and later version of spark there is a support - // to node state DECOMMISSIONING. 
-    // Inorder to build the spark using hadoop2 and hadoop3, not
-    // using YarnNodeState for the node state DECOMMISSIONING here and
-    // and for other state we are matching the YarnNodeState and assigning
-    // the node state at spark end
-    if (state.toString.equals(NodeState.DECOMMISSIONING.toString)) {
-      NodeState.GRACEFUL_DECOMMISSIONING
-    } else {
-      state match {
-        case YarnNodeState.RUNNING => NodeState.RUNNING
-        case YarnNodeState.DECOMMISSIONED => NodeState.DECOMMISSIONED
-        case YarnNodeState.LOST => NodeState.LOST
-        case YarnNodeState.UNHEALTHY => NodeState.LOST
-        case _ => NodeState.OTHER
-      }
-    }
-  }
 }

 /**
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 024c13217b661..ad38b7da10a00 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal

 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
 import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.records.{NodeState => YarnNodeState}
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -456,6 +457,28 @@ private[yarn] class YarnAllocator(
     }
   }

+  // Helper method to map the YARN node state to the Spark NodeState.
+  def getYarnNodeState(state: YarnNodeState): NodeState.Value = {
+    // In hadoop-2.7 there is no support for the node state DECOMMISSIONING.
+    // In hadoop-2.8, hadoop-3.1 and later versions there is support
+    // for the node state DECOMMISSIONING.
+    // In order to build Spark against both hadoop2 and hadoop3, we do not
+    // use YarnNodeState for the node state DECOMMISSIONING here and match
+    // on its string name instead; for the other states we match the
+    // YarnNodeState and assign the node state at the Spark end.
+    if (state.toString.equals(NodeState.DECOMMISSIONING.toString)) {
+      NodeState.DECOMMISSIONING
+    } else {
+      state match {
+        case YarnNodeState.RUNNING => NodeState.RUNNING
+        case YarnNodeState.DECOMMISSIONED => NodeState.DECOMMISSIONED
+        case YarnNodeState.LOST => NodeState.LOST
+        case YarnNodeState.UNHEALTHY => NodeState.LOST
+        case _ => NodeState.OTHER
+      }
+    }
+  }
+
   def processGracefulDecommission(allocateResponse: AllocateResponse): Unit = {
     // Create a consolidated node decommission info report.
val nodeInfos = new HashMap[String, NodeInfo] @@ -483,11 +506,11 @@ private[yarn] class YarnAllocator( } nodeInfos(x.getNodeId().getHost()) = NodeInfo(terminationTime = Some(nodeTerminationTime), - nodeState = NodeState.getYarnNodeState(x.getNodeState())) + nodeState = getYarnNodeState(x.getNodeState())) } else { nodeInfos(x.getNodeId().getHost()) = NodeInfo(terminationTime = None, - nodeState = NodeState.getYarnNodeState(x.getNodeState())) + nodeState = getYarnNodeState(x.getNodeState())) } } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 1cc9e24ff7834..deac6d17bc4e0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -224,7 +224,7 @@ private[spark] abstract class YarnSchedulerBackend( clusterInfo.nodeInfos.foreach { case (node, nodeInfo) => nodeInfo.nodeState match { - case NodeState.GRACEFUL_DECOMMISSIONING => + case NodeState.DECOMMISSIONING => if (nodeInfo.terminationTime.isDefined && nodeInfo.terminationTime.get > 0) { // eg. Spot Loss/ preemptible VMs loss case addNodeToDecommission(node, nodeInfo.terminationTime.get, NodeLoss) From 64f540201015b2bd127c479554a6b525d48c8684 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Mon, 25 May 2020 19:14:44 +0530 Subject: [PATCH 08/11] Rebase with master --- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index f16ac07bf9f95..d85153800923f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -203,7 +203,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B sc.conf.get(config.TASK_MAX_FAILURES), clock = clock) { override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock) + new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock = clock) } override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { // Don't shuffle the offers around for this test. 
Instead, we'll just pass in all

From 1acf665496772dd5591ce4e8804a5bc9cf2f275c Mon Sep 17 00:00:00 2001
From: SaurabhChawla
Date: Tue, 9 Jun 2020 13:48:18 +0530
Subject: [PATCH 09/11] Add the decommission configuration in configuration.md

---
 .../spark/internal/config/package.scala |  7 +-
 docs/configuration.md                   | 66 +++++++++++++++++++
 2 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7e1e9f692533a..0fb78762bc510 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1917,7 +1917,12 @@ package object config {
   private[spark] val GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC =
     ConfigBuilder("spark.graceful.decommission.min.termination.time")
-      .doc("Minimum time to termination below which node decommissioning is performed immediately")
+      .doc("Minimum time to termination below which node decommissioning is performed " +
+        "immediately. If the decommissioning time is less than the " +
+        "configured time (spark.graceful.decommission.min.termination.time), " +
+        "then the executor decommissioning and shuffle data clean up will " +
+        "take place immediately: first the executor decommission, then the " +
+        "shuffle data clean up.")
       .version("3.1.0")
       .timeConf(TimeUnit.SECONDS)
       .createWithDefaultString("60s")
diff --git a/docs/configuration.md b/docs/configuration.md
index 420942f7b7bbb..c4d8877cc898c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2146,6 +2146,72 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>3.1.0</td>
 </tr>
+<tr>
+  <td><code>spark.graceful.decommission.enable</code></td>
+  <td>false</td>
+  <td>
+    If set to "true", Spark will handle node decommissioning gracefully
+    when running on the YARN Resource Manager.
+  </td>
+  <td>3.1.0</td>
+</tr>
+<tr>
+  <td><code>spark.graceful.decommission.node.timeout</code></td>
+  <td>110s</td>
+  <td>
+    Interval in seconds after which the node is decommissioned. For AWS Spot loss
+    the time is approximately 110s, and for GCP preemptible VMs it is around 30s;
+    this config can be changed according to the node type in the public cloud. It is
+    applied only if the decommission timeout is not sent by the Resource Manager.
+  </td>
+  <td>3.1.0</td>
+</tr>
+<tr>
+  <td><code>spark.graceful.decommission.fetchfailed.ignore.threshold</code></td>
+  <td>8</td>
+  <td>
+    Threshold for the number of times a fetch failure is ignored because it was
+    caused by node decommission. This is configurable as per the need of the user
+    and the type of the cloud. If this is kept at a large value and there is
+    continuous decommission of nodes, the stage will never abort and will keep
+    retrying in an unbounded manner.
+  </td>
+  <td>3.1.0</td>
+</tr>
+<tr>
+  <td><code>spark.graceful.decommission.executor.leasetimePct</code></td>
+  <td>50</td>
+  <td>
+    Percentage of time to expiry after which executors are killed
+    (if enabled) on the node. Value ranges between 0 and 100.
+  </td>
+  <td>3.1.0</td>
+</tr>
+<tr>
+  <td><code>spark.graceful.decommission.shuffedata.leasetimePct</code></td>
+  <td>90</td>
+  <td>
+    Percentage of time to expiry after which shuffle data is
+    cleaned up (if enabled) on the node. Value ranges between 0 and 100.
+    This value is always greater than or equal to the executor
+    lease time (it is set to be equal if incorrectly configured).
+    Near 0% would mean generated data is marked as lost too early;
+    too close to 100 would mean shuffle data may not get cleared proactively,
+    leading to tasks going into fetchFail scenarios.
+  </td>
+  <td>3.1.0</td>
+</tr>
+<tr>
+  <td><code>spark.graceful.decommission.min.termination.time</code></td>
+  <td>60s</td>
+  <td>
+    Minimum time to termination below which node decommissioning is performed immediately.
+    If the decommissioning time is less than the configured time
+    (spark.graceful.decommission.min.termination.time), then the executor decommissioning
+    and shuffle data clean up take place immediately: first the executor decommission,
+    then the shuffle data clean up.
+  </td>
+  <td>3.1.0</td>
+</tr>
 <tr>
   <td><code>spark.scheduler.blacklist.unschedulableTaskSetTimeout</code></td>
   <td>120s</td>

From e642754d03d7a27ee9710ef5d8cace9c634d7faa Mon Sep 17 00:00:00 2001
From: SaurabhChawla
Date: Thu, 11 Jun 2020 15:54:13 +0530
Subject: [PATCH 10/11] Use the cached block replication (https://github.com/apache/spark/pull/27864) in the current PR

---
 .../CoarseGrainedSchedulerBackend.scala | 50 +++++++++++++------
 1 file changed, 35 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 531eac7f08f9e..753495e97b2fb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -208,8 +208,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         makeOffers(executorId)

       case AddNodeToDecommission(hostname, terminationTimeMs, nodeDecommissionReason) =>
-        decommissionTracker.foreach(
-          _.addNodeToDecommission(hostname, terminationTimeMs, nodeDecommissionReason))
+        decommissionTracker.foreach { tracker =>
+          tracker.addNodeToDecommission(hostname, terminationTimeMs, nodeDecommissionReason)
+          val nodeDecommissionState = tracker.getDecommissionedNodeState(hostname)
+          if(nodeDecommissionState.isDefined &&
+            !nodeDecommissionState.contains(NodeDecommissionState.TERMINATED)) {
+            val exe = scheduler.getExecutorsAliveOnHost(hostname)
+            exe match {
+              case Some(set) =>
+                for (e <- set) {
+                  moveCachedRddFromDecommissionExecutor(e)
+                }
+              case None => logInfo("There is no active executor available" +
+                " on the decommissioning node for moving the cached RDD")
+            }
+          }
+        }

       case RemoveNodeToDecommission(hostname) =>
         decommissionTracker.foreach(_.removeNodeToDecommission(hostname))
@@ -463,25 +477,31 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         logError(s"Unexpected error during decommissioning ${e.toString}", e)
       }
       logInfo(s"Finished decommissioning executor $executorId.")
-
-      if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-        try {
-          logInfo("Starting decommissioning block manager corresponding to " +
-            s"executor $executorId.")
-          scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
-        } catch {
-          case e: Exception =>
-            logError("Unexpected error during block manager " +
-              s"decommissioning for executor $executorId: ${e.toString}", e)
-        }
-        logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
-      }
+      moveCachedRddFromDecommissionExecutor(executorId)
     } else {
       logInfo(s"Skipping decommissioning of executor $executorId.")
     }
     shouldDisable
   }

+  /**
+   * Move cached Rdd from Decommission executors
+   */
+  private def moveCachedRddFromDecommissionExecutor(executorId: String): Unit = {
+    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+      try {
+        logInfo("Starting decommissioning block manager corresponding to " +
+          s"executor $executorId.")
+
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) + } catch { + case e: Exception => + logError("Unexpected error during block manager " + + s"decommissioning for executor $executorId: ${e.toString}", e) + } + logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") + } + } + /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. From c476e5271ca8b8fd3c401980a080334b8d7b9a36 Mon Sep 17 00:00:00 2001 From: SaurabhChawla Date: Thu, 11 Jun 2020 16:12:27 +0530 Subject: [PATCH 11/11] Add the comment in the code change --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 753495e97b2fb..5c92b2a3e6693 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -210,6 +210,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case AddNodeToDecommission(hostname, terminationTimeMs, nodeDecommissionReason) => decommissionTracker.foreach { tracker => tracker.addNodeToDecommission(hostname, terminationTimeMs, nodeDecommissionReason) + // handle the cached RDD on Decommission nodes val nodeDecommissionState = tracker.getDecommissionedNodeState(hostname) if(nodeDecommissionState.isDefined && !nodeDecommissionState.contains(NodeDecommissionState.TERMINATED)) { @@ -484,9 +485,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp shouldDisable } - /** - * Move cached Rdd from Decommission executors - */ + // Move cached Rdd from Decommission executors private def moveCachedRddFromDecommissionExecutor(executorId: String): Unit = { if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { try {
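
Note: the properties documented in configuration.md above are all that is needed to turn the
feature on for a YARN application. The snippet below is only an illustration of setting them
programmatically; the values are the documented defaults (except the enable flag), and it is not
part of the patches in this series.

import org.apache.spark.SparkConf

// Illustrative only: enables graceful decommission handling and restates the
// documented defaults explicitly. The chosen values are assumptions that should
// be tuned per cloud provider and node type.
val conf = new SparkConf()
  .set("spark.graceful.decommission.enable", "true")
  // Termination notice for the node type in use (about 110s for AWS Spot loss).
  .set("spark.graceful.decommission.node.timeout", "110s")
  // Kill executors after 50% of the termination window has elapsed.
  .set("spark.graceful.decommission.executor.leasetimePct", "50")
  // Clean up shuffle data after 90% of the termination window has elapsed.
  .set("spark.graceful.decommission.shuffedata.leasetimePct", "90")
  // Below 60s to termination, decommission immediately.
  .set("spark.graceful.decommission.min.termination.time", "60s")
  // Abort a stage once 8 fetch failures have been ignored due to decommission.
  .set("spark.graceful.decommission.fetchfailed.ignore.threshold", "8")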
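
The two lease-time percentages only have meaning relative to the node's termination notice, so it
may help to see the arithmetic spelled out. This is a minimal sketch of how a termination time
could be converted into an executor-kill deadline and a shuffle-data-cleanup deadline; the names
(DecommissionDeadlines, compute) are invented for illustration and are not the DecommissionTracker
code added by these patches.

// Illustrative sketch only: turns the lease-time percentages described in the
// configuration docs above into absolute deadlines for a node whose termination
// time is known.
case class DecommissionDeadlines(executorKillAtMs: Long, shuffleCleanupAtMs: Long)

object DecommissionDeadlines {
  def compute(
      currentTimeMs: Long,
      terminationTimeMs: Long,
      executorLeasePct: Int,      // spark.graceful.decommission.executor.leasetimePct
      shuffleDataLeasePct: Int,   // spark.graceful.decommission.shuffedata.leasetimePct
      minTerminationTimeMs: Long  // spark.graceful.decommission.min.termination.time
    ): DecommissionDeadlines = {
    val timeToTermination = terminationTimeMs - currentTimeMs
    if (timeToTermination <= minTerminationTimeMs) {
      // Below the minimum termination time both actions happen immediately:
      // executor decommission first, then shuffle data clean up.
      DecommissionDeadlines(currentTimeMs, currentTimeMs)
    } else {
      // Shuffle data must never be cleaned up before the executors are killed, so
      // the shuffle percentage is clamped to be at least the executor percentage.
      val effectiveShufflePct = math.max(shuffleDataLeasePct, executorLeasePct)
      DecommissionDeadlines(
        executorKillAtMs = currentTimeMs + timeToTermination * executorLeasePct / 100,
        shuffleCleanupAtMs = currentTimeMs + timeToTermination * effectiveShufflePct / 100)
    }
  }
}

With the documented defaults (50 and 90) and a 110s AWS Spot-loss notice, executors would be
killed roughly 55s after the notice and shuffle data cleaned up roughly 99s after it; a notice
shorter than spark.graceful.decommission.min.termination.time (60s by default) skips the
percentages entirely.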
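
spark.graceful.decommission.fetchfailed.ignore.threshold bounds how long the scheduler keeps
forgiving fetch failures that were caused by decommissioned nodes. The sketch below models only
that bookkeeping as described in the configuration docs; the class and method names are
hypothetical, and the real accounting lives in the scheduler changes of these patches.

// Illustrative sketch only: fetch failures caused by a decommissioned node do not
// count towards the normal stage-failure limit, but the number of stage attempts
// ignored this way is still bounded so a stage cannot retry forever.
class IgnoredFetchFailureCounter(ignoreThreshold: Int = 8) {
  private var ignoredStageAttempts = 0

  /** Returns true when the stage should abort even though the failure was "ignorable". */
  def onIgnorableFetchFailure(): Boolean = {
    ignoredStageAttempts += 1
    // Past the threshold, stop ignoring: with continuous node decommission the
    // stage would otherwise keep retrying in an unbounded manner.
    ignoredStageAttempts >= ignoreThreshold
  }
}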