Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
LuciferYang committed Nov 26, 2020
1 parent dfa3978 commit b025271
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ class CSVOptions(
}
val lineSeparatorInWrite: Option[String] = lineSeparator

/**
* The handling method to be used when unescaped quotes are found in the input.
*/
val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
.getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))

def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
Expand Down Expand Up @@ -258,7 +264,7 @@ class CSVOptions(
settings.setNullValue(nullValue)
settings.setEmptyValue(emptyValueInRead)
settings.setMaxCharsPerColumn(maxCharsPerColumn)
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
settings.setUnescapedQuoteHandling(unescapedQuoteHandling)
settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
lineSeparatorInRead.foreach { _ =>
settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
Expand Down
21 changes: 21 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
* <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
* will handle values with unescaped quotes.
* <ul>
* <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
* the quote character and proceed parsing the value as a quoted value, until a closing
* quote is found.</li>
* <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters of the current
* parsed value until the delimiter is found. If no
* delimiter is found in the value, the parser will continue accumulating characters from
* the input until a delimiter or line ending is found.</li>
* <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters until the
* delimiter or a line ending is found in the input.</li>
* <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, the content parsed
* for the given value will be skipped and the value set in nullValue will be produced
* instead.</li>
* <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
* will be thrown.</li>
* </ul>
* </li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes. Note that Spark tries
* to parse only required columns in CSV under column pruning. Therefore, corrupt records
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
c1,c2
"a,""b,c","xyz"
"a,b,c","x""yz"
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ abstract class CSVSuite
private val valueMalformedFile = "test-data/value-malformed.csv"
private val badAfterGoodFile = "test-data/bad_after_good.csv"
private val malformedRowFile = "test-data/malformedRow.csv"
private val unescapedQuotesAndUnescapedDelimiterFile =
"test-data/unescaped-quotes-unescaped-delimiter.csv"

/** Verifies data and schema. */
private def verifyCars(
Expand Down Expand Up @@ -2428,6 +2430,19 @@ abstract class CSVSuite
assert(readback.collect sameElements Array(Row("0"), Row("1"), Row("2")))
}
}

test("SPARK-33566: configure UnescapedQuoteHandling to parse " +
"unescapedQuotesAndUnescapedDelimiterFile correctly") {
// Without configure UnescapedQuoteHandling to STOP_AT_CLOSING_QUOTE,
// the result will be Row(""""a,""b""", """c""""), Row("""a,b,c""", """"x""yz"""")
val result = spark.read
.option("inferSchema", "true")
.option("header", "true")
.option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
.csv(testFile(unescapedQuotesAndUnescapedDelimiterFile)).collect()
val exceptResults = Array(Row("""a,""b,c""", "xyz"), Row("""a,b,c""", """x""yz"""))
assert(result.sameElements(exceptResults))
}
}

class CSVv1Suite extends CSVSuite {
Expand Down

0 comments on commit b025271

Please sign in to comment.