Skip to content

Commit

Permalink
Merge pull request apache#16 from Intel-bigdata/wip_chendi
Browse files Browse the repository at this point in the history
[C++] Change to use sort + merge
  • Loading branch information
xuechendi authored Dec 16, 2019
2 parents 708e905 + 2b4969f commit 81f809f
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 7 deletions.
66 changes: 59 additions & 7 deletions cpp/src/arrow/compute/kernels/sort_arrays_to_indices.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <arrow/util/checked_cast.h>

#include <algorithm>
#include <cstdio>
#include <memory>
#include <numeric>
#include <vector>
Expand Down Expand Up @@ -101,6 +102,47 @@ class SortArraysToIndicesKernelImpl : public SortArraysToIndicesKernel {

private:
Comparator compare_;
std::vector<std::shared_ptr<ArrayType>> typed_arrays_;

// Recursively merges a run of adjacent, individually sorted index ranges into
// one sorted range (the merge phase of a merge sort).
//
// Each element of [arrays_valid_range_begin, arrays_valid_range_end) is a
// [first, second) pair of pointers delimiting one sorted run of
// ArrayItemIndex entries. Consecutive runs must be contiguous in memory
// (run[i].second == run[i + 1].first); this is asserted before each merge.
//
// Entries are ordered by the value they refer to, i.e.
// typed_arrays_[array_id]->GetView(id). The merge is stable, and every entry
// is kept even when the same value occurs in several input arrays.
//
// Returns the [first, second) pair covering the merged range, or
// {nullptr, nullptr} when the list of runs is empty.
std::pair<ArrayItemIndex*, ArrayItemIndex*> merge(
    std::vector<std::pair<ArrayItemIndex*, ArrayItemIndex*>>::iterator
        arrays_valid_range_begin,
    std::vector<std::pair<ArrayItemIndex*, ArrayItemIndex*>>::iterator
        arrays_valid_range_end) {
  const auto size = arrays_valid_range_end - arrays_valid_range_begin;
  if (size <= 0) {
    // No runs at all: dereferencing the begin iterator would be undefined.
    return std::pair<ArrayItemIndex*, ArrayItemIndex*>(nullptr, nullptr);
  }
  if (size == 1) {
    // A single run is already sorted.
    return *arrays_valid_range_begin;
  }

  // Split the runs in two halves and sort each half recursively. For
  // size == 2 this degenerates to taking the two runs as they are.
  const auto arrays_valid_range_middle = arrays_valid_range_begin + size / 2;
  const std::pair<ArrayItemIndex*, ArrayItemIndex*> left =
      merge(arrays_valid_range_begin, arrays_valid_range_middle);
  const std::pair<ArrayItemIndex*, ArrayItemIndex*> right =
      merge(arrays_valid_range_middle, arrays_valid_range_end);

  // The two halves must be adjacent so they can be merged in place.
  assert(left.second == right.first);

  // std::inplace_merge keeps every element (unlike std::set_union, which
  // collapses values present in both inputs), is stable, and manages its own
  // scratch buffer.
  std::inplace_merge(
      left.first, right.first, right.second,
      [this](const ArrayItemIndex& lhs, const ArrayItemIndex& rhs) {
        return typed_arrays_[lhs.array_id]->GetView(lhs.id) <
               typed_arrays_[rhs.array_id]->GetView(rhs.id);
      });

  return std::make_pair(left.first, right.second);
}

Status SortArraysToIndicesImpl(FunctionContext* ctx,
std::vector<std::shared_ptr<Array>> values,
Expand All @@ -119,11 +161,16 @@ class SortArraysToIndicesKernelImpl : public SortArraysToIndicesKernel {
ArrayItemIndex* indices_begin =
reinterpret_cast<ArrayItemIndex*>(indices_buf->mutable_data());
ArrayItemIndex* indices_end = indices_begin + items_total;
std::vector<std::shared_ptr<ArrayType>> typed_arrays;
std::vector<std::pair<ArrayItemIndex*, ArrayItemIndex*>> arrays_valid_range;

int64_t array_id = 0;
int64_t null_count_total = 0;
int64_t indices_i = 0;

for (auto array : values) {
auto typed_array = std::dynamic_pointer_cast<ArrayType>(array);
typed_arrays_.push_back(typed_array);
auto array_begin = indices_begin + indices_i;
for (int64_t i = 0; i < array->length(); i++) {
if (!array->IsNull(i)) {
(indices_begin + indices_i)->array_id = array_id;
Expand All @@ -135,14 +182,19 @@ class SortArraysToIndicesKernelImpl : public SortArraysToIndicesKernel {
null_count_total++;
}
}
typed_arrays.push_back(std::dynamic_pointer_cast<ArrayType>(array));
// first round sort
auto array_end = indices_begin + indices_i;
std::stable_sort(array_begin, array_end,
[typed_array, this](ArrayItemIndex left, ArrayItemIndex right) {
return typed_array->GetView(left.id) <
typed_array->GetView(right.id);
});
arrays_valid_range.push_back(std::make_pair(array_begin, array_end));
array_id++;
}
auto nulls_begin = indices_begin + items_total - null_count_total;
std::stable_sort(indices_begin, nulls_begin,
[typed_arrays, this](ArrayItemIndex left, ArrayItemIndex right) {
return compare_(typed_arrays, left, right);
});

// merge sort
merge(arrays_valid_range.begin(), arrays_valid_range.end());

*offsets = std::make_shared<FixedSizeBinaryArray>(
std::make_shared<FixedSizeBinaryType>(sizeof(ArrayItemIndex) / sizeof(int32_t)),
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/kernels/sort_arrays_to_indices_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ TYPED_TEST(TestSortToIndicesKernelForIntegral, SortIntegral) {
std::vector<std::string> input;
input.push_back("[10, 12, 4, 50, 50, 32, 11]");
input.push_back("[1, 14, 43, 42, 6, null, 2]");
input.push_back("[3, 64, 15, 7, 9, 19, 33]");
input.push_back("[23, 17, 41, 18, 20, 35, 30]");
input.push_back("[37, null, 22, 13, 8, 59, 21]");
this->SortArraysToIndices(input);
}

Expand Down

0 comments on commit 81f809f

Please sign in to comment.