
Commit

Merge branch 'master' into feature/SPARK-6435-2
Conflicts:
	bin/spark-class2.cmd
tsudukim committed Apr 2, 2015
2 parents 04f4291 + 899ebcb commit 2a332e5
Showing 141 changed files with 5,463 additions and 1,709 deletions.
61 changes: 36 additions & 25 deletions bin/spark-class
@@ -40,35 +40,46 @@ else
fi
fi

# Look for the launcher. In non-release mode, add the compiled classes directly to the classpath
# instead of looking for a jar file.
SPARK_LAUNCHER_CP=
if [ -f $SPARK_HOME/RELEASE ]; then
LAUNCHER_DIR="$SPARK_HOME/lib"
num_jars="$(ls -1 "$LAUNCHER_DIR" | grep "^spark-launcher.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_LAUNCHER_CP" ]; then
echo "Failed to find Spark launcher in $LAUNCHER_DIR." 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
# Find assembly jar
SPARK_ASSEMBLY_JAR=
if [ -f "$SPARK_HOME/RELEASE" ]; then
ASSEMBLY_DIR="$SPARK_HOME/lib"
else
ASSEMBLY_DIR="$SPARK_HOME/assembly/target/scala-$SPARK_SCALA_VERSION"
fi

LAUNCHER_JARS="$(ls -1 "$LAUNCHER_DIR" | grep "^spark-launcher.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
echo "Found multiple Spark launcher jars in $LAUNCHER_DIR:" 1>&2
echo "$LAUNCHER_JARS" 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then
echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
echo "$ASSEMBLY_JARS" 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi

SPARK_LAUNCHER_CP="${LAUNCHER_DIR}/${LAUNCHER_JARS}"
SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"

# Verify that versions of java used to build the jars and run Spark are compatible
if [ -n "$JAVA_HOME" ]; then
JAR_CMD="$JAVA_HOME/bin/jar"
else
LAUNCHER_DIR="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION"
if [ ! -d "$LAUNCHER_DIR/classes" ]; then
echo "Failed to find Spark launcher classes in $LAUNCHER_DIR." 1>&2
echo "You need to build Spark before running this program." 1>&2
JAR_CMD="jar"
fi

if [ $(command -v "$JAR_CMD") ] ; then
jar_error_check=$("$JAR_CMD" -tf "$SPARK_ASSEMBLY_JAR" nonexistent/class/path 2>&1)
if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2
echo "This is likely because Spark was compiled with Java 7 and run " 1>&2
echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2
echo "or build Spark with Java 6." 1>&2
exit 1
fi
SPARK_LAUNCHER_CP="$LAUNCHER_DIR/classes"
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
@@ -77,7 +88,7 @@ fi
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <("$RUNNER" -cp "$SPARK_LAUNCHER_CP" org.apache.spark.launcher.Main "$@")
done < <("$RUNNER" -cp "$SPARK_ASSEMBLY_JAR" org.apache.spark.launcher.Main "$@")

if [ "${CMD[0]}" = "usage" ]; then
"${CMD[@]}"
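
The truncated comment at the top of the hunk above describes the contract between org.apache.spark.launcher.Main and this script: the launcher prints each argument of the final command terminated by a NUL character, and the `while IFS= read -d '' -r ARG` loop rebuilds the argument array from that stream via process substitution. As an illustration only, a minimal Scala sketch of the emitting side might look like this (hypothetical class name, not the real launcher):

// Hedged sketch of the emitting side of that contract; this is NOT the real
// org.apache.spark.launcher.Main. Every argument of the command to execute is
// printed followed by a NUL byte, which is what `read -d ''` consumes above.
object NulSeparatedEmitterSketch {
  def main(args: Array[String]): Unit = {
    // In the real launcher this would be the java binary, JVM options,
    // classpath and user arguments; here it is just a placeholder command.
    val command = Seq("java", "-cp", "placeholder.jar", "org.example.Main") ++ args
    command.foreach { arg =>
      print(arg)
      print(0.toChar) // NUL separator
    }
    System.out.flush()
  }
}

Using NUL as the delimiter is what lets arguments containing spaces or newlines survive the round trip into the CMD array.
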
33 changes: 11 additions & 22 deletions bin/spark-class2.cmd
@@ -29,31 +29,20 @@ if "x%1"=="x" (
exit /b 1
)

set LAUNCHER_CP=0
if exist %SPARK_HOME%\RELEASE goto find_release_launcher
rem Find assembly jar
set SPARK_ASSEMBLY_JAR=0

rem Look for the Spark launcher in both Scala build directories. The launcher doesn't use Scala so
rem it doesn't really matter which one is picked up. Add the compiled classes directly to the
rem classpath instead of looking for a jar file, since it's very common for people using sbt to use
rem the "assembly" target instead of "package".
set LAUNCHER_CLASSES=%SPARK_HOME%\launcher\target\scala-2.10\classes
if exist %LAUNCHER_CLASSES% (
set LAUNCHER_CP=%LAUNCHER_CLASSES%
if exist "%SPARK_HOME%\RELEASE" (
set ASSEMBLY_DIR=%SPARK_HOME%\lib
) else (
set ASSEMBLY_DIR=%SPARK_HOME%\assembly\target\scala-%SPARK_SCALA_VERSION%
)
set LAUNCHER_CLASSES=%SPARK_HOME%\launcher\target\scala-2.11\classes
if exist %LAUNCHER_CLASSES% (
set LAUNCHER_CP=%LAUNCHER_CLASSES%
)
goto check_launcher

:find_release_launcher
for %%d in (%SPARK_HOME%\lib\spark-launcher*.jar) do (
set LAUNCHER_CP=%%d
for %%d in (%ASSEMBLY_DIR%\spark-assembly*hadoop*.jar) do (
set SPARK_ASSEMBLY_JAR=%%d
)

:check_launcher
if "%LAUNCHER_CP%"=="0" (
echo Failed to find Spark launcher JAR.
if "%SPARK_ASSEMBLY_JAR%"=="0" (
echo Failed to find Spark assembly JAR.
echo You need to build Spark before running this program.
exit /b 1
)
@@ -65,7 +54,7 @@ if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
rem The launcher library prints the command to be executed in a single line suitable for being
rem executed by the batch interpreter. So read all the output of the launcher into a variable.
set LAUNCHER_OUTPUT=%temp%\spark-class-launcher-output.txt
"%RUNNER%" -cp %LAUNCHER_CP% org.apache.spark.launcher.Main %* > %LAUNCHER_OUTPUT%
"%RUNNER%" -cp %SPARK_ASSEMBLY_JAR% org.apache.spark.launcher.Main %* > %LAUNCHER_OUTPUT%
for /f "tokens=*" %%i in (%LAUNCHER_OUTPUT%) do (
set SPARK_CMD=%%i
)
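
On Windows the contract is simpler, as the `rem` comment above notes: the launcher prints the complete command on one line, which the batch script captures in %LAUNCHER_OUTPUT% and re-reads with `for /f`. A minimal sketch of that single-line behaviour (again hypothetical, not the real launcher):

// Hypothetical sketch of the Windows-side contract: emit the whole command on
// a single line so the `for /f "tokens=*"` loop in spark-class2.cmd can read it
// back from the temp file. Not the real launcher; quoting rules are omitted.
object SingleLineEmitterSketch {
  def main(args: Array[String]): Unit = {
    val command = Seq("java", "-cp", "placeholder.jar", "org.example.Main") ++ args
    println(command.mkString(" "))
  }
}
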
@@ -30,7 +30,7 @@ $(function() {

stripeSummaryTable();

$("input:checkbox").click(function() {
$('input[type="checkbox"]').click(function() {
var column = "table ." + $(this).attr("name");
$(column).toggle();
stripeSummaryTable();
@@ -39,15 +39,15 @@ $(function() {
$("#select-all-metrics").click(function() {
if (this.checked) {
// Toggle all un-checked options.
$('input:checkbox:not(:checked)').trigger('click');
$('input[type="checkbox"]:not(:checked)').trigger('click');
} else {
// Toggle all checked options.
$('input:checkbox:checked').trigger('click');
$('input[type="checkbox"]:checked').trigger('click');
}
});

// Trigger a click on the checkbox if a user clicks the label next to it.
$("span.additional-metric-title").click(function() {
$(this).parent().find('input:checkbox').trigger('click');
$(this).parent().find('input[type="checkbox"]').trigger('click');
});
});
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -49,12 +49,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule

// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new mutable.HashMap[String, Long]

private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000

private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000

// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
private val executorTimeoutMs = sc.conf.getOption("spark.network.timeout").map(_.toLong * 1000).
getOrElse(sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120000))

// "spark.network.timeoutInterval" uses "seconds", while
// "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
private val checkTimeoutIntervalMs =
sc.conf.getOption("spark.network.timeoutInterval").map(_.toLong * 1000).
getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))

private var timeoutCheckingTask: Cancellable = null

@@ -84,7 +89,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
logWarning(s"Removing executor $executorId with no recent heartbeats: " +
s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
"timed out after ${now - lastSeenMs} ms"))
s"timed out after ${now - lastSeenMs} ms"))
if (sc.supportDynamicAllocation) {
sc.killExecutor(executorId)
}
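
The rewritten timeout definitions above fix a unit mismatch: spark.network.timeout is given in seconds, while the legacy spark.storage.blockManagerSlaveTimeoutMs and spark.storage.blockManagerTimeoutIntervalMs keys are in milliseconds, so only the seconds-based value may be multiplied by 1000. A standalone sketch of the same fallback pattern, with a plain Map standing in for SparkConf:

// Standalone sketch of the "prefer the seconds-based key, fall back to the
// milliseconds-based legacy key" pattern used in HeartbeatReceiver above.
// A plain Map stands in for SparkConf; the key names are copied from the diff.
object TimeoutConfigSketch {
  def timeoutMs(settings: Map[String, String],
                secondsKey: String,
                legacyMsKey: String,
                defaultMs: Long): Long =
    settings.get(secondsKey)
      .map(_.toLong * 1000)                                   // seconds -> ms
      .getOrElse(settings.get(legacyMsKey).map(_.toLong).getOrElse(defaultMs))

  def main(args: Array[String]): Unit = {
    val conf = Map("spark.network.timeout" -> "120")
    // Prints 120000: 120 seconds converted to milliseconds. If only the legacy
    // key were set, its value would be used verbatim as milliseconds.
    println(timeoutMs(conf, "spark.network.timeout",
      "spark.storage.blockManagerSlaveTimeoutMs", 120000L))
  }
}
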
42 changes: 28 additions & 14 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -34,12 +34,14 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
import org.apache.spark.rpc.akka.AkkaRpcEnv
import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus}
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorActor
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
import org.apache.spark.storage._
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}

/**
* :: DeveloperApi ::
@@ -54,7 +56,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
@DeveloperApi
class SparkEnv (
val executorId: String,
val actorSystem: ActorSystem,
private[spark] val rpcEnv: RpcEnv,
val serializer: Serializer,
val closureSerializer: Serializer,
val cacheManager: CacheManager,
@@ -71,6 +73,9 @@ class SparkEnv (
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {

// TODO Remove actorSystem
val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

private[spark] var isStopped = false
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

@@ -91,7 +96,8 @@ class SparkEnv (
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
actorSystem.shutdown()
rpcEnv.shutdown()

// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
@@ -236,16 +242,15 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf)

// Create the ActorSystem for Akka and get the port it binds to.
val (actorSystem, boundPort) = {
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
}
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)
val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
if (isDriver) {
conf.set("spark.driver.port", boundPort.toString)
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else {
conf.set("spark.executor.port", boundPort.toString)
conf.set("spark.executor.port", rpcEnv.address.port.toString)
}

// Create an instance of the class with the given name, possibly initializing it with our conf
@@ -290,6 +295,15 @@ object SparkEnv extends Logging {
}
}

def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
RpcUtils.makeDriverRef(name, conf, rpcEnv)
}
}

val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf)
} else {
@@ -377,13 +391,13 @@ object SparkEnv extends Logging {
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf)
}
val outputCommitCoordinatorActor = registerOrLookup("OutputCommitCoordinator",
new OutputCommitCoordinatorActor(outputCommitCoordinator))
outputCommitCoordinator.coordinatorActor = Some(outputCommitCoordinatorActor)
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

val envInstance = new SparkEnv(
executorId,
actorSystem,
rpcEnv,
serializer,
closureSerializer,
cacheManager,
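
registerOrLookupEndpoint above captures the driver/executor asymmetry: the driver creates the endpoint and registers it with its RpcEnv, while executors only build a reference to the driver-hosted endpoint (RpcUtils.makeDriverRef). A rough standalone sketch of that shape, using made-up stand-ins rather than Spark's org.apache.spark.rpc types:

// Hypothetical stand-ins for RpcEnv / RpcEndpointRef, only to illustrate the
// register-on-driver vs. look-up-on-executor pattern of registerOrLookupEndpoint.
final case class EndpointRefSketch(name: String, address: String)

class RpcEnvSketch(selfAddress: String) {
  private val hosted = scala.collection.mutable.Map.empty[String, AnyRef]

  // Driver side: actually host the endpoint and hand back a reference to it.
  def setupEndpoint(name: String, endpoint: AnyRef): EndpointRefSketch = {
    hosted(name) = endpoint
    EndpointRefSketch(name, selfAddress)
  }

  // Executor side: build a reference to an endpoint hosted on the driver.
  def makeDriverRef(name: String, driverAddress: String): EndpointRefSketch =
    EndpointRefSketch(name, driverAddress)
}

object RegisterOrLookupSketch {
  def registerOrLookup(isDriver: Boolean,
                       rpcEnv: RpcEnvSketch,
                       driverAddress: String,
                       name: String,
                       create: => AnyRef): EndpointRefSketch =
    if (isDriver) rpcEnv.setupEndpoint(name, create)
    else rpcEnv.makeDriverRef(name, driverAddress)

  def main(args: Array[String]): Unit = {
    val env = new RpcEnvSketch("driver-host:7077")
    println(registerOrLookup(/* isDriver = */ true, env, "driver-host:7077",
      "OutputCommitCoordinator", new Object))
  }
}
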
52 changes: 2 additions & 50 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD

@@ -104,55 +103,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}

def commit() {
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()

// Called after we have decided to commit
def performCommit(): Unit = {
try {
cmtr.commitTask(taCtxt)
logInfo (s"$taID: Committed")
} catch {
case e: IOException =>
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
throw e
}
}

// First, check whether the task's output has already been committed by some other attempt
if (cmtr.needsTaskCommit(taCtxt)) {
// The task output needs to be committed, but we don't know whether some other task attempt
// might be racing to commit the same output partition. Therefore, coordinate with the driver
// in order to determine whether this attempt can commit (see SPARK-4879).
val shouldCoordinateWithDriver: Boolean = {
val sparkConf = SparkEnv.get.conf
// We only need to coordinate with the driver if there are multiple concurrent task
// attempts, which should only occur if speculation is enabled
val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
// This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
}
if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
val canCommit = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
if (canCommit) {
performCommit()
} else {
val msg = s"$taID: Not committed because the driver did not authorize commit"
logInfo(msg)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
cmtr.abortTask(taCtxt)
throw new CommitDeniedException(msg, jobID, splitID, attemptID)
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
performCommit()
}
} else {
// Some other attempt committed the output, so we do nothing and signal success
logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
}
SparkHadoopMapRedUtil.commitTask(
getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
}

def commitJob() {
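
The block removed above now lives behind SparkHadoopMapRedUtil.commitTask, but the decision flow it implemented is worth keeping in view: skip the commit when the committer reports none is needed; otherwise ask the driver-side OutputCommitCoordinator for permission, but only when speculation (or the undocumented spark.hadoop.outputCommitCoordination.enabled override) makes racing attempts possible (SPARK-4879). A simplified, self-contained sketch of that flow with hypothetical types, not the real Hadoop OutputCommitter API:

// Simplified sketch of the commit decision flow that was removed above and now
// sits behind SparkHadoopMapRedUtil.commitTask. All types are hypothetical.
object CommitFlowSketch {
  final case class Attempt(jobId: Int, splitId: Int, attemptId: Int)

  def commitTask(needsCommit: Boolean,
                 speculationEnabled: Boolean,
                 coordinationOverride: Option[Boolean],
                 driverAllowsCommit: Attempt => Boolean,
                 doCommit: () => Unit,
                 doAbort: () => Unit,
                 attempt: Attempt): Unit = {
    if (!needsCommit) {
      // Some other attempt already committed this partition: nothing to do.
      println(s"No need to commit output of task $attempt")
    } else if (coordinationOverride.getOrElse(speculationEnabled)) {
      // Concurrent attempts are possible, so let the driver arbitrate (SPARK-4879).
      if (driverAllowsCommit(attempt)) doCommit()
      else {
        doAbort()
        throw new RuntimeException(s"$attempt: not committed, driver denied the commit")
      }
    } else {
      // Speculation is off and no override is set: commit directly.
      doCommit()
    }
  }

  def main(args: Array[String]): Unit =
    commitTask(
      /* needsCommit = */ true,
      /* speculationEnabled = */ false,
      /* coordinationOverride = */ None,
      /* driverAllowsCommit = */ _ => true,
      /* doCommit = */ () => println("committed"),
      /* doAbort = */ () => println("aborted"),
      Attempt(0, 0, 0))
}
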
@@ -19,10 +19,9 @@ package org.apache.spark.deploy.worker

import java.io.File

import akka.actor._

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.util.{AkkaUtils, ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

/**
* Utility object for launching driver programs such that they share fate with the Worker process.
@@ -39,9 +38,9 @@ object DriverWrapper {
*/
case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
val rpcEnv = RpcEnv.create("Driver",
Utils.localHostName(), 0, conf, new SecurityManager(conf))
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))

val currentLoader = Thread.currentThread.getContextClassLoader
val userJarUrl = new File(userJar).toURI().toURL()
@@ -58,7 +57,7 @@ object DriverWrapper {
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])

actorSystem.shutdown()
rpcEnv.shutdown()

case _ =>
System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
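
The DriverWrapper hunks above replace AkkaUtils.createActorSystem plus actorOf with the RPC abstraction: create an RpcEnv, register a workerWatcher endpoint so the wrapped driver shares fate with its Worker, run the user's main method, then shut the RpcEnv down. A very rough lifecycle sketch with placeholder types (not Spark's real RpcEnv or WorkerWatcher):

// Rough lifecycle sketch mirroring the new DriverWrapper flow above, with
// placeholder types instead of Spark's real RpcEnv / WorkerWatcher.
object DriverWrapperLifecycleSketch {
  trait Endpoint { def onStart(): Unit }

  class MiniRpcEnv(name: String) {
    def setupEndpoint(endpointName: String, endpoint: Endpoint): Unit = {
      println(s"[$name] registered endpoint '$endpointName'")
      endpoint.onStart()
    }
    def shutdown(): Unit = println(s"[$name] shut down")
  }

  def main(args: Array[String]): Unit = {
    val rpcEnv = new MiniRpcEnv("Driver")
    // Watch the worker so the wrapped driver shares fate with it, as in the diff.
    rpcEnv.setupEndpoint("workerWatcher", new Endpoint {
      def onStart(): Unit = println("watching worker URL from args")
    })
    try {
      // Here the real DriverWrapper reflectively loads the user jar and
      // invokes its main(Array[String]) method.
      println("running user main class")
    } finally {
      rpcEnv.shutdown()
    }
  }
}
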
