From 3e0dc4ce6bccc8da93330c69da28267531c481b9 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Tue, 15 Aug 2023 16:19:21 +0200 Subject: [PATCH] C-export/import: Run-End Encoded Arrays --- cpp/src/arrow/c/bridge.cc | 20 +++++ cpp/src/arrow/c/bridge_test.cc | 154 +++++++++++++++++++++++++++++++++ cpp/src/arrow/type.cc | 2 +- 3 files changed, 175 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 13355dd6d05ae..5733c6ef907f1 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -478,6 +478,8 @@ struct SchemaExporter { return Status::OK(); } + Status Visit(const RunEndEncodedType& type) { return SetFormat("+r"); } + ExportedSchemaPrivateData export_; int64_t flags_ = 0; std::vector> additional_metadata_; @@ -1113,6 +1115,8 @@ struct SchemaImporter { return ProcessMap(); case 'u': return ProcessUnion(); + case 'r': + return ProcessREE(); } return f_parser_.Invalid(); } @@ -1287,6 +1291,15 @@ struct SchemaImporter { return Status::OK(); } + Status ProcessREE() { + RETURN_NOT_OK(f_parser_.CheckAtEnd()); + RETURN_NOT_OK(CheckNumChildren(2)); + ARROW_ASSIGN_OR_RAISE(auto run_ends_field, MakeChildField(0)); + ARROW_ASSIGN_OR_RAISE(auto values_field, MakeChildField(1)); + type_ = run_end_encoded(run_ends_field->type(), values_field->type()); + return Status::OK(); + } + Result> MakeChildField(int64_t child_id) { const auto& child = child_importers_[child_id]; if (child.c_struct_->name == nullptr) { @@ -1601,6 +1614,13 @@ struct ArrayImporter { return Status::OK(); } + Status Visit(const RunEndEncodedType& type) { + RETURN_NOT_OK(CheckNumChildren(2)); + RETURN_NOT_OK(CheckNumBuffers(1)); + RETURN_NOT_OK(AllocateArrayData()); + return Status::OK(); + } + Status ImportFixedSizePrimitive(const FixedWidthType& type) { RETURN_NOT_OK(CheckNoChildren()); RETURN_NOT_OK(CheckNumBuffers(2)); diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index 5c7de8e4a0783..6c7738536ba1a 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -31,6 +31,7 @@ #include "arrow/c/bridge.h" #include "arrow/c/helpers.h" #include "arrow/c/util_internal.h" +#include "arrow/compute/api_vector.h" #include "arrow/ipc/json_simple.h" #include "arrow/memory_pool.h" #include "arrow/testing/extension_type.h" @@ -443,6 +444,13 @@ TEST_F(TestSchemaExport, Union) { {ARROW_FLAG_NULLABLE}); } +TEST_F(TestSchemaExport, RunEndEncoded) { + TestNested(run_end_encoded(int16(), uint8()), {"+r", "s", "C"}, + {"", "run_ends", "values"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE}); + TestNested(run_end_encoded(int64(), int32()), {"+r", "l", "i"}, + {"", "run_ends", "values"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE}); +} + std::string GetIndexFormat(Type::type type_id) { switch (type_id) { case Type::UINT8: @@ -952,6 +960,34 @@ TEST_F(TestArrayExport, Union) { TestNested(type, data); } +Result> REEFromJSON(const std::shared_ptr& ree_type, + const std::string& json) { + auto ree_type_ptr = checked_cast(ree_type.get()); + auto array = ArrayFromJSON(ree_type_ptr->value_type(), json); + ARROW_ASSIGN_OR_RAISE( + auto datum, + RunEndEncode(array, compute::RunEndEncodeOptions{ree_type_ptr->run_end_type()})); + return datum.make_array(); +} + +TEST_F(TestArrayExport, RunEndEncoded) { + auto factory = []() { + return REEFromJSON(run_end_encoded(int32(), int8()), + "[1, 2, 2, 3, null, null, null, 4]"); + }; + TestNested(factory); +} + +TEST_F(TestArrayExport, RunEndEncodedSliced) { + auto factory = []() -> Result> { + ARROW_ASSIGN_OR_RAISE(auto ree_array, + REEFromJSON(run_end_encoded(int32(), int8()), + "[1, 2, 2, 3, null, null, null, 4]")); + return ree_array->Slice(1, 5); + }; + TestNested(factory); +} + TEST_F(TestArrayExport, Dictionary) { { auto factory = []() { @@ -1241,6 +1277,15 @@ class TestDeviceArrayExport : public ::testing::Test { return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); }; } + static std::function>()> JSONREEArrayFactory( + const std::shared_ptr& mm, std::shared_ptr type, + const char* json) { + return [=]() -> Result> { + ARROW_ASSIGN_OR_RAISE(auto result, REEFromJSON(type, json)); + return ToDevice(mm, *result->data()); + }; + } + template void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) { auto orig_bytes = pool_->bytes_allocated(); @@ -1436,6 +1481,15 @@ TEST_F(TestDeviceArrayExport, Union) { TestNested(mm, type, data); } +TEST_F(TestDeviceArrayExport, RunEndEncoded) { + std::shared_ptr device = std::make_shared(1); + auto mm = device->default_memory_manager(); + + auto type = run_end_encoded(int32(), int32()); + const char* data = "[1, null, 2, 2, 4, 5]"; + TestNested(JSONREEArrayFactory(mm, type, data)); +} + TEST_F(TestDeviceArrayExport, Extension) { std::shared_ptr device = std::make_shared(1); auto mm = device->default_memory_manager(); @@ -1590,6 +1644,17 @@ class SchemaStructBuilder { c->children = NLastChildren(c->n_children, c); } + void FillRunEndEncoded(struct ArrowSchema* c, const char* format, + const char* name = nullptr, int64_t flags = kDefaultFlags) { + c->flags = flags; + c->format = format; + c->name = name; + c->n_children = 2; + c->children = NLastChildren(2, c); + c->children[0]->name = "run_ends"; + c->children[1]->name = "values"; + } + void FillPrimitive(const char* format, const char* name = nullptr, int64_t flags = kDefaultFlags) { FillPrimitive(&c_struct_, format, name, flags); @@ -1607,6 +1672,11 @@ class SchemaStructBuilder { FillStructLike(&c_struct_, format, n_children, name, flags); } + void FillRunEndEncoded(const char* format, const char* name = nullptr, + int64_t flags = kDefaultFlags) { + FillRunEndEncoded(&c_struct_, format, name, flags); + } + struct ArrowSchema c_struct_; // Deque elements don't move when the deque is appended to, which allows taking // stable C pointers to them. @@ -1872,6 +1942,13 @@ TEST_F(TestSchemaImport, Map) { CheckImport(expected); } +TEST_F(TestSchemaImport, RunEndEncoded) { + FillPrimitive(AddChild(), "s", "run_ends"); + FillPrimitive(AddChild(), "I", "values"); + FillRunEndEncoded("+r"); + CheckImport(run_end_encoded(int16(), uint32())); +} + TEST_F(TestSchemaImport, Dictionary) { FillPrimitive(AddChild(), "u"); FillPrimitive("c"); @@ -2148,6 +2225,9 @@ static const void* timestamp_buffers_no_nulls2[2] = {nullptr, timestamp_data_buf static const void* timestamp_buffers_no_nulls3[2] = {nullptr, timestamp_data_buffer3}; static const void* timestamp_buffers_no_nulls4[2] = {nullptr, timestamp_data_buffer4}; +static const uint16_t run_ends_data_buffer5[5] = {1, 2, 4, 7, 9}; +static const void* run_ends_buffers5[2] = {nullptr, run_ends_data_buffer5}; + static const uint8_t string_data_buffer1[] = "foobarquuxxyzzy"; static const int32_t string_offsets_buffer1[] = {0, 3, 3, 6, 10, 15}; @@ -2324,6 +2404,21 @@ class TestArrayImport : public ::testing::Test { legacy); } + void FillRunEndEncoded(int64_t length, int64_t offset) { + FillRunEndEncoded(&c_struct_, length, offset); + } + + void FillRunEndEncoded(struct ArrowArray* c, int64_t length, int64_t offset) { + static const void* buffers[1] = {nullptr}; + c->length = length; + c->null_count = 0; + c->offset = offset; + c->n_buffers = 1; + c->buffers = buffers; + c->n_children = 2; + c->children = NLastChildren(2, c); + } + void CheckImport(const std::shared_ptr& expected) { ArrayReleaseCallback cb(&c_struct_); @@ -2674,6 +2769,49 @@ TEST_F(TestArrayImport, Struct) { CheckImport(expected); } +TEST_F(TestArrayImport, RunEndEncoded) { + FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5); + FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5); + FillRunEndEncoded(9, 0); + ASSERT_OK_AND_ASSIGN(auto expected, + REEFromJSON(run_end_encoded(int16(), float32()), + "[0.0, 1.5, -2.0, -2.0, 3.0, 3.0, 3.0, 4.0, 4.0]")); + ASSERT_OK(expected->ValidateFull()); + CheckImport(expected); +} + +TEST_F(TestArrayImport, RunEndEncodedWithOffset) { + auto ree_type = run_end_encoded(int16(), float32()); + // Offset in children + FillPrimitive(AddChild(), 3, 0, 2, run_ends_buffers5); + FillPrimitive(AddChild(), 3, 0, 2, primitive_buffers_no_nulls5); + FillRunEndEncoded(7, 0); + ASSERT_OK_AND_ASSIGN(auto expected, + REEFromJSON(ree_type, "[-2.0, -2.0, -2.0, -2.0, 3.0, 3.0, 3.0]")); + CheckImport(expected); + + // Ofsset in parent + FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5); + FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5); + FillRunEndEncoded(5, 2); + ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0, 3.0]")); + CheckImport(expected); + + // Length in parent that cuts last run + FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5); + FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5); + FillRunEndEncoded(4, 2); + ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0]")); + CheckImport(expected); + + // Offset in both children and parent + FillPrimitive(AddChild(), 3, 0, 2, run_ends_buffers5); + FillPrimitive(AddChild(), 3, 0, 2, primitive_buffers_no_nulls5); + FillRunEndEncoded(4, 2); + ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0]")); + CheckImport(expected); +} + TEST_F(TestArrayImport, SparseUnion) { auto type = sparse_union({field("strs", utf8()), field("ints", int8())}, {43, 42}); auto expected = @@ -3179,6 +3317,10 @@ TEST_F(TestSchemaRoundtrip, Union) { TestWithTypeFactory([&]() { return dense_union({f1, f2}, type_codes); }); } +TEST_F(TestSchemaRoundtrip, RunEndEncoded) { + TestWithTypeFactory([]() { return run_end_encoded(int16(), float32()); }); +} + TEST_F(TestSchemaRoundtrip, Dictionary) { for (auto index_ty : all_dictionary_index_types()) { TestWithTypeFactory([&]() { return dictionary(index_ty, utf8()); }); @@ -3470,6 +3612,18 @@ TEST_F(TestArrayRoundtrip, Union) { } } +TEST_F(TestArrayRoundtrip, RunEndEncoded) { + { + auto factory = []() -> Result> { + ARROW_ASSIGN_OR_RAISE(auto ree_array, + REEFromJSON(run_end_encoded(int32(), int8()), + "[1, 2, 2, 3, null, null, null, 4]")); + return ree_array->Slice(1, 5); + }; + TestWithArrayFactory(factory); + } +} + TEST_F(TestArrayRoundtrip, Dictionary) { { auto factory = []() { diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc index 21c47e17239cb..3d294a3fa8642 100644 --- a/cpp/src/arrow/type.cc +++ b/cpp/src/arrow/type.cc @@ -2658,7 +2658,7 @@ std::shared_ptr struct_( return std::make_shared(MakeFields(fields)); } -std::shared_ptr run_end_encoded(std::shared_ptr run_end_type, +std::shared_ptr run_end_encoded(std::shared_ptr run_end_type, std::shared_ptr value_type) { return std::make_shared(std::move(run_end_type), std::move(value_type));