Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37173: [C++][Go][Format] C-export/import Run-End Encoded Arrays #37174

Merged
merged 16 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ struct SchemaExporter {
return Status::OK();
}

Status Visit(const RunEndEncodedType& type) { return SetFormat("+r"); }

ExportedSchemaPrivateData export_;
int64_t flags_ = 0;
std::vector<std::pair<std::string, std::string>> additional_metadata_;
Expand Down Expand Up @@ -1106,6 +1108,8 @@ struct SchemaImporter {
return ProcessMap();
case 'u':
return ProcessUnion();
case 'r':
return ProcessREE();
}
return f_parser_.Invalid();
}
Expand Down Expand Up @@ -1280,6 +1284,22 @@ struct SchemaImporter {
return Status::OK();
}

Status ProcessREE() {
RETURN_NOT_OK(f_parser_.CheckAtEnd());
RETURN_NOT_OK(CheckNumChildren(2));
ARROW_ASSIGN_OR_RAISE(auto run_ends_field, MakeChildField(0));
ARROW_ASSIGN_OR_RAISE(auto values_field, MakeChildField(1));
if (!is_run_end_type(run_ends_field->type()->id())) {
return Status::Invalid("Expected a valid run-end integer type, but struct has ",
run_ends_field->type()->ToString());
}
if (values_field->type()->id() == Type::RUN_END_ENCODED) {
return Status::Invalid("ArrowArray struct contains a nested run-end encoded array");
}
type_ = run_end_encoded(run_ends_field->type(), values_field->type());
return Status::OK();
}

Result<std::shared_ptr<Field>> MakeChildField(int64_t child_id) {
const auto& child = child_importers_[child_id];
if (child.c_struct_->name == nullptr) {
Expand Down Expand Up @@ -1601,6 +1621,17 @@ struct ArrayImporter {
return Status::OK();
}

Status Visit(const RunEndEncodedType& type) {
RETURN_NOT_OK(CheckNumChildren(2));
RETURN_NOT_OK(CheckNumBuffers(0));
RETURN_NOT_OK(AllocateArrayData());
// Always have a null bitmap buffer as much of the code in arrow assumes
// the buffers vector to have at least one entry on every array format.
data_->buffers.emplace_back(nullptr);
data_->null_count = 0;
return Status::OK();
}

Status ImportFixedSizePrimitive(const FixedWidthType& type) {
RETURN_NOT_OK(CheckNoChildren());
RETURN_NOT_OK(CheckNumBuffers(2));
Expand Down
230 changes: 225 additions & 5 deletions cpp/src/arrow/c/bridge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"

// TODO(GH-37221): Remove these ifdef checks when compute dependency is removed
#ifdef ARROW_COMPUTE
#include "arrow/compute/api_vector.h"
#endif

namespace arrow {

using internal::ArrayExportGuard;
Expand Down Expand Up @@ -443,6 +448,20 @@ TEST_F(TestSchemaExport, Union) {
{ARROW_FLAG_NULLABLE});
}

#ifdef ARROW_COMPUTE
TEST_F(TestSchemaExport, RunEndEncoded) {
TestNested(run_end_encoded(int16(), uint8()), {"+r", "s", "C"},
{"", "run_ends", "values"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE});
TestNested(run_end_encoded(int32(), float64()), {"+r", "i", "g"},
{"", "run_ends", "values"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE});
TestNested(run_end_encoded(int64(), utf8()), {"+r", "l", "u"},
{"", "run_ends", "values"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE});
TestNested(run_end_encoded(int32(), list(utf8())), {"+r", "i", "+l", "u"},
{"", "run_ends", "values", "item"},
{ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE, ARROW_FLAG_NULLABLE});
}
#endif

std::string GetIndexFormat(Type::type type_id) {
switch (type_id) {
case Type::UINT8:
Expand Down Expand Up @@ -952,6 +971,36 @@ TEST_F(TestArrayExport, Union) {
TestNested(type, data);
}

#ifdef ARROW_COMPUTE
Result<std::shared_ptr<Array>> REEFromJSON(const std::shared_ptr<DataType>& ree_type,
const std::string& json) {
auto ree_type_ptr = checked_cast<const RunEndEncodedType*>(ree_type.get());
auto array = ArrayFromJSON(ree_type_ptr->value_type(), json);
ARROW_ASSIGN_OR_RAISE(
auto datum,
RunEndEncode(array, compute::RunEndEncodeOptions{ree_type_ptr->run_end_type()}));
return datum.make_array();
}

TEST_F(TestArrayExport, RunEndEncoded) {
auto factory = []() {
return REEFromJSON(run_end_encoded(int32(), int8()),
"[1, 2, 2, 3, null, null, null, 4]");
};
TestNested(factory);
}

TEST_F(TestArrayExport, RunEndEncodedSliced) {
auto factory = []() -> Result<std::shared_ptr<Array>> {
ARROW_ASSIGN_OR_RAISE(auto ree_array,
REEFromJSON(run_end_encoded(int32(), int8()),
"[1, 2, 2, 3, null, null, null, 4]"));
return ree_array->Slice(1, 5);
};
TestNested(factory);
}
#endif

TEST_F(TestArrayExport, Dictionary) {
{
auto factory = []() {
Expand Down Expand Up @@ -1269,6 +1318,17 @@ class TestDeviceArrayExport : public ::testing::Test {
return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); };
}

#ifdef ARROW_COMPUTE
static std::function<Result<std::shared_ptr<Array>>()> JSONREEArrayFactory(
const std::shared_ptr<MemoryManager>& mm, std::shared_ptr<DataType> type,
const char* json) {
return [=]() -> Result<std::shared_ptr<Array>> {
ARROW_ASSIGN_OR_RAISE(auto result, REEFromJSON(type, json));
return ToDevice(mm, *result->data());
};
}
#endif

template <typename ArrayFactory, typename ExportCheckFunc>
void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) {
auto orig_bytes = pool_->bytes_allocated();
Expand Down Expand Up @@ -1465,6 +1525,17 @@ TEST_F(TestDeviceArrayExport, Union) {
TestNested(mm, type, data);
}

#ifdef ARROW_COMPUTE
TEST_F(TestDeviceArrayExport, RunEndEncoded) {
std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
auto mm = device->default_memory_manager();

auto type = run_end_encoded(int32(), int32());
const char* data = "[1, null, 2, 2, 4, 5]";
TestNested(JSONREEArrayFactory(mm, type, data));
}
#endif

TEST_F(TestDeviceArrayExport, Extension) {
std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
auto mm = device->default_memory_manager();
Expand Down Expand Up @@ -1564,11 +1635,10 @@ class SchemaStructBuilder {

// Create a new ArrowSchema struct with a stable C pointer
struct ArrowSchema* AddChild() {
nested_structs_.emplace_back();
struct ArrowSchema* result = &nested_structs_.back();
memset(result, 0, sizeof(*result));
result->release = NoOpSchemaRelease;
return result;
auto& result = nested_structs_.emplace_back();
memset(&result, 0, sizeof(result));
result.release = NoOpSchemaRelease;
return &result;
}

// Create a stable C pointer to the N last structs in nested_structs_
Expand Down Expand Up @@ -1620,6 +1690,17 @@ class SchemaStructBuilder {
c->children = NLastChildren(c->n_children, c);
}

void FillRunEndEncoded(struct ArrowSchema* c, const char* format,
const char* name = nullptr, int64_t flags = kDefaultFlags) {
c->flags = flags;
c->format = format;
c->name = name;
c->n_children = 2;
c->children = NLastChildren(2, c);
c->children[0]->name = "run_ends";
c->children[1]->name = "values";
}

void FillPrimitive(const char* format, const char* name = nullptr,
int64_t flags = kDefaultFlags) {
FillPrimitive(&c_struct_, format, name, flags);
Expand All @@ -1637,6 +1718,11 @@ class SchemaStructBuilder {
FillStructLike(&c_struct_, format, n_children, name, flags);
}

void FillRunEndEncoded(const char* format, const char* name = nullptr,
int64_t flags = kDefaultFlags) {
FillRunEndEncoded(&c_struct_, format, name, flags);
}

struct ArrowSchema c_struct_;
// Deque elements don't move when the deque is appended to, which allows taking
// stable C pointers to them.
Expand Down Expand Up @@ -1902,6 +1988,15 @@ TEST_F(TestSchemaImport, Map) {
CheckImport(expected);
}

#ifdef ARROW_COMPUTE
TEST_F(TestSchemaImport, RunEndEncoded) {
FillPrimitive(AddChild(), "s", "run_ends");
FillPrimitive(AddChild(), "I", "values");
FillRunEndEncoded("+r");
CheckImport(run_end_encoded(int16(), uint32()));
felipecrv marked this conversation as resolved.
Show resolved Hide resolved
}
#endif

TEST_F(TestSchemaImport, Dictionary) {
FillPrimitive(AddChild(), "u");
FillPrimitive("c");
Expand Down Expand Up @@ -2021,6 +2116,33 @@ TEST_F(TestSchemaImport, UnionError) {
CheckImportError();
}

TEST_F(TestSchemaImport, RunEndEncodedError) {
// Bad run-end type
FillPrimitive(AddChild(), "c", "run_ends");
FillPrimitive(AddChild(), "u", "values");
FillRunEndEncoded("+r");
CheckImportError();

// REE of a REE also causes an error
ArrowSchema* run_ends = AddChild();
ArrowSchema* values;
FillPrimitive(run_ends, "i", "run_ends");
{
FillPrimitive(AddChild(), "i", "run_ends");
FillPrimitive(AddChild(), "u", "values");
values = AddChild();
FillRunEndEncoded(values, "+r", "values");
}
// Fill the top-level REE
ArrowSchema* children[2] = {run_ends, values};
c_struct_.flags = kDefaultFlags;
c_struct_.format = "+r";
c_struct_.name = "";
c_struct_.n_children = 2;
c_struct_.children = children;
CheckImportError();
}

TEST_F(TestSchemaImport, DictionaryError) {
// Bad index type
FillPrimitive(AddChild(), "c");
Expand Down Expand Up @@ -2178,6 +2300,10 @@ static const void* timestamp_buffers_no_nulls2[2] = {nullptr, timestamp_data_buf
static const void* timestamp_buffers_no_nulls3[2] = {nullptr, timestamp_data_buffer3};
static const void* timestamp_buffers_no_nulls4[2] = {nullptr, timestamp_data_buffer4};

static const uint16_t run_ends_data_buffer5[5] = {1, 2, 4, 7, 9};
[[maybe_unused]] static const void* run_ends_buffers5[2] = {nullptr,
run_ends_data_buffer5};

static const uint8_t string_data_buffer1[] = "foobarquuxxyzzy";

static const int32_t string_offsets_buffer1[] = {0, 3, 3, 6, 10, 15};
Expand Down Expand Up @@ -2354,6 +2480,20 @@ class TestArrayImport : public ::testing::Test {
legacy);
}

void FillRunEndEncoded(int64_t length, int64_t offset) {
FillRunEndEncoded(&c_struct_, length, offset);
}

void FillRunEndEncoded(struct ArrowArray* c, int64_t length, int64_t offset) {
c->length = length;
c->null_count = 0;
c->offset = offset;
c->n_buffers = 0;
c->buffers = nullptr;
c->n_children = 2;
c->children = NLastChildren(2, c);
}

void CheckImport(const std::shared_ptr<Array>& expected) {
ArrayReleaseCallback cb(&c_struct_);

Expand Down Expand Up @@ -2704,6 +2844,51 @@ TEST_F(TestArrayImport, Struct) {
CheckImport(expected);
}

#ifdef ARROW_COMPUTE
TEST_F(TestArrayImport, RunEndEncoded) {
FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5);
FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5);
FillRunEndEncoded(9, 0);
ASSERT_OK_AND_ASSIGN(auto expected,
REEFromJSON(run_end_encoded(int16(), float32()),
"[0.0, 1.5, -2.0, -2.0, 3.0, 3.0, 3.0, 4.0, 4.0]"));
ASSERT_OK(expected->ValidateFull());
CheckImport(expected);
}

TEST_F(TestArrayImport, RunEndEncodedWithOffset) {
auto ree_type = run_end_encoded(int16(), float32());
// Offset in children
FillPrimitive(AddChild(), 3, 0, 2, run_ends_buffers5);
FillPrimitive(AddChild(), 3, 0, 2, primitive_buffers_no_nulls5);
FillRunEndEncoded(7, 0);
ASSERT_OK_AND_ASSIGN(auto expected,
REEFromJSON(ree_type, "[-2.0, -2.0, -2.0, -2.0, 3.0, 3.0, 3.0]"));
CheckImport(expected);

// Ofsset in parent
FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5);
FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5);
FillRunEndEncoded(5, 2);
ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0, 3.0]"));
CheckImport(expected);

// Length in parent that cuts last run
FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5);
FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5);
FillRunEndEncoded(4, 2);
ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0]"));
CheckImport(expected);

// Offset in both children and parent
FillPrimitive(AddChild(), 3, 0, 2, run_ends_buffers5);
FillPrimitive(AddChild(), 3, 0, 2, primitive_buffers_no_nulls5);
FillRunEndEncoded(4, 2);
ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0]"));
CheckImport(expected);
}
#endif

TEST_F(TestArrayImport, SparseUnion) {
auto type = sparse_union({field("strs", utf8()), field("ints", int8())}, {43, 42});
auto expected =
Expand Down Expand Up @@ -3209,6 +3394,13 @@ TEST_F(TestSchemaRoundtrip, Union) {
TestWithTypeFactory([&]() { return dense_union({f1, f2}, type_codes); });
}

#ifdef ARROW_COMPUTE
TEST_F(TestSchemaRoundtrip, RunEndEncoded) {
TestWithTypeFactory([]() { return run_end_encoded(int16(), float32()); });
TestWithTypeFactory([]() { return run_end_encoded(int32(), list(float32())); });
}
#endif

TEST_F(TestSchemaRoundtrip, Dictionary) {
for (auto index_ty : all_dictionary_index_types()) {
TestWithTypeFactory([&]() { return dictionary(index_ty, utf8()); });
Expand Down Expand Up @@ -3500,6 +3692,34 @@ TEST_F(TestArrayRoundtrip, Union) {
}
}

#ifdef ARROW_COMPUTE
TEST_F(TestArrayRoundtrip, RunEndEncoded) {
{
auto factory = []() -> Result<std::shared_ptr<Array>> {
ARROW_ASSIGN_OR_RAISE(auto ree_array,
REEFromJSON(run_end_encoded(int32(), int8()),
"[1, 2, 2, 3, null, null, null, 4]"));
return ree_array->Slice(1, 5);
};
TestWithArrayFactory(factory);
}
{
auto factory = []() -> Result<std::shared_ptr<Array>> {
ARROW_ASSIGN_OR_RAISE(
auto ree_array,
RunEndEncodedArray::Make(
run_end_encoded(int64(), list(utf8())), 8,
ArrayFromJSON(int64(), "[1, 3, 4, 7, 8]"),
ArrayFromJSON(list(utf8()),
R"([["abc", "def"], ["efg"], [], null, ["efg", "hij"]])")));
RETURN_NOT_OK(ree_array->ValidateFull());
return ree_array;
};
TestWithArrayFactory(factory);
}
}
#endif

TEST_F(TestArrayRoundtrip, Dictionary) {
{
auto factory = []() {
Expand Down
Loading
Loading