Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add IPC stream writing #571

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/nanoarrow/common/inline_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ static inline void ArrowBitClear(uint8_t* bits, int64_t i) {
}

// Conditionally set or clear bit `i` of the bitmap `bits`.
//
// The byte at bits[i / 8] is XOR-ed with a value that has been AND-ed with
// _ArrowkBitmask[i % 8], so only the bit(s) selected by that mask entry can
// change (presumably a single bit, 1 << (i % 8) -- the table is not visible
// here; confirm). The selected bit ends up equal to (bit_is_set != 0).
//
// NOTE(review): this listing is a rendered diff, so the assignment appears
// twice: first in its pre-change form, then in the post-change form that wraps
// the whole right-hand side in an extra (uint8_t) cast.
static inline void ArrowBitSetTo(uint8_t* bits, int64_t i, uint8_t bit_is_set) {
// Pre-change form of the statement.
bits[i / 8] ^=
((uint8_t)(-((uint8_t)(bit_is_set != 0)) ^ bits[i / 8])) & _ArrowkBitmask[i % 8];
// Post-change form: identical expression with the result explicitly narrowed
// to uint8_t before the compound assignment.
bits[i / 8] ^= (uint8_t)(((uint8_t)(-((uint8_t)(bit_is_set != 0)) ^ bits[i / 8])) &
_ArrowkBitmask[i % 8]);
bkietz marked this conversation as resolved.
Show resolved Hide resolved
}

static inline void ArrowBitsSetTo(uint8_t* bits, int64_t start_offset, int64_t length,
Expand Down
20 changes: 13 additions & 7 deletions src/nanoarrow/ipc/encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,24 +230,31 @@ static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder,

case NANOARROW_TYPE_TIMESTAMP:
FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder), error);
FLATCC_RETURN_UNLESS_0(Timestamp_unit_add(builder, schema_view->time_unit), error);
FLATCC_RETURN_UNLESS_0(
Timestamp_unit_add(builder, (ns(TimeUnit_enum_t))schema_view->time_unit),
error);
FLATCC_RETURN_UNLESS_0(
Timestamp_timezone_create_str(builder, schema_view->timezone), error);
FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder), error);
return NANOARROW_OK;

case NANOARROW_TYPE_TIME32:
FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 32),
error);
FLATCC_RETURN_UNLESS_0(
Field_type_Time_create(builder, (ns(TimeUnit_enum_t))schema_view->time_unit,
32),
error);
return NANOARROW_OK;

case NANOARROW_TYPE_TIME64:
FLATCC_RETURN_UNLESS_0(Field_type_Time_create(builder, schema_view->time_unit, 64),
error);
FLATCC_RETURN_UNLESS_0(
Field_type_Time_create(builder, (ns(TimeUnit_enum_t))schema_view->time_unit,
64),
error);
return NANOARROW_OK;

case NANOARROW_TYPE_DURATION:
FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(builder, schema_view->time_unit),
FLATCC_RETURN_UNLESS_0(Field_type_Duration_create(
builder, (ns(TimeUnit_enum_t))schema_view->time_unit),
error);
return NANOARROW_OK;

Expand Down Expand Up @@ -531,7 +538,6 @@ static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch(
const struct ArrowArrayView* array_view, struct ArrowError* error) {
NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL &&
buffer_encoder != NULL && buffer_encoder->encode_buffer != NULL);

if (array_view->null_count != 0 && ArrowArrayViewComputeNullCount(array_view) != 0) {
ArrowErrorSet(error,
"RecordBatches cannot be constructed from arrays with top level nulls");
Expand Down
141 changes: 93 additions & 48 deletions src/nanoarrow/ipc/files_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include <gtest/gtest.h>

#include "nanoarrow/nanoarrow.hpp"
#include "nanoarrow/nanoarrow_ipc.h"
#include "nanoarrow/nanoarrow_ipc.hpp"
#include "nanoarrow/nanoarrow_testing.hpp"

#include "flatcc/portable/pendian_detect.h"
Expand Down Expand Up @@ -105,6 +105,29 @@ class TestFile {
return NANOARROW_OK;
}

// Read an entire stream for this test file into memory.
//
// Opens the stream with GetArrowArrayStreamIPC(dir_prefix, ...), fetches its
// schema into `schema`, then appends every array the stream yields to
// `arrays`. Reading stops at the first array whose `release` callback is null.
//
// Each call is wrapped in NANOARROW_RETURN_NOT_OK, so a failing step's code is
// handed back to the caller (with details in `error`); NANOARROW_OK is
// returned once the stream has been drained. Arrays appended before a failure
// remain in `arrays`.
ArrowErrorCode ReadArrowArrayStreamIPC(const std::string& dir_prefix,
bkietz marked this conversation as resolved.
Show resolved Hide resolved
struct ArrowSchema* schema,
std::vector<nanoarrow::UniqueArray>* arrays,
ArrowError* error) {
nanoarrow::UniqueArrayStream stream;
NANOARROW_RETURN_NOT_OK(GetArrowArrayStreamIPC(dir_prefix, stream.get(), error));

NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(stream.get(), schema, error));

while (true) {
// A fresh holder per iteration; it is moved into `arrays` when non-empty.
nanoarrow::UniqueArray array;

NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(stream.get(), array.get(), error));

// A null release callback marks the end of the stream.
if (array->release == nullptr) {
break;
}

arrays->push_back(std::move(array));
}
return NANOARROW_OK;
}

ArrowErrorCode GetArrowArrayStreamCheckJSON(const std::string& dir_prefix,
ArrowArrayStream* out, ArrowError* error) {
std::stringstream path_builder;
Expand Down Expand Up @@ -171,6 +194,31 @@ class TestFile {
return std::make_shared<io::BufferReader>(content_copy_wrapped);
}

// Write `schema` followed by every element of `arrays` through a nanoarrow IPC
// writer whose output stream is initialized from `buffer`.
//
// The sequence is: initialize the output stream and writer, build one array
// view from the schema, write the schema, then point the view at each array in
// order and write it. A final write with a null array view closes the
// sequence (presumably the end-of-stream marker -- confirm against the writer
// documentation).
//
// Returns the code of the first step that fails, otherwise the result of the
// final write. Steps that accept `error` report details through it.
ArrowErrorCode WriteNanoarrowStream(const nanoarrow::UniqueSchema& schema,
                                    const std::vector<nanoarrow::UniqueArray>& arrays,
                                    struct ArrowBuffer* buffer,
                                    struct ArrowError* error) {
  nanoarrow::ipc::UniqueOutputStream output_stream;
  NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamInitBuffer(output_stream.get(), buffer));

  nanoarrow::ipc::UniqueWriter writer;
  NANOARROW_RETURN_NOT_OK(ArrowIpcWriterInit(writer.get(), output_stream.get()));

  // A single view is created up front and re-targeted at each array below.
  nanoarrow::UniqueArrayView array_view;
  NANOARROW_RETURN_NOT_OK(
      ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), error));

  NANOARROW_RETURN_NOT_OK(ArrowIpcWriterWriteSchema(writer.get(), schema.get(), error));

  for (auto it = arrays.begin(); it != arrays.end(); ++it) {
    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view.get(), it->get(), error));
    NANOARROW_RETURN_NOT_OK(
        ArrowIpcWriterWriteArrayView(writer.get(), array_view.get(), error));
  }

  // Closing write: no array view is supplied.
  const ArrowErrorCode final_write =
      ArrowIpcWriterWriteArrayView(writer.get(), nullptr, error);
  return final_write;
}

// Check that nanoarrow's read of this test file matches Arrow C++'s read of
// the same file, and exercise writing the data back out with nanoarrow.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so lines
// from the pre-change and post-change versions of this method are interleaved
// (and a hunk header appears near the top). Comments below describe what each
// visible span does; they do not imply the spans all coexist in one version.
void TestEqualsArrowCpp(const std::string& dir_prefix) {
std::stringstream path_builder;
path_builder << dir_prefix << "/" << path_;
Expand All @@ -179,76 +227,72 @@ class TestFile {
ArrowErrorInit(&error);

// Read using nanoarrow_ipc
nanoarrow::UniqueArrayStream stream;
ASSERT_EQ(GetArrowArrayStreamIPC(dir_prefix, stream.get(), &error), NANOARROW_OK)
<< error.message;

nanoarrow::UniqueSchema schema;
int result = ArrowArrayStreamGetSchema(stream.get(), schema.get(), &error);
std::vector<nanoarrow::UniqueArray> arrays;
int result = ReadArrowArrayStreamIPC(dir_prefix, schema.get(), &arrays, &error);
// A non-OK result is acceptable only when Check() accepts this code/message
// pair (presumably: it matches the failure this TestFile expects -- confirm
// against Check()); otherwise the test fails with a formatted error.
if (result != NANOARROW_OK) {
if (Check(result, error.message)) {
return;
}

GTEST_FAIL() << MakeError(result, error.message);
}

std::vector<nanoarrow::UniqueArray> arrays;
while (true) {
nanoarrow::UniqueArray array;

result = ArrowArrayStreamGetNext(stream.get(), array.get(), &error);
if (result != NANOARROW_OK) {
if (Check(result, error.message)) {
return;
}

GTEST_FAIL() << MakeError(result, error.message);
}

if (array->release == nullptr) {
break;
}

arrays.push_back(std::move(array));
}

// If the file was supposed to fail the read but did not, fail here
if (expected_return_code_ != NANOARROW_OK) {
GTEST_FAIL() << MakeError(NANOARROW_OK, "");
}

// Read the same file with Arrow C++
nanoarrow::UniqueBuffer content_copy;
ASSERT_EQ(ReadFileBuffer(path_builder.str(), content_copy.get(), &error),
// Write back to a buffer using nanoarrow
nanoarrow::UniqueBuffer roundtripped;
ASSERT_EQ(WriteNanoarrowStream(schema, arrays, roundtripped.get(), &error),
NANOARROW_OK)
<< error.message;
std::shared_ptr<io::InputStream> input_stream = BufferInputStream(content_copy.get());

auto maybe_reader = ipc::RecordBatchStreamReader::Open(input_stream);
FAIL_RESULT_NOT_OK(maybe_reader);

auto maybe_table_arrow = maybe_reader.ValueUnsafe()->ToTable();
// Read the same file with Arrow C++
auto maybe_table_arrow = ReadTable(io::ReadableFile::Open(path_builder.str()));
FAIL_RESULT_NOT_OK(maybe_table_arrow);

// Make a Table from our vector of arrays
auto maybe_schema = ImportSchema(schema.get());
FAIL_RESULT_NOT_OK(maybe_schema);
// `schema` and `arrays` are moved into the helper here and must not be used
// again afterwards.
AssertEqualsTable(std::move(schema), std::move(arrays),
maybe_table_arrow.ValueUnsafe());

ASSERT_TRUE(maybe_table_arrow.ValueUnsafe()->schema()->Equals(**maybe_schema, true));
// Read the roundtripped buffer using nanoarrow
// NOTE(review): the call below passes `dir_prefix`, and
// ReadArrowArrayStreamIPC opens its stream via
// GetArrowArrayStreamIPC(dir_prefix, ...), i.e. it re-reads the original file.
// The `roundtripped` buffer written above is never read back in the visible
// code, so this comparison does not appear to validate the written stream.
// TODO: read from `roundtripped` instead.
nanoarrow::UniqueSchema roundtripped_schema;
std::vector<nanoarrow::UniqueArray> roundtripped_arrays;
ASSERT_EQ(ReadArrowArrayStreamIPC(dir_prefix, roundtripped_schema.get(),
&roundtripped_arrays, &error),
NANOARROW_OK);

std::vector<std::shared_ptr<RecordBatch>> batches;
for (auto& array : arrays) {
auto maybe_batch = ImportRecordBatch(array.get(), *maybe_schema);
FAIL_RESULT_NOT_OK(maybe_batch);
AssertEqualsTable(std::move(roundtripped_schema), std::move(roundtripped_arrays),
maybe_table_arrow.ValueUnsafe());
}

batches.push_back(std::move(*maybe_batch));
// Read every record batch from an Arrow IPC stream into a single Table using
// Arrow C++.
//
// `maybe_input_stream` is the (possibly failed) result of opening the input.
// A failure there, or a failure to open the stream reader, is returned as the
// error status; otherwise the reader's ToTable() result is returned.
Result<std::shared_ptr<Table>> ReadTable(
    Result<std::shared_ptr<io::InputStream>> maybe_input_stream) {
  // Propagate a failed open without touching the value.
  if (!maybe_input_stream.ok()) {
    return maybe_input_stream.status();
  }
  auto input_stream = std::move(maybe_input_stream).ValueUnsafe();

  auto maybe_reader = ipc::RecordBatchStreamReader::Open(input_stream);
  if (!maybe_reader.ok()) {
    return maybe_reader.status();
  }
  auto reader = std::move(maybe_reader).ValueUnsafe();

  return reader->ToTable();
}

// Build an Arrow C++ Table from a nanoarrow schema and its arrays.
//
// The schema is imported first; each array is then imported as a RecordBatch
// against that imported schema, in order. Any import failure is returned as
// the error status. On success the batches are combined with
// Table::FromRecordBatches.
//
// Both arguments are taken by value: the caller hands over the schema and
// arrays.
Result<std::shared_ptr<Table>> ToTable(nanoarrow::UniqueSchema schema,
                                       std::vector<nanoarrow::UniqueArray> arrays) {
  ARROW_ASSIGN_OR_RAISE(auto arrow_schema, ImportSchema(schema.get()));

  // Collect one imported batch per input array, preserving order.
  std::vector<std::shared_ptr<RecordBatch>> batches;
  batches.reserve(arrays.size());
  for (size_t i = 0; i < arrays.size(); ++i) {
    ARROW_ASSIGN_OR_RAISE(auto batch, ImportRecordBatch(arrays[i].get(), arrow_schema));
    batches.push_back(std::move(batch));
  }

  return Table::FromRecordBatches(std::move(arrow_schema), std::move(batches));
}

auto maybe_table = Table::FromRecordBatches(*maybe_schema, batches);
// Convert a nanoarrow schema plus arrays into an Arrow C++ Table (via ToTable)
// and compare it with `expected`.
//
// Fails the test if the conversion fails. The schema comparison uses
// ASSERT_TRUE, so a schema mismatch stops this helper before the table
// contents are compared; the table comparison itself uses EXPECT_TRUE. The
// `true` passed to both Equals() calls is presumably Arrow's check_metadata
// flag -- confirm against the Arrow C++ API.
//
// `schema` and `arrays` are taken by value and moved into ToTable.
void AssertEqualsTable(nanoarrow::UniqueSchema schema,
std::vector<nanoarrow::UniqueArray> arrays,
const std::shared_ptr<Table>& expected) {
auto maybe_table = ToTable(std::move(schema), std::move(arrays));
FAIL_RESULT_NOT_OK(maybe_table);

// NOTE(review): the next line refers to `maybe_table_arrow`, which is neither
// a parameter nor a local of this function; it looks like a pre-change line
// carried over by the diff rendering of this listing.
EXPECT_TRUE(maybe_table.ValueUnsafe()->Equals(**maybe_table_arrow, true));
ASSERT_TRUE(expected->schema()->Equals(maybe_table.ValueUnsafe()->schema(), true));
EXPECT_TRUE(maybe_table.ValueUnsafe()->Equals(*expected, true));
}

void TestIPCCheckJSON(const std::string& dir_prefix) {
Expand Down Expand Up @@ -382,7 +426,8 @@ INSTANTIATE_TEST_SUITE_P(
NanoarrowIpcTest, TestFileFixture,
::testing::Values(
// Files in data/arrow-ipc-stream/integration/1.0.0-(little|big)endian/
// should read without error and the data should match Arrow C++'s read
// should read without error and the data should match Arrow C++'s read.
// Also write the stream to a buffer and check Arrow C++'s read of that.
TestFile::OK("generated_custom_metadata.stream"),
TestFile::OK("generated_datetime.stream"),
TestFile::OK("generated_decimal.stream"),
Expand Down
Loading
Loading