Skip to content

Commit

Permalink
[SPARK-6568] spark-shell.cmd --jars option does not accept the jar th…
Browse files Browse the repository at this point in the history
…at has space in its path

escape spaces in the arguments.

Author: Masayoshi TSUZUKI <tsudukim@oss.nttdata.co.jp>
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes apache#5447 from tsudukim/feature/SPARK-6568-2 and squashes the following commits:

3f9a188 [Masayoshi TSUZUKI] modified some errors.
ed46047 [Masayoshi TSUZUKI] avoid scalastyle errors.
1784239 [Masayoshi TSUZUKI] removed Utils.formatPath.
e03f289 [Masayoshi TSUZUKI] removed testWindows from Utils.resolveURI and Utils.resolveURIs. replaced SystemUtils.IS_OS_WINDOWS to Utils.isWindows. removed Utils.formatPath from PythonRunner.scala.
84c33d0 [Masayoshi TSUZUKI] - use resolveURI in nonLocalPaths - run tests for Windows path only on Windows
016128d [Masayoshi TSUZUKI] fixed to use File.toURI()
2c62e3b [Masayoshi TSUZUKI] Merge pull request #1 from sarutak/SPARK-6568-2
7019a8a [Masayoshi TSUZUKI] Merge branch 'master' of https://github.com/apache/spark into feature/SPARK-6568-2
45946ee [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6568-2
10f1c73 [Kousuke Saruta] Added a comment
93c3c40 [Kousuke Saruta] Merge branch 'classpath-handling-fix' of github.com:sarutak/spark into SPARK-6568-2
649da82 [Kousuke Saruta] Fix classpath handling
c7ba6a7 [Masayoshi TSUZUKI] [SPARK-6568] spark-shell.cmd --jars option does not accept the jar that has space in its path
  • Loading branch information
tsudukim authored and srowen committed May 13, 2015
1 parent 98195c3 commit 50c7270
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 80 deletions.
23 changes: 12 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.spark.deploy

import java.net.URI
import java.io.File

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.util.Try

import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}
Expand Down Expand Up @@ -81,16 +83,13 @@ object PythonRunner {
throw new IllegalArgumentException("Launching Python applications through " +
s"spark-submit is currently only supported for local files: $path")
}
val windows = Utils.isWindows || testWindows
var formattedPath = if (windows) Utils.formatWindowsPath(path) else path

// Strip the URI scheme from the path
formattedPath =
new URI(formattedPath).getScheme match {
case null => formattedPath
case Utils.windowsDrive(d) if windows => formattedPath
case _ => new URI(formattedPath).getPath
}
// get path when scheme is file.
val uri = Try(new URI(path)).getOrElse(new File(path).toURI)
var formattedPath = uri.getScheme match {
case null => path
case "file" | "local" => uri.getPath
case _ => null
}

// Guard against malformed paths potentially throwing NPE
if (formattedPath == null) {
Expand All @@ -99,7 +98,9 @@ object PythonRunner {

// In Windows, the drive should not be prefixed with "/"
// For instance, python does not understand "/C:/path/to/sheep.py"
formattedPath = if (windows) formattedPath.stripPrefix("/") else formattedPath
if (Utils.isWindows && formattedPath.matches("/[a-zA-Z]:/.*")) {
formattedPath = formattedPath.stripPrefix("/")
}
formattedPath
}

Expand Down
43 changes: 12 additions & 31 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1704,11 +1704,6 @@ private[spark] object Utils extends Logging {
*/
val windowsDrive = "([a-zA-Z])".r

/**
* Format a Windows path such that it can be safely passed to a URI.
*/
def formatWindowsPath(path: String): String = path.replace("\\", "/")

/**
* Indicates whether Spark is currently running unit tests.
*/
Expand Down Expand Up @@ -1806,37 +1801,24 @@ private[spark] object Utils extends Logging {
* If the supplied path does not contain a scheme, or is a relative path, it will be
* converted into an absolute path with a file:// scheme.
*/
def resolveURI(path: String, testWindows: Boolean = false): URI = {

// In Windows, the file separator is a backslash, but this is inconsistent with the URI format
val windows = isWindows || testWindows
val formattedPath = if (windows) formatWindowsPath(path) else path

val uri = new URI(formattedPath)
if (uri.getPath == null) {
throw new IllegalArgumentException(s"Given path is malformed: $uri")
}

Option(uri.getScheme) match {
case Some(windowsDrive(d)) if windows =>
new URI("file:/" + uri.toString.stripPrefix("/"))
case None =>
// Preserve fragments for HDFS file name substitution (denoted by "#")
// For instance, in "abc.py#xyz.py", "xyz.py" is the name observed by the application
val fragment = uri.getFragment
val part = new File(uri.getPath).toURI
new URI(part.getScheme, part.getPath, fragment)
case Some(other) =>
uri
def resolveURI(path: String): URI = {
try {
val uri = new URI(path)
if (uri.getScheme() != null) {
return uri
}
} catch {
case e: URISyntaxException =>
}
new File(path).getAbsoluteFile().toURI()
}

/** Resolve a comma-separated list of paths. */
def resolveURIs(paths: String, testWindows: Boolean = false): String = {
def resolveURIs(paths: String): String = {
if (paths == null || paths.trim.isEmpty) {
""
} else {
paths.split(",").map { p => Utils.resolveURI(p, testWindows) }.mkString(",")
paths.split(",").map { p => Utils.resolveURI(p) }.mkString(",")
}
}

Expand All @@ -1847,8 +1829,7 @@ private[spark] object Utils extends Logging {
Array.empty
} else {
paths.split(",").filter { p =>
val formattedPath = if (windows) formatWindowsPath(p) else p
val uri = new URI(formattedPath)
val uri = resolveURI(p)
Option(uri.getScheme).getOrElse("file") match {
case windowsDrive(d) if windows => false
case "local" | "file" => false
Expand Down
31 changes: 19 additions & 12 deletions core/src/test/scala/org/apache/spark/deploy/PythonRunnerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.deploy

import org.scalatest.FunSuite

import org.apache.spark.util.Utils

class PythonRunnerSuite extends FunSuite {

// Test formatting a single path to be added to the PYTHONPATH
Expand All @@ -28,10 +30,14 @@ class PythonRunnerSuite extends FunSuite {
assert(PythonRunner.formatPath("file:///spark.py") === "/spark.py")
assert(PythonRunner.formatPath("local:/spark.py") === "/spark.py")
assert(PythonRunner.formatPath("local:///spark.py") === "/spark.py")
assert(PythonRunner.formatPath("C:/a/b/spark.py", testWindows = true) === "C:/a/b/spark.py")
assert(PythonRunner.formatPath("/C:/a/b/spark.py", testWindows = true) === "C:/a/b/spark.py")
assert(PythonRunner.formatPath("file:/C:/a/b/spark.py", testWindows = true) ===
"C:/a/b/spark.py")
if (Utils.isWindows) {
assert(PythonRunner.formatPath("file:/C:/a/b/spark.py", testWindows = true) ===
"C:/a/b/spark.py")
assert(PythonRunner.formatPath("C:\\a\\b\\spark.py", testWindows = true) ===
"C:/a/b/spark.py")
assert(PythonRunner.formatPath("C:\\a b\\spark.py", testWindows = true) ===
"C:/a b/spark.py")
}
intercept[IllegalArgumentException] { PythonRunner.formatPath("one:two") }
intercept[IllegalArgumentException] { PythonRunner.formatPath("hdfs:s3:xtremeFS") }
intercept[IllegalArgumentException] { PythonRunner.formatPath("hdfs:/path/to/some.py") }
Expand All @@ -45,14 +51,15 @@ class PythonRunnerSuite extends FunSuite {
Array("/app.py", "/spark.py"))
assert(PythonRunner.formatPaths("me.py,file:/you.py,local:/we.py") ===
Array("me.py", "/you.py", "/we.py"))
assert(PythonRunner.formatPaths("C:/a/b/spark.py", testWindows = true) ===
Array("C:/a/b/spark.py"))
assert(PythonRunner.formatPaths("/C:/a/b/spark.py", testWindows = true) ===
Array("C:/a/b/spark.py"))
assert(PythonRunner.formatPaths("C:/free.py,pie.py", testWindows = true) ===
Array("C:/free.py", "pie.py"))
assert(PythonRunner.formatPaths("lovely.py,C:/free.py,file:/d:/fry.py", testWindows = true) ===
Array("lovely.py", "C:/free.py", "d:/fry.py"))
if (Utils.isWindows) {
assert(PythonRunner.formatPaths("C:\\a\\b\\spark.py", testWindows = true) ===
Array("C:/a/b/spark.py"))
assert(PythonRunner.formatPaths("C:\\free.py,pie.py", testWindows = true) ===
Array("C:/free.py", "pie.py"))
assert(PythonRunner.formatPaths("lovely.py,C:\\free.py,file:/d:/fry.py",
testWindows = true) ===
Array("lovely.py", "C:/free.py", "d:/fry.py"))
}
intercept[IllegalArgumentException] { PythonRunner.formatPaths("one:two,three") }
intercept[IllegalArgumentException] { PythonRunner.formatPaths("two,three,four:five:six") }
intercept[IllegalArgumentException] { PythonRunner.formatPaths("hdfs:/some.py,foo.py") }
Expand Down
67 changes: 43 additions & 24 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -367,51 +367,58 @@ class UtilsSuite extends FunSuite with ResetSystemProperties with Logging {
}

test("resolveURI") {
def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
def assertResolves(before: String, after: String): Unit = {
// This should test only single paths
assume(before.split(",").length === 1)
// Repeated invocations of resolveURI should yield the same result
def resolve(uri: String): String = Utils.resolveURI(uri, testWindows).toString
def resolve(uri: String): String = Utils.resolveURI(uri).toString
assert(resolve(after) === after)
assert(resolve(resolve(after)) === after)
assert(resolve(resolve(resolve(after))) === after)
// Also test resolveURIs with single paths
assert(new URI(Utils.resolveURIs(before, testWindows)) === new URI(after))
assert(new URI(Utils.resolveURIs(after, testWindows)) === new URI(after))
assert(new URI(Utils.resolveURIs(before)) === new URI(after))
assert(new URI(Utils.resolveURIs(after)) === new URI(after))
}
val cwd = System.getProperty("user.dir")
val rawCwd = System.getProperty("user.dir")
val cwd = if (Utils.isWindows) s"/$rawCwd".replace("\\", "/") else rawCwd
assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar")
assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:/root/spark.jar#app.jar")
assertResolves("spark.jar", s"file:$cwd/spark.jar")
assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar#app.jar")
assertResolves("C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows = true)
assertResolves("C:\\path\\to\\file.txt", "file:/C:/path/to/file.txt", testWindows = true)
assertResolves("file:/C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows = true)
assertResolves("file:///C:/path/to/file.txt", "file:/C:/path/to/file.txt", testWindows = true)
assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt", testWindows = true)
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo") }
intercept[IllegalArgumentException] { Utils.resolveURI("file:foo:baby") }
assertResolves("spark.jar#app.jar", s"file:$cwd/spark.jar%23app.jar")
assertResolves("path to/file.txt", s"file:$cwd/path%20to/file.txt")
if (Utils.isWindows) {
assertResolves("C:\\path\\to\\file.txt", "file:/C:/path/to/file.txt")
assertResolves("C:\\path to\\file.txt", "file:/C:/path%20to/file.txt")
}
assertResolves("file:/C:/path/to/file.txt", "file:/C:/path/to/file.txt")
assertResolves("file:///C:/path/to/file.txt", "file:/C:/path/to/file.txt")
assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt")
assertResolves("file:foo", s"file:foo")
assertResolves("file:foo:baby", s"file:foo:baby")
}

test("resolveURIs with multiple paths") {
def assertResolves(before: String, after: String, testWindows: Boolean = false): Unit = {
def assertResolves(before: String, after: String): Unit = {
assume(before.split(",").length > 1)
assert(Utils.resolveURIs(before, testWindows) === after)
assert(Utils.resolveURIs(after, testWindows) === after)
assert(Utils.resolveURIs(before) === after)
assert(Utils.resolveURIs(after) === after)
// Repeated invocations of resolveURIs should yield the same result
def resolve(uri: String): String = Utils.resolveURIs(uri, testWindows)
def resolve(uri: String): String = Utils.resolveURIs(uri)
assert(resolve(after) === after)
assert(resolve(resolve(after)) === after)
assert(resolve(resolve(resolve(after))) === after)
}
val cwd = System.getProperty("user.dir")
val rawCwd = System.getProperty("user.dir")
val cwd = if (Utils.isWindows) s"/$rawCwd".replace("\\", "/") else rawCwd
assertResolves("jar1,jar2", s"file:$cwd/jar1,file:$cwd/jar2")
assertResolves("file:/jar1,file:/jar2", "file:/jar1,file:/jar2")
assertResolves("hdfs:/jar1,file:/jar2,jar3", s"hdfs:/jar1,file:/jar2,file:$cwd/jar3")
assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5")
assertResolves("hdfs:/jar1,file:/jar2,jar3,C:\\pi.py#py.pi",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi", testWindows = true)
assertResolves("hdfs:/jar1,file:/jar2,jar3,jar4#jar5,path to/jar6",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4%23jar5,file:$cwd/path%20to/jar6")
if (Utils.isWindows) {
assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""",
s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4")
}
}

test("nonLocalPaths") {
Expand All @@ -426,6 +433,8 @@ class UtilsSuite extends FunSuite with ResetSystemProperties with Logging {
assert(Utils.nonLocalPaths("local:/spark.jar,file:/smart.jar,family.py") === Array.empty)
assert(Utils.nonLocalPaths("hdfs:/spark.jar,s3:/smart.jar") ===
Array("hdfs:/spark.jar", "s3:/smart.jar"))
assert(Utils.nonLocalPaths("hdfs:/spark.jar,path to/a.jar,s3:/smart.jar") ===
Array("hdfs:/spark.jar", "s3:/smart.jar"))
assert(Utils.nonLocalPaths("hdfs:/spark.jar,s3:/smart.jar,local.py,file:/hello/pi.py") ===
Array("hdfs:/spark.jar", "s3:/smart.jar"))
assert(Utils.nonLocalPaths("local.py,hdfs:/spark.jar,file:/hello/pi.py,s3:/smart.jar") ===
Expand Down Expand Up @@ -547,7 +556,12 @@ class UtilsSuite extends FunSuite with ResetSystemProperties with Logging {
val targetDir = new File(tempDir, "target-dir")
Files.write("some text", sourceFile, UTF_8)

val path = new Path("file://" + sourceDir.getAbsolutePath)
val path =
if (Utils.isWindows) {
new Path("file:/" + sourceDir.getAbsolutePath.replace("\\", "/"))
} else {
new Path("file://" + sourceDir.getAbsolutePath)
}
val conf = new Configuration()
val fs = Utils.getHadoopFileSystem(path.toString, conf)

Expand All @@ -567,7 +581,12 @@ class UtilsSuite extends FunSuite with ResetSystemProperties with Logging {
val destInnerFile = new File(destInnerDir, sourceFile.getName)
assert(destInnerFile.isFile())

val filePath = new Path("file://" + sourceFile.getAbsolutePath)
val filePath =
if (Utils.isWindows) {
new Path("file:/" + sourceFile.getAbsolutePath.replace("\\", "/"))
} else {
new Path("file://" + sourceFile.getAbsolutePath)
}
val testFileDir = new File(tempDir, "test-filename")
val testFileName = "testFName"
val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ class SparkILoop(
// e.g. file:/C:/my/path.jar -> C:/my/path.jar
SparkILoop.getAddedJars.map { jar => new URI(jar).getPath.stripPrefix("/") }
} else {
SparkILoop.getAddedJars
// We need new URI(jar).getPath here for the case that `jar` includes encoded white space (%20).
SparkILoop.getAddedJars.map { jar => new URI(jar).getPath }
}
// work around for Scala bug
val totalClassPath = addedJars.foldLeft(
Expand Down Expand Up @@ -1109,7 +1110,7 @@ object SparkILoop extends Logging {
if (settings.classpath.isDefault)
settings.classpath.value = sys.props("java.class.path")

getAddedJars.foreach(settings.classpath.append(_))
getAddedJars.map(jar => new URI(jar).getPath).foreach(settings.classpath.append(_))

repl process settings
}
Expand Down

0 comments on commit 50c7270

Please sign in to comment.