Skip to content

Commit

Permalink
fix: IPC streams did not include RecordBatch headers (#582)
Browse files Browse the repository at this point in the history
- As noted in
#571 (comment),
RecordBatch messages were not being written during IPC streaming. This
has been corrected.
- Testing adds one more round-trip case, in which Arrow C++ reads
from a stream written by nanoarrow.
- During testing it also became apparent that encapsulated messages were
not always stored with the correct size: metadata_size should include the
padding (but not the continuation marker or the size field itself). See
https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
  • Loading branch information
bkietz authored Aug 12, 2024
1 parent cfae94b commit 16f4306
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 43 deletions.
40 changes: 22 additions & 18 deletions src/nanoarrow/ipc/encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,32 +78,38 @@ void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) {
memset(encoder, 0, sizeof(struct ArrowIpcEncoder));
}

// Appends the 8-byte prefix of an encapsulated message to `out`: an int32
// continuation marker (-1) followed by `size` stored as an int32.
//
// `size` is first range-checked against INT32_MAX with
// _NANOARROW_CHECK_UPPER_LIMIT (NOTE(review): the macro is defined elsewhere;
// presumably it returns an error code when the limit is exceeded). On
// big-endian hosts the size is byte-swapped before it is appended.
//
// Returns the error from appending the marker if that fails; otherwise the
// result of appending the size field.
static ArrowErrorCode ArrowIpcEncoderWriteContinuationAndSize(struct ArrowBuffer* out,
                                                              size_t size) {
  _NANOARROW_CHECK_UPPER_LIMIT(size, INT32_MAX);

  // Continuation marker comes first.
  NANOARROW_RETURN_NOT_OK(ArrowBufferAppendInt32(out, -1));

  // Pick the representation of the size field, then append it once.
  int32_t encoded_size = (int32_t)size;
  if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) {
    encoded_size = (int32_t)bswap32((uint32_t)size);
  }
  return ArrowBufferAppendInt32(out, encoded_size);
}

ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
char encapsulate, struct ArrowBuffer* out) {
NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && out != NULL);
struct ArrowIpcEncoderPrivate* private =
(struct ArrowIpcEncoderPrivate*)encoder->private_data;

size_t size = flatcc_builder_get_buffer_size(&private->builder);
_NANOARROW_CHECK_UPPER_LIMIT(size, INT32_MAX);

int32_t header[] = {-1, (int32_t)size};
if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) {
header[1] = (int32_t)bswap32((uint32_t)size);
if (encapsulate) {
int64_t padded_size = _ArrowRoundUpToMultipleOf8(size);
NANOARROW_RETURN_NOT_OK(
ArrowBufferReserve(out, sizeof(int32_t) + sizeof(int32_t) + padded_size));
NANOARROW_ASSERT_OK(ArrowIpcEncoderWriteContinuationAndSize(out, padded_size));
} else {
NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, size));
}

if (size == 0) {
// Finalizing an empty flatcc_builder_t triggers an assertion
return encapsulate ? ArrowBufferAppend(out, &header, sizeof(header)) : NANOARROW_OK;
}

if (encapsulate) {
int64_t encapsulated_size =
_ArrowRoundUpToMultipleOf8(sizeof(int32_t) + sizeof(int32_t) + size);
NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, encapsulated_size));
NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(out, &header, sizeof(header)));
} else {
NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, size));
return NANOARROW_OK;
}

void* data =
Expand All @@ -112,11 +118,9 @@ ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder,
NANOARROW_UNUSED(data);
out->size_bytes += size;

if (encapsulate) {
while (encapsulate && out->size_bytes % 8 != 0) {
// zero padding bytes, if any
int64_t padded_size = _ArrowRoundUpToMultipleOf8(out->size_bytes);
memset(out->data + out->size_bytes, 0, padded_size - out->size_bytes);
out->size_bytes = padded_size;
out->data[out->size_bytes++] = 0;
}

// don't deallocate yet, just wipe the builder's current Message
Expand Down
81 changes: 56 additions & 25 deletions src/nanoarrow/ipc/files_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ class TestFile {
return path_.substr(0, dot_pos) + std::string(".json.gz");
}

// Wraps the in-memory IPC bytes held by `content` in an input stream and
// initializes `out` as an array stream reader over that input.
//
// Returns NANOARROW_OK on success. If either init call fails, its code is
// propagated through NANOARROW_RETURN_NOT_OK_WITH_ERROR together with `error`.
// NOTE(review): presumably each init call takes ownership of its source
// argument (`content`, then the input stream) -- confirm against the
// nanoarrow IPC API.
ArrowErrorCode GetArrowArrayStreamIPC(struct ArrowBuffer* content,
                                      ArrowArrayStream* out, ArrowError* error) {
  nanoarrow::ipc::UniqueInputStream input;
  auto* raw_input = input.get();

  NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowIpcInputStreamInitBuffer(raw_input, content),
                                     error);
  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
      ArrowIpcArrayStreamReaderInit(out, raw_input, nullptr), error);
  return NANOARROW_OK;
}

ArrowErrorCode GetArrowArrayStreamIPC(const std::string& dir_prefix,
ArrowArrayStream* out, ArrowError* error) {
std::stringstream path_builder;
Expand All @@ -96,28 +106,19 @@ class TestFile {
// Read using nanoarrow_ipc
nanoarrow::UniqueBuffer content;
NANOARROW_RETURN_NOT_OK(ReadFileBuffer(path_builder.str(), content.get(), error));

struct ArrowIpcInputStream input;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcInputStreamInitBuffer(&input, content.get()), error);
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcArrayStreamReaderInit(out, &input, nullptr), error);
return NANOARROW_OK;
return GetArrowArrayStreamIPC(content.get(), out, error);
}

ArrowErrorCode ReadArrowArrayStreamIPC(const std::string& dir_prefix,
ArrowErrorCode ReadArrowArrayStreamIPC(struct ArrowArrayStream* stream,
struct ArrowSchema* schema,
std::vector<nanoarrow::UniqueArray>* arrays,
ArrowError* error) {
nanoarrow::UniqueArrayStream stream;
NANOARROW_RETURN_NOT_OK(GetArrowArrayStreamIPC(dir_prefix, stream.get(), error));

NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(stream.get(), schema, error));
NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(stream, schema, error));

while (true) {
nanoarrow::UniqueArray array;

NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(stream.get(), array.get(), error));
NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(stream, array.get(), error));

if (array->release == nullptr) {
break;
Expand All @@ -128,6 +129,15 @@ class TestFile {
return NANOARROW_OK;
}

// Convenience overload: opens the IPC stream for `dir_prefix` via
// GetArrowArrayStreamIPC() and hands it to the stream-based
// ReadArrowArrayStreamIPC() overload, which fills `schema` and `arrays`.
//
// The stream is held by a local UniqueArrayStream for the duration of the
// call. Returns the first non-OK code from either step, otherwise the result
// of the stream-based overload.
ArrowErrorCode ReadArrowArrayStreamIPC(const std::string& dir_prefix,
                                       struct ArrowSchema* schema,
                                       std::vector<nanoarrow::UniqueArray>* arrays,
                                       ArrowError* error) {
  nanoarrow::UniqueArrayStream owned_stream;
  auto* raw_stream = owned_stream.get();

  NANOARROW_RETURN_NOT_OK(GetArrowArrayStreamIPC(dir_prefix, raw_stream, error));
  return ReadArrowArrayStreamIPC(raw_stream, schema, arrays, error);
}

ArrowErrorCode GetArrowArrayStreamCheckJSON(const std::string& dir_prefix,
ArrowArrayStream* out, ArrowError* error) {
std::stringstream path_builder;
Expand Down Expand Up @@ -250,20 +260,36 @@ class TestFile {

// Read the same file with Arrow C++
auto maybe_table_arrow = ReadTable(io::ReadableFile::Open(path_builder.str()));
FAIL_RESULT_NOT_OK(maybe_table_arrow);
{
SCOPED_TRACE("Read the same file with Arrow C++");
FAIL_RESULT_NOT_OK(maybe_table_arrow);
AssertEqualsTable(std::move(schema), std::move(arrays),
maybe_table_arrow.ValueUnsafe());
}

AssertEqualsTable(std::move(schema), std::move(arrays),
maybe_table_arrow.ValueUnsafe());
auto maybe_table_roundtripped = ReadTable(BufferInputStream(roundtripped.get()));
{
SCOPED_TRACE("Read the roundtripped buffer using Arrow C++");
FAIL_RESULT_NOT_OK(maybe_table_roundtripped);

AssertEqualsTable(maybe_table_roundtripped.ValueUnsafe(),
maybe_table_arrow.ValueUnsafe());
}

// Read the roundtripped buffer using nanoarrow
nanoarrow::UniqueSchema roundtripped_schema;
std::vector<nanoarrow::UniqueArray> roundtripped_arrays;
ASSERT_EQ(ReadArrowArrayStreamIPC(dir_prefix, roundtripped_schema.get(),
&roundtripped_arrays, &error),
NANOARROW_OK);

AssertEqualsTable(std::move(roundtripped_schema), std::move(roundtripped_arrays),
maybe_table_arrow.ValueUnsafe());
{
SCOPED_TRACE("Read the roundtripped buffer using nanoarrow");
nanoarrow::UniqueArrayStream array_stream;
ASSERT_EQ(GetArrowArrayStreamIPC(roundtripped.get(), array_stream.get(), &error),
NANOARROW_OK);
ASSERT_EQ(ReadArrowArrayStreamIPC(array_stream.get(), roundtripped_schema.get(),
&roundtripped_arrays, &error),
NANOARROW_OK);

AssertEqualsTable(std::move(roundtripped_schema), std::move(roundtripped_arrays),
maybe_table_arrow.ValueUnsafe());
}
}

Result<std::shared_ptr<Table>> ReadTable(
Expand All @@ -286,13 +312,18 @@ class TestFile {
return Table::FromRecordBatches(std::move(arrow_schema), std::move(batches));
}

void AssertEqualsTable(const std::shared_ptr<Table>& actual,
const std::shared_ptr<Table>& expected) {
ASSERT_TRUE(actual->schema()->Equals(expected->schema(), true));
EXPECT_TRUE(actual->Equals(*expected, true));
}

void AssertEqualsTable(nanoarrow::UniqueSchema schema,
std::vector<nanoarrow::UniqueArray> arrays,
const std::shared_ptr<Table>& expected) {
auto maybe_table = ToTable(std::move(schema), std::move(arrays));
FAIL_RESULT_NOT_OK(maybe_table);
ASSERT_TRUE(expected->schema()->Equals(maybe_table.ValueUnsafe()->schema(), true));
EXPECT_TRUE(maybe_table.ValueUnsafe()->Equals(*expected, true));
AssertEqualsTable(maybe_table.ValueUnsafe(), expected);
}

void TestIPCCheckJSON(const std::string& dir_prefix) {
Expand Down
4 changes: 4 additions & 0 deletions src/nanoarrow/ipc/writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer,

NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch(
&private->encoder, in, &private->body_buffer, error));
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1,
&private->buffer),
error);

NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite(
&private->output_stream, ArrowBufferToBufferView(&private->buffer), error));
Expand Down

0 comments on commit 16f4306

Please sign in to comment.