-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(python): Add column-wise buffer builder #464
Changes from 7 commits
e924e9d
2049e63
4526c0b
f611ee8
4fd4c2d
3792f4c
68fa3ff
b9be69a
dd6c4f5
a427fca
62d5dd3
b474b29
14e3067
b249e99
eb1aacc
ae44f7d
67a7b44
6c1cde8
db7400c
9ccd5a5
a7d5b90
60f101c
b734af0
561f8ca
63adc2b
beb1ce6
8899fcf
8180f31
c2d6813
9a57453
7a2fb86
408e2be
5ee1fc6
e1d1690
b881976
62c85fd
d2730a2
ca0d5eb
78e666b
4c9d2d2
3412b0f
9d27bd1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,10 +15,11 @@ | |
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
from typing import Any, List, Sequence, Tuple, Union | ||
from typing import Any, Callable, List, Sequence, Tuple, Union | ||
|
||
from nanoarrow._lib import CArrayView | ||
from nanoarrow._lib import CArrayView, CArrowType, CBuffer, CBufferBuilder | ||
from nanoarrow.c_array_stream import c_array_stream | ||
from nanoarrow.c_schema import c_schema_view | ||
from nanoarrow.iterator import ArrayViewBaseIterator, PyIterator | ||
from nanoarrow.schema import Type | ||
|
||
|
@@ -49,7 +50,7 @@ def to_pylist(obj, schema=None) -> List: | |
return ListBuilder.visit(obj, schema) | ||
|
||
|
||
def to_columns(obj, schema=None) -> Tuple[List[str], List[Sequence]]: | ||
def to_columns(obj, schema=None, handle_nulls=None) -> Tuple[List[str], List[Sequence]]: | ||
"""Convert ``obj`` to a ``list()` of sequences | ||
|
||
Converts a stream of struct arrays into its column-wise representation | ||
|
@@ -74,9 +75,60 @@ def to_columns(obj, schema=None) -> Tuple[List[str], List[Sequence]]: | |
>>> names | ||
['col1'] | ||
>>> columns | ||
[[1, 2, 3]] | ||
[nanoarrow.c_lib.CBuffer(int64[24 b] 1 2 3)] | ||
""" | ||
return ColumnsBuilder.visit(obj, schema) | ||
return ColumnsBuilder.visit(obj, schema, handle_nulls=handle_nulls) | ||
|
||
|
||
def nulls_forbid() -> Callable[[CBuffer, Sequence], Sequence]: | ||
def handle(is_valid, data): | ||
if len(is_valid) > 0: | ||
raise ValueError("Null present with null_handler=nulls_forbid()") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just because there is a validity buffer doesn't mean it includes any nulls, does it? Or did you ensure that beforehand (e.g., by not converting the bitmap to a bytemap if all of its bits are set)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes...the bool buffer indicating validity is never allocated unless a null is actually present (and is None otherwise). I added this to the |
||
|
||
return data | ||
|
||
return handle | ||
|
||
|
||
def nulls_debug() -> Callable[[CBuffer, Sequence], Tuple[CBuffer, Sequence]]: | ||
def handle(is_valid, data): | ||
return is_valid, data | ||
|
||
return handle | ||
|
||
|
||
def nulls_as_sentinel(sentinel=None): | ||
from numpy import array, result_type | ||
paleolimbot marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def handle(is_valid, data): | ||
is_valid = array(is_valid, copy=False) | ||
data = array(data, copy=False) | ||
paleolimbot marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if len(is_valid) > 0: | ||
out_type = result_type(data, sentinel) | ||
data = array(data, dtype=out_type, copy=True) | ||
data[~is_valid] = sentinel | ||
return data | ||
else: | ||
return data | ||
|
||
return handle | ||
|
||
|
||
def nulls_as_masked_array(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if it would be more useful to actually return each array as a tuple of data and mask (the numpy masked array isn't used that much) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good to know! I'll remove that then 🙂
Currently that would be |
||
from numpy import array | ||
from numpy.ma import masked_array | ||
|
||
def handle(is_valid, data): | ||
is_valid = array(is_valid, copy=False) | ||
data = array(data, copy=False) | ||
|
||
if len(is_valid) > 0: | ||
return masked_array(data, ~is_valid) | ||
else: | ||
return data | ||
|
||
return handle | ||
|
||
|
||
class ArrayStreamVisitor(ArrayViewBaseIterator): | ||
|
@@ -144,7 +196,7 @@ def finish(self) -> List: | |
|
||
|
||
class ColumnsBuilder(ArrayStreamVisitor): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW, and I don't directly have a better idea, but I found the usage of "columns" somewhat confusing here (and also "Builder", because that sounds too similar to the builders that build arrays/buffers in the python->arrow direction). Naming is hard .. ;) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's been through a few iterations of naming 🙂 . Maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll add that I also had a bit of trouble comprehending the naming. I think it might be worthwhile to rename "column" to something else. Technically, it's just a contiguous array, but There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At Dane's suggestion these are now |
||
def __init__(self, schema, *, array_view=None): | ||
def __init__(self, schema, handle_nulls=None, *, array_view=None): | ||
super().__init__(schema, array_view=array_view) | ||
|
||
if self.schema.type != Type.STRUCT: | ||
|
@@ -156,18 +208,23 @@ def __init__(self, schema, *, array_view=None): | |
self._schema.children, self._array_view.children | ||
): | ||
self._child_visitors.append( | ||
self._resolve_child_visitor(child_schema, child_array_view) | ||
self._resolve_child_visitor( | ||
child_schema, child_array_view, handle_nulls | ||
) | ||
) | ||
|
||
def _resolve_child_visitor(self, child_schema, child_array_view): | ||
# TODO: Resolve more efficient column builders for single-buffer types | ||
return ListBuilder(child_schema, array_view=child_array_view) | ||
def _resolve_child_visitor(self, child_schema, child_array_view, handle_nulls): | ||
cls, kwargs = _resolve_column_builder_cls(child_schema, handle_nulls) | ||
return cls(child_schema, **kwargs, array_view=child_array_view) | ||
|
||
def begin(self, total_elements: Union[int, None] = None) -> None: | ||
for child_visitor in self._child_visitors: | ||
child_visitor.begin(total_elements) | ||
|
||
def visit_chunk_view(self, array_view: CArrayView) -> Any: | ||
if array_view.null_count > 0: | ||
raise ValueError("null_count > 0 encountered in ColumnsBuilder") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this an error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If there are any nulls here the column results will be bogus! (We have no way to propagate top-level nulls through into the child columns). At the "record batch" level nulls don't exist (typically they are exported as an explicitly non-nullable struct; however, at least one implementation doesn't do this so we have to check for actual nulls whilst iterating instead of erroring when we see the schema). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add a brief comment about this in the code? |
||
|
||
for child_visitor, child_array_view in zip( | ||
self._child_visitors, array_view.children | ||
): | ||
|
@@ -177,3 +234,132 @@ def finish(self) -> Tuple[List[str], List[Sequence]]: | |
return [v.schema.name for v in self._child_visitors], [ | ||
v.finish() for v in self._child_visitors | ||
] | ||
|
||
|
||
class BufferColumnBuilder(ArrayStreamVisitor): | ||
def begin(self, total_elements: Union[int, None]): | ||
self._builder = CBufferBuilder() | ||
self._builder.set_format(self._schema_view.buffer_format) | ||
|
||
if total_elements is not None: | ||
element_size_bits = self._schema_view.layout.element_size_bits[1] | ||
element_size_bytes = element_size_bits // 8 | ||
self._builder.reserve_bytes(total_elements * element_size_bytes) | ||
|
||
def visit_chunk_view(self, array_view: CArrayView) -> None: | ||
builder = self._builder | ||
offset, length = array_view.offset, array_view.length | ||
dst_bytes = length * builder.itemsize | ||
|
||
builder.reserve_bytes(dst_bytes) | ||
array_view.buffer(1).copy_into(builder, offset, length, len(builder)) | ||
builder.advance(dst_bytes) | ||
|
||
def finish(self) -> Any: | ||
return self._builder.finish() | ||
|
||
|
||
class BooleanColumnBuilder(ArrayStreamVisitor): | ||
def begin(self, total_elements: Union[int, None]): | ||
self._builder = CBufferBuilder() | ||
self._builder.set_format("?") | ||
|
||
if total_elements is not None: | ||
self._builder.reserve_bytes(total_elements) | ||
|
||
def visit_chunk_view(self, array_view: CArrayView) -> None: | ||
builder = self._builder | ||
offset, length = array_view.offset, array_view.length | ||
builder.reserve_bytes(length) | ||
array_view.buffer(1).unpack_bits_into(builder, offset, length, len(builder)) | ||
builder.advance(length) | ||
|
||
def finish(self) -> Any: | ||
return self._builder.finish() | ||
|
||
|
||
class NullableColumnBuilder(ArrayStreamVisitor): | ||
def __init__( | ||
self, | ||
schema, | ||
column_builder_cls=BufferColumnBuilder, | ||
handle_nulls: Union[Callable[[CBuffer, Sequence], Any], None] = None, | ||
*, | ||
array_view=None | ||
): | ||
super().__init__(schema, array_view=array_view) | ||
self._column_builder = column_builder_cls(schema, array_view=self._array_view) | ||
|
||
if handle_nulls is None: | ||
self._handle_nulls = nulls_forbid() | ||
else: | ||
self._handle_nulls = handle_nulls | ||
|
||
def begin(self, total_elements: Union[int, None]): | ||
self._builder = CBufferBuilder() | ||
self._builder.set_format("?") | ||
self._length = 0 | ||
|
||
self._column_builder.begin(total_elements) | ||
|
||
def visit_chunk_view(self, array_view: CArrayView) -> None: | ||
offset, length = array_view.offset, array_view.length | ||
|
||
builder = self._builder | ||
chunk_contains_nulls = array_view.null_count != 0 | ||
bitmap_allocated = len(builder) > 0 | ||
|
||
if chunk_contains_nulls: | ||
current_length = self._length | ||
if not bitmap_allocated: | ||
self._fill_valid(current_length) | ||
|
||
builder.reserve_bytes(length) | ||
array_view.buffer(0).unpack_bits_into( | ||
builder, offset, length, current_length | ||
) | ||
builder.advance(length) | ||
|
||
elif bitmap_allocated: | ||
self._fill_valid(length) | ||
|
||
self._length += length | ||
self._column_builder.visit_chunk_view(array_view) | ||
|
||
def finish(self) -> Any: | ||
is_valid = self._builder.finish() | ||
column = self._column_builder.finish() | ||
return self._handle_nulls(is_valid, column) | ||
|
||
def _fill_valid(self, length): | ||
builder = self._builder | ||
builder.reserve_bytes(length) | ||
out_start = len(builder) | ||
memoryview(builder)[out_start : out_start + length] = b"\x01" * length | ||
builder.advance(length) | ||
|
||
|
||
def _resolve_column_builder_cls(schema, handle_nulls=None): | ||
schema_view = c_schema_view(schema) | ||
|
||
if schema_view.nullable: | ||
if schema_view.type_id == CArrowType.BOOL: | ||
return NullableColumnBuilder, { | ||
"column_builder_cls": BooleanColumnBuilder, | ||
"handle_nulls": handle_nulls, | ||
} | ||
elif schema_view.buffer_format is not None: | ||
return NullableColumnBuilder, { | ||
"column_builder_cls": BufferColumnBuilder, | ||
"handle_nulls": handle_nulls, | ||
} | ||
else: | ||
return ListBuilder, {} | ||
else: | ||
|
||
if schema_view.type_id == CArrowType.BOOL: | ||
return BooleanColumnBuilder, {} | ||
elif schema_view.buffer_format is not None: | ||
return BufferColumnBuilder, {} | ||
else: | ||
return ListBuilder, {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the line below: can you clarify when you get a buffer and when you get a list?