Skip to content

Commit

Permalink
Fix parquet split calculation to avoid O(file*block) lookups (apache-…
Browse files (browse the repository at this point in the history)
  • Loading branch information
pwoody authored and Robert Kruszewski committed Jun 24, 2018
1 parent 95646b5 commit 93e4fbb
Showing 1 changed file with 10 additions and 8 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -40,14 +40,14 @@ import org.apache.spark.util.ThreadUtils
abstract class ParquetFileSplitter {
def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit])

- def singleFileSplit(stat: FileStatus): Seq[FileSplit] = {
- Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
+ def singleFileSplit(path: Path, length: Long): Seq[FileSplit] = {
+ Seq(new FileSplit(path, 0, length, Array.empty))
}
}

object ParquetDefaultFileSplitter extends ParquetFileSplitter {
override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
- stat => singleFileSplit(stat)
+ stat => singleFileSplit(stat.getPath, stat.getLen)
}
}

Expand Down Expand Up @@ -84,18 +84,20 @@ class ParquetMetadataFileSplitter(
(applied, unapplied, filteredBlocks)
}

+ // Group eligible splits by file Path.
val eligible = applyParquetFilter(unapplied, filteredBlocks).map { bmd =>
val blockPath = new Path(root, bmd.getPath)
new FileSplit(blockPath, bmd.getStartingPos, bmd.getCompressedSize, Array.empty)
- }
+ }.groupBy(_.getPath)

val statFilter: (FileStatus => Seq[FileSplit]) = { stat =>
- if (referencedFiles.contains(stat.getPath)) {
- eligible.filter(_.getPath == stat.getPath)
+ val filePath = stat.getPath
+ if (referencedFiles.contains(filePath)) {
+ eligible.getOrElse(filePath, Nil)
} else {
log.warn(s"Found _metadata file for $root," +
- s" but no entries for blocks in ${stat.getPath}. Retaining whole file.")
- singleFileSplit(stat)
+ s" but no entries for blocks in ${filePath}. Retaining whole file.")
+ singleFileSplit(filePath, stat.getLen)
}
}
statFilter
Expand Down

0 comments on commit 93e4fbb

Please sign in to comment.