From 4b8ec6fcfd7a7ef0857d5b21917183c181301c95 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 16 May 2014 22:34:38 -0700 Subject: [PATCH] [SPARK-1808] Route bin/pyspark through Spark submit **Problem.** For `bin/pyspark`, there is currently no other way to specify Spark configuration properties other than through `SPARK_JAVA_OPTS` in `conf/spark-env.sh`. However, this mechanism is supposedly deprecated. Instead, it needs to pick up configurations explicitly specified in `conf/spark-defaults.conf`. **Solution.** Have `bin/pyspark` invoke `bin/spark-submit`, like all of its counterparts in Scala land (i.e. `bin/spark-shell`, `bin/run-example`). This has the additional benefit of making the invocation of all the user facing Spark scripts consistent. **Details.** `bin/pyspark` inherently handles two cases: (1) running python applications and (2) running the python shell. For (1), Spark submit already handles running python applications. For cases in which `bin/pyspark` is given a python file, we can simply call pass the file directly to Spark submit and let it handle the rest. For case (2), `bin/pyspark` starts a python process as before, which launches the JVM as a sub-process. The existing code already provides a code path to do this. All we needed to change is to use `bin/spark-submit` instead of `spark-class` to launch the JVM. This requires modifications to Spark submit to handle the pyspark shell as a special case. This has been tested locally (OSX and Windows 7), on a standalone cluster, and on a YARN cluster. Running IPython also works as before, except now it takes in Spark submit arguments too. Author: Andrew Or Closes #799 from andrewor14/pyspark-submit and squashes the following commits: bf37e36 [Andrew Or] Minor changes 01066fa [Andrew Or] bin/pyspark for Windows c8cb3bf [Andrew Or] Handle perverse app names (with escaped quotes) 1866f85 [Andrew Or] Windows is not cooperating 456d844 [Andrew Or] Guard against shlex hanging if PYSPARK_SUBMIT_ARGS is not set 7eebda8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit b7ba0d8 [Andrew Or] Address a few comments (minor) 06eb138 [Andrew Or] Use shlex instead of writing our own parser 05879fa [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit a823661 [Andrew Or] Fix --die-on-broken-pipe not propagated properly 6fba412 [Andrew Or] Deal with quotes + address various comments fe4c8a7 [Andrew Or] Update --help for bin/pyspark afe47bf [Andrew Or] Fix spark shell f04aaa4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit a371d26 [Andrew Or] Route bin/pyspark through Spark submit --- bin/pyspark | 35 ++++++++++-- bin/pyspark2.cmd | 21 ++++++- bin/spark-shell | 6 +- bin/spark-shell.cmd | 2 +- .../apache/spark/deploy/PythonRunner.scala | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 55 ++++++++++++++----- .../spark/deploy/SparkSubmitArguments.scala | 6 +- .../scala/org/apache/spark/util/Utils.scala | 2 +- python/pyspark/java_gateway.py | 10 ++-- python/pyspark/shell.py | 2 +- 10 files changed, 107 insertions(+), 34 deletions(-) diff --git a/bin/pyspark b/bin/pyspark index 10e35e0f1734e..9e1364e44c8c4 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -25,6 +25,12 @@ export SPARK_HOME="$FWDIR" SCALA_VERSION=2.10 +if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then + echo "Usage: ./bin/pyspark [options]" + ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 + exit 0 +fi + # Exit if the user hasn't compiled Spark if [ ! -f "$FWDIR/RELEASE" ]; then # Exit if the user hasn't compiled Spark @@ -52,13 +58,34 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH export OLD_PYTHONSTARTUP=$PYTHONSTARTUP export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py +# If IPython options are specified, assume user wants to run IPython if [ -n "$IPYTHON_OPTS" ]; then IPYTHON=1 fi -# Only use ipython if no command line arguments were provided [SPARK-1134] -if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then - exec ipython $IPYTHON_OPTS +# Build up arguments list manually to preserve quotes and backslashes. +# We export Spark submit arguments as an environment variable because shell.py must run as a +# PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks. + +PYSPARK_SUBMIT_ARGS="" +whitespace="[[:space:]]" +for i in "$@"; do + if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi + if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi + PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i" +done +export PYSPARK_SUBMIT_ARGS + +# If a python file is provided, directly run spark-submit. +if [[ "$1" =~ \.py$ ]]; then + echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2 + echo -e "Use ./bin/spark-submit \n" 1>&2 + exec $FWDIR/bin/spark-submit "$@" else - exec "$PYSPARK_PYTHON" "$@" + # Only use ipython if no command line arguments were provided [SPARK-1134] + if [[ "$IPYTHON" = "1" ]]; then + exec ipython $IPYTHON_OPTS + else + exec "$PYSPARK_PYTHON" + fi fi diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd index d7cfd5eec501c..0ef9eea95342e 100644 --- a/bin/pyspark2.cmd +++ b/bin/pyspark2.cmd @@ -31,7 +31,7 @@ set FOUND_JAR=0 for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do ( set FOUND_JAR=1 ) -if "%FOUND_JAR%"=="0" ( +if [%FOUND_JAR%] == [0] ( echo Failed to find Spark assembly JAR. echo You need to build Spark with sbt\sbt assembly before running this program. goto exit @@ -42,15 +42,30 @@ rem Load environment variables from conf\spark-env.cmd, if it exists if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd" rem Figure out which Python to use. -if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python +if [%PYSPARK_PYTHON%] == [] set PYSPARK_PYTHON=python set PYTHONPATH=%FWDIR%python;%PYTHONPATH% set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH% set OLD_PYTHONSTARTUP=%PYTHONSTARTUP% set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py +set PYSPARK_SUBMIT_ARGS=%* echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH% -"%PYSPARK_PYTHON%" %* +rem Check whether the argument is a file +for /f %%i in ('echo %1^| findstr /R "\.py"') do ( + set PYTHON_FILE=%%i +) + +if [%PYTHON_FILE%] == [] ( + %PYSPARK_PYTHON% +) else ( + echo. + echo WARNING: Running python applications through ./bin/pyspark.cmd is deprecated as of Spark 1.0. + echo Use ./bin/spark-submit ^ + echo. + "%FWDIR%\bin\spark-submit.cmd" %PYSPARK_SUBMIT_ARGS% +) + :exit diff --git a/bin/spark-shell b/bin/spark-shell index 7f03349c5e910..c158683ab3f99 100755 --- a/bin/spark-shell +++ b/bin/spark-shell @@ -28,7 +28,7 @@ esac # Enter posix mode for bash set -o posix -if [[ "$@" == *--help* ]]; then +if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then echo "Usage: ./bin/spark-shell [options]" ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 exit 0 @@ -46,11 +46,11 @@ function main(){ # (see https://github.com/sbt/sbt/issues/562). stty -icanon min 1 -echo > /dev/null 2>&1 export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix" - $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main + $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main stty icanon echo > /dev/null 2>&1 else export SPARK_SUBMIT_OPTS - $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main + $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main fi } diff --git a/bin/spark-shell.cmd b/bin/spark-shell.cmd index ca0c722c926f5..4b9708a8c03f3 100755 --- a/bin/spark-shell.cmd +++ b/bin/spark-shell.cmd @@ -19,4 +19,4 @@ rem set SPARK_HOME=%~dp0.. -cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-internal %* --class org.apache.spark.repl.Main +cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index e20d4486c8f0c..2dfa02bd26f13 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -42,7 +42,7 @@ object PythonRunner { // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the // python directories in SPARK_HOME (if set), and any files in the pyFiles argument val pathElements = new ArrayBuffer[String] - pathElements ++= pyFiles.split(",") + pathElements ++= Option(pyFiles).getOrElse("").split(",") pathElements += PythonUtils.sparkPythonPath pathElements += sys.env.getOrElse("PYTHONPATH", "") val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e86182e4c56ce..a99b2176e2b5e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -41,10 +41,10 @@ object SparkSubmit { private var clusterManager: Int = LOCAL /** - * A special jar name that indicates the class being run is inside of Spark itself, - * and therefore no user jar is needed. + * Special primary resource names that represent shells rather than application jars. */ - private val RESERVED_JAR_NAME = "spark-internal" + private val SPARK_SHELL = "spark-shell" + private val PYSPARK_SHELL = "pyspark-shell" def main(args: Array[String]) { val appArgs = new SparkSubmitArguments(args) @@ -71,8 +71,8 @@ object SparkSubmit { * entries for the child, a list of system properties, a list of env vars * and the main class for the child */ - private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String], - ArrayBuffer[String], Map[String, String], String) = { + private[spark] def createLaunchEnv(args: SparkSubmitArguments) + : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { if (args.master.startsWith("local")) { clusterManager = LOCAL } else if (args.master.startsWith("yarn")) { @@ -121,24 +121,30 @@ object SparkSubmit { printErrorAndExit("Cannot currently run driver on the cluster in Mesos") } - // If we're running a Python app, set the Java class to run to be our PythonRunner, add - // Python files to deployment list, and pass the main file and Python path to PythonRunner + // If we're running a python app, set the main class to our specific python runner if (isPython) { if (deployOnCluster) { printErrorAndExit("Cannot currently run Python driver programs on cluster") } - args.mainClass = "org.apache.spark.deploy.PythonRunner" - args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource) + if (args.primaryResource == PYSPARK_SHELL) { + args.mainClass = "py4j.GatewayServer" + args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0") + } else { + // If a python file is provided, add it to the child arguments and list of files to deploy. + // Usage: PythonAppRunner
[app arguments] + args.mainClass = "org.apache.spark.deploy.PythonRunner" + args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs + args.files = mergeFileLists(args.files, args.primaryResource) + } val pyFiles = Option(args.pyFiles).getOrElse("") - args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs - args.primaryResource = RESERVED_JAR_NAME + args.files = mergeFileLists(args.files, pyFiles) sysProps("spark.submit.pyFiles") = pyFiles } // If we're deploying into YARN, use yarn.Client as a wrapper around the user class if (!deployOnCluster) { childMainClass = args.mainClass - if (args.primaryResource != RESERVED_JAR_NAME) { + if (isUserJar(args.primaryResource)) { childClasspath += args.primaryResource } } else if (clusterManager == YARN) { @@ -219,7 +225,7 @@ object SparkSubmit { // For python files, the primary resource is already distributed as a regular file if (!isYarnCluster && !isPython) { var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq()) - if (args.primaryResource != RESERVED_JAR_NAME) { + if (isUserJar(args.primaryResource)) { jars = jars ++ Seq(args.primaryResource) } sysProps.put("spark.jars", jars.mkString(",")) @@ -293,7 +299,7 @@ object SparkSubmit { } private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) { - val localJarFile = new File(new URI(localJar).getPath()) + val localJarFile = new File(new URI(localJar).getPath) if (!localJarFile.exists()) { printWarning(s"Jar $localJar does not exist, skipping.") } @@ -302,6 +308,27 @@ object SparkSubmit { loader.addURL(url) } + /** + * Return whether the given primary resource represents a user jar. + */ + private def isUserJar(primaryResource: String): Boolean = { + !isShell(primaryResource) && !isPython(primaryResource) + } + + /** + * Return whether the given primary resource represents a shell. + */ + private def isShell(primaryResource: String): Boolean = { + primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL + } + + /** + * Return whether the given primary resource requires running python. + */ + private[spark] def isPython(primaryResource: String): Boolean = { + primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL + } + /** * Merge a sequence of comma-separated file lists, some of which may be null to indicate * no files, into a single comma-separated string. diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 2d327aa3fb27f..264d4544cd31c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -298,11 +298,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { case v => primaryResource = v inSparkOpts = false - isPython = v.endsWith(".py") + isPython = SparkSubmit.isPython(v) parse(tail) } } else { - childArgs += value + if (!value.isEmpty) { + childArgs += value + } parse(tail) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 388f7222428db..0c7cff019fce1 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1101,7 +1101,7 @@ private[spark] object Utils extends Logging { * Strip the directory from a path name */ def stripDirectory(path: String): String = { - path.split(File.separator).last + new File(path).getName } /** diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 3d0936fdca911..91ae8263f66b8 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -18,12 +18,12 @@ import os import sys import signal +import shlex import platform from subprocess import Popen, PIPE from threading import Thread from py4j.java_gateway import java_import, JavaGateway, GatewayClient - def launch_gateway(): SPARK_HOME = os.environ["SPARK_HOME"] @@ -34,9 +34,11 @@ def launch_gateway(): # Launch the Py4j gateway using Spark's run command so that we pick up the # proper classpath and settings from spark-env.sh on_windows = platform.system() == "Windows" - script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class" - command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer", - "--die-on-broken-pipe", "0"] + script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit" + submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS") + submit_args = submit_args if submit_args is not None else "" + submit_args = shlex.split(submit_args) + command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args if not on_windows: # Don't send ctrl-c / SIGINT to the Java gateway: def preexec_func(): diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index d172d588bfbd8..ebd714db7a918 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -40,7 +40,7 @@ if os.environ.get("SPARK_EXECUTOR_URI"): SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"]) -sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files) +sc = SparkContext(appName="PySparkShell", pyFiles=add_files) print("""Welcome to ____ __