Skip to content

Commit

Permalink
Route bin/pyspark through Spark submit
Browse files Browse the repository at this point in the history
The bin/pyspark script takes two pathways, depending on the application.

If the application is a python file, bin/pyspark passes the python file
directly to Spark submit, which launches the python application as a
sub-process within the JVM.

If the application is the pyspark shell, however, bin/pyspark starts
the python REPL as the parent process, which launches the JVM as a
sub-process. A significant benefit here is that all keyboard signals
are propagated first to the Python interpreter properly. The existing
code already provided a code path to do this; all we need to change
is to use spark-submit instead of spark-class to launch the JVM. This
requires modifications to Spark submit to handle the pyspark shell
as a special case.

This has been tested locally (OSX) for both cases, and using IPython.
  • Loading branch information
andrewor14 committed May 16, 2014
1 parent d52761d commit a371d26
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 32 deletions.
14 changes: 10 additions & 4 deletions bin/pyspark
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,20 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
export PYSPARK_SUBMIT_ARGS="$@"

if [ -n "$IPYTHON_OPTS" ]; then
IPYTHON=1
fi

# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
exec ipython $IPYTHON_OPTS
# If a python file is provided, directly run spark-submit
if [[ "$1" =~ \.py$ ]]; then
exec $FWDIR/bin/spark-submit $PYSPARK_SUBMIT_ARGS
else
exec "$PYSPARK_PYTHON" "$@"
# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" ]]; then
exec ipython $IPYTHON_OPTS
else
exec "$PYSPARK_PYTHON"
fi
fi
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object PythonRunner {
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
val pathElements = new ArrayBuffer[String]
pathElements ++= pyFiles.split(",")
pathElements ++= Option(pyFiles).getOrElse("").split(",")
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
Expand Down
60 changes: 38 additions & 22 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ object SparkSubmit {
private var clusterManager: Int = LOCAL

/**
* A special jar name that indicates the class being run is inside of Spark itself,
* and therefore no user jar is needed.
* Special primary resource names that represent shells rather than application jars.
*/
private val RESERVED_JAR_NAME = "spark-internal"
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"

def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
Expand All @@ -71,8 +71,8 @@ object SparkSubmit {
* entries for the child, a list of system properties, a list of env vars
* and the main class for the child
*/
private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
ArrayBuffer[String], Map[String, String], String) = {
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
if (args.master.startsWith("local")) {
clusterManager = LOCAL
} else if (args.master.startsWith("yarn")) {
Expand Down Expand Up @@ -121,24 +121,30 @@ object SparkSubmit {
printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}

// If we're running a Python app, set the Java class to run to be our PythonRunner, add
// Python files to deployment list, and pass the main file and Python path to PythonRunner
// If we're running a python app, set the main class to our specific python runner
if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cannot currently run Python driver programs on cluster")
}
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs ++= ArrayBuffer("--die-on-broken-pipe", "0")
} else {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
args.files = Utils.mergeFileLists(args.files, args.primaryResource)
}
val pyFiles = Option(args.pyFiles).getOrElse("")
args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
args.primaryResource = RESERVED_JAR_NAME
args.files = Utils.mergeFileLists(args.files, pyFiles)
sysProps("spark.submit.pyFiles") = pyFiles
}

// If we're deploying into YARN, use yarn.Client as a wrapper around the user class
if (!deployOnCluster) {
childMainClass = args.mainClass
if (args.primaryResource != RESERVED_JAR_NAME) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
} else if (clusterManager == YARN) {
Expand Down Expand Up @@ -219,7 +225,7 @@ object SparkSubmit {
// For python files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
if (args.primaryResource != RESERVED_JAR_NAME) {
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
Expand Down Expand Up @@ -293,8 +299,8 @@ object SparkSubmit {
}

private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
val localJarFile = new File(new URI(localJar).getPath())
if (!localJarFile.exists()) {
val localJarFile = new File(new URI(localJar).getPath)
if (!localJarFile.exists) {
printWarning(s"Jar $localJar does not exist, skipping.")
}

Expand All @@ -303,14 +309,24 @@ object SparkSubmit {
}

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
* Return whether the given primary resource represents a user jar.
*/
private def isUserJar(primaryResource: String): Boolean = {
!isShell(primaryResource) && !isPython(primaryResource)
}

/**
* Return whether the given primary resource represents a shell.
*/
private def isShell(primaryResource: String): Boolean = {
primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
}

/**
* Return whether the given primary resource requires running python.
*/
private[spark] def mergeFileLists(lists: String*): String = {
val merged = lists.filter(_ != null)
.flatMap(_.split(","))
.mkString(",")
if (merged == "") null else merged
private[spark] def isPython(primaryResource: String): Boolean = {
primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
case v =>
primaryResource = v
inSparkOpts = false
isPython = v.endsWith(".py")
isPython = SparkSubmit.isPython(v)
parse(tail)
}
} else {
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1166,4 +1166,16 @@ private[spark] object Utils extends Logging {
true
}
}

/**
* Merge a sequence of comma-separated file lists into a single comma-separated string.
* The provided strings may be null or empty to indicate no files.
*/
def mergeFileLists(lists: String*): String = {
lists
.filter(_ != null)
.filter(_ != "")
.flatMap(_.split(","))
.mkString(",")
}
}
10 changes: 7 additions & 3 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
"--die-on-broken-pipe", "0"]
script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
if submit_args is not None:
submit_args = submit_args.split(" ")
else:
submit_args = []
command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
if os.environ.get("SPARK_EXECUTOR_URI"):
SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])

sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files)
sc = SparkContext(appName="PySparkShell", pyFiles=add_files)

print("""Welcome to
____ __
Expand Down

0 comments on commit a371d26

Please sign in to comment.