diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 9abd2735fff11..3a0adef178475 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -274,33 +274,46 @@ def _toArrow(self) -> "pa.Table": from pyspark.sql.pandas.utils import require_minimum_pyarrow_version require_minimum_pyarrow_version() - schema = to_arrow_schema(self.schema) + to_arrow_schema(self.schema) import pyarrow as pa self_destruct = jconf.arrowPySparkSelfDestructEnabled() - batches = self._collect_as_arrow(split_batches=self_destruct) - table = pa.Table.from_batches(batches, schema=schema) + batches = self._collect_as_arrow( + split_batches=self_destruct, + empty_list_if_zero_records=False + ) + table = pa.Table.from_batches(batches) # Ensure only the table has a reference to the batches, so that # self_destruct (if enabled) is effective del batches return table - def _collect_as_arrow(self, split_batches: bool = False) -> List["pa.RecordBatch"]: + def _collect_as_arrow( + self, + split_batches: bool = False, + empty_list_if_zero_records: bool = True, + ) -> List["pa.RecordBatch"]: """ - Returns all records as a list of ArrowRecordBatches, pyarrow must be installed + Returns all records as a list of Arrow RecordBatches. PyArrow must be installed and available on driver and worker Python environments. This is an experimental feature. :param split_batches: split batches such that each column is in its own allocation, so that the selfDestruct optimization is effective; default False. + :param empty_list_if_zero_records: If True (the default), returns an empty list if the + result has 0 records. Otherwise, returns a list of length 1 containing an empty + Arrow RecordBatch which includes the schema. + .. note:: Experimental. """ from pyspark.sql.dataframe import DataFrame assert isinstance(self, DataFrame) + require_minimum_pyarrow_version() + with SCCallSiteSync(self._sc): ( port, @@ -343,8 +356,13 @@ def _collect_as_arrow(self, split_batches: bool = False) -> List["pa.RecordBatch batches = results[:-1] batch_order = results[-1] - # Re-order the batch list using the correct order - return [batches[i] for i in batch_order] + if len(batches) or empty_list_if_zero_records: + # Re-order the batch list using the correct order + return [batches[i] for i in batch_order] + else: + schema = to_arrow_schema(self.schema) + empty_arrays = [pa.array([], type=field.type) for field in schema] + return [pa.RecordBatch.from_arrays(empty_arrays, schema=schema)] class SparkConversionMixin: