Add support for Spark DataFrameWriter maxRecordsPerFile option
Today, parquet supports the [maxRecordsPerFile](apache/spark#16204) option to limit the maximum number of records written per file, so that users can control the parquet file size and avoid humongous files. For example:

```
spark.range(100)
  .write
  .format("parquet")
  .option("maxRecordsPerFile", 5)
  .save(path)
```

The above code generates 20 parquet files, each containing 5 rows.

This option is missing in Delta. This PR adds support for it by passing the `maxRecordsPerFile` option from Delta through to `ParquetFileFormat`.
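
With this change, the same writer option works for the Delta format. A minimal sketch of the new usage (the output path is hypothetical):

```
spark.range(100)
  .write
  .format("delta")
  .option("maxRecordsPerFile", 5)
  .save("/tmp/delta/maxRecordsPerFile-demo")
```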

Note: today both Delta and parquet already support the SQL conf `spark.sql.files.maxRecordsPerFile` to control the file size. This PR only adds the `DataFrameWriter` option so that Delta mimics the parquet format behavior.
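
For comparison, a sketch of the pre-existing SQL conf route (again with a hypothetical path); the conf applies session-wide to every file-based write rather than to a single writer:

```
// Session-wide setting; affects all file-based writes in this SparkSession.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 5)
spark.range(100)
  .write
  .format("delta")
  .save("/tmp/delta/maxRecordsPerFile-conf-demo")
```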

Fixes #781

Closes #1017

Co-authored-by: Andrew Olson <aolson1@cerner.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>

GitOrigin-RevId: 02af2c40457fe0acc76a31687e4fd6c47f3f2944
noslowerdna authored and vkorukanti committed Apr 8, 2022
1 parent 952f25b · commit 3fe6f7a
Showing 2 changed files with 60 additions and 4 deletions.

TransactionalWrite.scala:

```
@@ -201,9 +201,8 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
   def writeFiles(
       data: Dataset[_],
-      writeOptions: Option[DeltaOptions],
       additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
-    writeFiles(data, additionalConstraints)
+    writeFiles(data, None, additionalConstraints)
   }

   def writeFiles(
@@ -222,6 +221,7 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
    */
   def writeFiles(
       data: Dataset[_],
+      writeOptions: Option[DeltaOptions],
       additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
     if (DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(metadata)) {
       throw DeltaErrors.cdcWriteNotAllowedInThisVersion()
@@ -295,6 +295,16 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
       statsTrackers.append(basicWriteJobStatsTracker)
     }

+    // Retain only a minimal selection of Spark writer options to avoid any potential
+    // compatibility issues
+    val options = writeOptions match {
+      case None => Map.empty[String, String]
+      case Some(writeOptions) =>
+        writeOptions.options.filterKeys(key =>
+          key.equalsIgnoreCase("maxRecordsPerFile")
+        ).toMap
+    }
+
     try {
       FileFormatWriter.write(
         sparkSession = spark,
@@ -309,7 +319,7 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
         partitionColumns = partitioningColumns,
         bucketSpec = None,
         statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
-        options = Map.empty)
+        options = options)
     } catch {
       case s: SparkException =>
         // Pull an InvariantViolationException up to the top level if it was the root cause.
```
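
The heart of the change is the filtering step above: of the user-supplied `DeltaOptions`, only `maxRecordsPerFile` (matched case-insensitively) is forwarded in the `options` map handed to `FileFormatWriter.write`, which is where Spark enforces the per-file record limit. A standalone sketch of that filtering behavior, using a made-up option map and a `filter` call that is behaviorally equivalent to the `filterKeys(...).toMap` in the patch:

```
// Hypothetical writer options; only maxRecordsPerFile survives the filter,
// regardless of how the user capitalized it.
val writerOptions: Map[String, String] = Map(
  "maxrecordsperfile" -> "5",
  "mergeSchema" -> "true",
  "overwriteSchema" -> "false")

val forwarded: Map[String, String] =
  writerOptions.filter { case (key, _) => key.equalsIgnoreCase("maxRecordsPerFile") }

println(forwarded)  // Map(maxrecordsperfile -> 5)
```

When both the writer option and `spark.sql.files.maxRecordsPerFile` are set, the per-write option appears to take precedence, since Spark consults this options map before falling back to the session conf.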

DeltaOptionSuite.scala:

```
@@ -17,14 +17,16 @@
 package org.apache.spark.sql.delta

 import org.apache.spark.sql.delta.actions.{Action, FileAction}
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.util.FileNames
+import org.apache.commons.io.FileUtils

 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.Utils

 class DeltaOptionSuite extends QueryTest
-  with SharedSparkSession {
+  with SharedSparkSession with DeltaSQLCommandTest {

   import testImplicits._

@@ -131,4 +133,48 @@ class DeltaOptionSuite extends QueryTest
     }
   }

+
+  test("support the maxRecordsPerFile write option: path") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+      withTable("maxRecordsPerFile") {
+        spark.range(100)
+          .write
+          .format("delta")
+          .option("maxRecordsPerFile", 5)
+          .save(path)
+        assert(FileUtils.listFiles(tempDir, Array("parquet"), false).size === 20)
+      }
+    }
+  }
+
+  test("support the maxRecordsPerFile write option: external table") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+      withTable("maxRecordsPerFile") {
+        spark.range(100)
+          .write
+          .format("delta")
+          .option("maxRecordsPerFile", 5)
+          .option("path", path)
+          .saveAsTable("maxRecordsPerFile")
+        assert(FileUtils.listFiles(tempDir, Array("parquet"), false).size === 20)
+      }
+    }
+  }
+
+  test("support the maxRecordsPerFile write option: v2 write") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+      withTable("maxRecordsPerFile") {
+        spark.range(100)
+          .writeTo(s"maxRecordsPerFile")
+          .using("delta")
+          .option("maxRecordsPerFile", 5)
+          .tableProperty("location", path)
+          .create()
+        assert(FileUtils.listFiles(tempDir, Array("parquet"), false).size === 20)
+      }
+    }
+  }
 }
```
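
The new tests cover the path-based `DataFrameWriter` write, `saveAsTable` with an explicit path, and the `DataFrameWriterV2` `writeTo` API, asserting in each case that 100 rows land in 20 parquet files. Outside the suite, one rough way to confirm the same behavior is to read the table back and count rows per file with `input_file_name()`; a sketch using the hypothetical path from the earlier example:

```
import org.apache.spark.sql.functions.input_file_name

// Expect 20 distinct files, each contributing 5 rows.
val perFileCounts = spark.read.format("delta")
  .load("/tmp/delta/maxRecordsPerFile-demo")
  .groupBy(input_file_name().as("file"))
  .count()

perFileCounts.show(truncate = false)
assert(perFileCounts.count() == 20)
```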