From 78eb1b58cee0b5baf532ffde2a90e0fc31657d69 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 22 Jan 2015 17:15:54 -0800 Subject: [PATCH] Better OutputCommitCoordinatorActor stopping; simpler canCommit Before, a poison pill message was sent to the actor. That is not the paradigm used by other actors in Spark, though, so this change makes it stop more like, e.g., the MapOutputTracker actor. --- .../scala/org/apache/spark/SparkEnv.scala | 2 +- .../org/apache/spark/SparkHadoopWriter.scala | 6 +-- .../scheduler/OutputCommitCoordinator.scala | 37 +++++++++---------- 3 files changed, 19 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 651b2cbdfcca0..0c05daa70789a 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -348,7 +348,7 @@ object SparkEnv extends Logging { "levels using the RDD.persist() method instead.") } - val outputCommitCoordinator = new OutputCommitCoordinator + val outputCommitCoordinator = new OutputCommitCoordinator(conf) val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator", OutputCommitCoordinator.createActor(outputCommitCoordinator)) outputCommitCoordinator.coordinatorActor = outputCommitCoordinatorActor diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index a4d13c972c01a..bc5c7c5509d89 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -109,11 +109,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) if (cmtr.needsTaskCommit(taCtxt)) { val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator val conf = SparkEnv.get.conf - val timeout = AkkaUtils.askTimeout(conf) - val maxAttempts = AkkaUtils.numRetries(conf) - val retryInterval = AkkaUtils.retryWaitMs(conf) - val 
canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID, - maxAttempts, retryInterval, timeout) + val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID) if (canCommit) { try { cmtr.commitTask(taCtxt) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index ee20896de1449..a91f953cfcb5e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -22,13 +22,14 @@ import scala.concurrent.duration.FiniteDuration import akka.actor.{PoisonPill, ActorRef, Actor} -import org.apache.spark.Logging +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.util.{AkkaUtils, ActorLogReceive} private[spark] sealed trait OutputCommitCoordinationMessage extends Serializable private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage +private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage private[spark] case class AskPermissionToCommitOutput( stage: Int, @@ -49,10 +50,13 @@ private[spark] case class TaskCompleted( * This lives on the driver, but the actor allows the tasks that commit * to Hadoop to invoke it. 
*/ -private[spark] class OutputCommitCoordinator extends Logging { +private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging { // Initialized by SparkEnv var coordinatorActor: ActorRef = _ + val timeout = AkkaUtils.askTimeout(conf) + val maxAttempts = AkkaUtils.numRetries(conf) + val retryInterval = AkkaUtils.retryWaitMs(conf) private type StageId = Int private type TaskId = Long @@ -61,7 +65,6 @@ private[spark] class OutputCommitCoordinator extends Logging { private val authorizedCommittersByStage: mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap() - def stageStart(stage: StageId) { coordinatorActor ! StageStarted(stage) } @@ -72,21 +75,9 @@ private[spark] class OutputCommitCoordinator extends Logging { def canCommit( stage: StageId, task: TaskId, - attempt: TaskAttemptId, - timeout: FiniteDuration): Boolean = { + attempt: TaskAttemptId): Boolean = { AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt), - coordinatorActor, timeout) - } - - def canCommit( - stage: StageId, - task: TaskId, - attempt: TaskAttemptId, - maxAttempts: Int, - retryInterval: Int, - timeout: FiniteDuration): Boolean = { - AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt), - coordinatorActor, maxAttempts = maxAttempts, retryInterval, timeout) + coordinatorActor, maxAttempts, retryInterval, timeout) } def taskCompleted( @@ -98,7 +89,10 @@ private[spark] class OutputCommitCoordinator extends Logging { } def stop() { - coordinatorActor ! 
PoisonPill + val stopped = AkkaUtils.askWithReply[Boolean](StopCoordinator, coordinatorActor, timeout) + if (!stopped) { + logWarning("Expected true from stopping output coordinator actor, but got false!") + } } private def handleStageStart(stage: StageId): Unit = { @@ -147,6 +141,7 @@ private[spark] class OutputCommitCoordinator extends Logging { authorizedCommitters.remove(task) } } + } private[spark] object OutputCommitCoordinator { @@ -163,11 +158,13 @@ private[spark] object OutputCommitCoordinator { sender ! outputCommitCoordinator.handleAskPermissionToCommit(stage, task, taskAttempt) case TaskCompleted(stage, task, attempt, successful) => outputCommitCoordinator.handleTaskCompletion(stage, task, attempt, successful) + case StopCoordinator => + logInfo("OutputCommitCoordinator stopped!") + context.stop(self) + sender ! true } } def createActor(coordinator: OutputCommitCoordinator): OutputCommitCoordinatorActor = { new OutputCommitCoordinatorActor(coordinator) } } - -