diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bb31e6a3e09f8..d120daa5a9434 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -522,7 +522,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
             samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
-            pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
+            pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None,
+            unescapedQuoteHandling=None):
        r"""Loads a CSV file and returns the result as a :class:`DataFrame`.

        This function will go through the input once to determine the input schema if
@@ -685,6 +686,26 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
        modifiedAfter (batch only) : an optional timestamp to only include files with
            modification times occurring after the specified time. The provided timestamp
            must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+        unescapedQuoteHandling : str, optional
+            defines how the CsvParser will handle values with unescaped quotes. If None is
+            set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+            * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
+              the quote character and proceed parsing the value as a quoted value, until a closing
+              quote is found.
+            * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters of the current
+              parsed value until the delimiter is found. If no delimiter is found in the value, the
+              parser will continue accumulating characters from the input until a delimiter or line
+              ending is found.
+            * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters until the
+              delimiter or a line ending is found in the input.
+            * ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed
+              for the given value will be skipped and the value set in nullValue will be produced
+              instead.
+            * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
+              will be thrown.
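Reviewer note: a minimal usage sketch for the new batch-reader keyword. The file path, header layout, and SparkSession bootstrap below are illustrative assumptions, not part of this patch.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("unescaped-quotes-demo").getOrCreate()

# Keep accumulating stray quote characters until a closing quote is found,
# instead of the default STOP_AT_DELIMITER behaviour.
df = spark.read.csv(
    "/tmp/messy.csv",              # hypothetical input file
    header=True,
    unescapedQuoteHandling="STOP_AT_CLOSING_QUOTE",
)
df.show(truncate=False)
```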
        Examples
        --------
@@ -708,7 +729,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
            charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
            enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
-            modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter)
+            modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
+            unescapedQuoteHandling=unescapedQuoteHandling)
        if isinstance(path, str):
            path = [path]
        if type(path) == list:
diff --git a/python/pyspark/sql/readwriter.pyi b/python/pyspark/sql/readwriter.pyi
index 64c5697203a44..c3b9a428f22b3 100644
--- a/python/pyspark/sql/readwriter.pyi
+++ b/python/pyspark/sql/readwriter.pyi
@@ -113,6 +113,7 @@ class DataFrameReader(OptionUtils):
        lineSep: Optional[str] = ...,
        pathGlobFilter: Optional[Union[bool, str]] = ...,
        recursiveFileLookup: Optional[Union[bool, str]] = ...,
+        unescapedQuoteHandling: Optional[str] = ...,
    ) -> DataFrame: ...
    def orc(
        self,
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index e7b2fa16d620a..365b5f38694a7 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -761,7 +761,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
            columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
            enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
-            pathGlobFilter=None, recursiveFileLookup=None):
+            pathGlobFilter=None, recursiveFileLookup=None, unescapedQuoteHandling=None):
        r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

        This function will go through the input once to determine the input schema if
@@ -900,6 +900,26 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
        recursiveFileLookup : str or bool, optional
            recursively scan a directory for files. Using this option disables
            `partition discovery `_.  # noqa
+        unescapedQuoteHandling : str, optional
+            defines how the CsvParser will handle values with unescaped quotes. If None is
+            set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+            * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
+              the quote character and proceed parsing the value as a quoted value, until a closing
+              quote is found.
+            * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters of the current
+              parsed value until the delimiter is found. If no delimiter is found in the value, the
+              parser will continue accumulating characters from the input until a delimiter or line
+              ending is found.
+            * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters until the
+              delimiter or a line ending is found in the input.
+            * ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed
+              for the given value will be skipped and the value set in nullValue will be produced
+              instead.
+            * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
+              will be thrown.
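Reviewer note: the streaming reader gains the same keyword. A hedged sketch, assuming the `spark` session from the earlier example and a made-up input directory; file-source streams need an explicit schema unless schema inference is enabled.

```python
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("id", StringType()),
    StructField("comment", StringType()),
])

# SKIP_VALUE drops a field containing unescaped quotes and emits nullValue
# instead of failing the micro-batch.
stream_df = (spark.readStream
             .schema(schema)
             .csv("/tmp/csv-input-dir",  # hypothetical watched directory
                  unescapedQuoteHandling="SKIP_VALUE"))
query = stream_df.writeStream.format("console").start()
```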
        .. versionadded:: 2.0.0
@@ -926,7 +946,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
            columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
            charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
            emptyValue=emptyValue, locale=locale, lineSep=lineSep,
-            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
+            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+            unescapedQuoteHandling=unescapedQuoteHandling)
        if isinstance(path, str):
            return self._df(self._jreader.csv(path))
        else:
diff --git a/python/pyspark/sql/streaming.pyi b/python/pyspark/sql/streaming.pyi
index 56ce140b826d5..829610ad3b94b 100644
--- a/python/pyspark/sql/streaming.pyi
+++ b/python/pyspark/sql/streaming.pyi
@@ -149,6 +149,7 @@ class DataStreamReader(OptionUtils):
        lineSep: Optional[str] = ...,
        pathGlobFilter: Optional[Union[bool, str]] = ...,
        recursiveFileLookup: Optional[Union[bool, str]] = ...,
+        unescapedQuoteHandling: Optional[str] = ...,
    ) -> DataFrame: ...

class DataStreamWriter:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index f2191fcf35f1a..ec405994eadef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -213,6 +213,12 @@ class CSVOptions(
  }

  val lineSeparatorInWrite: Option[String] = lineSeparator

+  /**
+   * The handling method to be used when unescaped quotes are found in the input.
+   */
+  val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
+    .getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
+
  def asWriterSettings: CsvWriterSettings = {
    val writerSettings = new CsvWriterSettings()
    val format = writerSettings.getFormat
@@ -258,7 +264,7 @@ class CSVOptions(
    settings.setNullValue(nullValue)
    settings.setEmptyValue(emptyValueInRead)
    settings.setMaxCharsPerColumn(maxCharsPerColumn)
-    settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
+    settings.setUnescapedQuoteHandling(unescapedQuoteHandling)
    settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
    lineSeparatorInRead.foreach { _ =>
      settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b26bc6441b6cf..8f96f0b882424 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -727,6 +727,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * a record can have.</li>
   * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
   * for any given value being read. By default, it is -1 meaning unlimited length</li>
+   * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+   * will handle values with unescaped quotes.
+   *   <ul>
+   *     <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+   *     the quote character and proceed parsing the value as a quoted value, until a closing
+   *     quote is found.</li>
+   *     <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters of the current
+   *     parsed value until the delimiter is found. If no delimiter is found in the value, the
+   *     parser will continue accumulating characters from the input until a delimiter or line
+   *     ending is found.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters until the
+   *     delimiter or a line ending is found in the input.</li>
+   *     <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
+   *     for the given value will be skipped and the value set in nullValue will be produced
+   *     instead.</li>
+   *     <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+   *     will be thrown.</li>
+   *   </ul>
+   * </li>
   * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
   * during parsing. It supports the following case-insensitive modes. Note that Spark tries
   * to parse only required columns in CSV under column pruning. Therefore, corrupt records
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 9bc4acd49a980..7f4ef8be562fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -396,6 +396,27 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
   * a record can have.</li>
   * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
   * for any given value being read. By default, it is -1 meaning unlimited length</li>
+   * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+   * will handle values with unescaped quotes.
+   *   <ul>
+   *     <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+   *     the quote character and proceed parsing the value as a quoted value, until a closing
+   *     quote is found.</li>
+   *     <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters of the current
+   *     parsed value until the delimiter is found. If no delimiter is found in the value, the
+   *     parser will continue accumulating characters from the input until a delimiter or line
+   *     ending is found.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters until the
+   *     delimiter or a line ending is found in the input.</li>
+   *     <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
+   *     for the given value will be skipped and the value set in nullValue will be produced
+   *     instead.</li>
+   *     <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+   *     will be thrown.</li>
+   *   </ul>
+   * </li>
   * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
   * during parsing. It supports the following case-insensitive modes.
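Reviewer note: an end-to-end sketch contrasting two of the handling modes through the `.option()` API, which feeds the `parameters` map read by `CSVOptions` above. The sample row and scratch path are invented; since the option value is upper-cased with `Locale.ROOT` before `UnescapedQuoteHandling.valueOf`, a lower-case spelling should also be accepted.

```python
# One malformed row: the field contains unescaped inner quotes.
sample = 'name,remark\nalice,"she said "hi" and left"\n'
with open("/tmp/unescaped.csv", "w") as f:   # hypothetical scratch file
    f.write(sample)

# Best-effort parse: accumulate everything up to the final closing quote.
lenient = (spark.read
           .option("header", True)
           .option("unescapedQuoteHandling", "stop_at_closing_quote")
           .csv("/tmp/unescaped.csv"))
lenient.show(truncate=False)

# Strict parse: the underlying univocity parser should raise a
# TextParsingException once an action forces the read.
strict = (spark.read
          .option("header", True)
          .option("unescapedQuoteHandling", "RAISE_ERROR")
          .csv("/tmp/unescaped.csv"))
strict.show(truncate=False)  # expected to fail on the malformed row
```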