Better OutputCommitCoordinatorActor stopping; simpler canCommit
Before, a poison pill message was sent to the actor. That is not the paradigm
used by other actors in Spark, so this change makes the coordinator stop more
like e.g. the MapOutputTracker actor.
mccheah committed Jan 23, 2015
1 parent 83de900 commit 78eb1b5
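
For context, a minimal sketch of the two stop paradigms in plain Akka. CoordinatorActor, StopParadigms, and the 30-second timeout are illustrative only; this is not Spark's code or its AkkaUtils helpers.

import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

case object StopCoordinator

class CoordinatorActor extends Actor {
  def receive = {
    case StopCoordinator =>
      // Acknowledge the stop request, then stop the actor. The reply lets the
      // caller detect a coordinator that never shut down.
      sender ! true
      context.stop(self)
  }
}

object StopParadigms extends App {
  val system = ActorSystem("demo")
  val coordinator: ActorRef = system.actorOf(Props[CoordinatorActor], "coordinator")

  // Old paradigm: fire-and-forget, no acknowledgement that the actor stopped.
  // coordinator ! PoisonPill

  // New paradigm: ask for an acknowledged stop and check the reply.
  implicit val timeout: Timeout = Timeout(30.seconds)
  val stopped = Await.result((coordinator ? StopCoordinator).mapTo[Boolean], timeout.duration)
  if (!stopped) println("Expected true from stopping coordinator actor, but got false!")
  system.shutdown()  // Akka 2.3-era API, matching Spark of this period
}

The acknowledged stop lets the caller warn when the actor never confirms shutdown, which a fire-and-forget PoisonPill cannot provide.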
Showing 3 changed files with 19 additions and 26 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -348,7 +348,7 @@ object SparkEnv extends Logging {
"levels using the RDD.persist() method instead.")
}

- val outputCommitCoordinator = new OutputCommitCoordinator
+ val outputCommitCoordinator = new OutputCommitCoordinator(conf)
val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
OutputCommitCoordinator.createActor(outputCommitCoordinator))
outputCommitCoordinator.coordinatorActor = outputCommitCoordinatorActor
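A note on the hunk above: it only threads conf into the coordinator's constructor. The register-or-lookup step it sits next to can be sketched roughly as below; the lookup parameter is an illustrative stand-in for Spark's driver-ref resolution and is not SparkEnv's actual helper.

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

object RegisterOrLookupSketch {
  // On the driver the coordinator actor is created under a well-known name; on
  // executors a reference to the driver's actor is resolved by that same name.
  def registerOrLookup(system: ActorSystem,
                       isDriver: Boolean,
                       name: String,
                       newActor: => Actor,
                       lookup: String => ActorRef): ActorRef = {
    if (isDriver) system.actorOf(Props(newActor), name) else lookup(name)
  }
}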
6 changes: 1 addition & 5 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -109,11 +109,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
if (cmtr.needsTaskCommit(taCtxt)) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
val conf = SparkEnv.get.conf
- val timeout = AkkaUtils.askTimeout(conf)
- val maxAttempts = AkkaUtils.numRetries(conf)
- val retryInterval = AkkaUtils.retryWaitMs(conf)
- val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID,
- maxAttempts, retryInterval, timeout)
+ val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
if (canCommit) {
try {
cmtr.commitTask(taCtxt)
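The removed lines above show the retry knobs that used to be computed at every call site. A rough self-contained sketch of the refactor, with AppConf and CommitCoordinator invented for illustration (they are not SparkConf or Spark's coordinator):

import scala.concurrent.duration._

// Illustrative only: AppConf is not SparkConf and the default values are arbitrary.
final case class AppConf(askTimeout: FiniteDuration = 30.seconds,
                         numRetries: Int = 3,
                         retryWaitMs: Int = 3000)

class CommitCoordinator(conf: AppConf) {
  // The timeout/retry knobs are read once here instead of at every call site.
  val timeout = conf.askTimeout
  val maxAttempts = conf.numRetries
  val retryInterval = conf.retryWaitMs

  // Callers now pass only the task's identity.
  def canCommit(stage: Int, task: Long, attempt: Long): Boolean = {
    // ...would ask the coordinator actor with (maxAttempts, retryInterval, timeout)...
    true
  }
}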
37 changes: 17 additions & 20 deletions …/OutputCommitCoordinator.scala
@@ -22,13 +22,14 @@ import scala.concurrent.duration.FiniteDuration

import akka.actor.{PoisonPill, ActorRef, Actor}

- import org.apache.spark.Logging
+ import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.util.{AkkaUtils, ActorLogReceive}

private[spark] sealed trait OutputCommitCoordinationMessage extends Serializable

private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage
private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
+ private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage

private[spark] case class AskPermissionToCommitOutput(
stage: Int,
@@ -49,10 +50,13 @@ private[spark] case class TaskCompleted(
* This lives on the driver, but the actor allows the tasks that commit
* to Hadoop to invoke it.
*/
- private[spark] class OutputCommitCoordinator extends Logging {
+ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {

// Initialized by SparkEnv
var coordinatorActor: ActorRef = _
+ val timeout = AkkaUtils.askTimeout(conf)
+ val maxAttempts = AkkaUtils.numRetries(conf)
+ val retryInterval = AkkaUtils.retryWaitMs(conf)

private type StageId = Int
private type TaskId = Long
@@ -61,7 +65,6 @@ private[spark] class OutputCommitCoordinator extends Logging {
private val authorizedCommittersByStage:
mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap()


def stageStart(stage: StageId) {
coordinatorActor ! StageStarted(stage)
}
@@ -72,21 +75,9 @@ private[spark] class OutputCommitCoordinator extends Logging {
def canCommit(
stage: StageId,
task: TaskId,
- attempt: TaskAttemptId,
- timeout: FiniteDuration): Boolean = {
+ attempt: TaskAttemptId): Boolean = {
AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt),
- coordinatorActor, timeout)
- }
-
- def canCommit(
- stage: StageId,
- task: TaskId,
- attempt: TaskAttemptId,
- maxAttempts: Int,
- retryInterval: Int,
- timeout: FiniteDuration): Boolean = {
- AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt),
- coordinatorActor, maxAttempts = maxAttempts, retryInterval, timeout)
+ coordinatorActor, maxAttempts, retryInterval, timeout)
}

def taskCompleted(
@@ -98,7 +89,10 @@ private[spark] class OutputCommitCoordinator extends Logging {
}

def stop() {
- coordinatorActor ! PoisonPill
+ val stopped = AkkaUtils.askWithReply[Boolean](StopCoordinator, coordinatorActor, timeout)
+ if (!stopped) {
+ logWarning("Expected true from stopping output coordinator actor, but got false!")
+ }
}

private def handleStageStart(stage: StageId): Unit = {
@@ -147,6 +141,7 @@ private[spark] class OutputCommitCoordinator extends Logging {
authorizedCommitters.remove(task)
}
}

}

private[spark] object OutputCommitCoordinator {
@@ -163,11 +158,13 @@ private[spark] object OutputCommitCoordinator {
sender ! outputCommitCoordinator.handleAskPermissionToCommit(stage, task, taskAttempt)
case TaskCompleted(stage, task, attempt, successful) =>
outputCommitCoordinator.handleTaskCompletion(stage, task, attempt, successful)
+ case StopCoordinator =>
+ logInfo("OutputCommitCoordinator stopped!")
+ context.stop(self)
+ sender ! true
}
}
def createActor(coordinator: OutputCommitCoordinator): OutputCommitCoordinatorActor = {
new OutputCommitCoordinatorActor(coordinator)
}
}
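
For reference, the maxAttempts/retryInterval/timeout triple that canCommit now hands to AkkaUtils.askWithReply follows the usual ask-with-retry shape. A generic sketch of that pattern in plain Akka (this is not Spark's actual AkkaUtils implementation, just the idea it is assumed to follow):

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

object AskWithRetrySketch {
  // Attempt the ask up to maxAttempts times, wait retryIntervalMs between failed
  // attempts, and give up loudly if no attempt produced a reply within the timeout.
  def askWithRetry[T](actor: ActorRef,
                      message: Any,
                      maxAttempts: Int,
                      retryIntervalMs: Int,
                      timeout: FiniteDuration): T = {
    var lastError: Throwable = null
    for (attempt <- 1 to maxAttempts) {
      try {
        val future = actor.ask(message)(Timeout(timeout))
        return Await.result(future, timeout).asInstanceOf[T]
      } catch {
        case e: Exception =>
          lastError = e
          Thread.sleep(retryIntervalMs.toLong)
      }
    }
    throw new RuntimeException(s"Ask of message $message failed after $maxAttempts attempts", lastError)
  }
}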

