From ffecd738688c9b5e076b585bbb098d9d6d4e7bb3 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 12 Mar 2024 17:09:51 -0400 Subject: [PATCH 01/28] Add _toArrow() --- python/pyspark/sql/pandas/conversion.py | 46 +++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index ec4e21daba97b..483becfb78c77 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -225,6 +225,52 @@ def toPandas(self) -> "PandasDataFrameLike": else: return pdf + def _toArrow(self) -> "pa.Table": + """ + Returns the contents of this :class:`DataFrame` as PyArrow ``pyarrow.Table``. + + This is only available if PyArrow is installed and available. + + Notes + ----- + This method should only be used if the resulting PyArrow ``pyarrow.Table`` is + expected to be small, as all the data is loaded into the driver's memory. + + Examples + -------- + >>> df.toArrow() # doctest: +SKIP + pyarrow.Table + age: int64 + name: string + ---- + age: [[2,5]] + name: [["Alice","Bob"]] + + .. note:: Experimental. + """ + from pyspark.sql.dataframe import DataFrame + + assert isinstance(self, DataFrame) + + jconf = self.sparkSession._jconf + + try: + from pyspark.sql.pandas.types import to_arrow_schema + from pyspark.sql.pandas.utils import require_minimum_pyarrow_version + + require_minimum_pyarrow_version() + schema = to_arrow_schema(self.schema) + + import pyarrow as pa + + self_destruct = jconf.arrowPySparkSelfDestructEnabled() + batches = self._collect_as_arrow(split_batches=self_destruct) + table = pa.Table.from_batches(batches, schema=schema) + # Ensure only the table has a reference to the batches, so that + # self_destruct (if enabled) is effective + del batches + return table + def _collect_as_arrow(self, split_batches: bool = False) -> List["pa.RecordBatch"]: """ Returns all records as a list of ArrowRecordBatches, pyarrow must be installed From 90c0404cb01b263c7836a14d2f5aeb3d36eaf861 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 12 Mar 2024 18:05:34 -0400 Subject: [PATCH 02/28] Remove unnecessary try --- python/pyspark/sql/pandas/conversion.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 483becfb78c77..ec4a12575db66 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -254,22 +254,21 @@ def _toArrow(self) -> "pa.Table": jconf = self.sparkSession._jconf - try: - from pyspark.sql.pandas.types import to_arrow_schema - from pyspark.sql.pandas.utils import require_minimum_pyarrow_version + from pyspark.sql.pandas.types import to_arrow_schema + from pyspark.sql.pandas.utils import require_minimum_pyarrow_version - require_minimum_pyarrow_version() - schema = to_arrow_schema(self.schema) + require_minimum_pyarrow_version() + schema = to_arrow_schema(self.schema) - import pyarrow as pa + import pyarrow as pa - self_destruct = jconf.arrowPySparkSelfDestructEnabled() - batches = self._collect_as_arrow(split_batches=self_destruct) - table = pa.Table.from_batches(batches, schema=schema) - # Ensure only the table has a reference to the batches, so that - # self_destruct (if enabled) is effective - del batches - return table + self_destruct = jconf.arrowPySparkSelfDestructEnabled() + batches = self._collect_as_arrow(split_batches=self_destruct) + table = pa.Table.from_batches(batches, schema=schema) + # Ensure 
only the table has a reference to the batches, so that + # self_destruct (if enabled) is effective + del batches + return table def _collect_as_arrow(self, split_batches: bool = False) -> List["pa.RecordBatch"]: """ From fe7210129f0a696c90f96d070a5a5099dfc6d5d6 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 12 Mar 2024 21:40:49 -0400 Subject: [PATCH 03/28] Add empty_list_if_zero_records option to _collect_as_arrow() --- python/pyspark/sql/pandas/conversion.py | 32 +++++++++++++++++++------ 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index ec4a12575db66..ff590c11dde96 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -258,33 +258,46 @@ def _toArrow(self) -> "pa.Table": from pyspark.sql.pandas.utils import require_minimum_pyarrow_version require_minimum_pyarrow_version() - schema = to_arrow_schema(self.schema) + to_arrow_schema(self.schema) import pyarrow as pa self_destruct = jconf.arrowPySparkSelfDestructEnabled() - batches = self._collect_as_arrow(split_batches=self_destruct) - table = pa.Table.from_batches(batches, schema=schema) + batches = self._collect_as_arrow( + split_batches=self_destruct, + empty_list_if_zero_records=False + ) + table = pa.Table.from_batches(batches) # Ensure only the table has a reference to the batches, so that # self_destruct (if enabled) is effective del batches return table - def _collect_as_arrow(self, split_batches: bool = False) -> List["pa.RecordBatch"]: + def _collect_as_arrow( + self, + split_batches: bool = False, + empty_list_if_zero_records: bool = True, + ) -> List["pa.RecordBatch"]: """ - Returns all records as a list of ArrowRecordBatches, pyarrow must be installed + Returns all records as a list of Arrow RecordBatches. PyArrow must be installed and available on driver and worker Python environments. This is an experimental feature. :param split_batches: split batches such that each column is in its own allocation, so that the selfDestruct optimization is effective; default False. + :param empty_list_if_zero_records: If True (the default), returns an empty list if the + result has 0 records. Otherwise, returns a list of length 1 containing an empty + Arrow RecordBatch which includes the schema. + .. note:: Experimental. 
""" from pyspark.sql.dataframe import DataFrame assert isinstance(self, DataFrame) + require_minimum_pyarrow_version() + with SCCallSiteSync(self._sc): ( port, @@ -327,8 +340,13 @@ def _collect_as_arrow(self, split_batches: bool = False) -> List["pa.RecordBatch batches = results[:-1] batch_order = results[-1] - # Re-order the batch list using the correct order - return [batches[i] for i in batch_order] + if len(batches) or empty_list_if_zero_records: + # Re-order the batch list using the correct order + return [batches[i] for i in batch_order] + else: + schema = to_arrow_schema(self.schema) + empty_arrays = [pa.array([], type=field.type) for field in schema] + return [pa.RecordBatch.from_arrays(empty_arrays, schema=schema)] class SparkConversionMixin: From 25254c8054fc01692232a733c15a31badfe86827 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 12 Mar 2024 23:29:17 -0400 Subject: [PATCH 04/28] Add missing imports --- python/pyspark/sql/pandas/conversion.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index ff590c11dde96..c1622e860434d 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -296,6 +296,9 @@ def _collect_as_arrow( assert isinstance(self, DataFrame) + from pyspark.sql.pandas.types import to_arrow_schema + from pyspark.sql.pandas.utils import require_minimum_pyarrow_version + require_minimum_pyarrow_version() with SCCallSiteSync(self._sc): From 82af39b55b3b9ae4e0a0422b9c1cb9f130b9a494 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Wed, 13 Mar 2024 07:09:48 -0400 Subject: [PATCH 05/28] Format Python --- python/pyspark/sql/pandas/conversion.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index c1622e860434d..a4242bac8c887 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -264,8 +264,7 @@ def _toArrow(self) -> "pa.Table": self_destruct = jconf.arrowPySparkSelfDestructEnabled() batches = self._collect_as_arrow( - split_batches=self_destruct, - empty_list_if_zero_records=False + split_batches=self_destruct, empty_list_if_zero_records=False ) table = pa.Table.from_batches(batches) # Ensure only the table has a reference to the batches, so that @@ -343,7 +342,7 @@ def _collect_as_arrow( batches = results[:-1] batch_order = results[-1] - if len(batches) or empty_list_if_zero_records: + if len(batches) or empty_list_if_zero_records: # Re-order the batch list using the correct order return [batches[i] for i in batch_order] else: From 7d139622f4ce67c2866298a0b0ee0800d12aa26f Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Sat, 16 Mar 2024 10:48:09 -0400 Subject: [PATCH 06/28] Changes based on review feedback --- python/pyspark/sql/pandas/conversion.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index a4242bac8c887..6a02a6527663d 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -295,11 +295,6 @@ def _collect_as_arrow( assert isinstance(self, DataFrame) - from pyspark.sql.pandas.types import to_arrow_schema - from pyspark.sql.pandas.utils import require_minimum_pyarrow_version - - require_minimum_pyarrow_version() - with SCCallSiteSync(self._sc): ( port, @@ -346,6 +341,7 @@ def _collect_as_arrow( # Re-order the batch list using the correct 
order return [batches[i] for i in batch_order] else: + from pyspark.sql.pandas.types import to_arrow_schema schema = to_arrow_schema(self.schema) empty_arrays = [pa.array([], type=field.type) for field in schema] return [pa.RecordBatch.from_arrays(empty_arrays, schema=schema)] From 3b3a7cf63726cd554d50d98737401684c725e5b9 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Sat, 16 Mar 2024 16:29:00 -0400 Subject: [PATCH 07/28] Format Python --- python/pyspark/sql/pandas/conversion.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 6a02a6527663d..825bd3292633f 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -342,6 +342,7 @@ def _collect_as_arrow( return [batches[i] for i in batch_order] else: from pyspark.sql.pandas.types import to_arrow_schema + schema = to_arrow_schema(self.schema) empty_arrays = [pa.array([], type=field.type) for field in schema] return [pa.RecordBatch.from_arrays(empty_arrays, schema=schema)] From 4261c257d2579bca5c06f19bec51d43c075d04df Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Mon, 18 Mar 2024 21:47:36 -0400 Subject: [PATCH 08/28] Add test --- python/pyspark/sql/tests/test_arrow.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 8636e953aaf8f..a28e19a82ef80 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -179,6 +179,26 @@ def create_pandas_data_frame(self): data_dict["4_float_t"] = np.float32(data_dict["4_float_t"]) return pd.DataFrame(data=data_dict) + def create_arrow_table(self): + import pyarrow as pa + + data_dict = {} + for j, name in enumerate(self.schema.names): + data_dict[name] = [self.data[i][j] for i in range(len(self.data))] + t = pa.Table.from_pydict(data_dict) + # convert these to Arrow types + new_schema = t.schema.set( + t.schema.get_field_index("2_int_t"), pa.field("2_int_t", pa.int32()) + ) + new_schema = new_schema.set( + new_schema.get_field_index("4_float_t"), pa.field("4_float_t", pa.float32()) + ) + new_schema = new_schema.set( + new_schema.get_field_index("6_decimal_t"), + pa.field("6_decimal_t", pa.decimal128(38, 18)), + ) + return t.cast(new_schema) + @property def create_np_arrs(self): import numpy as np @@ -339,6 +359,12 @@ def test_pandas_round_trip(self): pdf_arrow = df.toPandas() assert_frame_equal(pdf_arrow, pdf) + def test_arrow_round_trip(self): + t_in = self.create_arrow_table() + df = self.spark.createDataFrame(self.data, schema=self.schema) + t_out = df._toArrow() + self.assertTrue(t_out.equals(t_in)) + def test_pandas_self_destruct(self): import pyarrow as pa From 7b7b88a21c6627f47e1ecfe8db9cb8b1d4eaea6f Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 19 Mar 2024 09:20:30 -0400 Subject: [PATCH 09/28] Fix timezone error in test --- python/pyspark/sql/tests/test_arrow.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index a28e19a82ef80..4ec9527f640d0 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -181,6 +181,7 @@ def create_pandas_data_frame(self): def create_arrow_table(self): import pyarrow as pa + import pyarrow.compute as pc data_dict = {} for j, name in enumerate(self.schema.names): @@ -197,7 +198,15 @@ def create_arrow_table(self): 
new_schema.get_field_index("6_decimal_t"), pa.field("6_decimal_t", pa.decimal128(38, 18)), ) - return t.cast(new_schema) + t = t.cast(new_schema) + # convert timestamp to local timezone + timezone = self.spark.conf.get("spark.sql.session.timeZone") + t = t.set_column( + t.schema.get_field_index("8_timestamp_t"), + "8_timestamp_t", + pc.assume_timezone(t["8_timestamp_t"], timezone), + ) + return t @property def create_np_arrs(self): From 2430e55217eeb7f4e4290dc0fb4f486190d33b08 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 19 Mar 2024 10:40:21 -0400 Subject: [PATCH 10/28] Resolve rebase conflicts --- python/pyspark/sql/connect/dataframe.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 843c92a9b27d2..4f12d20c59df5 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -1768,6 +1768,12 @@ def _to_table(self) -> Tuple["pa.Table", Optional[StructType]]: assert table is not None return (table, schema) + def _toArrow(self) -> "pa.Table": + query = self._plan.to_proto(self._session.client) + return self._session.client.to_table(query, self._plan.observations) + + _toArrow.__doc__ = PySparkDataFrame._toArrow.__doc__ + def toPandas(self) -> "PandasDataFrameLike": query = self._plan.to_proto(self._session.client) return self._session.client.to_pandas(query, self._plan.observations) From 054bd4df6b863644a771c49feda52e5e34157f8f Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 19 Mar 2024 11:17:44 -0400 Subject: [PATCH 11/28] Bugfix --- python/pyspark/sql/connect/dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 4f12d20c59df5..90106efb1e3de 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -1769,8 +1769,8 @@ def _to_table(self) -> Tuple["pa.Table", Optional[StructType]]: return (table, schema) def _toArrow(self) -> "pa.Table": - query = self._plan.to_proto(self._session.client) - return self._session.client.to_table(query, self._plan.observations) + table = self._to_table()[0] + return table _toArrow.__doc__ = PySparkDataFrame._toArrow.__doc__ From c10912c77eb2b0124e57e5ff73d1011952153723 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Mon, 29 Apr 2024 16:09:49 -0400 Subject: [PATCH 12/28] Remove unneeded __doc__ --- python/pyspark/sql/connect/dataframe.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 90106efb1e3de..d69338d477403 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -1772,8 +1772,6 @@ def _toArrow(self) -> "pa.Table": table = self._to_table()[0] return table - _toArrow.__doc__ = PySparkDataFrame._toArrow.__doc__ - def toPandas(self) -> "PandasDataFrameLike": query = self._plan.to_proto(self._session.client) return self._session.client.to_pandas(query, self._plan.observations) From 9e0db3e5d4fac91c2e1396e769432a80e5f9f126 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 7 May 2024 07:18:42 -0400 Subject: [PATCH 13/28] Incorporate feedback from review Co-authored-by: Martin Grund --- python/pyspark/sql/connect/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index d69338d477403..ada9436ef5c95 100644 --- 
a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -1769,7 +1769,7 @@ def _to_table(self) -> Tuple["pa.Table", Optional[StructType]]: return (table, schema) def _toArrow(self) -> "pa.Table": - table = self._to_table()[0] + table, _ = self._to_table() return table def toPandas(self) -> "PandasDataFrameLike": From 0af77cdc45f89046a247c4cff8a281c103174e55 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 7 May 2024 07:31:41 -0400 Subject: [PATCH 14/28] Make non-experimental and rename toArrowTable --- python/pyspark/sql/connect/dataframe.py | 2 +- python/pyspark/sql/pandas/conversion.py | 6 ++---- python/pyspark/sql/tests/test_arrow.py | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index ada9436ef5c95..d1b14be910bb8 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -1768,7 +1768,7 @@ def _to_table(self) -> Tuple["pa.Table", Optional[StructType]]: assert table is not None return (table, schema) - def _toArrow(self) -> "pa.Table": + def toArrowTable(self) -> "pa.Table": table, _ = self._to_table() return table diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 825bd3292633f..4a6a753fec8ce 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -225,7 +225,7 @@ def toPandas(self) -> "PandasDataFrameLike": else: return pdf - def _toArrow(self) -> "pa.Table": + def toArrowTable(self) -> "pa.Table": """ Returns the contents of this :class:`DataFrame` as PyArrow ``pyarrow.Table``. @@ -238,15 +238,13 @@ def _toArrow(self) -> "pa.Table": Examples -------- - >>> df.toArrow() # doctest: +SKIP + >>> df.toArrowTable() # doctest: +SKIP pyarrow.Table age: int64 name: string ---- age: [[2,5]] name: [["Alice","Bob"]] - - .. note:: Experimental. 
""" from pyspark.sql.dataframe import DataFrame diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 4ec9527f640d0..8fbbe0636ac1b 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -371,7 +371,7 @@ def test_pandas_round_trip(self): def test_arrow_round_trip(self): t_in = self.create_arrow_table() df = self.spark.createDataFrame(self.data, schema=self.schema) - t_out = df._toArrow() + t_out = df.toArrowTable() self.assertTrue(t_out.equals(t_in)) def test_pandas_self_destruct(self): From 6f8fcbeb25692400149ff93ae7740245f10bb80b Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 7 May 2024 08:50:08 -0400 Subject: [PATCH 15/28] Add entry to Apache Arrow in PySpark page --- examples/src/main/python/sql/arrow.py | 17 ++++++++++ .../source/user_guide/sql/arrow_pandas.rst | 34 +++++++++++++------ 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py index 03daf18eadbf3..1e0fd35628d65 100644 --- a/examples/src/main/python/sql/arrow.py +++ b/examples/src/main/python/sql/arrow.py @@ -33,6 +33,21 @@ require_minimum_pyarrow_version() +def dataframe_to_arrow_table_example(spark: SparkSession) -> None: + import pyarrow as pa + + # Create a Spark DataFrame + df = spark.range(100).drop("id").withColumns({"0": rand(), "1": rand(), "2": rand()}) + + # Convert the Spark DataFrame to a PyArrow Table + table = df.select("*").toArrowTable() + + print(table.schema) + # 0: double not null + # 1: double not null + # 2: double not null + + def dataframe_with_arrow_example(spark: SparkSession) -> None: import numpy as np import pandas as pd @@ -302,6 +317,8 @@ def arrow_slen(s): # type: ignore[no-untyped-def] .appName("Python Arrow-in-Spark example") \ .getOrCreate() + print("Running Arrow conversion example: DataFrame to Table") + dataframe_to_arrow_table_example(spark) print("Running Pandas to/from conversion example") dataframe_with_arrow_example(spark) print("Running pandas_udf example: Series to Frame") diff --git a/python/docs/source/user_guide/sql/arrow_pandas.rst b/python/docs/source/user_guide/sql/arrow_pandas.rst index 1d6a4df606906..838a1ef24ada0 100644 --- a/python/docs/source/user_guide/sql/arrow_pandas.rst +++ b/python/docs/source/user_guide/sql/arrow_pandas.rst @@ -39,6 +39,20 @@ is installed and available on all cluster nodes. You can install it using pip or conda from the conda-forge channel. See PyArrow `installation `_ for details. +Conversion to Arrow Table +------------------------- + +You can call :meth:`DataFrame.toArrowTable` to convert a Spark DataFrame to a PyArrow Table. + +.. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py + :language: python + :lines: 37-48 + :dedent: 4 + +Note that :meth:`DataFrame.toArrowTable` results in the collection of all records in the DataFrame +to the driver program and should be done on a small subset of the data. Not all Spark data types +are currently supported and an error can be raised if a column has an unsupported type. + Enabling for Conversion to/from Pandas -------------------------------------- @@ -53,7 +67,7 @@ This can be controlled by ``spark.sql.execution.arrow.pyspark.fallback.enabled`` .. 
literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 37-52 + :lines: 52-67 :dedent: 4 Using the above optimizations with Arrow will produce the same results as when Arrow is not @@ -90,7 +104,7 @@ specify the type hints of ``pandas.Series`` and ``pandas.DataFrame`` as below: .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 56-80 + :lines: 71-95 :dedent: 4 In the following sections, it describes the combinations of the supported type hints. For simplicity, @@ -113,7 +127,7 @@ The following example shows how to create this Pandas UDF that computes the prod .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 84-114 + :lines: 99-129 :dedent: 4 For detailed usage, please see :func:`pandas_udf`. @@ -152,7 +166,7 @@ The following example shows how to create this Pandas UDF: .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 118-140 + :lines: 133-155 :dedent: 4 For detailed usage, please see :func:`pandas_udf`. @@ -174,7 +188,7 @@ The following example shows how to create this Pandas UDF: .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 144-167 + :lines: 159-182 :dedent: 4 For detailed usage, please see :func:`pandas_udf`. @@ -205,7 +219,7 @@ and window operations: .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 171-212 + :lines: 186-227 :dedent: 4 .. currentmodule:: pyspark.sql.functions @@ -270,7 +284,7 @@ in the group. .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 216-234 + :lines: 231-249 :dedent: 4 For detailed usage, please see please see :meth:`GroupedData.applyInPandas` @@ -288,7 +302,7 @@ The following example shows how to use :meth:`DataFrame.mapInPandas`: .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 238-249 + :lines: 253-264 :dedent: 4 For detailed usage, please see :meth:`DataFrame.mapInPandas`. @@ -327,7 +341,7 @@ The following example shows how to use ``DataFrame.groupby().cogroup().applyInPa .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 253-275 + :lines: 268-290 :dedent: 4 @@ -349,7 +363,7 @@ Here's an example that demonstrates the usage of both a default, pickled Python .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 279-297 + :lines: 294-312 :dedent: 4 Compared to the default, pickled Python UDFs, Arrow Python UDFs provide a more coherent type coercion mechanism. 
UDF From 88f820875210210700b1b5e0f974fa5a1dbee17f Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 7 May 2024 11:00:44 -0400 Subject: [PATCH 16/28] Add missing import in example --- examples/src/main/python/sql/arrow.py | 1 + .../source/user_guide/sql/arrow_pandas.rst | 22 +++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py index 1e0fd35628d65..a324aac0171bd 100644 --- a/examples/src/main/python/sql/arrow.py +++ b/examples/src/main/python/sql/arrow.py @@ -35,6 +35,7 @@ def dataframe_to_arrow_table_example(spark: SparkSession) -> None: import pyarrow as pa + from pyspark.sql.functions import rand # Create a Spark DataFrame df = spark.range(100).drop("id").withColumns({"0": rand(), "1": rand(), "2": rand()}) diff --git a/python/docs/source/user_guide/sql/arrow_pandas.rst b/python/docs/source/user_guide/sql/arrow_pandas.rst index 838a1ef24ada0..3f31f64a0a7ab 100644 --- a/python/docs/source/user_guide/sql/arrow_pandas.rst +++ b/python/docs/source/user_guide/sql/arrow_pandas.rst @@ -46,7 +46,7 @@ You can call :meth:`DataFrame.toArrowTable` to convert a Spark DataFrame to a Py .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 37-48 + :lines: 37-49 :dedent: 4 Note that :meth:`DataFrame.toArrowTable` results in the collection of all records in the DataFrame @@ -67,7 +67,7 @@ This can be controlled by ``spark.sql.execution.arrow.pyspark.fallback.enabled`` .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 52-67 + :lines: 53-68 :dedent: 4 Using the above optimizations with Arrow will produce the same results as when Arrow is not @@ -104,7 +104,7 @@ specify the type hints of ``pandas.Series`` and ``pandas.DataFrame`` as below: .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 71-95 + :lines: 72-96 :dedent: 4 In the following sections, it describes the combinations of the supported type hints. For simplicity, @@ -127,7 +127,7 @@ The following example shows how to create this Pandas UDF that computes the prod .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 99-129 + :lines: 100-130 :dedent: 4 For detailed usage, please see :func:`pandas_udf`. @@ -166,7 +166,7 @@ The following example shows how to create this Pandas UDF: .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 133-155 + :lines: 134-156 :dedent: 4 For detailed usage, please see :func:`pandas_udf`. @@ -188,7 +188,7 @@ The following example shows how to create this Pandas UDF: .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 159-182 + :lines: 160-183 :dedent: 4 For detailed usage, please see :func:`pandas_udf`. @@ -219,7 +219,7 @@ and window operations: .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 186-227 + :lines: 187-228 :dedent: 4 .. currentmodule:: pyspark.sql.functions @@ -284,7 +284,7 @@ in the group. .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 231-249 + :lines: 232-250 :dedent: 4 For detailed usage, please see please see :meth:`GroupedData.applyInPandas` @@ -302,7 +302,7 @@ The following example shows how to use :meth:`DataFrame.mapInPandas`: .. 
literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 253-264 + :lines: 254-265 :dedent: 4 For detailed usage, please see :meth:`DataFrame.mapInPandas`. @@ -341,7 +341,7 @@ The following example shows how to use ``DataFrame.groupby().cogroup().applyInPa .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 268-290 + :lines: 269-291 :dedent: 4 @@ -363,7 +363,7 @@ Here's an example that demonstrates the usage of both a default, pickled Python .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python - :lines: 294-312 + :lines: 295-313 :dedent: 4 Compared to the default, pickled Python UDFs, Arrow Python UDFs provide a more coherent type coercion mechanism. UDF From ac6a35e33683f02380992cc392aa2936b667e55e Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 7 May 2024 11:04:33 -0400 Subject: [PATCH 17/28] Ignore unused pyarrow import --- examples/src/main/python/sql/arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py index a324aac0171bd..a4cdb047d2add 100644 --- a/examples/src/main/python/sql/arrow.py +++ b/examples/src/main/python/sql/arrow.py @@ -34,7 +34,7 @@ def dataframe_to_arrow_table_example(spark: SparkSession) -> None: - import pyarrow as pa + import pyarrow as pa # noqa: F401 from pyspark.sql.functions import rand # Create a Spark DataFrame From a1550e9cb19ac18516ccda8377f79c088b1e402b Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 7 May 2024 12:50:45 -0400 Subject: [PATCH 18/28] Add additional space before # noqa --- examples/src/main/python/sql/arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py index a4cdb047d2add..9c2c52978379e 100644 --- a/examples/src/main/python/sql/arrow.py +++ b/examples/src/main/python/sql/arrow.py @@ -34,7 +34,7 @@ def dataframe_to_arrow_table_example(spark: SparkSession) -> None: - import pyarrow as pa # noqa: F401 + import pyarrow as pa # noqa: F401 from pyspark.sql.functions import rand # Create a Spark DataFrame From a7b892225ca66c63858f1a17e9a32d5b868b8de1 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 7 May 2024 13:34:36 -0400 Subject: [PATCH 19/28] Try fix mypy check error --- examples/src/main/python/sql/arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py index 9c2c52978379e..460efd8f39df2 100644 --- a/examples/src/main/python/sql/arrow.py +++ b/examples/src/main/python/sql/arrow.py @@ -41,7 +41,7 @@ def dataframe_to_arrow_table_example(spark: SparkSession) -> None: df = spark.range(100).drop("id").withColumns({"0": rand(), "1": rand(), "2": rand()}) # Convert the Spark DataFrame to a PyArrow Table - table = df.select("*").toArrowTable() + table = df.select("*").toArrowTable() # type: ignore print(table.schema) # 0: double not null From 48769d0cc47c4c5904cb0389860a627036dd1df4 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Tue, 7 May 2024 14:40:10 -0400 Subject: [PATCH 20/28] Document use of self_destruct with toArrowTable --- .../docs/source/user_guide/sql/arrow_pandas.rst | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/python/docs/source/user_guide/sql/arrow_pandas.rst b/python/docs/source/user_guide/sql/arrow_pandas.rst index 3f31f64a0a7ab..f89b5c166044d 100644 --- 
a/python/docs/source/user_guide/sql/arrow_pandas.rst +++ b/python/docs/source/user_guide/sql/arrow_pandas.rst @@ -435,9 +435,12 @@ be verified by the user. Setting Arrow ``self_destruct`` for memory savings ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Since Spark 3.2, the Spark configuration ``spark.sql.execution.arrow.pyspark.selfDestruct.enabled`` can be used to enable PyArrow's ``self_destruct`` feature, which can save memory when creating a Pandas DataFrame via ``toPandas`` by freeing Arrow-allocated memory while building the Pandas DataFrame. -This option is experimental, and some operations may fail on the resulting Pandas DataFrame due to immutable backing arrays. -Typically, you would see the error ``ValueError: buffer source array is read-only``. -Newer versions of Pandas may fix these errors by improving support for such cases. -You can work around this error by copying the column(s) beforehand. -Additionally, this conversion may be slower because it is single-threaded. +Since Spark 3.2, the Spark configuration ``spark.sql.execution.arrow.pyspark.selfDestruct.enabled`` +can be used to enable PyArrow's ``self_destruct`` feature, which can save memory when creating a +Pandas DataFrame via ``toPandas`` by freeing Arrow-allocated memory while building the Pandas +DataFrame. This option can also save memory when creating a PyArrow Table via ``toArrowTable``. +This option is experimental. When used with ``toPandas``, some operations may fail on the resulting +Pandas DataFrame due to immutable backing arrays. Typically, you would see the error +``ValueError: buffer source array is read-only``. Newer versions of Pandas may fix these errors by +improving support for such cases. You can work around this error by copying the column(s) +beforehand. Additionally, this conversion may be slower because it is single-threaded. 
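[Note] The self_destruct documentation above can be made concrete with a short usage sketch. This is illustrative only: the session setup, column names, and data are assumptions rather than part of the patch, and toArrowTable is the method added by this series (renamed to toArrow in a later patch).

    from pyspark.sql import SparkSession

    # Sketch: assumes a Spark build containing this patch series and PyArrow installed.
    spark = (
        SparkSession.builder
        .appName("toArrowTable-self-destruct-sketch")
        # Experimental: frees Arrow-allocated memory while the result is being built.
        .config("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", "true")
        .getOrCreate()
    )

    df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], ["age", "name"])

    # With self_destruct enabled, _collect_as_arrow(split_batches=True) places each
    # column in its own allocation, and the batches are released as soon as the
    # Table takes ownership of them (see the `del batches` in the patches above).
    table = df.toArrowTable()
    print(table.schema)  # age: int64, name: string
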
From 4deb837eecff6082e6de78ed51b732bce965e4cc Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Wed, 8 May 2024 20:38:32 -0400 Subject: [PATCH 21/28] Add entry to dataframe.rst --- python/docs/source/reference/pyspark.sql/dataframe.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/python/docs/source/reference/pyspark.sql/dataframe.rst b/python/docs/source/reference/pyspark.sql/dataframe.rst index b69a2771b04fc..96ccc52f3a2e7 100644 --- a/python/docs/source/reference/pyspark.sql/dataframe.rst +++ b/python/docs/source/reference/pyspark.sql/dataframe.rst @@ -109,6 +109,7 @@ DataFrame DataFrame.tail DataFrame.take DataFrame.to + DataFrame.toArrowTable DataFrame.toDF DataFrame.toJSON DataFrame.toLocalIterator From 27f8464ed50520464970d8a9914813d81ce48ccf Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Wed, 8 May 2024 21:24:01 -0400 Subject: [PATCH 22/28] Update for consistency after #46129 --- python/pyspark/sql/classic/dataframe.py | 3 +++ python/pyspark/sql/dataframe.py | 26 +++++++++++++++++++++++++ python/pyspark/sql/pandas/conversion.py | 20 ------------------- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py index db9f22517ddad..aa2bf8d02b8c2 100644 --- a/python/pyspark/sql/classic/dataframe.py +++ b/python/pyspark/sql/classic/dataframe.py @@ -1825,6 +1825,9 @@ def mapInArrow( ) -> ParentDataFrame: return PandasMapOpsMixin.mapInArrow(self, func, schema, barrier, profile) + def toArrowTable(self) -> "pa.Table": + return PandasConversionMixin.toArrowTable(self) + def toPandas(self) -> "PandasDataFrameLike": return PandasConversionMixin.toPandas(self) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index e3d52c45d0c1d..4219d956b362a 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1200,6 +1200,7 @@ def collect(self) -> List[Row]: DataFrame.take : Returns the first `n` rows. DataFrame.head : Returns the first `n` rows. DataFrame.toPandas : Returns the data as a pandas DataFrame. + DataFrame.toArrowTable : Returns the data as a PyArrow Table. Notes ----- @@ -6213,6 +6214,31 @@ def mapInArrow( """ ... + def toArrowTable(self) -> "pa.Table": + """ + Returns the contents of this :class:`DataFrame` as PyArrow ``pyarrow.Table``. + + This is only available if PyArrow is installed and available. + + .. versionadded:: 4.0.0 + + Notes + ----- + This method should only be used if the resulting PyArrow ``pyarrow.Table`` is + expected to be small, as all the data is loaded into the driver's memory. + + Examples + -------- + >>> df.toArrowTable() # doctest: +SKIP + pyarrow.Table + age: int64 + name: string + ---- + age: [[2,5]] + name: [["Alice","Bob"]] + """ + ... + def toPandas(self) -> "PandasDataFrameLike": """ Returns the contents of this :class:`DataFrame` as Pandas ``pandas.DataFrame``. diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 4a6a753fec8ce..07272daf00fd0 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -226,26 +226,6 @@ def toPandas(self) -> "PandasDataFrameLike": return pdf def toArrowTable(self) -> "pa.Table": - """ - Returns the contents of this :class:`DataFrame` as PyArrow ``pyarrow.Table``. - - This is only available if PyArrow is installed and available. 
- - Notes - ----- - This method should only be used if the resulting PyArrow ``pyarrow.Table`` is - expected to be small, as all the data is loaded into the driver's memory. - - Examples - -------- - >>> df.toArrowTable() # doctest: +SKIP - pyarrow.Table - age: int64 - name: string - ---- - age: [[2,5]] - name: [["Alice","Bob"]] - """ from pyspark.sql.dataframe import DataFrame assert isinstance(self, DataFrame) From f0b1d71fa0fbe464fb06a880ccdddf033f7d859d Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Wed, 8 May 2024 22:55:11 -0400 Subject: [PATCH 23/28] Incorporate feedback from review Co-authored-by: Hyukjin Kwon --- python/pyspark/sql/dataframe.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 4219d956b362a..532927c7dbd35 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -6227,6 +6227,8 @@ def toArrowTable(self) -> "pa.Table": This method should only be used if the resulting PyArrow ``pyarrow.Table`` is expected to be small, as all the data is loaded into the driver's memory. + This API is a developer API. + Examples -------- >>> df.toArrowTable() # doctest: +SKIP From 1bf181672aed244090c56eacac08246d7d454c13 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Wed, 8 May 2024 22:53:34 -0400 Subject: [PATCH 24/28] Add @dispatch_df_method --- python/pyspark/sql/dataframe.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 532927c7dbd35..a1d5d2b02336e 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -6214,6 +6214,7 @@ def mapInArrow( """ ... + @dispatch_df_method def toArrowTable(self) -> "pa.Table": """ Returns the contents of this :class:`DataFrame` as PyArrow ``pyarrow.Table``. 
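[Note] One consequence of the empty_list_if_zero_records=False path used by toArrowTable (added earlier in this series) is that a zero-row result still yields a Table carrying the full schema. A minimal sketch, assuming a running SparkSession named spark with these patches applied; the filter and column names are illustrative only.

    import pyarrow as pa

    df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], ["age", "name"])

    empty = df.filter(df.age > 100)   # matches no rows
    table = empty.toArrowTable()      # still returns a schema-bearing, zero-row Table

    assert isinstance(table, pa.Table)
    assert table.num_rows == 0
    assert table.schema.names == ["age", "name"]
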
From 3bc3901cc52d57fcb14875fca2794a52705852a7 Mon Sep 17 00:00:00 2001 From: Ian Cook Date: Wed, 8 May 2024 22:59:39 -0400 Subject: [PATCH 25/28] Rename toArrowTable -> toArrow --- examples/src/main/python/sql/arrow.py | 2 +- python/docs/source/reference/pyspark.sql/dataframe.rst | 2 +- python/docs/source/user_guide/sql/arrow_pandas.rst | 10 +++++----- python/pyspark/sql/classic/dataframe.py | 4 ++-- python/pyspark/sql/connect/dataframe.py | 2 +- python/pyspark/sql/dataframe.py | 6 +++--- python/pyspark/sql/pandas/conversion.py | 2 +- python/pyspark/sql/tests/test_arrow.py | 2 +- 8 files changed, 15 insertions(+), 15 deletions(-) diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py index 460efd8f39df2..a06de81ca2cd3 100644 --- a/examples/src/main/python/sql/arrow.py +++ b/examples/src/main/python/sql/arrow.py @@ -41,7 +41,7 @@ def dataframe_to_arrow_table_example(spark: SparkSession) -> None: df = spark.range(100).drop("id").withColumns({"0": rand(), "1": rand(), "2": rand()}) # Convert the Spark DataFrame to a PyArrow Table - table = df.select("*").toArrowTable() # type: ignore + table = df.select("*").toArrow() # type: ignore print(table.schema) # 0: double not null diff --git a/python/docs/source/reference/pyspark.sql/dataframe.rst b/python/docs/source/reference/pyspark.sql/dataframe.rst index 96ccc52f3a2e7..ec39b645b1403 100644 --- a/python/docs/source/reference/pyspark.sql/dataframe.rst +++ b/python/docs/source/reference/pyspark.sql/dataframe.rst @@ -109,7 +109,7 @@ DataFrame DataFrame.tail DataFrame.take DataFrame.to - DataFrame.toArrowTable + DataFrame.toArrow DataFrame.toDF DataFrame.toJSON DataFrame.toLocalIterator diff --git a/python/docs/source/user_guide/sql/arrow_pandas.rst b/python/docs/source/user_guide/sql/arrow_pandas.rst index f89b5c166044d..0a527d832e211 100644 --- a/python/docs/source/user_guide/sql/arrow_pandas.rst +++ b/python/docs/source/user_guide/sql/arrow_pandas.rst @@ -42,16 +42,16 @@ You can install it using pip or conda from the conda-forge channel. See PyArrow Conversion to Arrow Table ------------------------- -You can call :meth:`DataFrame.toArrowTable` to convert a Spark DataFrame to a PyArrow Table. +You can call :meth:`DataFrame.toArrow` to convert a Spark DataFrame to a PyArrow Table. .. literalinclude:: ../../../../../examples/src/main/python/sql/arrow.py :language: python :lines: 37-49 :dedent: 4 -Note that :meth:`DataFrame.toArrowTable` results in the collection of all records in the DataFrame -to the driver program and should be done on a small subset of the data. Not all Spark data types -are currently supported and an error can be raised if a column has an unsupported type. +Note that :meth:`DataFrame.toArrow` results in the collection of all records in the DataFrame to +the driver program and should be done on a small subset of the data. Not all Spark data types are +currently supported and an error can be raised if a column has an unsupported type. Enabling for Conversion to/from Pandas -------------------------------------- @@ -438,7 +438,7 @@ Setting Arrow ``self_destruct`` for memory savings Since Spark 3.2, the Spark configuration ``spark.sql.execution.arrow.pyspark.selfDestruct.enabled`` can be used to enable PyArrow's ``self_destruct`` feature, which can save memory when creating a Pandas DataFrame via ``toPandas`` by freeing Arrow-allocated memory while building the Pandas -DataFrame. This option can also save memory when creating a PyArrow Table via ``toArrowTable``. +DataFrame. 
This option can also save memory when creating a PyArrow Table via ``toArrow``. This option is experimental. When used with ``toPandas``, some operations may fail on the resulting Pandas DataFrame due to immutable backing arrays. Typically, you would see the error ``ValueError: buffer source array is read-only``. Newer versions of Pandas may fix these errors by diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py index aa2bf8d02b8c2..082d1aeb54cbd 100644 --- a/python/pyspark/sql/classic/dataframe.py +++ b/python/pyspark/sql/classic/dataframe.py @@ -1825,8 +1825,8 @@ def mapInArrow( ) -> ParentDataFrame: return PandasMapOpsMixin.mapInArrow(self, func, schema, barrier, profile) - def toArrowTable(self) -> "pa.Table": - return PandasConversionMixin.toArrowTable(self) + def toArrow(self) -> "pa.Table": + return PandasConversionMixin.toArrow(self) def toPandas(self) -> "PandasDataFrameLike": return PandasConversionMixin.toPandas(self) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index d1b14be910bb8..3c9415adec2dd 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -1768,7 +1768,7 @@ def _to_table(self) -> Tuple["pa.Table", Optional[StructType]]: assert table is not None return (table, schema) - def toArrowTable(self) -> "pa.Table": + def toArrow(self) -> "pa.Table": table, _ = self._to_table() return table diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a1d5d2b02336e..04522bd18eb61 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1200,7 +1200,7 @@ def collect(self) -> List[Row]: DataFrame.take : Returns the first `n` rows. DataFrame.head : Returns the first `n` rows. DataFrame.toPandas : Returns the data as a pandas DataFrame. - DataFrame.toArrowTable : Returns the data as a PyArrow Table. + DataFrame.toArrow : Returns the data as a PyArrow Table. Notes ----- @@ -6215,7 +6215,7 @@ def mapInArrow( ... @dispatch_df_method - def toArrowTable(self) -> "pa.Table": + def toArrow(self) -> "pa.Table": """ Returns the contents of this :class:`DataFrame` as PyArrow ``pyarrow.Table``. 
@@ -6232,7 +6232,7 @@ def toArrowTable(self) -> "pa.Table": Examples -------- - >>> df.toArrowTable() # doctest: +SKIP + >>> df.toArrow() # doctest: +SKIP pyarrow.Table age: int64 name: string diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 07272daf00fd0..344608317beb7 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -225,7 +225,7 @@ def toPandas(self) -> "PandasDataFrameLike": else: return pdf - def toArrowTable(self) -> "pa.Table": + def toArrow(self) -> "pa.Table": from pyspark.sql.dataframe import DataFrame assert isinstance(self, DataFrame) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 8fbbe0636ac1b..71d3c46e5ee1e 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -371,7 +371,7 @@ def test_pandas_round_trip(self): def test_arrow_round_trip(self): t_in = self.create_arrow_table() df = self.spark.createDataFrame(self.data, schema=self.schema) - t_out = df.toArrowTable() + t_out = df.toArrow() self.assertTrue(t_out.equals(t_in)) def test_pandas_self_destruct(self): From 1647b5876f64523481a05e4f4185ff5ef76dff62 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Thu, 9 May 2024 15:27:15 +0900 Subject: [PATCH 26/28] Update dataframe.py --- python/pyspark/sql/classic/dataframe.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/sql/classic/dataframe.py b/python/pyspark/sql/classic/dataframe.py index 082d1aeb54cbd..9b6790d29aaa7 100644 --- a/python/pyspark/sql/classic/dataframe.py +++ b/python/pyspark/sql/classic/dataframe.py @@ -74,6 +74,7 @@ if TYPE_CHECKING: from py4j.java_gateway import JavaObject + import pyarrow as pa from pyspark.core.rdd import RDD from pyspark.core.context import SparkContext from pyspark._typing import PrimitiveType From 57550a9e258bf2507aa833fc744ab093713d7a1e Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Thu, 9 May 2024 15:27:48 +0900 Subject: [PATCH 27/28] Update dataframe.py --- python/pyspark/sql/dataframe.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 04522bd18eb61..886f72cc371e9 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -44,6 +44,7 @@ if TYPE_CHECKING: from py4j.java_gateway import JavaObject + import pyarrow as pa from pyspark.core.context import SparkContext from pyspark.core.rdd import RDD from pyspark._typing import PrimitiveType From 5e99fe3d573ed544690ee3eb8bcd8e3d85e09924 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Thu, 9 May 2024 17:24:37 +0900 Subject: [PATCH 28/28] Update examples/src/main/python/sql/arrow.py --- examples/src/main/python/sql/arrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py index a06de81ca2cd3..48aee48d929c8 100644 --- a/examples/src/main/python/sql/arrow.py +++ b/examples/src/main/python/sql/arrow.py @@ -41,7 +41,7 @@ def dataframe_to_arrow_table_example(spark: SparkSession) -> None: df = spark.range(100).drop("id").withColumns({"0": rand(), "1": rand(), "2": rand()}) # Convert the Spark DataFrame to a PyArrow Table - table = df.select("*").toArrow() # type: ignore + table = df.select("*").toArrow() print(table.schema) # 0: double not null
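
[Note] With the rename in place, the final API reads as below. A hedged end-to-end sketch: the application name, data, and Parquet path are assumptions for illustration, and toArrow is the method introduced by this patch series.

    import pyarrow.parquet as pq
    from pyspark.sql import SparkSession

    # Sketch: assumes a Spark build containing this patch series and PyArrow installed.
    spark = SparkSession.builder.appName("toArrow-example").getOrCreate()

    df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], ["age", "name"])

    # Collects all rows to the driver as a single pyarrow.Table, so keep results small.
    table = df.toArrow()
    print(table)
    # pyarrow.Table
    # age: int64
    # name: string
    # ----
    # age: [[2,5]]
    # name: [["Alice","Bob"]]

    # The Table can then be handed to the wider Arrow ecosystem, e.g. written to Parquet.
    pq.write_table(table, "/tmp/people.parquet")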