diff --git a/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala b/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala
index db6586f09e3..e6c011d2abd 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/files/TransactionalWrite.scala
@@ -201,9 +201,8 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
 
   def writeFiles(
       data: Dataset[_],
-      writeOptions: Option[DeltaOptions],
       additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
-    writeFiles(data, additionalConstraints)
+    writeFiles(data, None, additionalConstraints)
   }
 
   def writeFiles(
@@ -222,6 +221,7 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
    */
  def writeFiles(
      data: Dataset[_],
+      writeOptions: Option[DeltaOptions],
      additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
    if (DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(metadata)) {
      throw DeltaErrors.cdcWriteNotAllowedInThisVersion()
@@ -295,6 +295,16 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
      statsTrackers.append(basicWriteJobStatsTracker)
    }
 
+    // Retain only a minimal selection of Spark writer options to avoid any potential
+    // compatibility issues
+    val options = writeOptions match {
+      case None => Map.empty[String, String]
+      case Some(writeOptions) =>
+        writeOptions.options.filterKeys(key =>
+          key.equalsIgnoreCase("maxRecordsPerFile")
+        ).toMap
+    }
+
    try {
      FileFormatWriter.write(
        sparkSession = spark,
@@ -309,7 +319,7 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
        partitionColumns = partitioningColumns,
        bucketSpec = None,
        statsTrackers = optionalStatsTracker.toSeq ++ statsTrackers,
-        options = Map.empty)
+        options = options)
    } catch {
      case s: SparkException =>
        // Pull an InvariantViolationException up to the top level if it was the root cause.
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaOptionSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaOptionSuite.scala
index 19d5bbf6536..2e79e5cfa8e 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaOptionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaOptionSuite.scala
@@ -17,14 +17,17 @@
 package org.apache.spark.sql.delta
 
 import org.apache.spark.sql.delta.actions.{Action, FileAction}
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.util.FileNames
 
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.Utils
 
 class DeltaOptionSuite extends QueryTest
-  with SharedSparkSession {
+  with SharedSparkSession with DeltaSQLCommandTest {
 
   import testImplicits._
 
@@ -131,4 +134,47 @@ class DeltaOptionSuite extends QueryTest
     }
   }
 
+  test("support the maxRecordsPerFile write option: path") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+      withTable("maxRecordsPerFile") {
+        spark.range(100)
+          .write
+          .format("delta")
+          .option("maxRecordsPerFile", 5)
+          .save(path)
+        assert(FileUtils.listFiles(tempDir, Array("parquet"), false).size === 20)
+      }
+    }
+  }
+
+  test("support the maxRecordsPerFile write option: external table") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+      withTable("maxRecordsPerFile") {
+        spark.range(100)
+          .write
+          .format("delta")
+          .option("maxRecordsPerFile", 5)
+          .option("path", path)
+          .saveAsTable("maxRecordsPerFile")
+        assert(FileUtils.listFiles(tempDir, Array("parquet"), false).size === 20)
+      }
+    }
+  }
+
+  test("support the maxRecordsPerFile write option: v2 write") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+      withTable("maxRecordsPerFile") {
+        spark.range(100)
+          .writeTo(s"maxRecordsPerFile")
+          .using("delta")
+          .option("maxRecordsPerFile", 5)
+          .tableProperty("location", path)
+          .create()
+        assert(FileUtils.listFiles(tempDir, Array("parquet"), false).size === 20)
+      }
+    }
+  }
 }
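For reviewers, a minimal usage sketch of what this patch enables, mirroring the new tests above (the record cap and output path are illustrative, not taken from the patch):

    // Illustrative only: cap each written Parquet file at roughly 10,000 records.
    // The option name matches Spark's existing maxRecordsPerFile writer option,
    // which this patch now forwards from DeltaOptions through to FileFormatWriter.
    spark.range(1000000L)
      .write
      .format("delta")
      .option("maxRecordsPerFile", 10000)
      .save("/tmp/delta/maxRecordsPerFile-example") // hypothetical path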