[SPARK-5529][CORE]Add expireDeadHosts in HeartbeatReceiver
If a BlockManager has not sent a heartbeat for more than 120s, BlockManagerMasterActor will remove it, but CoarseGrainedSchedulerBackend can only remove an executor after a DisassociatedEvent. We should therefore also expire dead hosts in HeartbeatReceiver.
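
For reference, a minimal configuration sketch of the timeouts this change relies on (the keys fall back to the older spark.storage.* settings, and in this version both values are read as plain seconds; the 300 and 60 below are illustrative values, not defaults from this patch):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only: expire executors after 300s without a heartbeat,
// and run the dead-host check every 60s. HeartbeatReceiver falls back to
// spark.storage.blockManagerSlaveTimeoutMs / spark.storage.blockManagerTimeoutIntervalMs
// when these keys are unset.
val conf = new SparkConf()
  .setMaster("local[2]")                       // local master, just to make the sketch runnable
  .setAppName("heartbeat-timeout-sketch")
  .set("spark.network.timeout", "300")
  .set("spark.network.timeoutInterval", "60")
val sc = new SparkContext(conf)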

Author: Hong Shen <hongshen@tencent.com>

Closes #4363 from shenh062326/my_change3 and squashes the following commits:

2c9a46a [Hong Shen] Change some code style.
1a042ff [Hong Shen] Change some code style.
2dc456e [Hong Shen] Change some code style.
d221493 [Hong Shen] Fix test failed
7448ac6 [Hong Shen] A minor change in sparkContext and heartbeatReceiver
b904aed [Hong Shen] Fix failed test
52725af [Hong Shen] Remove assert in SparkContext.killExecutors
5bedcb8 [Hong Shen] Remove assert in SparkContext.killExecutors
a858fb5 [Hong Shen] A minor change in HeartbeatReceiver
3e221d9 [Hong Shen] A minor change in HeartbeatReceiver
6bab7aa [Hong Shen] Change a code style.
07952f3 [Hong Shen] Change configs name and code style.
ce9257e [Hong Shen] Fix test failed
bccd515 [Hong Shen] Fix test failed
8e77408 [Hong Shen] Fix test failed
c1dfda1 [Hong Shen] Fix test failed
e197e20 [Hong Shen] Fix test failed
fb5df97 [Hong Shen] Remove ExpireDeadHosts in BlockManagerMessages
b5c0441 [Hong Shen] Remove expireDeadHosts in BlockManagerMasterActor
c922cb0 [Hong Shen] Add expireDeadHosts in HeartbeatReceiver
shenh062326 authored and Andrew Or committed Feb 27, 2015
1 parent fbc4694 commit 18f2098
Showing 7 changed files with 79 additions and 49 deletions.
65 changes: 59 additions & 6 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,33 +17,86 @@

package org.apache.spark

import akka.actor.Actor
import scala.concurrent.duration._
import scala.collection.mutable

import akka.actor.{Actor, Cancellable}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler
import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
import org.apache.spark.util.ActorLogReceive

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
* components to convey liveness or execution information for in-progress tasks.
* components to convey liveness or execution information for in-progress tasks. It will also
* expire the hosts that have not heartbeated for more than spark.network.timeout.
*/
private[spark] case class Heartbeat(
executorId: String,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)

private[spark] case object ExpireDeadHosts

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
* Lives in the driver to receive heartbeats from executors.
*/
private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
extends Actor with ActorLogReceive with Logging {

// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new mutable.HashMap[String, Long]

private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000

private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000

private var timeoutCheckingTask: Cancellable = null

override def preStart(): Unit = {
import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
super.preStart()
}

override def receiveWithLogging = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val response = HeartbeatResponse(
!scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
executorLastSeen(executorId) = System.currentTimeMillis()
sender ! response
case ExpireDeadHosts =>
expireDeadHosts()
}

private def expireDeadHosts(): Unit = {
logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
val now = System.currentTimeMillis()
for ((executorId, lastSeenMs) <- executorLastSeen) {
if (now - lastSeenMs > executorTimeoutMs) {
logWarning(s"Removing executor $executorId with no recent heartbeats: " +
s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
"timed out after ${now - lastSeenMs} ms"))
if (sc.supportDynamicAllocation) {
sc.killExecutor(executorId)
}
executorLastSeen.remove(executorId)
}
}
}

override def postStop(): Unit = {
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel()
}
super.postStop()
}
}
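
The Heartbeat / HeartbeatResponse pair above is a plain Akka request/reply protocol. As a rough illustration only (not code from this commit, and assuming it compiles inside the org.apache.spark package because both messages are private[spark]), an executor-side caller could drive it like this:

package org.apache.spark

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId

// Hypothetical helper, not part of this commit: send one Heartbeat to the
// driver's HeartbeatReceiver actor and report whether the executor should
// re-register its BlockManager.
private[spark] object HeartbeatSenderSketch {
  def sendHeartbeat(
      receiver: ActorRef,
      execId: String,
      metrics: Array[(Long, TaskMetrics)],
      bmId: BlockManagerId): Boolean = {
    implicit val timeout = Timeout(30.seconds)
    val reply = (receiver ? Heartbeat(execId, metrics, bmId)).mapTo[HeartbeatResponse]
    Await.result(reply, timeout.duration).reregisterBlockManager
  }
}

The driver answers reregisterBlockManager = true only when it does not recognise the executor, which is the executor's cue to re-register its BlockManager with the master.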
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -351,7 +351,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
dagScheduler = new DAGScheduler(this)
@@ -398,7 +398,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
if (dynamicAllocationEnabled) {
assert(master.contains("yarn") || dynamicAllocationTesting,
assert(supportDynamicAllocation,
"Dynamic allocation of executors is currently only supported in YARN mode")
Some(new ExecutorAllocationManager(this, listenerBus, conf))
} else {
@@ -1122,6 +1122,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
postEnvironmentUpdate()
}

/**
* Return whether dynamically adjusting the amount of resources allocated to
* this application is supported. This is currently only available for YARN.
*/
private[spark] def supportDynamicAllocation =
master.contains("yarn") || dynamicAllocationTesting

/**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
@@ -1155,7 +1162,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
assert(master.contains("yarn") || dynamicAllocationTesting,
assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
@@ -1173,7 +1180,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
assert(master.contains("yarn") || dynamicAllocationTesting,
assert(supportDynamicAllocation,
"Killing executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,5 +73,9 @@ private[spark] trait TaskScheduler {
* @return An application ID
*/
def applicationId(): String = appId


/**
* Process a lost executor
*/
def executorLost(executorId: String, reason: ExecutorLossReason): Unit
}
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -436,7 +436,7 @@ private[spark] class TaskSchedulerImpl(
}
}

def executorLost(executorId: String, reason: ExecutorLossReason) {
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
var failedExecutor: Option[String] = None

synchronized {
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.{Actor, ActorRef, Cancellable}
import akka.actor.{Actor, ActorRef}
import akka.pattern.ask

import org.apache.spark.{Logging, SparkConf, SparkException}
@@ -52,19 +52,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus

private val akkaTimeout = AkkaUtils.askTimeout(conf)

val slaveTimeout = conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120 * 1000)

val checkTimeoutInterval = conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000)

var timeoutCheckingTask: Cancellable = null

override def preStart() {
import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
super.preStart()
}

override def receiveWithLogging = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
register(blockManagerId, maxMemSize, slaveActor)
@@ -118,14 +105,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus

case StopBlockManagerMaster =>
sender ! true
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel()
}
context.stop(self)

case ExpireDeadHosts =>
expireDeadHosts()

case BlockManagerHeartbeat(blockManagerId) =>
sender ! heartbeatReceived(blockManagerId)

@@ -207,21 +188,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
logInfo(s"Removing block manager $blockManagerId")
}

private def expireDeadHosts() {
logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
}
}
toRemove.foreach(removeBlockManager)
}

private def removeExecutor(execId: String) {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -109,6 +109,4 @@ private[spark] object BlockManagerMessages {
extends ToBlockManagerMaster

case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

case object ExpireDeadHosts extends ToBlockManagerMaster
}
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -96,6 +96,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
}
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
}

/** Length of time to wait while draining listener events. */
@@ -386,6 +387,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
override def defaultParallelism() = 2
override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
}
val noKillScheduler = new DAGScheduler(
sc,
