[SPARK-20732][CORE] Decommission cache blocks to other executors when an executor is decommissioned

### What changes were proposed in this pull request?
After the changes in SPARK-20628, CoarseGrainedSchedulerBackend can decommission an executor and stop assigning new tasks to it. We should also decommission the corresponding block managers in the same way, i.e. move the cached RDD blocks from those executors to other active executors.

### Why are the changes needed?
We need to gracefully decommission the block managers so that the underlying RDD cache blocks are not lost if the executors are taken away forcefully after some timeout (because of spot loss, pre-emptible VMs, etc.). It is good to save as much cache data as possible.

Also, in the future, once the decommissioning signal comes from the cluster manager (say YARN/Mesos etc.), dynamic allocation combined with this change gives us the opportunity to downscale executors faster by making them free of cache data.

Note that this is a best-effort approach. We try to move cache blocks from decommissioning executors to active executors. If the active executors don't have free resources available for caching, then a decommissioning executor keeps the cache blocks it was not able to move and can still serve them.

Current overall flow:

1. CoarseGrainedSchedulerBackend receives a signal to decommission an executor. On receiving the signal, it does two things: it stops assigning new tasks to that executor (SPARK-20628) and sends a message to BlockManagerMasterEndpoint (via BlockManagerMaster) to decommission the corresponding BlockManager.

2. BlockManagerMasterEndpoint receives the "DecommissionBlockManagers" message. On receiving it, it moves the corresponding block managers to the "decommissioning" state. All decommissioning BMs are excluded from the getPeers RPC call, which is used for replication. Each decommissioning BM is also sent a message from BlockManagerMasterEndpoint to start the decommissioning process on itself.

3. The BlockManager on the worker (say BM-x) receives the "DecommissionBlockManager" message and starts a BlockManagerDecommissionManager thread to offload all of its cached RDD blocks. This thread can make multiple reattempts to decommission the existing cache blocks (multiple attempts might be needed because there might not be sufficient space in other active BMs initially).
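
To make the opt-in concrete, here is a minimal, hypothetical sketch (not part of this patch) of enabling the feature from application code. Only `spark.storage.decommission.enabled` is the user-facing switch; the other two keys added in this PR are internal tuning knobs and are shown with their default values purely for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example: opt in to block manager decommissioning.
val conf = new SparkConf()
  .setAppName("storage-decommission-demo")
  .setMaster("local[2]") // placeholder; use the real cluster master in practice
  .set("spark.storage.decommission.enabled", "true")
  // Internal knobs from this PR, shown with their defaults:
  .set("spark.storage.decommission.maxReplicationFailuresPerBlock", "3")
  .set("spark.storage.decommission.replicationReattemptInterval", "30s")

val sc = new SparkContext(conf)
```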

### Does this PR introduce any user-facing change?
NO

### How was this patch tested?
Added UTs.

Closes #28370 from prakharjain09/SPARK-20732-rddcache-1.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
prakharjain09 authored and holdenk committed May 18, 2020
1 parent b3686a7 commit c560428
Showing 10 changed files with 409 additions and 13 deletions.
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -413,6 +413,34 @@ package object config {
.intConf
.createWithDefault(1)

private[spark] val STORAGE_DECOMMISSION_ENABLED =
ConfigBuilder("spark.storage.decommission.enabled")
.doc("Whether to decommission the block manager when decommissioning executor")
.version("3.1.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
.internal()
.doc("Maximum number of failures which can be handled for the replication of " +
"one RDD block when block manager is decommissioning and trying to move its " +
"existing blocks.")
.version("3.1.0")
.intConf
.createWithDefault(3)

private[spark] val STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL =
ConfigBuilder("spark.storage.decommission.replicationReattemptInterval")
.internal()
.doc("The interval of time between consecutive cache block replication reattempts " +
"happening on each decommissioning executor (due to storage decommissioning).")
.version("3.1.0")
.timeConf(TimeUnit.MILLISECONDS)
.checkValue(_ > 0, "Time interval between two consecutive attempts of " +
"cache block replication should be positive.")
.createWithDefaultString("30s")

private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
ConfigBuilder("spark.storage.replication.topologyFile")
.version("2.1.0")
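
For reference, a small hypothetical sketch (not part of the diff) of how the three `spark.storage.decommission.*` entries added above are consumed, matching the `conf.get(...)` calls in the changes below. It assumes code compiled inside the `org.apache.spark` package, since the entries and the typed `SparkConf` accessors are `private[spark]`.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config

// Typed access: ConfigBuilder entries yield Boolean/Int/Long values rather than raw strings.
val conf = new SparkConf()
val enabled: Boolean = conf.get(config.STORAGE_DECOMMISSION_ENABLED)                                   // false by default
val maxFailuresPerBlock: Int = conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) // 3
val reattemptIntervalMs: Long = conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)   // 30000 (i.e. 30s)
```
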
@@ -438,6 +438,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logError(s"Unexpected error during decommissioning ${e.toString}", e)
}
logInfo(s"Finished decommissioning executor $executorId.")

if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
try {
logInfo("Starting decommissioning block manager corresponding to " +
s"executor $executorId.")
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
} catch {
case e: Exception =>
logError("Unexpected error during block manager " +
s"decommissioning for executor $executorId: ${e.toString}", e)
}
logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
}
} else {
logInfo(s"Skipping decommissioning of executor $executorId.")
}
@@ -574,7 +587,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
private[spark] def decommissionExecutor(executorId: String): Unit = {
if (driverEndpoint != null) {
logInfo("Propegating executor decommission to driver.")
logInfo("Propagating executor decommission to driver.")
driverEndpoint.send(DecommissionExecutor(executorId))
}
}
@@ -658,7 +671,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
* @param resourceProfileToNumExecutors The total number of executors we'd like to have per
* @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
* ResourceProfile. The cluster manager shouldn't kill any
* running executor to reach this number, but, if all
* existing executors were to die, this is the number
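
As a usage illustration, here is a hypothetical sketch (not part of the diff) of driving the `decommissionExecutor` entry point shown in this file's diff above. It assumes a `SparkContext` named `sc` and code inside the `org.apache.spark` package, since `SparkContext.schedulerBackend` and `decommissionExecutor` are `private[spark]`.

```scala
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

// Hypothetical sketch: ask the backend to decommission executor "1".
sc.schedulerBackend match {
  case backend: CoarseGrainedSchedulerBackend =>
    // Stops new task scheduling on the executor (SPARK-20628) and, when
    // spark.storage.decommission.enabled is true, also asks its block manager
    // to start offloading cached RDD blocks (this PR).
    backend.decommissionExecutor("1")
  case _ =>
    // Other scheduler backends do not expose this decommissioning path.
}
```
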
141 changes: 132 additions & 9 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
import org.apache.spark.storage.memory._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
@@ -241,6 +242,9 @@ private[spark] class BlockManager(

private var blockReplicationPolicy: BlockReplicationPolicy = _

private var blockManagerDecommissioning: Boolean = false
private var decommissionManager: Option[BlockManagerDecommissionManager] = None

// A DownloadFileManager used to track all the files of remote blocks which are above the
// specified memory threshold. Files will be deleted automatically based on weak reference.
// Exposed for test
@@ -1551,30 +1555,36 @@ private[spark] class BlockManager(
}

/**
* Called for pro-active replenishment of blocks lost due to executor failures
* Replicates a block to peer block managers based on existingReplicas and maxReplicas
*
* @param blockId blockId being replicate
* @param existingReplicas existing block managers that have a replica
* @param maxReplicas maximum replicas needed
* @param maxReplicationFailures number of replication failures to tolerate before
* giving up.
* @return whether block was successfully replicated or not
*/
def replicateBlock(
blockId: BlockId,
existingReplicas: Set[BlockManagerId],
maxReplicas: Int): Unit = {
maxReplicas: Int,
maxReplicationFailures: Option[Int] = None): Boolean = {
logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
blockInfoManager.lockForReading(blockId).foreach { info =>
blockInfoManager.lockForReading(blockId).forall { info =>
val data = doGetLocalBytes(blockId, info)
val storageLevel = StorageLevel(
useDisk = info.level.useDisk,
useMemory = info.level.useMemory,
useOffHeap = info.level.useOffHeap,
deserialized = info.level.deserialized,
replication = maxReplicas)
// we know we are called as a result of an executor removal, so we refresh peer cache
// this way, we won't try to replicate to a missing executor with a stale reference
// we know we are called as a result of an executor removal or because the current executor
// is getting decommissioned. so we refresh peer cache before trying replication, we won't
// try to replicate to a missing executor/another decommissioning executor
getPeers(forceFetch = true)
try {
replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
replicate(
blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures)
} finally {
logDebug(s"Releasing lock for $blockId")
releaseLockAndDispose(blockId, data)
@@ -1591,9 +1601,11 @@
data: BlockData,
level: StorageLevel,
classTag: ClassTag[_],
existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
existingReplicas: Set[BlockManagerId] = Set.empty,
maxReplicationFailures: Option[Int] = None): Boolean = {

val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
val maxReplicationFailureCount = maxReplicationFailures.getOrElse(
conf.get(config.STORAGE_MAX_REPLICATION_FAILURE))
val tLevel = StorageLevel(
useDisk = level.useDisk,
useMemory = level.useMemory,
@@ -1617,7 +1629,7 @@
blockId,
numPeersToReplicateTo)

while(numFailures <= maxReplicationFailures &&
while(numFailures <= maxReplicationFailureCount &&
!peersForReplication.isEmpty &&
peersReplicatedTo.size < numPeersToReplicateTo) {
val peer = peersForReplication.head
@@ -1641,6 +1653,10 @@
peersForReplication = peersForReplication.tail
peersReplicatedTo += peer
} catch {
// Rethrow interrupt exception
case e: InterruptedException =>
throw e
// Everything else we may retry
case NonFatal(e) =>
logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
peersFailedToReplicateTo += peer
@@ -1665,9 +1681,11 @@
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
return false
}

logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
return true
}

/**
@@ -1761,6 +1779,60 @@
blocksToRemove.size
}

def decommissionBlockManager(): Unit = {
if (!blockManagerDecommissioning) {
logInfo("Starting block manager decommissioning process")
blockManagerDecommissioning = true
decommissionManager = Some(new BlockManagerDecommissionManager(conf))
decommissionManager.foreach(_.start())
} else {
logDebug("Block manager already in decommissioning state")
}
}

/**
* Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
* Visible for testing
*/
def decommissionRddCacheBlocks(): Unit = {
val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)

if (replicateBlocksInfo.nonEmpty) {
logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
"for block manager decommissioning")
} else {
logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate")
return
}

// Maximum number of storage replication failure which replicateBlock can handle
val maxReplicationFailures = conf.get(
config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)

// TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
// so that we end up prioritize them over each other
val blocksFailedReplication = replicateBlocksInfo.map {
case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
val replicatedSuccessfully = replicateBlock(
blockId,
existingReplicas.toSet,
maxReplicas,
maxReplicationFailures = Some(maxReplicationFailures))
if (replicatedSuccessfully) {
logInfo(s"Block $blockId offloaded successfully, Removing block now")
removeBlock(blockId)
logInfo(s"Block $blockId removed")
} else {
logWarning(s"Failed to offload block $blockId")
}
(blockId, replicatedSuccessfully)
}.filterNot(_._2).map(_._1)
if (blocksFailedReplication.nonEmpty) {
logWarning("Blocks failed replication in cache decommissioning " +
s"process: ${blocksFailedReplication.mkString(",")}")
}
}

/**
* Remove all blocks belonging to the given broadcast.
*/
@@ -1829,7 +1901,58 @@
data.dispose()
}

/**
* Class to handle block manager decommissioning retries
* It creates a Thread to retry offloading all RDD cache blocks
*/
private class BlockManagerDecommissionManager(conf: SparkConf) {
@volatile private var stopped = false
private val sleepInterval = conf.get(
config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)

private val blockReplicationThread = new Thread {
override def run(): Unit = {
var failures = 0
while (blockManagerDecommissioning
&& !stopped
&& !Thread.interrupted()
&& failures < 20) {
try {
logDebug("Attempting to replicate all cached RDD blocks")
decommissionRddCacheBlocks()
logInfo("Attempt to replicate all cached blocks done")
Thread.sleep(sleepInterval)
} catch {
case _: InterruptedException =>
logInfo("Interrupted during migration, will not refresh migrations.")
stopped = true
case NonFatal(e) =>
failures += 1
logError("Error occurred while trying to replicate cached RDD blocks" +
s" for block manager decommissioning (failure count: $failures)", e)
}
}
}
}
blockReplicationThread.setDaemon(true)
blockReplicationThread.setName("block-replication-thread")

def start(): Unit = {
logInfo("Starting block replication thread")
blockReplicationThread.start()
}

def stop(): Unit = {
if (!stopped) {
stopped = true
logInfo("Stopping block replication thread")
blockReplicationThread.interrupt()
}
}
}

def stop(): Unit = {
decommissionManager.foreach(_.stop())
blockTransferService.close()
if (blockStoreClient ne blockTransferService) {
// Closing should be idempotent, but maybe not for the NioBlockTransferService.
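
A short, hypothetical test-style sketch (not part of the diff) of the executor-side entry points added in this file; `decommissionRddCacheBlocks()` is documented above as visible for testing. It assumes a `BlockManager` instance `bm` and code inside the `org.apache.spark` package, since `BlockManager` is `private[spark]`.

```scala
// Hypothetical sketch: executor-side decommissioning hooks.
bm.decommissionBlockManager()   // idempotent; starts the BlockManagerDecommissionManager thread
bm.decommissionRddCacheBlocks() // or drive a single offload attempt synchronously, as a test might
```
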
@@ -43,6 +43,16 @@ class BlockManagerMaster(
logInfo("Removed " + execId + " successfully in removeExecutor")
}

/** Decommission block managers corresponding to given set of executors */
def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds))
}

/** Get Replication Info for all the RDD blocks stored in given blockManagerId */
def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
driverEndpoint.askSync[Seq[ReplicateBlock]](GetReplicateInfoForRDDBlocks(blockManagerId))
}

/** Request removal of a dead executor from the driver endpoint.
* This is only called on the driver side. Non-blocking
*/
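
And the driver-side counterpart: a hypothetical sketch (not part of the diff) of the two `BlockManagerMaster` methods added above. It assumes a `SparkContext` named `sc` and code inside the `org.apache.spark` package, since `SparkContext.env` is `private[spark]`.

```scala
// Hypothetical sketch: decommission the block managers of executors "0" and "1" from the driver.
val master = sc.env.blockManager.master

// Marks the block managers as decommissioning: they are excluded from getPeers()
// replication targets and told to start offloading their cached RDD blocks.
master.decommissionBlockManagers(Seq("0", "1"))

// Replication info for the RDD blocks a given block manager still holds
// (in practice an executor's BlockManagerId would be passed here, not the driver's).
val pending = master.getReplicateInfoForRDDBlocks(sc.env.blockManager.blockManagerId)
```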