feat(python): Add column-wise buffer builder #464

Merged May 17, 2024 (42 commits)
Changes from 7 commits

Commits
e924e9d
start on column builders
paleolimbot May 13, 2024
2049e63
maybe working tests
paleolimbot May 13, 2024
4526c0b
prototype null handling
paleolimbot May 13, 2024
f611ee8
some null handling
paleolimbot May 13, 2024
4fd4c2d
first stab at dispatch
paleolimbot May 13, 2024
3792f4c
nulls in columns
paleolimbot May 13, 2024
68fa3ff
fix doctests
paleolimbot May 14, 2024
b9be69a
Apply suggestions from code review
paleolimbot May 14, 2024
dd6c4f5
remove masked array handling
paleolimbot May 14, 2024
a427fca
document null handlers
paleolimbot May 14, 2024
62d5dd3
use base class
paleolimbot May 14, 2024
b474b29
fix doctests
paleolimbot May 14, 2024
14e3067
better doc
paleolimbot May 14, 2024
b249e99
test non-nullable dispatch
paleolimbot May 14, 2024
eb1aacc
use two columns in docs
paleolimbot May 16, 2024
ae44f7d
rename and export nulls_debug
paleolimbot May 16, 2024
67a7b44
test null error
paleolimbot May 16, 2024
6c1cde8
clarify null_count=-1
paleolimbot May 16, 2024
db7400c
Apply suggestions from code review
paleolimbot May 16, 2024
9ccd5a5
remove outdated param def
paleolimbot May 16, 2024
a7d5b90
improve handle_nulls docs
paleolimbot May 16, 2024
60f101c
use None as sentinel for "all valid"
paleolimbot May 16, 2024
b734af0
document is_valid as None
paleolimbot May 16, 2024
561f8ca
clarify sentinel handling
paleolimbot May 16, 2024
63adc2b
fix sentinel handler
paleolimbot May 16, 2024
beb1ce6
fix test
paleolimbot May 16, 2024
8899fcf
get names from the schema and not from the child visitors
paleolimbot May 16, 2024
8180f31
Update python/src/nanoarrow/visitor.py
paleolimbot May 16, 2024
c2d6813
Update python/src/nanoarrow/visitor.py
paleolimbot May 16, 2024
9a57453
add comment
paleolimbot May 16, 2024
7a2fb86
Builder -> Converter
paleolimbot May 17, 2024
408e2be
to_column -> convert
paleolimbot May 17, 2024
5ee1fc6
change names
paleolimbot May 17, 2024
e1d1690
remove usage of "column"
paleolimbot May 17, 2024
b881976
force kwarg for nulls, mark experimental
paleolimbot May 17, 2024
62c85fd
fix doctests
paleolimbot May 17, 2024
d2730a2
less confusing converter names
paleolimbot May 17, 2024
ca0d5eb
convert -> to_pysequence
paleolimbot May 17, 2024
78e666b
a few more refs
paleolimbot May 17, 2024
4c9d2d2
use arrayviewvisitor instead of arraystream visitor
paleolimbot May 17, 2024
3412b0f
fix some references
paleolimbot May 17, 2024
9d27bd1
one more rename
paleolimbot May 17, 2024
38 changes: 33 additions & 5 deletions python/src/nanoarrow/_lib.pyx
@@ -1009,6 +1009,11 @@ cdef class CSchemaView:
if self.extension_name or self._schema_view.type != self._schema_view.storage_type:
return None

# String/binary types do not have format strings as far as the Python
# buffer protocol is concerned
if self.layout.n_buffers != 2:
return None

cdef char out[128]
cdef int element_size_bits = 0
if self._schema_view.type == NANOARROW_TYPE_FIXED_SIZE_BINARY:
@@ -1632,6 +1637,22 @@ cdef class CArrayView:

@property
def null_count(self):
if self._ptr.null_count != -1:
return self._ptr.null_count

cdef ArrowBufferType buffer_type = self._ptr.layout.buffer_type[0]
cdef uint8_t* validity_bits = self._ptr.buffer_views[0].data.as_uint8

if buffer_type != NANOARROW_BUFFER_TYPE_VALIDITY:
self._ptr.null_count = 0
elif validity_bits == NULL:
self._ptr.null_count = 0
elif self._device is DEVICE_CPU:
self._ptr.null_count = (
self._ptr.length -
ArrowBitCountSet(validity_bits, self.offset, self.length)
)

return self._ptr.null_count

@property
@@ -1869,7 +1890,7 @@ cdef class CBufferView:
return self._format.decode("UTF-8")

@property
def item_size(self):
def itemsize(self):
return self._strides

def __len__(self):
@@ -1957,7 +1978,7 @@ cdef class CBufferView:

cdef int64_t c_offset = offset
cdef int64_t c_length = length
cdef int64_t c_item_size = self.item_size
cdef int64_t c_item_size = self.itemsize
cdef int64_t c_dest_offset = dest_offset
self._check_copy_into_bounds(&buffer, c_offset, c_length, dest_offset, c_item_size)

@@ -2010,7 +2031,7 @@ cdef class CBufferView:
if length is None:
length = self.n_elements

cdef int64_t bytes_to_copy = length * self.item_size
cdef int64_t bytes_to_copy = length * self.itemsize
out = CBufferBuilder().set_data_type(self.data_type_id)
out.reserve_bytes(bytes_to_copy)
self.copy_into(out, offset, length)
@@ -2224,9 +2245,9 @@ cdef class CBuffer:
return self._element_size_bits

@property
def item_size(self):
def itemsize(self):
self._assert_valid()
return self._view.item_size
return self._view.itemsize

@property
def format(self):
@@ -2339,6 +2360,13 @@ cdef class CBufferBuilder:
"""The number of bytes that have been written to this buffer"""
return self._buffer.size_bytes

@property
def itemsize(self):
return self._buffer.itemsize

def __len__(self):
return self._buffer.size_bytes // self.itemsize

@property
def capacity_bytes(self):
"""The number of bytes allocated in the underlying buffer"""
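The null_count property added above computes the count lazily when the producer exported -1 ("unknown"), and only for CPU-accessible buffers, caching the result back into the view. A rough pure-Python sketch of the same logic, assuming an LSB-first Arrow validity bitmap passed as bytes; the real code calls nanoarrow's ArrowBitCountSet, and these names are illustrative only:

from typing import Optional


def resolve_null_count(
    length: int, offset: int, validity: Optional[bytes], cached: int = -1
) -> int:
    """Mirror of the lazy null_count logic: -1 means 'not yet computed'."""
    if cached != -1:
        # The producer (or a previous call) already knew the answer.
        return cached

    if validity is None:
        # No validity buffer was allocated, so every element is valid.
        return 0

    # Count set bits over the logical range [offset, offset + length).
    set_bits = sum(
        1 for i in range(offset, offset + length) if validity[i // 8] & (1 << (i % 8))
    )
    return length - set_bits


# 5 elements with validity bits 0b10110: elements 0 and 3 are null.
print(resolve_null_count(5, 0, bytes([0b10110])))  # 2
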
2 changes: 1 addition & 1 deletion python/src/nanoarrow/array.py
@@ -377,7 +377,7 @@ def to_columns(self) -> Tuple[str, Sequence]:
>>> names
['col1']
>>> columns
[[1, 2, 3]]
[nanoarrow.c_lib.CBuffer(int64[24 b] 1 2 3)]
"""
return to_columns(self)

2 changes: 1 addition & 1 deletion python/src/nanoarrow/array_stream.py
@@ -232,7 +232,7 @@ def to_columns(self) -> Tuple[str, Sequence]:
>>> names
['col1']
>>> columns
[[1, 2, 3]]
[nanoarrow.c_lib.CBuffer(int64[24 b] 1 2 3)]
"""
return to_columns(self)

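The doctest changes above reflect that a numeric column now comes back as a CBuffer rather than a materialized Python list, while types without a buffer-protocol format (strings, nested types) still fall back to a plain list. A hedged usage sketch, assuming pyarrow is installed and that nanoarrow's Array accepts any object implementing the Arrow PyCapsule interface:

import numpy as np
import pyarrow as pa
import nanoarrow as na

batch = pa.RecordBatch.from_pydict({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
names, columns = na.Array(batch).to_columns()

print(names)       # ['col1', 'col2']
print(columns[0])  # e.g. nanoarrow.c_lib.CBuffer(int64[24 b] 1 2 3)
print(columns[1])  # ['a', 'b', 'c'] -- string columns still convert via ListBuilder

# The CBuffer exposes the Python buffer protocol, so numpy can view it zero-copy.
print(np.asarray(columns[0]))  # [1 2 3]
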
6 changes: 1 addition & 5 deletions python/src/nanoarrow/iterator.py
@@ -156,11 +156,7 @@ def _object_label(self):
return f"<unnamed {self._schema_view.type}>"

def _contains_nulls(self):
return (
self._schema_view.nullable
and len(self._array_view.buffer(0))
and self._array_view.null_count != 0
)
return self._schema_view.nullable and self._array_view.null_count != 0

def _set_array(self, array):
self._array_view._set_array(array)
206 changes: 196 additions & 10 deletions python/src/nanoarrow/visitor.py
@@ -15,10 +15,11 @@
# specific language governing permissions and limitations
# under the License.

from typing import Any, List, Sequence, Tuple, Union
from typing import Any, Callable, List, Sequence, Tuple, Union

from nanoarrow._lib import CArrayView
from nanoarrow._lib import CArrayView, CArrowType, CBuffer, CBufferBuilder
from nanoarrow.c_array_stream import c_array_stream
from nanoarrow.c_schema import c_schema_view
from nanoarrow.iterator import ArrayViewBaseIterator, PyIterator
from nanoarrow.schema import Type

@@ -49,7 +50,7 @@ def to_pylist(obj, schema=None) -> List:
return ListBuilder.visit(obj, schema)


def to_columns(obj, schema=None) -> Tuple[List[str], List[Sequence]]:
def to_columns(obj, schema=None, handle_nulls=None) -> Tuple[List[str], List[Sequence]]:
"""Convert ``obj`` to a ``list()` of sequences

Converts a stream of struct arrays into its column-wise representation
Member:
For the line below, can you clarify when you get a buffer and when you get a list?

@@ -74,9 +75,60 @@ def to_columns(obj, schema=None) -> Tuple[List[str], List[Sequence]]:
>>> names
['col1']
>>> columns
[[1, 2, 3]]
[nanoarrow.c_lib.CBuffer(int64[24 b] 1 2 3)]
"""
return ColumnsBuilder.visit(obj, schema)
return ColumnsBuilder.visit(obj, schema, handle_nulls=handle_nulls)


def nulls_forbid() -> Callable[[CBuffer, Sequence], Sequence]:
def handle(is_valid, data):
if len(is_valid) > 0:
raise ValueError("Null present with null_handler=nulls_forbid()")
Member:
A validity buffer being present doesn't necessarily mean it contains any nulls. Did you ensure that beforehand (e.g. by not converting the bitmap to a bytemap when every bit is set)?

Member Author:
Yes...the bool buffer indicating validity is never allocated unless a null is actually present (and is None otherwise). I added this to the handle_nulls documentation!


return data

return handle


def nulls_debug() -> Callable[[CBuffer, Sequence], Tuple[CBuffer, Sequence]]:
def handle(is_valid, data):
return is_valid, data

return handle


def nulls_as_sentinel(sentinel=None):
from numpy import array, result_type

def handle(is_valid, data):
is_valid = array(is_valid, copy=False)
data = array(data, copy=False)

if len(is_valid) > 0:
out_type = result_type(data, sentinel)
data = array(data, dtype=out_type, copy=True)
data[~is_valid] = sentinel
return data
else:
return data

return handle


def nulls_as_masked_array():
Member:
I wonder if it would be more useful to actually return each array as a tuple of data and mask (the numpy masked array isn't used that much).

Member Author:
Good to know! I'll remove that then 🙂

> I wonder if it would be more useful to actually return each array as a tuple of data and mask

Currently that would be nulls_debug(), mostly because I couldn't think of a good name (in a previous version it was nulls_as_mask_and_data()). Returning a tuple here sort of breaks the "every column is a sequence" guarantee. There could also be a MaskedSequence class that wraps the mask and data, but that seems like opening a can of worms.

from numpy import array
from numpy.ma import masked_array

def handle(is_valid, data):
is_valid = array(is_valid, copy=False)
data = array(data, copy=False)

if len(is_valid) > 0:
return masked_array(data, ~is_valid)
else:
return data

return handle
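
The handlers above all share one contract: a factory that returns a callable taking the unpacked validity buffer and the data sequence and returning whatever should stand in for that column. A hypothetical custom handler following the same contract (nulls_as_none and its plain-list output are illustrative only, not part of this diff):

def nulls_as_none():
    # Hypothetical handler: materialize a plain Python list with None in place
    # of each null. An empty is_valid buffer means no nulls were encountered.
    def handle(is_valid, data):
        if len(is_valid) == 0:
            return data

        return [
            value if valid else None
            for valid, value in zip(memoryview(is_valid), memoryview(data))
        ]

    return handle

It would then be passed through the top-level entry point, e.g. to_columns(obj, handle_nulls=nulls_as_none()).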


class ArrayStreamVisitor(ArrayViewBaseIterator):
@@ -144,7 +196,7 @@ def finish(self) -> List:


class ColumnsBuilder(ArrayStreamVisitor):
Member:
FWIW, and I don't directly have a better idea, but I found the usage of "columns" somewhat confusing here (and also "Builder", because that sounds too similar to the builders that build arrays/buffers in the Python -> Arrow direction). Naming is hard .. ;)

Member Author:
It's been through a few iterations of naming 🙂. Maybe sequence is better than column? The gist of it is "something that either numpy or pandas can understand" and/or "we dealt with all the hard parts of a non-contiguous or non-zero-offset array so you didn't have to". I could stick with Visitor instead of Builder, too (but these are internal for now, so at least that part can get renamed).

Member:
I'll add that I also had a bit of trouble comprehending the naming. I think it might be worthwhile to rename "column" to something else. Technically, it's just a contiguous array, but "array" is overloaded. Are these technically all concatenation helpers?

Member Author:
At Dane's suggestion these are now to_pysequence() and to_columns_pysequence(). I think these can/should get renamed when we see exactly how/if they are used (it may be that everybody will want custom logic and the high-level convert method will go unused).

def __init__(self, schema, *, array_view=None):
def __init__(self, schema, handle_nulls=None, *, array_view=None):
super().__init__(schema, array_view=array_view)

if self.schema.type != Type.STRUCT:
@@ -156,18 +208,23 @@ def __init__(self, schema, *, array_view=None):
self._schema.children, self._array_view.children
):
self._child_visitors.append(
self._resolve_child_visitor(child_schema, child_array_view)
self._resolve_child_visitor(
child_schema, child_array_view, handle_nulls
)
)

def _resolve_child_visitor(self, child_schema, child_array_view):
# TODO: Resolve more efficient column builders for single-buffer types
return ListBuilder(child_schema, array_view=child_array_view)
def _resolve_child_visitor(self, child_schema, child_array_view, handle_nulls):
cls, kwargs = _resolve_column_builder_cls(child_schema, handle_nulls)
return cls(child_schema, **kwargs, array_view=child_array_view)

def begin(self, total_elements: Union[int, None] = None) -> None:
for child_visitor in self._child_visitors:
child_visitor.begin(total_elements)

def visit_chunk_view(self, array_view: CArrayView) -> Any:
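# Nulls at the struct level can't be propagated into the child columns, so a
# chunk with top-level nulls would silently convert to bogus values. Some
# producers export record batches as a nullable struct, which is why this is
# checked per chunk rather than rejected from the schema alone.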
if array_view.null_count > 0:
raise ValueError("null_count > 0 encountered in ColumnsBuilder")
Member:
Why is this an error?

Member Author:
If there are any nulls here the column results will be bogus! (We have no way to propagate top-level nulls through into the child columns.) At the "record batch" level nulls don't exist (typically they are exported as an explicitly non-nullable struct; however, at least one implementation doesn't do this, so we have to check for actual nulls while iterating instead of erroring when we see the schema).

Member:
Can you add a brief comment about this in the code?


for child_visitor, child_array_view in zip(
self._child_visitors, array_view.children
):
@@ -177,3 +234,132 @@ def finish(self) -> Tuple[List[str], List[Sequence]]:
return [v.schema.name for v in self._child_visitors], [
v.finish() for v in self._child_visitors
]


class BufferColumnBuilder(ArrayStreamVisitor):
def begin(self, total_elements: Union[int, None]):
self._builder = CBufferBuilder()
self._builder.set_format(self._schema_view.buffer_format)

if total_elements is not None:
element_size_bits = self._schema_view.layout.element_size_bits[1]
element_size_bytes = element_size_bits // 8
self._builder.reserve_bytes(total_elements * element_size_bytes)

def visit_chunk_view(self, array_view: CArrayView) -> None:
builder = self._builder
offset, length = array_view.offset, array_view.length
dst_bytes = length * builder.itemsize

builder.reserve_bytes(dst_bytes)
array_view.buffer(1).copy_into(builder, offset, length, len(builder))
builder.advance(dst_bytes)

def finish(self) -> Any:
return self._builder.finish()


class BooleanColumnBuilder(ArrayStreamVisitor):
def begin(self, total_elements: Union[int, None]):
self._builder = CBufferBuilder()
self._builder.set_format("?")

if total_elements is not None:
self._builder.reserve_bytes(total_elements)

def visit_chunk_view(self, array_view: CArrayView) -> None:
builder = self._builder
offset, length = array_view.offset, array_view.length
builder.reserve_bytes(length)
array_view.buffer(1).unpack_bits_into(builder, offset, length, len(builder))
builder.advance(length)

def finish(self) -> Any:
return self._builder.finish()


class NullableColumnBuilder(ArrayStreamVisitor):
def __init__(
self,
schema,
column_builder_cls=BufferColumnBuilder,
handle_nulls: Union[Callable[[CBuffer, Sequence], Any], None] = None,
*,
array_view=None
):
super().__init__(schema, array_view=array_view)
self._column_builder = column_builder_cls(schema, array_view=self._array_view)

if handle_nulls is None:
self._handle_nulls = nulls_forbid()
else:
self._handle_nulls = handle_nulls

def begin(self, total_elements: Union[int, None]):
self._builder = CBufferBuilder()
self._builder.set_format("?")
self._length = 0

self._column_builder.begin(total_elements)

def visit_chunk_view(self, array_view: CArrayView) -> None:
offset, length = array_view.offset, array_view.length

builder = self._builder
chunk_contains_nulls = array_view.null_count != 0
bitmap_allocated = len(builder) > 0

if chunk_contains_nulls:
current_length = self._length
if not bitmap_allocated:
self._fill_valid(current_length)

builder.reserve_bytes(length)
array_view.buffer(0).unpack_bits_into(
builder, offset, length, current_length
)
builder.advance(length)

elif bitmap_allocated:
self._fill_valid(length)

self._length += length
self._column_builder.visit_chunk_view(array_view)

def finish(self) -> Any:
is_valid = self._builder.finish()
column = self._column_builder.finish()
return self._handle_nulls(is_valid, column)

def _fill_valid(self, length):
builder = self._builder
builder.reserve_bytes(length)
out_start = len(builder)
memoryview(builder)[out_start : out_start + length] = b"\x01" * length
builder.advance(length)


def _resolve_column_builder_cls(schema, handle_nulls=None):
schema_view = c_schema_view(schema)

if schema_view.nullable:
if schema_view.type_id == CArrowType.BOOL:
return NullableColumnBuilder, {
"column_builder_cls": BooleanColumnBuilder,
"handle_nulls": handle_nulls,
}
elif schema_view.buffer_format is not None:
return NullableColumnBuilder, {
"column_builder_cls": BufferColumnBuilder,
"handle_nulls": handle_nulls,
}
else:
return ListBuilder, {}
else:

if schema_view.type_id == CArrowType.BOOL:
return BooleanColumnBuilder, {}
elif schema_view.buffer_format is not None:
return BufferColumnBuilder, {}
else:
return ListBuilder, {}
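
For reference, this is roughly how the dispatch and the null handlers shown in this revision fit together from the caller's side. The entry point and handler names were renamed in later commits of this PR (e.g. to to_pysequence()), so treat these names as provisional; pyarrow is assumed only for constructing the input and is not required by the PR itself:

import numpy as np
import pyarrow as pa
from nanoarrow.visitor import nulls_as_sentinel, nulls_debug, to_columns

batch = pa.RecordBatch.from_pydict({"col1": [1, None, 3]})

# Default behaviour (nulls_forbid): a present null raises instead of converting.
try:
    to_columns(batch)
except ValueError as e:
    print(e)  # Null present with null_handler=nulls_forbid()

# Sentinel handling: the column is promoted (here to float64) and nulls become NaN.
names, columns = to_columns(batch, handle_nulls=nulls_as_sentinel(np.nan))
print(columns[0])  # [ 1. nan  3.]

# Debug handling: each nullable column comes back as an (is_valid, data) pair.
names, columns = to_columns(batch, handle_nulls=nulls_debug())
is_valid, data = columns[0]
print(np.asarray(is_valid), np.asarray(data))
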
4 changes: 2 additions & 2 deletions python/tests/test_array_stream.py
@@ -82,8 +82,8 @@ def test_array_stream_to_columns():
stream = na.ArrayStream(c_array)
names, columns = stream.to_columns()
assert names == ["col1", "col2"]
assert columns[0] == [1, 2, 3]
assert columns[1] == ["a", "b", "c"]
assert list(columns[0]) == [1, 2, 3]
assert list(columns[1]) == ["a", "b", "c"]


def test_array_stream_read_all():
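Because every converted column is either a buffer-protocol object or a plain list, handing the result to pandas is straightforward. A small sketch assuming pandas, numpy, and pyarrow are installed, and that ArrayStream accepts a single record batch as a one-chunk stream; the DataFrame construction is ordinary pandas usage, not an API added by this PR:

import numpy as np
import pandas as pd
import pyarrow as pa
import nanoarrow as na

batch = pa.RecordBatch.from_pydict({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
names, columns = na.ArrayStream(batch).to_columns()

# Wrap each column (CBuffer or list) as a numpy array and build the frame.
df = pd.DataFrame({name: np.asarray(col) for name, col in zip(names, columns)})
print(df)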