Skip to content

Commit

Permalink
Properly handle messages that could be sent after actor shutdown.
Browse files Browse the repository at this point in the history
  • Loading branch information
mccheah committed Jan 23, 2015
1 parent 8d5a091 commit c334255
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ object SparkEnv extends Logging {
val outputCommitCoordinator = new OutputCommitCoordinator(conf)
val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
OutputCommitCoordinator.createActor(outputCommitCoordinator))
outputCommitCoordinator.coordinatorActor = outputCommitCoordinatorActor
outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor)

new SparkEnv(
executorId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ private[spark] case class TaskCompleted(
private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {

// Initialized by SparkEnv; None until then, and reset to None by stop() so that
// messages arriving after shutdown are dropped instead of hitting a dead actor.
var coordinatorActor: Option[ActorRef] = None
private val timeout = AkkaUtils.askTimeout(conf)
private val maxAttempts = AkkaUtils.numRetries(conf)
private val retryInterval = AkkaUtils.retryWaitMs(conf)

private type StageId = Int
private type TaskId = Long
Expand All @@ -66,33 +66,30 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap()

/** Notify the coordinator that a stage has started; no-op after stop(). */
def stageStart(stage: StageId) {
  sendToActor(StageStarted(stage))
}
/** Notify the coordinator that a stage has ended; no-op after stop(). */
def stageEnd(stage: StageId) {
  sendToActor(StageEnded(stage))
}

/**
 * Ask the coordinator whether this task attempt may commit its output.
 * Returns false when the coordinator actor has already been stopped
 * (see askActor), so late callers are safely denied rather than crashing.
 */
def canCommit(
    stage: StageId,
    task: TaskId,
    attempt: TaskAttemptId): Boolean = {
  askActor(AskPermissionToCommitOutput(stage, task, attempt))
}

/**
 * Notify the coordinator that a task attempt finished (successfully or not);
 * no-op after stop().
 */
def taskCompleted(
    stage: StageId,
    task: TaskId,
    attempt: TaskAttemptId,
    successful: Boolean) {
  sendToActor(TaskCompleted(stage, task, attempt, successful))
}

/**
 * Shut down the coordinator: fire a stop message at the actor (best-effort,
 * no reply awaited), clear the actor reference so later messages are dropped,
 * and release the per-stage committer bookkeeping.
 */
def stop() {
  sendToActor(StopCoordinator)
  // After this point sendToActor/askActor become no-ops, which is the whole
  // point of the Option wrapper: messages sent post-shutdown are ignored.
  coordinatorActor = None
  authorizedCommittersByStage.foreach(_._2.clear)
  authorizedCommittersByStage.clear
}
Expand Down Expand Up @@ -144,6 +141,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
}
}

/** Fire-and-forget delivery; silently drops the message once stop() has cleared the actor. */
private def sendToActor(msg: OutputCommitCoordinationMessage) {
  coordinatorActor match {
    case Some(actor) => actor ! msg
    case None => // coordinator already shut down; drop the message
  }
}

/**
 * Ask the coordinator actor and await its Boolean reply, retrying per the
 * configured Akka settings. Returns false once stop() has cleared the actor,
 * so post-shutdown askers are denied rather than failing.
 */
private def askActor(msg: OutputCommitCoordinationMessage): Boolean = {
  coordinatorActor.fold(false) { actor =>
    AkkaUtils.askWithReply[Boolean](msg, actor, maxAttempts, retryInterval, timeout)
  }
}
}

private[spark] object OutputCommitCoordinator {
Expand Down

0 comments on commit c334255

Please sign in to comment.