[SPARK-18775][SQL] Limit the max number of records written per file #16204
Conversation
@@ -821,8 +831,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

  def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString

  def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
I moved this closer to the other file-related controls.
Test build #69837 has finished for PR 16204 at commit
This will be a much-loved feature. :)
@@ -225,32 +228,50 @@ object FileFormatWriter extends Logging {
      taskAttemptContext: TaskAttemptContext,
      committer: FileCommitProtocol) extends ExecuteWriteTask {

-     private[this] var outputWriter: OutputWriter = {
+     private[this] var currentWriter: OutputWriter = _
Looking through the code, three things stand out:

- There is code duplication between `SingleDirectoryWriteTask` and `DynamicPartitionWriteTask` when it comes to current writer management and cleanup.
- There is duplication within `releaseResources()` and `newOutputWriter()` of the write tasks when it comes to releasing resources.
- Write task state management is leaky because `releaseResources()` is called explicitly by `executeTask()`. Also, `releaseResources()` will be called twice when there are no exceptions and once if there is an exception in `execute()`, which is a bit confusing.

What about asking the base trait to do a bit more work and present a stronger contract to its users, e.g.:
private trait ExecuteWriteTask {

  protected[this] var currentWriter: OutputWriter = null

  def execute(iterator: Iterator[InternalRow]): Set[String] = {
    try {
      executeImp(iterator)
    } finally {
      releaseResources()
    }
  }

  /**
   * Writes data out to files, and then returns the list of partition strings written out.
   * The list of partitions is sent back to the driver and used to update the catalog.
   */
  protected def executeImp(iterator: Iterator[InternalRow]): Set[String]

  protected def resetCurrentWriter(): Unit = {
    if (currentWriter != null) {
      currentWriter.close()
      currentWriter = null
    }
  }

  protected def releaseResources(): Unit = {
    resetCurrentWriter()
  }
}
A simpler implementation would omit `releaseResources()` and simply call `resetCurrentWriter()` in `finally`. That is OK since all the classes are private, but slightly less readable when it comes to unexpected future changes.
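A minimal sketch of that simpler variant, assuming the same trait and member names as the snippet above and the surrounding `FileFormatWriter` scope (`OutputWriter`, `InternalRow`); this is illustrative only, not the code in this PR:

```scala
// Sketch only: same contract as above, but without the releaseResources()
// indirection. resetCurrentWriter() runs whether executeImp succeeds or throws.
private trait ExecuteWriteTask {
  protected[this] var currentWriter: OutputWriter = null

  def execute(iterator: Iterator[InternalRow]): Set[String] = {
    try {
      executeImp(iterator)
    } finally {
      resetCurrentWriter()
    }
  }

  /** Writes data out and returns the partition strings written, for the driver to update the catalog. */
  protected def executeImp(iterator: Iterator[InternalRow]): Set[String]

  protected def resetCurrentWriter(): Unit = {
    if (currentWriter != null) {
      currentWriter.close()
      currentWriter = null
    }
  }
}
```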
I'm wondering if we should just remove `SingleDirectoryWriteTask`. The few tests I tried still seem to pass with the dynamic implementation as well.
The dynamic one forces a sort, which is highly inefficient.
Ah ok. I guess you could still refactor out the record counting code to be shared, but I'm not sure it's worth it.
Indeed, I wanted to do that in the beginning, but given there are only two implementations with almost no code in this part, it would just be over-abstraction.
Test build #69853 has started for PR 16204 at commit
Test build #69854 has started for PR 16204 at commit
Test build #69843 has finished for PR 16204 at commit
cc @ericl
Test build #3477 has finished for PR 16204 at commit
          if (partitionPath.nonEmpty) {
            updatedPartitions.add(partitionPath)
          }
        } else if (description.maxRecordsPerFile > 0 &&
            recordsInFile == description.maxRecordsPerFile) {
`>=` probably expresses the intent a little more clearly, here and in the other write task.
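As a hedged illustration of that suggestion, the check could be factored like this; `shouldStartNewFile` is a hypothetical helper name, not something from the PR:

```scala
// Hypothetical helper, for illustration only: a non-positive limit means "no limit";
// otherwise roll to a new file once the current one has reached the limit.
def shouldStartNewFile(recordsInFile: Long, maxRecordsPerFile: Long): Boolean =
  maxRecordsPerFile > 0 && recordsInFile >= maxRecordsPerFile
```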
Test build #69899 has finished for PR 16204 at commit
Test build #3484 has finished for PR 16204 at commit
LGTM
    }

    override def execute(iter: Iterator[InternalRow]): Set[String] = {
      var fileCounter = 0
      var recordsInFile = 0
`Long`?
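A minimal sketch of the suggested type change, matching the variables in the quoted diff (illustrative only):

```scala
// Illustrative: count records per file with a Long so the counter lines up with a
// Long-valued maxRecordsPerFile limit and cannot overflow an Int on huge writes.
var fileCounter = 0
var recordsInFile: Long = 0L
```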
      // This must be in a form that matches our bucketing format. See BucketingUtils.
      val ext = f"$bucketId.c$fileCounter%03d" +
        description.outputWriterFactory.getFileExtension(taskAttemptContext)
Is there an assumption here on the number of files per output partition?
No.
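For readers following the naming scheme, a small illustrative snippet of how the zero-padded file counter shapes the extension; the bucket id string and `.snappy.parquet` suffix are made-up examples, not values taken from this PR:

```scala
// Illustrative only: successive files written by the same task get distinct,
// zero-padded ".cNNN" components in their extension.
val bucketId = "_00003"        // hypothetical bucket id string
val suffix = ".snappy.parquet" // hypothetical format extension
for (fileCounter <- 0 to 2) {
  println(f"$bucketId.c$fileCounter%03d" + suffix)
}
// prints:
// _00003.c000.snappy.parquet
// _00003.c001.snappy.parquet
// _00003.c002.snappy.parquet
```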
@@ -379,30 +405,40 @@ object FileFormatWriter extends Logging {
      val sortedIterator = sorter.sortedIterator()

      // If anything below fails, we should abort the task.
      var recordsInFile = 0
`Long`
cc @hvanhovell
Test build #70145 has finished for PR 16204 at commit
@hvanhovell don't forget this one!
One tiny comment regarding documentation. Otherwise LGTM.
    .booleanConf
    .createWithDefault(false)

  val MAX_RECORDS_PER_FILE = SQLConfigBuilder("spark.sql.files.maxRecordsPerFile")
Should we also mention that there is a limit to the number of files produced? This might not be the best location.
The limit, realistically speaking, is so high that I doubt it'd matter unless this value is set to 1.
Okay, that is fair. Let's merge this.
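To make that concrete, a hedged sketch of the one case where the file count really explodes, assuming an active SparkSession named `spark`; the path and record count are arbitrary:

```scala
// Illustrative only: with the limit set to 1, every record lands in its own file,
// which is the only realistic way to approach any per-task cap on file count.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1L)
spark.range(50)
  .repartition(1)                       // a single task writes all 50 records...
  .write
  .mode("overwrite")
  .parquet("/tmp/one-record-per-file")  // ...producing 50 one-record files
```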
LGTM - merging to master. Thanks!
Today, parquet supports the [maxRecordsPerFile](apache/spark#16204) option to limit the max number of records written per file so that users can control the parquet file size to avoid humongous files. For example:

```
spark.range(100)
  .write
  .format("parquet")
  .option("maxRecordsPerFile", 5)
  .save(path)
```

The above code will generate 20 parquet files and each one contains 5 rows. This is missing in Delta. This PR adds the support for Delta by passing the `maxRecordsPerFile` option from Delta to ParquetFileFormat.

Note: today both Delta and parquet support the SQL conf `spark.sql.files.maxRecordsPerFile` to control the file size. This PR is just adding the `DataFrameWriter` option support to mimic the parquet format behavior.

Fixes #781
Closes #1017

Co-authored-by: Andrew Olson <aolson1@cerner.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
GitOrigin-RevId: 02af2c40457fe0acc76a31687e4fd6c47f3f2944
What changes were proposed in this pull request?
Currently, Spark writes a single file out per task, sometimes leading to very large files. It would be great to have an option to limit the max number of records written per file in a task, to avoid humongous files.
This patch introduces a new write config option `maxRecordsPerFile` (defaulting to the session-wide setting `spark.sql.files.maxRecordsPerFile`) that limits the max number of records written to a single file. A non-positive value indicates there is no limit (same behavior as not having this flag).

How was this patch tested?
Added test cases in PartitionedWriteSuite for both dynamic partition insert and non-dynamic partition insert.
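Putting the description together with the parquet example quoted earlier in this thread, a usage sketch; the output path is arbitrary and an active SparkSession named `spark` is assumed:

```scala
// Per-write option: cap each file at 5 records. With 100 input rows this yields
// on the order of 20 small files (the exact count depends on task partitioning).
spark.range(100)
  .write
  .format("parquet")
  .option("maxRecordsPerFile", 5)
  .save("/tmp/maxRecordsPerFile-demo")

// Session-wide default introduced by this PR; a non-positive value means no limit.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 5L)
```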