ARROW-2428: [Python] Support pandas ExtensionArray in Table.to_pandas conversion
jorisvandenbossche committed Oct 9, 2019
1 parent 4db8f7b commit 2cd29d8
Showing 5 changed files with 143 additions and 56 deletions.
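
This commit's public entry point: Table.to_pandas gains an extension_columns keyword, and a column is converted back to a pandas ExtensionArray when its pandas dtype implements the __from_arrow__ protocol. A minimal sketch of the intended usage (a hypothetical session; at this point no built-in pandas dtype implements __from_arrow__ yet, so the test below monkeypatches Int64Dtype, and converting an Int64 column to Arrow itself requires pandas >= 0.26.0.dev):

import pandas as pd
import pyarrow as pa

# a DataFrame with a nullable-integer extension column
df = pd.DataFrame({'a': [1, 2, 3],
                   'b': pd.array([2, None, 4], dtype='Int64')})
table = pa.table(df)  # the pandas metadata records 'b' as dtype 'Int64'

# columns whose dtype implements __from_arrow__ are inferred automatically
result = table.to_pandas()

# or requested explicitly by name
result = table.to_pandas(extension_columns=['b'])
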
4 changes: 3 additions & 1 deletion python/pyarrow/array.pxi
@@ -421,7 +421,8 @@ cdef class _PandasConvertible:
bint date_as_object=True,
bint use_threads=True,
bint deduplicate_objects=True,
bint ignore_metadata=False
bint ignore_metadata=False,
extension_columns=None,
):
"""
Convert to a pandas-compatible NumPy array or DataFrame, as appropriate
@@ -466,6 +467,7 @@ cdef class _PandasConvertible:
deduplicate_objects=deduplicate_objects
)
return self._to_pandas(options, categories=categories,
extension_columns=extension_columns,
ignore_metadata=ignore_metadata)


16 changes: 15 additions & 1 deletion python/pyarrow/pandas-shim.pxi
@@ -30,7 +30,7 @@ cdef class _PandasAPIShim(object):
object _loose_version, _version
object _pd, _types_api, _compat_module
object _data_frame, _index, _series, _categorical_type
object _datetimetz_type, _extension_array
object _datetimetz_type, _extension_array, _extension_dtype
object _array_like_types
bint has_sparse

@@ -64,10 +64,12 @@ cdef class _PandasAPIShim(object):
self._array_like_types = (
self._series, self._index, self._categorical_type,
self._extension_array)
self._extension_dtype = pd.api.extensions.ExtensionDtype
else:
self._extension_array = None
self._array_like_types = (
self._series, self._index, self._categorical_type)
self._extension_dtype = None

if self._loose_version >= LooseVersion('0.20.0'):
from pandas.api.types import DatetimeTZDtype
@@ -130,6 +132,13 @@ cdef class _PandasAPIShim(object):
except AttributeError:
return self._pd.lib.infer_dtype(obj)

cpdef pandas_dtype(self, dtype):
self._check_import()
try:
return self._types_api.pandas_dtype(dtype)
except AttributeError:
return None

@property
def loose_version(self):
self._check_import()
@@ -149,6 +158,11 @@
def datetimetz_type(self):
return self._datetimetz_type

@property
def extension_dtype(self):
self._check_import()
return self._extension_dtype

cpdef is_array_like(self, obj):
self._check_import()
return isinstance(obj, self._array_like_types)
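
For context, the two pandas entry points the shim now exposes behave as follows (a sketch, assuming pandas >= 0.24):

import pandas as pd

# pandas_dtype() turns the dtype string stored in the metadata into a dtype object
dtype = pd.api.types.pandas_dtype('Int64')           # -> Int64Dtype()

# extension dtypes are detected by an isinstance check against the base class
isinstance(dtype, pd.api.extensions.ExtensionDtype)  # True
isinstance(pd.api.types.pandas_dtype('int64'),
           pd.api.extensions.ExtensionDtype)         # False: plain numpy dtype
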
86 changes: 64 additions & 22 deletions python/pyarrow/pandas_compat.py
@@ -61,6 +61,7 @@ def get_logical_type_map():
pa.lib.Type_BINARY: 'bytes',
pa.lib.Type_FIXED_SIZE_BINARY: 'bytes',
pa.lib.Type_STRING: 'unicode',
# pa.lib.Type_EXTENSION: 'extension',
})
return _logical_type_map

@@ -643,7 +644,7 @@ def serialized_dict_to_dataframe(data):
return _pandas_api.data_frame(block_mgr)


def _reconstruct_block(item):
def _reconstruct_block(item, columns=None, extension_columns=None):
import pandas.core.internals as _int
# Construct the individual blocks converting dictionary types to pandas
# categorical types and Timestamps-with-timezones types to the proper
@@ -666,25 +667,15 @@ def _reconstruct_block(item):
block = _int.make_block(builtin_pickle.loads(block_arr),
placement=placement, klass=_int.ObjectBlock)
elif 'py_array' in item:
arr = item['py_array']
# TODO have mechanism to know a method to create a
# pandas ExtensionArray given the pyarrow type
# Now hardcode here to create a pandas IntegerArray for the example
arr = arr.chunk(0)
buflist = arr.buffers()
data = np.frombuffer(buflist[-1], dtype=arr.type.to_pandas_dtype())[
arr.offset:arr.offset + len(arr)]
bitmask = buflist[0]
if bitmask is not None:
mask = pa.BooleanArray.from_buffers(
pa.bool_(), len(arr), [None, bitmask])
mask = np.asarray(mask)
else:
mask = np.ones(len(arr), dtype=bool)
block_arr = _pandas_api.pd.arrays.IntegerArray(
data.copy(), ~mask, copy=False)
# create ExtensionBlock
block = _int.make_block(block_arr, placement=placement,
arr = item['py_array']
assert len(placement) == 1
name = columns[placement[0]]
pandas_dtype = extension_columns[name]
if not hasattr(pandas_dtype, '__from_arrow__'):
    raise ValueError("This column does not support conversion "
                     "to a pandas ExtensionArray")
pd_ext_arr = pandas_dtype.__from_arrow__(arr)
block = _int.make_block(pd_ext_arr, placement=placement,
klass=_int.ExtensionBlock)
else:
block = _int.make_block(block_arr, placement=placement)
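
The __from_arrow__ hook used above is a method on the pandas ExtensionDtype: it receives the pyarrow ChunkedArray for the column and returns a pandas ExtensionArray. A minimal sketch of such a hook, monkeypatched onto pandas' nullable integer dtype the same way the test below does (the test's buffer-based version avoids a copy; this variant goes through numpy for brevity):

import numpy as np
import pandas as pd

def _int64_from_arrow(self, array):
    # 'array' is the pyarrow ChunkedArray holding the column values
    np_arr = array.to_pandas().to_numpy()   # float64 when nulls are present
    mask = pd.isna(np_arr)                  # True where a value is missing
    data = np.nan_to_num(np_arr).astype('int64')
    return pd.arrays.IntegerArray(data, mask)

pd.Int64Dtype.__from_arrow__ = _int64_from_arrow
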
@@ -716,17 +707,66 @@ def table_to_blockmanager(options, table, categories=None,
table = _add_any_metadata(table, pandas_metadata)
table, index = _reconstruct_index(table, index_descriptors,
all_columns)
ext_columns_dtypes = _get_extension_dtypes(
all_columns, extension_columns)
else:
index = _pandas_api.pd.RangeIndex(table.num_rows)
if extension_columns:
raise ValueError("extension_columns not supported if there is "
"no pandas_metadata")
ext_columns_dtypes = {}

_check_data_column_metadata_consistency(all_columns)
blocks = _table_to_blocks(options, table, categories, extension_columns)
blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
columns = _deserialize_column_index(table, all_columns, column_indexes)

axes = [columns, index]
return BlockManager(blocks, axes)


def _get_extension_dtypes(columns, extension_columns):
"""
Based on the stored column pandas metadata, infer which columns
should be converted to a pandas extension dtype.
The 'numpy_type' field in the column metadata stores the string
representation of the original pandas dtype (and, despite its name,
not the 'pandas_type' field).
Based on this string representation, a pandas/numpy dtype is constructed
and then we can check if this dtype supports conversion from arrow.
"""
ext_columns = {}

# older pandas version that does not yet support extension dtypes
if _pandas_api.extension_dtype is None:
if extension_columns is not None:
raise ValueError("not supported")
return ext_columns

if extension_columns is None:
# infer the extension columns
for col_meta in columns:
name = col_meta['name']
pandas_dtype = _pandas_api.pandas_dtype(col_meta['numpy_type'])
if isinstance(pandas_dtype, _pandas_api.extension_dtype):
if hasattr(pandas_dtype, "__from_arrow__"):
ext_columns[name] = pandas_dtype
else:
# get the extension dtype for the specified columns
for name in extension_columns:
col_meta = [meta for meta in columns if meta['name'] == name][0]
pandas_dtype = _pandas_api.pandas_dtype(col_meta['numpy_type'])
if not isinstance(pandas_dtype, _pandas_api.extension_dtype):
raise ValueError("not an extension dtype")
if not hasattr(pandas_dtype, "__from_arrow__"):
raise ValueError("this column does not support to be "
"converted to extension dtype")
ext_columns[name] = pandas_dtype

return ext_columns
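
To make the inference concrete: for a column stored with the nullable 'Int64' dtype, the pandas metadata contains an entry along these lines (field values are illustrative), and _get_extension_dtypes recovers the dtype from the 'numpy_type' string:

import pandas as pd

col_meta = {'name': 'b',
            'field_name': 'b',
            'pandas_type': 'int64',   # flattened logical type
            'numpy_type': 'Int64',    # original pandas dtype, as a string
            'metadata': None}

pandas_dtype = pd.api.types.pandas_dtype(col_meta['numpy_type'])  # Int64Dtype()
isinstance(pandas_dtype, pd.api.extensions.ExtensionDtype)        # True
hasattr(pandas_dtype, '__from_arrow__')  # False until a dtype implements it
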


def _check_data_column_metadata_consistency(all_columns):
# It can never be the case in a released version of pyarrow that
# c['name'] is None *and* 'field_name' is not a key in the column metadata,
@@ -993,10 +1033,12 @@ def _table_to_blocks(options, block_table, categories, extension_columns):

# Convert an arrow table to Block from the internal pandas API
result = pa.lib.table_to_blocks(options, block_table, categories,
extension_columns)
list(extension_columns.keys()))

# Defined above
return [_reconstruct_block(item) for item in result]
columns = block_table.column_names
return [_reconstruct_block(item, columns, extension_columns)
for item in result]


def _flatten_single_level_multiindex(index):
5 changes: 3 additions & 2 deletions python/pyarrow/table.pxi
@@ -1262,10 +1262,11 @@ cdef class Table(_PandasConvertible):

return result

def _to_pandas(self, options, categories=None, ignore_metadata=False):
def _to_pandas(self, options, categories=None, extension_columns=None,
ignore_metadata=False):
from pyarrow.pandas_compat import table_to_blockmanager
mgr = table_to_blockmanager(
options, self, categories,
options, self, categories, extension_columns,
ignore_metadata=ignore_metadata)
return pandas_api.data_frame(mgr)

88 changes: 58 additions & 30 deletions python/pyarrow/tests/test_pandas.py
@@ -3202,45 +3202,73 @@ def test_array_protocol():
# Pandas ExtensionArray support


def _to_pandas(table, extension_columns=None):
# temporary test function as long as we have no public API to do this
from pyarrow.pandas_compat import table_to_blockmanager

options = dict(
pool=None,
strings_to_categorical=False,
zero_copy_only=False,
integer_object_nulls=False,
date_as_object=True,
use_threads=True,
deduplicate_objects=True)

mgr = table_to_blockmanager(
options, table, extension_columns=extension_columns)
return pd.DataFrame(mgr)
def _Int64Dtype__from_arrow__(self, array):
# for this test, only deal with a single chunk for now
# TODO: do we require handling of chunked arrays in the protocol?
arr = array.chunk(0)
buflist = arr.buffers()
data = np.frombuffer(buflist[-1], dtype=arr.type.to_pandas_dtype())[
arr.offset:arr.offset + len(arr)]
bitmask = buflist[0]
if bitmask is not None:
mask = pa.BooleanArray.from_buffers(
pa.bool_(), len(arr), [None, bitmask])
mask = np.asarray(mask)
else:
mask = np.ones(len(arr), dtype=bool)
int_arr = pd.arrays.IntegerArray(data.copy(), ~mask, copy=False)
return int_arr


def test_convert_to_extension_array():
if LooseVersion(pd.__version__) < '0.24.0':
pytest.skip('IntegerArray only introduced in 0.24')
if LooseVersion(pd.__version__) < "0.26.0.dev":
pytest.skip("Conversion from IntegerArray to arrow not yet supported")

import pandas.core.internals as _int

table = pa.table({'a': [1, 2, 3], 'b': [2, 3, 4]})
# table converted from dataframe with extension types (so pandas_metadata
# has this information)
df = pd.DataFrame(
{'a': [1, 2, 3], 'b': pd.array([2, 3, 4], dtype='Int64'),
'c': [4, 5, 6]})
table = pa.table(df)

df = _to_pandas(table)
assert len(df._data.blocks) == 1
assert isinstance(df._data.blocks[0], _int.IntBlock)
# Int64Dtype has no __from_arrow__ -> use normal conversion
result = table.to_pandas()
assert len(result._data.blocks) == 1
assert isinstance(result._data.blocks[0], _int.IntBlock)

# raise an error when explicitly asking for an unsupported conversion
with pytest.raises(ValueError):
table.to_pandas(extension_columns=['b'])

try:
# patch pandas Int64Dtype to have the protocol method
pd.Int64Dtype.__from_arrow__ = _Int64Dtype__from_arrow__

# Int64Dtype is recognized -> convert to extension block by default
# for a proper roundtrip
result = table.to_pandas()
assert isinstance(result._data.blocks[0], _int.IntBlock)
assert isinstance(result._data.blocks[1], _int.ExtensionBlock)
tm.assert_frame_equal(result, df)

# explicitly specifying the column works as well
# TODO is this useful?
result = table.to_pandas(extension_columns=['b'])
assert isinstance(result._data.blocks[0], _int.IntBlock)
assert isinstance(result._data.blocks[1], _int.ExtensionBlock)
tm.assert_frame_equal(result, df)

df = _to_pandas(table, extension_columns=['b'])
assert isinstance(df._data.blocks[0], _int.IntBlock)
assert isinstance(df._data.blocks[1], _int.ExtensionBlock)
# test with missing values
df2 = pd.DataFrame({'a': pd.array([1, 2, None], dtype='Int64')})
table2 = pa.table(df2)
result = table2.to_pandas(extension_columns=['a'])
assert isinstance(result._data.blocks[0], _int.ExtensionBlock)
tm.assert_frame_equal(result, df2)

table = pa.table({'a': [1, 2, None]})
df = _to_pandas(table, extension_columns=['a'])
assert isinstance(df._data.blocks[0], _int.ExtensionBlock)
expected = pd.DataFrame({'a': pd.Series([1, 2, None], dtype='Int64')})
tm.assert_frame_equal(df, expected)
finally:
del pd.Int64Dtype.__from_arrow__


# ----------------------------------------------------------------------
Expand Down
