Fix precedence of library paths, classpath, java opts and memory
This was previously broken because of the way we pass command line
arguments. As of this commit, the ordering becomes:

SPARK_SUBMIT_DRIVER_MEMORY > spark.driver.memory > SPARK_DRIVER_MEMORY
SPARK_SUBMIT_CLASSPATH > spark.driver.extraClassPath
SPARK_SUBMIT_LIBRARY_PATH > spark.driver.extraLibraryPath
SPARK_SUBMIT_JAVA_OPTS > spark.driver.extraJavaOptions

We achieve this by passing existing environment variables to
SparkClassLauncher directly.
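
For illustration only, a minimal sketch of the resulting fallback chain for driver memory. CONF_DRIVER_MEMORY is a made-up stand-in for the spark.driver.memory value read from the properties file; in the actual commit this merging happens in SparkClassLauncher.scala, not in BASH.

    # Illustrative only: lowest priority first; each later assignment overrides the previous if set.
    OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}           # environment fallback
    OUR_JAVA_MEM=${CONF_DRIVER_MEMORY:-$OUR_JAVA_MEM}           # spark.driver.memory (hypothetical variable)
    OUR_JAVA_MEM=${SPARK_SUBMIT_DRIVER_MEMORY:-$OUR_JAVA_MEM}   # --driver-memory passed to spark-submit wins
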
andrewor14 committed Aug 19, 2014
1 parent 158f813 commit a91ea19
Showing 3 changed files with 117 additions and 97 deletions.
52 changes: 28 additions & 24 deletions bin/spark-class
@@ -17,6 +17,8 @@
# limitations under the License.
#

# NOTE: Any changes to this file must be reflected in SparkClassLauncher.scala!

cygwin=false
case "`uname`" in
CYGWIN*) cygwin=true;;
@@ -73,13 +75,16 @@ case "$1" in
OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
;;

# Spark submit uses SPARK_SUBMIT_OPTS and SPARK_JAVA_OPTS
'org.apache.spark.deploy.SparkSubmit')
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_OPTS"
# Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_JAVA_OPTS + SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
'org.apache.spark.deploy.SparkSubmit')
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_SUBMIT_JAVA_OPTS"
OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
if [ -n "$SPARK_SUBMIT_LIBRARY_PATH" ]; then
OUR_JAVA_OPTS="$OUR_JAVA_OPTS -Djava.library.path=$SPARK_SUBMIT_LIBRARY_PATH"
fi
OUR_JAVA_MEM=${SPARK_DRIVER_MEMORY:-$DEFAULT_MEM}
if [ -n "$SPARK_SUBMIT_DRIVER_MEMORY" ]; then
OUR_JAVA_MEM="$SPARK_SUBMIT_DRIVER_MEMORY"
fi
;;

*)
@@ -102,7 +107,6 @@ fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"

# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
@@ -149,27 +153,27 @@ if $cygwin; then
fi
export CLASSPATH

if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then
echo -n "Spark Command: " 1>&2
echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
echo -e "========================================\n" 1>&2
fi

# In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
# Here we must parse the properties file for relevant "spark.driver.*" configs for launching
# the driver JVM itself.

if [ -n "$SPARK_SUBMIT_CLIENT_MODE" ]; then
# This is currently used only if the properties file actually contains these special configs
exec "$RUNNER" org.apache.spark.deploy.SparkClassLauncher \
"$PROPERTIES_FILE" \
"$RUNNER" \
"$CLASSPATH" \
"$SPARK_SUBMIT_LIBRARY_PATH" \
"$JAVA_OPTS" \
"$OUR_JAVA_MEM" \
"$@"
# Here we must parse the properties file for relevant "spark.driver.*" configs before launching
# the driver JVM itself. Instead of handling this complexity in BASH, we launch a separate JVM
# to prepare the launch environment of this driver JVM.

if [ -n "$SPARK_SUBMIT_BOOTSTRAP_DRIVER" ]; then
# This is used only if the properties file actually contains these special configs
# Export the environment variables needed by SparkClassLauncher
export RUNNER
export CLASSPATH
export JAVA_OPTS
export OUR_JAVA_MEM
shift
exec "$RUNNER" org.apache.spark.deploy.SparkClassLauncher "$@"
else
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"
if [ -n "$SPARK_PRINT_LAUNCH_COMMAND" ]; then
echo -n "Spark Command: " 1>&2
echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
echo -e "========================================\n" 1>&2
fi
exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
fi

26 changes: 12 additions & 14 deletions bin/spark-submit
@@ -17,47 +17,45 @@
# limitations under the License.
#

# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!

export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
ORIG_ARGS=("$@")

while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
DEPLOY_MODE=$2
elif [ "$1" = "--driver-memory" ]; then
DRIVER_MEMORY=$2
SPARK_SUBMIT_DEPLOY_MODE=$2
elif [ "$1" = "--properties-file" ]; then
PROPERTIES_FILE=$2
SPARK_SUBMIT_PROPERTIES_FILE=$2
elif [ "$1" = "--driver-memory" ]; then
export SPARK_SUBMIT_DRIVER_MEMORY=$2
elif [ "$1" = "--driver-library-path" ]; then
export SPARK_SUBMIT_LIBRARY_PATH=$2
elif [ "$1" = "--driver-class-path" ]; then
export SPARK_SUBMIT_CLASSPATH=$2
elif [ "$1" = "--driver-java-options" ]; then
export SPARK_SUBMIT_OPTS=$2
export SPARK_SUBMIT_JAVA_OPTS=$2
fi
shift
done

DEPLOY_MODE=${DEPLOY_MODE:-"client"}
DEFAULT_PROPERTIES_FILE="$SPARK_HOME/conf/spark-defaults.conf"
PROPERTIES_FILE=${PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"}
export SPARK_SUBMIT_DEPLOY_MODE=${SPARK_SUBMIT_DEPLOY_MODE:-"client"}
export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"}

# For client mode, the driver will be launched in the same JVM that launches
# SparkSubmit, so we may need to read the properties file for any extra class
# paths, library paths, java options and memory early on. Otherwise, it will
# be too late by the time the JVM has started.

if [ "$DEPLOY_MODE" == "client" ]; then
if [ -n "$DRIVER_MEMORY" ]; then
export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
fi
if [ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" ]; then
# Parse the properties file only if the special configs exist
contains_special_configs=$(
grep -e "spark.driver.extra*\|spark.driver.memory" "$PROPERTIES_FILE" | \
grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
grep -v "^[[:space:]]*#"
)
if [ -n "$contains_special_configs" ]; then
export PROPERTIES_FILE
export SPARK_SUBMIT_CLIENT_MODE=1
export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
fi
fi

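As a hypothetical usage example (not part of the commit): assuming conf/spark-defaults.conf contains a line such as spark.driver.extraClassPath /opt/extra/lib (a made-up path), a client-mode run like the one below would make bin/spark-submit export SPARK_SUBMIT_DRIVER_MEMORY, SPARK_SUBMIT_CLASSPATH and SPARK_SUBMIT_BOOTSTRAP_DRIVER=1 before delegating to bin/spark-class.

    # Hypothetical invocation; the application jar, class and paths are placeholders.
    ./bin/spark-submit \
      --deploy-mode client \
      --driver-memory 4g \
      --driver-class-path /opt/app/mylib.jar \
      --class org.apache.spark.examples.SparkPi \
      examples/target/spark-examples.jar 10
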
136 changes: 77 additions & 59 deletions core/src/main/scala/org/apache/spark/deploy/SparkClassLauncher.scala
@@ -24,76 +24,94 @@ import scala.collection.JavaConversions._
import org.apache.spark.util.{RedirectThread, Utils}

/**
* Wrapper of `bin/spark-class` that prepares the launch environment of the child JVM properly.
* Launch an application through Spark submit in client mode with the appropriate classpath,
* library paths, java options and memory. These properties of the JVM must be set before the
* driver JVM is launched. The sole purpose of this class is to avoid handling the complexity
* of parsing the properties file for such relevant configs in BASH.
*
* Usage: org.apache.spark.deploy.SparkClassLauncher <application args>
*/
private[spark] object SparkClassLauncher {

// TODO: This is currently only used for running Spark submit in client mode.
// The goal moving forward is to use this class for all use cases of `bin/spark-class`.
// Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`.
// Any changes made there must be reflected in this file.

/**
* Launch a Spark class with the given class paths, library paths, java options and memory,
* taking into account special `spark.driver.*` properties needed to start the driver JVM.
*/
def main(args: Array[String]): Unit = {
if (args.size < 7) {
System.err.println(
"""
|Usage: org.apache.spark.deploy.SparkClassLauncher
|
| [properties file] - path to your Spark properties file
| [java runner] - command to launch the child JVM
| [java class paths] - class paths to pass to the child JVM
| [java library paths] - library paths to pass to the child JVM
| [java opts] - java options to pass to the child JVM
| [java memory] - memory used to launch the child JVM
| [main class] - main class to run in the child JVM
| <main args> - arguments passed to this main class
|
|Example:
| org.apache.spark.deploy.SparkClassLauncher.SparkClassLauncher
| conf/spark-defaults.conf java /classpath1:/classpath2 /librarypath1:/librarypath2
| "-XX:-UseParallelGC -Dsome=property" 5g org.apache.spark.deploy.SparkSubmit
| --master local --class org.apache.spark.examples.SparkPi 10
""".stripMargin)
System.exit(1)
}
val propertiesFile = args(0)
val javaRunner = args(1)
val clClassPaths = args(2)
val clLibraryPaths = args(3)
val clJavaOpts = Utils.splitCommandString(args(4))
val clJavaMemory = args(5)
val mainClass = args(6)
val submitArgs = args
val runner = sys.env("RUNNER")
val classpath = sys.env("CLASSPATH")
val javaOpts = sys.env("JAVA_OPTS")
val defaultDriverMemory = sys.env("OUR_JAVA_MEM")

// Spark submit specific environment variables
val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE")
val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE")
val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER")
val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY")
val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
val submitJavaOpts = sys.env.get("SPARK_SUBMIT_JAVA_OPTS")

assume(runner != null, "RUNNER must be set")
assume(classpath != null, "CLASSPATH must be set")
assume(javaOpts != null, "JAVA_OPTS must be set")
assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set")
assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!")
assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set")
assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set!")

// In client deploy mode, parse the properties file for certain `spark.driver.*` configs.
// These configs encode java options, class paths, and library paths needed to launch the JVM.
// Parse the properties file for the equivalent spark.driver.* configs
val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
val confDriverMemory = properties.get("spark.driver.memory")
val confClassPaths = properties.get("spark.driver.extraClassPath")
val confLibraryPaths = properties.get("spark.driver.extraLibraryPath")
val confJavaOpts = properties.get("spark.driver.extraJavaOptions")
val confDriverMemory = properties.get("spark.driver.memory").getOrElse(defaultDriverMemory)
val confLibraryPath = properties.get("spark.driver.extraLibraryPath").getOrElse("")
val confClasspath = properties.get("spark.driver.extraClassPath").getOrElse("")
val confJavaOpts = properties.get("spark.driver.extraJavaOptions").getOrElse("")

// Merge relevant command line values with the config equivalents, if any
val javaMemory = confDriverMemory.getOrElse(clJavaMemory)
val pathSeparator = sys.props("path.separator")
val classPaths = clClassPaths + confClassPaths.map(pathSeparator + _).getOrElse("")
val libraryPaths = clLibraryPaths + confLibraryPaths.map(pathSeparator + _).getOrElse("")
val javaOpts = clJavaOpts ++ confJavaOpts.map(Utils.splitCommandString).getOrElse(Seq.empty)
val filteredJavaOpts = javaOpts.distinct.filterNot { opt =>
opt.startsWith("-Djava.library.path") || opt.startsWith("-Xms") || opt.startsWith("-Xmx")
}
// Favor Spark submit arguments over the equivalent configs in the properties file.
// Note that we do not actually use the Spark submit values for library path, classpath,
// and java opts here, because we have already captured them in BASH.
val newDriverMemory = submitDriverMemory.getOrElse(confDriverMemory)
val newLibraryPath =
if (submitLibraryPath.isDefined) {
// SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS
""
} else {
"-Djava.library.path=" + confLibraryPath
}
val newClasspath =
if (submitClasspath.isDefined) {
// SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH
classpath
} else {
classpath + sys.props("path.separator") + confClasspath
}
val newJavaOpts =
if (submitJavaOpts.isDefined) {
// SPARK_SUBMIT_JAVA_OPTS is already captured in JAVA_OPTS
javaOpts
} else {
javaOpts + " " + confJavaOpts
}

// Build up command
val command: Seq[String] =
Seq(javaRunner) ++
{ if (classPaths.nonEmpty) Seq("-cp", classPaths) else Seq.empty } ++
{ if (libraryPaths.nonEmpty) Seq(s"-Djava.library.path=$libraryPaths") else Seq.empty } ++
filteredJavaOpts ++
Seq(s"-Xms$javaMemory", s"-Xmx$javaMemory") ++
Seq(mainClass) ++
args.slice(7, args.size)
val builder = new ProcessBuilder(command)
Seq(runner) ++
Seq("-cp", newClasspath) ++
Seq(newLibraryPath) ++
Utils.splitCommandString(newJavaOpts) ++
Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
Seq("org.apache.spark.deploy.SparkSubmit") ++
submitArgs

// Print the launch command. This follows closely the format used in `bin/spark-class`.
if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) {
System.err.print("Spark Command: ")
System.err.println(command.mkString(" "))
System.err.println("========================================\n")
}

val filteredCommand = command.filter(_.nonEmpty)
val builder = new ProcessBuilder(filteredCommand)
val process = builder.start()
new RedirectThread(System.in, process.getOutputStream, "redirect stdin").start()
new RedirectThread(process.getInputStream, System.out, "redirect stdout").start()
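
The launcher honors SPARK_PRINT_LAUNCH_COMMAND just as bin/spark-class does, so the exact driver command it assembles can be inspected rather than inferred. A hypothetical invocation (jar, class and options are placeholders):

    # Prints the fully assembled driver command to stderr before launching it.
    SPARK_PRINT_LAUNCH_COMMAND=1 ./bin/spark-submit \
      --deploy-mode client \
      --driver-java-options "-verbose:gc" \
      --class org.apache.spark.examples.SparkPi \
      examples/target/spark-examples.jar 10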
