From f2f26c2a1dc6d60078c3be9c3d11a21866d9a24f Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 20 Aug 2014 12:13:31 -0700 Subject: [PATCH 01/11] SPARK-3092 [SQL]: Always include the thriftserver when -Phive is enabled. Currently we have a separate profile called hive-thriftserver. I originally suggested this in case users did not want to bundle the thriftserver, but it's ultimately lead to a lot of confusion. Since the thriftserver is only a few classes, I don't see a really good reason to isolate it from the rest of Hive. So let's go ahead and just include it in the same profile to simplify things. This has been suggested in the past by liancheng. Author: Patrick Wendell Closes #2006 from pwendell/hiveserver and squashes the following commits: 742ea40 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into hiveserver 034ad47 [Patrick Wendell] SPARK-3092: Always include the thriftserver when -Phive is enabled. --- README.md | 6 +----- assembly/pom.xml | 5 ----- dev/create-release/create-release.sh | 10 +++++----- dev/run-tests | 2 +- dev/scalastyle | 2 +- docs/building-with-maven.md | 8 ++------ docs/sql-programming-guide.md | 4 +--- pom.xml | 2 +- 8 files changed, 12 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index a1a48f5bd0819..8906e4c1416b1 100644 --- a/README.md +++ b/README.md @@ -118,11 +118,7 @@ If your project is built with Maven, add this to your POM file's ` ## A Note About Thrift JDBC server and CLI for Spark SQL Spark SQL supports Thrift JDBC server and CLI. -See sql-programming-guide.md for more information about those features. -You can use those features by setting `-Phive-thriftserver` when building Spark as follows. - - $ sbt/sbt -Phive-thriftserver assembly - +See sql-programming-guide.md for more information about using the JDBC server. ## Configuration diff --git a/assembly/pom.xml b/assembly/pom.xml index 703f15925bc44..9fbb037115db3 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -163,11 +163,6 @@ spark-hive_${scala.binary.version} ${project.version} - - - - hive-thriftserver - org.apache.spark spark-hive-thriftserver_${scala.binary.version} diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 28f26d2368254..905dec0ced383 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -60,14 +60,14 @@ if [[ ! "$@" =~ --package-only ]]; then -Dmaven.javadoc.skip=true \ -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \ -Dtag=$GIT_TAG -DautoVersionSubmodules=true \ - -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ + -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ --batch-mode release:prepare mvn -DskipTests \ -Darguments="-DskipTests=true -Dmaven.javadoc.skip=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \ -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \ -Dmaven.javadoc.skip=true \ - -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ + -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \ release:perform cd .. @@ -117,10 +117,10 @@ make_binary_release() { spark-$RELEASE_VERSION-bin-$NAME.tgz.sha } -make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4" & -make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0" & +make_binary_release "hadoop1" "-Phive -Dhadoop.version=1.0.4" & +make_binary_release "cdh4" "-Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0" & make_binary_release "hadoop2" \ - "-Phive -Phive-thriftserver -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0" & + "-Phive -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0" & make_binary_release "hadoop2-without-hive" \ "-Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0" & wait diff --git a/dev/run-tests b/dev/run-tests index 132f696d6447a..20a67cfb361b9 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -99,7 +99,7 @@ echo -e "q\n" | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly # If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled: if [ -n "$_RUN_SQL_TESTS" ]; then - SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver" + SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive" fi # echo "q" is needed because sbt on encountering a build file with failure # (either resolution or compilation) prompts the user for input either q, r, diff --git a/dev/scalastyle b/dev/scalastyle index b53053a04ff42..eb9b467965636 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -17,7 +17,7 @@ # limitations under the License. # -echo -e "q\n" | sbt/sbt -Phive -Phive-thriftserver scalastyle > scalastyle.txt +echo -e "q\n" | sbt/sbt -Phive scalastyle > scalastyle.txt # Check style with YARN alpha built too echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-0.23 -Dhadoop.version=0.23.9 yarn-alpha/scalastyle \ >> scalastyle.txt diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md index 4d87ab92cec5b..a7d7bd3ccb1f2 100644 --- a/docs/building-with-maven.md +++ b/docs/building-with-maven.md @@ -98,12 +98,8 @@ mvn -Pyarn-alpha -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=0.23.7 -Dski # Building Thrift JDBC server and CLI for Spark SQL -Spark SQL supports Thrift JDBC server and CLI. -See sql-programming-guide.md for more information about those features. -You can use those features by setting `-Phive-thriftserver` when building Spark as follows. -{% highlight bash %} -mvn -Phive-thriftserver assembly -{% endhighlight %} +Spark SQL supports Thrift JDBC server and CLI. See sql-programming-guide.md for +more information about the JDBC server. # Spark Tests in Maven diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 34accade36ea9..c41f2804a6021 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -578,9 +578,7 @@ evaluated by the SQL execution engine. A full list of the functions supported c The Thrift JDBC server implemented here corresponds to the [`HiveServer2`] (https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2) in Hive 0.12. You can test -the JDBC server with the beeline script comes with either Spark or Hive 0.12. In order to use Hive -you must first run '`sbt/sbt -Phive-thriftserver assembly/assembly`' (or use `-Phive-thriftserver` -for maven). +the JDBC server with the beeline script comes with either Spark or Hive 0.12. To start the JDBC server, run the following in the Spark directory: diff --git a/pom.xml b/pom.xml index 0d44cf4ea5f92..dd4c4ee80a0df 100644 --- a/pom.xml +++ b/pom.xml @@ -1179,7 +1179,7 @@ - hive-thriftserver + hive false From ceb19830b88486faa87ff41e18d03ede713a73cc Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 20 Aug 2014 12:18:41 -0700 Subject: [PATCH 02/11] BUILD: Bump Hadoop versions in the release build. Also, minor modifications to the MapR profile. --- dev/create-release/create-release.sh | 10 +++---- pom.xml | 39 +++++++++++++++++++++++++--- 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 905dec0ced383..eab6313733dfd 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -118,11 +118,11 @@ make_binary_release() { } make_binary_release "hadoop1" "-Phive -Dhadoop.version=1.0.4" & -make_binary_release "cdh4" "-Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0" & -make_binary_release "hadoop2" \ - "-Phive -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0" & -make_binary_release "hadoop2-without-hive" \ - "-Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0" & +make_binary_release "hadoop2.3" "-Phadoop-2.3 -Phive -Pyarn" & +make_binary_release "hadoop2.4" "-Phadoop-2.4 -Phive -Pyarn" & +make_binary_release "hadoop2.4-without-hive" "-Phadoop-2.4 -Pyarn" & +make_binary_release "mapr3" "-Pmapr3 -Pyarn -Phive" & +make_binary_release "mapr4" "-Pmapr4 -Pyarn -Phive" & wait # Copy data diff --git a/pom.xml b/pom.xml index dd4c4ee80a0df..7ed07ad7df88d 100644 --- a/pom.xml +++ b/pom.xml @@ -1115,18 +1115,49 @@ - mapr + mapr3 false 1.0.3-mapr-3.0.3 - 2.3.0-mapr-4.0.0-beta - 0.94.17-mapr-1403 - 3.4.5-mapr-1401 + 2.3.0-mapr-4.0.0-FCS + 0.94.17-mapr-1405 + 3.4.5-mapr-1406 + + mapr4 + + false + + + 2.3.0-mapr-4.0.0-FCS + 2.3.0-mapr-4.0.0-FCS + 0.94.17-mapr-1405-4.0.0-FCS + 3.4.5-mapr-1406 + + + + org.apache.curator + curator-recipes + 2.4.0 + + + org.apache.zookeeper + zookeeper + + + + + org.apache.zookeeper + zookeeper + 3.4.5-mapr-1406 + + + + hadoop-provided From cf46e725814f575ebb417e80d2571bccc6dac4a7 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 20 Aug 2014 12:57:39 -0700 Subject: [PATCH 03/11] [SPARK-3126][SPARK-3127][SQL] Fixed HiveThriftServer2Suite This PR fixes two issues: 1. Fixes wrongly quoted command line option in `HiveThriftServer2Suite` that makes test cases hang until timeout. 1. Asks `dev/run-test` to run Spark SQL tests when `bin/spark-sql` and/or `sbin/start-thriftserver.sh` are modified. Author: Cheng Lian Closes #2036 from liancheng/fix-thriftserver-test and squashes the following commits: f38c4eb [Cheng Lian] Fixed the same quotation issue in CliSuite 26b82a0 [Cheng Lian] Run SQL tests when dff contains bin/spark-sql and/or sbin/start-thriftserver.sh a87f83d [Cheng Lian] Extended timeout e5aa31a [Cheng Lian] Fixed metastore JDBC URI quotation --- dev/run-tests | 2 +- .../spark/sql/hive/thriftserver/CliSuite.scala | 2 +- .../thriftserver/HiveThriftServer2Suite.scala | 18 ++++-------------- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 20a67cfb361b9..d751961605dfd 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -55,7 +55,7 @@ JAVA_VERSION=$($java_cmd -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..* # Partial solution for SPARK-1455. Only run Hive tests if there are sql changes. if [ -n "$AMPLAB_JENKINS" ]; then git fetch origin master:master - diffs=`git diff --name-only master | grep "^sql/"` + diffs=`git diff --name-only master | grep "^\(sql/\)\|\(bin/spark-sql\)\|\(sbin/start-thriftserver.sh\)"` if [ -n "$diffs" ]; then echo "Detected changes in SQL. Will run Hive test suite." _RUN_SQL_TESTS=true diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 2bf8cfdcacd22..70bea1ed80fda 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -32,7 +32,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils { val commands = s"""../../bin/spark-sql | --master local - | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}="$jdbcUrl" + | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$WAREHOUSE_PATH """.stripMargin.split("\\s+") diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index aedef6ce1f5f2..326b0a7275b34 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -51,9 +51,6 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUt port } - // If verbose is true, the test program will print all outputs coming from the Hive Thrift server. - val VERBOSE = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).getOrElse("false").toBoolean - Class.forName(DRIVER_NAME) override def beforeAll() { launchServer() } @@ -68,8 +65,7 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUt val command = s"""../../sbin/start-thriftserver.sh | --master local - | --hiveconf hive.root.logger=INFO,console - | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}="$jdbcUrl" + | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$METASTORE_PATH | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=$HOST | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$PORT @@ -77,12 +73,10 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUt val pb = new ProcessBuilder(command ++ args: _*) val environment = pb.environment() - environment.put("HIVE_SERVER2_THRIFT_PORT", PORT.toString) - environment.put("HIVE_SERVER2_THRIFT_BIND_HOST", HOST) process = pb.start() inputReader = new BufferedReader(new InputStreamReader(process.getInputStream)) errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream)) - waitForOutput(inputReader, "ThriftBinaryCLIService listening on") + waitForOutput(inputReader, "ThriftBinaryCLIService listening on", 300000) // Spawn a thread to read the output from the forked process. // Note that this is necessary since in some configurations, log4j could be blocked @@ -91,12 +85,8 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUt while (true) { val stdout = readFrom(inputReader) val stderr = readFrom(errorReader) - if (VERBOSE && stdout.length > 0) { - println(stdout) - } - if (VERBOSE && stderr.length > 0) { - println(stderr) - } + print(stdout) + print(stderr) Thread.sleep(50) } } From 0ea46ac80089e9091d247704b17afbc423c0060d Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 20 Aug 2014 13:26:11 -0700 Subject: [PATCH 04/11] [SPARK-3062] [SPARK-2970] [SQL] spark-sql script ends with IOException when EventLogging is enabled #1891 was to avoid IOException when EventLogging is enabled. The solution used ShutdownHookManager but it was defined only Hadoop 2.x. Hadoop 1.x don't have ShutdownHookManager so #1891 doesn't compile on Hadoop 1.x Now, I had a compromised solution for both Hadoop 1.x and 2.x. Only for FileLogger, an unique FileSystem object is created. Author: Kousuke Saruta Closes #1970 from sarutak/SPARK-2970 and squashes the following commits: 240c91e [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2970 0e7b45d [Kousuke Saruta] Revert "[SPARK-2970] [SQL] spark-sql script ends with IOException when EventLogging is enabled" e1262ec [Kousuke Saruta] Modified Filelogger to use unique FileSystem instance --- .../scala/org/apache/spark/util/FileLogger.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 2e8fbf5a91ee7..ad8b79af877d8 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -52,7 +52,20 @@ private[spark] class FileLogger( override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") } - private val fileSystem = Utils.getHadoopFileSystem(logDir) + /** + * To avoid effects of FileSystem#close or FileSystem.closeAll called from other modules, + * create unique FileSystem instance only for FileLogger + */ + private val fileSystem = { + val conf = SparkHadoopUtil.get.newConfiguration() + val logUri = new URI(logDir) + val scheme = logUri.getScheme + if (scheme == "hdfs") { + conf.setBoolean("fs.hdfs.impl.disable.cache", true) + } + FileSystem.get(logUri, conf) + } + var fileIndex = 0 // Only used if compression is enabled From c1ba4cd6b4db22a9325eee50dc40a78593a10de1 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 20 Aug 2014 14:04:39 -0700 Subject: [PATCH 05/11] [SPARK-3149] Connection establishment information is not enough. Author: Kousuke Saruta Closes #2060 from sarutak/SPARK-3149 and squashes the following commits: 1cc89af [Kousuke Saruta] Modified log message of accepting connection --- .../main/scala/org/apache/spark/network/ConnectionManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index b3e951ded6e77..e5e1e72cd912b 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -418,7 +418,7 @@ private[spark] class ConnectionManager( newConnection.onReceive(receiveMessage) addListeners(newConnection) addConnection(newConnection) - logInfo("Accepted connection from [" + newConnection.remoteAddress.getAddress + "]") + logInfo("Accepted connection from [" + newConnection.remoteAddress + "]") } catch { // might happen in case of issues with registering with selector case e: Exception => logError("Error in accept loop", e) From b3ec51bfd795772ff96d18228e979a52ebc82ec4 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 20 Aug 2014 15:01:47 -0700 Subject: [PATCH 06/11] [SPARK-2849] Handle driver configs separately in client mode In client deploy mode, the driver is launched from within `SparkSubmit`'s JVM. This means by the time we parse Spark configs from `spark-defaults.conf`, it is already too late to control certain properties of the driver's JVM. We currently ignore these configs in client mode altogether. ``` spark.driver.memory spark.driver.extraJavaOptions spark.driver.extraClassPath spark.driver.extraLibraryPath ``` This PR handles these properties before launching the driver JVM. It achieves this by spawning a separate JVM that runs a new class called `SparkSubmitDriverBootstrapper`, which spawns `SparkSubmit` as a sub-process with the appropriate classpath, library paths, java opts and memory. Author: Andrew Or Closes #1845 from andrewor14/handle-configs-bash and squashes the following commits: bed4bdf [Andrew Or] Change a few comments / messages (minor) 24dba60 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash 08fd788 [Andrew Or] Warn against external usages of SparkSubmitDriverBootstrapper ff34728 [Andrew Or] Minor comments 51aeb01 [Andrew Or] Filter out JVM memory in Scala rather than Bash (minor) 9a778f6 [Andrew Or] Fix PySpark: actually kill driver on termination d0f20db [Andrew Or] Don't pass empty library paths, classpath, java opts etc. a78cb26 [Andrew Or] Revert a few changes in utils.sh (minor) 9ba37e2 [Andrew Or] Don't barf when the properties file does not exist 8867a09 [Andrew Or] A few more naming things (minor) 19464ad [Andrew Or] SPARK_SUBMIT_JAVA_OPTS -> SPARK_SUBMIT_OPTS d6488f9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash 1ea6bbe [Andrew Or] SparkClassLauncher -> SparkSubmitDriverBootstrapper a91ea19 [Andrew Or] Fix precedence of library paths, classpath, java opts and memory 158f813 [Andrew Or] Remove "client mode" boolean argument c84f5c8 [Andrew Or] Remove debug print statement (minor) b71f52b [Andrew Or] Revert a few more changes (minor) 7d94a8d [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash 3a8235d [Andrew Or] Only parse the properties file if special configs exist c37e08d [Andrew Or] Revert a few more changes a396eda [Andrew Or] Nullify my own hard work to simplify bash 0effa1e [Andrew Or] Add code in Scala that handles special configs c886568 [Andrew Or] Fix lines too long + a few comments / style (minor) 7a4190a [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash 7396be2 [Andrew Or] Explicitly comment that multi-line properties are not supported fa11ef8 [Andrew Or] Parse the properties file only if the special configs exist 371cac4 [Andrew Or] Add function prefix (minor) be99eb3 [Andrew Or] Fix tests to not include multi-line configs bd0d468 [Andrew Or] Simplify parsing config file by ignoring multi-line arguments 56ac247 [Andrew Or] Use eval and set to simplify splitting 8d4614c [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash aeb79c7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into handle-configs-bash 2732ac0 [Andrew Or] Integrate BASH tests into dev/run-tests + log error properly 8d26a5c [Andrew Or] Add tests for bash/utils.sh 4ae24c3 [Andrew Or] Fix bug: escape properly in quote_java_property b3c4cd5 [Andrew Or] Fix bug: count the number of quotes instead of detecting presence c2273fc [Andrew Or] Fix typo (minor) e793e5f [Andrew Or] Handle multi-line arguments 5d8f8c4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra c7b9926 [Andrew Or] Minor changes to spark-defaults.conf.template a992ae2 [Andrew Or] Escape spark.*.extraJavaOptions correctly aabfc7e [Andrew Or] escape -> split (minor) 45a1eb9 [Andrew Or] Fix bug: escape escaped backslashes and quotes properly... 1cdc6b1 [Andrew Or] Fix bug: escape escaped double quotes properly c854859 [Andrew Or] Add small comment c13a2cb [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra 8e552b7 [Andrew Or] Include an example of spark.*.extraJavaOptions de765c9 [Andrew Or] Print spark-class command properly a4df3c4 [Andrew Or] Move parsing and escaping logic to utils.sh dec2343 [Andrew Or] Only export variables if they exist fa2136e [Andrew Or] Escape Java options + parse java properties files properly ef12f74 [Andrew Or] Minor formatting 4ec22a1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra e5cfb46 [Andrew Or] Collapse duplicate code + fix potential whitespace issues 4edcaa8 [Andrew Or] Redirect stdout to stderr for python 130f295 [Andrew Or] Handle spark.driver.memory too 98dd8e3 [Andrew Or] Add warning if properties file does not exist 8843562 [Andrew Or] Fix compilation issues... 75ee6b4 [Andrew Or] Remove accidentally added file 63ed2e9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into submit-driver-extra 0025474 [Andrew Or] Revert SparkSubmit handling of --driver-* options for only cluster mode a2ab1b0 [Andrew Or] Parse spark.driver.extra* in bash 250cb95 [Andrew Or] Do not ignore spark.driver.extra* for client mode --- bin/spark-class | 49 ++++-- bin/spark-submit | 28 +++- bin/utils.sh | 0 conf/spark-defaults.conf.template | 10 +- .../apache/spark/api/python/PythonUtils.scala | 25 --- .../api/python/PythonWorkerFactory.scala | 3 +- .../apache/spark/deploy/PythonRunner.scala | 4 +- .../org/apache/spark/deploy/SparkSubmit.scala | 17 +- .../SparkSubmitDriverBootstrapper.scala | 149 ++++++++++++++++++ .../scala/org/apache/spark/util/Utils.scala | 21 +++ 10 files changed, 250 insertions(+), 56 deletions(-) mode change 100644 => 100755 bin/utils.sh create mode 100644 core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala diff --git a/bin/spark-class b/bin/spark-class index 3f6beca5becf0..22acf92288b3b 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -17,6 +17,8 @@ # limitations under the License. # +# NOTE: Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala! + cygwin=false case "`uname`" in CYGWIN*) cygwin=true;; @@ -39,7 +41,7 @@ fi if [ -n "$SPARK_MEM" ]; then echo -e "Warning: SPARK_MEM is deprecated, please use a more specific config option" 1>&2 - echo -e "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." 1>&2 + echo -e "(e.g., spark.executor.memory or spark.driver.memory)." 1>&2 fi # Use SPARK_MEM or 512m as the default memory, to be overridden by specific options @@ -73,11 +75,17 @@ case "$1" in OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM} ;; - # Spark submit uses SPARK_SUBMIT_OPTS and SPARK_JAVA_OPTS - 'org.apache.spark.deploy.SparkSubmit') - OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS \ - -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH" + # Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS + + # SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY. + 'org.apache.spark.deploy.SparkSubmit') + OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS" OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM} + if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then + OUR_JAVA_OPTS="$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH" + fi + if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then + OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY" + fi ;; *) @@ -101,11 +109,12 @@ fi # Set JAVA_OPTS to be able to load native libraries and to set heap size JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS" JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM" + # Load extra JAVA_OPTS from conf/java-opts, if it exists if [ -e "$FWDIR/conf/java-opts" ] ; then JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`" fi -export JAVA_OPTS + # Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala! TOOLS_DIR="$FWDIR"/tools @@ -146,10 +155,28 @@ if $cygwin; then fi export CLASSPATH -if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then - echo -n "Spark Command: " 1>&2 - echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2 - echo -e "========================================\n" 1>&2 +# In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself. +# Here we must parse the properties file for relevant "spark.driver.*" configs before launching +# the driver JVM itself. Instead of handling this complexity in Bash, we launch a separate JVM +# to prepare the launch environment of this driver JVM. + +if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then + # This is used only if the properties file actually contains these special configs + # Export the environment variables needed by SparkSubmitDriverBootstrapper + export RUNNER + export CLASSPATH + export JAVA_OPTS + export OUR_JAVA_MEM + export SPARK_CLASS=1 + shift # Ignore main class (org.apache.spark.deploy.SparkSubmit) and use our own + exec "$RUNNER" org.apache.spark.deploy.SparkSubmitDriverBootstrapper "$@" +else + # Note: The format of this command is closely echoed in SparkSubmitDriverBootstrapper.scala + if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then + echo -n "Spark Command: " 1>&2 + echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2 + echo -e "========================================\n" 1>&2 + fi + exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" fi -exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" diff --git a/bin/spark-submit b/bin/spark-submit index 9e7cecedd0325..32c911cd0438b 100755 --- a/bin/spark-submit +++ b/bin/spark-submit @@ -17,14 +17,18 @@ # limitations under the License. # +# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala! + export SPARK_HOME="$(cd `dirname $0`/..; pwd)" ORIG_ARGS=("$@") while (($#)); do if [ "$1" = "--deploy-mode" ]; then - DEPLOY_MODE=$2 + SPARK_SUBMIT_DEPLOY_MODE=$2 + elif [ "$1" = "--properties-file" ]; then + SPARK_SUBMIT_PROPERTIES_FILE=$2 elif [ "$1" = "--driver-memory" ]; then - DRIVER_MEMORY=$2 + export SPARK_SUBMIT_DRIVER_MEMORY=$2 elif [ "$1" = "--driver-library-path" ]; then export SPARK_SUBMIT_LIBRARY_PATH=$2 elif [ "$1" = "--driver-class-path" ]; then @@ -35,10 +39,24 @@ while (($#)); do shift done -DEPLOY_MODE=${DEPLOY_MODE:-"client"} +DEFAULT_PROPERTIES_FILE="$SPARK_HOME/conf/spark-defaults.conf" +export SPARK_SUBMIT_DEPLOY_MODE=${SPARK_SUBMIT_DEPLOY_MODE:-"client"} +export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"} + +# For client mode, the driver will be launched in the same JVM that launches +# SparkSubmit, so we may need to read the properties file for any extra class +# paths, library paths, java options and memory early on. Otherwise, it will +# be too late by the time the driver JVM has started. -if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then - export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY +if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then + # Parse the properties file only if the special configs exist + contains_special_configs=$( + grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \ + grep -v "^[[:space:]]*#" + ) + if [ -n "$contains_special_configs" ]; then + export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1 + fi fi exec $SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}" diff --git a/bin/utils.sh b/bin/utils.sh old mode 100644 new mode 100755 diff --git a/conf/spark-defaults.conf.template b/conf/spark-defaults.conf.template index 2779342769c14..94427029b94d7 100644 --- a/conf/spark-defaults.conf.template +++ b/conf/spark-defaults.conf.template @@ -2,7 +2,9 @@ # This is useful for setting default environmental settings. # Example: -# spark.master spark://master:7077 -# spark.eventLog.enabled true -# spark.eventLog.dir hdfs://namenode:8021/directory -# spark.serializer org.apache.spark.serializer.KryoSerializer +# spark.master spark://master:7077 +# spark.eventLog.enabled true +# spark.eventLog.dir hdfs://namenode:8021/directory +# spark.serializer org.apache.spark.serializer.KryoSerializer +# spark.driver.memory 5g +# spark.executor.extraJavaOptions -XX:+PrintGCDetail -Dkey=value -Dnumbers="one two three" diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 52c70712eea3d..be5ebfa9219d3 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -40,28 +40,3 @@ private[spark] object PythonUtils { paths.filter(_ != "").mkString(File.pathSeparator) } } - - -/** - * A utility class to redirect the child process's stdout or stderr. - */ -private[spark] class RedirectThread( - in: InputStream, - out: OutputStream, - name: String) - extends Thread(name) { - - setDaemon(true) - override def run() { - scala.util.control.Exception.ignoring(classOf[IOException]) { - // FIXME: We copy the stream on the level of bytes to avoid encoding problems. - val buf = new Array[Byte](1024) - var len = in.read(buf) - while (len != -1) { - out.write(buf, 0, len) - out.flush() - len = in.read(buf) - } - } - } -} diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index bf716a8ab025b..4c4796f6c59ba 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -17,7 +17,6 @@ package org.apache.spark.api.python -import java.lang.Runtime import java.io.{DataOutputStream, DataInputStream, InputStream, OutputStreamWriter} import java.net.{InetAddress, ServerSocket, Socket, SocketException} @@ -25,7 +24,7 @@ import scala.collection.mutable import scala.collection.JavaConversions._ import org.apache.spark._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{RedirectThread, Utils} private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String]) extends Logging { diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index 0d6751f3fa6d2..b66c3ba4d5fb0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -22,8 +22,8 @@ import java.net.URI import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ -import org.apache.spark.api.python.{PythonUtils, RedirectThread} -import org.apache.spark.util.Utils +import org.apache.spark.api.python.PythonUtils +import org.apache.spark.util.{RedirectThread, Utils} /** * A main class used by spark-submit to launch Python applications. It executes python as a diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 318509a67a36f..f8cdbc3c392b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -195,18 +195,21 @@ object SparkSubmit { OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"), // Other options - OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER, - sysProp = "spark.driver.extraClassPath"), - OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER, - sysProp = "spark.driver.extraJavaOptions"), - OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER, - sysProp = "spark.driver.extraLibraryPath"), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.memory"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.cores.max"), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, - sysProp = "spark.files") + sysProp = "spark.files"), + + // Only process driver specific options for cluster mode here, + // because they have already been processed in bash for client mode + OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, CLUSTER, + sysProp = "spark.driver.extraClassPath"), + OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, CLUSTER, + sysProp = "spark.driver.extraJavaOptions"), + OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, CLUSTER, + sysProp = "spark.driver.extraLibraryPath") ) // In client mode, launch the application main class directly diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala new file mode 100644 index 0000000000000..af607e6a4a065 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import java.io.File + +import scala.collection.JavaConversions._ + +import org.apache.spark.util.{RedirectThread, Utils} + +/** + * Launch an application through Spark submit in client mode with the appropriate classpath, + * library paths, java options and memory. These properties of the JVM must be set before the + * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity + * of parsing the properties file for such relevant configs in Bash. + * + * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper + */ +private[spark] object SparkSubmitDriverBootstrapper { + + // Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`. + // Any changes made there must be reflected in this file. + + def main(args: Array[String]): Unit = { + + // This should be called only from `bin/spark-class` + if (!sys.env.contains("SPARK_CLASS")) { + System.err.println("SparkSubmitDriverBootstrapper must be called from `bin/spark-class`!") + System.exit(1) + } + + val submitArgs = args + val runner = sys.env("RUNNER") + val classpath = sys.env("CLASSPATH") + val javaOpts = sys.env("JAVA_OPTS") + val defaultDriverMemory = sys.env("OUR_JAVA_MEM") + + // Spark submit specific environment variables + val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE") + val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE") + val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER") + val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY") + val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH") + val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH") + val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS") + + assume(runner != null, "RUNNER must be set") + assume(classpath != null, "CLASSPATH must be set") + assume(javaOpts != null, "JAVA_OPTS must be set") + assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set") + assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!") + assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set") + assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set") + + // Parse the properties file for the equivalent spark.driver.* configs + val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap + val confDriverMemory = properties.get("spark.driver.memory") + val confLibraryPath = properties.get("spark.driver.extraLibraryPath") + val confClasspath = properties.get("spark.driver.extraClassPath") + val confJavaOpts = properties.get("spark.driver.extraJavaOptions") + + // Favor Spark submit arguments over the equivalent configs in the properties file. + // Note that we do not actually use the Spark submit values for library path, classpath, + // and Java opts here, because we have already captured them in Bash. + + val newDriverMemory = submitDriverMemory + .orElse(confDriverMemory) + .getOrElse(defaultDriverMemory) + + val newLibraryPath = + if (submitLibraryPath.isDefined) { + // SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS + "" + } else { + confLibraryPath.map("-Djava.library.path=" + _).getOrElse("") + } + + val newClasspath = + if (submitClasspath.isDefined) { + // SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH + classpath + } else { + classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("") + } + + val newJavaOpts = + if (submitJavaOpts.isDefined) { + // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS + javaOpts + } else { + javaOpts + confJavaOpts.map(" " + _).getOrElse("") + } + + val filteredJavaOpts = Utils.splitCommandString(newJavaOpts) + .filterNot(_.startsWith("-Xms")) + .filterNot(_.startsWith("-Xmx")) + + // Build up command + val command: Seq[String] = + Seq(runner) ++ + Seq("-cp", newClasspath) ++ + Seq(newLibraryPath) ++ + filteredJavaOpts ++ + Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++ + Seq("org.apache.spark.deploy.SparkSubmit") ++ + submitArgs + + // Print the launch command. This follows closely the format used in `bin/spark-class`. + if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) { + System.err.print("Spark Command: ") + System.err.println(command.mkString(" ")) + System.err.println("========================================\n") + } + + // Start the driver JVM + val filteredCommand = command.filter(_.nonEmpty) + val builder = new ProcessBuilder(filteredCommand) + val process = builder.start() + + // Redirect stdin, stdout, and stderr to/from the child JVM + val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin") + val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout") + val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr") + stdinThread.start() + stdoutThread.start() + stderrThread.start() + + // Terminate on broken pipe, which signals that the parent process has exited. This is + // important for the PySpark shell, where Spark submit itself is a python subprocess. + stdinThread.join() + process.destroy() + } + +} diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d6d74ce269219..69a84a3604a52 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1480,3 +1480,24 @@ private[spark] object Utils extends Logging { } } + +/** + * A utility class to redirect the child process's stdout or stderr. + */ +private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String) + extends Thread(name) { + + setDaemon(true) + override def run() { + scala.util.control.Exception.ignoring(classOf[IOException]) { + // FIXME: We copy the stream on the level of bytes to avoid encoding problems. + val buf = new Array[Byte](1024) + var len = in.read(buf) + while (len != -1) { + out.write(buf, 0, len) + out.flush() + len = in.read(buf) + } + } + } +} From fb60bec34e0b20ae95b4b865a79744916e0a5737 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 20 Aug 2014 15:37:27 -0700 Subject: [PATCH 07/11] [SPARK-2298] Encode stage attempt in SparkListener & UI. Simple way to reproduce this in the UI: ```scala val f = new java.io.File("/tmp/test") f.delete() sc.parallelize(1 to 2, 2).map(x => (x,x )).repartition(3).mapPartitionsWithContext { case (context, iter) => if (context.partitionId == 0) { val f = new java.io.File("/tmp/test") if (!f.exists) { f.mkdir() System.exit(0); } } iter }.count() ``` Author: Reynold Xin Closes #1545 from rxin/stage-attempt and squashes the following commits: 3ee1d2a [Reynold Xin] - Rename attempt to retry in UI. - Properly report stage failure in FetchFailed. 40a6bd5 [Reynold Xin] Updated test suites. c414c36 [Reynold Xin] Fixed the hanging in JobCancellationSuite. b3e2eed [Reynold Xin] Oops previous code didn't compile. 0f36075 [Reynold Xin] Mark unknown stage attempt with id -1 and drop that in JobProgressListener. 6c08b07 [Reynold Xin] Addressed code review feedback. 4e5faa2 [Reynold Xin] [SPARK-2298] Encode stage attempt in SparkListener & UI. --- .../apache/spark/scheduler/DAGScheduler.scala | 77 +-- .../spark/scheduler/SparkListener.scala | 11 +- .../org/apache/spark/scheduler/Stage.scala | 8 +- .../apache/spark/scheduler/StageInfo.scala | 11 +- .../spark/scheduler/TaskSchedulerImpl.scala | 8 +- .../org/apache/spark/scheduler/TaskSet.scala | 4 - .../apache/spark/ui/jobs/ExecutorTable.scala | 6 +- .../spark/ui/jobs/JobProgressListener.scala | 40 +- .../org/apache/spark/ui/jobs/StagePage.scala | 11 +- .../org/apache/spark/ui/jobs/StageTable.scala | 14 +- .../org/apache/spark/util/JsonProtocol.scala | 12 +- .../storage/StorageStatusListenerSuite.scala | 17 +- .../ui/jobs/JobProgressListenerSuite.scala | 68 +-- .../spark/ui/storage/StorageTabSuite.scala | 16 +- .../apache/spark/util/JsonProtocolSuite.scala | 476 ++++++++++++++---- 15 files changed, 555 insertions(+), 224 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b86cfbfa48fbe..34131984570e4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -164,7 +164,7 @@ class DAGScheduler( */ def executorHeartbeatReceived( execId: String, - taskMetrics: Array[(Long, Int, TaskMetrics)], // (taskId, stageId, metrics) + taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics) blockManagerId: BlockManagerId): Boolean = { listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics)) implicit val timeout = Timeout(600 seconds) @@ -677,7 +677,10 @@ class DAGScheduler( } private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { - listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo)) + // Note that there is a chance that this task is launched after the stage is cancelled. + // In that case, we wouldn't have the stage anymore in stageIdToStage. + val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1) + listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo)) submitWaitingStages() } @@ -695,8 +698,8 @@ class DAGScheduler( // is in the process of getting stopped. val stageFailedMessage = "Stage cancelled because SparkContext was shut down" runningStages.foreach { stage => - stage.info.stageFailed(stageFailedMessage) - listenerBus.post(SparkListenerStageCompleted(stage.info)) + stage.latestInfo.stageFailed(stageFailedMessage) + listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) } listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error))) } @@ -781,7 +784,16 @@ class DAGScheduler( logDebug("submitMissingTasks(" + stage + ")") // Get our pending tasks and remember them in our pendingTasks entry stage.pendingTasks.clear() - var tasks = ArrayBuffer[Task[_]]() + + // First figure out the indexes of partition ids to compute. + val partitionsToCompute: Seq[Int] = { + if (stage.isShuffleMap) { + (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil) + } else { + val job = stage.resultOfJob.get + (0 until job.numPartitions).filter(id => !job.finished(id)) + } + } val properties = if (jobIdToActiveJob.contains(jobId)) { jobIdToActiveJob(stage.jobId).properties @@ -795,7 +807,8 @@ class DAGScheduler( // serializable. If tasks are not serializable, a SparkListenerStageCompleted event // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. - listenerBus.post(SparkListenerStageSubmitted(stage.info, properties)) + stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) + listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast @@ -826,20 +839,19 @@ class DAGScheduler( return } - if (stage.isShuffleMap) { - for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) { - val locs = getPreferredLocs(stage.rdd, p) - val part = stage.rdd.partitions(p) - tasks += new ShuffleMapTask(stage.id, taskBinary, part, locs) + val tasks: Seq[Task[_]] = if (stage.isShuffleMap) { + partitionsToCompute.map { id => + val locs = getPreferredLocs(stage.rdd, id) + val part = stage.rdd.partitions(id) + new ShuffleMapTask(stage.id, taskBinary, part, locs) } } else { - // This is a final stage; figure out its job's missing partitions val job = stage.resultOfJob.get - for (id <- 0 until job.numPartitions if !job.finished(id)) { + partitionsToCompute.map { id => val p: Int = job.partitions(id) val part = stage.rdd.partitions(p) val locs = getPreferredLocs(stage.rdd, p) - tasks += new ResultTask(stage.id, taskBinary, part, locs, id) + new ResultTask(stage.id, taskBinary, part, locs, id) } } @@ -869,11 +881,11 @@ class DAGScheduler( logDebug("New pending tasks: " + stage.pendingTasks) taskScheduler.submitTasks( new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) - stage.info.submissionTime = Some(clock.getTime()) + stage.latestInfo.submissionTime = Some(clock.getTime()) } else { // Because we posted SparkListenerStageSubmitted earlier, we should post // SparkListenerStageCompleted here in case there are no tasks to run. - listenerBus.post(SparkListenerStageCompleted(stage.info)) + listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) logDebug("Stage " + stage + " is actually done; %b %d %d".format( stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions)) runningStages -= stage @@ -892,8 +904,9 @@ class DAGScheduler( // The success case is dealt with separately below, since we need to compute accumulator // updates before posting. if (event.reason != Success) { - listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo, - event.taskMetrics)) + val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1) + listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason, + event.taskInfo, event.taskMetrics)) } if (!stageIdToStage.contains(task.stageId)) { @@ -902,14 +915,19 @@ class DAGScheduler( } val stage = stageIdToStage(task.stageId) - def markStageAsFinished(stage: Stage) = { - val serviceTime = stage.info.submissionTime match { + def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = { + val serviceTime = stage.latestInfo.submissionTime match { case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0) case _ => "Unknown" } - logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) - stage.info.completionTime = Some(clock.getTime()) - listenerBus.post(SparkListenerStageCompleted(stage.info)) + if (errorMessage.isEmpty) { + logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) + stage.latestInfo.completionTime = Some(clock.getTime()) + } else { + stage.latestInfo.stageFailed(errorMessage.get) + logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime)) + } + listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) runningStages -= stage } event.reason match { @@ -924,7 +942,7 @@ class DAGScheduler( val name = acc.name.get val stringPartialValue = Accumulators.stringifyPartialValue(partialValue) val stringValue = Accumulators.stringifyValue(acc.value) - stage.info.accumulables(id) = AccumulableInfo(id, name, stringValue) + stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue) event.taskInfo.accumulables += AccumulableInfo(id, name, Some(stringPartialValue), stringValue) } @@ -935,8 +953,8 @@ class DAGScheduler( logError(s"Failed to update accumulators for $task", e) } } - listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo, - event.taskMetrics)) + listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType, + event.reason, event.taskInfo, event.taskMetrics)) stage.pendingTasks -= task task match { case rt: ResultTask[_, _] => @@ -1029,6 +1047,7 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => // Mark the stage that the reducer was in as unrunnable val failedStage = stageIdToStage(task.stageId) + markStageAsFinished(failedStage, Some("Fetch failure")) runningStages -= failedStage // TODO: Cancel running tasks in the stage logInfo("Marking " + failedStage + " (" + failedStage.name + @@ -1142,7 +1161,7 @@ class DAGScheduler( } val dependentJobs: Seq[ActiveJob] = activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq - failedStage.info.completionTime = Some(clock.getTime()) + failedStage.latestInfo.completionTime = Some(clock.getTime()) for (job <- dependentJobs) { failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason") } @@ -1182,8 +1201,8 @@ class DAGScheduler( if (runningStages.contains(stage)) { try { // cancelTasks will fail if a SchedulerBackend does not implement killTask taskScheduler.cancelTasks(stageId, shouldInterruptThread) - stage.info.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stage.info)) + stage.latestInfo.stageFailed(failureReason) + listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) } catch { case e: UnsupportedOperationException => logInfo(s"Could not cancel tasks for stage $stageId", e) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index d01d318633877..86ca8445a1124 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -39,7 +39,8 @@ case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Propert case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent @DeveloperApi -case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends SparkListenerEvent +case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo) + extends SparkListenerEvent @DeveloperApi case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent @@ -47,6 +48,7 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe @DeveloperApi case class SparkListenerTaskEnd( stageId: Int, + stageAttemptId: Int, taskType: String, reason: TaskEndReason, taskInfo: TaskInfo, @@ -75,10 +77,15 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId) @DeveloperApi case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent +/** + * Periodic updates from executors. + * @param execId executor id + * @param taskMetrics sequence of (task id, stage id, stage attempt, metrics) + */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, - taskMetrics: Seq[(Long, Int, TaskMetrics)]) + taskMetrics: Seq[(Long, Int, Int, TaskMetrics)]) extends SparkListenerEvent @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 800905413d145..071568cdfb429 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -43,6 +43,9 @@ import org.apache.spark.util.CallSite * stage, the callSite gives the user code that created the RDD being shuffled. For a result * stage, the callSite gives the user code that executes the associated action (e.g. count()). * + * A single stage can consist of multiple attempts. In that case, the latestInfo field will + * be updated for each attempt. + * */ private[spark] class Stage( val id: Int, @@ -71,8 +74,8 @@ private[spark] class Stage( val name = callSite.shortForm val details = callSite.longForm - /** Pointer to the [StageInfo] object, set by DAGScheduler. */ - var info: StageInfo = StageInfo.fromStage(this) + /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */ + var latestInfo: StageInfo = StageInfo.fromStage(this) def isAvailable: Boolean = { if (!isShuffleMap) { @@ -116,6 +119,7 @@ private[spark] class Stage( } } + /** Return a new attempt id, starting with 0. */ def newAttemptId(): Int = { val id = nextAttemptId nextAttemptId += 1 diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 2a407e47a05bd..c6dc3369ba5cc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -29,6 +29,7 @@ import org.apache.spark.storage.RDDInfo @DeveloperApi class StageInfo( val stageId: Int, + val attemptId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo], @@ -56,9 +57,15 @@ private[spark] object StageInfo { * shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a * sequence of narrow dependencies should also be associated with this Stage. */ - def fromStage(stage: Stage): StageInfo = { + def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = { val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd) val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos - new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details) + new StageInfo( + stage.id, + stage.attemptId, + stage.name, + numTasks.getOrElse(stage.numTasks), + rddInfos, + stage.details) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6c0d1b2752a81..ad051e59af86d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -333,12 +333,12 @@ private[spark] class TaskSchedulerImpl( execId: String, taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics blockManagerId: BlockManagerId): Boolean = { - val metricsWithStageIds = taskMetrics.flatMap { - case (id, metrics) => { + + val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized { + taskMetrics.flatMap { case (id, metrics) => taskIdToTaskSetId.get(id) .flatMap(activeTaskSets.get) - .map(_.stageId) - .map(x => (id, x, metrics)) + .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics)) } } dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala index 613fa7850bb25..c3ad325156f53 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala @@ -31,9 +31,5 @@ private[spark] class TaskSet( val properties: Properties) { val id: String = stageId + "." + attempt - def kill(interruptThread: Boolean) { - tasks.foreach(_.kill(interruptThread)) - } - override def toString: String = "TaskSet " + id } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 0cc51c873727d..2987dc04494a5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -24,8 +24,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils} import org.apache.spark.ui.jobs.UIData.StageUIData import org.apache.spark.util.Utils -/** Page showing executor summary */ -private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { +/** Stage summary grouped by executors. */ +private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobProgressTab) { private val listener = parent.listener def toNodeSeq: Seq[Node] = { @@ -65,7 +65,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { executorIdToAddress.put(executorId, address) } - listener.stageIdToData.get(stageId) match { + listener.stageIdToData.get((stageId, stageAttemptId)) match { case Some(stageData: StageUIData) => stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) => diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 74cd637d88155..f7f918fd521a9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -43,12 +43,16 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { // How many stages to remember val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES) - val activeStages = HashMap[Int, StageInfo]() + // Map from stageId to StageInfo + val activeStages = new HashMap[Int, StageInfo] + + // Map from (stageId, attemptId) to StageUIData + val stageIdToData = new HashMap[(Int, Int), StageUIData] + val completedStages = ListBuffer[StageInfo]() val failedStages = ListBuffer[StageInfo]() - val stageIdToData = new HashMap[Int, StageUIData] - + // Map from pool name to a hash map (map from stage id to StageInfo). val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]() val executorIdToBlockManagerId = HashMap[String, BlockManagerId]() @@ -59,9 +63,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { val stage = stageCompleted.stageInfo - val stageId = stage.stageId - val stageData = stageIdToData.getOrElseUpdate(stageId, { - logWarning("Stage completed for unknown stage " + stageId) + val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), { + logWarning("Stage completed for unknown stage " + stage.stageId) new StageUIData }) @@ -69,8 +72,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.accumulables(id) = info } - poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId)) - activeStages.remove(stageId) + poolToActiveStages.get(stageData.schedulingPool).foreach { hashMap => + hashMap.remove(stage.stageId) + } + activeStages.remove(stage.stageId) if (stage.failureReason.isEmpty) { completedStages += stage trimIfNecessary(completedStages) @@ -84,7 +89,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { if (stages.size > retainedStages) { val toRemove = math.max(retainedStages / 10, 1) - stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) } + stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) } stages.trimStart(toRemove) } } @@ -98,21 +103,21 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME) - val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData) + val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData) stageData.schedulingPool = poolName stageData.description = Option(stageSubmitted.properties).flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) } - val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]()) + val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]) stages(stage.stageId) = stage } override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { val taskInfo = taskStart.taskInfo if (taskInfo != null) { - val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, { + val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), { logWarning("Task start for unknown stage " + taskStart.stageId) new StageUIData }) @@ -128,8 +133,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val info = taskEnd.taskInfo - if (info != null) { - val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, { + // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task + // compeletion event is for. Let's just drop it here. This means we might have some speculation + // tasks on the web ui that's never marked as complete. + if (info != null && taskEnd.stageAttemptId != -1) { + val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), { logWarning("Task end for unknown stage " + taskEnd.stageId) new StageUIData }) @@ -222,8 +230,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { - for ((taskId, sid, taskMetrics) <- executorMetricsUpdate.taskMetrics) { - val stageData = stageIdToData.getOrElseUpdate(sid, { + for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) { + val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), { logWarning("Metrics update for task in unknown stage " + sid) new StageUIData }) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index d4eb02722ad12..db01be596e073 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -34,7 +34,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val stageId = request.getParameter("id").toInt - val stageDataOption = listener.stageIdToData.get(stageId) + val stageAttemptId = request.getParameter("attempt").toInt + val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId)) if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) { val content = @@ -42,14 +43,15 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {

Summary Metrics

No tasks have started yet

Tasks

No tasks have started yet - return UIUtils.headerSparkPage("Details for Stage %s".format(stageId), content, parent) + return UIUtils.headerSparkPage( + s"Details for Stage $stageId (Attempt $stageAttemptId)", content, parent) } val stageData = stageDataOption.get val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime) val numCompleted = tasks.count(_.taskInfo.finished) - val accumulables = listener.stageIdToData(stageId).accumulables + val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables val hasInput = stageData.inputBytes > 0 val hasShuffleRead = stageData.shuffleReadBytes > 0 val hasShuffleWrite = stageData.shuffleWriteBytes > 0 @@ -211,7 +213,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { def quantileRow(data: Seq[Node]): Seq[Node] = {data} Some(UIUtils.listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true)) } - val executorTable = new ExecutorTable(stageId, parent) + + val executorTable = new ExecutorTable(stageId, stageAttemptId, parent) val maybeAccumulableTable: Seq[Node] = if (accumulables.size > 0) {

Accumulators

++ accumulableTable } else Seq() diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 16ad0df45aa0d..2e67310594784 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -97,8 +97,8 @@ private[ui] class StageTableBase( } // scalastyle:on - val nameLinkUri ="%s/stages/stage?id=%s" - .format(UIUtils.prependBaseUri(parent.basePath), s.stageId) + val nameLinkUri ="%s/stages/stage?id=%s&attempt=%s" + .format(UIUtils.prependBaseUri(parent.basePath), s.stageId, s.attemptId) val nameLink = {s.name} val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0) @@ -121,7 +121,7 @@ private[ui] class StageTableBase( } val stageDesc = for { - stageData <- listener.stageIdToData.get(s.stageId) + stageData <- listener.stageIdToData.get((s.stageId, s.attemptId)) desc <- stageData.description } yield {
{desc}
@@ -131,7 +131,7 @@ private[ui] class StageTableBase( } protected def stageRow(s: StageInfo): Seq[Node] = { - val stageDataOption = listener.stageIdToData.get(s.stageId) + val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId)) if (stageDataOption.isEmpty) { return {s.stageId}No data available for this stage } @@ -154,7 +154,11 @@ private[ui] class StageTableBase( val shuffleWrite = stageData.shuffleWriteBytes val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else "" - {s.stageId} ++ + {if (s.attemptId > 0) { + {s.stageId} (retry {s.attemptId}) + } else { + {s.stageId} + }} ++ {if (isFairScheduler) { Utils.getFormattedClassName(taskStart)) ~ ("Stage ID" -> taskStart.stageId) ~ + ("Stage Attempt ID" -> taskStart.stageAttemptId) ~ ("Task Info" -> taskInfoToJson(taskInfo)) } @@ -112,6 +113,7 @@ private[spark] object JsonProtocol { val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing ("Event" -> Utils.getFormattedClassName(taskEnd)) ~ ("Stage ID" -> taskEnd.stageId) ~ + ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~ ("Task Type" -> taskEnd.taskType) ~ ("Task End Reason" -> taskEndReason) ~ ("Task Info" -> taskInfoToJson(taskInfo)) ~ @@ -187,6 +189,7 @@ private[spark] object JsonProtocol { val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing) val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing) ("Stage ID" -> stageInfo.stageId) ~ + ("Stage Attempt ID" -> stageInfo.attemptId) ~ ("Stage Name" -> stageInfo.name) ~ ("Number of Tasks" -> stageInfo.numTasks) ~ ("RDD Info" -> rddInfo) ~ @@ -419,8 +422,9 @@ private[spark] object JsonProtocol { def taskStartFromJson(json: JValue): SparkListenerTaskStart = { val stageId = (json \ "Stage ID").extract[Int] + val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) val taskInfo = taskInfoFromJson(json \ "Task Info") - SparkListenerTaskStart(stageId, taskInfo) + SparkListenerTaskStart(stageId, stageAttemptId, taskInfo) } def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult = { @@ -430,11 +434,12 @@ private[spark] object JsonProtocol { def taskEndFromJson(json: JValue): SparkListenerTaskEnd = { val stageId = (json \ "Stage ID").extract[Int] + val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) val taskType = (json \ "Task Type").extract[String] val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason") val taskInfo = taskInfoFromJson(json \ "Task Info") val taskMetrics = taskMetricsFromJson(json \ "Task Metrics") - SparkListenerTaskEnd(stageId, taskType, taskEndReason, taskInfo, taskMetrics) + SparkListenerTaskEnd(stageId, stageAttemptId, taskType, taskEndReason, taskInfo, taskMetrics) } def jobStartFromJson(json: JValue): SparkListenerJobStart = { @@ -492,6 +497,7 @@ private[spark] object JsonProtocol { def stageInfoFromJson(json: JValue): StageInfo = { val stageId = (json \ "Stage ID").extract[Int] + val attemptId = (json \ "Attempt ID").extractOpt[Int].getOrElse(0) val stageName = (json \ "Stage Name").extract[String] val numTasks = (json \ "Number of Tasks").extract[Int] val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_)) @@ -504,7 +510,7 @@ private[spark] object JsonProtocol { case None => Seq[AccumulableInfo]() } - val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details) + val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details) stageInfo.submissionTime = submissionTime stageInfo.completionTime = completionTime stageInfo.failureReason = failureReason diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala index 51fb646a3cb61..7671cb969a26b 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -69,10 +69,10 @@ class StorageStatusListenerSuite extends FunSuite { // Task end with no updated blocks assert(listener.executorIdToStorageStatus("big").numBlocks === 0) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics)) assert(listener.executorIdToStorageStatus("big").numBlocks === 0) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics)) assert(listener.executorIdToStorageStatus("big").numBlocks === 0) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) } @@ -92,13 +92,13 @@ class StorageStatusListenerSuite extends FunSuite { // Task end with new blocks assert(listener.executorIdToStorageStatus("big").numBlocks === 0) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1)) assert(listener.executorIdToStorageStatus("big").numBlocks === 2) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2))) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2)) assert(listener.executorIdToStorageStatus("big").numBlocks === 2) assert(listener.executorIdToStorageStatus("fat").numBlocks === 1) assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) @@ -111,13 +111,14 @@ class StorageStatusListenerSuite extends FunSuite { val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)) taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3)) taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3)) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) + + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1)) assert(listener.executorIdToStorageStatus("big").numBlocks === 1) assert(listener.executorIdToStorageStatus("fat").numBlocks === 1) assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2))) assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0))) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2)) assert(listener.executorIdToStorageStatus("big").numBlocks === 1) assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) @@ -135,8 +136,8 @@ class StorageStatusListenerSuite extends FunSuite { val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L)) taskMetrics1.updatedBlocks = Some(Seq(block1, block2)) taskMetrics2.updatedBlocks = Some(Seq(block3)) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) - listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1)) + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics2)) assert(listener.executorIdToStorageStatus("big").numBlocks === 3) // Unpersist RDD diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 147ec0bc52e39..3370dd4156c3f 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -34,12 +34,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc val listener = new JobProgressListener(conf) def createStageStartEvent(stageId: Int) = { - val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "") + val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "") SparkListenerStageSubmitted(stageInfo) } def createStageEndEvent(stageId: Int) = { - val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "") + val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "") SparkListenerStageCompleted(stageInfo) } @@ -70,33 +70,37 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc taskInfo.finishTime = 1 var task = new ShuffleMapTask(0) val taskType = Utils.getFormattedClassName(task) - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) - assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail()) - .shuffleRead === 1000) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics)) + assert(listener.stageIdToData.getOrElse((0, 0), fail()) + .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 1000) // finish a task with unknown executor-id, nothing should happen taskInfo = new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true) taskInfo.finishTime = 1 task = new ShuffleMapTask(0) - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics)) assert(listener.stageIdToData.size === 1) // finish this task, should get updated duration taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 task = new ShuffleMapTask(0) - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) - assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail()) - .shuffleRead === 2000) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics)) + assert(listener.stageIdToData.getOrElse((0, 0), fail()) + .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 2000) // finish this task, should get updated duration taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 task = new ShuffleMapTask(0) - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics)) - assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail()) - .shuffleRead === 1000) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics)) + assert(listener.stageIdToData.getOrElse((0, 0), fail()) + .executorSummary.getOrElse("exe-2", fail()).shuffleRead === 1000) } test("test task success vs failure counting for different task end reasons") { @@ -119,16 +123,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc UnknownReason) var failCount = 0 for (reason <- taskFailedReasons) { - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics)) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 0, taskType, reason, taskInfo, metrics)) failCount += 1 - assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0) - assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount) + assert(listener.stageIdToData((task.stageId, 0)).numCompleteTasks === 0) + assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount) } // Make sure we count success as success. - listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics)) - assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1) - assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount) + listener.onTaskEnd( + SparkListenerTaskEnd(task.stageId, 1, taskType, Success, taskInfo, metrics)) + assert(listener.stageIdToData((task.stageId, 1)).numCompleteTasks === 1) + assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount) } test("test update metrics") { @@ -163,18 +169,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc taskInfo } - listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1234L))) - listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1235L))) - listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1236L))) - listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1237L))) + listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1234L))) + listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1235L))) + listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L))) + listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L))) listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array( - (1234L, 0, makeTaskMetrics(0)), - (1235L, 0, makeTaskMetrics(100)), - (1236L, 1, makeTaskMetrics(200))))) + (1234L, 0, 0, makeTaskMetrics(0)), + (1235L, 0, 0, makeTaskMetrics(100)), + (1236L, 1, 0, makeTaskMetrics(200))))) - var stage0Data = listener.stageIdToData.get(0).get - var stage1Data = listener.stageIdToData.get(1).get + var stage0Data = listener.stageIdToData.get((0, 0)).get + var stage1Data = listener.stageIdToData.get((1, 0)).get assert(stage0Data.shuffleReadBytes == 102) assert(stage1Data.shuffleReadBytes == 201) assert(stage0Data.shuffleWriteBytes == 106) @@ -195,14 +201,14 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc .totalBlocksFetched == 202) // task that was included in a heartbeat - listener.onTaskEnd(SparkListenerTaskEnd(0, taskType, Success, makeTaskInfo(1234L, 1), + listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1), makeTaskMetrics(300))) // task that wasn't included in a heartbeat - listener.onTaskEnd(SparkListenerTaskEnd(1, taskType, Success, makeTaskInfo(1237L, 1), + listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, makeTaskInfo(1237L, 1), makeTaskMetrics(400))) - stage0Data = listener.stageIdToData.get(0).get - stage1Data = listener.stageIdToData.get(1).get + stage0Data = listener.stageIdToData.get((0, 0)).get + stage1Data = listener.stageIdToData.get((1, 0)).get assert(stage0Data.shuffleReadBytes == 402) assert(stage1Data.shuffleReadBytes == 602) assert(stage0Data.shuffleWriteBytes == 406) diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 6e68dcb3425aa..b860177705d84 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -53,7 +53,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { assert(storageListener.rddInfoList.isEmpty) // 2 RDDs are known, but none are cached - val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0, rddInfo1), "details") + val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), "details") bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 2) assert(storageListener.rddInfoList.isEmpty) @@ -63,7 +63,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val rddInfo3Cached = rddInfo3 rddInfo2Cached.numCachedPartitions = 1 rddInfo3Cached.numCachedPartitions = 1 - val stageInfo1 = new StageInfo(1, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details") + val stageInfo1 = new StageInfo(1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details") bus.postToAll(SparkListenerStageSubmitted(stageInfo1)) assert(storageListener._rddInfoMap.size === 4) assert(storageListener.rddInfoList.size === 2) @@ -71,7 +71,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { // Submitting RDDInfos with duplicate IDs does nothing val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY) rddInfo0Cached.numCachedPartitions = 1 - val stageInfo0Cached = new StageInfo(0, "0", 100, Seq(rddInfo0), "details") + val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), "details") bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached)) assert(storageListener._rddInfoMap.size === 4) assert(storageListener.rddInfoList.size === 2) @@ -87,7 +87,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val rddInfo1Cached = rddInfo1 rddInfo0Cached.numCachedPartitions = 1 rddInfo1Cached.numCachedPartitions = 1 - val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details") + val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details") bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 2) assert(storageListener.rddInfoList.size === 2) @@ -106,7 +106,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val myRddInfo0 = rddInfo0 val myRddInfo1 = rddInfo1 val myRddInfo2 = rddInfo2 - val stageInfo0 = new StageInfo(0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details") + val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details") bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L)) bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 3) @@ -116,7 +116,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { assert(!storageListener._rddInfoMap(2).isCached) // Task end with no updated blocks. This should not change anything. - bus.postToAll(SparkListenerTaskEnd(0, "obliteration", Success, taskInfo, new TaskMetrics)) + bus.postToAll(SparkListenerTaskEnd(0, 0, "obliteration", Success, taskInfo, new TaskMetrics)) assert(storageListener._rddInfoMap.size === 3) assert(storageListener.rddInfoList.size === 0) @@ -128,7 +128,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { (RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)), (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L)) )) - bus.postToAll(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo, metrics1)) + bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1)) assert(storageListener._rddInfoMap(0).memSize === 800L) assert(storageListener._rddInfoMap(0).diskSize === 400L) assert(storageListener._rddInfoMap(0).tachyonSize === 200L) @@ -150,7 +150,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually exist (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually exist )) - bus.postToAll(SparkListenerTaskEnd(2, "obliteration", Success, taskInfo, metrics2)) + bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2)) assert(storageListener._rddInfoMap(0).memSize === 400L) assert(storageListener._rddInfoMap(0).diskSize === 400L) assert(storageListener._rddInfoMap(0).tachyonSize === 200L) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 97ffb07662482..2fd3b9cfd221a 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -35,13 +35,13 @@ class JsonProtocolSuite extends FunSuite { val stageSubmitted = SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties) val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L)) - val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false)) + val taskStart = SparkListenerTaskStart(111, 0, makeTaskInfo(222L, 333, 1, 444L, false)) val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true)) - val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, + val taskEnd = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success, makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false)) - val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, + val taskEndWithHadoopInput = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success, makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true)) val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties) @@ -397,7 +397,8 @@ class JsonProtocolSuite extends FunSuite { private def assertJsonStringEquals(json1: String, json2: String) { val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "") - assert(formatJsonString(json1) === formatJsonString(json2)) + assert(formatJsonString(json1) === formatJsonString(json2), + s"input ${formatJsonString(json1)} got ${formatJsonString(json2)}") } private def assertSeqEquals[T](seq1: Seq[T], seq2: Seq[T], assertEquals: (T, T) => Unit) { @@ -485,7 +486,7 @@ class JsonProtocolSuite extends FunSuite { private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = { val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) } - val stageInfo = new StageInfo(a, "greetings", b, rddInfos, "details") + val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, "details") val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2)) stageInfo.accumulables(acc1.id) = acc1 stageInfo.accumulables(acc2.id) = acc2 @@ -558,84 +559,246 @@ class JsonProtocolSuite extends FunSuite { private val stageSubmittedJsonString = """ - {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name": - "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details", - "Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - {"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]},"Properties": - {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} + |{ + | "Event": "SparkListenerStageSubmitted", + | "Stage Info": { + | "Stage ID": 100, + | "Stage Attempt ID": 0, + | "Stage Name": "greetings", + | "Number of Tasks": 200, + | "RDD Info": [], + | "Details": "details", + | "Accumulables": [ + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | } + | ] + | }, + | "Properties": { + | "France": "Paris", + | "Germany": "Berlin", + | "Russia": "Moscow", + | "Ukraine": "Kiev" + | } + |} """ private val stageCompletedJsonString = """ - {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Stage Name": - "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage - Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true, - "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301, - "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details", - "Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - {"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]}} + |{ + | "Event": "SparkListenerStageCompleted", + | "Stage Info": { + | "Stage ID": 101, + | "Stage Attempt ID": 0, + | "Stage Name": "greetings", + | "Number of Tasks": 201, + | "RDD Info": [ + | { + | "RDD ID": 101, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 201, + | "Number of Cached Partitions": 301, + | "Memory Size": 401, + | "Tachyon Size": 0, + | "Disk Size": 501 + | } + | ], + | "Details": "details", + | "Accumulables": [ + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | } + | ] + | } + |} """ private val taskStartJsonString = """ - |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222, - |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir", - |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0, - |"Failed":false,"Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1", - |"Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - |{"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]}} + |{ + | "Event": "SparkListenerTaskStart", + | "Stage ID": 111, + | "Stage Attempt ID": 0, + | "Task Info": { + | "Task ID": 222, + | "Index": 333, + | "Attempt": 1, + | "Launch Time": 444, + | "Executor ID": "executor", + | "Host": "your kind sir", + | "Locality": "NODE_LOCAL", + | "Speculative": false, + | "Getting Result Time": 0, + | "Finish Time": 0, + | "Failed": false, + | "Accumulables": [ + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | }, + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 3, + | "Name": "Accumulable3", + | "Update": "delta3", + | "Value": "val3" + | } + | ] + | } + |} """.stripMargin private val taskGettingResultJsonString = """ - |{"Event":"SparkListenerTaskGettingResult","Task Info": - | {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor", - | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0, - | "Finish Time":0,"Failed":false, - | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1", - | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}] + |{ + | "Event": "SparkListenerTaskGettingResult", + | "Task Info": { + | "Task ID": 1000, + | "Index": 2000, + | "Attempt": 5, + | "Launch Time": 3000, + | "Executor ID": "executor", + | "Host": "your kind sir", + | "Locality": "NODE_LOCAL", + | "Speculative": true, + | "Getting Result Time": 0, + | "Finish Time": 0, + | "Failed": false, + | "Accumulables": [ + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | }, + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 3, + | "Name": "Accumulable3", + | "Update": "delta3", + | "Value": "val3" + | } + | ] | } |} """.stripMargin private val taskEndJsonString = """ - |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", - |"Task End Reason":{"Reason":"Success"}, - |"Task Info":{ - | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", - | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, - | "Getting Result Time":0,"Finish Time":0,"Failed":false, - | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1", - | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}] - |}, - |"Task Metrics":{ - | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, - | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700, - | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, - | "Shuffle Read Metrics":{ - | "Shuffle Finish Time":900, - | "Remote Blocks Fetched":800, - | "Local Blocks Fetched":700, - | "Fetch Wait Time":900, - | "Remote Bytes Read":1000 + |{ + | "Event": "SparkListenerTaskEnd", + | "Stage ID": 1, + | "Stage Attempt ID": 0, + | "Task Type": "ShuffleMapTask", + | "Task End Reason": { + | "Reason": "Success" | }, - | "Shuffle Write Metrics":{ - | "Shuffle Bytes Written":1200, - | "Shuffle Write Time":1500 + | "Task Info": { + | "Task ID": 123, + | "Index": 234, + | "Attempt": 67, + | "Launch Time": 345, + | "Executor ID": "executor", + | "Host": "your kind sir", + | "Locality": "NODE_LOCAL", + | "Speculative": false, + | "Getting Result Time": 0, + | "Finish Time": 0, + | "Failed": false, + | "Accumulables": [ + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | }, + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 3, + | "Name": "Accumulable3", + | "Update": "delta3", + | "Value": "val3" + | } + | ] | }, - | "Updated Blocks":[ - | {"Block ID":"rdd_0_0", - | "Status":{ - | "Storage Level":{ - | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, - | "Replication":2 - | }, - | "Memory Size":0,"Tachyon Size":0,"Disk Size":0 + | "Task Metrics": { + | "Host Name": "localhost", + | "Executor Deserialize Time": 300, + | "Executor Run Time": 400, + | "Result Size": 500, + | "JVM GC Time": 600, + | "Result Serialization Time": 700, + | "Memory Bytes Spilled": 800, + | "Disk Bytes Spilled": 0, + | "Shuffle Read Metrics": { + | "Shuffle Finish Time": 900, + | "Remote Blocks Fetched": 800, + | "Local Blocks Fetched": 700, + | "Fetch Wait Time": 900, + | "Remote Bytes Read": 1000 + | }, + | "Shuffle Write Metrics": { + | "Shuffle Bytes Written": 1200, + | "Shuffle Write Time": 1500 + | }, + | "Updated Blocks": [ + | { + | "Block ID": "rdd_0_0", + | "Status": { + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": false, + | "Replication": 2 + | }, + | "Memory Size": 0, + | "Tachyon Size": 0, + | "Disk Size": 0 + | } | } - | } | ] | } |} @@ -643,80 +806,187 @@ class JsonProtocolSuite extends FunSuite { private val taskEndWithHadoopInputJsonString = """ - |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", - |"Task End Reason":{"Reason":"Success"}, - |"Task Info":{ - | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", - | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, - | "Getting Result Time":0,"Finish Time":0,"Failed":false, - | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1", - | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"}, - | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}] - |}, - |"Task Metrics":{ - | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, - | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700, - | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, - | "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500}, - | "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100}, - | "Updated Blocks":[ - | {"Block ID":"rdd_0_0", - | "Status":{ - | "Storage Level":{ - | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, - | "Replication":2 - | }, - | "Memory Size":0,"Tachyon Size":0,"Disk Size":0 + |{ + | "Event": "SparkListenerTaskEnd", + | "Stage ID": 1, + | "Stage Attempt ID": 0, + | "Task Type": "ShuffleMapTask", + | "Task End Reason": { + | "Reason": "Success" + | }, + | "Task Info": { + | "Task ID": 123, + | "Index": 234, + | "Attempt": 67, + | "Launch Time": 345, + | "Executor ID": "executor", + | "Host": "your kind sir", + | "Locality": "NODE_LOCAL", + | "Speculative": false, + | "Getting Result Time": 0, + | "Finish Time": 0, + | "Failed": false, + | "Accumulables": [ + | { + | "ID": 1, + | "Name": "Accumulable1", + | "Update": "delta1", + | "Value": "val1" + | }, + | { + | "ID": 2, + | "Name": "Accumulable2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 3, + | "Name": "Accumulable3", + | "Update": "delta3", + | "Value": "val3" | } - | } - | ]} + | ] + | }, + | "Task Metrics": { + | "Host Name": "localhost", + | "Executor Deserialize Time": 300, + | "Executor Run Time": 400, + | "Result Size": 500, + | "JVM GC Time": 600, + | "Result Serialization Time": 700, + | "Memory Bytes Spilled": 800, + | "Disk Bytes Spilled": 0, + | "Shuffle Write Metrics": { + | "Shuffle Bytes Written": 1200, + | "Shuffle Write Time": 1500 + | }, + | "Input Metrics": { + | "Data Read Method": "Hadoop", + | "Bytes Read": 2100 + | }, + | "Updated Blocks": [ + | { + | "Block ID": "rdd_0_0", + | "Status": { + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": false, + | "Replication": 2 + | }, + | "Memory Size": 0, + | "Tachyon Size": 0, + | "Disk Size": 0 + | } + | } + | ] + | } |} """ private val jobStartJsonString = """ - {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties": - {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} + |{ + | "Event": "SparkListenerJobStart", + | "Job ID": 10, + | "Stage IDs": [ + | 1, + | 2, + | 3, + | 4 + | ], + | "Properties": { + | "France": "Paris", + | "Germany": "Berlin", + | "Russia": "Moscow", + | "Ukraine": "Kiev" + | } + |} """ private val jobEndJsonString = """ - {"Event":"SparkListenerJobEnd","Job ID":20,"Job Result":{"Result":"JobSucceeded"}} + |{ + | "Event": "SparkListenerJobEnd", + | "Job ID": 20, + | "Job Result": { + | "Result": "JobSucceeded" + | } + |} """ private val environmentUpdateJsonString = """ - {"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"GC speed":"9999 objects/s", - "Java home":"Land of coffee"},"Spark Properties":{"Job throughput":"80000 jobs/s, - regardless of job type"},"System Properties":{"Username":"guest","Password":"guest"}, - "Classpath Entries":{"Super library":"/tmp/super_library"}} + |{ + | "Event": "SparkListenerEnvironmentUpdate", + | "JVM Information": { + | "GC speed": "9999 objects/s", + | "Java home": "Land of coffee" + | }, + | "Spark Properties": { + | "Job throughput": "80000 jobs/s, regardless of job type" + | }, + | "System Properties": { + | "Username": "guest", + | "Password": "guest" + | }, + | "Classpath Entries": { + | "Super library": "/tmp/super_library" + | } + |} """ private val blockManagerAddedJsonString = """ - {"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"Stars", - "Host":"In your multitude...","Port":300,"Netty Port":400},"Maximum Memory":500} + |{ + | "Event": "SparkListenerBlockManagerAdded", + | "Block Manager ID": { + | "Executor ID": "Stars", + | "Host": "In your multitude...", + | "Port": 300, + | "Netty Port": 400 + | }, + | "Maximum Memory": 500 + |} """ private val blockManagerRemovedJsonString = """ - {"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"Scarce", - "Host":"to be counted...","Port":100,"Netty Port":200}} + |{ + | "Event": "SparkListenerBlockManagerRemoved", + | "Block Manager ID": { + | "Executor ID": "Scarce", + | "Host": "to be counted...", + | "Port": 100, + | "Netty Port": 200 + | } + |} """ private val unpersistRDDJsonString = """ - {"Event":"SparkListenerUnpersistRDD","RDD ID":12345} + |{ + | "Event": "SparkListenerUnpersistRDD", + | "RDD ID": 12345 + |} """ private val applicationStartJsonString = """ - {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","Timestamp":42, - "User":"Garfield"} + |{ + | "Event": "SparkListenerApplicationStart", + | "App Name": "The winner of all", + | "Timestamp": 42, + | "User": "Garfield" + |} """ private val applicationEndJsonString = """ - {"Event":"SparkListenerApplicationEnd","Timestamp":42} + |{ + | "Event": "SparkListenerApplicationEnd", + | "Timestamp": 42 + |} """ } From a2e658dcdab614058eefcf50ae2d419ece9b1fe7 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Wed, 20 Aug 2014 15:51:14 -0700 Subject: [PATCH 08/11] [SPARK-2967][SQL] Fix sort based shuffle for spark sql. Add explicit row copies when sort based shuffle is on. Author: Michael Armbrust Closes #2066 from marmbrus/sortShuffle and squashes the following commits: fcd7bb2 [Michael Armbrust] Fix sort based shuffle for spark sql. --- .../apache/spark/sql/execution/Exchange.scala | 30 ++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 77dc2ad733215..09c34b7059fc3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf} +import org.apache.spark.shuffle.sort.SortShuffleManager +import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf} import org.apache.spark.rdd.ShuffledRDD import org.apache.spark.sql.{SQLContext, Row} import org.apache.spark.sql.catalyst.errors.attachTree @@ -37,6 +38,9 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una def output = child.output + /** We must copy rows when sort based shuffle is on */ + protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] + def execute() = attachTree(this , "execute") { newPartitioning match { case HashPartitioning(expressions, numPartitions) => @@ -45,8 +49,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una @transient val hashExpressions = newMutableProjection(expressions, child.output)() - val mutablePair = new MutablePair[Row, Row]() - iter.map(r => mutablePair.update(hashExpressions(r), r)) + if (sortBasedShuffleOn) { + iter.map(r => (hashExpressions(r), r.copy())) + } else { + val mutablePair = new MutablePair[Row, Row]() + iter.map(r => mutablePair.update(hashExpressions(r), r)) + } } val part = new HashPartitioner(numPartitions) val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part) @@ -58,8 +66,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una implicit val ordering = new RowOrdering(sortingExpressions, child.output) val rdd = child.execute().mapPartitions { iter => - val mutablePair = new MutablePair[Row, Null](null, null) - iter.map(row => mutablePair.update(row, null)) + if (sortBasedShuffleOn) { + iter.map(row => (row.copy(), null)) + } else { + val mutablePair = new MutablePair[Row, Null](null, null) + iter.map(row => mutablePair.update(row, null)) + } } val part = new RangePartitioner(numPartitions, rdd, ascending = true) val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part) @@ -69,8 +81,12 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una case SinglePartition => val rdd = child.execute().mapPartitions { iter => - val mutablePair = new MutablePair[Null, Row]() - iter.map(r => mutablePair.update(null, r)) + if (sortBasedShuffleOn) { + iter.map(r => (null, r.copy())) + } else { + val mutablePair = new MutablePair[Null, Row]() + iter.map(r => mutablePair.update(null, r)) + } } val partitioner = new HashPartitioner(1) val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner) From a1e8b1bc973bc0517681c09e5a5a475c0f395d31 Mon Sep 17 00:00:00 2001 From: wangfei Date: Wed, 20 Aug 2014 16:00:46 -0700 Subject: [PATCH 09/11] SPARK_LOGFILE and SPARK_ROOT_LOGGER no longer need in spark-daemon.sh Author: wangfei Closes #2057 from scwf/patch-7 and squashes the following commits: 1b7b9a5 [wangfei] SPARK_LOGFILE and SPARK_ROOT_LOGGER no longer need in spark-daemon.sh --- sbin/spark-daemon.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh index 323f675b17848..9032f23ea8eff 100755 --- a/sbin/spark-daemon.sh +++ b/sbin/spark-daemon.sh @@ -113,8 +113,6 @@ if [ "$SPARK_PID_DIR" = "" ]; then fi # some variables -export SPARK_LOGFILE=spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.log -export SPARK_ROOT_LOGGER="INFO,DRFA" log=$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out pid=$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid From d9e94146a6e65be110a62e3bd0351148912a41d1 Mon Sep 17 00:00:00 2001 From: Alex Liu Date: Wed, 20 Aug 2014 16:14:06 -0700 Subject: [PATCH 10/11] [SPARK-2846][SQL] Add configureInputJobPropertiesForStorageHandler to initialization of job conf ...al job conf Author: Alex Liu Closes #1927 from alexliu68/SPARK-SQL-2846 and squashes the following commits: e4bdc4c [Alex Liu] SPARK-SQL-2846 add configureInputJobPropertiesForStorageHandler to initial job conf --- .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 82c88280d7754..329f80cad471e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._ import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable} -import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc} import org.apache.hadoop.hive.serde2.Deserializer import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector @@ -249,6 +249,7 @@ private[hive] object HadoopTableReader extends HiveInspectors { def initializeLocalJobConfFunc(path: String, tableDesc: TableDesc)(jobConf: JobConf) { FileInputFormat.setInputPaths(jobConf, path) if (tableDesc != null) { + PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc) Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf) } val bufferSize = System.getProperty("spark.buffer.size", "65536") From c9f743957fa963bc1dbed7a44a346ffce1a45cf2 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 20 Aug 2014 16:23:10 -0700 Subject: [PATCH 11/11] [SPARK-2848] Shade Guava in uber-jars. For further discussion, please check the JIRA entry. This change moves Guava classes to a different package so that they don't conflict with the user-provided Guava (or the Hadoop-provided one). Since one class (Optional) was exposed through Spark's public API, that class was forked from Guava at the current dependency version (14.0.1) so that it can be kept going forward (until the API is cleaned). Note this change has a few implications: - *all* classes in the final jars will reference the relocated classes. If Hadoop classes are included (i.e. "-Phadoop-provided" is not activated), those will also reference the Guava 14 classes (instead of the Guava 11 classes from the Hadoop classpath). - if the Guava version in Spark is ever changed, the new Guava will still reference the forked Optional class; this may or may not be a problem, but in the long term it's better to think about removing Optional from the public API. For the end user, there are two visible implications: - Guava is not provided as a transitive dependency anymore (since it's "provided" in Spark) - At runtime, unless they provide their own, they'll either have no Guava or Hadoop's version of Guava (11), depending on how they set up their classpath. Note that this patch does not change the sbt deliverables; those will still contain guava in its original package, and provide guava as a compile-time dependency. This assumes that maven is the canonical build, and sbt-built artifacts are not (officially) published. Author: Marcelo Vanzin Closes #1813 from vanzin/SPARK-2848 and squashes the following commits: 9bdffb0 [Marcelo Vanzin] Undo sbt build changes. 819b445 [Marcelo Vanzin] Review feedback. 05e0a3d [Marcelo Vanzin] Merge branch 'master' into SPARK-2848 fef4370 [Marcelo Vanzin] Unfork Optional.java. d3ea8e1 [Marcelo Vanzin] Exclude asm classes from final jar. 637189b [Marcelo Vanzin] Add hacky filter to prefer Spark's copy of Optional. 2fec990 [Marcelo Vanzin] Shade Guava in the sbt build. 616998e [Marcelo Vanzin] Shade Guava in the maven build, fork Guava's Optional.java. --- assembly/pom.xml | 18 ++++++++++++++++++ core/pom.xml | 35 +++++++++++++++++++++++++++++++++++ examples/pom.xml | 26 +++++++++++++++++++++++++- pom.xml | 16 ++++++++++++++++ project/SparkBuild.scala | 4 ++-- project/plugins.sbt | 4 ++++ 6 files changed, 100 insertions(+), 3 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 9fbb037115db3..de7b75258e3c5 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -43,6 +43,12 @@ + + + com.google.guava + guava + compile + org.apache.spark spark-core_${scala.binary.version} @@ -113,6 +119,18 @@ shade + + + com.google + org.spark-project.guava + + com.google.common.** + + + com.google.common.base.Optional** + + + diff --git a/core/pom.xml b/core/pom.xml index 6d8be37037729..83c708dfc9619 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -68,9 +68,15 @@ org.eclipse.jetty jetty-server + com.google.guava guava + compile org.apache.commons @@ -322,6 +328,35 @@ + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + false + + + com.google.guava:guava + + + + + + com.google.guava:guava + + com/google/common/base/Optional* + + + + + + + diff --git a/examples/pom.xml b/examples/pom.xml index 8c4c128bb484d..9b12cb0c29c9f 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -46,8 +46,14 @@
- + + + + com.google.guava + guava + compile + org.apache.spark spark-core_${scala.binary.version} @@ -209,6 +215,12 @@ + + com.google.guava:guava + + com/google/common/base/Optional* + + *:* @@ -226,6 +238,18 @@ shade + + + com.google + org.spark-project.guava + + com.google.common.** + + + com.google.common.base.Optional** + + + diff --git a/pom.xml b/pom.xml index 7ed07ad7df88d..9cbf3ea5995c3 100644 --- a/pom.xml +++ b/pom.xml @@ -260,6 +260,7 @@ com.google.guava guava 14.0.1 + provided org.apache.commons @@ -1017,6 +1018,21 @@ + + + sbt + + + com.google.guava + guava + compile + + + + spark-ganglia-lgpl diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 49d52aefca17a..4c696d3d385fb 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -61,7 +61,7 @@ object SparkBuild extends PomBuild { def backwardCompatibility = { import scala.collection.mutable var isAlphaYarn = false - var profiles: mutable.Seq[String] = mutable.Seq.empty + var profiles: mutable.Seq[String] = mutable.Seq("sbt") if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) { println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.") profiles ++= Seq("spark-ganglia-lgpl") @@ -116,7 +116,7 @@ object SparkBuild extends PomBuild { retrieveManaged := true, retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", publishMavenStyle := true, - + resolvers += Resolver.mavenLocal, otherResolvers <<= SbtPomKeys.mvnLocalRepository(dotM2 => Seq(Resolver.file("dotM2", dotM2))), publishLocalConfiguration in MavenCompile <<= (packagedArtifacts, deliverLocal, ivyLoggingLevel) map { diff --git a/project/plugins.sbt b/project/plugins.sbt index 2a61f56c2ea60..8096c61414660 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -26,3 +26,7 @@ addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.1") addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.1") addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2") + +libraryDependencies += "org.ow2.asm" % "asm" % "5.0.3" + +libraryDependencies += "org.ow2.asm" % "asm-commons" % "5.0.3"