
[SPARK-18775][SQL] Limit the max number of records written per file #16204

Closed
wants to merge 7 commits

Conversation

@rxin (Contributor) commented Dec 8, 2016

What changes were proposed in this pull request?

Currently, Spark writes a single file out per task, sometimes leading to very large files. It would be great to have an option to limit the max number of records written per file in a task, to avoid humongous files.

This patch introduces a new write config option `maxRecordsPerFile` (defaulting to the session-wide setting `spark.sql.files.maxRecordsPerFile`) that limits the max number of records written to a single file. A non-positive value indicates there is no limit (the same behavior as not setting this flag).
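
For illustration, a minimal sketch of how this knob could be used, assuming an existing SparkSession `spark` and DataFrame `df` (the path and the value 10000 are made up; the option and conf names are the ones this patch introduces):

```scala
// Per-write option: cap each output file of this write at 10,000 records.
df.write
  .option("maxRecordsPerFile", 10000L)
  .parquet("/tmp/output")   // hypothetical output path

// Session-wide default, used when the per-write option is not set.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 10000L)

// A non-positive value disables the limit (the default behavior).
spark.conf.set("spark.sql.files.maxRecordsPerFile", 0L)
```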

How was this patch tested?

Added test cases in PartitionedWriteSuite for both dynamic partition insert and non-dynamic partition insert.
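
A rough sketch of the kind of check such a test can make (illustrative only, not the actual PartitionedWriteSuite code; it assumes an existing SparkSession `spark`):

```scala
import java.nio.file.Files

// Write 100 rows with a cap of 10 records per file, then verify
// that the write produced at least 10 data files.
val dir = Files.createTempDirectory("max-records-per-file-test").toFile
spark.range(100)
  .write
  .mode("overwrite")                  // the temp directory already exists
  .option("maxRecordsPerFile", 10L)
  .parquet(dir.getCanonicalPath)

val dataFiles = dir.listFiles().filter(_.getName.endsWith(".parquet"))
assert(dataFiles.length >= 10)
```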

@@ -821,8 +831,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString

def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
@rxin (Contributor, Author):

I moved this closer to the other file-related configs.

@SparkQA commented Dec 8, 2016

Test build #69837 has finished for PR 16204 at commit fca401f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ssimeonov (Contributor) left a comment

This will be a much-loved feature. :)

@@ -225,32 +228,50 @@ object FileFormatWriter extends Logging {
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol) extends ExecuteWriteTask {

private[this] var outputWriter: OutputWriter = {
private[this] var currentWriter: OutputWriter = _
@ssimeonov (Contributor) commented Dec 8, 2016

Looking through the code, three things stand out:

  1. There is code duplication between SingleDirectoryWriteTask and DynamicPartitionWriteTask when it comes to current writer management and cleanup.

  2. There is duplication within releaseResources() and newOutputWriter() of the write tasks when it comes to releasing resources.

  3. Write task state management is leaky because releaseResources() is called explicitly by executeTask(). Also, releaseResources() will be called twice when there are no exceptions and once if there is an exception in execute(), which is a bit confusing.

What about asking the base trait to do a bit more work and present a stronger contract to its users, e.g.:

```scala
private trait ExecuteWriteTask {

  protected[this] var currentWriter: OutputWriter = null

  def execute(iterator: Iterator[InternalRow]): Set[String] = {
    try {
      executeImp(iterator)
    } finally {
      releaseResources()
    }
  }

  /**
   * Writes data out to files, and then returns the list of partition strings written out.
   * The list of partitions is sent back to the driver and used to update the catalog.
   */
  protected def executeImp(iterator: Iterator[InternalRow]): Set[String]

  protected def resetCurrentWriter(): Unit = {
    if (currentWriter != null) {
      currentWriter.close()
      currentWriter = null
    }
  }

  protected def releaseResources(): Unit = {
    resetCurrentWriter()
  }
}
```

A simpler implementation would omit releaseResources() and simply call resetCurrentWriter() in finally. That is OK since all the classes are private but slightly less readable when it comes to unexpected future changes.

Contributor:

I'm wondering if we should just remove SingleDirectoryWriteTask. The few tests I tried still seem to pass with the dynamic implementation as well.

@rxin (Contributor, Author):

The dynamic one forces a sort, which is highly inefficient.

Contributor:

Ah ok. I guess you could still refactor out the record counting code to be shared, but I'm not sure it's worth it.

@rxin (Contributor, Author):

Indeed, I wanted to do that at the beginning, but given that there are only two implementations with almost no code in this part, it would be over-abstraction.

@SparkQA commented Dec 8, 2016

Test build #69853 has started for PR 16204 at commit f77730f.

@rxin changed the title from "[SPARK-18775][SQL] Limit the max number of records written per file - WIP" to "[SPARK-18775][SQL] Limit the max number of records written per file" on Dec 8, 2016
@SparkQA commented Dec 8, 2016

Test build #69854 has started for PR 16204 at commit d2172d1.

@SparkQA commented Dec 8, 2016

Test build #69843 has finished for PR 16204 at commit 3199f8f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor, Author) commented Dec 8, 2016

cc @ericl

@SparkQA commented Dec 8, 2016

Test build #3477 has finished for PR 16204 at commit d2172d1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (partitionPath.nonEmpty) {
updatedPartitions.add(partitionPath)
}
} else if (description.maxRecordsPerFile > 0 &&
recordsInFile == description.maxRecordsPerFile) {
Contributor:

`>=` probably conveys the intent a little more clearly, here and in the other write task.
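
To make the suggested check concrete, here is a tiny self-contained simulation of the roll-over rule using `>=` (plain Scala; the variable names echo the patch, but nothing here is quoted from it):

```scala
// Records stream in; once the current "file" reaches the cap, roll over to a new one.
val maxRecordsPerFile = 5L                 // a non-positive value would mean "no limit"
var recordsInFile = 0L
var fileCounter = 0
val fileOfRecord = scala.collection.mutable.ArrayBuffer.empty[Int]

for (record <- 1 to 12) {
  if (maxRecordsPerFile > 0 && recordsInFile >= maxRecordsPerFile) {
    fileCounter += 1                       // close the current file, start the next one
    recordsInFile = 0
  }
  fileOfRecord += fileCounter              // "write" the record to the current file
  recordsInFile += 1
}

println(fileOfRecord.distinct.length)      // 3: 12 records with a cap of 5 => files 0, 1, 2
```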

@SparkQA commented Dec 9, 2016

Test build #69899 has finished for PR 16204 at commit ceeacde.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 9, 2016

Test build #3484 has finished for PR 16204 at commit ceeacde.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ericl (Contributor) commented Dec 10, 2016

LGTM

}

override def execute(iter: Iterator[InternalRow]): Set[String] = {
var fileCounter = 0
var recordsInFile = 0
Contributor:

Should this be a `Long`?
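
A brief aside on why a `Long` counter is the safer choice (plain Scala, not code from the patch; it assumes the configured limit is long-valued): an `Int` counter silently wraps past about 2.1 billion records.

```scala
// Int arithmetic wraps around on overflow; Long gives ample headroom.
val intCount: Int = Int.MaxValue
println(intCount + 1)                 // -2147483648 (wrapped)

val longCount: Long = Int.MaxValue.toLong
println(longCount + 1)                // 2147483648
```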


// This must be in a form that matches our bucketing format. See BucketingUtils.
val ext = f"$bucketId.c$fileCounter%03d" +
description.outputWriterFactory.getFileExtension(taskAttemptContext)
Contributor:

Is there an assumption here on number of files per output partition ?

@rxin (Contributor, Author):

No.
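
To see why the extension format itself does not cap the number of files, here is a quick illustration of standard Scala `f` interpolation (not code from the patch): `%03d` is only a minimum width, so counters beyond 999 still produce valid, unique suffixes.

```scala
// %03d zero-pads to at least three digits and never truncates.
val bucketId = ""                       // empty for a non-bucketed write in this sketch
println(f"$bucketId.c${7}%03d")         // .c007
println(f"$bucketId.c${1234}%03d")      // .c1234
```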

@@ -379,30 +405,40 @@ object FileFormatWriter extends Logging {
val sortedIterator = sorter.sortedIterator()

// If anything below fails, we should abort the task.
var recordsInFile = 0
Contributor:

`Long` here as well.

@rxin (Contributor, Author) commented Dec 14, 2016

cc @hvanhovell

@SparkQA commented Dec 14, 2016

Test build #70145 has finished for PR 16204 at commit 5bf2b32.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor, Author) commented Dec 20, 2016

@hvanhovell don't forget this one!

@hvanhovell (Contributor) left a comment

One tiny comment regarding documentation. Otherwise LGTM.

.booleanConf
.createWithDefault(false)

val MAX_RECORDS_PER_FILE = SQLConfigBuilder("spark.sql.files.maxRecordsPerFile")
Contributor:

Should we also mention that there is a limit to the number of files produced? This might not be the best location.

@rxin (Contributor, Author):

Realistically, the limit is so high that I doubt it'd matter unless this value is set to 1.

Contributor:

Okay, that is fair. Let's merge this.

@hvanhovell (Contributor) commented:

LGTM - merging to master. Thanks!

@asfgit closed this in 354e936 on Dec 21, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

Author: Reynold Xin <rxin@databricks.com>

Closes apache#16204 from rxin/SPARK-18775.
vkorukanti pushed a commit to delta-io/delta that referenced this pull request Apr 8, 2022
Today, parquet supports the [maxRecordsPerFile](apache/spark#16204) option to limit the max number of records written per file so that users can control the parquet file size to avoid humongous files. For example,

```
spark.range(100)
          .write
          .format("parquet")
          .option("maxRecordsPerFile", 5)
          .save(path)
```

The above code will generate 20 parquet files, each containing 5 rows.

This is missing in Delta. This PR adds support for it in Delta by passing the `maxRecordsPerFile` option down to ParquetFileFormat.

Note: today both Delta and parquet support the SQL conf `spark.sql.files.maxRecordsPerFile` to control the file size. This PR is just adding the `DataFrameWriter` option support to mimic the parquet format behavior.
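
For illustration, the Delta-side usage this change enables would look roughly like the following (the output path is hypothetical; the expected result mirrors the parquet example above):

```scala
spark.range(100)
  .write
  .format("delta")
  .option("maxRecordsPerFile", 5)
  .save("/tmp/delta-table")   // hypothetical path; should likewise yield 20 files of 5 rows each
```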

Fixes #781

Closes #1017

Co-authored-by: Andrew Olson <aolson1@cerner.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>

GitOrigin-RevId: 02af2c40457fe0acc76a31687e4fd6c47f3f2944