SPARK-2582. Make Block Manager Master pluggable. #1506

Closed · wants to merge 4 commits
12 changes: 9 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -215,9 +215,15 @@ object SparkEnv extends Logging {
"MapOutputTracker",
new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
val blockManagerMasterType = conf.get("spark.blockmanager.type", "standalone")
var blockManagerMaster: BlockManagerMaster = null
blockManagerMasterType match {
case _ =>
// Since currently only one option exists, this is what is to be done in any case.
blockManagerMaster = new StandaloneBlockManagerMaster(registerOrLookup(
Contributor:

So if I want to use a different BlockManagerMaster implementation, I have to modify this code to support the new type?

It is preferable to use a fully qualified class name to allow pluggability: as long as the class implementation is on the classpath, it should be possible to use a different implementation of the BlockManagerMaster.
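A minimal sketch of the reflection-based loading this comment describes. The config key `spark.blockmanager.master.class` and the single-`SparkConf` constructor contract are assumptions made for illustration, not part of this PR:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.BlockManagerMaster

// Sketch only: resolve the implementation from a fully qualified class name.
// The config key and the assumed (SparkConf) constructor are hypothetical.
def loadBlockManagerMaster(conf: SparkConf): BlockManagerMaster = {
  val className = conf.get(
    "spark.blockmanager.master.class",  // hypothetical key
    "org.apache.spark.storage.StandaloneBlockManagerMaster")
  Class.forName(className)
    .getConstructor(classOf[SparkConf])
    .newInstance(conf)
    .asInstanceOf[BlockManagerMaster]
}
```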

Contributor (Author):

I thought about doing that, but it does not give us a way to force the implementation to use the Akka BlockManagerMasterActor, since the block managers would continue to use that. If we could somehow force that, then it would be a good idea to just use the FQCN.
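One possible way to reconcile the two positions, sketched under assumptions: keep FQCN-based loading, but fix the constructor contract so that every implementation receives the ActorRef already registered for BlockManagerMasterActor, which is what the author wants to force. Everything here except the classes from this PR is hypothetical:

```scala
import akka.actor.ActorRef
import org.apache.spark.SparkConf
import org.apache.spark.storage.BlockManagerMaster

// Sketch only: the driver-side actor is created and registered first, then
// handed to whichever implementation the (hypothetical) FQCN config names,
// so block managers keep talking to BlockManagerMasterActor regardless.
def loadWithForcedActor(conf: SparkConf, driverActor: ActorRef): BlockManagerMaster = {
  val className = conf.get(
    "spark.blockmanager.master.class",  // hypothetical key
    "org.apache.spark.storage.StandaloneBlockManagerMaster")
  Class.forName(className)
    .getConstructor(classOf[ActorRef], classOf[SparkConf])
    .newInstance(driverActor, conf)
    .asInstanceOf[BlockManagerMaster]
}
```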

"BlockManagerMaster", new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
}


val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, securityManager, mapOutputTracker)
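For illustration, adding a second backend under the scheme this diff adopts would mean extending the match with a new type string. The "zookeeper" string and the ZooKeeperBlockManagerMaster class below are invented for the example:

```scala
// Hypothetical extension of the match in SparkEnv.create; only "standalone"
// exists in this PR, so every value currently falls through to the default.
blockManagerMasterType match {
  case "zookeeper" =>  // invented type string
    blockManagerMaster = new ZooKeeperBlockManagerMaster(conf)  // invented class
  case _ =>
    blockManagerMaster = new StandaloneBlockManagerMaster(registerOrLookup(
      "BlockManagerMaster", new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
}
```

A user would then opt in with conf.set("spark.blockmanager.type", "zookeeper"); spark.blockmanager.type is the knob this diff introduces.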
173 changes: 19 additions & 154 deletions core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.storage

 import scala.concurrent.{Await, Future}
@@ -28,134 +27,68 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils

 private[spark]
-class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
-  val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3)
-  val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000)
-
-  val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
-
-  val timeout = AkkaUtils.askTimeout(conf)
+trait BlockManagerMaster {

   /** Remove a dead executor from the driver actor. This is only called on the driver side. */
-  def removeExecutor(execId: String) {
-    tell(RemoveExecutor(execId))
-    logInfo("Removed " + execId + " successfully in removeExecutor")
-  }
+  def removeExecutor(execId: String)

   /**
    * Send the driver actor a heart beat from the slave. Returns true if everything works out,
    * false if the driver does not know about the given block manager, which means the block
    * manager should re-register.
    */
-  def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
-    askDriverWithReply[Boolean](HeartBeat(blockManagerId))
-  }
+  def sendHeartBeat(blockManagerId: BlockManagerId): Boolean

   /** Register the BlockManager's id with the driver. */
-  def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
-    logInfo("Trying to register BlockManager")
-    tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
-    logInfo("Registered BlockManager")
-  }
+  def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef)

   def updateBlockInfo(
       blockManagerId: BlockManagerId,
       blockId: BlockId,
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long,
-      tachyonSize: Long): Boolean = {
-    val res = askDriverWithReply[Boolean](
-      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
-    logInfo("Updated info of block " + blockId)
-    res
-  }
+      tachyonSize: Long): Boolean

   /** Get locations of the blockId from the driver */
-  def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
-    askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId))
-  }
+  def getLocations(blockId: BlockId): Seq[BlockManagerId]

   /** Get locations of multiple blockIds from the driver */
-  def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
-    askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
-  }
+  def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]]

   /**
    * Check if block manager master has a block. Note that this can be used to check for only
    * those blocks that are reported to block manager master.
    */
-  def contains(blockId: BlockId) = {
-    !getLocations(blockId).isEmpty
-  }
+  def contains(blockId: BlockId): Boolean

   /** Get ids of other nodes in the cluster from the driver */
-  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
-    val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
-    if (result.length != numPeers) {
-      throw new SparkException(
-        "Error getting peers, only got " + result.size + " instead of " + numPeers)
-    }
-    result
-  }
+  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId]

   /**
    * Remove a block from the slaves that have it. This can only be used to remove
    * blocks that the driver knows about.
    */
-  def removeBlock(blockId: BlockId) {
-    askDriverWithReply(RemoveBlock(blockId))
-  }
+  def removeBlock(blockId: BlockId)

   /** Remove all blocks belonging to the given RDD. */
-  def removeRdd(rddId: Int, blocking: Boolean) {
-    val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
-    future.onFailure {
-      case e: Throwable => logError("Failed to remove RDD " + rddId, e)
-    }
-    if (blocking) {
-      Await.result(future, timeout)
-    }
-  }
+  def removeRdd(rddId: Int, blocking: Boolean)

   /** Remove all blocks belonging to the given shuffle. */
-  def removeShuffle(shuffleId: Int, blocking: Boolean) {
-    val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
-    future.onFailure {
-      case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e)
-    }
-    if (blocking) {
-      Await.result(future, timeout)
-    }
-  }
+  def removeShuffle(shuffleId: Int, blocking: Boolean)

   /** Remove all blocks belonging to the given broadcast. */
-  def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
-    val future = askDriverWithReply[Future[Seq[Int]]](
-      RemoveBroadcast(broadcastId, removeFromMaster))
-    future.onFailure {
-      case e: Throwable =>
-        logError("Failed to remove broadcast " + broadcastId +
-          " with removeFromMaster = " + removeFromMaster, e)
-    }
-    if (blocking) {
-      Await.result(future, timeout)
-    }
-  }
+  def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean)

   /**
    * Return the memory status for each block manager, in the form of a map from
    * the block manager's id to two long values. The first value is the maximum
    * amount of memory allocated for the block manager, while the second is the
    * amount of remaining memory.
    */
-  def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
-  }
+  def getMemoryStatus: Map[BlockManagerId, (Long, Long)]

-  def getStorageStatus: Array[StorageStatus] = {
-    askDriverWithReply[Array[StorageStatus]](GetStorageStatus)
-  }
+  def getStorageStatus: Array[StorageStatus]

   /**
    * Return the block's status on all block managers, if any. NOTE: This is a
@@ -166,25 +99,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
    * by all block managers.
    */
   def getBlockStatus(
-      blockId: BlockId,
-      askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
-    val msg = GetBlockStatus(blockId, askSlaves)
-    /*
-     * To avoid potential deadlocks, the use of Futures is necessary, because the master actor
-     * should not block on waiting for a block manager, which can in turn be waiting for the
-     * master actor for a response to a prior message.
-     */
-    val response = askDriverWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
-    val (blockManagerIds, futures) = response.unzip
-    val result = Await.result(Future.sequence(futures), timeout)
-    if (result == null) {
-      throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
-    }
-    val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
-    blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
-      status.map { s => (blockManagerId, s) }
-    }.toMap
-  }
+      blockId: BlockId,
+      askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus]

   /**
    * Return a list of ids of existing blocks such that the ids match the given filter. NOTE: This
@@ -196,60 +112,9 @@
    */
   def getMatchingBlockIds(
       filter: BlockId => Boolean,
-      askSlaves: Boolean): Seq[BlockId] = {
-    val msg = GetMatchingBlockIds(filter, askSlaves)
-    val future = askDriverWithReply[Future[Seq[BlockId]]](msg)
-    Await.result(future, timeout)
-  }
+      askSlaves: Boolean): Seq[BlockId]

   /** Stop the driver actor, called only on the Spark driver node */
-  def stop() {
-    if (driverActor != null) {
-      tell(StopBlockManagerMaster)
-      driverActor = null
-      logInfo("BlockManagerMaster stopped")
-    }
-  }
-
-  /** Send a one-way message to the master actor, to which we expect it to reply with true. */
-  private def tell(message: Any) {
-    if (!askDriverWithReply[Boolean](message)) {
-      throw new SparkException("BlockManagerMasterActor returned false, expected true.")
-    }
-  }
-
-  /**
-   * Send a message to the driver actor and get its result within a default timeout, or
-   * throw a SparkException if this fails.
-   */
-  private def askDriverWithReply[T](message: Any): T = {
-    // TODO: Consider removing multiple attempts
-    if (driverActor == null) {
-      throw new SparkException("Error sending message to BlockManager as driverActor is null " +
-        "[message = " + message + "]")
-    }
-    var attempts = 0
-    var lastException: Exception = null
-    while (attempts < AKKA_RETRY_ATTEMPTS) {
-      attempts += 1
-      try {
-        val future = driverActor.ask(message)(timeout)
-        val result = Await.result(future, timeout)
-        if (result == null) {
-          throw new SparkException("BlockManagerMaster returned null")
-        }
-        return result.asInstanceOf[T]
-      } catch {
-        case ie: InterruptedException => throw ie
-        case e: Exception =>
-          lastException = e
-          logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
-      }
-      Thread.sleep(AKKA_RETRY_INTERVAL_MS)
-    }
-
-    throw new SparkException(
-      "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
-  }
+  def stop()

 }
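With BlockManagerMaster reduced to a trait, an alternative backend only has to supply the abstract members. Below is a skeletal, hypothetical sketch: the class name and every body are invented, naive local bookkeeping stands in for real distributed tracking, and it sits in org.apache.spark.storage because the trait is private[spark]:

```scala
package org.apache.spark.storage

import akka.actor.ActorRef

// Hypothetical skeleton of an alternative BlockManagerMaster backend.
// All names and bodies are invented for illustration; a real implementation
// would need actual distributed state tracking and error handling.
class InMemoryBlockManagerMaster extends BlockManagerMaster {

  // Naive local bookkeeping in place of the Akka round trips.
  private val locations =
    scala.collection.mutable.HashMap.empty[BlockId, Seq[BlockManagerId]]

  def removeExecutor(execId: String) {}

  def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = true

  def registerBlockManager(
      blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {}

  def updateBlockInfo(
      blockManagerId: BlockManagerId,
      blockId: BlockId,
      storageLevel: StorageLevel,
      memSize: Long,
      diskSize: Long,
      tachyonSize: Long): Boolean = {
    locations(blockId) = locations.getOrElse(blockId, Seq.empty) :+ blockManagerId
    true
  }

  def getLocations(blockId: BlockId): Seq[BlockManagerId] =
    locations.getOrElse(blockId, Seq.empty)

  def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] =
    blockIds.toSeq.map(id => getLocations(id))

  def contains(blockId: BlockId): Boolean = getLocations(blockId).nonEmpty

  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] =
    Seq.empty

  def removeBlock(blockId: BlockId) { locations -= blockId }

  def removeRdd(rddId: Int, blocking: Boolean) {}

  def removeShuffle(shuffleId: Int, blocking: Boolean) {}

  def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {}

  def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = Map.empty

  def getStorageStatus: Array[StorageStatus] = Array.empty

  def getBlockStatus(
      blockId: BlockId,
      askSlaves: Boolean): Map[BlockManagerId, BlockStatus] = Map.empty

  def getMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean): Seq[BlockId] =
    locations.keys.filter(filter).toSeq

  def stop() {}
}
```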