
Commit bb324f9: parallely replicate blocks

prakharjain09 committed Apr 23, 2020
1 parent: 9c6bdb6

Showing 5 changed files with 15 additions and 24 deletions.
@@ -423,8 +423,9 @@ package object config {
   private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
     ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
       .internal()
-      .doc("Maximum number of failures to tolerate for offloading " +
-        "one block in single decommission cache blocks iteration")
+      .doc("Maximum number of failures which can be handled for the replication of " +
+        "one RDD block when block manager is decommissioning and trying to move its " +
+        "existing blocks.")
       .version("3.1.0")
       .intConf
       .createWithDefault(3)
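
As an aside, a hedged sketch of overriding this (internal) setting when building a SparkConf: the key comes from the hunk above, while the application name and the value 5 are purely illustrative.

import org.apache.spark.SparkConf

// Illustrative only: raise the per-block failure budget from the default of 3 to 5
// for a job whose executors are expected to be decommissioned aggressively.
val conf = new SparkConf()
  .setAppName("decommission-config-sketch")
  .set("spark.storage.decommission.maxReplicationFailuresPerBlock", "5")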
@@ -449,7 +449,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           logError("Unexpected error during block manager " +
             s"decommissioning for executor $executorId: ${e.toString}", e)
         }
-        logInfo(s"Finished decommissioning block manager corresponding to $executorId.")
+        logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
       }
     } else {
       logInfo(s"Skipping decommissioning of executor $executorId.")
core/src/main/scala/org/apache/spark/storage/BlockManager.scala: 13 changes (7 additions, 6 deletions)
@@ -1790,7 +1790,7 @@ private[spark] class BlockManager(
    * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
    * Visible for testing
    */
-  def offloadRddCacheBlocks(): Unit = {
+  def decommissionRddCacheBlocks(): Unit = {
     val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)
 
     if (replicateBlocksInfo.nonEmpty) {
@@ -1803,8 +1803,9 @@
       config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
 
     // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
-    //   so that we end up prioritize them over each other
-    val blocksFailedReplication = replicateBlocksInfo.filterNot {
+    //   so that we end up prioritize them over each other
+    val blocksFailedReplication = ThreadUtils.parmap(
+      replicateBlocksInfo, "decommissionRddCacheBlocks", 4) {
       case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
         val replicatedSuccessfully = replicateBlock(
           blockId,
@@ -1818,8 +1819,8 @@
         } else {
           logWarning(s"Failed to offload block $blockId")
         }
-        replicatedSuccessfully
-    }
+        (blockId, replicatedSuccessfully)
+    }.filterNot(_._2).map(_._1)
     if (blocksFailedReplication.nonEmpty) {
       logWarning("Blocks failed replication in cache decommissioning " +
         s"process: ${blocksFailedReplication.mkString(",")}")
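
The change above swaps the sequential filterNot pass for ThreadUtils.parmap with up to 4 threads: each block is replicated on its own task, the body yields a (blockId, success) pair, and the ids that failed are kept afterwards with filterNot(_._2).map(_._1). Below is a minimal standalone sketch of that fan-out-and-filter pattern using plain Scala Futures; ParallelOffloadSketch, the ReplicateBlock case class, and the replicateBlock stub are illustrative stand-ins, not the real BlockManager internals.

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ParallelOffloadSketch {
  // Stand-ins for the real ReplicateBlock info and replicateBlock() call.
  case class ReplicateBlock(blockId: String, existingReplicas: Int, maxReplicas: Int)
  def replicateBlock(info: ReplicateBlock): Boolean = info.existingReplicas < info.maxReplicas

  def main(args: Array[String]): Unit = {
    val blocks = Seq(
      ReplicateBlock("rdd_0_0", existingReplicas = 1, maxReplicas = 2),
      ReplicateBlock("rdd_0_1", existingReplicas = 2, maxReplicas = 2))

    // Fixed-size pool, mirroring the maxThreads = 4 passed to ThreadUtils.parmap.
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Replicate every block in parallel and keep (blockId, success) pairs,
      // analogous to the parmap body in the diff.
      val results = Await.result(
        Future.sequence(blocks.map(b => Future((b.blockId, replicateBlock(b))))),
        Duration.Inf)
      // Same post-processing as the diff: keep only the ids that failed replication.
      val blocksFailedReplication = results.filterNot(_._2).map(_._1)
      println(s"Blocks failed replication: ${blocksFailedReplication.mkString(",")}")
    } finally {
      pool.shutdown()
    }
  }
}

Capping the pool at a handful of threads overlaps transfers of independent blocks without letting a decommissioning executor flood its peers with concurrent replication traffic.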
@@ -1905,7 +1906,7 @@ private[spark] class BlockManager(
       while (blockManagerDecommissioning && !stopped) {
         try {
           logDebug("Attempting to replicate all cached RDD blocks")
-          offloadRddCacheBlocks()
+          decommissionRddCacheBlocks()
           logInfo("Attempt to replicate all cached blocks done")
           val sleepInterval = conf.get(
             config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
@@ -100,16 +100,5 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
     // should have same value like before
     assert(sleepyRdd.count() === 10)
     assert(accum.value === 10)
-
-    // all cache block should have been shifted from decommissioned block manager
-    // after some time
-    Thread.sleep(1000)
-    val storageStatus = sc.env.blockManager.master.getStorageStatus
-    val execIdToBlocksMapping = storageStatus.map(
-      status => (status.blockManagerId.executorId, status.blocks)).toMap
-    // No cached blocks should be present on executor which was decommissioned
-    assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq())
-    // There should still be all 10 RDD blocks cached
-    assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === 10)
   }
 }
@@ -1726,7 +1726,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec2))
   }
 
-  test("test replicateRddCacheBlocks should offload all cached blocks") {
+  test("test decommissionRddCacheBlocks should offload all cached blocks") {
     val store1 = makeBlockManager(2000, "exec1")
     val store2 = makeBlockManager(2000, "exec2")
     val store3 = makeBlockManager(2000, "exec3")
@@ -1737,13 +1737,13 @@
     assert(master.getLocations(blockId).size === 2)
     assert(master.getLocations(blockId).contains(store1.blockManagerId))
 
-    store1.offloadRddCacheBlocks()
+    store1.decommissionRddCacheBlocks()
     assert(master.getLocations(blockId).size === 2)
     assert(master.getLocations(blockId).toSet === Set(store2.blockManagerId,
       store3.blockManagerId))
   }
 
-  test("test replicateRddCacheBlocks should keep the block if it is not able to offload") {
+  test("test decommissionRddCacheBlocks should keep the block if it is not able to offload") {
     val store1 = makeBlockManager(12000, "exec1")
     val store2 = makeBlockManager(2000, "exec2")
 
@@ -1757,7 +1757,7 @@
     assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
     assert(master.getLocations(blockIdSmall) === Seq(store1.blockManagerId))
 
-    store1.offloadRddCacheBlocks()
+    store1.decommissionRddCacheBlocks()
     // Smaller block offloaded to store2
     assert(master.getLocations(blockIdSmall) === Seq(store2.blockManagerId))
     // Larger block still present in store1 as it can't be offloaded
