diff --git a/python/src/nanoarrow/array.py b/python/src/nanoarrow/array.py index 43ff0dc23..5cfa65b94 100644 --- a/python/src/nanoarrow/array.py +++ b/python/src/nanoarrow/array.py @@ -17,7 +17,7 @@ import itertools from functools import cached_property -from typing import Iterable, Tuple +from typing import Iterable, List, Sequence, Tuple from nanoarrow._lib import ( DEVICE_CPU, @@ -32,6 +32,7 @@ from nanoarrow.c_schema import c_schema from nanoarrow.iterator import iter_array_views, iter_py, iter_tuples from nanoarrow.schema import Schema, _schema_repr +from nanoarrow.visitor import to_columns, to_pylist from nanoarrow import _repr_utils @@ -344,6 +345,42 @@ def iter_chunk_views(self) -> Iterable[CArrayView]: """ return iter_array_views(self) + def to_pylist(self) -> List: + """Convert this Array to a ``list()` of Python objects + + Computes an identical value to list(:meth:`iter_py`) but can be several + times faster. + + Examples + -------- + + >>> import nanoarrow as na + >>> array = na.Array([1, 2, 3], na.int32()) + >>> array.to_pylist() + [1, 2, 3] + """ + return to_pylist(self) + + def to_columns(self) -> Tuple[str, Sequence]: + """Convert this Array to a ``list()` of sequences + + Converts a stream of struct arrays into its column-wise representation + such that each column is either a contiguous buffer or a ``list()``. + + Examples + -------- + + >>> import nanoarrow as na + >>> import pyarrow as pa + >>> array = na.Array(pa.record_batch([pa.array([1, 2, 3])], names=["col1"])) + >>> names, columns = array.to_columns() + >>> names + ['col1'] + >>> columns + [[1, 2, 3]] + """ + return to_columns(self) + @property def n_children(self) -> int: """Get the number of children for an Array of this type. diff --git a/python/src/nanoarrow/array_stream.py b/python/src/nanoarrow/array_stream.py index deaaece2e..ccd4669c0 100644 --- a/python/src/nanoarrow/array_stream.py +++ b/python/src/nanoarrow/array_stream.py @@ -16,7 +16,7 @@ # under the License. from functools import cached_property -from typing import Iterable, Tuple +from typing import Iterable, List, Sequence, Tuple from nanoarrow._lib import CMaterializedArrayStream from nanoarrow._repr_utils import make_class_label @@ -24,6 +24,7 @@ from nanoarrow.c_array_stream import c_array_stream from nanoarrow.iterator import iter_py, iter_tuples from nanoarrow.schema import Schema, _schema_repr +from nanoarrow.visitor import to_columns, to_pylist class ArrayStream: @@ -198,6 +199,43 @@ def iter_tuples(self) -> Iterable[Tuple]: """ return iter_tuples(self) + def to_pylist(self) -> List: + """Convert this Array to a ``list()` of Python objects + + Computes an identical value to list(:meth:`iter_py`) but can be several + times faster. + + Examples + -------- + + >>> import nanoarrow as na + >>> stream = na.ArrayStream([1, 2, 3], na.int32()) + >>> stream.to_pylist() + [1, 2, 3] + """ + return to_pylist(self) + + def to_columns(self) -> Tuple[str, Sequence]: + """Convert this Array to a ``list()` of sequences + + Converts a stream of struct arrays into its column-wise representation + such that each column is either a contiguous buffer or a ``list()``. + + Examples + -------- + + >>> import nanoarrow as na + >>> import pyarrow as pa + >>> batch = pa.record_batch([pa.array([1, 2, 3])], names=["col1"]) + >>> stream = na.ArrayStream(batch) + >>> names, columns = stream.to_columns() + >>> names + ['col1'] + >>> columns + [[1, 2, 3]] + """ + return to_columns(self) + def __repr__(self) -> str: cls = make_class_label(self, "nanoarrow") schema_repr = _schema_repr(self.schema, prefix="", include_metadata=False) diff --git a/python/src/nanoarrow/iterator.py b/python/src/nanoarrow/iterator.py index 5f85724d8..2364ea826 100644 --- a/python/src/nanoarrow/iterator.py +++ b/python/src/nanoarrow/iterator.py @@ -23,6 +23,7 @@ from nanoarrow._lib import CArrayView, CArrowType from nanoarrow.c_array_stream import c_array_stream from nanoarrow.c_schema import c_schema, c_schema_view +from nanoarrow.schema import Schema def iter_py(obj, schema=None) -> Iterable: @@ -130,47 +131,22 @@ class UnregisteredExtensionWarning(UserWarning): class ArrayViewBaseIterator: - """Base class for iterators that use an internal ArrowArrayView + """Base class for iterators and visitors that use an internal ArrowArrayView as the basis for conversion to Python objects. Intended for internal use. """ - @classmethod - def get_iterator(cls, obj, schema=None): - with c_array_stream(obj, schema=schema) as stream: - iterator = cls(stream._get_cached_schema()) - for array in stream: - iterator._set_array(array) - yield from iterator._iter_chunk(0, len(array)) - - def __init__(self, schema, *, _array_view=None): + def __init__(self, schema, *, array_view=None): self._schema = c_schema(schema) self._schema_view = c_schema_view(schema) - if _array_view is None: + if array_view is None: self._array_view = CArrayView.from_schema(self._schema) else: - self._array_view = _array_view - - self._children = list( - map(self._make_child, self._schema.children, self._array_view.children) - ) - - if self._schema.dictionary is None: - self._dictionary = None - else: - self._dictionary = self._make_child( - self._schema.dictionary, self._array_view.dictionary - ) - - def _make_child(self, schema, array_view): - return type(self)(schema, _array_view=array_view) - - def _iter_chunk(self, offset, length) -> Iterable: - yield self._array_view + self._array_view = array_view @cached_property - def _child_names(self): - return [child.name for child in self._schema.children] + def schema(self) -> Schema: + return Schema(self._schema) @cached_property def _object_label(self): @@ -199,7 +175,41 @@ class PyIterator(ArrayViewBaseIterator): Intended for internal use. """ + @classmethod + def get_iterator(cls, obj, schema=None): + with c_array_stream(obj, schema=schema) as stream: + iterator = cls(stream._get_cached_schema()) + for array in stream: + iterator._set_array(array) + yield from iterator + + def __init__(self, schema, *, array_view=None): + super().__init__(schema, array_view=array_view) + + self._children = list( + map(self._make_child, self._schema.children, self._array_view.children) + ) + + if self._schema.dictionary is None: + self._dictionary = None + else: + self._dictionary = self._make_child( + self._schema.dictionary, self._array_view.dictionary + ) + + def _make_child(self, schema, array_view): + return type(self)(schema, array_view=array_view) + + @cached_property + def _child_names(self): + return [child.name for child in self._schema.children] + + def __iter__(self): + """Iterate over all elements in the current chunk""" + return self._iter_chunk(0, len(self._array_view)) + def _iter_chunk(self, offset, length): + """Iterate over all elements in a slice of the current chunk""" # Check for an extension type first since this isn't reflected by # self._schema_view.type_id. Currently we just return the storage # iterator with a warning for extension types. @@ -480,8 +490,8 @@ class RowTupleIterator(PyIterator): Intended for internal use. """ - def __init__(self, schema, *, _array_view=None): - super().__init__(schema, _array_view=_array_view) + def __init__(self, schema, *, array_view=None): + super().__init__(schema, array_view=array_view) if self._schema_view.type != "struct": raise TypeError( "RowTupleIterator can only iterate over struct arrays " @@ -489,7 +499,7 @@ def __init__(self, schema, *, _array_view=None): ) def _make_child(self, schema, array_view): - return PyIterator(schema, _array_view=array_view) + return PyIterator(schema, array_view=array_view) def _iter_chunk(self, offset, length): return self._struct_tuple_iter(offset, length) diff --git a/python/src/nanoarrow/visitor.py b/python/src/nanoarrow/visitor.py new file mode 100644 index 000000000..53bc06916 --- /dev/null +++ b/python/src/nanoarrow/visitor.py @@ -0,0 +1,179 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any, List, Sequence, Tuple, Union + +from nanoarrow._lib import CArrayView +from nanoarrow.c_array_stream import c_array_stream +from nanoarrow.iterator import ArrayViewBaseIterator, PyIterator +from nanoarrow.schema import Type + + +def to_pylist(obj, schema=None) -> List: + """Convert ``obj`` to a ``list()` of Python objects + + Computes an identical value to ``list(iterator.iter_py())`` but is several + times faster. + + Paramters + --------- + obj : array stream-like + An array-like or array stream-like object as sanitized by + :func:`c_array_stream`. + schema : schema-like, optional + An optional schema, passed to :func:`c_array_stream`. + + Examples + -------- + + >>> import nanoarrow as na + >>> from nanoarrow import visitor + >>> array = na.c_array([1, 2, 3], na.int32()) + >>> visitor.to_pylist(array) + [1, 2, 3] + """ + return ListBuilder.visit(obj, schema) + + +def to_columns(obj, schema=None) -> Tuple[List[str], List[Sequence]]: + """Convert ``obj`` to a ``list()` of sequences + + Converts a stream of struct arrays into its column-wise representation + such that each column is either a contiguous buffer or a ``list()``. + + Paramters + --------- + obj : array stream-like + An array-like or array stream-like object as sanitized by + :func:`c_array_stream`. + schema : schema-like, optional + An optional schema, passed to :func:`c_array_stream`. + + Examples + -------- + + >>> import nanoarrow as na + >>> from nanoarrow import visitor + >>> import pyarrow as pa + >>> array = pa.record_batch([pa.array([1, 2, 3])], names=["col1"]) + >>> names, columns = visitor.to_columns(array) + >>> names + ['col1'] + >>> columns + [[1, 2, 3]] + """ + return ColumnsBuilder.visit(obj, schema) + + +class ArrayStreamVisitor(ArrayViewBaseIterator): + """Compute a value from one or more arrays in an ArrowArrayStream + + This class supports a (currently internal) pattern for building + output from a zero or more arrays in a stream. + + """ + + @classmethod + def visit(cls, obj, schema=None, total_elements=None, **kwargs): + """Visit all chunks in ``obj`` as a :func:`c_array_stream`.""" + + if total_elements is None and hasattr(obj, "__len__"): + total_elements = len(obj) + + with c_array_stream(obj, schema=schema) as stream: + visitor = cls(stream._get_cached_schema(), **kwargs) + visitor.begin(total_elements) + + visitor_set_array = visitor._set_array + visit_chunk_view = visitor.visit_chunk_view + array_view = visitor._array_view + + for array in stream: + visitor_set_array(array) + visit_chunk_view(array_view) + + return visitor.finish() + + def begin(self, total_elements: Union[int, None] = None): + """Called after the schema has been resolved but before any + chunks have been visited. If the total number of elements + (i.e., the sum of all chunk lengths) is known, it is provided here. + """ + pass + + def visit_chunk_view(self, array_view: CArrayView) -> None: + """Called exactly one for each chunk seen.""" + pass + + def finish(self) -> Any: + """Called exactly once after all chunks have been visited.""" + return None + + +class ListBuilder(ArrayStreamVisitor): + def __init__(self, schema, *, iterator_cls=PyIterator, array_view=None): + super().__init__(schema, array_view=array_view) + + # Ensure that self._iterator._array_view is self._array_view + self._iterator = iterator_cls(schema, array_view=self._array_view) + + def begin(self, total_elements: Union[int, None] = None): + self._lst = [] + + def visit_chunk_view(self, array_view: CArrayView): + # The constructor here ensured that self._iterator._array_view + # is populated when self._set_array() is called. + self._lst.extend(self._iterator) + + def finish(self) -> List: + return self._lst + + +class ColumnsBuilder(ArrayStreamVisitor): + def __init__(self, schema, *, array_view=None): + super().__init__(schema, array_view=array_view) + + if self.schema.type != Type.STRUCT: + raise ValueError("ColumnsBuilder can only be used on a struct array") + + # Resolve the appropriate visitor for each column + self._child_visitors = [] + for child_schema, child_array_view in zip( + self._schema.children, self._array_view.children + ): + self._child_visitors.append( + self._resolve_child_visitor(child_schema, child_array_view) + ) + + def _resolve_child_visitor(self, child_schema, child_array_view): + # TODO: Resolve more efficient column builders for single-buffer types + return ListBuilder(child_schema, array_view=child_array_view) + + def begin(self, total_elements: Union[int, None] = None) -> None: + for child_visitor in self._child_visitors: + child_visitor.begin(total_elements) + + def visit_chunk_view(self, array_view: CArrayView) -> Any: + for child_visitor, child_array_view in zip( + self._child_visitors, array_view.children + ): + child_visitor.visit_chunk_view(child_array_view) + + def finish(self) -> Tuple[List[str], List[Sequence]]: + return [v.schema.name for v in self._child_visitors], [ + v.finish() for v in self._child_visitors + ] diff --git a/python/tests/test_array.py b/python/tests/test_array.py index a6a79eeca..55ffe1143 100644 --- a/python/tests/test_array.py +++ b/python/tests/test_array.py @@ -48,13 +48,13 @@ def test_array_from_chunks(): array = na.Array.from_chunks([[1, 2, 3], [4, 5, 6]], na.int32()) assert array.schema.type == na.Type.INT32 assert array.n_chunks == 2 - assert list(array.iter_py()) == [1, 2, 3, 4, 5, 6] + assert array.to_pylist() == [1, 2, 3, 4, 5, 6] # Check with schema inferred from first chunk array = na.Array.from_chunks(array.iter_chunks()) assert array.schema.type == na.Type.INT32 assert array.n_chunks == 2 - assert list(array.iter_py()) == [1, 2, 3, 4, 5, 6] + assert array.to_pylist() == [1, 2, 3, 4, 5, 6] # Check empty array = na.Array.from_chunks([], na.int32()) @@ -75,7 +75,7 @@ def test_array_from_chunks_validate(): # ...but that one can opt out array = na.Array.from_chunks(chunks, validate=False) - assert list(array.iter_py()) == [1, 2, 3, 1, 2, 3] + assert array.to_pylist() == [1, 2, 3, 1, 2, 3] def test_array_empty(): @@ -96,7 +96,7 @@ def test_array_empty(): with pytest.raises(IndexError): array.chunk(0) - assert list(array.iter_py()) == [] + assert array.to_pylist() == [] assert list(array.iter_scalar()) == [] with pytest.raises(IndexError): array[0] @@ -148,6 +148,9 @@ def test_array_contiguous(): for py_item, item in zip([1, 2, 3], array.iter_py()): assert item == py_item + # Python objects by to_pylist() + assert array.to_pylist() == list(array.iter_py()) + with na.c_array_stream(array) as stream: arrays = list(stream) assert len(arrays) == 1 @@ -194,6 +197,9 @@ def test_array_chunked(): for py_item, item in zip([1, 2, 3], array.iter_py()): assert item == py_item + # Python objects by to_pylist() + assert array.to_pylist() == list(array.iter_py()) + with na.c_array_stream(array) as stream: arrays = list(stream) assert len(arrays) == 2 @@ -216,7 +222,7 @@ def test_array_children(): assert array.n_children == 100 assert array.child(0).schema.type == na.Type.INT32 assert array.child(0).n_chunks == 2 - assert list(array.child(0).iter_py()) == [123456, 123456] + assert array.child(0).to_pylist() == [123456, 123456] children = list(array.iter_children()) assert len(children) == array.n_children @@ -225,6 +231,10 @@ def test_array_children(): assert len(tuples) == 2 assert len(tuples[0]) == 100 + names, columns = array.to_columns() + assert names == [f"col{i}" for i in range(100)] + assert all(len(col) == len(array) for col in columns) + def test_scalar_to_array(): array = na.Array([123456, 7890], na.int32()) diff --git a/python/tests/test_array_stream.py b/python/tests/test_array_stream.py index 035949f05..8a98f60f7 100644 --- a/python/tests/test_array_stream.py +++ b/python/tests/test_array_stream.py @@ -36,18 +36,68 @@ def test_array_stream_iter(): next(stream_iter) +def test_array_stream_iter_chunks(): + stream = na.ArrayStream([1, 2, 3], na.int32()) + chunks = list(stream.iter_chunks()) + assert len(chunks) == 1 + assert chunks[0].to_pylist() == [1, 2, 3] + + +def test_array_stream_iter_py(): + stream = na.ArrayStream([1, 2, 3], na.int32()) + assert list(stream.iter_py()) == [1, 2, 3] + + +def test_array_stream_iter_tuples(): + c_array = na.c_array_from_buffers( + na.struct({"col1": na.int32(), "col2": na.string()}), + length=3, + buffers=[None], + children=[ + na.c_array([1, 2, 3], na.int32()), + na.c_array(["a", "b", "c"], na.string()), + ], + ) + + stream = na.ArrayStream(c_array) + assert list(stream.iter_tuples()) == [(1, "a"), (2, "b"), (3, "c")] + + +def test_array_stream_to_pylist(): + stream = na.ArrayStream([1, 2, 3], na.int32()) + assert stream.to_pylist() == [1, 2, 3] + + +def test_array_stream_to_columns(): + c_array = na.c_array_from_buffers( + na.struct({"col1": na.int32(), "col2": na.string()}), + length=3, + buffers=[None], + children=[ + na.c_array([1, 2, 3], na.int32()), + na.c_array(["a", "b", "c"], na.string()), + ], + ) + + stream = na.ArrayStream(c_array) + names, columns = stream.to_columns() + assert names == ["col1", "col2"] + assert columns[0] == [1, 2, 3] + assert columns[1] == ["a", "b", "c"] + + def test_array_stream_read_all(): stream = na.ArrayStream([1, 2, 3], na.int32()) array = stream.read_all() assert array.schema.type == na.Type.INT32 - assert list(array.iter_py()) == [1, 2, 3] + assert array.to_pylist() == [1, 2, 3] def test_array_stream_read_next(): stream = na.ArrayStream([1, 2, 3], na.int32()) array = stream.read_next() assert array.schema.type == na.Type.INT32 - assert list(array.iter_py()) == [1, 2, 3] + assert array.to_pylist() == [1, 2, 3] with pytest.raises(StopIteration): stream.read_next() diff --git a/python/tests/test_visitor.py b/python/tests/test_visitor.py new file mode 100644 index 000000000..39c3c4d05 --- /dev/null +++ b/python/tests/test_visitor.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import pytest + +import nanoarrow as na +from nanoarrow import visitor + + +def test_to_pylist(): + array = na.c_array([1, 2, 3], na.int32()) + assert visitor.to_pylist(array) == [1, 2, 3] + + +def test_to_columms(): + array = na.c_array_from_buffers( + na.struct({"col1": na.int32(), "col2": na.bool_(), "col3": na.string()}), + length=3, + buffers=[None], + children=[ + na.c_array([1, 2, 3], na.int32()), + na.c_array([1, 0, 1], na.bool_()), + na.c_array(["abc", "def", "ghi"], na.string()), + ], + ) + + names, columns = visitor.to_columns(array) + assert names == ["col1", "col2", "col3"] + assert columns[0] == [1, 2, 3] + assert columns[1] == [True, False, True] + assert columns[2] == ["abc", "def", "ghi"] + + with pytest.raises(ValueError, match="can only be used on a struct array"): + visitor.to_columns([], na.int32())