From 186b497c945cc7bbe7a21fef56a948dd1fd10774 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Mon, 13 Oct 2014 23:31:37 -0700 Subject: [PATCH] [SPARK-3921] Fix CoarseGrainedExecutorBackend's arguments for Standalone mode The goal of this patch is to fix the swapped arguments in standalone mode, which was caused by https://github.com/apache/spark/commit/79e45c9323455a51f25ed9acd0edd8682b4bbb88#diff-79391110e9f26657e415aa169a004998R153. More details can be found in the JIRA: [SPARK-3921](https://issues.apache.org/jira/browse/SPARK-3921) Tested in Standalone mode, but not in Mesos. Author: Aaron Davidson Closes #2779 from aarondav/fix-standalone and squashes the following commits: 725227a [Aaron Davidson] Fix ExecutorRunnerTest 9d703fe [Aaron Davidson] [SPARK-3921] Fix CoarseGrainedExecutorBackend's arguments for Standalone mode --- .../apache/spark/deploy/worker/ExecutorRunner.scala | 3 ++- .../spark/executor/CoarseGrainedExecutorBackend.scala | 3 +++ .../cluster/SparkDeploySchedulerBackend.scala | 3 ++- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 8 ++++---- .../spark/deploy/worker/ExecutorRunnerTest.scala | 10 ++++------ 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 71650cd773bcf..71d7385b08eb9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -111,13 +111,14 @@ private[spark] class ExecutorRunner( case "{{EXECUTOR_ID}}" => execId.toString case "{{HOSTNAME}}" => host case "{{CORES}}" => cores.toString + case "{{APP_ID}}" => appId case other => other } def getCommandSeq = { val command = Command( appDesc.command.mainClass, - appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), + appDesc.command.arguments.map(substituteVariables), appDesc.command.environment, appDesc.command.classPathEntries, appDesc.command.libraryPathEntries, diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 06061edfc0844..c40a3e16675ad 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -152,6 +152,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { "Usage: CoarseGrainedExecutorBackend " + " [] ") System.exit(1) + + // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode) + // and CoarseMesosSchedulerBackend (for mesos mode). case 5 => run(args(0), args(1), args(2), args(3).toInt, args(4), None) case x if x > 5 => diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index ed209d195ec9d..8c7de75600b5f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -51,7 +51,8 @@ private[spark] class SparkDeploySchedulerBackend( conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) - val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") + val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}", + "{{WORKER_URL}}") val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") .map(Utils.splitCommandString).getOrElse(Seq.empty) val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp => diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 90828578cd88f..d7f88de4b40aa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -150,17 +150,17 @@ private[spark] class CoarseMesosSchedulerBackend( if (uri == null) { val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath command.setValue( - "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format( - runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) + "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format( + runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId)) } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". val basename = uri.split('/').last.split('.').head command.setValue( ("cd %s*; " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d") + "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s") .format(basename, driverUrl, offer.getSlaveId.getValue, - offer.getHostname, numCores)) + offer.getHostname, numCores, appId)) command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } command.build() diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 39ab53cf0b5b1..5e2592e8d2e8d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -26,14 +26,12 @@ import org.apache.spark.SparkConf class ExecutorRunnerTest extends FunSuite { test("command includes appId") { - def f(s:String) = new File(s) + val appId = "12345-worker321-9876" val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val appDesc = new ApplicationDescription("app name", Some(8), 500, - Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl") - val appId = "12345-worker321-9876" - val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome), - f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING) - + Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl") + val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", + new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING) assert(er.getCommandSeq.last === appId) } }