feat: add Footer decoding #598

Merged
merged 3 commits on Aug 30, 2024
Changes from all commits
66 changes: 48 additions & 18 deletions src/nanoarrow/integration/ipc_integration.cc
@@ -95,6 +95,9 @@ int main(int argc, char** argv) try {
}

struct File {
File(FILE* file) : file_{file} {}
File() = default;

~File() {
if (file_ != nullptr) {
fclose(file_);
@@ -166,35 +169,62 @@ struct MaterializedArrayStream {
// Footer).
File ipc_file;
NANOARROW_RETURN_NOT_OK(ipc_file.open(path, "rb", error));
return FromIpcFile(ipc_file, error);
}
auto bytes = ipc_file.read();

ArrowErrorCode FromIpcFile(FILE* ipc_file, struct ArrowError* error) {
char prefix[sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC)] = {};
if (fread(&prefix, 1, sizeof(prefix), ipc_file) < sizeof(prefix)) {
ArrowErrorSet(error, "Expected file of more than %lu bytes, got %ld",
sizeof(prefix), ftell(ipc_file));
auto min_size = sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC) + sizeof(int32_t) +
strlen(NANOARROW_IPC_FILE_PADDED_MAGIC);
if (bytes.size() < min_size) {
ArrowErrorSet(error, "Expected file of more than %lu bytes, got %ld", min_size,
bytes.size());
return EINVAL;
}

if (memcmp(&prefix, NANOARROW_IPC_FILE_PADDED_MAGIC, sizeof(prefix)) != 0) {
if (memcmp(bytes.data(), NANOARROW_IPC_FILE_PADDED_MAGIC,
sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC)) != 0) {
ArrowErrorSet(error, "File did not begin with 'ARROW1\\0\\0'");
return EINVAL;
}

nanoarrow::ipc::UniqueInputStream input_stream;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcInputStreamInitFile(input_stream.get(), ipc_file,
/*close_on_release=*/false),
error);
nanoarrow::ipc::UniqueDecoder decoder;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowIpcDecoderInit(decoder.get()), error);
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyFooter(
decoder.get(), {{bytes.data()}, static_cast<int64_t>(bytes.size())}, error));
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeFooter(
decoder.get(), {{bytes.data()}, static_cast<int64_t>(bytes.size())}, error));
Comment on lines +190 to +193

Member

Should verify and decode be the same function rather than separate functions?

Member Author

I'm following the pattern of schema decoding, where the two are separated. Verification checks offsets to ensure all flatbuffers are in bounds, while decoding actually copies the flatbuffers into nanoarrow data structures. I'd be fine consolidating them, but that should be a follow-up.

@paleolimbot

Member

Definitely possible to consolidate them... I think I separated them initially in case somebody owned their whole pipeline and wanted to skip verification as an optimization (which is probably not actually any faster, and probably nobody will want to do). For some flatbuffer uses this matters (http://flatgeobuf.org/#why-am-i-not-getting-expected-performance-in-gdal).
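
To make the split concrete, here is a minimal sketch of a caller following the same two-step pattern the integration code above uses (not part of this diff; the helper name and the assumption that the whole file is already in memory are illustrative):

// Sketch: decode the Footer of an Arrow IPC file that is fully loaded in `data`.
// `decoder` must already have been initialized with ArrowIpcDecoderInit().
static ArrowErrorCode DecodeFooterFromFileBytes(struct ArrowIpcDecoder* decoder,
                                                const uint8_t* data, int64_t size_bytes,
                                                struct ArrowError* error) {
  struct ArrowBufferView view;
  view.data.as_uint8 = data;
  view.size_bytes = size_bytes;

  // Verification only walks the Footer flatbuffer and checks that every offset
  // stays within `view`.
  NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyFooter(decoder, view, error));

  // Decoding copies the schema and record batch blocks into nanoarrow structures;
  // on success decoder->footer points at the decoded footer.
  return ArrowIpcDecoderDecodeFooter(decoder, view, error);
}

A caller that fully trusts its producer could in principle substitute ArrowIpcDecoderPeekFooter() for the verification call (it only reads the trailing footer size and magic), which is the kind of optimization the follow-up comment alludes to, at the cost of all flatbuffer bounds checking.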


nanoarrow::UniqueArrayStream array_stream;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcArrayStreamReaderInit(array_stream.get(), input_stream.get(),
/*options=*/nullptr),
error);
ArrowSchemaDeepCopy(&decoder->footer->schema, schema.get()), error);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderSetSchema(decoder.get(), &decoder->footer->schema, error));
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcDecoderSetEndianness(decoder.get(), decoder->endianness), error);

Member

Isn't endianness a flag in the schema?

Member Author

Endianness is a flag in the IPC Schema message, but not in ArrowSchema (which is what decoder->footer->schema is), so we need to set it separately.
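
For reference, the Arrow C data interface struct that decoder->footer->schema is an instance of carries no byte-order information at all, which is why the endianness read from the IPC metadata has to be applied to the decoder in a separate call:

// ArrowSchema as defined by the Arrow C data interface: format string, name,
// metadata, flags, children, dictionary, and a release callback -- no endianness.
struct ArrowSchema {
  const char* format;
  const char* name;
  const char* metadata;
  int64_t flags;  // ARROW_FLAG_NULLABLE, ARROW_FLAG_DICTIONARY_ORDERED, ...
  int64_t n_children;
  struct ArrowSchema** children;
  struct ArrowSchema* dictionary;
  void (*release)(struct ArrowSchema*);
  void* private_data;
};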


nanoarrow::UniqueBuffer record_batch_blocks;
ArrowBufferMove(&decoder->footer->record_batch_blocks, record_batch_blocks.get());

for (int i = 0;
i < record_batch_blocks->size_bytes / sizeof(struct ArrowIpcFileBlock); i++) {
const auto& block =
reinterpret_cast<struct ArrowIpcFileBlock*>(record_batch_blocks->data)[i];
struct ArrowBufferView metadata_view = {
{bytes.data() + block.offset},
block.metadata_length,
};
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderDecodeHeader(decoder.get(), metadata_view, error));

return From(array_stream.get(), error);
struct ArrowBufferView body_view = {
{metadata_view.data.as_uint8 + metadata_view.size_bytes},
block.body_length,
};
nanoarrow::UniqueArray batch;
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderDecodeArray(decoder.get(), body_view, -1, batch.get(),
NANOARROW_VALIDATION_LEVEL_FULL, error));
batches.push_back(std::move(batch));
}

return NANOARROW_OK;
}

ArrowErrorCode Write(struct ArrowIpcOutputStream* output_stream, bool write_file,
167 changes: 144 additions & 23 deletions src/nanoarrow/ipc/decoder.c
@@ -56,6 +56,8 @@
// at the beginning of every message header.
static const int32_t kMessageHeaderPrefixSize = 8;

#define NANOARROW_IPC_MAGIC "ARROW1"

// Internal representation of a parsed "Field" from flatbuffers. This
// represents a field in a depth-first walk of column arrays and their
// children.
@@ -95,6 +97,8 @@ struct ArrowIpcDecoderPrivate {
int64_t n_buffers;
// A pointer to the last flatbuffers message.
const void* last_message;
// Storage for a Footer
struct ArrowIpcFooter footer;
};

ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) {
@@ -236,6 +240,7 @@ ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* decoder) {

memset(private_data, 0, sizeof(struct ArrowIpcDecoderPrivate));
private_data->system_endianness = ArrowIpcSystemEndianness();
ArrowIpcFooterInit(&private_data->footer);
decoder->private_data = private_data;
return NANOARROW_OK;
}
@@ -256,6 +261,8 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) {
private_data->n_fields = 0;
}

ArrowIpcFooterReset(&private_data->footer);

ArrowFree(private_data);
memset(decoder, 0, sizeof(struct ArrowIpcDecoder));
}
@@ -959,6 +966,8 @@ static inline void ArrowIpcDecoderResetHeaderInfo(struct ArrowIpcDecoder* decoder
decoder->codec = 0;
decoder->header_size_bytes = 0;
decoder->body_size_bytes = 0;
decoder->footer = NULL;
ArrowIpcFooterReset(&private_data->footer);
private_data->last_message = NULL;
}

@@ -1053,6 +1062,85 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder,
return NANOARROW_OK;
}

ArrowErrorCode ArrowIpcDecoderPeekFooter(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;

ArrowIpcDecoderResetHeaderInfo(decoder);
if (data.size_bytes < (int)strlen(NANOARROW_IPC_MAGIC) + (int)sizeof(int32_t)) {
ArrowErrorSet(error,
"Expected data of at least 10 bytes but only %" PRId64
" bytes are available",
data.size_bytes);
return ESPIPE;
}

const char* data_end = data.data.as_char + data.size_bytes;
const char* magic = data_end - strlen(NANOARROW_IPC_MAGIC);
const char* footer_size_data = magic - sizeof(int32_t);

if (memcmp(magic, NANOARROW_IPC_MAGIC, strlen(NANOARROW_IPC_MAGIC)) != 0) {
ArrowErrorSet(error, "Expected file to end with ARROW1 but got %s", data_end);
return EINVAL;
}

int32_t footer_size;
memcpy(&footer_size, footer_size_data, sizeof(footer_size));
if (private_data->system_endianness == NANOARROW_IPC_ENDIANNESS_BIG) {
footer_size = bswap32(footer_size);
}

if (footer_size < 0) {
ArrowErrorSet(error, "Expected footer size > 0 but found footer size of %d bytes",
footer_size);
return EINVAL;
}

decoder->header_size_bytes = footer_size;
return NANOARROW_OK;
}
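
For orientation, the trailing bytes this function inspects are laid out as sketched below (inferred from the checks above and the Arrow IPC file format):

// Tail of an Arrow IPC file, as read by ArrowIpcDecoderPeekFooter():
//
//   ...messages... | Footer flatbuffer | int32 footer size (little-endian) | "ARROW1"
//
// The 6-byte magic is checked, the footer size is byte-swapped on big-endian
// systems, and the result is stored in decoder->header_size_bytes so that
// VerifyFooter()/DecodeFooter() can locate the start of the Footer flatbuffer
// relative to the end of the buffer.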

ArrowErrorCode ArrowIpcDecoderVerifyFooter(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
struct ArrowError* error) {
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekFooter(decoder, data, error));

// Check that data contains at least the entire footer (return ESPIPE to signal
// that reading more data may help).
int32_t footer_and_size_and_magic_size =
decoder->header_size_bytes + sizeof(int32_t) + strlen(NANOARROW_IPC_MAGIC);
if (data.size_bytes < footer_and_size_and_magic_size) {
ArrowErrorSet(error,
"Expected >= %d bytes of data but only %" PRId64
" bytes are in the buffer",
footer_and_size_and_magic_size, data.size_bytes);
return ESPIPE;
}

const uint8_t* footer_data =
data.data.as_uint8 + data.size_bytes - footer_and_size_and_magic_size;

// Run flatbuffers verification
if (ns(Footer_verify_as_root(footer_data, decoder->header_size_bytes) !=

Member

What's ns?

Member Author

flatcc generates symbols like org_apache_arrow_flatbuf_Footer_verify_as_root. ns() is a macro which prepends that namespace

flatcc_verify_ok)) {
ArrowErrorSet(error, "Footer flatbuffer verification failed");
return EINVAL;
}

// Read some basic information from the message
ns(Footer_table_t) footer = ns(Footer_as_root(footer_data));
if (ns(Footer_schema(footer)) == NULL) {
ArrowErrorSet(error, "Footer has no schema");
return EINVAL;
}

decoder->metadata_version = ns(Footer_version(footer));
decoder->body_size_bytes = 0;
return NANOARROW_OK;
}
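
On the ns() question above: flatcc-generated readers prefix every symbol with the flatbuffers schema namespace, and ns() is the conventional wrapper macro. A sketch of how it is typically defined (the exact definition in decoder.c is not shown in this diff):

// flatcc's generated headers provide FLATBUFFERS_WRAP_NAMESPACE for this; with
// the Arrow namespace it looks roughly like:
#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)

// so that, for example,
//   ns(Footer_verify_as_root(footer_data, decoder->header_size_bytes))
// expands to
//   org_apache_arrow_flatbuf_Footer_verify_as_root(footer_data,
//                                                  decoder->header_size_bytes)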

ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
struct ArrowError* error) {
@@ -1126,6 +1214,29 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
return NANOARROW_OK;
}

static ArrowErrorCode ArrowIpcDecoderDecodeSchemaImpl(ns(Schema_table_t) schema,
struct ArrowSchema* out,
struct ArrowError* error) {
ArrowSchemaInit(out);
// Top-level batch schema is typically non-nullable
out->flags = 0;

ns(Field_vec_t) fields = ns(Schema_fields(schema));
int64_t n_fields = ns(Schema_vec_len(fields));

ArrowErrorCode result = ArrowSchemaSetTypeStruct(out, n_fields);
if (result != NANOARROW_OK) {
ArrowErrorSet(error, "Failed to allocate struct schema with %" PRId64 " children",
n_fields);
return result;
}

NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetChildren(out, fields, error));
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderSetMetadata(out, ns(Schema_custom_metadata(schema)), error));
return NANOARROW_OK;
}

ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder,
struct ArrowSchema* out,
struct ArrowError* error) {
@@ -1138,37 +1249,47 @@ ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder,
return EINVAL;
}

ns(Schema_table_t) schema = (ns(Schema_table_t))private_data->last_message;

ns(Field_vec_t) fields = ns(Schema_fields(schema));
int64_t n_fields = ns(Schema_vec_len(fields));

struct ArrowSchema tmp;
ArrowSchemaInit(&tmp);
int result = ArrowSchemaSetTypeStruct(&tmp, n_fields);
if (result != NANOARROW_OK) {
ArrowSchemaRelease(&tmp);
ArrowErrorSet(error, "Failed to allocate struct schema with %" PRId64 " children",
n_fields);
return result;
}

// Top-level batch schema is typically non-nullable
tmp.flags = 0;
ArrowErrorCode result = ArrowIpcDecoderDecodeSchemaImpl(
(ns(Schema_table_t))private_data->last_message, &tmp, error);

result = ArrowIpcDecoderSetChildren(&tmp, fields, error);
if (result != NANOARROW_OK) {
ArrowSchemaRelease(&tmp);
return result;
}
ArrowSchemaMove(&tmp, out);
return NANOARROW_OK;
}

result = ArrowIpcDecoderSetMetadata(&tmp, ns(Schema_custom_metadata(schema)), error);
if (result != NANOARROW_OK) {
ArrowSchemaRelease(&tmp);
return result;
}
ArrowErrorCode ArrowIpcDecoderDecodeFooter(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;

ArrowSchemaMove(&tmp, out);
int32_t footer_and_size_and_magic_size =
decoder->header_size_bytes + sizeof(int32_t) + strlen(NANOARROW_IPC_MAGIC);
const uint8_t* footer_data =
data.data.as_uint8 + data.size_bytes - footer_and_size_and_magic_size;
ns(Footer_table_t) footer = ns(Footer_as_root(footer_data));

NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeSchemaImpl(
ns(Footer_schema(footer)), &private_data->footer.schema, error));

ns(Block_vec_t) blocks = ns(Footer_recordBatches(footer));
int64_t n = ns(Block_vec_len(blocks));
NANOARROW_RETURN_NOT_OK(ArrowBufferResize(&private_data->footer.record_batch_blocks,
sizeof(struct ArrowIpcFileBlock) * n,
/*shrink_to_fit=*/0));
struct ArrowIpcFileBlock* record_batches =
(struct ArrowIpcFileBlock*)private_data->footer.record_batch_blocks.data;
for (int64_t i = 0; i < n; i++) {
record_batches[i].offset = blocks[i].offset;
record_batches[i].metadata_length = blocks[i].metaDataLength;
record_batches[i].body_length = blocks[i].bodyLength;
}

decoder->footer = &private_data->footer;
return NANOARROW_OK;
}
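
Putting the decoder.c changes together, the footer-related API surface exercised in this diff is roughly the following (assumed declarations; the corresponding header changes are not shown here):

// Decoded footer storage owned by the decoder (see decoder->footer above).
// struct ArrowIpcFileBlock carries offset, metadata_length, and body_length for
// each record batch; its exact field widths are not shown in this diff.
struct ArrowIpcFooter {
  struct ArrowSchema schema;
  struct ArrowBuffer record_batch_blocks;  // contiguous struct ArrowIpcFileBlock[]
};

void ArrowIpcFooterInit(struct ArrowIpcFooter* footer);
void ArrowIpcFooterReset(struct ArrowIpcFooter* footer);

// Read only the trailing footer size + magic (sets decoder->header_size_bytes).
ArrowErrorCode ArrowIpcDecoderPeekFooter(struct ArrowIpcDecoder* decoder,
                                         struct ArrowBufferView data,
                                         struct ArrowError* error);
// Peek, then verify the Footer flatbuffer is in bounds and has a schema.
ArrowErrorCode ArrowIpcDecoderVerifyFooter(struct ArrowIpcDecoder* decoder,
                                           struct ArrowBufferView data,
                                           struct ArrowError* error);
// Decode the schema and record batch blocks; sets decoder->footer on success.
ArrowErrorCode ArrowIpcDecoderDecodeFooter(struct ArrowIpcDecoder* decoder,
                                           struct ArrowBufferView data,
                                           struct ArrowError* error);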
