Skip to content

Commit

Permalink
Merge pull request apache#19 from Intel-bigdata/wip_chendi
Browse files: browse the repository at this point in the history
[C++] Use one buffer for merge instead of multiple slices
  • Loading branch information
xuechendi authored Dec 17, 2019
2 parents f7ba4fd + 786bd67 commit 1d01a6b
Showing 1 changed file with 28 additions and 23 deletions.
51 changes: 28 additions & 23 deletions cpp/src/arrow/compute/kernels/sort_arrays_to_indices.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,22 @@ class SortArraysToIndicesKernelImpl : public SortArraysToIndicesKernel {
private:
Comparator compare_;
std::vector<std::shared_ptr<ArrayType>> typed_arrays_;
uint64_t merge_time = 0;

std::pair<ArrayItemIndex*, ArrayItemIndex*> merge(
std::vector<std::pair<ArrayItemIndex*, ArrayItemIndex*>>::iterator
arrays_valid_range_begin,
std::vector<std::pair<ArrayItemIndex*, ArrayItemIndex*>>::iterator
arrays_valid_range_end) {
std::pair<uint64_t, uint64_t> merge(
ArrayItemIndex* arrow_buffer, ArrayItemIndex* tmp_buffer,
std::vector<std::pair<uint64_t, uint64_t>>::iterator arrays_valid_range_begin,
std::vector<std::pair<uint64_t, uint64_t>>::iterator arrays_valid_range_end) {
auto size = arrays_valid_range_end - arrays_valid_range_begin;
std::pair<ArrayItemIndex*, ArrayItemIndex*> left;
std::pair<ArrayItemIndex*, ArrayItemIndex*> right;
std::pair<uint64_t, uint64_t> left;
std::pair<uint64_t, uint64_t> right;
if (size > 2) {
auto half_size = size / 2;
auto arrays_valid_range_middle = arrays_valid_range_begin + half_size;
left = merge(arrays_valid_range_begin, arrays_valid_range_middle);
right = merge(arrays_valid_range_middle, arrays_valid_range_end);
left = merge(arrow_buffer, tmp_buffer, arrays_valid_range_begin,
arrays_valid_range_middle);
right = merge(arrow_buffer, tmp_buffer, arrays_valid_range_middle,
arrays_valid_range_end);
} else if (size == 2) {
left = *arrays_valid_range_begin;
right = *(arrays_valid_range_end - 1);
Expand All @@ -127,20 +129,19 @@ class SortArraysToIndicesKernelImpl : public SortArraysToIndicesKernel {
auto left_size = left.second - left.first;
auto right_size = right.second - right.first;

ArrayItemIndex* left_tmp = new ArrayItemIndex[left_size];
memcpy(left_tmp, left.first, left_size * sizeof(ArrayItemIndex));
ArrayItemIndex* right_tmp = new ArrayItemIndex[right_size];
memcpy(right_tmp, right.first, right_size * sizeof(ArrayItemIndex));
memcpy(tmp_buffer + left.first, arrow_buffer + left.first,
left_size * sizeof(ArrayItemIndex));
memcpy(tmp_buffer + right.first, arrow_buffer + right.first,
right_size * sizeof(ArrayItemIndex));

std::set_union(left_tmp, left_tmp + left_size, right_tmp, right_tmp + right_size,
left.first, [this](ArrayItemIndex left, ArrayItemIndex right) {
std::set_union(tmp_buffer + left.first, tmp_buffer + left.second,
tmp_buffer + right.first, tmp_buffer + right.second,
arrow_buffer + left.first,
[this](ArrayItemIndex left, ArrayItemIndex right) {
return typed_arrays_[left.array_id]->GetView(left.id) <
typed_arrays_[right.array_id]->GetView(right.id);
});
delete[] left_tmp;
delete[] right_tmp;

assert((left.first + left_size + right_size) == right.second);
return std::make_pair(left.first, right.second);
}

Expand All @@ -161,16 +162,17 @@ class SortArraysToIndicesKernelImpl : public SortArraysToIndicesKernel {
ArrayItemIndex* indices_begin =
reinterpret_cast<ArrayItemIndex*>(indices_buf->mutable_data());
ArrayItemIndex* indices_end = indices_begin + items_total;
std::vector<std::pair<ArrayItemIndex*, ArrayItemIndex*>> arrays_valid_range;
std::vector<std::pair<uint64_t, uint64_t>> arrays_valid_range;

int64_t array_id = 0;
int64_t null_count_total = 0;
int64_t indices_i = 0;
uint64_t array_sort = 0;

for (auto array : values) {
auto typed_array = std::dynamic_pointer_cast<ArrayType>(array);
typed_arrays_.push_back(typed_array);
auto array_begin = indices_begin + indices_i;
auto array_begin = indices_i;
for (int64_t i = 0; i < array->length(); i++) {
if (!array->IsNull(i)) {
(indices_begin + indices_i)->array_id = array_id;
Expand All @@ -183,8 +185,8 @@ class SortArraysToIndicesKernelImpl : public SortArraysToIndicesKernel {
}
}
// first round sort
auto array_end = indices_begin + indices_i;
std::stable_sort(array_begin, array_end,
auto array_end = indices_i;
std::stable_sort(indices_begin + array_begin, indices_begin + array_end,
[typed_array, this](ArrayItemIndex left, ArrayItemIndex right) {
return typed_array->GetView(left.id) <
typed_array->GetView(right.id);
Expand All @@ -194,7 +196,10 @@ class SortArraysToIndicesKernelImpl : public SortArraysToIndicesKernel {
}

// merge sort
merge(arrays_valid_range.begin(), arrays_valid_range.end());
ArrayItemIndex* tmp_buffer_begin = new ArrayItemIndex[indices_i]();
merge(indices_begin, tmp_buffer_begin, arrays_valid_range.begin(),
arrays_valid_range.end());
delete[] tmp_buffer_begin;

*offsets = std::make_shared<FixedSizeBinaryArray>(
std::make_shared<FixedSizeBinaryType>(sizeof(ArrayItemIndex) / sizeof(int32_t)),
Expand Down

0 comments on commit 1d01a6b

Please sign in to comment.