Skip to content

Commit

Permalink
ARROW-369: [Python] Convert multiple record batches at once to Pandas
Browse files Browse the repository at this point in the history
Modified the Pandas adapter so that `ConvertColumnToPandas` handles columns made of multiple chunks. This extends the pyarrow public API with a free function `dataframe_from_batches`, which takes a list of Arrow RecordBatches (all with equal schemas) and outputs a pandas DataFrame.

Adds unit tests in test_table.py that perform the conversion and exercise each column's typed specialization.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #216 from BryanCutler/multi-batch-toPandas-ARROW-369 and squashes the following commits:

b6c9986 [Bryan Cutler] fixed formatting
edf056e [Bryan Cutler] simplified with pyarrow.schema.Schema.equals
068bc1b [Bryan Cutler] Merge remote-tracking branch 'upstream/master' into multi-batch-toPandas-ARROW-369
da65345 [Bryan Cutler] fixed test case for schema checking
9edb0ba [Bryan Cutler] used auto keyword where some typecasting was done in ConvertValues
bd2a720 [Bryan Cutler] added testcase for schema not equal, disabled now
c3d7e8f [Bryan Cutler] Changed conversion to make Table from columns first, now conversion is now just a free function
3ee51e6 [Bryan Cutler] cleanup
398b18d [Bryan Cutler] Fixed case for Integer specialization without nulls
7b29a55 [Bryan Cutler] Initial working version of RecordBatch list to_pandas, need more tests and cleanup
  • Loading branch information
BryanCutler authored and wesm committed Dec 2, 2016
1 parent ebe7dc8 commit b5de9e5
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 79 deletions.
4 changes: 3 additions & 1 deletion python/pyarrow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@
list_, struct, field,
DataType, Field, Schema, schema)

from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe
from pyarrow.table import (Column, RecordBatch, dataframe_from_batches, Table,
from_pandas_dataframe)

from pyarrow.version import version as __version__
3 changes: 3 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CColumn(const shared_ptr[CField]& field,
const shared_ptr[CArray]& data)

CColumn(const shared_ptr[CField]& field,
const vector[shared_ptr[CArray]]& chunks)

int64_t length()
int64_t null_count()
const c_string& name()
Expand Down
47 changes: 47 additions & 0 deletions python/pyarrow/table.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ cimport pyarrow.includes.pyarrow as pyarrow
import pyarrow.config

from pyarrow.array cimport Array, box_arrow_array
from pyarrow.error import ArrowException
from pyarrow.error cimport check_status
from pyarrow.schema cimport box_data_type, box_schema

Expand Down Expand Up @@ -414,6 +415,52 @@ cdef class RecordBatch:
return result


def dataframe_from_batches(batches):
    """
    Convert a list of Arrow RecordBatches to a pandas.DataFrame

    Parameters
    ----------
    batches : list of RecordBatch
        RecordBatch list to be converted, schemas must be equal

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    ArrowException
        If `batches` is empty or the batches do not all share an equal schema
    """

    cdef:
        vector[shared_ptr[CArray]] c_array_chunks
        vector[shared_ptr[CColumn]] c_columns
        shared_ptr[CTable] c_table
        Array arr
        Schema schema

    # fail fast with an ImportError if pandas is not installed; the actual
    # DataFrame construction happens inside Table.to_pandas below
    import pandas as pd

    # an empty list would otherwise raise a bare IndexError on batches[0]
    if not batches:
        raise ArrowException("Error converting list of RecordBatches to "
                             "DataFrame, no batches provided")

    schema = batches[0].schema

    # check schemas are equal
    if any((not schema.equals(other.schema) for other in batches[1:])):
        raise ArrowException("Error converting list of RecordBatches to "
                             "DataFrame, not all schemas are equal")

    cdef int K = batches[0].num_columns

    # create chunked columns from the batches: column i of the result is the
    # concatenation of column i from every batch, kept as separate chunks
    c_columns.resize(K)
    for i in range(K):
        for batch in batches:
            arr = batch[i]
            c_array_chunks.push_back(arr.sp_array)
        c_columns[i].reset(new CColumn(schema.sp_schema.get().field(i),
                                       c_array_chunks))
        c_array_chunks.clear()

    # create an (unnamed) Table from the columns and convert to DataFrame
    c_table.reset(new CTable('', schema.sp_schema, c_columns))
    table = Table()
    table.init(c_table)
    return table.to_pandas()


cdef class Table:
"""
A collection of top-level named, equal length Arrow arrays.
Expand Down
35 changes: 35 additions & 0 deletions python/pyarrow/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from pandas.util.testing import assert_frame_equal
import pandas as pd
import pytest

import pyarrow as pa

Expand Down Expand Up @@ -50,6 +51,40 @@ def test_recordbatch_from_to_pandas():
assert_frame_equal(data, result)


def test_recordbatchlist_to_pandas():
    """Converting two batches yields the row-wise concatenation of both."""
    frame_a = pd.DataFrame({
        'c1': np.array([1, 1, 2], dtype='uint32'),
        'c2': np.array([1.0, 2.0, 3.0], dtype='float64'),
        'c3': [True, None, False],
        'c4': ['foo', 'bar', None]
    })

    frame_b = pd.DataFrame({
        'c1': np.array([3, 5], dtype='uint32'),
        'c2': np.array([4.0, 5.0], dtype='float64'),
        'c3': [True, True],
        'c4': ['baz', 'qux']
    })

    batches = [pa.RecordBatch.from_pandas(frame_a),
               pa.RecordBatch.from_pandas(frame_b)]

    converted = pa.dataframe_from_batches(batches)
    expected = pd.concat([frame_a, frame_b], ignore_index=True)
    assert_frame_equal(expected, converted)


def test_recordbatchlist_schema_equals():
    """Batches with mismatched schemas must raise ArrowException."""
    batch_uint = pa.RecordBatch.from_pandas(
        pd.DataFrame({'c1': np.array([1], dtype='uint32')}))
    batch_float = pa.RecordBatch.from_pandas(
        pd.DataFrame({'c1': np.array([4.0, 5.0], dtype='float64')}))

    with pytest.raises(pa.ArrowException):
        pa.dataframe_from_batches([batch_uint, batch_float])


def test_table_basics():
data = [
pa.from_pylist(range(5)),
Expand Down
Loading

0 comments on commit b5de9e5

Please sign in to comment.