Deal with quotes + address various comments
andrewor14 committed May 16, 2014
1 parent fe4c8a7 commit 6fba412
Showing 4 changed files with 63 additions and 24 deletions.
bin/pyspark (24 changes: 20 additions & 4 deletions)
@@ -26,7 +26,7 @@ export SPARK_HOME="$FWDIR"
 SCALA_VERSION=2.10
 
 if [[ "$@" == *--help* ]]; then
-  echo "Usage: ./bin/pyspark [python file] [options]"
+  echo "Usage: ./bin/pyspark [options]"
   ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
   exit 0
 fi
@@ -57,15 +57,31 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
 export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
-export PYSPARK_SUBMIT_ARGS="$@"
 
 # If IPython options are specified, assume user wants to run IPython
 if [ -n "$IPYTHON_OPTS" ]; then
   IPYTHON=1
 fi
 
-# If a python file is provided, directly run spark-submit
+# Build up arguments list manually to preserve quotes. We export Spark submit arguments as an
+# environment variable because shell.py must run as a PYTHONSTARTUP script, which does not take
+# in arguments. This is required mainly for IPython notebooks.
+
+PYSPARK_SUBMIT_ARGS=""
+whitespace="[[:space:]]"
+for i in "$@"; do
+  if [[ $i =~ $whitespace ]]; then
+    i=\"$i\"
+  fi
+  PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i"
+done
+export PYSPARK_SUBMIT_ARGS
+
+# If a python file is provided, directly run spark-submit.
 if [[ "$1" =~ \.py$ ]]; then
-  exec $FWDIR/bin/spark-submit $PYSPARK_SUBMIT_ARGS
+  echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0."
+  echo -e "Use ./bin/spark-submit <python file>\n"
+  exec $FWDIR/bin/spark-submit "$@"
 else
   # Only use ipython if no command line arguments were provided [SPARK-1134]
   if [[ "$IPYTHON" = "1" ]]; then
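The loop above is what actually preserves quoting: any argument containing whitespace is re-wrapped in literal double quotes before everything is concatenated into the single PYSPARK_SUBMIT_ARGS string. A rough Python mimic of that bash loop, purely for illustration (build_submit_args is a hypothetical name, not part of the commit):

    import re

    def build_submit_args(args):
        # Mimic bin/pyspark: re-quote whitespace-containing args, then join.
        parts = []
        for a in args:
            # The bash loop wraps any argument matching [[:space:]] in literal quotes.
            if re.search(r"\s", a):
                a = '"%s"' % a
            parts.append(a)
        # The bash version accumulates with a leading space; keep that detail.
        return "".join(" " + p for p in parts)

    # build_submit_args(["--master", "local[4]", "--name", "my app"])
    # == ' --master local[4] --name "my app"'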
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (18 changes: 15 additions & 3 deletions)
@@ -134,10 +134,10 @@ object SparkSubmit {
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
         args.mainClass = "org.apache.spark.deploy.PythonRunner"
         args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
-        args.files = Utils.mergeFileLists(args.files, args.primaryResource)
+        args.files = mergeFileLists(args.files, args.primaryResource)
       }
       val pyFiles = Option(args.pyFiles).getOrElse("")
-      args.files = Utils.mergeFileLists(args.files, pyFiles)
+      args.files = mergeFileLists(args.files, pyFiles)
       sysProps("spark.submit.pyFiles") = pyFiles
     }
 
@@ -300,7 +300,7 @@ object SparkSubmit {
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
     val localJarFile = new File(new URI(localJar).getPath)
-    if (!localJarFile.exists) {
+    if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }
 
@@ -328,6 +328,18 @@ object SparkSubmit {
   private[spark] def isPython(primaryResource: String): Boolean = {
     primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
   }
+
+  /**
+   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
+   * no files, into a single comma-separated string.
+   */
+  private[spark] def mergeFileLists(lists: String*): String = {
+    lists
+      .filter(_ != null)
+      .filter(_ != "")
+      .flatMap(_.split(","))
+      .mkString(",")
+  }
 }
 
 /**
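For illustration, a rough Python equivalent of the mergeFileLists helper added above (merge_file_lists is a hypothetical name, not part of the commit): null or empty entries are dropped, the remaining comma-separated lists are split, and everything is re-joined into one comma-separated string.

    def merge_file_lists(*lists):
        # Drop None/empty entries, split the rest on commas, and re-join.
        return ",".join(f for l in lists if l for f in l.split(","))

    # merge_file_lists("a.py,b.py", None, "", "c.py") == "a.py,b.py,c.py"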
core/src/main/scala/org/apache/spark/util/Utils.scala (12 changes: 0 additions & 12 deletions)
@@ -1166,16 +1166,4 @@ private[spark] object Utils extends Logging {
       true
     }
   }
-
-  /**
-   * Merge a sequence of comma-separated file lists into a single comma-separated string.
-   * The provided strings may be null or empty to indicate no files.
-   */
-  def mergeFileLists(lists: String*): String = {
-    lists
-      .filter(_ != null)
-      .filter(_ != "")
-      .flatMap(_.split(","))
-      .mkString(",")
-  }
 }
python/pyspark/java_gateway.py (33 changes: 28 additions & 5 deletions)
Expand Up @@ -23,7 +23,6 @@
from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient


def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]

@@ -36,10 +35,7 @@ def launch_gateway():
     on_windows = platform.system() == "Windows"
     script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
     submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
-    if submit_args is not None:
-        submit_args = submit_args.split(" ")
-    else:
-        submit_args = []
+    submit_args = split_preserve_quotes(submit_args)
     command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
     if not on_windows:
         # Don't send ctrl-c / SIGINT to the Java gateway:
@@ -80,3 +76,30 @@ def run(self):
     java_import(gateway.jvm, "scala.Tuple2")
 
     return gateway
+
+def split_preserve_quotes(args):
+    """
+    Given a string of space-delimited arguments with quotes,
+    split it into a list while preserving the quote boundaries.
+    """
+    if args is None:
+        return []
+    split_list = []
+    quoted_string = ""
+    wait_for_quote = False
+    for arg in args.split(" "):
+        if not wait_for_quote:
+            if arg.startswith("\""):
+                wait_for_quote = True
+                quoted_string = arg
+            else:
+                split_list.append(arg)
+        else:
+            quoted_string += " " + arg
+            if quoted_string.endswith("\""):
+                # Strip quotes
+                quoted_string = quoted_string[1:-1]
+                split_list.append(quoted_string)
+                quoted_string = ""
+                wait_for_quote = False
+    return split_list
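Taken together with the quoting loop in bin/pyspark, split_preserve_quotes recovers the original argument boundaries before launch_gateway hands the list to spark-submit. A small sketch of that round trip (illustration only: it assumes the function above is in scope, the SPARK_HOME path and argument values are made up, and the leading space the bash loop prepends is omitted):

    # '--master local[4] --name "my app"' is roughly what bin/pyspark exports
    # for: ./bin/pyspark --master local[4] --name "my app"
    args = split_preserve_quotes('--master local[4] --name "my app"')
    # args == ['--master', 'local[4]', '--name', 'my app']

    # launch_gateway() then prepends the spark-submit script and the
    # "pyspark-shell" marker, e.g. (assuming SPARK_HOME=/opt/spark):
    # ['/opt/spark/./bin/spark-submit', 'pyspark-shell',
    #  '--master', 'local[4]', '--name', 'my app']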
