Skip to content

Commit

Permalink
C-export/import: Run-End Encoded Arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
felipecrv committed Aug 15, 2023
1 parent cd8830b commit 3e0dc4c
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 1 deletion.
20 changes: 20 additions & 0 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ struct SchemaExporter {
return Status::OK();
}

Status Visit(const RunEndEncodedType& type) { return SetFormat("+r"); }

ExportedSchemaPrivateData export_;
int64_t flags_ = 0;
std::vector<std::pair<std::string, std::string>> additional_metadata_;
Expand Down Expand Up @@ -1113,6 +1115,8 @@ struct SchemaImporter {
return ProcessMap();
case 'u':
return ProcessUnion();
case 'r':
return ProcessREE();
}
return f_parser_.Invalid();
}
Expand Down Expand Up @@ -1287,6 +1291,15 @@ struct SchemaImporter {
return Status::OK();
}

Status ProcessREE() {
RETURN_NOT_OK(f_parser_.CheckAtEnd());
RETURN_NOT_OK(CheckNumChildren(2));
ARROW_ASSIGN_OR_RAISE(auto run_ends_field, MakeChildField(0));
ARROW_ASSIGN_OR_RAISE(auto values_field, MakeChildField(1));
type_ = run_end_encoded(run_ends_field->type(), values_field->type());
return Status::OK();
}

Result<std::shared_ptr<Field>> MakeChildField(int64_t child_id) {
const auto& child = child_importers_[child_id];
if (child.c_struct_->name == nullptr) {
Expand Down Expand Up @@ -1601,6 +1614,13 @@ struct ArrayImporter {
return Status::OK();
}

Status Visit(const RunEndEncodedType& type) {
RETURN_NOT_OK(CheckNumChildren(2));
RETURN_NOT_OK(CheckNumBuffers(1));
RETURN_NOT_OK(AllocateArrayData());
return Status::OK();
}

Status ImportFixedSizePrimitive(const FixedWidthType& type) {
RETURN_NOT_OK(CheckNoChildren());
RETURN_NOT_OK(CheckNumBuffers(2));
Expand Down
154 changes: 154 additions & 0 deletions cpp/src/arrow/c/bridge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "arrow/c/bridge.h"
#include "arrow/c/helpers.h"
#include "arrow/c/util_internal.h"
#include "arrow/compute/api_vector.h"
#include "arrow/ipc/json_simple.h"
#include "arrow/memory_pool.h"
#include "arrow/testing/extension_type.h"
Expand Down Expand Up @@ -443,6 +444,13 @@ TEST_F(TestSchemaExport, Union) {
{ARROW_FLAG_NULLABLE});
}

TEST_F(TestSchemaExport, RunEndEncoded) {
TestNested(run_end_encoded(int16(), uint8()), {"+r", "s", "C"},
{"", "run_ends", "values"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE});
TestNested(run_end_encoded(int64(), int32()), {"+r", "l", "i"},
{"", "run_ends", "values"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE});
}

std::string GetIndexFormat(Type::type type_id) {
switch (type_id) {
case Type::UINT8:
Expand Down Expand Up @@ -952,6 +960,34 @@ TEST_F(TestArrayExport, Union) {
TestNested(type, data);
}

Result<std::shared_ptr<Array>> REEFromJSON(const std::shared_ptr<DataType>& ree_type,
const std::string& json) {
auto ree_type_ptr = checked_cast<const RunEndEncodedType*>(ree_type.get());
auto array = ArrayFromJSON(ree_type_ptr->value_type(), json);
ARROW_ASSIGN_OR_RAISE(
auto datum,
RunEndEncode(array, compute::RunEndEncodeOptions{ree_type_ptr->run_end_type()}));
return datum.make_array();
}

TEST_F(TestArrayExport, RunEndEncoded) {
auto factory = []() {
return REEFromJSON(run_end_encoded(int32(), int8()),
"[1, 2, 2, 3, null, null, null, 4]");
};
TestNested(factory);
}

TEST_F(TestArrayExport, RunEndEncodedSliced) {
auto factory = []() -> Result<std::shared_ptr<Array>> {
ARROW_ASSIGN_OR_RAISE(auto ree_array,
REEFromJSON(run_end_encoded(int32(), int8()),
"[1, 2, 2, 3, null, null, null, 4]"));
return ree_array->Slice(1, 5);
};
TestNested(factory);
}

TEST_F(TestArrayExport, Dictionary) {
{
auto factory = []() {
Expand Down Expand Up @@ -1241,6 +1277,15 @@ class TestDeviceArrayExport : public ::testing::Test {
return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); };
}

static std::function<Result<std::shared_ptr<Array>>()> JSONREEArrayFactory(
const std::shared_ptr<MemoryManager>& mm, std::shared_ptr<DataType> type,
const char* json) {
return [=]() -> Result<std::shared_ptr<Array>> {
ARROW_ASSIGN_OR_RAISE(auto result, REEFromJSON(type, json));
return ToDevice(mm, *result->data());
};
}

template <typename ArrayFactory, typename ExportCheckFunc>
void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) {
auto orig_bytes = pool_->bytes_allocated();
Expand Down Expand Up @@ -1436,6 +1481,15 @@ TEST_F(TestDeviceArrayExport, Union) {
TestNested(mm, type, data);
}

TEST_F(TestDeviceArrayExport, RunEndEncoded) {
std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
auto mm = device->default_memory_manager();

auto type = run_end_encoded(int32(), int32());
const char* data = "[1, null, 2, 2, 4, 5]";
TestNested(JSONREEArrayFactory(mm, type, data));
}

TEST_F(TestDeviceArrayExport, Extension) {
std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
auto mm = device->default_memory_manager();
Expand Down Expand Up @@ -1590,6 +1644,17 @@ class SchemaStructBuilder {
c->children = NLastChildren(c->n_children, c);
}

void FillRunEndEncoded(struct ArrowSchema* c, const char* format,
const char* name = nullptr, int64_t flags = kDefaultFlags) {
c->flags = flags;
c->format = format;
c->name = name;
c->n_children = 2;
c->children = NLastChildren(2, c);
c->children[0]->name = "run_ends";
c->children[1]->name = "values";
}

void FillPrimitive(const char* format, const char* name = nullptr,
int64_t flags = kDefaultFlags) {
FillPrimitive(&c_struct_, format, name, flags);
Expand All @@ -1607,6 +1672,11 @@ class SchemaStructBuilder {
FillStructLike(&c_struct_, format, n_children, name, flags);
}

void FillRunEndEncoded(const char* format, const char* name = nullptr,
int64_t flags = kDefaultFlags) {
FillRunEndEncoded(&c_struct_, format, name, flags);
}

struct ArrowSchema c_struct_;
// Deque elements don't move when the deque is appended to, which allows taking
// stable C pointers to them.
Expand Down Expand Up @@ -1872,6 +1942,13 @@ TEST_F(TestSchemaImport, Map) {
CheckImport(expected);
}

TEST_F(TestSchemaImport, RunEndEncoded) {
FillPrimitive(AddChild(), "s", "run_ends");
FillPrimitive(AddChild(), "I", "values");
FillRunEndEncoded("+r");
CheckImport(run_end_encoded(int16(), uint32()));
}

TEST_F(TestSchemaImport, Dictionary) {
FillPrimitive(AddChild(), "u");
FillPrimitive("c");
Expand Down Expand Up @@ -2148,6 +2225,9 @@ static const void* timestamp_buffers_no_nulls2[2] = {nullptr, timestamp_data_buf
static const void* timestamp_buffers_no_nulls3[2] = {nullptr, timestamp_data_buffer3};
static const void* timestamp_buffers_no_nulls4[2] = {nullptr, timestamp_data_buffer4};

static const uint16_t run_ends_data_buffer5[5] = {1, 2, 4, 7, 9};
static const void* run_ends_buffers5[2] = {nullptr, run_ends_data_buffer5};

static const uint8_t string_data_buffer1[] = "foobarquuxxyzzy";

static const int32_t string_offsets_buffer1[] = {0, 3, 3, 6, 10, 15};
Expand Down Expand Up @@ -2324,6 +2404,21 @@ class TestArrayImport : public ::testing::Test {
legacy);
}

void FillRunEndEncoded(int64_t length, int64_t offset) {
FillRunEndEncoded(&c_struct_, length, offset);
}

void FillRunEndEncoded(struct ArrowArray* c, int64_t length, int64_t offset) {
static const void* buffers[1] = {nullptr};
c->length = length;
c->null_count = 0;
c->offset = offset;
c->n_buffers = 1;
c->buffers = buffers;
c->n_children = 2;
c->children = NLastChildren(2, c);
}

void CheckImport(const std::shared_ptr<Array>& expected) {
ArrayReleaseCallback cb(&c_struct_);

Expand Down Expand Up @@ -2674,6 +2769,49 @@ TEST_F(TestArrayImport, Struct) {
CheckImport(expected);
}

TEST_F(TestArrayImport, RunEndEncoded) {
FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5);
FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5);
FillRunEndEncoded(9, 0);
ASSERT_OK_AND_ASSIGN(auto expected,
REEFromJSON(run_end_encoded(int16(), float32()),
"[0.0, 1.5, -2.0, -2.0, 3.0, 3.0, 3.0, 4.0, 4.0]"));
ASSERT_OK(expected->ValidateFull());
CheckImport(expected);
}

TEST_F(TestArrayImport, RunEndEncodedWithOffset) {
auto ree_type = run_end_encoded(int16(), float32());
// Offset in children
FillPrimitive(AddChild(), 3, 0, 2, run_ends_buffers5);
FillPrimitive(AddChild(), 3, 0, 2, primitive_buffers_no_nulls5);
FillRunEndEncoded(7, 0);
ASSERT_OK_AND_ASSIGN(auto expected,
REEFromJSON(ree_type, "[-2.0, -2.0, -2.0, -2.0, 3.0, 3.0, 3.0]"));
CheckImport(expected);

// Ofsset in parent
FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5);
FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5);
FillRunEndEncoded(5, 2);
ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0, 3.0]"));
CheckImport(expected);

// Length in parent that cuts last run
FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5);
FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5);
FillRunEndEncoded(4, 2);
ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0]"));
CheckImport(expected);

// Offset in both children and parent
FillPrimitive(AddChild(), 3, 0, 2, run_ends_buffers5);
FillPrimitive(AddChild(), 3, 0, 2, primitive_buffers_no_nulls5);
FillRunEndEncoded(4, 2);
ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0]"));
CheckImport(expected);
}

TEST_F(TestArrayImport, SparseUnion) {
auto type = sparse_union({field("strs", utf8()), field("ints", int8())}, {43, 42});
auto expected =
Expand Down Expand Up @@ -3179,6 +3317,10 @@ TEST_F(TestSchemaRoundtrip, Union) {
TestWithTypeFactory([&]() { return dense_union({f1, f2}, type_codes); });
}

TEST_F(TestSchemaRoundtrip, RunEndEncoded) {
TestWithTypeFactory([]() { return run_end_encoded(int16(), float32()); });
}

TEST_F(TestSchemaRoundtrip, Dictionary) {
for (auto index_ty : all_dictionary_index_types()) {
TestWithTypeFactory([&]() { return dictionary(index_ty, utf8()); });
Expand Down Expand Up @@ -3470,6 +3612,18 @@ TEST_F(TestArrayRoundtrip, Union) {
}
}

TEST_F(TestArrayRoundtrip, RunEndEncoded) {
{
auto factory = []() -> Result<std::shared_ptr<Array>> {
ARROW_ASSIGN_OR_RAISE(auto ree_array,
REEFromJSON(run_end_encoded(int32(), int8()),
"[1, 2, 2, 3, null, null, null, 4]"));
return ree_array->Slice(1, 5);
};
TestWithArrayFactory(factory);
}
}

TEST_F(TestArrayRoundtrip, Dictionary) {
{
auto factory = []() {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2658,7 +2658,7 @@ std::shared_ptr<DataType> struct_(
return std::make_shared<StructType>(MakeFields(fields));
}

std::shared_ptr<DataType> run_end_encoded(std::shared_ptr<arrow::DataType> run_end_type,
std::shared_ptr<DataType> run_end_encoded(std::shared_ptr<DataType> run_end_type,
std::shared_ptr<DataType> value_type) {
return std::make_shared<RunEndEncodedType>(std::move(run_end_type),
std::move(value_type));
Expand Down

0 comments on commit 3e0dc4c

Please sign in to comment.