Commit

Add empty_list_if_zero_records option to _collect_as_arrow()
ianmcook committed Mar 13, 2024
1 parent af4b472 commit 674d71d
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions python/pyspark/sql/pandas/conversion.py
@@ -274,33 +274,46 @@ def _toArrow(self) -> "pa.Table":
         from pyspark.sql.pandas.utils import require_minimum_pyarrow_version

         require_minimum_pyarrow_version()
-        schema = to_arrow_schema(self.schema)
+        to_arrow_schema(self.schema)

         import pyarrow as pa

         self_destruct = jconf.arrowPySparkSelfDestructEnabled()
-        batches = self._collect_as_arrow(split_batches=self_destruct)
-        table = pa.Table.from_batches(batches, schema=schema)
+        batches = self._collect_as_arrow(
+            split_batches=self_destruct,
+            empty_list_if_zero_records=False
+        )
+        table = pa.Table.from_batches(batches)
         # Ensure only the table has a reference to the batches, so that
         # self_destruct (if enabled) is effective
         del batches
         return table

-    def _collect_as_arrow(self, split_batches: bool = False) -> List["pa.RecordBatch"]:
+    def _collect_as_arrow(
+        self,
+        split_batches: bool = False,
+        empty_list_if_zero_records: bool = True,
+    ) -> List["pa.RecordBatch"]:
         """
-        Returns all records as a list of ArrowRecordBatches, pyarrow must be installed
+        Returns all records as a list of Arrow RecordBatches. PyArrow must be installed
         and available on driver and worker Python environments.
         This is an experimental feature.
         :param split_batches: split batches such that each column is in its own allocation, so
             that the selfDestruct optimization is effective; default False.
+        :param empty_list_if_zero_records: If True (the default), returns an empty list if the
+            result has 0 records. Otherwise, returns a list of length 1 containing an empty
+            Arrow RecordBatch which includes the schema.
         .. note:: Experimental.
         """
         from pyspark.sql.dataframe import DataFrame

         assert isinstance(self, DataFrame)

         require_minimum_pyarrow_version()

         with SCCallSiteSync(self._sc):
             (
                 port,
@@ -343,8 +356,13 @@ def _collect_as_arrow(self, split_batches: bool = False) -> List["pa.RecordBatch"]:
         batches = results[:-1]
         batch_order = results[-1]

-        # Re-order the batch list using the correct order
-        return [batches[i] for i in batch_order]
+        if len(batches) or empty_list_if_zero_records:
+            # Re-order the batch list using the correct order
+            return [batches[i] for i in batch_order]
+        else:
+            schema = to_arrow_schema(self.schema)
+            empty_arrays = [pa.array([], type=field.type) for field in schema]
+            return [pa.RecordBatch.from_arrays(empty_arrays, schema=schema)]


 class SparkConversionMixin:
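A note on the zero-record case (explanatory, not part of the commit): once _toArrow() stops passing schema= to pa.Table.from_batches(), the batch list itself has to carry the schema, and an empty list cannot. The sketch below is a minimal pyarrow-only illustration using a made-up two-column schema; the names and the exact error text are illustrative only.

```python
# Minimal pyarrow-only sketch (not part of the commit); schema and column names
# are made up for illustration.
import pyarrow as pa

schema = pa.schema([("id", pa.int64()), ("name", pa.string())])

# With no batches and no explicit schema, from_batches() has nothing to infer
# the result schema from, so it raises an error.
try:
    pa.Table.from_batches([])
except ValueError as exc:
    print(f"from_batches([]) failed: {exc}")

# One empty RecordBatch built from the schema (what the else branch above does)
# carries the schema even though it holds zero rows.
empty_arrays = [pa.array([], type=field.type) for field in schema]
empty_batch = pa.RecordBatch.from_arrays(empty_arrays, schema=schema)

table = pa.Table.from_batches([empty_batch])
print(table.schema)    # id: int64, name: string
print(table.num_rows)  # 0
```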

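From a caller's perspective the new flag only matters when the result has zero records. A hedged usage sketch, assuming a local SparkSession with pyarrow installed and the behavior described in the docstring above (_collect_as_arrow() is a private API, called here only to illustrate the option):

```python
# Hedged usage sketch; assumes this branch of the code and a working local Spark.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
empty_df = spark.range(10).where("id < 0")  # a DataFrame with zero records

# Default (empty_list_if_zero_records=True): no batches at all.
assert empty_df._collect_as_arrow() == []

# With the option disabled: one empty batch that still carries the schema.
batches = empty_df._collect_as_arrow(empty_list_if_zero_records=False)
assert len(batches) == 1 and batches[0].num_rows == 0
print(batches[0].schema)  # id: int64

spark.stop()
```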