-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add IPC stream writing #571
Conversation
bkietz
commented
Aug 1, 2024
•
edited
Loading
edited
- adds ArrowIpcArrayStreamWriter along with Init() and Reset() methods
- embeds an Encoder and an OutputStream
- adds WriteSchema() and WriteArrayView() methods which encode then write a schema or array
- adds WriteArrayStream() method which serializes an entire ArrowArrayStream, finishing with an explicit EOS
- non-blocking IO is not currently supported
47bb06a
to
708574e
Compare
708574e
to
5973471
Compare
5973471
to
0cc7f7c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for bearing with me!
src/nanoarrow/nanoarrow_ipc.h
Outdated
/// takes ownership of the output byte stream and the encoder, and the caller is | ||
/// responsible for releasing the writer by calling ArrowIpcWriterReset(). | ||
ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer, | ||
struct ArrowIpcEncoder* encoder, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to initialize the encoder internally here instead of force the caller to initialize one and then immediately move it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking that the keeping the encoder's initialization separate allows us to avoid duplicating options for encoder. However we can probably just give configuration access to the stream's internal encoder when that becomes necessary.
Valgrind is complaining about an invalid read. I can reproduce this locally and I get
AFAICT from preliminary debugging flatcc's usage of realloc() is not incorrect, and Valgrind doesn't complain about the same test compiled using clang. ASAN also doesn't produce any error here. In particular I don't get any segfaults or Valgrind failures when replacing realloc with an equivalent malloc+memcpy+free (which should expose the common realloc error of not accounting for the memory block moving). Maybe this should just be handled by a new suppression?
Alternatively flatcc supports straightforward configuration of whether realloc will be used ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From a tiny bit of reading about this, Valgrind's realloc()
implementation moves every block and frees the original allocation. Debugging this may take a while...maybe add the suppression with a follow-up tagged with a milestone for the next version? It may be easier to generate more cases where this occurs when there are R or Python bindings, too.
Now valgrind is complaining about protobuf singletons which are not free'd before exit:
I'll add a suppression for that as well. |
Erg. (I wonder how protobuf sneaked into our "minimal" C++ build for CI!) |
It's arrow is linked to protobuf, and the tests link to arrow. What's odd to me is this wasn't in arrow's own suppressions file |
Okay, that's fun: since we are linked to protobuf, we have inherited responsibility to call ShutdownProtobufLibrary in order to clean up these singletons. That's not something libarrow does or should do, because the presumption in libarrow is anyone who uses protobuf features will want control over when protobuf gets shut down. |
|
||
NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch( | ||
&private->encoder, in, &private->body_buffer, error)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello,
Shouldn't private->buffer
be finalized here? Otherwise the header of a message isn't written to the output stream.
Like this:
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1,
&private->buffer),
error);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You know... it really should. Why isn't this failing tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have the same question :) When I tried to use this API, I noticed the absence of the header.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay... I'll get a fix as soon as I write a failing test. Thanks!
- As noted #571 (comment) RecordBatch Messages were not being written during IPC streaming. This has been corrected - Testing uses one more round trip case where we have arrow C++ read from a stream written by nanoarrow. - During testing it also became apparent that encapsulated messages were not always stored with the correct size; metadata_size should include padding (but not the continuation or size itself) https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format