[SPARK-7155] [CORE] Allow newAPIHadoopFile to support comma-separated list of files as input

See JIRA: https://issues.apache.org/jira/browse/SPARK-7155

SparkContext's newAPIHadoopFile() does not support a comma-separated list of files as input. For example, the following:
```scala
sc.newAPIHadoopFile("/root/file1.txt,/root/file2.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
```
will throw
```
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/root/file1.txt,/root/file2.txt
```
However, the old-API hadoopFile() processes a comma-separated list of files correctly. In addition, since sc.textFile() is built on hadoopFile(), it also handles comma-separated lists correctly.
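
For comparison, calls like the following (reusing the illustrative paths from above) already split the comma-separated string into two inputs before this patch:
```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Both of these resolve to the two files /root/file1.txt and /root/file2.txt:
val viaTextFile = sc.textFile("/root/file1.txt,/root/file2.txt")
val viaHadoopFile = sc.hadoopFile("/root/file1.txt,/root/file2.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
```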

That means the behaviors of hadoopFile() and newAPIHadoopFile() are not aligned.

This pull request fixes the issue and allows newAPIHadoopFile() to accept a comma-separated list of files as input.
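
The root cause is the Hadoop API call used to register the input: addInputPath(job, new Path(path)) treats the whole string as a single path name (a comma is an ordinary character inside a Path), while the String overload of FileInputFormat.setInputPaths(job, path) splits its argument on commas before adding the paths. A minimal sketch of the difference on a throwaway Job (the paths and standalone Job setup are illustrative, not part of the patch):
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

val job = Job.getInstance(new Configuration())

// Before the patch: one Path literally named "/root/file1.txt,/root/file2.txt",
// which later fails with "Input path does not exist".
FileInputFormat.addInputPath(job, new Path("/root/file1.txt,/root/file2.txt"))

// After the patch: the String overload splits on commas and registers two
// paths, replacing whatever input paths were set before.
FileInputFormat.setInputPaths(job, "/root/file1.txt,/root/file2.txt")
```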

A unit test has also been added in SparkContextSuite.scala. It creates temporary text files in two temporary directories and tests sc.textFile(), sc.hadoopFile(), sc.newAPIHadoopFile(), sc.wholeTextFiles(), and sc.binaryFiles() with comma-separated inputs.
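
Since the fix also covers wholeTextFiles() and binaryFiles() (see commit 26faa6a below), comma-separated directory lists are expected to work for those APIs as well. An illustrative sketch with hypothetical directories:
```scala
// /data/dir1 and /data/dir2 are hypothetical; any readable directories work.
val texts = sc.wholeTextFiles("/data/dir1,/data/dir2") // RDD[(filename, content)]
val bins = sc.binaryFiles("/data/dir1,/data/dir2")     // RDD[(filename, PortableDataStream)]
```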

Note: The contribution is my original work and I license the work to the project under the project's open source license.

Author: yongtang <yongtang@users.noreply.github.com>

Closes apache#5708 from yongtang/SPARK-7155 and squashes the following commits:

654c80c [yongtang] [SPARK-7155] [CORE] Remove unneeded temp file deletion in unit test as parent dir is already temporary.
26faa6a [yongtang] [SPARK-7155] [CORE] Support comma-separated list of files as input for newAPIHadoopFile, wholeTextFiles, and binaryFiles. Use setInputPaths for consistency.
73e1f16 [yongtang] [SPARK-7155] [CORE] Allow newAPIHadoopFile to support comma-separated list of files as input.
yongtang authored and srowen committed Apr 29, 2015
1 parent 7f4b583 commit 3fc6cfd
Showing 2 changed files with 71 additions and 4 deletions.
core/src/main/scala/org/apache/spark/SparkContext.scala (9 additions, 3 deletions)

```diff
@@ -713,7 +713,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       RDD[(String, String)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new WholeTextFileRDD(
       this,
@@ -759,7 +761,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       RDD[(String, PortableDataStream)] = {
     assertNotStopped()
     val job = new NewHadoopJob(hadoopConfiguration)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updateConf = job.getConfiguration
     new BinaryFileRDD(
       this,
@@ -935,7 +939,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // The call to new NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = new NewHadoopJob(conf)
-    NewFileInputFormat.addInputPath(job, new Path(path))
+    // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
+    // comma separated files as input. (see SPARK-7155)
+    NewFileInputFormat.setInputPaths(job, path)
     val updatedConf = job.getConfiguration
     new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
   }
```

core/src/test/scala/org/apache/spark/SparkContextSuite.scala (62 additions, 1 deletion)

```diff
@@ -25,7 +25,9 @@ import com.google.common.io.Files
 
 import org.scalatest.FunSuite
 
-import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
 import org.apache.spark.util.Utils
 
 import scala.concurrent.Await
@@ -213,4 +215,63 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
       sc.stop()
     }
   }
+
+  test("Comma separated paths for newAPIHadoopFile/wholeTextFiles/binaryFiles (SPARK-7155)") {
+    // Regression test for SPARK-7155
+    // dir1 and dir2 are used for wholeTextFiles and binaryFiles
+    val dir1 = Utils.createTempDir()
+    val dir2 = Utils.createTempDir()
+
+    val dirpath1 = dir1.getAbsolutePath
+    val dirpath2 = dir2.getAbsolutePath
+
+    // file1 and file2 are placed inside dir1; they are also used for
+    // textFile, hadoopFile, and newAPIHadoopFile.
+    // file3, file4 and file5 are placed inside dir2; they are used for
+    // textFile, hadoopFile, and newAPIHadoopFile as well.
+    val file1 = new File(dir1, "part-00000")
+    val file2 = new File(dir1, "part-00001")
+    val file3 = new File(dir2, "part-00000")
+    val file4 = new File(dir2, "part-00001")
+    val file5 = new File(dir2, "part-00002")
+
+    val filepath1 = file1.getAbsolutePath
+    val filepath2 = file2.getAbsolutePath
+    val filepath3 = file3.getAbsolutePath
+    val filepath4 = file4.getAbsolutePath
+    val filepath5 = file5.getAbsolutePath
+
+    try {
+      // Create 5 text files.
+      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, UTF_8)
+      Files.write("someline1 in file2\nsomeline2 in file2", file2, UTF_8)
+      Files.write("someline1 in file3", file3, UTF_8)
+      Files.write("someline1 in file4\nsomeline2 in file4", file4, UTF_8)
+      Files.write("someline1 in file5\nsomeline2 in file5", file5, UTF_8)
+
+      sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+
+      // Test textFile, hadoopFile, and newAPIHadoopFile for file1 and file2
+      assert(sc.textFile(filepath1 + "," + filepath2).count() == 5L)
+      assert(sc.hadoopFile(filepath1 + "," + filepath2,
+        classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+      assert(sc.newAPIHadoopFile(filepath1 + "," + filepath2,
+        classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+      // Test textFile, hadoopFile, and newAPIHadoopFile for file3, file4, and file5
+      assert(sc.textFile(filepath3 + "," + filepath4 + "," + filepath5).count() == 5L)
+      assert(sc.hadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+        classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+      assert(sc.newAPIHadoopFile(filepath3 + "," + filepath4 + "," + filepath5,
+        classOf[NewTextInputFormat], classOf[LongWritable], classOf[Text]).count() == 5L)
+
+      // Test wholeTextFiles and binaryFiles for dir1 and dir2
+      assert(sc.wholeTextFiles(dirpath1 + "," + dirpath2).count() == 5L)
+      assert(sc.binaryFiles(dirpath1 + "," + dirpath2).count() == 5L)
+
+    } finally {
+      sc.stop()
+    }
+  }
 }
```
