diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index b13028f868072..9d3bf8d1c1b3c 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -61,6 +61,14 @@ sealed trait TaskFailedReason extends TaskEndReason {
    * on was killed.
    */
   def countTowardsTaskFailures: Boolean = true
+
+  /**
+   * Whether this task failure should be counted towards the maximum number of times the stage is
+   * allowed to fail before the stage is aborted. Set to false in cases where the task's failure
+   * was unrelated to the task itself; for example, if the task failed because of a fetch failed
+   * exception from a decommissioned node.
+   */
+  var countTowardsDecommissionStageFailures: Boolean = true
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ee437c696b47e..0fb78762bc510 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1873,4 +1873,67 @@ package object config {
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val GRACEFUL_DECOMMISSION_ENABLE =
+    ConfigBuilder("spark.graceful.decommission.enable")
+      .doc("Whether to enable graceful handling of node decommissioning.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD =
+    ConfigBuilder("spark.graceful.decommission.fetchfailed.ignore.threshold")
+      .doc("Threshold on the number of fetch failures ignored because they were caused by " +
+        "node decommissioning. This is configurable per the needs of the user and the " +
+        "type of cloud. If this is set to a large value and nodes are decommissioned " +
+        "continuously, the stage will never abort and will keep retrying in an " +
+        "unbounded manner.")
+      .version("3.1.0")
+      .intConf
+      .createWithDefault(8)
+
+  private[spark] val GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT =
+    ConfigBuilder("spark.graceful.decommission.executor.leasetimePct")
+      .doc("Percentage of time to expiry after which executors are killed " +
+        "(if enabled) on the node. Value must be in the range [0, 100).")
+      .version("3.1.0")
+      .intConf
+      .checkValue(v => v >= 0 && v < 100, "The percentage must be in the range [0, 100).")
+      .createWithDefault(50) // Pulled out of thin air.
+
+  private[spark] val GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT =
+    ConfigBuilder("spark.graceful.decommission.shuffedata.leasetimePct")
+      .doc("Percentage of time to expiry after which shuffle data is " +
+        "cleaned up (if enabled) on the node. Value must be in the range [0, 100) " +
+        "and is always greater than or equal to the executor lease time " +
+        "(it is set to be equal if incorrectly configured). " +
+        "Near 0% means generated data is marked as lost too early. " +
+        "Too close to 100 means shuffle data may not get cleared proactively, " +
+        "leading to tasks running into fetch failures.")
+      .version("3.1.0")
+      .intConf
+      .checkValue(v => v >= 0 && v < 100, "The percentage must be in the range [0, 100).")
+      .createWithDefault(90) // Pulled out of thin air.
+
+  private[spark] val GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC =
+    ConfigBuilder("spark.graceful.decommission.min.termination.time")
+      .doc("Minimum time to termination below which node decommissioning is performed " +
+        "immediately. If the time to decommission is less than this configured value " +
+        "(spark.graceful.decommission.min.termination.time), the executor decommission " +
+        "and the shuffle data cleanup take place immediately: first the executor " +
+        "decommission, then the shuffle data cleanup.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("60s")
+
+  private[spark] val GRACEFUL_DECOMMISSION_NODE_TIMEOUT =
+    ConfigBuilder("spark.graceful.decommission.node.timeout")
+      .doc("Interval in seconds after which the node is decommissioned. For AWS Spot " +
+        "nodes the time is approximately 110s and for GCP preemptible VMs it is around " +
+        "30s; this config can be changed according to the node type in the public cloud. " +
+        "It is applied if the decommission timeout is not sent by the Resource Manager.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("110s")
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 37f9e0bb483c2..0cfc02256abe4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -222,6 +222,13 @@ private[spark] class DAGScheduler(
   private val maxFailureNumTasksCheck = sc.getConf
     .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)
 
+  /**
+   * Maximum number of stage attempts for which fetch failures caused by
+   * node decommissioning are ignored.
+   */
+  private val maxIgnoredFailedStageAttempts = sc.getConf
+    .get(config.GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD)
+
   private val messageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
 
@@ -289,6 +296,13 @@ private[spark] class DAGScheduler(
     eventProcessLoop.post(WorkerRemoved(workerId, host, message))
   }
 
+  /**
+   * Called by DecommissionTracker when a node is decommissioned.
+   */
+  def nodeDecommissioned(host: String): Unit = {
+    eventProcessLoop.post(NodeDecommissioned(host))
+  }
+
   /**
    * Called by TaskScheduler implementation when a host is added.
*/ @@ -1617,10 +1631,27 @@ private[spark] class DAGScheduler( s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + s"(attempt ${failedStage.latestInfo.attemptNumber}) running") } else { + // Gracefully handling the stage abort due to fetch failure in the + // decommission nodes + if (!event.reason.asInstanceOf[FetchFailed].countTowardsDecommissionStageFailures) { + // Ignore stage attempts due to fetch failed only + // once per attempt due to nodes decommissioning event + if (!failedStage.failedAttemptIds.contains(task.stageAttemptId)) { + failedStage.ignoredDecommissionFailedStage += 1 + DecommissionTracker.incrFetchFailIgnoreCnt() + + logInfo(s"""Ignoring stage failure due to fetch failed from the decommissioned""" + + s""" node : {"stage":"$failedStage","attempt":"${task.stageAttemptId}",""" + + s""""totalIgnoredAttempts":"${failedStage.ignoredDecommissionFailedStage}",""" + + s""""node":"$bmAddress"}""") + } + } failedStage.failedAttemptIds.add(task.stageAttemptId) - val shouldAbortStage = - failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest + val shouldAbortStage = failedStage.failedAttemptIds.size >= + (maxConsecutiveStageAttempts + failedStage.ignoredDecommissionFailedStage) || + disallowStageRetryForTest || + failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts + // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is @@ -1661,6 +1692,10 @@ private[spark] class DAGScheduler( } if (shouldAbortStage) { + if (failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts + && DecommissionTracker.isDecommissionEnabled(sc.getConf)) { + DecommissionTracker.setFetchFailIgnoreCntThresholdFlag(true) + } val abortMessage = if (disallowStageRetryForTest) { "Fetch failure will not retry stage due to testing config" } else { @@ -1823,9 +1858,10 @@ private[spark] class DAGScheduler( failedStage.failedAttemptIds.add(task.stageAttemptId) // TODO Refactor the failure handling logic to combine similar code with that of // FetchFailed. - val shouldAbortStage = - failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest + val shouldAbortStage = failedStage.failedAttemptIds.size >= + (maxConsecutiveStageAttempts + failedStage.ignoredDecommissionFailedStage) || + disallowStageRetryForTest || + failedStage.ignoredDecommissionFailedStage > maxIgnoredFailedStageAttempts if (shouldAbortStage) { val abortMessage = if (disallowStageRetryForTest) { @@ -1980,6 +2016,18 @@ private[spark] class DAGScheduler( clearCacheLocs() } + /** + * Remove shuffle data mapping when node is decomissioned. + * + * @param host host of the node that is decommissioned + */ + private[scheduler] def handleNodeDecommissioned(host: String) { + logInfo(s"Marking shuffle files lost on the decommissioning host $host") + mapOutputTracker.removeOutputsOnHost(host) + clearCacheLocs() + } + + private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = { // remove from failedEpoch(execId) ? 
if (failedEpoch.contains(execId)) { @@ -2281,6 +2329,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case WorkerRemoved(workerId, host, message) => dagScheduler.handleWorkerRemoved(workerId, host, message) + case NodeDecommissioned(host) => + dagScheduler.handleNodeDecommissioned(host) + case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 78d458338e8fb..faa3ef3142363 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -88,6 +88,9 @@ private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossR private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String) extends DAGSchedulerEvent +private[scheduler] case class NodeDecommissioned(host: String) + extends DAGSchedulerEvent + private[scheduler] case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]) extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala new file mode 100644 index 0000000000000..d3bd230e4059b --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/DecommissionTracker.scala @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.text.SimpleDateFormat +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + +import scala.collection.mutable.HashMap + +import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} + +/** + * DecommissionTracker tracks the list of decommissioned nodes. + * This decommission trackers maintains the decommissioned nodes state. + * Decommission tracker schedules the executor decommission and shuffle + * decommission for that node. + */ +private[scheduler] class DecommissionTracker ( + conf: SparkConf, + executorAllocClient: Option[ExecutorAllocationClient], + dagScheduler: DAGScheduler, + clock: Clock = new SystemClock()) extends Logging { + + def this(sc: SparkContext, + client: Option[ExecutorAllocationClient], + dagScheduler: DAGScheduler) = { + this(sc.conf, client, dagScheduler) + } + + // Decommission thread of node decommissioning!! 
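+  // The scheduled pool (20 threads) lets the executor-decommission and
+  // shuffle-cleanup callbacks for several nodes run concurrently instead of
+  // queueing behind a single thread.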
+ private val decommissionThread = + ThreadUtils.newDaemonThreadPoolScheduledExecutor("node-decommissioning-thread", 20) + + // Contains workers hostname which are decommissioning. Added when + // decommissioning event arrives from the AM. And is removed when the + // last node (identified by nodeId) is running again. + private val decommissionHostNameMap = new HashMap[String, NodeDecommissionInfo] + + private val minDecommissionTimeMs = + conf.get(config.GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC)*1000 + + private val executorDecommissionLeasePct = + conf.get(config.GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT) + + private val shuffleDataDecommissionLeasePct = + conf.get(config.GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT) + + /* + * Is the node decommissioned i.e from driver point of + * view the node is considered decommissioned. + */ + def isNodeDecommissioned(hostname: String): Boolean = synchronized { + decommissionHostNameMap.get(hostname) match { + case None => false + case Some(info) => + return info.state == NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED || + info.state == NodeDecommissionState.TERMINATED + } + } + + /* + * Is the node present in decommission Tracker + * This is used by driver for not registering any new executor + * on that node which is present in decommission Tracker and + * neither to assign any new task to that existing executor on that node. + */ + def isNodePresentInDecommissionTracker(hostname: String): Boolean = synchronized { + decommissionHostNameMap.contains(hostname) + } + + /** + * Used for Unit Test + */ + def getDecommissionedNodeState(hostname: String): Option[NodeDecommissionState.Value] = + synchronized { + decommissionHostNameMap.get(hostname) match { + case Some(info) => Some(info.state) + case _ => None + } + } + + /** + * @param delayTime - time after which the node will be terminated + * @param currentTimeMs - Current time in milliseconds + * @return executorDecommissionTimeMs and shuffleDataDecommissionTimeMs + */ + private def getDecommissionTimeOut( + delayTime: Long, + currentTimeMs: Long): (Long, Long) = { + val executorDecommissionTimeMs = + if (executorDecommissionLeasePct > shuffleDataDecommissionLeasePct) { + // if executorDecommissionLeasePct is greater than + // shuffleDataDecommissionLeasePct. 
In that scenario calculate + // executorDecommissionTimeMs using shuffleDataDecommissionLeasePct + (delayTime * shuffleDataDecommissionLeasePct) / 100 + currentTimeMs + } else { + (delayTime * executorDecommissionLeasePct) / 100 + currentTimeMs + } + val shuffleDataDecommissionTimeMs = + if (executorDecommissionLeasePct >= shuffleDataDecommissionLeasePct) { + // Add a delay of one second in shuffleDataDecommissionTimeMs if + // executorDecommissionLeasePct greater than equals to shuffleDataDecommissionLeasePct + // Since we want executor to be decommissioned first + // than after that shuffleDataDecommission + (delayTime * shuffleDataDecommissionLeasePct) / 100 + currentTimeMs + 1000 + } else { + (delayTime * shuffleDataDecommissionLeasePct) / 100 + currentTimeMs + } + (executorDecommissionTimeMs, shuffleDataDecommissionTimeMs) + } + + def addNodeToDecommission( + hostname: String, + terminationTimeMs: Long, + reason: NodeDecommissionReason): Unit = synchronized { + + val df: SimpleDateFormat = new SimpleDateFormat("yy/MM/dd HH:mm:ss") + val tDateTime = df.format(terminationTimeMs) + val curTimeMs = clock.getTimeMillis() + + if (terminationTimeMs <= curTimeMs) { + // Ignoring the decommission request if termination + // time is less than or same as the current time + logWarning(s"Ignoring decommissioning request for host $hostname as" + + s" terminationTimeMs ${terminationTimeMs} is less" + + s" than current time ${curTimeMs}") + return + } + + // Override decommissionHostNameMap in case termination time is less than + // existing the terminationTime in decommissionHostNameMap. + if (decommissionHostNameMap.contains(hostname)) { + val nodeDecommissionInfo = decommissionHostNameMap(hostname) + // There will be no duplicate entry of terminationTimeMs in decommissionHostNameMap + // since the terminationTime is updated only when it is less than the existing termination + // time in decommissionHostNameMap + if (decommissionHostNameMap(hostname).terminationTime <= terminationTimeMs) { + logDebug( + s"""Ignoring decommissioning """ + + s""" request : {"node":"$hostname","reason":"${reason.message}",terminationTime"""" + + s""":"$tDateTime"} current : {"node":"$hostname",$nodeDecommissionInfo}""") + return + } else { + logInfo(s"Updating the termination time to :${terminationTimeMs} in " + + s"decommission tracker for the hostname ${hostname}") + } + } + + val delay = terminationTimeMs - curTimeMs + + var executorDecommissionTimeMs = terminationTimeMs + var shuffleDataDecommissionTimeMs = terminationTimeMs + + // if delay is less than a minDecommissionTime than decommission immediately + if (delay < minDecommissionTimeMs) { + executorDecommissionTimeMs = curTimeMs + // Added the delay of 1 second in case of delay is less than minDecommissionTime + // Since we want executor to be decommissioned first + // than after that shuffleDataDecommission + shuffleDataDecommissionTimeMs = curTimeMs + 1000 + } else { + reason match { + case NodeLoss => + // In Nodeloss case adjust termination time so that enough + // buffer to real termination is available for job to finish + // consuming shuffle data. 
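+          // Illustrative example (hypothetical numbers): with a 100s delay and
+          // the default lease percentages (executor 50%, shuffle data 90%),
+          // executors are killed ~50s from now and the shuffle data is
+          // unregistered ~90s from now, leaving a buffer before the node
+          // actually terminates.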
+ var (executorDecommissionTime, shuffleDataDecommissionTime) = getDecommissionTimeOut( + delay, curTimeMs) + executorDecommissionTimeMs = executorDecommissionTime + shuffleDataDecommissionTimeMs = shuffleDataDecommissionTime + case _ => + // No action + } + } + + // Count of executors/worker which went to decommissioning + // state over lifetime of application + DecommissionTracker.incrNodeDecommissionCnt() + val nodeDecommissionInfo = new NodeDecommissionInfo( + executorDecommissionTime = executorDecommissionTimeMs, + shuffleDataDecommissionTime = shuffleDataDecommissionTimeMs, + terminationTime = terminationTimeMs, + reason = reason, + state = NodeDecommissionState.DECOMMISSIONING) + + logInfo(s"""Adding decommissioning""" + + s""" request : {"node":"$hostname",$nodeDecommissionInfo} """) + + // Add node to the list of decommissioning nodes. + decommissionHostNameMap.put(hostname, nodeDecommissionInfo) + + // Schedule executor decommission + decommissionThread.schedule(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + executorDecommission(hostname, nodeDecommissionInfo) + } + }, executorDecommissionTimeMs - curTimeMs, TimeUnit.MILLISECONDS) + + // Schedule shuffle decommission + decommissionThread.schedule(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + removeShuffleData(hostname, nodeDecommissionInfo) + } + }, shuffleDataDecommissionTimeMs - curTimeMs, TimeUnit.MILLISECONDS) + } + + def removeNodeToDecommission(hostname: String): Unit = synchronized { + if (!decommissionHostNameMap.contains(hostname)) { + return + } + + val nodeDecommissionInfo = decommissionHostNameMap(hostname) + logInfo(s"""Removing decommissioning""" + + s""" request : {"node":"$hostname",$nodeDecommissionInfo}""") + decommissionHostNameMap -= hostname + } + + def updateNodeToDecommissionSetTerminate(hostname: String): Unit = synchronized { + terminate(hostname) + } + + private def executorDecommission(hostname: String, + nodeDecommissionInfo: NodeDecommissionInfo): Unit = { + // Not found, only valid scenario is the nodes + // has moved back to running state + // Scenario where nodeLoss terminated the node + // for the Graceful Decommission node. + // If the node is already terminated and hostname is re-used in that scenario + // no need to kill the executor on that host + if (! decommissionHostNameMap.contains(hostname)) { + logInfo(s"""Node $hostname not found in decommisssionTrackerList while""" + + """performing executor decommission""") + return + } + // if the terminationTime in the thread is not equal to + // terminationTime in decommissionHostNameMap for that + // host than Ignore the ExecutorDecommission + if (decommissionHostNameMap(hostname).terminationTime + != nodeDecommissionInfo.terminationTime) { + logInfo(s"Ignoring ExecutorDecommission for hostname ${hostname}," + + s" since node is already terminated") + return + } + + // Kill executor if there still are some running. This call is + // async does not wait for response. Otherwise it may cause + // deadlock between schedulerBacked (ExecutorAllocationManager) + // and this. 
+ executorAllocClient.map(_.killExecutorsOnHost(hostname)) + decommissionHostNameMap(hostname).state = NodeDecommissionState.EXECUTOR_DECOMMISSIONED + logInfo(s"Node $hostname decommissioned") + return + } + + private def removeShuffleData(hostname: String, + nodeDecommissionInfo: NodeDecommissionInfo): Unit = { + // Not found, only valid scenario is the nodes + // has moved back to running state + // This for scenario where about_to_be_lost terminated the node + // for the Graceful Decommission node. + // If the node is already terminated and hostname is reused in that scenario + // no need to remove the shuffle entry from map-output tracker + if (! decommissionHostNameMap.contains(hostname)) { + logInfo(s"""Node $hostname not found in decommisssionTrackerList while """ + + """performing shuffle data decommission""") + return + } + // if the terminationTime in the thread is not equal to + // terminationTime in decommissionHostNameMap for that + // host than Ignore the removeShuffleData + if (decommissionHostNameMap(hostname).terminationTime + != nodeDecommissionInfo.terminationTime) { + logInfo(s"Ignoring removeShuffleData for hostname ${hostname}," + + s" since node is already terminated") + return + } + + // Unregister shuffle data. + dagScheduler.nodeDecommissioned(hostname) + + decommissionHostNameMap(hostname).state = NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED + + logInfo(s"Node $hostname Shuffle data decommissioned") + + return + } + + private def terminate(hostname: String): Unit = { + // Not found, only valid scenario is the nodes + // has moved back to running state + if (!decommissionHostNameMap.contains(hostname)) { + logWarning(s"Node $hostname not found in decommisssionTrackerList") + return + } + + // Remove all the shuffle data of all the executors for the terminated node + dagScheduler.nodeDecommissioned(hostname) + + decommissionHostNameMap(hostname).state = NodeDecommissionState.TERMINATED + + logInfo(s"Node $hostname terminated") + } + + def stop (): Unit = { + val decommissionNodeCnt = DecommissionTracker.getNodeDecommissionCnt() + val fetchFailIgnoreCnt = DecommissionTracker.getFetchFailIgnoreCnt() + val fetchFailIgnoreThresholdExceed = DecommissionTracker.getFetchFailIgnoreCntThresholdFlag() + val fetchFailedIgnoreThreshold = + conf.get(config.GRACEFUL_DECOMMISSION_FETCHFAILED_IGNORE_THRESHOLD) + decommissionThread.shutdown() + var message = s"Decommission metrics: ${decommissionNodeCnt} nodes decommissioned" + + s" (either due to node loss or node block rotation) while running ${conf.getAppId}." + + s" Ignored ${fetchFailIgnoreCnt} fetch failed exception caused by decommissioned node." 
+ + if (fetchFailIgnoreThresholdExceed) { + message += s"Fetch fail ignore exceeded threshold ${fetchFailedIgnoreThreshold}" + + s" causing stage abort" + } else { + message += s"Fetch fail ignored under allowed threshold ${fetchFailedIgnoreThreshold}" + } + // logging the metrics related to graceful decommission from decommission tracker + logDebug(message) + } + +} + +private[spark] object DecommissionTracker extends Logging { + val infiniteTime = Long.MaxValue + + // Stats + val decommissionNodeCnt = new AtomicInteger(0) + val fetchFailIgnoreCnt = new AtomicInteger(0) + val fetchFailIgnoreCntThresholdExceeded = new AtomicBoolean(false) + val abortStage = new AtomicBoolean(false) + + def incrNodeDecommissionCnt(): Unit = { + decommissionNodeCnt.getAndIncrement() + } + + def incrFetchFailIgnoreCnt(): Unit = { + fetchFailIgnoreCnt.getAndIncrement() + } + + def setFetchFailIgnoreCntThresholdFlag(thresholdExceedFlag: Boolean): Unit = { + fetchFailIgnoreCntThresholdExceeded.set(thresholdExceedFlag) + } + + def setAbortStageFlag(AbortStageFlag: Boolean): Unit = { + abortStage.set(AbortStageFlag) + } + + def getNodeDecommissionCnt(): Int = { + decommissionNodeCnt.get() + } + + def getFetchFailIgnoreCnt(): Int = { + fetchFailIgnoreCnt.get() + } + + def getFetchFailIgnoreCntThresholdFlag(): Boolean = { + fetchFailIgnoreCntThresholdExceeded.get() + } + + def getAbortStageFlag(): Boolean = { + abortStage.get() + } + + def isDecommissionEnabled(conf: SparkConf): Boolean = { + conf.get(config.GRACEFUL_DECOMMISSION_ENABLE) + } +} + +private class NodeDecommissionInfo( + var terminationTime: Long, + var executorDecommissionTime: Long, + var shuffleDataDecommissionTime: Long, + var state: NodeDecommissionState.Value, + var reason: NodeDecommissionReason) { + override def toString(): String = { + val df: SimpleDateFormat = new SimpleDateFormat("YY/MM/dd HH:mm:ss") + val tDateTime = df.format(terminationTime) + val edDateTime = df.format(executorDecommissionTime) + val sdDateTime = df.format(shuffleDataDecommissionTime) + s""""terminationTime":"$tDateTime","reason":"${reason.message}","executorDecommissionTime"""" + + s""":"$edDateTime","shuffleDataDecommissionTime":"$sdDateTime","state":"$state"""" + } +} + +/* + * NB: exposed for testing + */ +private[scheduler] object NodeDecommissionState extends Enumeration { + val DECOMMISSIONING, EXECUTOR_DECOMMISSIONED, SHUFFLEDATA_DECOMMISSIONED, TERMINATED = Value + type NodeDecommissionState = Value +} + +/** + * Represents an explanation for a Node being decommissioned. + * NB: exposed for testing + */ +@DeveloperApi +private[spark] sealed trait NodeDecommissionReason extends Serializable { + def message: String +} + +@DeveloperApi +private[spark] case object NodeLoss extends NodeDecommissionReason { + override def message: String = "nodeLoss" +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index ae7924d66a301..c892daab67e85 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -91,8 +91,15 @@ private[scheduler] abstract class Stage( */ val failedAttemptIds = new HashSet[Int] + /** + * Number of times the stage failure is ignored. e.g failed due to fetch failed + * exception caused by node decommissioning. 
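+   * Each ignored attempt effectively raises the stage's allowed number of
+   * consecutive failed attempts by one in the DAGScheduler, bounded by
+   * spark.graceful.decommission.fetchfailed.ignore.threshold.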
+ */ + var ignoredDecommissionFailedStage = 0 + private[scheduler] def clearFailures() : Unit = { failedAttemptIds.clear() + ignoredDecommissionFailedStage = 0 } /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2c37fec271766..b2b41ad01dd7a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -92,6 +92,10 @@ private[spark] class TaskSchedulerImpl( // because ExecutorAllocationClient is created after this TaskSchedulerImpl. private[scheduler] lazy val blacklistTrackerOpt = maybeCreateBlacklistTracker(sc) + // initializing decommissionTracker + // NB: Public for testing + lazy val decommissionTrackerOpt = maybeCreateDecommissionTracker(sc) + val conf = sc.conf // How often to check for speculative tasks @@ -274,7 +278,7 @@ private[spark] class TaskSchedulerImpl( private[scheduler] def createTaskSetManager( taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt) + new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, decommissionTrackerOpt) } override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized { @@ -896,6 +900,7 @@ private[spark] class TaskSchedulerImpl( } starvationTimer.cancel() abortTimer.cancel() + decommissionTrackerOpt.map(_.stop) } override def defaultParallelism(): Int = backend.defaultParallelism() @@ -1156,4 +1161,22 @@ private[spark] object TaskSchedulerImpl { } } + private def maybeCreateDecommissionTracker(sc: SparkContext): Option[DecommissionTracker] = { + if (DecommissionTracker.isDecommissionEnabled(sc.conf)) { + val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match { + case b: ExecutorAllocationClient => Some(b) + case _ => None + } + + val dagScheduler = sc.dagScheduler + if (executorAllocClient.isDefined && dagScheduler != null) { + Some(new DecommissionTracker(sc, executorAllocClient, dagScheduler)) + } else { + None + } + } else { + None + } + } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a302f680a272e..df22544759ad0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -56,6 +56,7 @@ private[spark] class TaskSetManager( val taskSet: TaskSet, val maxTaskFailures: Int, blacklistTracker: Option[BlacklistTracker] = None, + decommissionTracker: Option[DecommissionTracker] = None, clock: Clock = new SystemClock()) extends Schedulable with Logging { private val conf = sched.sc.conf @@ -813,6 +814,23 @@ private[spark] class TaskSetManager( if (fetchFailed.bmAddress != null) { blacklistTracker.foreach(_.updateBlacklistForFetchFailure( fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId)) + + // Do account fetch failure exception raised by decommissioned + // node against stage failure. Here the logic is to specify, + // if the task failed due to fetchFailed of decommission nodes than + // don't count towards the stageFailure. 
countTowardsStageFailures + // variable of TaskEndReason, that can be used in DAGScheduler to account + // fetch failure while checking the stage abort + decommissionTracker match { + case Some(decommissionTracker) => + if (decommissionTracker.isNodeDecommissioned(fetchFailed.bmAddress.host)) { + logInfo(s"Do not count fetch failure from decommissioned" + + s" node ${fetchFailed.bmAddress.host}") + fetchFailed.countTowardsDecommissionStageFailures = false + } + case _ => + // No action + } } None diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala new file mode 100644 index 0000000000000..942810d9fce90 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterInfo.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import java.text.SimpleDateFormat + +import scala.collection.mutable.{HashMap} + +/** + * State of the node. + * Add the node state depending upon the cluster manager, For Yarn + * getYarnNodeState is added to create the node state for Decommission Tracker + */ +private[spark] object NodeState extends Enumeration { + val RUNNING, DECOMMISSIONED, DECOMMISSIONING, LOST, OTHER = Value + type NodeState = Value +} + +/** + * Node information used by CoarseGrainedSchedulerBackend. + * + * @param nodeState node state + * @param terminationTime Time at which node will terminate. + * This optional and will be set only for node state DECOMMISSIONING + */ +private[spark] case class NodeInfo(nodeState: NodeState.Value, terminationTime: Option[Long]) + extends Serializable { + override def toString(): String = { + val df: SimpleDateFormat = new SimpleDateFormat("YY/MM/dd HH:mm:ss") + val tDateTime = df.format(terminationTime.getOrElse(0L)) + s""""terminationTime":"$tDateTime","state":"$nodeState"""" + } +} + +/** + * Cluster status information used by CoarseGrainedSchedulerBackend. + * + * @param nodeInfos Information about node about to be decommissioned + * resource in the cluster. 
+ */ +private[spark] case class ClusterInfo( + nodeInfos: HashMap[String, NodeInfo]) + extends Serializable { + override def toString(): String = { + + var nodeInfoStr = "" + nodeInfos.foreach { + case (k, v) => + nodeInfoStr = nodeInfoStr + s"""{"node":"$k",$v}""" + } + if (nodeInfoStr == "") { + s"""{"nodeInfos":{}}""" + } else { + s"""{"nodeInfos":$nodeInfoStr}""" + } + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 465c0d20de481..8eb5ac4d9fab9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import org.apache.spark.TaskState.TaskState import org.apache.spark.resource.{ResourceInformation, ResourceProfile} import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.ExecutorLossReason +import org.apache.spark.scheduler.{ExecutorLossReason, NodeDecommissionReason} import org.apache.spark.util.SerializableBuffer private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable @@ -96,6 +96,17 @@ private[spark] object CoarseGrainedClusterMessages { case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage + case class NodeLossNotification(nodeIds: Seq[String]) + + case class AddNodeToDecommission(hostname: String, decommissionTimeInSeconds: Long, + reason: NodeDecommissionReason) extends CoarseGrainedClusterMessage + + case class RemoveNodeToDecommission(hostname: String) + extends CoarseGrainedClusterMessage + + case class UpdateNodeToDecommissionSetTerminate(hostname: String) + extends CoarseGrainedClusterMessage + case class RemoveWorker(workerId: String, host: String, message: String) extends CoarseGrainedClusterMessage @@ -106,6 +117,11 @@ private[spark] object CoarseGrainedClusterMessages { filterName: String, filterParams: Map[String, String], proxyBase: String) extends CoarseGrainedClusterMessage + // cluster info update notification + case class ClusterInfoUpdate(clusterInfo: ClusterInfo) + extends CoarseGrainedClusterMessage + + // Messages exchanged between the driver and the cluster manager for executor allocation // In Yarn mode, these are exchanged between the driver and the AM diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 67638a5f9593c..5c92b2a3e6693 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -57,6 +57,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Total number of executors that are currently registered protected val totalRegisteredExecutors = new AtomicInteger(0) protected val conf = scheduler.sc.conf + // Invoked lazily to prevent creating the DecommissionTracker before + // ExecutorAllocationClient and DAGScheduler + protected lazy val decommissionTracker = scheduler.decommissionTrackerOpt private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf) // Submit tasks only after (registered resources / total expected resources) @@ -203,6 +206,32 @@ class 
CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp data.freeCores = data.totalCores } makeOffers(executorId) + + case AddNodeToDecommission(hostname, terminationTimeMs, nodeDecommissionReason) => + decommissionTracker.foreach { tracker => + tracker.addNodeToDecommission(hostname, terminationTimeMs, nodeDecommissionReason) + // handle the cached RDD on Decommission nodes + val nodeDecommissionState = tracker.getDecommissionedNodeState(hostname) + if(nodeDecommissionState.isDefined && + !nodeDecommissionState.contains(NodeDecommissionState.TERMINATED)) { + val exe = scheduler.getExecutorsAliveOnHost(hostname) + exe match { + case Some(set) => + for (e <- set) { + moveCachedRddFromDecommissionExecutor(e) + } + case None => logInfo("There is active no executor available" + + " in decommission node for moving the cached RDD") + } + } + } + + case RemoveNodeToDecommission(hostname) => + decommissionTracker.foreach(_.removeNodeToDecommission(hostname)) + + case UpdateNodeToDecommissionSetTerminate(hostname) => + decommissionTracker.foreach(_.updateNodeToDecommissionSetTerminate(hostname)) + case e => logError(s"Received unexpected message. ${e}") } @@ -220,6 +249,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // or if it ignored our blacklist), then we reject that executor immediately. logInfo(s"Rejecting $executorId as it has been blacklisted.") context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId")) + } else if (isNodePresentInDecommissionTracker(hostname)) { + // Refuse any new executor registered from the decommissioning worker. Can only happen + // in case of spot loss nodes about to be lost. For nodes gracefuly decommissioning this + // won't happen. + logInfo(s"Rejecting executor:$executorId registration on decommissioning node:$hostname") + context.sendFailure(new IllegalStateException( + s"Executor host is decommissioning: $executorId")) } else { // If the executor's rpc env is not listening for incoming connections, `hostPort` // will be null, and the client connection should be used to contact the executor. 
@@ -295,6 +331,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val taskDescs = withLock { // Filter out executors under killing val activeExecutors = executorDataMap.filterKeys(isExecutorActive) + .filter(x => !isNodePresentInDecommissionTracker(x._2.executorHost)) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores, @@ -322,9 +359,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private def makeOffers(executorId: String): Unit = { // Make sure no executor is killed while some task is launching on it val taskDescs = withLock { + val executorData = executorDataMap(executorId) // Filter out executors under killing - if (isExecutorActive(executorId)) { - val executorData = executorDataMap(executorId) + // add the filter to check executor doesnot belong to + // decommission nodes + if (isExecutorActive(executorId) && + !isNodePresentInDecommissionTracker(executorData.executorHost)) { val workOffers = IndexedSeq( new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores, Some(executorData.executorAddress.hostPort), @@ -438,25 +478,29 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logError(s"Unexpected error during decommissioning ${e.toString}", e) } logInfo(s"Finished decommissioning executor $executorId.") - - if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { - try { - logInfo("Starting decommissioning block manager corresponding to " + - s"executor $executorId.") - scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) - } catch { - case e: Exception => - logError("Unexpected error during block manager " + - s"decommissioning for executor $executorId: ${e.toString}", e) - } - logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") - } + moveCachedRddFromDecommissionExecutor(executorId) } else { logInfo(s"Skipping decommissioning of executor $executorId.") } shouldDisable } + // Move cached Rdd from Decommission executors + private def moveCachedRddFromDecommissionExecutor(executorId: String): Unit = { + if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { + try { + logInfo("Starting decommissioning block manager corresponding to " + + s"executor $executorId.") + scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) + } catch { + case e: Exception => + logError("Unexpected error during block manager " + + s"decommissioning for executor $executorId: ${e.toString}", e) + } + logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") + } + } + /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. @@ -851,6 +895,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ protected def isBlacklisted(executorId: String, hostname: String): Boolean = false + /** + * Add nodes to decommission list, time in ms at which node will be decommissioned. + * NB: Public for testing + * + * @param hostname hostname of worker to be decommissioned. + * @param terminationTimeMs time after which node will be forcefully terminated. 
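+   * @param reason reason for the node decommission, e.g. NodeLoss.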
+ */ + protected def addNodeToDecommission(hostname: String, terminationTimeMs: Long, + reason: NodeDecommissionReason): Unit = { None } + // SPARK-27112: We need to ensure that there is ordering of lock acquisition // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix // the deadlock issue exposed in SPARK-27112 @@ -858,6 +912,31 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp CoarseGrainedSchedulerBackend.this.synchronized { fn } } + /** + * Remover worker from the decommission map. + * NB: Public for testing + * + * @hostname hostname of worker to be removed from decommissioned list. + */ + def removeNodeToDecommission(hostname: String): Unit = { None } + + /** + * Mark node as terminated. + * NB: Public for testing + * + * @hostname hostname of worker to be marked as terminated. + */ + def updateNodeToDecommissionSetTerminate(hostname: String): Unit = { None } + + + private def isNodePresentInDecommissionTracker(hostname: String): Boolean = { + decommissionTracker match { + case None => return false + case Some(decommissionTracker) => + return decommissionTracker.isNodePresentInDecommissionTracker(hostname) + } + } + } private[spark] object CoarseGrainedSchedulerBackend { diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 312691302b064..945a74fabc87f 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -77,6 +77,7 @@ class HeartbeatReceiverSuite scheduler = mock(classOf[TaskSchedulerImpl]) when(sc.taskScheduler).thenReturn(scheduler) when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]()) + when(scheduler.decommissionTrackerOpt).thenReturn(None) when(scheduler.sc).thenReturn(sc) heartbeatReceiverClock = new ManualClock heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala new file mode 100644 index 0000000000000..4c0a5b5dc1e7d --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/DecommissionTrackerSuite.scala @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mockito.when +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach} +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark._ +import org.apache.spark.internal.config +import org.apache.spark.util.ManualClock + +class DecommissionTrackerSuite extends SparkFunSuite + with BeforeAndAfter with BeforeAndAfterEach with MockitoSugar + with LocalSparkContext { + + private var decommissionTracker: DecommissionTracker = _ + private var dagScheduler : DAGScheduler = _ + private var executorAllocationClient : ExecutorAllocationClient = _ + private var conf: SparkConf = _ + private var currentDecommissionNode = "" + private var currentShuffleDecommissionNode = "" + + override def beforeAll(): Unit = { + conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.GRACEFUL_DECOMMISSION_ENABLE.key, "true") + .set(config.GRACEFUL_DECOMMISSION_MIN_TERMINATION_TIME_IN_SEC.key, "8s") + + executorAllocationClient = mock[ExecutorAllocationClient] + dagScheduler = mock[DAGScheduler] + } + + override def afterAll(): Unit = { + if (decommissionTracker != null) { + decommissionTracker = null + } + } + + before { + val clock = new ManualClock(0) + clock.setTime(0) + decommissionTracker = new DecommissionTracker(conf, + Some(executorAllocationClient), dagScheduler, clock = clock) + + when(executorAllocationClient.killExecutorsOnHost(anyString())).thenAnswer(new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + currentDecommissionNode = invocation.getArgument(0, classOf[String]) + true + } + }) + when(dagScheduler.nodeDecommissioned(anyString())).thenAnswer(new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + currentShuffleDecommissionNode = invocation.getArgument(0, classOf[String]) + true + } + }) + } + + after { + decommissionTracker.removeNodeToDecommission("hostA") + decommissionTracker.removeNodeToDecommission("hostB") + super.afterEach() + } + + test("Check Node Decommission state") { + val clock = new ManualClock(0) + decommissionTracker = new DecommissionTracker(conf, + Some(executorAllocationClient), dagScheduler, clock = clock) + + when(executorAllocationClient.killExecutorsOnHost(anyString())).thenAnswer(new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + currentDecommissionNode = invocation.getArgument(0, classOf[String]) + true + } + }) + when(dagScheduler.nodeDecommissioned(anyString())).thenAnswer(new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + currentShuffleDecommissionNode = invocation.getArgument(0, classOf[String]) + true + } + }) + + decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostA")) + + // Wait for executor decommission give 100ms over 50% + Thread.sleep(5100) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + assert(currentDecommissionNode === "hostA") + + // Wait for shuffle data decommission + Thread.sleep(4000) + assert(decommissionTracker.isNodeDecommissioned("hostA")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + assert(currentShuffleDecommissionNode === "hostA") + + // 
Wait for and terminate + Thread.sleep(2000) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostA") + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.TERMINATED)) + + // Recomission + decommissionTracker.removeNodeToDecommission("hostA") + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostA")) + } + + test("Check the Multiple Node Decommission state") { + decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) + decommissionTracker.addNodeToDecommission("hostB", 30000, NodeLoss) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostA")) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostB")) + + // Wait for hostA executor decommission + Thread.sleep(5100) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + assert(currentDecommissionNode === "hostA") + + // Wait for hostA shuffle data decommission + Thread.sleep(4000) + assert(decommissionTracker.isNodeDecommissioned("hostA")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + assert(currentShuffleDecommissionNode === "hostA") + + // Wait for hostA termination and trigger termination + Thread.sleep(1000) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostA") + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.TERMINATED)) + + // Recommission hostA + decommissionTracker.removeNodeToDecommission("hostA") + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostA")) + + // Wait for hostB executor decommission + Thread.sleep(5000) + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + assert(currentDecommissionNode === "hostB") + + // Wait for hostB shuffledata decommission + Thread.sleep(12000) + assert(decommissionTracker.isNodeDecommissioned("hostB")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + assert(currentShuffleDecommissionNode === "hostB") + + // Wait for hostB termination and trigger termination + Thread.sleep(3000) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostB") + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.TERMINATED)) + + // Recommission hostB + decommissionTracker.removeNodeToDecommission("hostB") + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostB")) + } + + test("Check Multiple Node Decommission state at same time") { + decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) + decommissionTracker.addNodeToDecommission("hostB", 10000, NodeLoss) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostA")) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostB")) + + // Wait for both hostA hostB executor decommission + Thread.sleep(5100) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + + // Wait for both hostA hostB shuffle data decommission + Thread.sleep(4000) + assert(decommissionTracker.isNodeDecommissioned("hostA")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === 
Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + assert(decommissionTracker.isNodeDecommissioned("hostB")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + + // Wait for both node termination and trigger termination + Thread.sleep(2000) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostA") + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.TERMINATED)) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostB") + assert(decommissionTracker.getDecommissionedNodeState( + "hostB") === Some(NodeDecommissionState.TERMINATED)) + + // Recommission both + decommissionTracker.removeNodeToDecommission("hostA") + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostA")) + decommissionTracker.removeNodeToDecommission("hostB") + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostB")) + } + + test("Check Node decommissioning with minimum termination time config") { + // Less than MIN_TERMINATION_TIME + decommissionTracker.addNodeToDecommission("hostA", 5000, NodeLoss) + // sleep for 1 sec which is lag between executor and shuffle + // decommission. + Thread.sleep(1100) + assert(decommissionTracker.isNodeDecommissioned("hostA")) + } + + test("Check Node Decommissioning with non-default executor and shuffle data lease time config") { + + val clock = new ManualClock(0) + clock.setTime(0) + // override and recreated + conf = conf + .set(config.GRACEFUL_DECOMMISSION_EXECUTOR_LEASETIME_PCT.key, "20") + .set(config.GRACEFUL_DECOMMISSION_SHUFFLEDATA_LEASETIME_PCT.key, "50") + + decommissionTracker = new DecommissionTracker(conf, + Some(executorAllocationClient), dagScheduler, clock = clock) + + decommissionTracker.addNodeToDecommission("hostA", 10000, NodeLoss) + assert(decommissionTracker.isNodePresentInDecommissionTracker("hostA")) + + // Wait for hostA executor decommission at 20% + Thread.sleep(2100) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.EXECUTOR_DECOMMISSIONED)) + assert(currentDecommissionNode === "hostA") + + // Wait for hostA shuffle data decommission at 50% + Thread.sleep(3000) + assert(decommissionTracker.isNodeDecommissioned("hostA")) + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.SHUFFLEDATA_DECOMMISSIONED)) + assert(currentShuffleDecommissionNode === "hostA") + + // Wait for terminate and trigger termination + Thread.sleep(5000) + decommissionTracker.updateNodeToDecommissionSetTerminate("hostA") + assert(decommissionTracker.getDecommissionedNodeState( + "hostA") === Some(NodeDecommissionState.TERMINATED)) + + // Recommission + decommissionTracker.removeNodeToDecommission("hostA") + assert(!decommissionTracker.isNodePresentInDecommissionTracker("hostA")) + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index a75bae56229b4..d85153800923f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -203,7 +203,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B sc.conf.get(config.TASK_MAX_FAILURES), clock = clock) { override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(this, 
taskSet, maxTaskFailures, blacklistTrackerOpt, clock) + new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock = clock) } override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { // Don't shuffle the offers around for this test. Instead, we'll just pass in all @@ -1337,7 +1337,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B offers } override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock) + new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock = clock) } } // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. @@ -1377,7 +1377,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B val clock = new ManualClock() val taskScheduler = new TaskSchedulerImpl(sc) { override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { - new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock) + new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock = clock) } } // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index e4aad58d25064..777b6d6ec3234 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -493,7 +493,8 @@ class TaskSetManagerSuite // within the taskset. val mockListenerBus = mock(classOf[LiveListenerBus]) val blacklistTrackerOpt = Some(new BlacklistTracker(mockListenerBus, conf, None, clock)) - val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock) + val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, + clock = clock) { val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1 diff --git a/docs/configuration.md b/docs/configuration.md index 420942f7b7bbb..c4d8877cc898c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2146,6 +2146,72 @@ Apart from these, the following properties are also available, and may be useful 3.1.0 + + spark.graceful.decommission.enable + false + + If set to "true", Spark will handle the node decommissioning gracefully + for YARN Resource Manager. + + 3.1.0 + + + spark.graceful.decommission.node.timeout + 110s + + Interval in seconds after which the node is decommissioned. In case aws spotloss + the time is approximately 110s and in case of GCP preemptible VMs this is around 30s + this config can be changed according to node type in the public cloud. This will + be applied if the decommission timeout is not sent by the Resource Manager. + + 3.1.0 + + + spark.graceful.decommission.fetchfailed.ignore.threshold + 8 + + Threshold of number of times fetchfailed ignored due to node decommission. + This is configurable as per the need of the user and depending upon + type of the cloud. If we keep this a large value and there is + continuous decommission of nodes, in those scenarios stage + will never abort and keeps on retrying in an unbounded manner. + + 3.1.0 + + + spark.graceful.decommission.executor.leasetimePct + 50 + + Percentage of time to expiry after which executors are killed + (if enabled) on the node. 
The value ranges between 0 and 100. + + 3.1.0 + + + spark.graceful.decommission.shuffedata.leasetimePct + 90 + + Percentage of the time to expiry after which shuffle data on the node is + cleaned up (if enabled). The value ranges between 0 and 100 and + is always greater than or equal to the executor + lease time (it is set equal to it if incorrectly configured). + A value near 0 means generated data is marked as lost too early. + A value too close to 100 means shuffle data may not get cleared proactively, + leading to tasks running into fetch-failure scenarios. + + 3.1.0 + + + spark.graceful.decommission.min.termination.time + 60s + + Minimum time to termination below which node decommissioning is performed immediately. + If the decommissioning time is less than the configured time (spark.graceful.decommission.min.termination.time), + the executor decommissioning and shuffle data cleanup take place + immediately: first the executor decommission, then the shuffle data cleanup. + + 3.1.0 + spark.scheduler.blacklist.unschedulableTaskSetTimeout 120s diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index cd0e7d5c87bc8..ad38b7da10a00 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -24,9 +24,12 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.util.{Failure, Success} import scala.util.control.NonFatal +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.api.records.{NodeState => YarnNodeState} import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -42,9 +45,8 @@ import org.apache.spark.resource.ResourceProfile import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason} -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId -import org.apache.spark.scheduler.cluster.SchedulerBackendUtils +import org.apache.spark.scheduler.cluster.{ClusterInfo, NodeInfo, NodeState, SchedulerBackendUtils} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{ClusterInfoUpdate, NodeLossNotification, RemoveExecutor, RetrieveLastAllocatedExecutorId} import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} /** @@ -162,6 +164,17 @@ private[yarn] class YarnAllocator( private val allocatorBlacklistTracker = new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker) + // Cluster info is information about the cluster passed from the AM to the driver. + // It is cached on the AM side until it is sent to the driver and is + // cleared once successfully sent. On failure, the failed message is + // sent in the next cycle. 
+ // Visible for testing + private val currentClusterInfo = ClusterInfo(new HashMap[String, NodeInfo]) + + // Interval in seconds after which the node is decommissioned; after this time the + // node is not available for use. + private val nodeLossInterval = sparkConf.get(GRACEFUL_DECOMMISSION_NODE_TIMEOUT) + // Executor memory in MiB. protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt // Executor offHeap memory in MiB. @@ -436,6 +449,72 @@ private[yarn] class YarnAllocator( logDebug("Finished processing %d completed containers. Current running executor count: %d." .format(completedContainers.size, getNumExecutorsRunning)) } + + // If GRACEFUL_DECOMMISSION_ENABLE is set, handle the node loss + // scenario using the decommission tracker. + if (sparkConf.get(GRACEFUL_DECOMMISSION_ENABLE)) { + processGracefulDecommission(allocateResponse) + } + } + + // Helper method to map a YARN node state to Spark's NodeState. + def getYarnNodeState(state: YarnNodeState): NodeState.Value = { + // Hadoop 2.7 has no support for the node state DECOMMISSIONING, + // whereas Hadoop 2.8, Hadoop 3.1 and later versions do support it. + // In order to build Spark against both Hadoop 2 and Hadoop 3, we do not + // match on YarnNodeState for the DECOMMISSIONING state here; for the + // other states we match on YarnNodeState and assign the corresponding + // node state on the Spark side. + if (state.toString.equals(NodeState.DECOMMISSIONING.toString)) { + NodeState.DECOMMISSIONING + } else { + state match { + case YarnNodeState.RUNNING => NodeState.RUNNING + case YarnNodeState.DECOMMISSIONED => NodeState.DECOMMISSIONED + case YarnNodeState.LOST => NodeState.LOST + case YarnNodeState.UNHEALTHY => NodeState.LOST + case _ => NodeState.OTHER + } + } + } + + def processGracefulDecommission(allocateResponse: AllocateResponse): Unit = { + // Create a consolidated node decommission info report. + val nodeInfos = new HashMap[String, NodeInfo] + + // Nodes with updated information. 
+ val getUpdatedNodes = allocateResponse.getUpdatedNodes() + if (getUpdatedNodes != null) { + val updatedNodes = getUpdatedNodes.asScala + for (x <- updatedNodes) { + if (x.getNodeState.toString.equals(NodeState.DECOMMISSIONING.toString)) { + // Hadoop 2.7 has no support for getDecommissioningTimeout, whereas + // Hadoop 3.1 and later versions do, so the method call is made via + // reflection to update nodeTerminationTime; for lower versions such as + // Hadoop 2.7, the cloud-specific config + // spark.graceful.decommission.node.timeout is used instead. + var nodeTerminationTime = clock.getTimeMillis() + nodeLossInterval * 1000 + try { + val decommissioningTimeout = x.getClass.getMethod( + "getDecommissioningTimeout").invoke(x).asInstanceOf[Integer] + if (decommissioningTimeout != null) { + nodeTerminationTime = clock.getTimeMillis() + decommissioningTimeout * 1000 + } + } catch { + case e: NoSuchMethodException => logDebug(e.toString) + } + nodeInfos(x.getNodeId().getHost()) + = NodeInfo(terminationTime = Some(nodeTerminationTime), + nodeState = getYarnNodeState(x.getNodeState())) + } else { + nodeInfos(x.getNodeId().getHost()) + = NodeInfo(terminationTime = None, + nodeState = getYarnNodeState(x.getNodeState())) + } + } + } + processClusterInfo(ClusterInfo(nodeInfos = nodeInfos)) } /** @@ -896,6 +975,22 @@ private[yarn] class YarnAllocator( } } + private[yarn] def processClusterInfo(clusterInfo: ClusterInfo): Unit = { + + // Update cached currentClusterInfo. + clusterInfo.nodeInfos.foreach { case (k, v) => currentClusterInfo.nodeInfos(k) = v } + + logDebug(s"clusterInfoUpdate: full sync $currentClusterInfo") + + driverRef.ask[Boolean](ClusterInfoUpdate(currentClusterInfo)).andThen { + case Success(b) => currentClusterInfo.nodeInfos.clear() // Clear cached data + case Failure(f) => logInfo(s"clusterInfoUpdate: sync failed ($f)." + + s" Will be synced in the next cycle") + }(ThreadUtils.sameThread) + // Since the time taken to complete is small, the same-thread executor is used here. + } + + /** * Register that some RpcCallContext has asked the AM why the executor was lost. Note that * we can only find the loss reason to send back in the next call to allocateResources(). 
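(Aside on the hunk above: the version-dependent reflection is the trickiest part of processGracefulDecommission, so a minimal, self-contained sketch of the same pattern is included here. It is illustrative only; the object and method names below are assumptions and are not part of this patch.)

import java.lang.{Integer => JInteger}

object DecommissioningTimeoutReflection {

  // Returns Some(timeoutSecs) when the report class exposes
  // getDecommissioningTimeout (Hadoop 2.8+/3.x), None otherwise (e.g. Hadoop 2.7).
  def decommissioningTimeoutSecs(report: AnyRef): Option[Int] = {
    try {
      Option(report.getClass
        .getMethod("getDecommissioningTimeout")
        .invoke(report)
        .asInstanceOf[JInteger]).map(_.intValue())
    } catch {
      case _: NoSuchMethodException => None
    }
  }

  // Falls back to the configured node-loss interval when the timeout is absent,
  // mirroring the nodeTerminationTime computation in the hunk above.
  def terminationTimeMs(report: AnyRef, nowMs: Long, nodeLossIntervalSecs: Long): Long =
    decommissioningTimeoutSecs(report)
      .map(t => nowMs + t * 1000L)
      .getOrElse(nowMs + nodeLossIntervalSecs * 1000L)
}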
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index e428bab4f96f3..deac6d17bc4e0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster import java.util.EnumSet -import java.util.concurrent.atomic.{AtomicBoolean} +import java.util.concurrent.atomic.AtomicBoolean import javax.servlet.DispatcherType import scala.concurrent.ExecutionContext.Implicits.global @@ -36,8 +36,9 @@ import org.apache.spark.internal.config.UI._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.NodeDecommissionReason import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.{RpcUtils, ThreadUtils} +import org.apache.spark.util.{Clock, RpcUtils, SystemClock, ThreadUtils} /** * Abstract Yarn scheduler backend that contains common logic @@ -210,6 +211,70 @@ private[spark] abstract class YarnSchedulerBackend( Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, driverEndpoint)) } + /** + * Process a cluster info update received from the AM. + * + */ + protected def processClusterInfoUpdate(clusterInfo: ClusterInfo): Boolean = { + + // NB: In case of clusterInfoFullSync, node decommissioning info may be + // received multiple times, but that is not an issue as long as the information + // is correct. Actions here are idempotent. + + clusterInfo.nodeInfos.foreach { case (node, nodeInfo) => + nodeInfo.nodeState match { + + case NodeState.DECOMMISSIONING => + if (nodeInfo.terminationTime.isDefined && nodeInfo.terminationTime.get > 0) { + // e.g. the Spot loss / preemptible VM loss case + addNodeToDecommission(node, nodeInfo.terminationTime.get, NodeLoss) + } + + case NodeState.RUNNING => + // If the node is reused (identified by hostname), + // remove it from the decommission tracker. + removeNodeToDecommission(node) + + case NodeState.DECOMMISSIONED | NodeState.LOST => + updateNodeToDecommissionSetTerminate(node) + + // NodeState.OTHER + case _ => + // ignore other states + } + } + true + } + + /** + * Add a node to the decommission list, along with the time at which it will be terminated. + */ + override def addNodeToDecommission(hostname: String, terminationTimeMs: Long, + reason: NodeDecommissionReason): Unit = { + if (driverEndpoint != null) { + driverEndpoint.send(AddNodeToDecommission(hostname, terminationTimeMs, reason)) + } + } + + /** + * Remove a node from the decommission map. + */ + override def removeNodeToDecommission(hostname: String): Unit = { + if (driverEndpoint != null) { + driverEndpoint.send(RemoveNodeToDecommission(hostname)) + } + } + + /** + * Update a node to mark it as terminated. + */ + override def updateNodeToDecommissionSetTerminate(hostname: String): Unit = { + if (driverEndpoint != null) { + driverEndpoint.send(UpdateNodeToDecommissionSetTerminate(hostname)) + } + } + + + /** + * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected. 
* This endpoint communicates with the executors and queries the AM for an executor's exit @@ -319,6 +384,10 @@ private[spark] abstract class YarnSchedulerBackend( case RetrieveDelegationTokens => context.reply(currentDelegationTokens) + + case ClusterInfoUpdate(clusterInfo) => + logDebug(s"clusterInfoUpdate received at YarnSchedulerEndpoint : $clusterInfo") + context.reply(processClusterInfoUpdate(clusterInfo)) } override def onDisconnected(remoteAddress: RpcAddress): Unit = {
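(For completeness, a minimal usage sketch: the keys below are the configs added by this patch and the values are their documented defaults, with only the feature flag switched on. The object and application names are hypothetical, and the values are not tuning recommendations.)

import org.apache.spark.SparkConf

object GracefulDecommissionConfExample {
  // Enable graceful decommission handling and set the related knobs
  // to the defaults documented in docs/configuration.md above.
  val conf: SparkConf = new SparkConf()
    .setAppName("graceful-decommission-example") // arbitrary app name
    .set("spark.graceful.decommission.enable", "true")
    .set("spark.graceful.decommission.node.timeout", "110s")
    .set("spark.graceful.decommission.min.termination.time", "60s")
    .set("spark.graceful.decommission.executor.leasetimePct", "50")
    .set("spark.graceful.decommission.shuffedata.leasetimePct", "90")
    .set("spark.graceful.decommission.fetchfailed.ignore.threshold", "8")
}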