-              { providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
+              {providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
             </ul>
{
if (allApps.size > 0) {
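The rewritten rendering replaces positional tuple accessors with a pattern-matching closure, so the key and value get real names instead of `_1` and `_2`. A self-contained sketch of the idiom, with an illustrative config map standing in for the history provider's data:

    object TupleMapSketch {
      def main(args: Array[String]): Unit = {
        val providerConfig = Map("Event log directory" -> "/tmp/spark-events")
        // Positional accessors: compiles, but _1/_2 are opaque at the call site.
        providerConfig.map(e => s"${e._1}: ${e._2}").foreach(println)
        // Pattern-matching closure: same output, self-documenting names.
        providerConfig.map { case (k, v) => s"$k: $v" }.foreach(println)
      }
    }
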
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index cacb9da8c947b..d1a64c1912cb8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -25,9 +25,9 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
+import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.SignalLogger
/**
* A web server that renders SparkUIs of completed applications.
@@ -177,7 +177,7 @@ object HistoryServer extends Logging {
def main(argStrings: Array[String]) {
SignalLogger.register(log)
initSecurity()
- val args = new HistoryServerArguments(conf, argStrings)
+ new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)
val providerName = conf.getOption("spark.history.provider")
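With the log directory now written into the SparkConf inside HistoryServerArguments itself (see the next file), the constructed instance is used only for its side effect, so the unused `val args` binding goes away. A minimal sketch of that side-effecting constructor pattern, with a tiny stand-in for SparkConf:

    import scala.collection.mutable

    // ConfSketch stands in for SparkConf; only set/getOption are sketched.
    final class ConfSketch {
      private val settings = mutable.Map.empty[String, String]
      def set(key: String, value: String): ConfSketch = { settings(key) = value; this }
      def getOption(key: String): Option[String] = settings.get(key)
    }

    // Constructing the arguments object copies CLI values straight into conf,
    // so callers run `new ArgsSketch(conf, args)` purely for the side effect.
    class ArgsSketch(conf: ConfSketch, args: Array[String]) {
      args.toList match {
        case ("--dir" | "-d") :: value :: _ =>
          conf.set("spark.history.fs.logDirectory", value)
        case _ => // nothing to record in this sketch
      }
    }
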
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index be9361b754fc3..25fc76c23e0fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -18,7 +18,6 @@
package org.apache.spark.deploy.history
import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
/**
* Command-line parser for the master.
@@ -32,6 +31,7 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
args match {
case ("--dir" | "-d") :: value :: tail =>
logDir = value
+ conf.set("spark.history.fs.logDirectory", value)
parse(tail)
case ("--help" | "-h") :: tail =>
@@ -42,9 +42,6 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
case _ =>
printUsageAndExit(1)
}
- if (logDir != null) {
- conf.set("spark.history.fs.logDirectory", logDir)
- }
}
private def printUsageAndExit(exitCode: Int) {
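The conf key is now set at the moment `--dir` is parsed, which removes the trailing null check on `logDir`. A runnable sketch of the same tail-recursive parse shape, with an immutable map in place of SparkConf:

    import scala.annotation.tailrec

    object ParseSketch {
      @tailrec
      def parse(args: List[String], conf: Map[String, String]): Map[String, String] =
        args match {
          case ("--dir" | "-d") :: value :: tail =>
            // Record the value immediately instead of staging it in a field.
            parse(tail, conf + ("spark.history.fs.logDirectory" -> value))
          case Nil => conf
          case _ => sys.exit(1)
        }

      def main(args: Array[String]): Unit =
        println(parse(List("--dir", "/tmp/spark-events"), Map.empty))
    }
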
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 21f8667819c44..a70ecdb375373 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -154,6 +154,8 @@ private[spark] class Master(
}
override def postStop() {
+ masterMetricsSystem.report()
+ applicationMetricsSystem.report()
// prevent the CompleteRecovery message sending to restarted master
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel()
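Reporting the metrics systems in postStop flushes the sinks one final time, so values recorded since the last scheduled poll are not dropped when the master shuts down. An illustrative sketch of the idiom (ReporterSketch is a stand-in, not Spark's MetricsSystem API):

    trait ReporterSketch { def report(): Unit }

    class ShutdownSketch(sinks: Seq[ReporterSketch]) {
      // Mirrors the postStop ordering above: flush metrics first, then cancel
      // timers and release other resources.
      def postStop(): Unit = {
        sinks.foreach(_.report())
        // ... cancel scheduled tasks, kill child processes, etc.
      }
    }
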
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index a87781fb93850..4b0dbbe543d3f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -38,8 +38,8 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}
- if (conf.contains("master.ui.port")) {
- webUiPort = conf.get("master.ui.port").toInt
+ if (conf.contains("spark.master.ui.port")) {
+ webUiPort = conf.get("spark.master.ui.port").toInt
}
parse(args.toList)
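The old key was missing the `spark.` prefix, so it could never be set through standard Spark configuration; the rename fixes that while keeping the existing precedence, where the conf value overwrites the environment variable, which overwrites the default. A sketch of that resolution order with plain maps injected for testability:

    object PortSketch {
      def resolveWebUiPort(env: Map[String, String], conf: Map[String, String]): Int = {
        val default = 8080
        // Env var is consulted first; a properly prefixed conf key wins last.
        val fromEnv = env.get("SPARK_MASTER_WEBUI_PORT").map(_.toInt)
        val fromConf = conf.get("spark.master.ui.port").map(_.toInt)
        fromConf.orElse(fromEnv).getOrElse(default)
      }

      def main(args: Array[String]): Unit =
        println(resolveWebUiPort(Map.empty, Map("spark.master.ui.port" -> "9090")))
    }
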
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 16aa0493370dd..d86ec1e03e45c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.AkkaUtils
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int)
- extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {
+ extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)
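Passing `name = "MasterUI"` as a named argument documents the new parameter at the call site. A compact sketch of the constructor shape (the types are stand-ins for Spark's WebUI hierarchy, not its actual signatures):

    abstract class WebUISketch(requestedPort: Int, name: String = "WebUI") {
      override def toString = s"$name on port $requestedPort"
    }

    // The named argument makes the extra parameter unambiguous at a glance.
    class MasterWebUISketch(requestedPort: Int)
      extends WebUISketch(requestedPort, name = "MasterUI")
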
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 4af5bc3afad6c..687e492a0d6fc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -47,7 +47,6 @@ object CommandUtils extends Logging {
*/
def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")
- val extraOpts = command.extraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq())
// Exists for backwards compatibility with older Spark versions
val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString)
@@ -62,7 +61,7 @@ object CommandUtils extends Logging {
val joined = command.libraryPathEntries.mkString(File.pathSeparator)
Seq(s"-Djava.library.path=$joined")
} else {
- Seq()
+ Seq()
}
val permGenOpt = Seq("-XX:MaxPermSize=128m")
@@ -71,11 +70,11 @@ object CommandUtils extends Logging {
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
val classPath = Utils.executeAndGetOutput(
Seq(sparkHome + "/bin/compute-classpath" + ext),
- extraEnvironment=command.environment)
+ extraEnvironment = command.environment)
val userClassPath = command.classPathEntries ++ Seq(classPath)
Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
- permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts
+ permGenOpt ++ libraryOpts ++ workerLocalOpts ++ command.javaOpts ++ memoryOpts
}
/** Spawn a thread that will redirect a given stream to a file */
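The local `extraOpts`, an `Option[String]` that had to be tokenized here, disappears because the options now arrive pre-split as `command.javaOpts: Seq[String]` and are concatenated directly into the final option list. A runnable sketch of the new assembly, with a simplified Command stand-in:

    case class CommandSketch(mainClass: String, javaOpts: Seq[String])

    object BuildOptsSketch {
      def buildJavaOpts(cmd: CommandSketch, memoryMb: Int): Seq[String] = {
        val memoryOpts = Seq(s"-Xms${memoryMb}M", s"-Xmx${memoryMb}M")
        val permGenOpt = Seq("-XX:MaxPermSize=128m")
        // Pre-split options are appended as-is; no second tokenization pass
        // over a raw string is needed at the call site.
        permGenOpt ++ cmd.javaOpts ++ memoryOpts
      }

      def main(args: Array[String]): Unit =
        println(buildJavaOpts(CommandSketch("Main", Seq("-Dspark.test=1")), 512))
    }
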
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 662d37871e7a6..5caaf6bea3575 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
+ * This is currently only used in standalone cluster deploy mode.
*/
private[spark] class DriverRunner(
val driverId: String,
@@ -81,7 +82,7 @@ private[spark] class DriverRunner(
driverDesc.command.environment,
classPath,
driverDesc.command.libraryPathEntries,
- driverDesc.command.extraJavaOptions)
+ driverDesc.command.javaOpts)
val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem,
sparkHome.getAbsolutePath)
launchDriver(command, driverDesc.command.environment, driverDir, driverDesc.supervise)
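The new class comment pins DriverRunner to standalone cluster deploy mode, and the rebuilt command now forwards `javaOpts` verbatim instead of the old `extraJavaOptions` string. For the restart behavior the class comment describes, an illustrative retry loop (not Spark's implementation) might look like:

    import scala.annotation.tailrec

    object SuperviseSketch {
      @tailrec
      def runWithRetry(run: () => Int, supervise: Boolean): Int = {
        val exitCode = run()
        // Under supervision, rerun the driver until it exits cleanly.
        if (supervise && exitCode != 0) runWithRetry(run, supervise)
        else exitCode
      }

      def main(args: Array[String]): Unit = {
        var attempts = 0
        val code = runWithRetry(() => { attempts += 1; if (attempts < 3) 1 else 0 },
          supervise = true)
        println(s"exit=$code after $attempts attempts")
      }
    }
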
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 467317dd9b44c..7be89f9aff0f3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.logging.FileAppender
/**
* Manages the execution of one executor process.
+ * This is currently only used in standalone mode.
*/
private[spark] class ExecutorRunner(
val appId: String,
@@ -72,7 +73,7 @@ private[spark] class ExecutorRunner(
}
/**
- * kill executor process, wait for exit and notify worker to update resource status
+ * Kill executor process, wait for exit and notify worker to update resource status.
*
* @param message the exception message which caused the executor's death
*/
@@ -114,10 +115,13 @@ private[spark] class ExecutorRunner(
}
def getCommandSeq = {
- val command = Command(appDesc.command.mainClass,
- appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment,
- appDesc.command.classPathEntries, appDesc.command.libraryPathEntries,
- appDesc.command.extraJavaOptions)
+ val command = Command(
+ appDesc.command.mainClass,
+ appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
+ appDesc.command.environment,
+ appDesc.command.classPathEntries,
+ appDesc.command.libraryPathEntries,
+ appDesc.command.javaOpts)
CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath)
}
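The reformatted call keeps one argument per line and swaps `extraJavaOptions` for `javaOpts`. The surrounding logic substitutes placeholder variables in the submitted arguments and appends the application id; a self-contained sketch of that assembly (the placeholder name and map are illustrative):

    object ArgsAssemblySketch {
      def substitute(vars: Map[String, String])(arg: String): String =
        vars.getOrElse(arg, arg)

      def main(args: Array[String]): Unit = {
        val vars = Map("{{WORKER_URL}}" -> "spark://worker:7078")
        val submitted = Seq("{{WORKER_URL}}", "--cores", "4")
        // Substitute placeholders, then append the application id last.
        println(submitted.map(substitute(vars)) ++ Seq("app-20140801-0001"))
      }
    }
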
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ce425443051b0..458d9947bd873 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -71,7 +71,7 @@ private[spark] class Worker(
// TTL for app folders/data; after TTL expires it will be cleaned up
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
-
+ val testing: Boolean = sys.props.contains("spark.testing")
val masterLock: Object = new Object()
var master: ActorSelection = null
var masterAddress: Address = null
@@ -81,7 +81,13 @@ private[spark] class Worker(
@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
- val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
+ val sparkHome =
+ if (testing) {
+ assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
+ new File(sys.props("spark.test.home"))
+ } else {
+ new File(sys.env.get("SPARK_HOME").getOrElse("."))
+ }
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -233,9 +239,7 @@ private[spark] class Worker(
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host,
- appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
- workDir, akkaUrl, conf, ExecutorState.RUNNING)
+ self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -357,6 +361,7 @@ private[spark] class Worker(
}
override def postStop() {
+ metricsSystem.report()
registrationRetryTimer.foreach(_.cancel())
executors.values.foreach(_.kill())
drivers.values.foreach(_.kill())
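Under `spark.testing`, the worker now fails fast if `spark.test.home` is missing instead of silently falling back to `SPARK_HOME`, and the per-application `sparkHome` override is gone: executors always run under the worker's own home. A sketch of the resolution logic, with the property and environment maps injected so it can run anywhere:

    import java.io.File

    object SparkHomeSketch {
      def resolve(props: Map[String, String], env: Map[String, String]): File =
        if (props.contains("spark.testing")) {
          // Tests must say where the build lives; failing fast beats running
          // against whatever SPARK_HOME happens to point at.
          val home = props.getOrElse("spark.test.home",
            sys.error("spark.test.home is not set!"))
          new File(home)
        } else {
          new File(env.getOrElse("SPARK_HOME", "."))
        }

      def main(args: Array[String]): Unit =
        println(resolve(Map.empty, Map("SPARK_HOME" -> "/opt/spark")))
    }
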
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index b389cb546de6c..ecb358c399819 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -17,7 +17,6 @@
package org.apache.spark.deploy.worker.ui
-import java.io.File
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
@@ -25,7 +24,7 @@ import scala.xml.Node
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils
import org.apache.spark.Logging
-import org.apache.spark.util.logging.{FileAppender, RollingFileAppender}
+import org.apache.spark.util.logging.RollingFileAppender
private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
private val worker = parent.worker
@@ -64,11 +63,11 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w
val offset = Option(request.getParameter("offset")).map(_.toLong)
val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
- val (logDir, params) = (appId, executorId, driverId) match {
+ val (logDir, params, pageName) = (appId, executorId, driverId) match {
case (Some(a), Some(e), None) =>
- (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e")
+ (s"${workDir.getPath}/$a/$e/", s"appId=$a&executorId=$e", s"$a/$e")
case (None, None, Some(d)) =>
- (s"${workDir.getPath}/$d/", s"driverId=$d")
+ (s"${workDir.getPath}/$d/", s"driverId=$d", d)
case _ =>
throw new Exception("Request must specify either application or driver identifiers")
}
@@ -120,7 +119,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w
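The destructured tuple gains a third element so each branch also yields a page title, `appId/executorId` for executor logs and the bare id for driver logs, presumably consumed where the page is rendered in the final, truncated hunk. A runnable sketch of the three-way routing:

    object LogRouteSketch {
      def route(
          workDir: String,
          appId: Option[String],
          executorId: Option[String],
          driverId: Option[String]): (String, String, String) =
        (appId, executorId, driverId) match {
          case (Some(a), Some(e), None) =>
            // Executor logs: directory, query params, and a combined title.
            (s"$workDir/$a/$e/", s"appId=$a&executorId=$e", s"$a/$e")
          case (None, None, Some(d)) =>
            // Driver logs: the driver id doubles as the title.
            (s"$workDir/$d/", s"driverId=$d", d)
          case _ =>
            throw new Exception("Request must specify either application or driver identifiers")
        }

      def main(args: Array[String]): Unit =
        println(route("/work", Some("app-1"), Some("0"), None))
    }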