From 0cc7f7c9f4452e2016fef92890a9139dd370e894 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 25 Jul 2024 14:56:10 -0500 Subject: [PATCH 1/4] add IPC stream writing --- src/nanoarrow/common/inline_types.h | 2 +- src/nanoarrow/ipc/encoder.c | 1 - src/nanoarrow/ipc/files_test.cc | 170 ++++++++++++++++++++-------- src/nanoarrow/ipc/writer.c | 158 ++++++++++++++++++++++++++ src/nanoarrow/nanoarrow_ipc.h | 47 ++++++++ src/nanoarrow/nanoarrow_ipc.hpp | 20 ++++ 6 files changed, 348 insertions(+), 50 deletions(-) diff --git a/src/nanoarrow/common/inline_types.h b/src/nanoarrow/common/inline_types.h index 015cc5416..c74b1a439 100644 --- a/src/nanoarrow/common/inline_types.h +++ b/src/nanoarrow/common/inline_types.h @@ -314,7 +314,7 @@ static inline void ArrowErrorSetString(struct ArrowError* error, const char* src #define NANOARROW_DCHECK(EXPR) _NANOARROW_DCHECK_IMPL(EXPR, #EXPR) #else #define NANOARROW_ASSERT_OK(EXPR) (void)(EXPR) -#define NANOARROW_DCHECK(EXPR) +#define NANOARROW_DCHECK(EXPR) (void)(EXPR) #endif static inline void ArrowSchemaMove(struct ArrowSchema* src, struct ArrowSchema* dst) { diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c index 889f66ad7..8497616b0 100644 --- a/src/nanoarrow/ipc/encoder.c +++ b/src/nanoarrow/ipc/encoder.c @@ -531,7 +531,6 @@ static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch( const struct ArrowArrayView* array_view, struct ArrowError* error) { NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && buffer_encoder != NULL && buffer_encoder->encode_buffer != NULL); - if (array_view->null_count != 0 && ArrowArrayViewComputeNullCount(array_view) != 0) { ArrowErrorSet(error, "RecordBatches cannot be constructed from arrays with top level nulls"); diff --git a/src/nanoarrow/ipc/files_test.cc b/src/nanoarrow/ipc/files_test.cc index 765823274..271984215 100644 --- a/src/nanoarrow/ipc/files_test.cc +++ b/src/nanoarrow/ipc/files_test.cc @@ -29,7 +29,7 @@ #include #include "nanoarrow/nanoarrow.hpp" -#include "nanoarrow/nanoarrow_ipc.h" +#include "nanoarrow/nanoarrow_ipc.hpp" #include "nanoarrow/nanoarrow_testing.hpp" #include "flatcc/portable/pendian_detect.h" @@ -105,6 +105,29 @@ class TestFile { return NANOARROW_OK; } + ArrowErrorCode ReadArrowArrayStreamIPC(const std::string& dir_prefix, + struct ArrowSchema* schema, + std::vector* arrays, + ArrowError* error) { + nanoarrow::UniqueArrayStream stream; + NANOARROW_RETURN_NOT_OK(GetArrowArrayStreamIPC(dir_prefix, stream.get(), error)); + + NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(stream.get(), schema, error)); + + while (true) { + nanoarrow::UniqueArray array; + + NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(stream.get(), array.get(), error)); + + if (array->release == nullptr) { + break; + } + + arrays->push_back(std::move(array)); + } + return NANOARROW_OK; + } + ArrowErrorCode GetArrowArrayStreamCheckJSON(const std::string& dir_prefix, ArrowArrayStream* out, ArrowError* error) { std::stringstream path_builder; @@ -171,6 +194,60 @@ class TestFile { return std::make_shared(content_copy_wrapped); } + // Set a trivial release callback which won't free() anything + template + static void SetTrivialRelease(void (**release)(T*)) { + *release = [](T* ptr) { ptr->release = nullptr; }; + } + + ArrowErrorCode WriteNanoarrowStream(const nanoarrow::UniqueSchema& schema, + const std::vector& arrays, + struct ArrowBuffer* buffer, + struct ArrowError* error) { + struct { + const nanoarrow::UniqueSchema& schema; + const std::vector& arrays; + size_t i = 0; + } stream_private{schema, arrays}; + + static auto get_private = [](struct ArrowArrayStream* stream) { + return static_cast(stream->private_data); + }; + + struct ArrowArrayStream in; + in.get_schema = [](struct ArrowArrayStream* stream, struct ArrowSchema* out) { + memcpy(out, get_private(stream)->schema.get(), sizeof(struct ArrowSchema)); + SetTrivialRelease(&out->release); + return 0; + }; + in.get_next = [](struct ArrowArrayStream* stream, struct ArrowArray* out) { + size_t i = get_private(stream)->i++; + out->release = nullptr; + if (i < get_private(stream)->arrays.size()) { + memcpy(out, get_private(stream)->arrays[i].get(), sizeof(struct ArrowArray)); + SetTrivialRelease(&out->release); + } + return 0; + }; + in.get_last_error = [](struct ArrowArrayStream*) { return "temp stream error"; }; + in.private_data = &stream_private; + SetTrivialRelease(&in.release); + + nanoarrow::ipc::UniqueOutputStream output_stream; + NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamInitBuffer(output_stream.get(), buffer)); + + nanoarrow::ipc::UniqueArrayStreamWriter writer; + NANOARROW_RETURN_NOT_OK( + ArrowIpcArrayStreamWriterInit(writer.get(), &in, output_stream.get())); + + while (!writer->finished) { + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamWriterWriteSome(writer.get(), error)); + } + writer.reset(); + + return NANOARROW_OK; + } + void TestEqualsArrowCpp(const std::string& dir_prefix) { std::stringstream path_builder; path_builder << dir_prefix << "/" << path_; @@ -179,76 +256,72 @@ class TestFile { ArrowErrorInit(&error); // Read using nanoarrow_ipc - nanoarrow::UniqueArrayStream stream; - ASSERT_EQ(GetArrowArrayStreamIPC(dir_prefix, stream.get(), &error), NANOARROW_OK) - << error.message; - nanoarrow::UniqueSchema schema; - int result = ArrowArrayStreamGetSchema(stream.get(), schema.get(), &error); + std::vector arrays; + int result = ReadArrowArrayStreamIPC(dir_prefix, schema.get(), &arrays, &error); if (result != NANOARROW_OK) { if (Check(result, error.message)) { return; } - GTEST_FAIL() << MakeError(result, error.message); } - std::vector arrays; - while (true) { - nanoarrow::UniqueArray array; - - result = ArrowArrayStreamGetNext(stream.get(), array.get(), &error); - if (result != NANOARROW_OK) { - if (Check(result, error.message)) { - return; - } - - GTEST_FAIL() << MakeError(result, error.message); - } - - if (array->release == nullptr) { - break; - } - - arrays.push_back(std::move(array)); - } - // If the file was supposed to fail the read but did not, fail here if (expected_return_code_ != NANOARROW_OK) { GTEST_FAIL() << MakeError(NANOARROW_OK, ""); } - // Read the same file with Arrow C++ - nanoarrow::UniqueBuffer content_copy; - ASSERT_EQ(ReadFileBuffer(path_builder.str(), content_copy.get(), &error), + // Write back to a buffer using nanoarrow + nanoarrow::UniqueBuffer roundtripped; + ASSERT_EQ(WriteNanoarrowStream(schema, arrays, roundtripped.get(), &error), NANOARROW_OK) << error.message; - std::shared_ptr input_stream = BufferInputStream(content_copy.get()); - auto maybe_reader = ipc::RecordBatchStreamReader::Open(input_stream); - FAIL_RESULT_NOT_OK(maybe_reader); - - auto maybe_table_arrow = maybe_reader.ValueUnsafe()->ToTable(); + // Read the same file with Arrow C++ + auto maybe_table_arrow = ReadTable(io::ReadableFile::Open(path_builder.str())); FAIL_RESULT_NOT_OK(maybe_table_arrow); - // Make a Table from the our vector of arrays - auto maybe_schema = ImportSchema(schema.get()); - FAIL_RESULT_NOT_OK(maybe_schema); + AssertEqualsTable(std::move(schema), std::move(arrays), + maybe_table_arrow.ValueUnsafe()); - ASSERT_TRUE(maybe_table_arrow.ValueUnsafe()->schema()->Equals(**maybe_schema, true)); + // Read the roundtripped buffer using nanoarrow + nanoarrow::UniqueSchema roundtripped_schema; + std::vector roundtripped_arrays; + ASSERT_EQ(ReadArrowArrayStreamIPC(dir_prefix, roundtripped_schema.get(), + &roundtripped_arrays, &error), + NANOARROW_OK); - std::vector> batches; - for (auto& array : arrays) { - auto maybe_batch = ImportRecordBatch(array.get(), *maybe_schema); - FAIL_RESULT_NOT_OK(maybe_batch); + AssertEqualsTable(std::move(roundtripped_schema), std::move(roundtripped_arrays), + maybe_table_arrow.ValueUnsafe()); + } + + Result> ReadTable( + Result> maybe_input_stream) { + ARROW_ASSIGN_OR_RAISE(auto input_stream, maybe_input_stream); + ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchStreamReader::Open(input_stream)); + return reader->ToTable(); + } + + Result> ToTable(nanoarrow::UniqueSchema schema, + std::vector arrays) { + ARROW_ASSIGN_OR_RAISE(auto arrow_schema, ImportSchema(schema.get())); - batches.push_back(std::move(*maybe_batch)); + std::vector> batches(arrays.size()); + size_t i = 0; + for (auto& array : arrays) { + ARROW_ASSIGN_OR_RAISE(auto batch, ImportRecordBatch(array.get(), arrow_schema)); + batches[i++] = std::move(batch); } + return Table::FromRecordBatches(std::move(arrow_schema), std::move(batches)); + } - auto maybe_table = Table::FromRecordBatches(*maybe_schema, batches); + void AssertEqualsTable(nanoarrow::UniqueSchema schema, + std::vector arrays, + const std::shared_ptr& expected) { + auto maybe_table = ToTable(std::move(schema), std::move(arrays)); FAIL_RESULT_NOT_OK(maybe_table); - - EXPECT_TRUE(maybe_table.ValueUnsafe()->Equals(**maybe_table_arrow, true)); + ASSERT_TRUE(expected->schema()->Equals(maybe_table.ValueUnsafe()->schema(), true)); + EXPECT_TRUE(maybe_table.ValueUnsafe()->Equals(*expected, true)); } void TestIPCCheckJSON(const std::string& dir_prefix) { @@ -382,7 +455,8 @@ INSTANTIATE_TEST_SUITE_P( NanoarrowIpcTest, TestFileFixture, ::testing::Values( // Files in data/arrow-ipc-stream/integration/1.0.0-(little|big)endian/ - // should read without error and the data should match Arrow C++'s read + // should read without error and the data should match Arrow C++'s read. + // Also write the stream to a buffer and check Arrow C++'s read of that. TestFile::OK("generated_custom_metadata.stream"), TestFile::OK("generated_datetime.stream"), TestFile::OK("generated_decimal.stream"), diff --git a/src/nanoarrow/ipc/writer.c b/src/nanoarrow/ipc/writer.c index 3227d20a6..5af1af1dc 100644 --- a/src/nanoarrow/ipc/writer.c +++ b/src/nanoarrow/ipc/writer.c @@ -25,6 +25,8 @@ void ArrowIpcOutputStreamMove(struct ArrowIpcOutputStream* src, struct ArrowIpcOutputStream* dst) { + NANOARROW_DCHECK(src != NULL && dst != NULL); + memcpy(dst, src, sizeof(struct ArrowIpcOutputStream)); src->release = NULL; } @@ -55,6 +57,8 @@ static void ArrowIpcOutputStreamBufferRelease(struct ArrowIpcOutputStream* strea ArrowErrorCode ArrowIpcOutputStreamInitBuffer(struct ArrowIpcOutputStream* stream, struct ArrowBuffer* output) { + NANOARROW_DCHECK(stream != NULL && output != NULL); + struct ArrowIpcOutputStreamBufferPrivate* private_data = (struct ArrowIpcOutputStreamBufferPrivate*)ArrowMalloc( sizeof(struct ArrowIpcOutputStreamBufferPrivate)); @@ -130,6 +134,7 @@ static ArrowErrorCode ArrowIpcOutputStreamFileWrite(struct ArrowIpcOutputStream* ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream, void* file_ptr, int close_on_release) { + NANOARROW_DCHECK(stream != NULL); if (file_ptr == NULL) { return EINVAL; } @@ -150,3 +155,156 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream, stream->private_data = private_data; return NANOARROW_OK; } + +struct ArrowIpcArrayStreamWriterPrivate { + struct ArrowArrayStream in; + struct ArrowIpcOutputStream output_stream; + struct ArrowIpcEncoder encoder; + struct ArrowSchema schema; + struct ArrowArray array; + struct ArrowArrayView array_view; + struct ArrowBuffer buffer; + struct ArrowBuffer body_buffer; + int64_t buffer_cursor; + int64_t body_buffer_cursor; +}; + +ArrowErrorCode ArrowIpcArrayStreamWriterInit(struct ArrowIpcArrayStreamWriter* writer, + struct ArrowArrayStream* in, + struct ArrowIpcOutputStream* output_stream) { + NANOARROW_DCHECK(writer != NULL && in != NULL && output_stream != NULL); + + struct ArrowIpcArrayStreamWriterPrivate* private = + (struct ArrowIpcArrayStreamWriterPrivate*)ArrowMalloc( + sizeof(struct ArrowIpcArrayStreamWriterPrivate)); + + if (private == NULL) { + return ENOMEM; + } + + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderInit(&private->encoder)); + ArrowIpcOutputStreamMove(output_stream, &private->output_stream); + ArrowArrayStreamMove(in, &private->in); + private->schema.release = NULL; + private->array.release = NULL; + ArrowArrayViewInitFromType(&private->array_view, NANOARROW_TYPE_UNINITIALIZED); + ArrowBufferInit(&private->buffer); + ArrowBufferInit(&private->body_buffer); + private->buffer_cursor = 0; + private->body_buffer_cursor = 0; + + writer->finished = 0; + writer->private_data = private; + return NANOARROW_OK; +} + +void ArrowIpcArrayStreamWriterReset(struct ArrowIpcArrayStreamWriter* writer) { + NANOARROW_DCHECK(writer != NULL); + + struct ArrowIpcArrayStreamWriterPrivate* private = + (struct ArrowIpcArrayStreamWriterPrivate*)writer->private_data; + + if (private != NULL) { + ArrowIpcEncoderReset(&private->encoder); + ArrowArrayStreamRelease(&private->in); + private->output_stream.release(&private->output_stream); + if (private->schema.release != NULL) { + ArrowSchemaRelease(&private->schema); + } + if (private->array.release != NULL) { + ArrowArrayRelease(&private->array); + } + ArrowArrayViewReset(&private->array_view); + ArrowBufferReset(&private->buffer); + ArrowBufferReset(&private->body_buffer); + + ArrowFree(private); + } + memset(writer, 0, sizeof(struct ArrowIpcArrayStreamWriter)); +} + +static ArrowErrorCode ArrowIpcArrayStreamWriterPush( + struct ArrowIpcArrayStreamWriterPrivate* private, struct ArrowBuffer* buffer, + int* had_bytes_to_push, struct ArrowError* error) { + int64_t* cursor = buffer == &private->buffer // + ? &private->buffer_cursor + : &private->body_buffer_cursor; + + *had_bytes_to_push = *cursor < buffer->size_bytes; + if (*had_bytes_to_push) { + // bytes remain in the buffer; push those + int64_t bytes_written; + NANOARROW_RETURN_NOT_OK(private->output_stream.write( + &private->output_stream, buffer->data + *cursor, buffer->size_bytes - *cursor, + &bytes_written, error)); + *cursor += bytes_written; + + if (*cursor == buffer->size_bytes) { + *cursor = buffer->size_bytes = 0; + } + } + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcArrayStreamWriterWriteSome( + struct ArrowIpcArrayStreamWriter* writer, struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL); + + if (writer->finished) { + ArrowErrorSet(error, "ArrowIpcArrayStreamWriterWriteSome on a finished writer"); + return EINVAL; + } + + struct ArrowIpcArrayStreamWriterPrivate* private = + (struct ArrowIpcArrayStreamWriterPrivate*)writer->private_data; + + int had_bytes_to_push = 0; + + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamWriterPush(private, &private->buffer, + &had_bytes_to_push, error)); + if (had_bytes_to_push) { + return NANOARROW_OK; + } + // buffer has no bytes to push, try body_buffer + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamWriterPush(private, &private->body_buffer, + &had_bytes_to_push, error)); + if (had_bytes_to_push) { + return NANOARROW_OK; + } + + // get the next Message + if (private->schema.release == NULL) { + // The schema message has not been buffered yet; do that now. + NANOARROW_RETURN_NOT_OK( + ArrowArrayStreamGetSchema(&private->in, &private->schema, error)); + NANOARROW_RETURN_NOT_OK( + ArrowIpcEncoderEncodeSchema(&private->encoder, &private->schema, error)); + NANOARROW_RETURN_NOT_OK( + ArrowArrayViewInitFromSchema(&private->array_view, &private->schema, error)); + } else { + // Get the next array from the stream + NANOARROW_RETURN_NOT_OK( // XXX does private->array maybe need to be released? + ArrowArrayStreamGetNext(&private->in, &private->array, error)); + if (private->array.release == NULL) { + // The stream is complete, signal the end to the caller + writer->finished = 1; + return NANOARROW_OK; + } + + NANOARROW_RETURN_NOT_OK( + ArrowArrayViewSetArray(&private->array_view, &private->array, error)); + + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch( + &private->encoder, &private->array_view, &private->body_buffer, error)); + } + + NANOARROW_RETURN_NOT_OK_WITH_ERROR( + ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1, + &private->buffer), + error); + NANOARROW_DCHECK(private->buffer.size_bytes % 8 == 0); + NANOARROW_DCHECK(private->body_buffer.size_bytes % 8 == 0); + // Since we just finalized it, buffer won't be empty + return ArrowIpcArrayStreamWriterPush(private, &private->buffer, &had_bytes_to_push, + error); +} diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h index e4856d074..4c83c976d 100644 --- a/src/nanoarrow/nanoarrow_ipc.h +++ b/src/nanoarrow/nanoarrow_ipc.h @@ -71,6 +71,12 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamInitFile) #define ArrowIpcOutputStreamMove \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamMove) +#define ArrowIpcArrayStreamWriterInit \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamWriterInit) +#define ArrowIpcArrayStreamWriterReset \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamWriterReset) +#define ArrowIpcArrayStreamWriterWriteSome \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamWriterWriteSome) #endif @@ -491,6 +497,47 @@ ArrowErrorCode ArrowIpcOutputStreamInitBuffer(struct ArrowIpcOutputStream* strea /// close_on_release and handle closing the file independently from stream. ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream, void* file_ptr, int close_on_release); + +/// \brief A stream writer which encodes ArrowArrays into an IPC byte stream +/// +/// This structure is intended to be allocated by the caller, +/// initialized using ArrowIpcArrayStreamWriterInit(), and released with +/// ArrowIpcArrayStreamWriterReset(). +/// +/// Note: although ArrowArrayStream is a sufficient interface to wrap a stream reader, +/// a stream writer cannot be wrapped as an ArrowArrayStream. +struct ArrowIpcArrayStreamWriter { + /// \brief Set to non-zero after the writer is finished. + int finished; + + /// \brief Private resources managed by this library + void* private_data; +}; + +/// \brief Initialize an output stream of bytes from an ArrowArrayStream +/// +/// The writer will not pull from the input array stream or push to the +/// output stream until ArrowIpcArrayStreamWriterContinue(). +/// +/// Returns NANOARROW_OK on success. If NANOARROW_OK is returned the writer +/// takes ownership of both the input array stream and the output byte stream +/// and the caller is responsible for releasing the writer by calling +/// ArrowIpcArrayStreamWriterReset(). +ArrowErrorCode ArrowIpcArrayStreamWriterInit(struct ArrowIpcArrayStreamWriter* writer, + struct ArrowArrayStream* in, + struct ArrowIpcOutputStream* output_stream); + +/// \brief Release all resources attached to a writer +void ArrowIpcArrayStreamWriterReset(struct ArrowIpcArrayStreamWriter* writer); + +/// \brief Write some bytes into the output byte stream +/// +/// The stream will first be queried for its Schema, then for all the RecordBatches in the +/// stream, each of which will be encoded and pushed into the output byte stream. +/// +/// Errors are propagated from the underlying streams. +ArrowErrorCode ArrowIpcArrayStreamWriterWriteSome( + struct ArrowIpcArrayStreamWriter* writer, struct ArrowError* error); /// @} #ifdef __cplusplus diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp index 8000ca053..792018935 100644 --- a/src/nanoarrow/nanoarrow_ipc.hpp +++ b/src/nanoarrow/nanoarrow_ipc.hpp @@ -95,6 +95,23 @@ inline void release_pointer(struct ArrowIpcOutputStream* data) { } } +template <> +inline void init_pointer(struct ArrowIpcArrayStreamWriter* data) { + data->private_data = nullptr; +} + +template <> +inline void move_pointer(struct ArrowIpcArrayStreamWriter* src, + struct ArrowIpcArrayStreamWriter* dst) { + memcpy(dst, src, sizeof(struct ArrowIpcArrayStreamWriter)); + src->private_data = nullptr; +} + +template <> +inline void release_pointer(struct ArrowIpcArrayStreamWriter* data) { + ArrowIpcArrayStreamWriterReset(data); +} + } // namespace internal } // namespace nanoarrow @@ -121,6 +138,9 @@ using UniqueInputStream = internal::Unique; /// \brief Class wrapping a unique struct ArrowIpcOutputStream using UniqueOutputStream = internal::Unique; +/// \brief Class wrapping a unique struct ArrowIpcArrayStreamWriter +using UniqueArrayStreamWriter = internal::Unique; + /// @} } // namespace ipc From b4dc8ed3e0fc8a2e119f4755a7bc5a51bb0cfee2 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 7 Aug 2024 12:20:12 -0500 Subject: [PATCH 2/4] switch to blocking writes --- src/nanoarrow/ipc/files_test.cc | 58 +++----- src/nanoarrow/ipc/writer.c | 230 ++++++++++++++++++-------------- src/nanoarrow/nanoarrow_ipc.h | 80 ++++++----- src/nanoarrow/nanoarrow_ipc.hpp | 16 +-- 4 files changed, 202 insertions(+), 182 deletions(-) diff --git a/src/nanoarrow/ipc/files_test.cc b/src/nanoarrow/ipc/files_test.cc index 271984215..9f6a8722c 100644 --- a/src/nanoarrow/ipc/files_test.cc +++ b/src/nanoarrow/ipc/files_test.cc @@ -194,58 +194,32 @@ class TestFile { return std::make_shared(content_copy_wrapped); } - // Set a trivial release callback which won't free() anything - template - static void SetTrivialRelease(void (**release)(T*)) { - *release = [](T* ptr) { ptr->release = nullptr; }; - } - ArrowErrorCode WriteNanoarrowStream(const nanoarrow::UniqueSchema& schema, const std::vector& arrays, struct ArrowBuffer* buffer, struct ArrowError* error) { - struct { - const nanoarrow::UniqueSchema& schema; - const std::vector& arrays; - size_t i = 0; - } stream_private{schema, arrays}; - - static auto get_private = [](struct ArrowArrayStream* stream) { - return static_cast(stream->private_data); - }; - - struct ArrowArrayStream in; - in.get_schema = [](struct ArrowArrayStream* stream, struct ArrowSchema* out) { - memcpy(out, get_private(stream)->schema.get(), sizeof(struct ArrowSchema)); - SetTrivialRelease(&out->release); - return 0; - }; - in.get_next = [](struct ArrowArrayStream* stream, struct ArrowArray* out) { - size_t i = get_private(stream)->i++; - out->release = nullptr; - if (i < get_private(stream)->arrays.size()) { - memcpy(out, get_private(stream)->arrays[i].get(), sizeof(struct ArrowArray)); - SetTrivialRelease(&out->release); - } - return 0; - }; - in.get_last_error = [](struct ArrowArrayStream*) { return "temp stream error"; }; - in.private_data = &stream_private; - SetTrivialRelease(&in.release); - nanoarrow::ipc::UniqueOutputStream output_stream; NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamInitBuffer(output_stream.get(), buffer)); - nanoarrow::ipc::UniqueArrayStreamWriter writer; + nanoarrow::ipc::UniqueEncoder encoder; + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderInit(encoder.get())); + + nanoarrow::ipc::UniqueWriter writer; NANOARROW_RETURN_NOT_OK( - ArrowIpcArrayStreamWriterInit(writer.get(), &in, output_stream.get())); + ArrowIpcWriterInit(writer.get(), encoder.get(), output_stream.get())); - while (!writer->finished) { - NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamWriterWriteSome(writer.get(), error)); + nanoarrow::UniqueArrayView array_view; + NANOARROW_RETURN_NOT_OK( + ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), error)); + + NANOARROW_RETURN_NOT_OK(ArrowIpcWriterWriteSchema(writer.get(), schema.get(), error)); + for (const auto& array : arrays) { + NANOARROW_RETURN_NOT_OK( + ArrowArrayViewSetArray(array_view.get(), array.get(), error)); + NANOARROW_RETURN_NOT_OK( + ArrowIpcWriterWriteArrayView(writer.get(), array_view.get(), error)); } - writer.reset(); - - return NANOARROW_OK; + return ArrowIpcWriterWriteArrayView(writer.get(), nullptr, error); } void TestEqualsArrowCpp(const std::string& dir_prefix) { diff --git a/src/nanoarrow/ipc/writer.c b/src/nanoarrow/ipc/writer.c index 5af1af1dc..a64c61f7a 100644 --- a/src/nanoarrow/ipc/writer.c +++ b/src/nanoarrow/ipc/writer.c @@ -156,155 +156,187 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream, return NANOARROW_OK; } -struct ArrowIpcArrayStreamWriterPrivate { - struct ArrowArrayStream in; +struct ArrowIpcWriterPrivate { struct ArrowIpcOutputStream output_stream; struct ArrowIpcEncoder encoder; - struct ArrowSchema schema; - struct ArrowArray array; - struct ArrowArrayView array_view; struct ArrowBuffer buffer; struct ArrowBuffer body_buffer; int64_t buffer_cursor; int64_t body_buffer_cursor; }; -ArrowErrorCode ArrowIpcArrayStreamWriterInit(struct ArrowIpcArrayStreamWriter* writer, - struct ArrowArrayStream* in, - struct ArrowIpcOutputStream* output_stream) { - NANOARROW_DCHECK(writer != NULL && in != NULL && output_stream != NULL); +ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream, + struct ArrowBufferView data, + struct ArrowError* error) { + while (data.size_bytes != 0) { + int64_t bytes_written = 0; + NANOARROW_RETURN_NOT_OK(stream->write(stream, data.data.as_uint8, data.size_bytes, + &bytes_written, error)); + data.size_bytes -= bytes_written; + data.data.as_uint8 += bytes_written; + } + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer, + struct ArrowIpcEncoder* encoder, + struct ArrowIpcOutputStream* output_stream) { + NANOARROW_DCHECK(writer != NULL && encoder != NULL && output_stream != NULL); - struct ArrowIpcArrayStreamWriterPrivate* private = - (struct ArrowIpcArrayStreamWriterPrivate*)ArrowMalloc( - sizeof(struct ArrowIpcArrayStreamWriterPrivate)); + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)ArrowMalloc(sizeof(struct ArrowIpcWriterPrivate)); if (private == NULL) { return ENOMEM; } - NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderInit(&private->encoder)); ArrowIpcOutputStreamMove(output_stream, &private->output_stream); - ArrowArrayStreamMove(in, &private->in); - private->schema.release = NULL; - private->array.release = NULL; - ArrowArrayViewInitFromType(&private->array_view, NANOARROW_TYPE_UNINITIALIZED); + + memcpy(&private->encoder, encoder, sizeof(struct ArrowIpcEncoder)); + encoder->private_data = NULL; + ArrowBufferInit(&private->buffer); ArrowBufferInit(&private->body_buffer); private->buffer_cursor = 0; private->body_buffer_cursor = 0; - writer->finished = 0; writer->private_data = private; return NANOARROW_OK; } -void ArrowIpcArrayStreamWriterReset(struct ArrowIpcArrayStreamWriter* writer) { +void ArrowIpcWriterReset(struct ArrowIpcWriter* writer) { NANOARROW_DCHECK(writer != NULL); - struct ArrowIpcArrayStreamWriterPrivate* private = - (struct ArrowIpcArrayStreamWriterPrivate*)writer->private_data; + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; if (private != NULL) { ArrowIpcEncoderReset(&private->encoder); - ArrowArrayStreamRelease(&private->in); private->output_stream.release(&private->output_stream); - if (private->schema.release != NULL) { - ArrowSchemaRelease(&private->schema); - } - if (private->array.release != NULL) { - ArrowArrayRelease(&private->array); - } - ArrowArrayViewReset(&private->array_view); ArrowBufferReset(&private->buffer); ArrowBufferReset(&private->body_buffer); ArrowFree(private); } - memset(writer, 0, sizeof(struct ArrowIpcArrayStreamWriter)); + memset(writer, 0, sizeof(struct ArrowIpcWriter)); } -static ArrowErrorCode ArrowIpcArrayStreamWriterPush( - struct ArrowIpcArrayStreamWriterPrivate* private, struct ArrowBuffer* buffer, - int* had_bytes_to_push, struct ArrowError* error) { - int64_t* cursor = buffer == &private->buffer // - ? &private->buffer_cursor - : &private->body_buffer_cursor; - - *had_bytes_to_push = *cursor < buffer->size_bytes; - if (*had_bytes_to_push) { - // bytes remain in the buffer; push those - int64_t bytes_written; - NANOARROW_RETURN_NOT_OK(private->output_stream.write( - &private->output_stream, buffer->data + *cursor, buffer->size_bytes - *cursor, - &bytes_written, error)); - *cursor += bytes_written; - - if (*cursor == buffer->size_bytes) { - *cursor = buffer->size_bytes = 0; - } +static struct ArrowBufferView ArrowBufferToBufferView(const struct ArrowBuffer* buffer) { + struct ArrowBufferView buffer_view = { + .data.as_uint8 = buffer->data, + .size_bytes = buffer->size_bytes, + }; + return buffer_view; +} + +// Eventually, it may be necessary to construct an ArrowIpcWriter which doesn't rely on +// blocking writes (ArrowIpcOutputStreamWrite). For example an ArrowIpcOutputStream +// might wrap a socket which is not always able to transmit all bytes of a Message. In +// that case users of ArrowIpcWriter might prefer to do other work until a socket is +// ready rather than blocking, or timeout, or otherwise respond to partial transmission. +// +// This could be handled by: +// - keeping partially sent buffers internal and signalling incomplete transmission by +// raising EAGAIN, returning "bytes actually written", ... +// - when the caller is ready to try again, call ArrowIpcWriterWriteSome() +// - exposing internal buffers which have not been completely sent, deferring +// follow-up transmission to the caller + +ArrowErrorCode ArrowIpcWriterWriteSchema(struct ArrowIpcWriter* writer, + const struct ArrowSchema* in, + struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL); + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; + + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0)); + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSchema(&private->encoder, in, error)); + NANOARROW_RETURN_NOT_OK_WITH_ERROR( + ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1, + &private->buffer), + error); + + return ArrowIpcOutputStreamWrite(&private->output_stream, + ArrowBufferToBufferView(&private->buffer), error); +} + +ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer, + const struct ArrowArrayView* in, + struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL); + struct ArrowIpcWriterPrivate* private = + (struct ArrowIpcWriterPrivate*)writer->private_data; + + if (in == NULL) { + int32_t eos[] = {-1, 0}; + struct ArrowBufferView buffer_view = { + .data.as_int32 = eos, + .size_bytes = sizeof(eos), + }; + return ArrowIpcOutputStreamWrite(&private->output_stream, buffer_view, error); } + + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0)); + NANOARROW_ASSERT_OK(ArrowBufferResize(&private->body_buffer, 0, 0)); + + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch( + &private->encoder, in, &private->body_buffer, error)); + + NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite( + &private->output_stream, ArrowBufferToBufferView(&private->buffer), error)); + NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite( + &private->output_stream, ArrowBufferToBufferView(&private->body_buffer), error)); return NANOARROW_OK; } -ArrowErrorCode ArrowIpcArrayStreamWriterWriteSome( - struct ArrowIpcArrayStreamWriter* writer, struct ArrowError* error) { - NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL); +ArrowErrorCode ArrowIpcWriterWriteArrayStream(struct ArrowIpcWriter* writer, + struct ArrowArrayStream* in, + struct ArrowError* error) { + NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL); - if (writer->finished) { - ArrowErrorSet(error, "ArrowIpcArrayStreamWriterWriteSome on a finished writer"); - return EINVAL; - } + ArrowErrorCode error_code = NANOARROW_OK; - struct ArrowIpcArrayStreamWriterPrivate* private = - (struct ArrowIpcArrayStreamWriterPrivate*)writer->private_data; + struct ArrowSchema schema; + ArrowSchemaInit(&schema); - int had_bytes_to_push = 0; + struct ArrowArray array = {.release = NULL}; - NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamWriterPush(private, &private->buffer, - &had_bytes_to_push, error)); - if (had_bytes_to_push) { - return NANOARROW_OK; - } - // buffer has no bytes to push, try body_buffer - NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamWriterPush(private, &private->body_buffer, - &had_bytes_to_push, error)); - if (had_bytes_to_push) { - return NANOARROW_OK; - } + struct ArrowArrayView array_view; + ArrowArrayViewInitFromType(&array_view, NANOARROW_TYPE_UNINITIALIZED); - // get the next Message - if (private->schema.release == NULL) { - // The schema message has not been buffered yet; do that now. - NANOARROW_RETURN_NOT_OK( - ArrowArrayStreamGetSchema(&private->in, &private->schema, error)); - NANOARROW_RETURN_NOT_OK( - ArrowIpcEncoderEncodeSchema(&private->encoder, &private->schema, error)); - NANOARROW_RETURN_NOT_OK( - ArrowArrayViewInitFromSchema(&private->array_view, &private->schema, error)); - } else { - // Get the next array from the stream - NANOARROW_RETURN_NOT_OK( // XXX does private->array maybe need to be released? - ArrowArrayStreamGetNext(&private->in, &private->array, error)); - if (private->array.release == NULL) { + while (!error_code) { +#define NANOARROW_BREAK_NOT_OK(expr) \ + error_code = (expr); \ + if (error_code) break; + + if (schema.format == NULL) { + NANOARROW_BREAK_NOT_OK(ArrowArrayStreamGetSchema(in, &schema, error)); + NANOARROW_BREAK_NOT_OK(ArrowArrayViewInitFromSchema(&array_view, &schema, error)); + NANOARROW_BREAK_NOT_OK(ArrowIpcWriterWriteSchema(writer, &schema, error)); + } + + if (array.release != NULL) { + ArrowArrayRelease(&array); + } + + NANOARROW_BREAK_NOT_OK(ArrowArrayStreamGetNext(in, &array, error)); + NANOARROW_BREAK_NOT_OK(ArrowArrayViewSetArray(&array_view, &array, error)); + + if (array.release == NULL) { // The stream is complete, signal the end to the caller - writer->finished = 1; - return NANOARROW_OK; + NANOARROW_BREAK_NOT_OK(ArrowIpcWriterWriteArrayView(writer, NULL, error)); + break; } - NANOARROW_RETURN_NOT_OK( - ArrowArrayViewSetArray(&private->array_view, &private->array, error)); + NANOARROW_BREAK_NOT_OK(ArrowIpcWriterWriteArrayView(writer, &array_view, error)); - NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch( - &private->encoder, &private->array_view, &private->body_buffer, error)); +#undef NANOARROW_BREAK_NOT_OK } - NANOARROW_RETURN_NOT_OK_WITH_ERROR( - ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1, - &private->buffer), - error); - NANOARROW_DCHECK(private->buffer.size_bytes % 8 == 0); - NANOARROW_DCHECK(private->body_buffer.size_bytes % 8 == 0); - // Since we just finalized it, buffer won't be empty - return ArrowIpcArrayStreamWriterPush(private, &private->buffer, &had_bytes_to_push, - error); + ArrowSchemaRelease(&schema); + ArrowArrayViewReset(&array_view); + if (array.release != NULL) { + ArrowArrayRelease(&array); + } + return error_code; } diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h index 4c83c976d..bc3d2c568 100644 --- a/src/nanoarrow/nanoarrow_ipc.h +++ b/src/nanoarrow/nanoarrow_ipc.h @@ -69,14 +69,18 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamInitBuffer) #define ArrowIpcOutputStreamInitFile \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamInitFile) +#define ArrowIpcOutputStreamWrite \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamWrite) #define ArrowIpcOutputStreamMove \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcOutputStreamMove) -#define ArrowIpcArrayStreamWriterInit \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamWriterInit) -#define ArrowIpcArrayStreamWriterReset \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamWriterReset) -#define ArrowIpcArrayStreamWriterWriteSome \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamWriterWriteSome) +#define ArrowIpcWriterInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterInit) +#define ArrowIpcWriterReset NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterReset) +#define ArrowIpcWriterWriteSchema \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterWriteSchema) +#define ArrowIpcWriterWriteArrayView \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterWriteArrayView) +#define ArrowIpcWriterWriteArrayStream \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcWriterWriteArrayStream) #endif @@ -498,48 +502,58 @@ ArrowErrorCode ArrowIpcOutputStreamInitBuffer(struct ArrowIpcOutputStream* strea ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream, void* file_ptr, int close_on_release); -/// \brief A stream writer which encodes ArrowArrays into an IPC byte stream +/// \brief Write to a stream, trying again until all are written or the stream errors. +ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream, + struct ArrowBufferView data, + struct ArrowError* error); + +/// \brief A stream writer which encodes Schemas and ArrowArrays into an IPC byte stream /// /// This structure is intended to be allocated by the caller, -/// initialized using ArrowIpcArrayStreamWriterInit(), and released with -/// ArrowIpcArrayStreamWriterReset(). -/// -/// Note: although ArrowArrayStream is a sufficient interface to wrap a stream reader, -/// a stream writer cannot be wrapped as an ArrowArrayStream. -struct ArrowIpcArrayStreamWriter { - /// \brief Set to non-zero after the writer is finished. - int finished; - +/// initialized using ArrowIpcWriterInit(), and released with +/// ArrowIpcWriterReset(). +struct ArrowIpcWriter { /// \brief Private resources managed by this library void* private_data; }; /// \brief Initialize an output stream of bytes from an ArrowArrayStream /// -/// The writer will not pull from the input array stream or push to the -/// output stream until ArrowIpcArrayStreamWriterContinue(). -/// /// Returns NANOARROW_OK on success. If NANOARROW_OK is returned the writer -/// takes ownership of both the input array stream and the output byte stream -/// and the caller is responsible for releasing the writer by calling -/// ArrowIpcArrayStreamWriterReset(). -ArrowErrorCode ArrowIpcArrayStreamWriterInit(struct ArrowIpcArrayStreamWriter* writer, - struct ArrowArrayStream* in, - struct ArrowIpcOutputStream* output_stream); +/// takes ownership of the output byte stream and the encoder, and the caller is +/// responsible for releasing the writer by calling ArrowIpcWriterReset(). +ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer, + struct ArrowIpcEncoder* encoder, + struct ArrowIpcOutputStream* output_stream); /// \brief Release all resources attached to a writer -void ArrowIpcArrayStreamWriterReset(struct ArrowIpcArrayStreamWriter* writer); +void ArrowIpcWriterReset(struct ArrowIpcWriter* writer); -/// \brief Write some bytes into the output byte stream +/// \brief Write a schema to the output byte stream /// -/// The stream will first be queried for its Schema, then for all the RecordBatches in the -/// stream, each of which will be encoded and pushed into the output byte stream. +/// Errors are propagated from the underlying encoder and output byte stream. +ArrowErrorCode ArrowIpcWriterWriteSchema(struct ArrowIpcWriter* writer, + const struct ArrowSchema* in, + struct ArrowError* error); + +/// \brief Write an array view to the output byte stream /// -/// Errors are propagated from the underlying streams. -ArrowErrorCode ArrowIpcArrayStreamWriterWriteSome( - struct ArrowIpcArrayStreamWriter* writer, struct ArrowError* error); -/// @} +/// The array view may be NULL, in which case an EOS will be written. +/// The writer does not check that a schema was already written. +/// +/// Errors are propagated from the underlying encoder and output byte stream, +ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer, + const struct ArrowArrayView* in, + struct ArrowError* error); +/// \brief Write an entire stream (including EOS) to the output byte stream +/// +/// Errors are propagated from the underlying encoder, array stream, and output byte +/// stream. +ArrowErrorCode ArrowIpcWriterWriteArrayStream(struct ArrowIpcWriter* writer, + struct ArrowArrayStream* in, + struct ArrowError* error); +/// @} #ifdef __cplusplus } #endif diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp index 792018935..759ee59f0 100644 --- a/src/nanoarrow/nanoarrow_ipc.hpp +++ b/src/nanoarrow/nanoarrow_ipc.hpp @@ -96,20 +96,20 @@ inline void release_pointer(struct ArrowIpcOutputStream* data) { } template <> -inline void init_pointer(struct ArrowIpcArrayStreamWriter* data) { +inline void init_pointer(struct ArrowIpcWriter* data) { data->private_data = nullptr; } template <> -inline void move_pointer(struct ArrowIpcArrayStreamWriter* src, - struct ArrowIpcArrayStreamWriter* dst) { - memcpy(dst, src, sizeof(struct ArrowIpcArrayStreamWriter)); +inline void move_pointer(struct ArrowIpcWriter* src, + struct ArrowIpcWriter* dst) { + memcpy(dst, src, sizeof(struct ArrowIpcWriter)); src->private_data = nullptr; } template <> -inline void release_pointer(struct ArrowIpcArrayStreamWriter* data) { - ArrowIpcArrayStreamWriterReset(data); +inline void release_pointer(struct ArrowIpcWriter* data) { + ArrowIpcWriterReset(data); } } // namespace internal @@ -138,8 +138,8 @@ using UniqueInputStream = internal::Unique; /// \brief Class wrapping a unique struct ArrowIpcOutputStream using UniqueOutputStream = internal::Unique; -/// \brief Class wrapping a unique struct ArrowIpcArrayStreamWriter -using UniqueArrayStreamWriter = internal::Unique; +/// \brief Class wrapping a unique struct ArrowIpcWriter +using UniqueWriter = internal::Unique; /// @} From 91faa51758687372a1e098471a50a88ebdb5d60b Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 7 Aug 2024 15:10:09 -0500 Subject: [PATCH 3/4] move encoder internal to writer, use Impl() instead of macro --- src/nanoarrow/common/inline_buffer.h | 4 +- src/nanoarrow/common/inline_types.h | 2 +- src/nanoarrow/ipc/encoder.c | 19 ++++-- src/nanoarrow/ipc/files_test.cc | 7 +-- src/nanoarrow/ipc/writer.c | 87 ++++++++++++---------------- src/nanoarrow/nanoarrow_ipc.h | 3 +- src/nanoarrow/nanoarrow_ipc.hpp | 3 +- 7 files changed, 56 insertions(+), 69 deletions(-) diff --git a/src/nanoarrow/common/inline_buffer.h b/src/nanoarrow/common/inline_buffer.h index caa6be4aa..875bfeaae 100644 --- a/src/nanoarrow/common/inline_buffer.h +++ b/src/nanoarrow/common/inline_buffer.h @@ -451,8 +451,8 @@ static inline void ArrowBitClear(uint8_t* bits, int64_t i) { } static inline void ArrowBitSetTo(uint8_t* bits, int64_t i, uint8_t bit_is_set) { - bits[i / 8] ^= - ((uint8_t)(-((uint8_t)(bit_is_set != 0)) ^ bits[i / 8])) & _ArrowkBitmask[i % 8]; + bits[i / 8] ^= (uint8_t)(((uint8_t)(-((uint8_t)(bit_is_set != 0)) ^ bits[i / 8])) & + _ArrowkBitmask[i % 8]); } static inline void ArrowBitsSetTo(uint8_t* bits, int64_t start_offset, int64_t length, diff --git a/src/nanoarrow/common/inline_types.h b/src/nanoarrow/common/inline_types.h index c74b1a439..015cc5416 100644 --- a/src/nanoarrow/common/inline_types.h +++ b/src/nanoarrow/common/inline_types.h @@ -314,7 +314,7 @@ static inline void ArrowErrorSetString(struct ArrowError* error, const char* src #define NANOARROW_DCHECK(EXPR) _NANOARROW_DCHECK_IMPL(EXPR, #EXPR) #else #define NANOARROW_ASSERT_OK(EXPR) (void)(EXPR) -#define NANOARROW_DCHECK(EXPR) (void)(EXPR) +#define NANOARROW_DCHECK(EXPR) #endif static inline void ArrowSchemaMove(struct ArrowSchema* src, struct ArrowSchema* dst) { diff --git a/src/nanoarrow/ipc/encoder.c b/src/nanoarrow/ipc/encoder.c index 8497616b0..53e3610ea 100644 --- a/src/nanoarrow/ipc/encoder.c +++ b/src/nanoarrow/ipc/encoder.c @@ -230,24 +230,31 @@ static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder, case NANOARROW_TYPE_TIMESTAMP: FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder), error); - FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder, schema_view->time_unit), error); + FLATCC_RETURN_UNLESS_0( + Timestamp_unit_add(builder, (ns(TimeUnit_enum_t))schema_view->time_unit), + error); FLATCC_RETURN_UNLESS_0( Timestamp_timezone_create_str(builder, schema_view->timezone), error); FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_TIME32: - FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 32), - error); + FLATCC_RETURN_UNLESS_0( + Field_type_Time_create(builder, (ns(TimeUnit_enum_t))schema_view->time_unit, + 32), + error); return NANOARROW_OK; case NANOARROW_TYPE_TIME64: - FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 64), - error); + FLATCC_RETURN_UNLESS_0( + Field_type_Time_create(builder, (ns(TimeUnit_enum_t))schema_view->time_unit, + 64), + error); return NANOARROW_OK; case NANOARROW_TYPE_DURATION: - FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder, schema_view->time_unit), + FLATCC_RETURN_UNLESS_0(Field_type_Duration_create( + builder, (ns(TimeUnit_enum_t))schema_view->time_unit), error); return NANOARROW_OK; diff --git a/src/nanoarrow/ipc/files_test.cc b/src/nanoarrow/ipc/files_test.cc index 9f6a8722c..9f8f65ac5 100644 --- a/src/nanoarrow/ipc/files_test.cc +++ b/src/nanoarrow/ipc/files_test.cc @@ -201,12 +201,8 @@ class TestFile { nanoarrow::ipc::UniqueOutputStream output_stream; NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamInitBuffer(output_stream.get(), buffer)); - nanoarrow::ipc::UniqueEncoder encoder; - NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderInit(encoder.get())); - nanoarrow::ipc::UniqueWriter writer; - NANOARROW_RETURN_NOT_OK( - ArrowIpcWriterInit(writer.get(), encoder.get(), output_stream.get())); + NANOARROW_RETURN_NOT_OK(ArrowIpcWriterInit(writer.get(), output_stream.get())); nanoarrow::UniqueArrayView array_view; NANOARROW_RETURN_NOT_OK( @@ -216,6 +212,7 @@ class TestFile { for (const auto& array : arrays) { NANOARROW_RETURN_NOT_OK( ArrowArrayViewSetArray(array_view.get(), array.get(), error)); + NANOARROW_RETURN_NOT_OK( ArrowIpcWriterWriteArrayView(writer.get(), array_view.get(), error)); } diff --git a/src/nanoarrow/ipc/writer.c b/src/nanoarrow/ipc/writer.c index a64c61f7a..776a22cd8 100644 --- a/src/nanoarrow/ipc/writer.c +++ b/src/nanoarrow/ipc/writer.c @@ -157,12 +157,10 @@ ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream, } struct ArrowIpcWriterPrivate { - struct ArrowIpcOutputStream output_stream; struct ArrowIpcEncoder encoder; + struct ArrowIpcOutputStream output_stream; struct ArrowBuffer buffer; struct ArrowBuffer body_buffer; - int64_t buffer_cursor; - int64_t body_buffer_cursor; }; ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream, @@ -179,9 +177,8 @@ ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream, } ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer, - struct ArrowIpcEncoder* encoder, struct ArrowIpcOutputStream* output_stream) { - NANOARROW_DCHECK(writer != NULL && encoder != NULL && output_stream != NULL); + NANOARROW_DCHECK(writer != NULL && output_stream != NULL); struct ArrowIpcWriterPrivate* private = (struct ArrowIpcWriterPrivate*)ArrowMalloc(sizeof(struct ArrowIpcWriterPrivate)); @@ -189,16 +186,11 @@ ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer, if (private == NULL) { return ENOMEM; } - + NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderInit(&private->encoder)); ArrowIpcOutputStreamMove(output_stream, &private->output_stream); - memcpy(&private->encoder, encoder, sizeof(struct ArrowIpcEncoder)); - encoder->private_data = NULL; - ArrowBufferInit(&private->buffer); ArrowBufferInit(&private->body_buffer); - private->buffer_cursor = 0; - private->body_buffer_cursor = 0; writer->private_data = private; return NANOARROW_OK; @@ -269,11 +261,8 @@ ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer, if (in == NULL) { int32_t eos[] = {-1, 0}; - struct ArrowBufferView buffer_view = { - .data.as_int32 = eos, - .size_bytes = sizeof(eos), - }; - return ArrowIpcOutputStreamWrite(&private->output_stream, buffer_view, error); + struct ArrowBufferView data = {.data.as_int32 = eos, .size_bytes = sizeof(eos)}; + return ArrowIpcOutputStreamWrite(&private->output_stream, data, error); } NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0)); @@ -289,54 +278,50 @@ ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer, return NANOARROW_OK; } +static ArrowErrorCode ArrowIpcWriterWriteArrayStreamImpl( + struct ArrowIpcWriter* writer, struct ArrowArrayStream* in, + struct ArrowSchema* schema, struct ArrowArray* array, + struct ArrowArrayView* array_view, struct ArrowError* error) { + NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(in, schema, error)); + NANOARROW_RETURN_NOT_OK(ArrowIpcWriterWriteSchema(writer, schema, error)); + + NANOARROW_RETURN_NOT_OK(ArrowArrayViewInitFromSchema(array_view, schema, error)); + while (1) { + NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(in, array, error)); + if (array->release == NULL) { + break; + } + + NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view, array, error)); + NANOARROW_RETURN_NOT_OK(ArrowIpcWriterWriteArrayView(writer, array_view, error)); + ArrowArrayRelease(array); + } + + // The stream is complete, signal the end to the caller + return ArrowIpcWriterWriteArrayView(writer, NULL, error); +} + ArrowErrorCode ArrowIpcWriterWriteArrayStream(struct ArrowIpcWriter* writer, struct ArrowArrayStream* in, struct ArrowError* error) { NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL); - ArrowErrorCode error_code = NANOARROW_OK; - - struct ArrowSchema schema; - ArrowSchemaInit(&schema); - + struct ArrowSchema schema = {.release = NULL}; struct ArrowArray array = {.release = NULL}; - struct ArrowArrayView array_view; ArrowArrayViewInitFromType(&array_view, NANOARROW_TYPE_UNINITIALIZED); - while (!error_code) { -#define NANOARROW_BREAK_NOT_OK(expr) \ - error_code = (expr); \ - if (error_code) break; + NANOARROW_RETURN_NOT_OK(ArrowIpcWriterWriteArrayStreamImpl(writer, in, &schema, &array, + &array_view, error)); - if (schema.format == NULL) { - NANOARROW_BREAK_NOT_OK(ArrowArrayStreamGetSchema(in, &schema, error)); - NANOARROW_BREAK_NOT_OK(ArrowArrayViewInitFromSchema(&array_view, &schema, error)); - NANOARROW_BREAK_NOT_OK(ArrowIpcWriterWriteSchema(writer, &schema, error)); - } - - if (array.release != NULL) { - ArrowArrayRelease(&array); - } - - NANOARROW_BREAK_NOT_OK(ArrowArrayStreamGetNext(in, &array, error)); - NANOARROW_BREAK_NOT_OK(ArrowArrayViewSetArray(&array_view, &array, error)); - - if (array.release == NULL) { - // The stream is complete, signal the end to the caller - NANOARROW_BREAK_NOT_OK(ArrowIpcWriterWriteArrayView(writer, NULL, error)); - break; - } - - NANOARROW_BREAK_NOT_OK(ArrowIpcWriterWriteArrayView(writer, &array_view, error)); - -#undef NANOARROW_BREAK_NOT_OK + if (schema.release != NULL) { + ArrowSchemaRelease(&schema); } - ArrowSchemaRelease(&schema); - ArrowArrayViewReset(&array_view); if (array.release != NULL) { ArrowArrayRelease(&array); } - return error_code; + + ArrowArrayViewReset(&array_view); + return NANOARROW_OK; } diff --git a/src/nanoarrow/nanoarrow_ipc.h b/src/nanoarrow/nanoarrow_ipc.h index bc3d2c568..1e58c8dca 100644 --- a/src/nanoarrow/nanoarrow_ipc.h +++ b/src/nanoarrow/nanoarrow_ipc.h @@ -520,10 +520,9 @@ struct ArrowIpcWriter { /// \brief Initialize an output stream of bytes from an ArrowArrayStream /// /// Returns NANOARROW_OK on success. If NANOARROW_OK is returned the writer -/// takes ownership of the output byte stream and the encoder, and the caller is +/// takes ownership of the output byte stream, and the caller is /// responsible for releasing the writer by calling ArrowIpcWriterReset(). ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer, - struct ArrowIpcEncoder* encoder, struct ArrowIpcOutputStream* output_stream); /// \brief Release all resources attached to a writer diff --git a/src/nanoarrow/nanoarrow_ipc.hpp b/src/nanoarrow/nanoarrow_ipc.hpp index 759ee59f0..6cf7bc228 100644 --- a/src/nanoarrow/nanoarrow_ipc.hpp +++ b/src/nanoarrow/nanoarrow_ipc.hpp @@ -101,8 +101,7 @@ inline void init_pointer(struct ArrowIpcWriter* data) { } template <> -inline void move_pointer(struct ArrowIpcWriter* src, - struct ArrowIpcWriter* dst) { +inline void move_pointer(struct ArrowIpcWriter* src, struct ArrowIpcWriter* dst) { memcpy(dst, src, sizeof(struct ArrowIpcWriter)); src->private_data = nullptr; } From 15caa194cc45dba1c114dede311c9b674e3b7db4 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 8 Aug 2024 10:52:01 -0500 Subject: [PATCH 4/4] suppress valgrind in flatcc_builder_create_cached_vtable --- valgrind.supp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/valgrind.supp b/valgrind.supp index c17530414..ddd0c064d 100644 --- a/valgrind.supp +++ b/valgrind.supp @@ -66,3 +66,10 @@ fun:base64_encode fun:R_base64_encode } + +# TODO https://github.com/apache/arrow-nanoarrow/issues/579 remove this +{ + :flatcc uses realloc() and valgrind thinks something was free'd + Memcheck:Addr4 + fun:flatcc_builder_create_cached_vtable +}