[SPARK-13207][SQL][BRANCH-1.6] Make partitioning discovery ignore _SUCCESS files.

If a `_SUCCESS` file appears in an inner partition directory, partition discovery treats it as a data file and then fails because the directory structure looks invalid. We should ignore those `_SUCCESS` files.
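For illustration (hypothetical paths), a layout like the following used to break discovery: the marker file at an intermediate level is mistaken for a data file, so the inferred partition depths conflict.

```
table/                       <- base path
  b=1/
    _SUCCESS                 <- job marker at an intermediate level
    c=1/
      d=1/
        part-00000.parquet   <- real data file, three levels deep
```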

In the future, it would be better to ignore all files/dirs starting with `_` or `.`. This PR does not make that change; I am keeping the change simple so that we can consider getting it into branch-1.6.

To ignore all files/dirs starting with `_` or `.`, the main change would be to give ParquetRelation another way to locate its metadata files. Right now it relies on FileStatusCache's cachedLeafStatuses, which returns file statuses of both metadata files (e.g. the metadata files used by Parquet) and data files, and changing that requires more work.
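A sketch of what that stricter filter could look like (hypothetical; deliberately not part of this PR):

```scala
// NOT what this PR does; sketched only for contrast.
def shouldFilterOutStrict(pathName: String): Boolean =
  pathName.startsWith("_") || pathName.startsWith(".")

// This would also drop Parquet's _metadata and _common_metadata summary files,
// which ParquetRelation currently finds through the same cachedLeafStatuses
// listing; hence the refactoring described above would be needed first.
```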

https://issues.apache.org/jira/browse/SPARK-13207

Author: Yin Huai <yhuai@databricks.com>

Closes #11697 from yhuai/SPARK13207_branch16.
yhuai authored and rxin committed Mar 15, 2016
1 parent 589d042 commit 6935b50
Showing 2 changed files with 44 additions and 9 deletions.
@@ -459,7 +459,7 @@ abstract class HadoopFsRelation private[sql](
         }
       }.filterNot { status =>
         val name = status.getPath.getName
-        name.toLowerCase == "_temporary" || name.startsWith(".")
+        HadoopFsRelation.shouldFilterOut(name)
       }

       val (dirs, files) = statuses.partition(_.isDir)
@@ -843,6 +843,16 @@ abstract class HadoopFsRelation private[sql](
 }

 private[sql] object HadoopFsRelation extends Logging {
+
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOut(pathName: String): Boolean = {
+    // TODO: We should try to filter out all files/dirs starting with "." or "_".
+    // The only reason that we are not doing it now is that Parquet needs to find those
+    // metadata files from the leaf files returned by this method. We should refactor
+    // this logic to not mix metadata files with data files.
+    pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
+  }
+
   // We don't filter files/directories whose names start with "_" except "_temporary" here,
   // as specific data sources may take advantage of them (e.g. Parquet _metadata and
   // _common_metadata files). "_temporary" directories are explicitly ignored since failed
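To make the new predicate concrete, here is how it classifies a few sample names (worked from the definition above; not part of the diff):

```scala
HadoopFsRelation.shouldFilterOut("_SUCCESS")    // true: job-commit marker, ignored
HadoopFsRelation.shouldFilterOut("_temporary")  // true: leftover of failed writes
HadoopFsRelation.shouldFilterOut(".hidden")     // true: hidden files/dirs
HadoopFsRelation.shouldFilterOut("_metadata")   // false: Parquet still needs it
HadoopFsRelation.shouldFilterOut("part-00000")  // false: regular data file
```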
@@ -851,19 +861,21 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
     logInfo(s"Listing ${status.getPath}")
     val name = status.getPath.getName.toLowerCase
-    if (name == "_temporary" || name.startsWith(".")) {
+    if (shouldFilterOut(name)) {
       Array.empty
     } else {
       // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(fs.getConf, this.getClass())
       val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
-      if (pathFilter != null) {
-        val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir)
-        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
-      } else {
-        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
-        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
-      }
+      val statuses =
+        if (pathFilter != null) {
+          val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir)
+          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+        } else {
+          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+        }
+      statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
     }
   }
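As an aside, the `pathFilter` branch picks up whatever input-path filter is registered in the Hadoop configuration. A minimal sketch of registering one (the `NoTmpFilter` class is hypothetical; the `FileInputFormat` calls are the old `org.apache.hadoop.mapred` API used above):

```scala
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

// Hypothetical filter that hides in-progress files from the listing.
class NoTmpFilter extends PathFilter {
  override def accept(path: Path): Boolean = !path.getName.endsWith(".tmp")
}

val jobConf = new JobConf()
FileInputFormat.setInputPathFilter(jobConf, classOf[NoTmpFilter])
// listLeafFiles retrieves it with FileInputFormat.getInputPathFilter(jobConf)
// and passes it to fs.listStatus for each directory it visits.
```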

@@ -704,6 +704,29 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     }
   }

+  test("_SUCCESS should not break partitioning discovery") {
+    Seq(1, 32).foreach { threshold =>
+      // We have two code paths that list files: one on the driver and one that
+      // runs a Spark job. We need to test both.
+      withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> threshold.toString) {
+        withTempPath { dir =>
+          val tablePath = new File(dir, "table")
+          val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
+
+          df.write
+            .format("parquet")
+            .partitionBy("b", "c", "d")
+            .save(tablePath.getCanonicalPath)
+
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1", "_SUCCESS"))
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1", "_SUCCESS"))
+          Files.touch(new File(s"${tablePath.getCanonicalPath}/b=1/c=1/d=1", "_SUCCESS"))
+          checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df)
+        }
+      }
+    }
+  }
+
   test("listConflictingPartitionColumns") {
     def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
       val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
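The two threshold values exercise both listing paths: with this table's handful of directories, 32 keeps the listing on the driver, while 1 forces it into a Spark job. A sketch of the config key behind the constant (assumed 1.6 semantics):

```scala
// spark.sql.sources.parallelPartitionDiscovery.threshold: when the number of
// paths to list exceeds this value, listing runs as a distributed Spark job
// instead of on the driver (assumption based on the test comment above).
sqlContext.setConf("spark.sql.sources.parallelPartitionDiscovery.threshold", "1")
```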
