-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add Footer decoding #598
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -95,6 +95,9 @@ int main(int argc, char** argv) try { | |
} | ||
|
||
struct File { | ||
File(FILE* file) : file_{file} {} | ||
File() = default; | ||
|
||
~File() { | ||
if (file_ != nullptr) { | ||
fclose(file_); | ||
|
@@ -166,35 +169,62 @@ struct MaterializedArrayStream { | |
// Footer). | ||
File ipc_file; | ||
NANOARROW_RETURN_NOT_OK(ipc_file.open(path, "rb", error)); | ||
return FromIpcFile(ipc_file, error); | ||
} | ||
auto bytes = ipc_file.read(); | ||
|
||
ArrowErrorCode FromIpcFile(FILE* ipc_file, struct ArrowError* error) { | ||
char prefix[sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC)] = {}; | ||
if (fread(&prefix, 1, sizeof(prefix), ipc_file) < sizeof(prefix)) { | ||
ArrowErrorSet(error, "Expected file of more than %lu bytes, got %ld", | ||
sizeof(prefix), ftell(ipc_file)); | ||
auto min_size = sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC) + sizeof(int32_t) + | ||
strlen(NANOARROW_IPC_FILE_PADDED_MAGIC); | ||
if (bytes.size() < min_size) { | ||
ArrowErrorSet(error, "Expected file of more than %lu bytes, got %ld", min_size, | ||
bytes.size()); | ||
return EINVAL; | ||
} | ||
|
||
if (memcmp(&prefix, NANOARROW_IPC_FILE_PADDED_MAGIC, sizeof(prefix)) != 0) { | ||
if (memcmp(bytes.data(), NANOARROW_IPC_FILE_PADDED_MAGIC, | ||
sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC)) != 0) { | ||
ArrowErrorSet(error, "File did not begin with 'ARROW1\\0\\0'"); | ||
return EINVAL; | ||
} | ||
|
||
nanoarrow::ipc::UniqueInputStream input_stream; | ||
NANOARROW_RETURN_NOT_OK_WITH_ERROR( | ||
ArrowIpcInputStreamInitFile(input_stream.get(), ipc_file, | ||
/*close_on_release=*/false), | ||
error); | ||
nanoarrow::ipc::UniqueDecoder decoder; | ||
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowIpcDecoderInit(decoder.get()), error); | ||
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyFooter( | ||
decoder.get(), {{bytes.data()}, static_cast<int64_t>(bytes.size())}, error)); | ||
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeFooter( | ||
decoder.get(), {{bytes.data()}, static_cast<int64_t>(bytes.size())}, error)); | ||
|
||
nanoarrow::UniqueArrayStream array_stream; | ||
NANOARROW_RETURN_NOT_OK_WITH_ERROR( | ||
ArrowIpcArrayStreamReaderInit(array_stream.get(), input_stream.get(), | ||
/*options=*/nullptr), | ||
error); | ||
ArrowSchemaDeepCopy(&decoder->footer->schema, schema.get()), error); | ||
NANOARROW_RETURN_NOT_OK( | ||
ArrowIpcDecoderSetSchema(decoder.get(), &decoder->footer->schema, error)); | ||
NANOARROW_RETURN_NOT_OK_WITH_ERROR( | ||
ArrowIpcDecoderSetEndianness(decoder.get(), decoder->endianness), error); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't endianness a flag in the schema? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Endianness is a flag in the IPC schema message, but not in ArrowSchema which is what |
||
|
||
nanoarrow::UniqueBuffer record_batch_blocks; | ||
ArrowBufferMove(&decoder->footer->record_batch_blocks, record_batch_blocks.get()); | ||
|
||
for (int i = 0; | ||
i < record_batch_blocks->size_bytes / sizeof(struct ArrowIpcFileBlock); i++) { | ||
const auto& block = | ||
reinterpret_cast<struct ArrowIpcFileBlock*>(record_batch_blocks->data)[i]; | ||
struct ArrowBufferView metadata_view = { | ||
{bytes.data() + block.offset}, | ||
block.metadata_length, | ||
}; | ||
NANOARROW_RETURN_NOT_OK( | ||
ArrowIpcDecoderDecodeHeader(decoder.get(), metadata_view, error)); | ||
|
||
return From(array_stream.get(), error); | ||
struct ArrowBufferView body_view = { | ||
{metadata_view.data.as_uint8 + metadata_view.size_bytes}, | ||
block.body_length, | ||
}; | ||
nanoarrow::UniqueArray batch; | ||
NANOARROW_RETURN_NOT_OK( | ||
ArrowIpcDecoderDecodeArray(decoder.get(), body_view, -1, batch.get(), | ||
NANOARROW_VALIDATION_LEVEL_FULL, error)); | ||
batches.push_back(std::move(batch)); | ||
} | ||
|
||
return NANOARROW_OK; | ||
} | ||
|
||
ArrowErrorCode Write(struct ArrowIpcOutputStream* output_stream, bool write_file, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,6 +56,8 @@ | |
// at the beginning of every message header. | ||
static const int32_t kMessageHeaderPrefixSize = 8; | ||
|
||
#define NANOARROW_IPC_MAGIC "ARROW1" | ||
|
||
// Internal representation of a parsed "Field" from flatbuffers. This | ||
// represents a field in a depth-first walk of column arrays and their | ||
// children. | ||
|
@@ -95,6 +97,8 @@ struct ArrowIpcDecoderPrivate { | |
int64_t n_buffers; | ||
// A pointer to the last flatbuffers message. | ||
const void* last_message; | ||
// Storage for a Footer | ||
struct ArrowIpcFooter footer; | ||
}; | ||
|
||
ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) { | ||
|
@@ -236,6 +240,7 @@ ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* decoder) { | |
|
||
memset(private_data, 0, sizeof(struct ArrowIpcDecoderPrivate)); | ||
private_data->system_endianness = ArrowIpcSystemEndianness(); | ||
ArrowIpcFooterInit(&private_data->footer); | ||
decoder->private_data = private_data; | ||
return NANOARROW_OK; | ||
} | ||
|
@@ -256,6 +261,8 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) { | |
private_data->n_fields = 0; | ||
} | ||
|
||
ArrowIpcFooterReset(&private_data->footer); | ||
|
||
ArrowFree(private_data); | ||
memset(decoder, 0, sizeof(struct ArrowIpcDecoder)); | ||
} | ||
|
@@ -959,6 +966,8 @@ static inline void ArrowIpcDecoderResetHeaderInfo(struct ArrowIpcDecoder* decode | |
decoder->codec = 0; | ||
decoder->header_size_bytes = 0; | ||
decoder->body_size_bytes = 0; | ||
decoder->footer = NULL; | ||
ArrowIpcFooterReset(&private_data->footer); | ||
private_data->last_message = NULL; | ||
} | ||
|
||
|
@@ -1053,6 +1062,85 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder, | |
return NANOARROW_OK; | ||
} | ||
|
||
ArrowErrorCode ArrowIpcDecoderPeekFooter(struct ArrowIpcDecoder* decoder, | ||
struct ArrowBufferView data, | ||
struct ArrowError* error) { | ||
struct ArrowIpcDecoderPrivate* private_data = | ||
(struct ArrowIpcDecoderPrivate*)decoder->private_data; | ||
|
||
ArrowIpcDecoderResetHeaderInfo(decoder); | ||
if (data.size_bytes < (int)strlen(NANOARROW_IPC_MAGIC) + (int)sizeof(int32_t)) { | ||
ArrowErrorSet(error, | ||
"Expected data of at least 10 bytes but only %" PRId64 | ||
" bytes are available", | ||
data.size_bytes); | ||
return ESPIPE; | ||
} | ||
|
||
const char* data_end = data.data.as_char + data.size_bytes; | ||
const char* magic = data_end - strlen(NANOARROW_IPC_MAGIC); | ||
const char* footer_size_data = magic - sizeof(int32_t); | ||
|
||
if (memcmp(magic, NANOARROW_IPC_MAGIC, strlen(NANOARROW_IPC_MAGIC)) != 0) { | ||
ArrowErrorSet(error, "Expected file to end with ARROW1 but got %s", data_end); | ||
return EINVAL; | ||
} | ||
|
||
int32_t footer_size; | ||
memcpy(&footer_size, footer_size_data, sizeof(footer_size)); | ||
if (private_data->system_endianness == NANOARROW_IPC_ENDIANNESS_BIG) { | ||
footer_size = bswap32(footer_size); | ||
} | ||
|
||
if (footer_size < 0) { | ||
ArrowErrorSet(error, "Expected footer size > 0 but found footer size of %d bytes", | ||
footer_size); | ||
return EINVAL; | ||
} | ||
|
||
decoder->header_size_bytes = footer_size; | ||
return NANOARROW_OK; | ||
} | ||
|
||
ArrowErrorCode ArrowIpcDecoderVerifyFooter(struct ArrowIpcDecoder* decoder, | ||
struct ArrowBufferView data, | ||
struct ArrowError* error) { | ||
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekFooter(decoder, data, error)); | ||
|
||
// Check that data contains at least the entire footer (return ESPIPE to signal | ||
// that reading more data may help). | ||
int32_t footer_and_size_and_magic_size = | ||
decoder->header_size_bytes + sizeof(int32_t) + strlen(NANOARROW_IPC_MAGIC); | ||
if (data.size_bytes < footer_and_size_and_magic_size) { | ||
ArrowErrorSet(error, | ||
"Expected >= %d bytes of data but only %" PRId64 | ||
" bytes are in the buffer", | ||
footer_and_size_and_magic_size, data.size_bytes); | ||
return ESPIPE; | ||
} | ||
|
||
const uint8_t* footer_data = | ||
data.data.as_uint8 + data.size_bytes - footer_and_size_and_magic_size; | ||
|
||
// Run flatbuffers verification | ||
if (ns(Footer_verify_as_root(footer_data, decoder->header_size_bytes) != | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. flatcc generates symbols like |
||
flatcc_verify_ok)) { | ||
ArrowErrorSet(error, "Footer flatbuffer verification failed"); | ||
return EINVAL; | ||
} | ||
|
||
// Read some basic information from the message | ||
ns(Footer_table_t) footer = ns(Footer_as_root(footer_data)); | ||
if (ns(Footer_schema(footer)) == NULL) { | ||
ArrowErrorSet(error, "Footer has no schema"); | ||
return EINVAL; | ||
} | ||
|
||
decoder->metadata_version = ns(Footer_version(footer)); | ||
decoder->body_size_bytes = 0; | ||
return NANOARROW_OK; | ||
} | ||
|
||
ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder, | ||
struct ArrowBufferView data, | ||
struct ArrowError* error) { | ||
|
@@ -1126,6 +1214,29 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder, | |
return NANOARROW_OK; | ||
} | ||
|
||
static ArrowErrorCode ArrowIpcDecoderDecodeSchemaImpl(ns(Schema_table_t) schema, | ||
struct ArrowSchema* out, | ||
struct ArrowError* error) { | ||
ArrowSchemaInit(out); | ||
// Top-level batch schema is typically non-nullable | ||
out->flags = 0; | ||
|
||
ns(Field_vec_t) fields = ns(Schema_fields(schema)); | ||
int64_t n_fields = ns(Schema_vec_len(fields)); | ||
|
||
ArrowErrorCode result = ArrowSchemaSetTypeStruct(out, n_fields); | ||
if (result != NANOARROW_OK) { | ||
ArrowErrorSet(error, "Failed to allocate struct schema with %" PRId64 " children", | ||
n_fields); | ||
return result; | ||
} | ||
|
||
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetChildren(out, fields, error)); | ||
NANOARROW_RETURN_NOT_OK( | ||
ArrowIpcDecoderSetMetadata(out, ns(Schema_custom_metadata(schema)), error)); | ||
return NANOARROW_OK; | ||
} | ||
|
||
ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder, | ||
struct ArrowSchema* out, | ||
struct ArrowError* error) { | ||
|
@@ -1138,37 +1249,47 @@ ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder, | |
return EINVAL; | ||
} | ||
|
||
ns(Schema_table_t) schema = (ns(Schema_table_t))private_data->last_message; | ||
|
||
ns(Field_vec_t) fields = ns(Schema_fields(schema)); | ||
int64_t n_fields = ns(Schema_vec_len(fields)); | ||
|
||
struct ArrowSchema tmp; | ||
ArrowSchemaInit(&tmp); | ||
int result = ArrowSchemaSetTypeStruct(&tmp, n_fields); | ||
if (result != NANOARROW_OK) { | ||
ArrowSchemaRelease(&tmp); | ||
ArrowErrorSet(error, "Failed to allocate struct schema with %" PRId64 " children", | ||
n_fields); | ||
return result; | ||
} | ||
|
||
// Top-level batch schema is typically non-nullable | ||
tmp.flags = 0; | ||
ArrowErrorCode result = ArrowIpcDecoderDecodeSchemaImpl( | ||
(ns(Schema_table_t))private_data->last_message, &tmp, error); | ||
|
||
result = ArrowIpcDecoderSetChildren(&tmp, fields, error); | ||
if (result != NANOARROW_OK) { | ||
ArrowSchemaRelease(&tmp); | ||
return result; | ||
} | ||
ArrowSchemaMove(&tmp, out); | ||
return NANOARROW_OK; | ||
} | ||
|
||
result = ArrowIpcDecoderSetMetadata(&tmp, ns(Schema_custom_metadata(schema)), error); | ||
if (result != NANOARROW_OK) { | ||
ArrowSchemaRelease(&tmp); | ||
return result; | ||
} | ||
ArrowErrorCode ArrowIpcDecoderDecodeFooter(struct ArrowIpcDecoder* decoder, | ||
struct ArrowBufferView data, | ||
struct ArrowError* error) { | ||
struct ArrowIpcDecoderPrivate* private_data = | ||
(struct ArrowIpcDecoderPrivate*)decoder->private_data; | ||
|
||
ArrowSchemaMove(&tmp, out); | ||
int32_t footer_and_size_and_magic_size = | ||
decoder->header_size_bytes + sizeof(int32_t) + strlen(NANOARROW_IPC_MAGIC); | ||
const uint8_t* footer_data = | ||
data.data.as_uint8 + data.size_bytes - footer_and_size_and_magic_size; | ||
ns(Footer_table_t) footer = ns(Footer_as_root(footer_data)); | ||
|
||
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeSchemaImpl( | ||
ns(Footer_schema(footer)), &private_data->footer.schema, error)); | ||
|
||
ns(Block_vec_t) blocks = ns(Footer_recordBatches(footer)); | ||
int64_t n = ns(Block_vec_len(blocks)); | ||
NANOARROW_RETURN_NOT_OK(ArrowBufferResize(&private_data->footer.record_batch_blocks, | ||
sizeof(struct ArrowIpcFileBlock) * n, | ||
/*shrink_to_fit=*/0)); | ||
struct ArrowIpcFileBlock* record_batches = | ||
(struct ArrowIpcFileBlock*)private_data->footer.record_batch_blocks.data; | ||
for (int64_t i = 0; i < n; i++) { | ||
record_batches[i].offset = blocks[i].offset; | ||
record_batches[i].metadata_length = blocks[i].metaDataLength; | ||
record_batches[i].body_length = blocks[i].bodyLength; | ||
} | ||
|
||
decoder->footer = &private_data->footer; | ||
return NANOARROW_OK; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should verify and decode be the same function rather than separate functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm following the pattern of schema decoding, where the two are separated. Verification checks offsets to ensure all flatbuffers are in bounds, while decode actually copies flatbuffers into nanoarrow data structures. I'd be fine consolidating them but that should be a follow up
@paleolimbot
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely possible to consolidate them... I think I separated them initially in case somebody owned their whole pipeline and wanted to skip verification as an optimization (which is probably not actually any faster, and probably nobody will want to do). For some flatbuffer uses this matters ( http://flatgeobuf.org/#why-am-i-not-getting-expected-performance-in-gdal ).