Skip to content

Commit

Permalink
Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"
Browse files Browse the repository at this point in the history
This reverts commit 8c21170.
  • Loading branch information
gatorsmile committed Feb 14, 2018
1 parent 140f875 commit 400a1d9
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,29 +444,16 @@ case class FileSourceScanExec(
currentSize = 0
}

def addFile(file: PartitionedFile): Unit = {
currentFiles += file
currentSize += file.length + openCostInBytes
}

var frontIndex = 0
var backIndex = splitFiles.length - 1

while (frontIndex <= backIndex) {
addFile(splitFiles(frontIndex))
frontIndex += 1
while (frontIndex <= backIndex &&
currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
addFile(splitFiles(frontIndex))
frontIndex += 1
}
while (backIndex > frontIndex &&
currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
addFile(splitFiles(backIndex))
backIndex -= 1
// Assign files to partitions using "Next Fit Decreasing"
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
closePartition()
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
}
closePartition()

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
checkScan(table.select('c1)) { partitions =>
// Files should be laid out [(file1, file6), (file2, file3), (file4, file5)]
assert(partitions.size == 3, "when checking partitions")
assert(partitions(0).files.size == 2, "when checking partition 1")
// Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
assert(partitions.size == 4, "when checking partitions")
assert(partitions(0).files.size == 1, "when checking partition 1")
assert(partitions(1).files.size == 2, "when checking partition 2")
assert(partitions(2).files.size == 2, "when checking partition 3")
assert(partitions(3).files.size == 1, "when checking partition 4")

// First partition reads (file1, file6)
// First partition reads (file1)
assert(partitions(0).files(0).start == 0)
assert(partitions(0).files(0).length == 2)
assert(partitions(0).files(1).start == 0)
assert(partitions(0).files(1).length == 1)

// Second partition reads (file2, file3)
assert(partitions(1).files(0).start == 0)
Expand All @@ -164,6 +163,10 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
assert(partitions(2).files(0).length == 1)
assert(partitions(2).files(1).start == 0)
assert(partitions(2).files(1).length == 1)

// Final partition reads (file6)
assert(partitions(3).files(0).start == 0)
assert(partitions(3).files(0).length == 1)
}

checkPartitionSchema(StructType(Nil))
Expand Down

0 comments on commit 400a1d9

Please sign in to comment.