[SPARK-18937][SQL] Timezone support in CSV/JSON parsing #16750

Closed
wants to merge 11 commits
43 changes: 27 additions & 16 deletions python/pyspark/sql/readwriter.py
@@ -158,7 +158,8 @@ def load(self, path=None, format=None, schema=None, **options):
def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
- mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
+ mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
+ timeZone=None):
"""
Loads a JSON file (`JSON Lines text format or newline-delimited JSON
<http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
@@ -204,11 +205,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.

>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -225,7 +228,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
- timestampFormat=timestampFormat)
+ timestampFormat=timestampFormat, timeZone=timeZone)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
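A minimal sketch of how the new reader option would be used once this patch is in. The file path comes from the doctest above; the timestamp format and zone ID are illustrative:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Parse timestamp fields as Los Angeles wall-clock time instead of the
# session-local default added by this patch.
df = spark.read.json('python/test_support/sql/people.json',
                     timestampFormat="yyyy-MM-dd'T'HH:mm:ss",
                     timeZone='America/Los_Angeles')
```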
@@ -298,7 +301,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
- maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -341,11 +344,11 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
@@ -357,6 +360,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
uses the default value, ``10``.
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.

* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
When a schema is set by user, it sets ``null`` for extra fields.
@@ -374,7 +379,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
- maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
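The CSV reader gains the same option; a sketch under the same assumptions (the schema, path, and format are made up for illustration):

```python
from pyspark.sql.types import StructType, StructField, TimestampType

# Assumes the `spark` session from the previous sketch.
schema = StructType([StructField('ts', TimestampType())])

# Read timestamps that were written as GMT wall-clock times.
df = spark.read.csv('/tmp/ts.csv', schema=schema,
                    timestampFormat='yyyy/MM/dd HH:mm',
                    timeZone='GMT')
```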
@@ -591,7 +596,8 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options):
self._jwrite.saveAsTable(name)

@since(1.4)
- def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
+ def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
+ timeZone=None):
"""Saves the content of the :class:`DataFrame` in JSON format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -607,17 +613,20 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to format timestamps.
+ If None is set, it uses the default value, session local timezone.

>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(
- compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
+ compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
+ timeZone=timeZone)
self._jwrite.json(path)

@since(1.4)
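On the writer side, the option controls how timestamps are rendered. A sketch mirroring the doctest above, assuming a DataFrame `df` with a timestamp column; the zone ID is illustrative:

```python
import os
import tempfile

# Format timestamp columns in Tokyo time on write, rather than in the
# session-local timezone this patch uses as the default.
df.write.json(os.path.join(tempfile.mkdtemp(), 'data'),
              timestampFormat="yyyy-MM-dd'T'HH:mm:ss",
              timeZone='Asia/Tokyo')
```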
@@ -664,7 +673,7 @@ def text(self, path, compression=None):
@since(2.0)
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
- timestampFormat=None):
+ timestampFormat=None, timeZone=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -699,18 +708,20 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.

>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
- dateFormat=dateFormat, timestampFormat=timestampFormat)
+ dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone)
self._jwrite.csv(path)

@since(1.5)
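And the CSV writer, again assuming a DataFrame `df` with a timestamp column; path, format, and zone ID are illustrative:

```python
import os
import tempfile

# Same idea as the JSON writer sketch: render timestamps in an explicit
# timezone instead of the session-local default.
df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'),
             timestampFormat='yyyy/MM/dd HH:mm',
             timeZone='GMT')
```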
20 changes: 12 additions & 8 deletions python/pyspark/sql/streaming.py
@@ -429,7 +429,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
- timestampFormat=None):
+ timestampFormat=None, timeZone=None):
"""
Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
<http://jsonlines.org/>`_) and returns a :class`DataFrame`.
@@ -476,11 +476,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.

>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
>>> json_sdf.isStreaming
@@ -494,7 +496,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
- timestampFormat=timestampFormat)
+ timestampFormat=timestampFormat, timeZone=timeZone)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
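The streaming reader mirrors the batch reader. A sketch based on the doctest above; `sdf_schema` is an assumed pre-built StructType:

```python
import tempfile

# Hypothetical streaming source; timestamps are parsed as UTC rather than
# the session-local default.
json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema=sdf_schema,
                                 timeZone='UTC')
assert json_sdf.isStreaming
```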
@@ -552,7 +554,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
- maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None):
+ maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -597,18 +599,20 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
:param dateFormat: sets the string that indicates a date format. Custom date formats
follow the formats at ``java.text.SimpleDateFormat``. This
applies to date type. If None is set, it uses the
- default value value, ``yyyy-MM-dd``.
+ default value, ``yyyy-MM-dd``.
:param timestampFormat: sets the string that indicates a timestamp format. Custom date
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
- default value value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
+ default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param maxColumns: defines a hard limit of how many columns a record can have. If None is
set, it uses the default value, ``20480``.
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
value being read. If None is set, it uses the default value,
``-1`` meaning unlimited length.
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
+ :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
+ If None is set, it uses the default value, session local timezone.

* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
When a schema is set by user, it sets ``null`` for extra fields.
@@ -628,7 +632,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
- maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+ maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
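And for the streaming CSV reader, under the same assumptions (`sdf_schema` and the format are illustrative):

```python
import tempfile

# Hypothetical streaming CSV source with an explicit parsing timezone.
csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema=sdf_schema,
                               timestampFormat='yyyy/MM/dd HH:mm',
                               timeZone='GMT')
assert csv_sdf.isStreaming
```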
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -482,19 +482,29 @@ case class JsonTuple(children: Seq[Expression])
/**
* Converts an json input string to a [[StructType]] with the specified schema.
*/
- case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
- extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+ case class JsonToStruct(
+ schema: StructType,
+ options: Map[String, String],
+ child: Expression,
+ timeZoneId: Option[String] = None)
+ extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true

+ def this(schema: StructType, options: Map[String, String], child: Expression) =
+ this(schema, options, child, None)
+
@transient
lazy val parser =
new JacksonParser(
schema,
"invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
- new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+ new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))

override def dataType: DataType = schema

+ override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+ copy(timeZoneId = Option(timeZoneId))
+
override def nullSafeEval(json: Any): Any = {
try parser.parse(json.toString).headOption.orNull catch {
case _: SparkSQLJsonProcessingException => null
@@ -507,10 +517,15 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
/**
* Converts a [[StructType]] to a json output string.
*/
- case class StructToJson(options: Map[String, String], child: Expression)
- extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+ case class StructToJson(
+ options: Map[String, String],
+ child: Expression,
+ timeZoneId: Option[String] = None)
+ extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true

+ def this(options: Map[String, String], child: Expression) = this(options, child, None)
+
@transient
lazy val writer = new CharArrayWriter()

@@ -519,7 +534,7 @@ case class StructToJson(options: Map[String, String], child: Expression)
new JacksonGenerator(
child.dataType.asInstanceOf[StructType],
writer,
- new JSONOptions(options))
+ new JSONOptions(options, timeZoneId.get))

override def dataType: DataType = StringType

@@ -538,6 +553,9 @@ case class StructToJson(options: Map[String, String], child: Expression)
}
}

+ override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+ copy(timeZoneId = Option(timeZoneId))
+
override def nullSafeEval(row: Any): Any = {
gen.write(row.asInstanceOf[InternalRow])
gen.flush()
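On the expression side, `JsonToStruct` and `StructToJson` now mix in `TimeZoneAwareExpression`, so `timeZoneId` is filled in with the session-local timezone during analysis and passed to `JSONOptions` as the default. A PySpark sketch of how this surfaces through `from_json`/`to_json`; passing `timeZone` through the options map is an assumption about how `JSONOptions` picks up an explicit override:

```python
from pyspark.sql.functions import from_json, to_json, struct
from pyspark.sql.types import StructType, StructField, TimestampType

schema = StructType([StructField('t', TimestampType())])
df = spark.createDataFrame([('{"t": "2016-01-01 00:00:00"}',)], ['payload'])

# Parse the embedded timestamp as GMT, then re-render it in Tokyo time.
# Without a "timeZone" entry, the session-local default would apply.
opts = {'timestampFormat': 'yyyy-MM-dd HH:mm:ss', 'timeZone': 'GMT'}
parsed = df.select(from_json('payload', schema, opts).alias('s'))
rendered = parsed.select(to_json(struct('s.t'), {'timeZone': 'Asia/Tokyo'}))
```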