Skip to content

Commit

Permalink
[SPARK-46862][SQL] Disable CSV column pruning in the multi-line mode
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
In the PR, I propose to disable the column pruning feature in the CSV datasource for the `multiLine` mode.

### Why are the changes needed?
To work around the issue in the `uniVocity` parser used by the CSV datasource: uniVocity/univocity-parsers#529

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *CSVv1Suite"
$ build/sbt "test:testOnly *CSVv2Suite"
$ build/sbt "test:testOnly *CSVLegacyTimeParserSuite"
$ build/sbt "testOnly *.CsvFunctionsSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#44872 from MaxGekk/csv-disable-column-pruning.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
  • Loading branch information
MaxGekk committed Jan 26, 2024
1 parent f9f413e commit 829e742
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,15 @@ class CSVOptions(
val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
.getOrElse(UNESCAPED_QUOTE_HANDLING, "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))

/**
 * Whether the CSV column pruning feature is enabled for this read.
 *
 * Precedence: if the CSV option `columnPruning` is set explicitly, it always wins —
 * even in `multiLine` mode. Otherwise the default is used: pruning is on only when
 * the session-level flag (`columnPruning`, presumably initialized from the SQL config
 * `spark.sql.csv.parser.columnPruning.enabled`) is enabled AND `multiLine` is off.
 * The multi-line default is off to work around a bug in the underlying uniVocity
 * parser: https://github.com/uniVocity/univocity-parsers/issues/529
 */
val isColumnPruningEnabled: Boolean = getBool(COLUMN_PRUNING, !multiLine && columnPruning)

def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
Expand Down Expand Up @@ -376,4 +385,5 @@ object CSVOptions extends DataSourceOptions {
val SEP = "sep"
val DELIMITER = "delimiter"
newOption(SEP, DELIMITER)
val COLUMN_PRUNING = newOption("columnPruning")
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class UnivocityParser(
// positions. Generally assigned by input configuration options, except when input column(s) have
// default values, in which case we omit the explicit indexes in order to know how many tokens
// were present in each line instead.
private def columnPruning: Boolean = options.columnPruning &&
private def columnPruning: Boolean = options.isColumnPruningEnabled &&
!requiredSchema.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY))

// When column pruning is enabled, the parser only parses the required columns based on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ case class CSVPartitionReaderFactory(
actualReadDataSchema,
options,
filters)
val schema = if (options.columnPruning) actualReadDataSchema else actualDataSchema
val schema = if (options.isColumnPruningEnabled) actualReadDataSchema else actualDataSchema
val isStartOfFile = file.start == 0
val headerChecker = new CSVHeaderChecker(
schema, options, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2089,6 +2089,7 @@ abstract class CSVSuite
.option("header", true)
.option("enforceSchema", false)
.option("multiLine", multiLine)
.option("columnPruning", true)
.load(dir)
.select("columnA"),
Row("a"))
Expand All @@ -2099,6 +2100,7 @@ abstract class CSVSuite
.option("header", true)
.option("enforceSchema", false)
.option("multiLine", multiLine)
.option("columnPruning", true)
.load(dir)
.count() === 1L)
}
Expand Down Expand Up @@ -3163,7 +3165,7 @@ abstract class CSVSuite
}

test("SPARK-40667: validate CSV Options") {
assert(CSVOptions.getAllOptions.size == 38)
assert(CSVOptions.getAllOptions.size == 39)
// Please add validation on any new CSV options here
assert(CSVOptions.isValidOption("header"))
assert(CSVOptions.isValidOption("inferSchema"))
Expand Down Expand Up @@ -3203,6 +3205,7 @@ abstract class CSVSuite
assert(CSVOptions.isValidOption("codec"))
assert(CSVOptions.isValidOption("sep"))
assert(CSVOptions.isValidOption("delimiter"))
assert(CSVOptions.isValidOption("columnPruning"))
// Please add validation on any new parquet options with alternative here
assert(CSVOptions.getAlternativeOption("sep").contains("delimiter"))
assert(CSVOptions.getAlternativeOption("delimiter").contains("sep"))
Expand All @@ -3212,6 +3215,26 @@ abstract class CSVSuite
assert(CSVOptions.getAlternativeOption("codec").contains("compression"))
assert(CSVOptions.getAlternativeOption("preferDate").isEmpty)
}

test("SPARK-46862: column pruning in the multi-line mode") {
val data =
""""jobID","Name","City","Active"
|"1","DE","","Yes"
|"5",",","",","
|"3","SA","","No"
|"10","abcd""efgh"" \ndef","",""
|"8","SE","","No"""".stripMargin

withTempPath { path =>
Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
val df = spark.read
.option("multiline", "true")
.option("header", "true")
.option("escape", "\"")
.csv(path.getCanonicalPath)
assert(df.count() === 5)
}
}
}

class CSVv1Suite extends CSVSuite {
Expand Down

0 comments on commit 829e742

Please sign in to comment.