Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add IPC stream writing #571

Merged
merged 4 commits into from
Aug 8, 2024
Merged

Conversation

bkietz
Copy link
Member

@bkietz bkietz commented Aug 1, 2024

  • adds ArrowIpcArrayStreamWriter along with Init() and Reset() methods
    • embeds an Encoder and an OutputStream
  • adds WriteSchema() and WriteArrayView() methods which encode then write a schema or array
  • adds WriteArrayStream() method which serializes an entire ArrowArrayStream, finishing with an explicit EOS
    • non-blocking IO is not currently supported

@bkietz bkietz requested a review from paleolimbot August 1, 2024 16:37
@bkietz bkietz force-pushed the ipc-stream-writing branch 3 times, most recently from 47bb06a to 708574e Compare August 1, 2024 19:32
src/nanoarrow/ipc/writer.c Outdated Show resolved Hide resolved
src/nanoarrow/ipc/writer.c Outdated Show resolved Hide resolved
Copy link
Member

@paleolimbot paleolimbot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for bearing with me!

src/nanoarrow/common/inline_types.h Outdated Show resolved Hide resolved
src/nanoarrow/ipc/writer.c Outdated Show resolved Hide resolved
src/nanoarrow/ipc/writer.c Outdated Show resolved Hide resolved
/// takes ownership of the output byte stream and the encoder, and the caller is
/// responsible for releasing the writer by calling ArrowIpcWriterReset().
ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer,
struct ArrowIpcEncoder* encoder,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to initialize the encoder internally here instead of force the caller to initialize one and then immediately move it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that the keeping the encoder's initialization separate allows us to avoid duplicating options for encoder. However we can probably just give configuration access to the stream's internal encoder when that becomes necessary.

src/nanoarrow/ipc/files_test.cc Show resolved Hide resolved
@bkietz
Copy link
Member Author

bkietz commented Aug 7, 2024

Valgrind is complaining about an invalid read. I can reproduce this locally and I get

==308668== Invalid read of size 4
==308668==    at 0x13B378: flatcc_builder_create_cached_vtable (builder.c:1233)
==308668==    by 0x13B882: flatcc_builder_end_table (builder.c:1353)
==308668==    by 0x1342F6: org_apache_arrow_flatbuf_Message_end (flatcc_generated.h:9224)
==308668==    by 0x1342F6: org_apache_arrow_flatbuf_Message_end_as_root (flatcc_generated.h:9244)
==308668==    by 0x1342F6: ArrowIpcEncoderEncodeRecordBatch (encoder.c:583)
==308668==    by 0x1342F6: ArrowIpcEncoderEncodeSimpleRecordBatch (encoder.c:599)
==308668==    by 0x12BF27: ArrowIpcWriterWriteArrayView (writer.c:271)
==308668==    by 0x127C4F: WriteNanoarrowStream (files_test.cc:216)
==308668==    by 0x127C4F: TestFile::TestEqualsArrowCpp(std::__cxx11::basic_string<char, std::char_traits<char>, s
td::allocator<char> > const&) (files_test.cc:247)
==308668==    by 0x128623: TestFileFixture_NanoarrowIpcTestFileNativeEndian_Test::TestBody() (files_test.cc:391)
==308668==    by 0x18A7C6: HandleSehExceptionsInMethodIfSupported<testing::Test, void> (gtest.cc:2612)
==308668==    by 0x18A7C6: void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testin
g::Test*, void (testing::Test::*)(), char const*) (gtest.cc:2648)
==308668==    by 0x18AA20: Run (gtest.cc:2687)
==308668==    by 0x18AA20: testing::Test::Run() (gtest.cc:2677)
==308668==    by 0x18ADAE: testing::TestInfo::Run() (gtest.cc:2836)
==308668==    by 0x18E21E: Run (gtest.cc:3015)
==308668==    by 0x18E21E: testing::TestSuite::Run() (gtest.cc:2968)
==308668==    by 0x193E52: testing::internal::UnitTestImpl::RunAllTests() (gtest.cc:5920)
==308668==    by 0x18B056: HandleSehExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool> (gtest.cc
:2612)
==308668==    by 0x18B056: bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTest
Impl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) (gtest.cc:
2648)
==308668==  Address 0x88fa018 is 40 bytes inside a block of size 128 free'd
==308668==    at 0x484DCD3: realloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==308668==    by 0x139FC8: flatcc_builder_default_alloc (builder.c:171)
==308668==    by 0x13B312: reserve_buffer (builder.c:320)
==308668==    by 0x13B312: flatcc_builder_create_cached_vtable (builder.c:1216)
==308668==    by 0x13B882: flatcc_builder_end_table (builder.c:1353)
==308668==    by 0x1342F6: org_apache_arrow_flatbuf_Message_end (flatcc_generated.h:9224)
==308668==    by 0x1342F6: org_apache_arrow_flatbuf_Message_end_as_root (flatcc_generated.h:9244)
==308668==    by 0x1342F6: ArrowIpcEncoderEncodeRecordBatch (encoder.c:583)
==308668==    by 0x1342F6: ArrowIpcEncoderEncodeSimpleRecordBatch (encoder.c:599)
==308668==    by 0x12BF27: ArrowIpcWriterWriteArrayView (writer.c:271)
==308668==    by 0x127C4F: WriteNanoarrowStream (files_test.cc:216)
==308668==    by 0x127C4F: TestFile::TestEqualsArrowCpp(std::__cxx11::basic_string<char, std::char_traits<char>, s
td::allocator<char> > const&) (files_test.cc:247)
==308668==    by 0x128623: TestFileFixture_NanoarrowIpcTestFileNativeEndian_Test::TestBody() (files_test.cc:391)
==308668==    by 0x18A7C6: HandleSehExceptionsInMethodIfSupported<testing::Test, void> (gtest.cc:2612)
==308668==    by 0x18A7C6: void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testin
g::Test*, void (testing::Test::*)(), char const*) (gtest.cc:2648)
==308668==    by 0x18AA20: Run (gtest.cc:2687)
==308668==    by 0x18AA20: testing::Test::Run() (gtest.cc:2677)
==308668==    by 0x18ADAE: testing::TestInfo::Run() (gtest.cc:2836)
==308668==    by 0x18E21E: Run (gtest.cc:3015)
==308668==    by 0x18E21E: testing::TestSuite::Run() (gtest.cc:2968)
==308668==  Block was alloc'd at
==308668==    at 0x484DCD3: realloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==308668==    by 0x139FC8: flatcc_builder_default_alloc (builder.c:171)
==308668==    by 0x13B312: reserve_buffer (builder.c:320)
==308668==    by 0x13B312: flatcc_builder_create_cached_vtable (builder.c:1216)
==308668==    by 0x13B882: flatcc_builder_end_table (builder.c:1353)
==308668==    by 0x133DB1: org_apache_arrow_flatbuf_Message_end (flatcc_generated.h:9224)
==308668==    by 0x133DB1: org_apache_arrow_flatbuf_Message_end_as_root (flatcc_generated.h:9244)
==308668==    by 0x133DB1: ArrowIpcEncoderEncodeSchema (encoder.c:442)
==308668==    by 0x12BDA7: ArrowIpcWriterWriteSchema (writer.c:245)
==308668==    by 0x127C1B: WriteNanoarrowStream (files_test.cc:211)
==308668==    by 0x127C1B: TestFile::TestEqualsArrowCpp(std::__cxx11::basic_string<char, std::char_traits<char>, s
td::allocator<char> > const&) (files_test.cc:247)
==308668==    by 0x128623: TestFileFixture_NanoarrowIpcTestFileNativeEndian_Test::TestBody() (files_test.cc:391)
==308668==    by 0x18A7C6: HandleSehExceptionsInMethodIfSupported<testing::Test, void> (gtest.cc:2612)
==308668==    by 0x18A7C6: void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testin
g::Test*, void (testing::Test::*)(), char const*) (gtest.cc:2648)
==308668==    by 0x18AA20: Run (gtest.cc:2687)
==308668==    by 0x18AA20: testing::Test::Run() (gtest.cc:2677)
==308668==    by 0x18ADAE: testing::TestInfo::Run() (gtest.cc:2836)
==308668==    by 0x18E21E: Run (gtest.cc:3015)
==308668==    by 0x18E21E: testing::TestSuite::Run() (gtest.cc:2968)
==308668== 

AFAICT from preliminary debugging flatcc's usage of realloc() is not incorrect, and Valgrind doesn't complain about the same test compiled using clang. ASAN also doesn't produce any error here. In particular I don't get any segfaults or Valgrind failures when replacing realloc with an equivalent malloc+memcpy+free (which should expose the common realloc error of not accounting for the memory block moving).

Maybe this should just be handled by a new suppression?

{
   <flatcc>:flatcc uses realloc() and valgrind thinks something was free'd
   Memcheck:Addr4
   fun:flatcc_builder_create_cached_vtable
}

Alternatively flatcc supports straightforward configuration of whether realloc will be used (FLATCC_BUILDER_REALLOC) , so another non-invasive fix would be to use that to replace realloc with malloc+memcpy+free until further notice.

Copy link
Member

@paleolimbot paleolimbot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a tiny bit of reading about this, Valgrind's realloc() implementation moves every block and frees the original allocation. Debugging this may take a while...maybe add the suppression with a follow-up tagged with a milestone for the next version? It may be easier to generate more cases where this occurs when there are R or Python bindings, too.

src/nanoarrow/common/inline_buffer.h Show resolved Hide resolved
@bkietz
Copy link
Member Author

bkietz commented Aug 8, 2024

Now valgrind is complaining about protobuf singletons which are not free'd before exit:

==74086== 
==74086== HEAP SUMMARY:
==74086==     in use at exit: 17,120 bytes in 283 blocks
==74086==   total heap usage: 5,783 allocs, 5,500 frees, 591,780 bytes allocated
==74086== 
==74086== 160 bytes in 1 blocks are possibly lost in loss record 217 of 235
==74086==    at 0x4849F0F: operator new(unsigned long) (vg_replace_malloc.c:487)
==74086==    by 0x760BFE8: bool absl::lts_20240116::container_internal::HashSetResizeHelper::InitializeSlots<std::allocator<char>, 8ul, true, 8ul>(absl::lts_20240116::conta
iner_internal::CommonFields&, void*, std::allocator<char>) [clone .isra.0] (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x760EE3F: absl::lts_20240116::container_internal::raw_hash_set<absl::lts_20240116::container_internal::FlatHashSetPolicy<google::protobuf::internal::Descri
ptorTable const*>, google::protobuf::(anonymous namespace)::GeneratedMessageFactory::DescriptorByNameHash, google::protobuf::(anonymous namespace)::GeneratedMessageFactory:
:DescriptorByNameEq, std::allocator<google::protobuf::internal::DescriptorTable const*> >::resize(unsigned long) (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.
25.3.0)
==74086==    by 0x760F0A1: absl::lts_20240116::container_internal::raw_hash_set<absl::lts_20240116::container_internal::FlatHashSetPolicy<google::protobuf::internal::Descri
ptorTable const*>, google::protobuf::(anonymous namespace)::GeneratedMessageFactory::DescriptorByNameHash, google::protobuf::(anonymous namespace)::GeneratedMessageFactory:
:DescriptorByNameEq, std::allocator<google::protobuf::internal::DescriptorTable const*> >::prepare_insert(unsigned long) (in /home/ben/mambaforge/envs/nanoarrow/lib/libprot
obuf.so.25.3.0)
==74086==    by 0x7610491: google::protobuf::MessageFactory::InternalRegisterGeneratedFile(google::protobuf::internal::DescriptorTable const*) (in /home/ben/mambaforge/envs
/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x400647D: call_init.part.0 (dl-init.c:70)
==74086==    by 0x4006567: call_init (dl-init.c:33)
==74086==    by 0x4006567: _dl_init (dl-init.c:117)
==74086==    by 0x40202C9: ??? (in /usr/lib/x86_64-linux-gnu/ld-linux-x86-64.so.2)
==74086==    by 0x1: ???
==74086==    by 0x1FFEFFE68A: ???
==74086==    by 0x1FFEFFE6AB: ???
==74086== 
==74086== 414 bytes in 16 blocks are possibly lost in loss record 230 of 235
==74086==    at 0x4849F0F: operator new(unsigned long) (vg_replace_malloc.c:487)
==74086==    by 0x74E5F4A: void std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::_M_construct<char*>(char*, char*, std::forward_iterator_tag
) [clone .constprop.0] (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x750EF04: google::protobuf::DescriptorPool::Tables::Tables() (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x750F172: google::protobuf::DescriptorPool::DescriptorPool(google::protobuf::DescriptorDatabase*, google::protobuf::DescriptorPool::ErrorCollector*) (in /h
ome/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x750F232: google::protobuf::DescriptorPool::internal_generated_pool() (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x750F307: google::protobuf::DescriptorPool::InternalAddGeneratedFile(void const*, int) (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x75703CF: google::protobuf::internal::AddDescriptorsRunner::AddDescriptorsRunner(google::protobuf::internal::DescriptorTable const*) (in /home/ben/mambafor
ge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x400647D: call_init.part.0 (dl-init.c:70)
==74086==    by 0x4006567: call_init (dl-init.c:33)
==74086==    by 0x4006567: _dl_init (dl-init.c:117)
==74086==    by 0x40202C9: ??? (in /usr/lib/x86_64-linux-gnu/ld-linux-x86-64.so.2)
==74086==    by 0x1: ???
==74086==    by 0x1FFEFFE68A: ???
==74086== 
==74086== 1,296 bytes in 1 blocks are possibly lost in loss record 234 of 235
==74086==    at 0x4849F0F: operator new(unsigned long) (vg_replace_malloc.c:487)
==74086==    by 0x74E7487: bool absl::lts_20240116::container_internal::HashSetResizeHelper::InitializeSlots<std::allocator<char>, 40ul, false, 8ul>(absl::lts_20240116::con
tainer_internal::CommonFields&, void*, std::allocator<char>) [clone .isra.0] (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x750C68A: absl::lts_20240116::container_internal::raw_hash_set<absl::lts_20240116::container_internal::FlatHashMapPolicy<std::__cxx11::basic_string<char, s
td::char_traits<char>, std::allocator<char> >, google::protobuf::Descriptor::WellKnownType>, absl::lts_20240116::container_internal::StringHash, absl::lts_20240116::contain
er_internal::StringEq, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, google::protobuf::Descriptor::WellKno
wnType> > >::resize(unsigned long) (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x750CA9F: absl::lts_20240116::container_internal::raw_hash_set<absl::lts_20240116::container_internal::FlatHashMapPolicy<std::__cxx11::basic_string<char, s
td::char_traits<char>, std::allocator<char> >, google::protobuf::Descriptor::WellKnownType>, absl::lts_20240116::container_internal::StringHash, absl::lts_20240116::contain
er_internal::StringEq, std::allocator<std::pair<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const, google::protobuf::Descriptor::WellKno
wnType> > >::prepare_insert(unsigned long) (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x750EED9: google::protobuf::DescriptorPool::Tables::Tables() (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x750F172: google::protobuf::DescriptorPool::DescriptorPool(google::protobuf::DescriptorDatabase*, google::protobuf::DescriptorPool::ErrorCollector*) (in /h
ome/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x750F232: google::protobuf::DescriptorPool::internal_generated_pool() (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x750F307: google::protobuf::DescriptorPool::InternalAddGeneratedFile(void const*, int) (in /home/ben/mambaforge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x75703CF: google::protobuf::internal::AddDescriptorsRunner::AddDescriptorsRunner(google::protobuf::internal::DescriptorTable const*) (in /home/ben/mambafor
ge/envs/nanoarrow/lib/libprotobuf.so.25.3.0)
==74086==    by 0x400647D: call_init.part.0 (dl-init.c:70)
==74086==    by 0x4006567: call_init (dl-init.c:33)
==74086==    by 0x4006567: _dl_init (dl-init.c:117)
==74086==    by 0x40202C9: ??? (in /usr/lib/x86_64-linux-gnu/ld-linux-x86-64.so.2)
==74086== 
==74086== LEAK SUMMARY:
==74086==    definitely lost: 0 bytes in 0 blocks
==74086==    indirectly lost: 0 bytes in 0 blocks
==74086==      possibly lost: 1,870 bytes in 18 blocks
==74086==    still reachable: 14,770 bytes in 264 blocks
==74086==         suppressed: 480 bytes in 1 blocks
==74086== Reachable blocks (those to which a pointer was found) are not shown.
==74086== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==74086== 
==74086== For lists of detected and suppressed errors, rerun with: -s
==74086== ERROR SUMMARY: 3 errors from 3 contexts (suppressed: 1 from 1)

I'll add a suppression for that as well.

@paleolimbot
Copy link
Member

Erg. (I wonder how protobuf sneaked into our "minimal" C++ build for CI!)

@bkietz
Copy link
Member Author

bkietz commented Aug 8, 2024

It's arrow is linked to protobuf, and the tests link to arrow. What's odd to me is this wasn't in arrow's own suppressions file

@bkietz
Copy link
Member Author

bkietz commented Aug 8, 2024

Okay, that's fun: since we are linked to protobuf, we have inherited responsibility to call ShutdownProtobufLibrary in order to clean up these singletons. That's not something libarrow does or should do, because the presumption in libarrow is anyone who uses protobuf features will want control over when protobuf gets shut down.

@bkietz bkietz merged commit 162dcbd into apache:main Aug 8, 2024
34 checks passed
@bkietz bkietz deleted the ipc-stream-writing branch August 8, 2024 18:28

NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch(
&private->encoder, in, &private->body_buffer, error));

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello,

Shouldn't private->buffer be finalized here? Otherwise the header of a message isn't written to the output stream.

Like this:

   NANOARROW_RETURN_NOT_OK_WITH_ERROR(
       ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1,
                                     &private->buffer),
       error);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know... it really should. Why isn't this failing tests?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the same question :) When I tried to use this API, I noticed the absence of the header.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay... I'll get a fix as soon as I write a failing test. Thanks!

bkietz added a commit that referenced this pull request Aug 12, 2024
- As noted
#571 (comment)
RecordBatch Messages were not being written during IPC streaming. This
has been corrected
- Testing uses one more round trip case where we have arrow C++ read
from a stream written by nanoarrow.
- During testing it also became apparent that encapsulated messages were
not always stored with the correct size; metadata_size should include
padding (but not the continuation or size itself)
https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
@paleolimbot paleolimbot added this to the nanoarrow 0.6.0 milestone Sep 17, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants