[SPARK-4879] Use the Spark driver to authorize Hadoop commits.
Previously, SparkHadoopWriter always committed its tasks without
question. The problem is that when speculation is turned on, sometimes
this can result in multiple tasks committing their output to the same
partition. Even though an HDFS-writing task may be re-launched due to
speculation, the original task is not killed and may eventually commit
as well.

This can cause strange race conditions in which multiple committing tasks
interfere with each other, with the result that some partition files are
lost entirely. For more context on these scenarios, see the JIRA ticket
referenced above.

In Hadoop MapReduce jobs, the application master is a central
coordinator that authorizes whether or not any given task can commit.
Before a task commits its output, it queries the application master as
to whether or not such a commit is safe, and the application master does
bookkeeping as tasks request commits. Duplicate tasks that would write
to files already written by other tasks are prohibited from committing.

This patch emulates that functionality: the crucial missing component was
a central arbitrator, which is now a module called the OutputCommitCoordinator.
The coordinator lives on the driver; executors obtain a reference to its
actor and ask its permission to commit. As the DAGScheduler reports tasks
completing successfully or unsuccessfully, the coordinator is informed of
these completion events and updates its internal state accordingly.
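
To make the rule concrete, here is a minimal, self-contained sketch of the
first-committer-wins bookkeeping (an assumption-level simplification: a
synchronized map stands in for the actor, and CommitRaceDemo is a
hypothetical name, not part of this patch):

import scala.collection.mutable

// Simplified model: the first attempt to ask for a (stage, task) pair is
// authorized to commit; every later attempt for the same pair is denied.
object CommitRaceDemo extends App {
  private val authorized = mutable.Map[(Int, Long), Long]() // (stage, task) -> attempt

  def canCommit(stage: Int, task: Long, attempt: Long): Boolean =
    authorized.synchronized {
      if (authorized.contains((stage, task))) {
        false // another attempt already holds the commit lock
      } else {
        authorized((stage, task)) = attempt // first asker wins
        true
      }
    }

  // Two speculative attempts race to commit the same partition:
  println(canCommit(stage = 0, task = 7L, attempt = 1L)) // true: authorized
  println(canCommit(stage = 0, task = 7L, attempt = 2L)) // false: denied
}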
mccheah committed Jan 21, 2015
1 parent 9d9294a commit 6e6f748
Showing 5 changed files with 215 additions and 13 deletions.
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -34,7 +34,7 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus}
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
import org.apache.spark.storage._
@@ -67,6 +67,7 @@ class SparkEnv (
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
val shuffleMemoryManager: ShuffleMemoryManager,
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {

private[spark] var isStopped = false
@@ -86,6 +87,7 @@ class SparkEnv (
blockManager.stop()
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
actorSystem.shutdown()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
@@ -346,6 +348,11 @@ object SparkEnv extends Logging {
"levels using the RDD.persist() method instead.")
}

val outputCommitCoordinator = new OutputCommitCoordinator
val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
OutputCommitCoordinator.createActor(outputCommitCoordinator))
outputCommitCoordinator.coordinatorActor = outputCommitCoordinatorActor

new SparkEnv(
executorId,
actorSystem,
@@ -362,6 +369,7 @@
sparkFilesDir,
metricsSystem,
shuffleMemoryManager,
outputCommitCoordinator,
conf)
}

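For context on the registerOrLookup call above: it hosts the actor when the
current process is the driver and otherwise resolves a remote reference to
the driver's copy. A hedged sketch of that pattern follows (not SparkEnv's
actual code; the signature, URL format, and timeout are assumptions):

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.Await
import scala.concurrent.duration._

// Sketch: create the actor on the driver; on executors, look up the
// driver-hosted actor by path instead of creating a second copy.
def registerOrLookup(system: ActorSystem, isDriver: Boolean, driverUrl: String,
    name: String)(newActor: => Actor): ActorRef = {
  if (isDriver) {
    system.actorOf(Props(newActor), name = name) // single authoritative instance
  } else {
    // Resolution details (retries, configurable timeouts) are elided here.
    Await.result(
      system.actorSelection(s"$driverUrl/user/$name").resolveOne(30.seconds),
      30.seconds)
  }
}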
31 changes: 22 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.util.AkkaUtils

/**
* Internal helper class that saves an RDD using a Hadoop OutputFormat.
@@ -44,6 +45,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
private val now = new Date()
private val conf = new SerializableWritable(jobConf)

private val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator

private var jobID = 0
private var splitID = 0
private var attemptID = 0
@@ -106,18 +109,28 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
try {
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
} catch {
case e: IOException => {
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
throw e
val conf = SparkEnv.get.conf
val timeout = AkkaUtils.askTimeout(conf)
val maxAttempts = AkkaUtils.numRetries(conf)
val retryInterval = AkkaUtils.retryWaitMs(conf)
val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID,
maxAttempts, retryInterval, timeout)
if (canCommit) {
try {
cmtr.commitTask(taCtxt)
logInfo (s"$taID: Committed")
} catch {
case e: IOException => {
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
throw e
}
}
} else {
logInfo(s"$taID: Not committed because DAGScheduler did not authorize commit")
}
} else {
logInfo ("No need to commit output of task: " + taID.value)
logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
}
}

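Note that the ask parameters used in the commit path above (timeout, retry
count, retry interval) come from AkkaUtils, which reads them from the
SparkConf. A hedged example of tuning them (the keys and defaults are
recalled from Spark 1.x and should be treated as assumptions, not as values
confirmed by this diff):

import org.apache.spark.SparkConf

// Assumed Spark 1.x-era Akka settings consulted by AkkaUtils:
val conf = new SparkConf()
  .set("spark.akka.askTimeout", "30")   // seconds each ask blocks before timing out
  .set("spark.akka.num.retries", "3")   // how many asks before giving up
  .set("spark.akka.retry.wait", "3000") // milliseconds to wait between asks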
13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.util.{CallSite, EventLoop, SystemClock, Clock, Utils}
import org.apache.spark.util._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

/**
@@ -63,7 +63,7 @@ class DAGScheduler(
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = SystemClock)
clock: org.apache.spark.util.Clock = SystemClock)
extends Logging {

def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
@@ -113,7 +113,6 @@ class DAGScheduler(
// This is only safe because DAGScheduler runs in a single thread.
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()


/** If enabled, we may run certain actions like take() and first() locally. */
private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)

@@ -126,6 +125,8 @@ class DAGScheduler(
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)

private val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator

// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -808,6 +809,7 @@ class DAGScheduler(
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
outputCommitCoordinator.stageStart(stage.id)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
@@ -865,6 +867,7 @@ class DAGScheduler(
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should post
// SparkListenerStageCompleted here in case there are no tasks to run.
outputCommitCoordinator.stageEnd(stage.id)
listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -908,6 +911,9 @@ class DAGScheduler(
val task = event.task
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
val isSuccess = event.reason == Success

outputCommitCoordinator.taskCompleted(stageId, task.partitionId, event.taskInfo.taskId, isSuccess)

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
@@ -1367,6 +1373,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler

case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()

}

override def onError(e: Throwable): Unit = {
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -77,3 +77,4 @@ private[scheduler]
case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent

private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent

173 changes: 173 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import akka.actor.{PoisonPill, ActorRef, Actor}
import org.apache.spark.Logging
import org.apache.spark.util.{AkkaUtils, ActorLogReceive}

import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

private[spark] sealed trait OutputCommitCoordinationMessage

private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage
private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage

private[spark] case class AskPermissionToCommitOutput(
stage: Int,
task: Long,
taskAttempt: Long)
extends OutputCommitCoordinationMessage

private[spark] case class TaskCompleted(
stage: Int,
task: Long,
attempt: Long,
successful: Boolean)
extends OutputCommitCoordinationMessage

/**
* Authority that decides whether tasks can commit output to HDFS.
*
* This lives on the driver, but the actor allows the tasks that commit
* to Hadoop to invoke it.
*/
private[spark] class OutputCommitCoordinator extends Logging {

// Initialized by SparkEnv
var coordinatorActor: ActorRef = _

// TODO: handling stage attempt ids?
private type StageId = Int
private type TaskId = Long
private type TaskAttemptId = Long

private val authorizedCommittersByStage:
mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap()

def stageStart(stage: StageId) {
coordinatorActor ! StageStarted(stage)
}

def stageEnd(stage: StageId) {
coordinatorActor ! StageEnded(stage)
}

def canCommit(
stage: StageId,
task: TaskId,
attempt: TaskAttemptId,
timeout: FiniteDuration): Boolean = {
AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt),
coordinatorActor, timeout)
}

def canCommit(
stage: StageId,
task: TaskId,
attempt: TaskAttemptId,
maxAttempts: Int,
retryInterval: Int,
timeout: FiniteDuration): Boolean = {
AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt),
coordinatorActor, maxAttempts, retryInterval, timeout)
}

def taskCompleted(
stage: StageId,
task: TaskId,
attempt: TaskAttemptId,
successful: Boolean) {
coordinatorActor ! TaskCompleted(stage, task, attempt, successful)
}

def stop() {
coordinatorActor ! PoisonPill
}

private def handleStageStart(stage: StageId): Unit = {
// TODO: assert that we're not overwriting an existing entry?
authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, TaskAttemptId]()
}

private def handleStageEnd(stage: StageId): Unit = {
authorizedCommittersByStage.remove(stage)
}

private def handleAskPermissionToCommit(
stage: StageId,
task: TaskId,
attempt: TaskAttemptId): Boolean = {
if (!authorizedCommittersByStage.contains(stage)) {
logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
return false
}
val authorizedCommitters = authorizedCommittersByStage(stage)
if (authorizedCommitters.contains(task)) {
val existingCommitter = authorizedCommitters(task)
logDebug(s"Denying $attempt to commit for stage=$stage, task=$task; " +
s"existingCommitter = $existingCommitter")
false
} else {
logDebug(s"Authorizing $attempt to commit for stage=$stage, task=$task")
authorizedCommitters(task) = attempt
true
}
}

private def handleTaskCompletion(
stage: StageId,
task: TaskId,
attempt: TaskAttemptId,
successful: Boolean): Unit = {
if (!authorizedCommittersByStage.contains(stage)) {
logDebug(s"Ignoring task completion for completed stage")
return
}
val authorizedCommitters = authorizedCommittersByStage(stage)
if (authorizedCommitters.get(task) == Some(attempt) && !successful) {
logDebug(s"Authorized committer $attempt (stage=$stage, task=$task) failed; clearing lock")
// The authorized committer failed; clear the lock so future attempts can commit their output
authorizedCommitters.remove(task)
}
}
}

private[spark] object OutputCommitCoordinator {

class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)
extends Actor with ActorLogReceive with Logging {

override def receiveWithLogging = {
case StageStarted(stage) =>
outputCommitCoordinator.handleStageStart(stage)
case StageEnded(stage) =>
outputCommitCoordinator.handleStageEnd(stage)
case AskPermissionToCommitOutput(stage, task, taskAttempt) =>
sender ! outputCommitCoordinator.handleAskPermissionToCommit(stage, task, taskAttempt)
case TaskCompleted(stage, task, attempt, successful) =>
outputCommitCoordinator.handleTaskCompletion(stage, task, attempt, successful)
}
}
def createActor(coordinator: OutputCommitCoordinator): OutputCommitCoordinatorActor = {
new OutputCommitCoordinatorActor(coordinator)
}
}
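
The handleTaskCompletion path above is what keeps speculation safe without
losing liveness: if the authorized committer fails before committing, its
lock is released so a later attempt can commit. A self-contained model of
that rule plus the stage-scoped bookkeeping (CommitRecoveryDemo is a
hypothetical name; the real class routes these calls through its actor):

import scala.collection.mutable

object CommitRecoveryDemo extends App {
  // stage -> (task -> winning attempt), mirroring authorizedCommittersByStage
  private val byStage = mutable.Map[Int, mutable.Map[Long, Long]]()

  def stageStart(stage: Int): Unit = byStage(stage) = mutable.HashMap[Long, Long]()
  def stageEnd(stage: Int): Unit = byStage.remove(stage)

  def ask(stage: Int, task: Long, attempt: Long): Boolean =
    byStage.get(stage) match {
      case None => false // stage is not running: deny
      case Some(committers) =>
        if (committers.contains(task)) false // someone already won
        else { committers(task) = attempt; true }
    }

  def taskCompleted(stage: Int, task: Long, attempt: Long, successful: Boolean): Unit =
    for (committers <- byStage.get(stage)) {
      if (!successful && committers.get(task) == Some(attempt)) {
        committers.remove(task) // the winner failed: release the lock
      }
    }

  stageStart(0)
  println(ask(0, 7L, 1L))                      // true: first attempt authorized
  taskCompleted(0, 7L, 1L, successful = false) // winner dies before committing
  println(ask(0, 7L, 2L))                      // true: lock was released
  stageEnd(0)
  println(ask(0, 7L, 3L))                      // false: stage already ended
}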

