Skip to content

Commit

Permalink
feat(python): Add visitor pattern + builders for column sequences (#454)
Browse files Browse the repository at this point in the history
Assembling columns from chunked things is rather difficult to do and is
a valid thing that somebody might want to assemble from Arrow data. This
PR adds a "visitor" pattern that can be extended to build "column"s,
which are currently just `list()`s. Before trimming down this PR to a
managable set of changes, I also implemented the visitor that
concatenates data buffers for single data buffer types (
https://gist.github.com/paleolimbot/17263e38b5d97c770e44d33b11181eaf ),
which will be needed for `to_columns()` to be used in any kind of
serious way.

To support the "visitor" pattern, I moved some of the
`PyIterator`-specific pieces into the `PyIterator` so that the visitor
can re-use the relevant pieces of `ArrayViewBaseIterator`. This pattern
also solves one of the problems I had when attempting a "repr" iterator,
which is that I was trying to build something rather than iterate over
it.

```python
import nanoarrow as na
import pandas as pd
from nanoarrow import visitor

url = "https://github.com/apache/arrow-experiments/raw/main/data/arrow-commits/arrow-commits.arrows"
array = na.ArrayStream.from_url(url).read_all()

# to_columns() doesn't (and won't) produce anything numpy or pandas-related
names, columns = visitor.to_columns(array)

# ..but lets data frames be built rather compactly
pd.DataFrame({k: v for k, v in zip(names, columns)})
```
  • Loading branch information
paleolimbot authored May 13, 2024
1 parent 197f117 commit 490b980
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 43 deletions.
39 changes: 38 additions & 1 deletion python/src/nanoarrow/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import itertools
from functools import cached_property
from typing import Iterable, Tuple
from typing import Iterable, List, Sequence, Tuple

from nanoarrow._lib import (
DEVICE_CPU,
Expand All @@ -32,6 +32,7 @@
from nanoarrow.c_schema import c_schema
from nanoarrow.iterator import iter_array_views, iter_py, iter_tuples
from nanoarrow.schema import Schema, _schema_repr
from nanoarrow.visitor import to_columns, to_pylist

from nanoarrow import _repr_utils

Expand Down Expand Up @@ -344,6 +345,42 @@ def iter_chunk_views(self) -> Iterable[CArrayView]:
"""
return iter_array_views(self)

def to_pylist(self) -> List:
"""Convert this Array to a ``list()` of Python objects
Computes an identical value to list(:meth:`iter_py`) but can be several
times faster.
Examples
--------
>>> import nanoarrow as na
>>> array = na.Array([1, 2, 3], na.int32())
>>> array.to_pylist()
[1, 2, 3]
"""
return to_pylist(self)

def to_columns(self) -> Tuple[str, Sequence]:
"""Convert this Array to a ``list()` of sequences
Converts a stream of struct arrays into its column-wise representation
such that each column is either a contiguous buffer or a ``list()``.
Examples
--------
>>> import nanoarrow as na
>>> import pyarrow as pa
>>> array = na.Array(pa.record_batch([pa.array([1, 2, 3])], names=["col1"]))
>>> names, columns = array.to_columns()
>>> names
['col1']
>>> columns
[[1, 2, 3]]
"""
return to_columns(self)

@property
def n_children(self) -> int:
"""Get the number of children for an Array of this type.
Expand Down
40 changes: 39 additions & 1 deletion python/src/nanoarrow/array_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
# under the License.

from functools import cached_property
from typing import Iterable, Tuple
from typing import Iterable, List, Sequence, Tuple

from nanoarrow._lib import CMaterializedArrayStream
from nanoarrow._repr_utils import make_class_label
from nanoarrow.array import Array
from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.iterator import iter_py, iter_tuples
from nanoarrow.schema import Schema, _schema_repr
from nanoarrow.visitor import to_columns, to_pylist


class ArrayStream:
Expand Down Expand Up @@ -198,6 +199,43 @@ def iter_tuples(self) -> Iterable[Tuple]:
"""
return iter_tuples(self)

def to_pylist(self) -> List:
"""Convert this Array to a ``list()` of Python objects
Computes an identical value to list(:meth:`iter_py`) but can be several
times faster.
Examples
--------
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.to_pylist()
[1, 2, 3]
"""
return to_pylist(self)

def to_columns(self) -> Tuple[str, Sequence]:
"""Convert this Array to a ``list()` of sequences
Converts a stream of struct arrays into its column-wise representation
such that each column is either a contiguous buffer or a ``list()``.
Examples
--------
>>> import nanoarrow as na
>>> import pyarrow as pa
>>> batch = pa.record_batch([pa.array([1, 2, 3])], names=["col1"])
>>> stream = na.ArrayStream(batch)
>>> names, columns = stream.to_columns()
>>> names
['col1']
>>> columns
[[1, 2, 3]]
"""
return to_columns(self)

def __repr__(self) -> str:
cls = make_class_label(self, "nanoarrow")
schema_repr = _schema_repr(self.schema, prefix="", include_metadata=False)
Expand Down
78 changes: 44 additions & 34 deletions python/src/nanoarrow/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from nanoarrow._lib import CArrayView, CArrowType
from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.c_schema import c_schema, c_schema_view
from nanoarrow.schema import Schema


def iter_py(obj, schema=None) -> Iterable:
Expand Down Expand Up @@ -130,47 +131,22 @@ class UnregisteredExtensionWarning(UserWarning):


class ArrayViewBaseIterator:
"""Base class for iterators that use an internal ArrowArrayView
"""Base class for iterators and visitors that use an internal ArrowArrayView
as the basis for conversion to Python objects. Intended for internal use.
"""

@classmethod
def get_iterator(cls, obj, schema=None):
with c_array_stream(obj, schema=schema) as stream:
iterator = cls(stream._get_cached_schema())
for array in stream:
iterator._set_array(array)
yield from iterator._iter_chunk(0, len(array))

def __init__(self, schema, *, _array_view=None):
def __init__(self, schema, *, array_view=None):
self._schema = c_schema(schema)
self._schema_view = c_schema_view(schema)

if _array_view is None:
if array_view is None:
self._array_view = CArrayView.from_schema(self._schema)
else:
self._array_view = _array_view

self._children = list(
map(self._make_child, self._schema.children, self._array_view.children)
)

if self._schema.dictionary is None:
self._dictionary = None
else:
self._dictionary = self._make_child(
self._schema.dictionary, self._array_view.dictionary
)

def _make_child(self, schema, array_view):
return type(self)(schema, _array_view=array_view)

def _iter_chunk(self, offset, length) -> Iterable:
yield self._array_view
self._array_view = array_view

@cached_property
def _child_names(self):
return [child.name for child in self._schema.children]
def schema(self) -> Schema:
return Schema(self._schema)

@cached_property
def _object_label(self):
Expand Down Expand Up @@ -199,7 +175,41 @@ class PyIterator(ArrayViewBaseIterator):
Intended for internal use.
"""

@classmethod
def get_iterator(cls, obj, schema=None):
with c_array_stream(obj, schema=schema) as stream:
iterator = cls(stream._get_cached_schema())
for array in stream:
iterator._set_array(array)
yield from iterator

def __init__(self, schema, *, array_view=None):
super().__init__(schema, array_view=array_view)

self._children = list(
map(self._make_child, self._schema.children, self._array_view.children)
)

if self._schema.dictionary is None:
self._dictionary = None
else:
self._dictionary = self._make_child(
self._schema.dictionary, self._array_view.dictionary
)

def _make_child(self, schema, array_view):
return type(self)(schema, array_view=array_view)

@cached_property
def _child_names(self):
return [child.name for child in self._schema.children]

def __iter__(self):
"""Iterate over all elements in the current chunk"""
return self._iter_chunk(0, len(self._array_view))

def _iter_chunk(self, offset, length):
"""Iterate over all elements in a slice of the current chunk"""
# Check for an extension type first since this isn't reflected by
# self._schema_view.type_id. Currently we just return the storage
# iterator with a warning for extension types.
Expand Down Expand Up @@ -480,16 +490,16 @@ class RowTupleIterator(PyIterator):
Intended for internal use.
"""

def __init__(self, schema, *, _array_view=None):
super().__init__(schema, _array_view=_array_view)
def __init__(self, schema, *, array_view=None):
super().__init__(schema, array_view=array_view)
if self._schema_view.type != "struct":
raise TypeError(
"RowTupleIterator can only iterate over struct arrays "
f"(got '{self._schema_view.type}')"
)

def _make_child(self, schema, array_view):
return PyIterator(schema, _array_view=array_view)
return PyIterator(schema, array_view=array_view)

def _iter_chunk(self, offset, length):
return self._struct_tuple_iter(offset, length)
Expand Down
Loading

0 comments on commit 490b980

Please sign in to comment.