[SPARK-46862][SQL][FOLLOWUP] Fix column pruning without schema enforcing in V1 CSV datasource

### What changes were proposed in this pull request?
In the PR, I propose to invoke `CSVOptions.isColumnPruningEnabled`, introduced by #44872, while matching the CSV header against the schema in the V1 CSV datasource.
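
For context, a minimal sketch of what the option-aware flag adds on top of the raw session conf. This is an illustration only, assuming (per #44872) that the options have to veto pruning in the multi-line mode; the actual definition lives in `CSVOptions`:
```scala
// Illustrative sketch, not Spark's code: the session conf alone is not
// enough, because (assumed here, per #44872) the multiLine mode must
// opt out of column pruning regardless of the conf value.
class CsvOptionsSketch(sessionColumnPruning: Boolean, multiLine: Boolean) {
  def isColumnPruningEnabled: Boolean = sessionColumnPruning && !multiLine
}
```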

### Why are the changes needed?
To fix the failure that occurs when column pruning kicks in while the schema is not enforced:
```scala
scala> spark.read.
     | option("multiLine", true).
     | option("header", true).
     | option("escape", "\"").
     | option("enforceSchema", false).
     | csv("/Users/maximgekk/tmp/es-939111-data.csv").
     | count()
24/01/27 12:43:14 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.IllegalArgumentException: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 4, schema size: 0
CSV file: file:///Users/maximgekk/tmp/es-939111-data.csv
```
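
The mismatch comes from `count()` requiring no columns: with pruning on, the required schema handed to the header checker is empty while the file's header has 4 columns. Below is a simplified model of the failing arity check; the names and message are patterned on the log above, not copied from `CSVHeaderChecker`:
```scala
// Simplified model of the header/schema arity check behind the error
// above; an illustration, not CSVHeaderChecker's actual implementation.
def checkHeader(header: Array[String], schemaSize: Int): Unit = {
  require(header.length == schemaSize,
    s"Number of columns in CSV header is not equal to number of fields in " +
      s"the schema: Header length: ${header.length}, schema size: $schemaSize")
}

// count() needs no columns, so the pruned schema is empty:
// checkHeader(Array("c0", "c1", "c2", "c3"), 0)  // throws like the log above
```
With the fix, the multi-line case keeps the full schema (pruning is disabled at the options level), so the checker compares 4 header columns against 4 schema fields.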

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *CSVv1Suite"
$ build/sbt "test:testOnly *CSVv2Suite"
$ build/sbt "test:testOnly *CSVLegacyTimeParserSuite"
$ build/sbt "testOnly *.CsvFunctionsSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44910 from MaxGekk/check-header-column-pruning.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
MaxGekk committed Jan 27, 2024
1 parent 6d29c72 commit bc51c9f
Showing 2 changed files with 12 additions and 9 deletions.
CSVFileFormat.scala:
```diff
@@ -100,12 +100,12 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
-    val columnPruning = sparkSession.sessionState.conf.csvColumnPruning
     val parsedOptions = new CSVOptions(
       options,
-      columnPruning,
+      sparkSession.sessionState.conf.csvColumnPruning,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    val isColumnPruningEnabled = parsedOptions.isColumnPruningEnabled
 
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
@@ -125,7 +125,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         actualRequiredSchema,
         parsedOptions,
         actualFilters)
-    val schema = if (columnPruning) actualRequiredSchema else actualDataSchema
+    val schema = if (isColumnPruningEnabled) actualRequiredSchema else actualDataSchema
     val isStartOfFile = file.start == 0
     val headerChecker = new CSVHeaderChecker(
       schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile)
```
CSVSuite.scala:
```diff
@@ -3237,12 +3237,15 @@ abstract class CSVSuite
 
     withTempPath { path =>
       Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
-      val df = spark.read
-        .option("multiline", "true")
-        .option("header", "true")
-        .option("escape", "\"")
-        .csv(path.getCanonicalPath)
-      assert(df.count() === 5)
+      Seq(true, false).foreach { enforceSchema =>
+        val df = spark.read
+          .option("multiLine", true)
+          .option("header", true)
+          .option("escape", "\"")
+          .option("enforceSchema", enforceSchema)
+          .csv(path.getCanonicalPath)
+        assert(df.count() === 5)
+      }
     }
   }
 }
```
