From 8ac7925d4e330c569aeb4525329d5f2482aa924b Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Fri, 15 Feb 2019 16:34:07 +0900
Subject: [PATCH 1/3] Create datetime.date directly instead of creating
 datetime64[ns] as intermediate data.

---
 python/pyspark/serializers.py            |  5 +-
 python/pyspark/sql/dataframe.py          |  5 +-
 python/pyspark/sql/tests/test_arrow.py   |  5 +-
 .../sql/tests/test_pandas_udf_scalar.py  |  3 +-
 python/pyspark/sql/types.py              | 55 ++++++++++++-------
 5 files changed, 45 insertions(+), 28 deletions(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 3db259551fa8b..a2c59fedfc8cd 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -311,10 +311,9 @@ def __init__(self, timezone, safecheck):
 
     def arrow_to_pandas(self, arrow_column):
         from pyspark.sql.types import from_arrow_type, \
-            _check_series_convert_date, _check_series_localize_timestamps
+            _arrow_column_to_pandas, _check_series_localize_timestamps
 
-        s = arrow_column.to_pandas()
-        s = _check_series_convert_date(s, from_arrow_type(arrow_column.type))
+        s = _arrow_column_to_pandas(arrow_column, from_arrow_type(arrow_column.type))
         s = _check_series_localize_timestamps(s, self._timezone)
         return s
 
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a1056d0b787e3..472d2969b3e19 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2107,14 +2107,13 @@ def toPandas(self):
         # of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is enabled.
         if use_arrow:
             try:
-                from pyspark.sql.types import _check_dataframe_convert_date, \
+                from pyspark.sql.types import _arrow_table_to_pandas, \
                     _check_dataframe_localize_timestamps
                 import pyarrow
                 batches = self._collectAsArrow()
                 if len(batches) > 0:
                     table = pyarrow.Table.from_batches(batches)
-                    pdf = table.to_pandas()
-                    pdf = _check_dataframe_convert_date(pdf, self.schema)
+                    pdf = _arrow_table_to_pandas(table, self.schema)
                     return _check_dataframe_localize_timestamps(pdf, timezone)
                 else:
                     return pd.DataFrame.from_records([], columns=self.columns)
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 8a62500b17f27..38a6402c01322 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -68,7 +68,9 @@ def setUpClass(cls):
                     (u"b", 2, 20, 0.4, 4.0, Decimal("4.0"),
                      date(2012, 2, 2), datetime(2012, 2, 2, 2, 2, 2)),
                     (u"c", 3, 30, 0.8, 6.0, Decimal("6.0"),
-                     date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3))]
+                     date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3)),
+                    (u"d", 4, 40, 1.0, 8.0, Decimal("8.0"),
+                     date(2262, 4, 12), datetime(2262, 3, 3, 3, 3, 3))]
 
         # TODO: remove version check once minimum pyarrow version is 0.10.0
         if LooseVersion("0.10.0") <= LooseVersion(pa.__version__):
@@ -76,6 +78,7 @@ def setUpClass(cls):
             cls.data[0] = cls.data[0] + (bytearray(b"a"),)
             cls.data[1] = cls.data[1] + (bytearray(b"bb"),)
             cls.data[2] = cls.data[2] + (bytearray(b"ccc"),)
+            cls.data[3] = cls.data[3] + (bytearray(b"dddd"),)
 
     @classmethod
     def tearDownClass(cls):
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index 6a6865a9fb16d..28ef98d7b3f1e 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -349,7 +349,8 @@ def test_vectorized_udf_dates(self):
         data = [(0, date(1969, 1, 1),),
                 (1, date(2012, 2, 2),),
                 (2, None,),
-                (3, date(2100, 4, 4),)]
+                (3, date(2100, 4, 4),),
+                (4, date(2262, 4, 12),)]
         df = self.spark.createDataFrame(data, schema=schema)
 
         date_copy = pandas_udf(lambda t: t, returnType=DateType())
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 4b8f2efff4acc..1f113d334dde5 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1681,38 +1681,53 @@ def from_arrow_schema(arrow_schema):
         for field in arrow_schema])
 
 
-def _check_series_convert_date(series, data_type):
-    """
-    Cast the series to datetime.date if it's a date type, otherwise returns the original series.
+def _arrow_column_to_pandas(column, data_type):
+    """ Convert Arrow Column to pandas Series.
+
+    If the given column is a date type column, creates a series of datetime.date directly instead
+    of creating datetime64[ns] as intermediate data.
 
-    :param series: pandas.Series
-    :param data_type: a Spark data type for the series
+    :param column: pyarrow.lib.Column
+    :param data_type: a Spark data type for the column
     """
-    import pyarrow
+    import pandas as pd
+    import pyarrow as pa
     from distutils.version import LooseVersion
-    # As of Arrow 0.12.0, date_as_objects is True by default, see ARROW-3910
-    if LooseVersion(pyarrow.__version__) < LooseVersion("0.12.0") and type(data_type) == DateType:
-        return series.dt.date
+    # Since Arrow 0.11.0, support date_as_object to return datetime.date instead of np.datetime64.
+    if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
+        if type(data_type) == DateType:
+            return pd.Series(column.to_pylist(), name=column.name)
+        else:
+            return column.to_pandas()
     else:
-        return series
+        return column.to_pandas(date_as_object=True)
+
+
+def _arrow_table_to_pandas(table, schema):
+    """ Convert Arrow Table to pandas DataFrame.
 
-def _check_dataframe_convert_date(pdf, schema):
-    """ Correct date type value to use datetime.date.
+    If the given table contains a date type column, use `_arrow_column_to_pandas` for pyarrow<0.11
+    or use `date_as_object` option for pyarrow>=0.11 to avoid creating datetime64[ns] as
+    intermediate data.
 
     Pandas DataFrame created from PyArrow uses datetime64[ns] for date type values, but we should
     use datetime.date to match the behavior when Arrow optimization is disabled.
 
-    :param pdf: pandas.DataFrame
-    :param schema: a Spark schema of the pandas.DataFrame
+    :param table: pyarrow.lib.Table
+    :param schema: a Spark schema of the pyarrow.lib.Table
     """
-    import pyarrow
+    import pandas as pd
+    import pyarrow as pa
     from distutils.version import LooseVersion
-    # As of Arrow 0.12.0, date_as_objects is True by default, see ARROW-3910
-    if LooseVersion(pyarrow.__version__) < LooseVersion("0.12.0"):
-        for field in schema:
-            pdf[field.name] = _check_series_convert_date(pdf[field.name], field.dataType)
-    return pdf
+    # Since Arrow 0.11.0, support date_as_object to return datetime.date instead of np.datetime64.
+    if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
+        if any(type(field.dataType) == DateType for field in schema):
+            return pd.concat([_arrow_column_to_pandas(column, field.dataType)
+                              for column, field in zip(table.itercolumns(), schema)], axis=1)
+        else:
+            return table.to_pandas()
+    else:
+        return table.to_pandas(date_as_object=True)
 
 
 def _get_local_timezone():
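Note on the new test rows above: pandas' datetime64[ns] dtype stores a signed
64-bit count of nanoseconds since the epoch, so it cannot represent dates past
pd.Timestamp.max (2262-04-11); date(2262, 4, 12) is therefore the first date
that a datetime64[ns] intermediate would corrupt. A minimal standalone sketch
of the behavior this patch relies on (not part of the patch; assumes
pyarrow >= 0.11 so that to_pandas accepts date_as_object):

    import datetime

    import pandas as pd
    import pyarrow as pa

    # datetime64[ns] tops out just short of the date added to the tests above.
    print(pd.Timestamp.max)  # 2262-04-11 23:47:16.854775807

    # With date_as_object=True the conversion yields datetime.date objects and
    # never materializes a datetime64[ns] array, so 2262-04-12 survives intact.
    table = pa.Table.from_arrays([pa.array([datetime.date(2262, 4, 12)])],
                                 names=["d"])
    pdf = table.to_pandas(date_as_object=True)
    print(pdf["d"][0], type(pdf["d"][0]))  # 2262-04-12 <class 'datetime.date'>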
From b93b500b9fe3424e8f2a95311e7895d3bb5dbf52 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 18 Feb 2019 12:24:38 +0900
Subject: [PATCH 2/3] Address comments.

---
 python/pyspark/sql/types.py | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 1f113d334dde5..ed1ff8766fe0a 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1684,22 +1684,23 @@ def from_arrow_schema(arrow_schema):
 
 def _arrow_column_to_pandas(column, data_type):
     """ Convert Arrow Column to pandas Series.
 
-    If the given column is a date type column, creates a series of datetime.date directly instead
-    of creating datetime64[ns] as intermediate data.
-
     :param column: pyarrow.lib.Column
     :param data_type: a Spark data type for the column
     """
     import pandas as pd
     import pyarrow as pa
     from distutils.version import LooseVersion
-    # Since Arrow 0.11.0, support date_as_object to return datetime.date instead of np.datetime64.
+    # If the given column is a date type column, creates a series of datetime.date directly instead
+    # of creating datetime64[ns] as intermediate data to avoid overflow caused by datetime64[ns]
+    # type handling.
     if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
         if type(data_type) == DateType:
             return pd.Series(column.to_pylist(), name=column.name)
         else:
             return column.to_pandas()
     else:
+        # Since Arrow 0.11.0, support date_as_object to return datetime.date instead of
+        # np.datetime64.
         return column.to_pandas(date_as_object=True)
 
@@ -1719,7 +1720,9 @@ def _arrow_table_to_pandas(table, schema):
     import pandas as pd
     import pyarrow as pa
     from distutils.version import LooseVersion
-    # Since Arrow 0.11.0, support date_as_object to return datetime.date instead of np.datetime64.
+    # If the given table contains date type columns, creates series of datetime.date directly
+    # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
+    # datetime64[ns] type handling.
     if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
         if any(type(field.dataType) == DateType for field in schema):
             return pd.concat([_arrow_column_to_pandas(column, field.dataType)
@@ -1727,6 +1730,8 @@ def _arrow_table_to_pandas(table, schema):
                               for column, field in zip(table.itercolumns(), schema)], axis=1)
         else:
             return table.to_pandas()
     else:
+        # Since Arrow 0.11.0, support date_as_object to return datetime.date instead of
+        # np.datetime64.
         return table.to_pandas(date_as_object=True)
 
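For pyarrow older than 0.11 there is no date_as_object option, which is why the
patch falls back to materializing date columns as Python objects via
to_pylist(). A rough standalone equivalent of that fallback branch (not part of
the patch; a pyarrow Array and a hard-coded name stand in for the old Column
wrapper):

    import datetime

    import pandas as pd
    import pyarrow as pa

    # to_pylist() yields datetime.date values (and None for nulls) directly,
    # so the resulting Series has dtype object, not datetime64[ns].
    arr = pa.array([datetime.date(1969, 1, 1), None, datetime.date(2262, 4, 12)])
    s = pd.Series(arr.to_pylist(), name="d")
    print(s.dtype)     # object
    print(s.tolist())  # [datetime.date(1969, 1, 1), None, datetime.date(2262, 4, 12)]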
From cca2634abda4f2af057cb26a6f44f762d5db856d Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 18 Feb 2019 12:29:47 +0900
Subject: [PATCH 3/3] Fix.

---
 python/pyspark/sql/types.py | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index ed1ff8766fe0a..348cb5b118594 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1707,10 +1707,6 @@ def _arrow_column_to_pandas(column, data_type):
 
 def _arrow_table_to_pandas(table, schema):
     """ Convert Arrow Table to pandas DataFrame.
 
-    If the given table contains a date type column, use `_arrow_column_to_pandas` for pyarrow<0.11
-    or use `date_as_object` option for pyarrow>=0.11 to avoid creating datetime64[ns] as
-    intermediate data.
-
     Pandas DataFrame created from PyArrow uses datetime64[ns] for date type values, but we should
     use datetime.date to match the behavior when Arrow optimization is disabled.
@@ -1720,9 +1716,9 @@ def _arrow_table_to_pandas(table, schema):
     import pandas as pd
     import pyarrow as pa
     from distutils.version import LooseVersion
-    # If the given table contains date type columns, creates series of datetime.date directly
-    # instead of creating datetime64[ns] as intermediate data to avoid overflow caused by
-    # datetime64[ns] type handling.
+    # If the given table contains a date type column, use `_arrow_column_to_pandas` for pyarrow<0.11
+    # or use `date_as_object` option for pyarrow>=0.11 to avoid creating datetime64[ns] as
+    # intermediate data.
     if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
         if any(type(field.dataType) == DateType for field in schema):
             return pd.concat([_arrow_column_to_pandas(column, field.dataType)
@@ -1730,8 +1726,6 @@ def _arrow_table_to_pandas(table, schema):
                               for column, field in zip(table.itercolumns(), schema)], axis=1)
         else:
             return table.to_pandas()
     else:
-        # Since Arrow 0.11.0, support date_as_object to return datetime.date instead of
-        # np.datetime64.
         return table.to_pandas(date_as_object=True)
 
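Taken together, on pyarrow < 0.11 the final _arrow_table_to_pandas converts the
table column by column: date columns take the object path, every other column
takes the plain to_pandas() path, and the resulting Series are reassembled with
pd.concat. A standalone sketch of that per-column dispatch (hypothetical names;
a plain set of column names stands in for the Spark schema walk, and a pyarrow
new enough to expose Table.column_names and lookup by name is assumed):

    import datetime

    import pandas as pd
    import pyarrow as pa

    def table_to_pandas_sketch(table, date_column_names):
        # date_column_names stands in for the schema check
        # (type(field.dataType) == DateType) done by the real helper.
        out = []
        for name in table.column_names:
            col = table.column(name)
            if name in date_column_names:
                # Object path: avoids the datetime64[ns] intermediate.
                out.append(pd.Series(col.to_pylist(), name=name))
            else:
                # Fast path for every other type.
                out.append(col.to_pandas().rename(name))
        return pd.concat(out, axis=1)

    table = pa.Table.from_arrays(
        [pa.array([1, 2]),
         pa.array([datetime.date(2100, 3, 3), datetime.date(2262, 4, 12)])],
        names=["id", "d"])
    print(table_to_pandas_sketch(table, {"d"})["d"].tolist())
    # [datetime.date(2100, 3, 3), datetime.date(2262, 4, 12)]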