ARROW-2428: [Python] Support pandas ExtensionArray in Table.to_pandas conversion #5512

36 changes: 36 additions & 0 deletions docs/source/python/extending_types.rst
@@ -224,3 +224,39 @@ data type from above would look like::
return PeriodType, (self.freq,)

Also the storage type does not need to be fixed but can be parametrized.

Conversion to pandas
~~~~~~~~~~~~~~~~~~~~

The conversion to pandas (in :meth:`Table.to_pandas`) of columns with an
extension type can be controlled in case there is a corresponding
`pandas extension array <https://pandas.pydata.org/pandas-docs/stable/development/extending.html#extension-types>`__
for your extension type.

For this, the :meth:`ExtensionType.to_pandas_dtype` method needs to be
implemented, and should return a ``pandas.api.extensions.ExtensionDtype``
subclass instance.

Using the pandas period type from above as an example, this would look like::

class PeriodType(pa.ExtensionType):
...

def to_pandas_dtype(self):
import pandas as pd
return pd.PeriodDtype(freq=self.freq)

Secondly, the pandas ``ExtensionDtype`` in turn needs to have the
``__from_arrow__`` method implemented: a method that, given a pyarrow Array
or ChunkedArray of the extension type, can construct the corresponding
pandas ``ExtensionArray``. This method should have the following signature::


class MyExtensionDtype(pd.api.extensions.ExtensionDtype):
...

def __from_arrow__(self, array: pyarrow.Array/ChunkedArray) -> pandas.ExtensionArray:
...

This way, you can control the conversion of a pyarrow ``Array`` of your
pyarrow extension type to a pandas ``ExtensionArray`` that can be stored in
a DataFrame.
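
As an illustration only (this sketch is not part of the diff), a
``__from_arrow__`` implementation for a hypothetical ``MyPeriodDtype``
with a ``freq`` attribute could look like the following, assuming the
extension array's int64 storage holds period ordinals and ignoring
missing values for simplicity::

    class MyPeriodDtype(pd.api.extensions.ExtensionDtype):
        ...

        def __from_arrow__(self, array):
            import numpy as np
            import pandas as pd
            import pyarrow as pa
            # Table.to_pandas may pass an Array or a ChunkedArray;
            # normalize to a list of chunks
            chunks = (array.chunks if isinstance(array, pa.ChunkedArray)
                      else [array])
            # each chunk is a pyarrow ExtensionArray whose int64 storage
            # holds the period ordinals (nulls not handled in this sketch)
            ordinals = np.concatenate(
                [chunk.storage.to_numpy(zero_copy_only=False)
                 for chunk in chunks])
            return pd.arrays.PeriodArray(
                ordinals, dtype=pd.PeriodDtype(self.freq))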
16 changes: 15 additions & 1 deletion python/pyarrow/pandas-shim.pxi
@@ -30,7 +30,7 @@ cdef class _PandasAPIShim(object):
object _loose_version, _version
object _pd, _types_api, _compat_module
object _data_frame, _index, _series, _categorical_type
object _datetimetz_type, _extension_array
object _datetimetz_type, _extension_array, _extension_dtype
object _array_like_types
bint has_sparse

@@ -64,10 +64,12 @@ cdef class _PandasAPIShim(object):
self._array_like_types = (
self._series, self._index, self._categorical_type,
self._extension_array)
self._extension_dtype = pd.api.extensions.ExtensionDtype
else:
self._extension_array = None
self._array_like_types = (
self._series, self._index, self._categorical_type)
self._extension_dtype = None

if self._loose_version >= LooseVersion('0.20.0'):
from pandas.api.types import DatetimeTZDtype
@@ -130,6 +132,13 @@ cdef class _PandasAPIShim(object):
except AttributeError:
return self._pd.lib.infer_dtype(obj)

cpdef pandas_dtype(self, dtype):
self._check_import()
try:
return self._types_api.pandas_dtype(dtype)
except AttributeError:
return None

@property
def loose_version(self):
self._check_import()
@@ -149,6 +158,11 @@
def datetimetz_type(self):
return self._datetimetz_type

@property
def extension_dtype(self):
self._check_import()
return self._extension_dtype

cpdef is_array_like(self, obj):
self._check_import()
return isinstance(obj, self._array_like_types)
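For orientation (not part of the diff): pandas_compat.py consumes these
shim additions roughly as sketched below. The import path of the shared
``_pandas_api`` instance is an internal detail and an assumption here:

    from pyarrow.lib import _pandas_api  # internal shim instance (assumption)

    # map the string repr stored in the pandas metadata back to a dtype object
    dtype = _pandas_api.pandas_dtype('Int64')

    # extension_dtype exposes pandas' ExtensionDtype base class, or None
    # for pandas versions that predate extension dtypes
    if _pandas_api.extension_dtype is not None:
        is_ext = isinstance(dtype, _pandas_api.extension_dtype)
        # only dtypes implementing __from_arrow__ can be reconstructed
        can_convert = is_ext and hasattr(dtype, '__from_arrow__')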
144 changes: 119 additions & 25 deletions python/pyarrow/pandas_compat.py
@@ -645,11 +645,35 @@ def serialized_dict_to_dataframe(data):
return _pandas_api.data_frame(block_mgr)


def _reconstruct_block(item):
def _reconstruct_block(item, columns=None, extension_columns=None):
"""
Construct a pandas Block from the `item` dictionary coming from pyarrow's
serialization or returned by arrow::python::ConvertTableToPandas.

This function takes care of converting dictionary types to pandas
categorical, Timestamp-with-timezones to the proper pandas Block, and
conversion to pandas ExtensionBlock

Parameters
----------
item : dict
For basic types, this is a dictionary in the form of
{'block': np.ndarray of values, 'placement': pandas block placement}.
Additional keys are present for other types (dictionary, timezone,
object).
columns : list
Column names of the table being constructed, used for extension types.
extension_columns : dict
Dictionary of {column_name: pandas_dtype} that includes all columns
and corresponding dtypes that will be converted to a pandas
ExtensionBlock.

Returns
-------
pandas Block

"""
import pandas.core.internals as _int
# Construct the individual blocks converting dictionary types to pandas
# categorical types and Timestamps-with-timezones types to the proper
# pandas Blocks

block_arr = item.get('block', None)
placement = item['placement']
@@ -668,25 +692,16 @@ def _reconstruct_block(item):
block = _int.make_block(builtin_pickle.loads(block_arr),
placement=placement, klass=_int.ObjectBlock)
elif 'py_array' in item:
arr = item['py_array']
# TODO have mechanism to know a method to create a
# pandas ExtensionArray given the pyarrow type
# Now hardcode here to create a pandas IntegerArray for the example
arr = arr.chunk(0)
buflist = arr.buffers()
data = np.frombuffer(buflist[-1], dtype=arr.type.to_pandas_dtype())[
arr.offset:arr.offset + len(arr)]
bitmask = buflist[0]
if bitmask is not None:
mask = pa.BooleanArray.from_buffers(
pa.bool_(), len(arr), [None, bitmask])
mask = np.asarray(mask)
else:
mask = np.ones(len(arr), dtype=bool)
block_arr = _pandas_api.pd.arrays.IntegerArray(
data.copy(), ~mask, copy=False)
# create ExtensionBlock
block = _int.make_block(block_arr, placement=placement,
arr = item['py_array']
assert len(placement) == 1
name = columns[placement[0]]
pandas_dtype = extension_columns[name]
if not hasattr(pandas_dtype, '__from_arrow__'):
raise ValueError("This column does not support to be converted "
"to a pandas ExtensionArray")
pd_ext_arr = pandas_dtype.__from_arrow__(arr)
block = _int.make_block(pd_ext_arr, placement=placement,
klass=_int.ExtensionBlock)
else:
block = _int.make_block(block_arr, placement=placement)
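
For reference (illustrative values only, not part of the diff), the
``item`` dictionary for a basic numeric column looks roughly like this:

    import numpy as np
    item = {
        # 2D block values, one row per column in the block
        'block': np.array([[1, 2, 3]]),
        # which DataFrame column positions the block occupies
        'placement': np.array([0]),
    }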
@@ -718,17 +733,94 @@ def table_to_blockmanager(options, table, categories=None,
table = _add_any_metadata(table, pandas_metadata)
table, index = _reconstruct_index(table, index_descriptors,
all_columns)
ext_columns_dtypes = _get_extension_dtypes(
table, all_columns, extension_columns)
else:
index = _pandas_api.pd.RangeIndex(table.num_rows)
if extension_columns:
raise ValueError("extension_columns not supported if there is "
"no pandas_metadata")
ext_columns_dtypes = _get_extension_dtypes(
table, [], extension_columns)

_check_data_column_metadata_consistency(all_columns)
blocks = _table_to_blocks(options, table, categories, extension_columns)
blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
columns = _deserialize_column_index(table, all_columns, column_indexes)

axes = [columns, index]
return BlockManager(blocks, axes)


# Set of the string repr of all numpy dtypes that can be stored in a pandas
# dataframe (complex not included since not supported by Arrow)
_pandas_supported_numpy_types = set([
str(np.dtype(typ))
for typ in (np.sctypes['int'] + np.sctypes['uint'] + np.sctypes['float']
+ ['object', 'bool'])
])


def _get_extension_dtypes(table, columns_metadata, extension_columns):
"""
Based on the stored column pandas metadata and the extension types
in the arrow schema, infer which columns should be converted to a
pandas extension dtype.

The string representation of the original pandas dtype is stored in the
'numpy_type' field of the column metadata (despite its name, it is this
field and not the 'pandas_type' field that holds it). From this string
representation a pandas/numpy dtype is constructed, and we can then
check whether this dtype supports conversion from arrow.

"""
ext_columns = {}

# older pandas version that does not yet support extension dtypes
if _pandas_api.extension_dtype is None:
if extension_columns is not None:
raise ValueError(
"Converting to pandas ExtensionDtypes is not supported")
return ext_columns

if extension_columns is None:
# infer the extension columns from the pandas metadata
for col_meta in columns_metadata:
name = col_meta['name']
dtype = col_meta['numpy_type']
if dtype not in _pandas_supported_numpy_types:
# pandas_dtype is expensive, so avoid doing this for types
# that are certainly numpy dtypes
pandas_dtype = _pandas_api.pandas_dtype(dtype)
if isinstance(pandas_dtype, _pandas_api.extension_dtype):
if hasattr(pandas_dtype, "__from_arrow__"):
ext_columns[name] = pandas_dtype
# infer from extension type in the schema
for field in table.schema:
typ = field.type
if isinstance(typ, pa.BaseExtensionType):
try:
pandas_dtype = typ.to_pandas_dtype()
except NotImplementedError:
pass
else:
ext_columns[field.name] = pandas_dtype

else:
# get the extension dtype for the specified columns
for name in extension_columns:
col_meta = [
meta for meta in columns_metadata if meta['name'] == name][0]
pandas_dtype = _pandas_api.pandas_dtype(col_meta['numpy_type'])
if not isinstance(pandas_dtype, _pandas_api.extension_dtype):
raise ValueError("not an extension dtype")
if not hasattr(pandas_dtype, "__from_arrow__"):
raise ValueError("this column does not support to be "
"converted to extension dtype")
ext_columns[name] = pandas_dtype

return ext_columns
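
To make the inference concrete (an illustrative sketch, not part of the
diff, assuming pandas >= 0.24 with the nullable 'Int64' dtype):

    import json
    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'a': pd.array([1, 2, None], dtype='Int64')})
    table = pa.table(df)

    # the pandas metadata records the original dtype under 'numpy_type'
    meta = json.loads(table.schema.metadata[b'pandas'].decode('utf8'))
    meta['columns'][0]['numpy_type']  # 'Int64'

    # reconstruct the dtype and probe for the conversion protocol
    dtype = pd.api.types.pandas_dtype('Int64')
    isinstance(dtype, pd.api.extensions.ExtensionDtype)  # True
    # True on pandas versions that implement the __from_arrow__ protocol
    hasattr(dtype, '__from_arrow__')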


def _check_data_column_metadata_consistency(all_columns):
# It can never be the case in a released version of pyarrow that
# c['name'] is None *and* 'field_name' is not a key in the column metadata,
Expand Down Expand Up @@ -995,10 +1087,12 @@ def _table_to_blocks(options, block_table, categories, extension_columns):

# Convert an arrow table to Block from the internal pandas API
result = pa.lib.table_to_blocks(options, block_table, categories,
extension_columns)
list(extension_columns.keys()))

# Defined above
return [_reconstruct_block(item) for item in result]
columns = block_table.column_names
return [_reconstruct_block(item, columns, extension_columns)
for item in result]


def _flatten_single_level_multiindex(index):
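Taken together, the changes in this file give the following end-to-end
behavior (an illustrative sketch, assuming a pandas version whose
``Int64Dtype`` implements ``__from_arrow__``):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'a': pd.array([1, 2, None], dtype='Int64')})
    table = pa.table(df)

    # the roundtrip now reconstructs the nullable extension dtype instead
    # of falling back to float64 with NaN for the missing value
    result = table.to_pandas()
    result.dtypes  # a    Int64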
109 changes: 80 additions & 29 deletions python/pyarrow/tests/test_pandas.py
@@ -3226,45 +3226,96 @@ def test_array_protocol():
# Pandas ExtensionArray support


def _to_pandas(table, extension_columns=None):
# temporary test function as long as we have no public API to do this
from pyarrow.pandas_compat import table_to_blockmanager
def _Int64Dtype__from_arrow__(self, array):
# for this test, only deal with a single chunk for now
# TODO: do we require handling of chunked arrays in the protocol?
arr = array.chunk(0)
buflist = arr.buffers()
data = np.frombuffer(buflist[-1], dtype='int64')[
arr.offset:arr.offset + len(arr)]
bitmask = buflist[0]
if bitmask is not None:
mask = pa.BooleanArray.from_buffers(
pa.bool_(), len(arr), [None, bitmask])
mask = np.asarray(mask)
else:
mask = np.ones(len(arr), dtype=bool)
int_arr = pd.arrays.IntegerArray(data.copy(), ~mask, copy=False)
return int_arr

options = dict(
pool=None,
strings_to_categorical=False,
zero_copy_only=False,
integer_object_nulls=False,
date_as_object=True,
use_threads=True,
deduplicate_objects=True)

mgr = table_to_blockmanager(
options, table, extension_columns=extension_columns)
return pd.DataFrame(mgr)
def test_convert_to_extension_array(monkeypatch):
if LooseVersion(pd.__version__) < "0.26.0.dev":
pytest.skip("Conversion from IntegerArray to arrow not yet supported")

import pandas.core.internals as _int

def test_convert_to_extension_array():
if LooseVersion(pd.__version__) < '0.24.0':
pytest.skip(reason='IntegerArray only introduced in 0.24')
# table converted from dataframe with extension types (so pandas_metadata
# has this information)
df = pd.DataFrame(
{'a': [1, 2, 3], 'b': pd.array([2, 3, 4], dtype='Int64'),
'c': [4, 5, 6]})
table = pa.table(df)

# Int64Dtype has no __arrow_array__ -> use normal conversion
result = table.to_pandas()
assert len(result._data.blocks) == 1
assert isinstance(result._data.blocks[0], _int.IntBlock)

# patch pandas Int64Dtype to have the protocol method
monkeypatch.setattr(
pd.Int64Dtype, '__from_arrow__', _Int64Dtype__from_arrow__,
raising=False)

# Int64Dtype is recognized -> convert to extension block by default
# for a proper roundtrip
result = table.to_pandas()
assert isinstance(result._data.blocks[0], _int.IntBlock)
assert isinstance(result._data.blocks[1], _int.ExtensionBlock)
tm.assert_frame_equal(result, df)

# test with missing values
df2 = pd.DataFrame({'a': pd.array([1, 2, None], dtype='Int64')})
table2 = pa.table(df2)
result = table2.to_pandas()
assert isinstance(result._data.blocks[0], _int.ExtensionBlock)
tm.assert_frame_equal(result, df2)


class MyCustomIntegerType(pa.PyExtensionType):

def __init__(self):
pa.PyExtensionType.__init__(self, pa.int64())

def __reduce__(self):
return MyCustomIntegerType, ()

def to_pandas_dtype(self):
return pd.Int64Dtype()


def test_conversion_extensiontype_to_extensionarray(monkeypatch):
# converting extension type to linked pandas ExtensionDtype/Array
import pandas.core.internals as _int

table = pa.table({'a': [1, 2, 3], 'b': [2, 3, 4]})
storage = pa.array([1, 2, 3, 4], pa.int64())
arr = pa.ExtensionArray.from_storage(MyCustomIntegerType(), storage)
table = pa.table({'a': arr})

df = _to_pandas(table)
assert len(df._data.blocks) == 1
assert isinstance(df._data.blocks[0], _int.IntBlock)
with pytest.raises(ValueError):
table.to_pandas()

df = _to_pandas(table, extension_columns=['b'])
assert isinstance(df._data.blocks[0], _int.IntBlock)
assert isinstance(df._data.blocks[1], _int.ExtensionBlock)
# patch pandas Int64Dtype to have the protocol method
monkeypatch.setattr(
pd.Int64Dtype, '__from_arrow__', _Int64Dtype__from_arrow__,
raising=False)

table = pa.table({'a': [1, 2, None]})
df = _to_pandas(table, extension_columns=['a'])
assert isinstance(df._data.blocks[0], _int.ExtensionBlock)
expected = pd.DataFrame({'a': pd.Series([1, 2, None], dtype='Int64')})
tm.assert_frame_equal(df, expected)
# extension type points to Int64Dtype, which knows how to create a
# pandas ExtensionArray
result = table.to_pandas()
assert isinstance(result._data.blocks[0], _int.ExtensionBlock)
expected = pd.DataFrame({'a': pd.array([1, 2, 3, 4], dtype='Int64')})
tm.assert_frame_equal(result, expected)


# ----------------------------------------------------------------------