EXP: apacheGH-44084: [C++] Improve merge step in chunked sorting
pitrou committed Sep 24, 2024
1 parent 83f35de commit a24e70a
Showing 7 changed files with 237 additions and 101 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -731,6 +731,7 @@ set(ARROW_COMPUTE_SRCS
compute/light_array_internal.cc
compute/ordering.cc
compute/registry.cc
compute/kernels/chunked_internal.cc
compute/kernels/codegen_internal.cc
compute/kernels/ree_util_internal.cc
compute/kernels/scalar_cast_boolean.cc
10 changes: 6 additions & 4 deletions cpp/src/arrow/chunk_resolver.cc
@@ -28,6 +28,8 @@

namespace arrow::internal {

using ::arrow::util::span;

namespace {
template <typename T>
int64_t GetLength(const T& array) {
@@ -42,7 +44,7 @@ int64_t GetLength<std::shared_ptr<RecordBatch>>(
}

template <typename T>
inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
inline std::vector<int64_t> MakeChunksOffsets(span<T> chunks) {
std::vector<int64_t> offsets(chunks.size() + 1);
int64_t offset = 0;
std::transform(chunks.begin(), chunks.end(), offsets.begin(),
@@ -112,13 +114,13 @@ void ResolveManyInline(uint32_t num_offsets, const int64_t* signed_offsets,
} // namespace

ChunkResolver::ChunkResolver(const ArrayVector& chunks) noexcept
: offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}
: offsets_(MakeChunksOffsets(span(chunks))), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(const std::vector<const Array*>& chunks) noexcept
ChunkResolver::ChunkResolver(span<const Array* const> chunks) noexcept
: offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(const RecordBatchVector& batches) noexcept
: offsets_(MakeChunksOffsets(batches)), cached_chunk_(0) {}
: offsets_(MakeChunksOffsets(span(batches))), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(ChunkResolver&& other) noexcept
: offsets_(std::move(other.offsets_)),
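
For context, the offsets table that MakeChunksOffsets builds is a simple prefix sum of the chunk lengths with one extra trailing entry. A minimal standalone sketch, assuming a hypothetical Chunk type with a length field (the real code templates over Array and RecordBatch):

```cpp
#include <cstdint>
#include <vector>

struct Chunk {
  int64_t length;  // hypothetical stand-in for Array::length()
};

// Builds {0, len0, len0+len1, ...}: offsets[i] is the logical index of the
// first element of chunk i, and offsets.back() is the total length.
std::vector<int64_t> MakeOffsets(const std::vector<Chunk>& chunks) {
  std::vector<int64_t> offsets(chunks.size() + 1);
  int64_t offset = 0;
  for (size_t i = 0; i < chunks.size(); ++i) {
    offsets[i] = offset;
    offset += chunks[i].length;
  }
  offsets.back() = offset;
  return offsets;
}
// Lengths {4, 5, 3} yield offsets {0, 4, 9, 12}.
```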
3 changes: 2 additions & 1 deletion cpp/src/arrow/chunk_resolver.h
@@ -26,6 +26,7 @@

#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
#include "arrow/util/span.h"

namespace arrow::internal {

@@ -76,7 +77,7 @@ struct ARROW_EXPORT ChunkResolver {

public:
explicit ChunkResolver(const ArrayVector& chunks) noexcept;
explicit ChunkResolver(const std::vector<const Array*>& chunks) noexcept;
explicit ChunkResolver(::arrow::util::span<const Array* const> chunks) noexcept;
explicit ChunkResolver(const RecordBatchVector& batches) noexcept;

/// \brief Construct a ChunkResolver from a vector of chunks.size() + 1 offsets.
82 changes: 62 additions & 20 deletions cpp/src/arrow/compute/kernels/chunked_internal.h
@@ -20,26 +20,32 @@
#include <algorithm>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/chunk_resolver.h"
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/util/span.h"

namespace arrow {
namespace compute {
namespace internal {
namespace arrow::compute::internal {

// The target chunk in a chunked array.
struct ResolvedChunk {
// The target array in chunked array.
const Array* array;
// The index in the target array.
const int64_t index;
int64_t index;

ResolvedChunk(const Array* array, int64_t index) : array(array), index(index) {}

public:
friend bool operator==(const ResolvedChunk& left, const ResolvedChunk& right) {
return left.array == right.array && left.index == right.index;
}
friend bool operator!=(const ResolvedChunk& left, const ResolvedChunk& right) {
return left.array != right.array || left.index != right.index;
}

bool IsNull() const { return array->IsNull(index); }

template <typename ArrowType, typename ViewType = GetViewType<ArrowType>>
@@ -50,34 +56,70 @@ struct ResolvedChunk {
}
};

struct CompressedChunkLocation {
static constexpr int kChunkIndexBits = 24;
static constexpr int kIndexInChunkBits = 64 - kChunkIndexBits;

static constexpr int64_t kMaxChunkIndex = (1LL << kChunkIndexBits) - 1;
static constexpr int64_t kMaxIndexInChunk = (1LL << kIndexInChunkBits) - 1;

uint32_t chunk_index : kChunkIndexBits;
uint64_t index_in_chunk : kIndexInChunkBits;
};

static_assert(sizeof(uint64_t) == sizeof(CompressedChunkLocation));
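
The bit-field above packs a chunk/offset pair into one 64-bit word: 24 bits bound the number of chunks at about 16.7 million, and the remaining 40 bits bound each chunk's length at about a trillion elements. A minimal sketch of the round trip, using a standalone copy of the layout (illustrative only):

```cpp
#include <cassert>
#include <cstdint>

struct Location {  // standalone copy of CompressedChunkLocation's layout
  static constexpr int kChunkIndexBits = 24;
  static constexpr int kIndexInChunkBits = 64 - kChunkIndexBits;
  uint32_t chunk_index : kChunkIndexBits;
  uint64_t index_in_chunk : kIndexInChunkBits;
};
// Same layout assertion as in the header: both fields share one word.
static_assert(sizeof(Location) == sizeof(uint64_t));

int main() {
  // Values at the top of each field's range survive the round trip.
  Location loc{};
  loc.chunk_index = (1u << 24) - 1;       // kMaxChunkIndex
  loc.index_in_chunk = (1ull << 40) - 1;  // kMaxIndexInChunk
  assert(loc.chunk_index == (1u << 24) - 1);
  assert(loc.index_in_chunk == (1ull << 40) - 1);
  return 0;
}
```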

class ChunkedArrayResolver {
private:
::arrow::internal::ChunkResolver resolver_;
std::vector<const Array*> chunks_;
util::span<const Array* const> chunks_;
std::vector<const Array*> owned_chunks_;

public:
explicit ChunkedArrayResolver(const std::vector<const Array*>& chunks)
explicit ChunkedArrayResolver(std::vector<const Array*>&& chunks)
: resolver_(chunks), chunks_(chunks), owned_chunks_(std::move(chunks)) {}
explicit ChunkedArrayResolver(util::span<const Array* const> chunks)
: resolver_(chunks), chunks_(chunks) {}

ChunkedArrayResolver(ChunkedArrayResolver&& other) = default;
ChunkedArrayResolver& operator=(ChunkedArrayResolver&& other) = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(ChunkedArrayResolver);

ChunkedArrayResolver(const ChunkedArrayResolver& other) = default;
ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) = default;
ChunkedArrayResolver(const ChunkedArrayResolver& other)
: resolver_(other.resolver_), owned_chunks_(other.owned_chunks_) {
// Rebind span to owned_chunks_ if necessary
chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
}
ChunkedArrayResolver& operator=(const ChunkedArrayResolver& other) {
resolver_ = other.resolver_;
owned_chunks_ = other.owned_chunks_;
chunks_ = owned_chunks_.empty() ? other.chunks_ : owned_chunks_;
return *this;
}

ResolvedChunk Resolve(int64_t index) const {
const auto loc = resolver_.Resolve(index);
return {chunks_[loc.chunk_index], loc.index_in_chunk};
}
};
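
The hand-written copy operations above exist because chunks_ is a non-owning span: a defaulted copy would leave the new object's span pointing into the source's owned_chunks_, which can be destroyed independently. A minimal sketch of the same rebind-on-copy pattern, assuming std::span over int (C++20) in place of the Arrow types:

```cpp
#include <span>
#include <vector>

class View {
 public:
  explicit View(std::vector<int>&& owned)
      : owned_(std::move(owned)), view_(owned_) {}
  explicit View(std::span<const int> external) : view_(external) {}

  View(const View& other) : owned_(other.owned_) {
    // If this instance owns a copy of the storage, the span must be
    // rebound to that copy; otherwise keep viewing the external storage.
    view_ = owned_.empty() ? other.view_ : std::span<const int>(owned_);
  }
  View& operator=(const View& other) {
    owned_ = other.owned_;
    view_ = owned_.empty() ? other.view_ : std::span<const int>(owned_);
    return *this;
  }

  std::span<const int> get() const { return view_; }

 private:
  std::vector<int> owned_;     // may be empty (non-owning case)
  std::span<const int> view_;  // always valid to read through
};
```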

inline std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
std::vector<const Array*> pointers(arrays.size());
std::transform(arrays.begin(), arrays.end(), pointers.begin(),
[&](const std::shared_ptr<Array>& array) { return array.get(); });
return pointers;
}
std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays);

class ChunkedIndexMapper {
public:
ChunkedIndexMapper(util::span<const Array* const> chunks, uint64_t* indices_begin,
uint64_t* indices_end);
ChunkedIndexMapper(const std::vector<const Array*>& chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: ChunkedIndexMapper(util::span(chunks), indices_begin, indices_end) {}

Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
LogicalToPhysical();
Status PhysicalToLogical();

private:
::arrow::internal::ChunkResolver resolver_;
util::span<const Array* const> chunks_;
uint64_t* indices_begin_;
uint64_t* indices_end_;
};

} // namespace internal
} // namespace compute
} // namespace arrow
} // namespace arrow::compute::internal
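
ChunkedIndexMapper converts an array of logical (global) indices into per-chunk locations before merging and back afterwards, so the chunk-offset resolution happens once per index rather than inside every comparison. A minimal standalone analogue of that round trip, assuming a plain offsets table (names here are illustrative):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

struct Phys {
  uint32_t chunk;
  uint64_t index_in_chunk;
};

// Logical -> physical: one binary search over the chunk offsets.
Phys ToPhysical(const std::vector<int64_t>& offsets, int64_t logical) {
  auto it = std::upper_bound(offsets.begin(), offsets.end(), logical);
  auto chunk = static_cast<uint32_t>(it - offsets.begin() - 1);
  return {chunk, static_cast<uint64_t>(logical - offsets[chunk])};
}

// Physical -> logical: a single addition, no search.
int64_t ToLogical(const std::vector<int64_t>& offsets, Phys loc) {
  return offsets[loc.chunk] + static_cast<int64_t>(loc.index_in_chunk);
}

int main() {
  const std::vector<int64_t> offsets = {0, 4, 9, 12};  // chunk lengths 4, 5, 3
  for (int64_t i = 0; i < 12; ++i) {
    if (ToLogical(offsets, ToPhysical(offsets, i)) != i) return 1;
  }
  return 0;
}
```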
4 changes: 3 additions & 1 deletion cpp/src/arrow/compute/kernels/vector_rank.cc
@@ -21,6 +21,8 @@

namespace arrow::compute::internal {

using ::arrow::util::span;

namespace {

// ----------------------------------------------------------------------
@@ -237,7 +239,7 @@ class Ranker<ChunkedArray> : public RankerMixin<ChunkedArray, Ranker<ChunkedArra
physical_chunks_, order_, null_placement_));

const auto arrays = GetArrayPointers(physical_chunks_);
auto value_selector = [resolver = ChunkedArrayResolver(arrays)](int64_t index) {
auto value_selector = [resolver = ChunkedArrayResolver(span(arrays))](int64_t index) {
return resolver.Resolve(index).Value<InType>();
};
ARROW_ASSIGN_OR_RAISE(*output_, CreateRankings(ctx_, sorted, null_placement_,
96 changes: 62 additions & 34 deletions cpp/src/arrow/compute/kernels/vector_sort.cc
@@ -25,6 +25,7 @@ namespace arrow {

using internal::checked_cast;
using internal::ChunkLocation;
using util::span;

namespace compute {
namespace internal {
@@ -83,6 +84,7 @@ class ChunkedArraySorter : public TypeVisitor {
*output_ = {indices_end_, indices_end_, indices_end_, indices_end_};
return Status::OK();
}
const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
const auto arrays = GetArrayPointers(physical_chunks_);

// Sort each chunk independently and merge to sorted indices.
@@ -102,45 +104,66 @@ class ChunkedArraySorter : public TypeVisitor {
begin_offset, options, ctx_));
begin_offset = end_offset;
}
DCHECK_EQ(end_offset, indices_end_ - indices_begin_);
DCHECK_EQ(end_offset, num_indices);

// Then merge them by pairs, recursively
if (sorted.size() > 1) {
auto merge_nulls = [&](uint64_t* nulls_begin, uint64_t* nulls_middle,
uint64_t* nulls_end, uint64_t* temp_indices,
int64_t null_count) {
ChunkedIndexMapper chunked_mapper(arrays, indices_begin_, indices_end_);
// TODO: s/LogicalToPhysical/LinearToChunked/ ?
ARROW_ASSIGN_OR_RAISE(auto chunked_indices_pair,
chunked_mapper.LogicalToPhysical());
auto [chunked_indices_begin, chunked_indices_end] = chunked_indices_pair;

std::vector<ChunkedNullPartitionResult> chunk_sorted(num_chunks);
for (int i = 0; i < num_chunks; ++i) {
chunk_sorted[i] = ChunkedNullPartitionResult::TranslateFrom(
sorted[i], indices_begin_, chunked_indices_begin);
}

auto merge_nulls = [&](CompressedChunkLocation* nulls_begin,
CompressedChunkLocation* nulls_middle,
CompressedChunkLocation* nulls_end,
CompressedChunkLocation* temp_indices, int64_t null_count) {
if (has_null_like_values<typename ArrayType::TypeClass>::value) {
PartitionNullsOnly<StablePartitioner>(nulls_begin, nulls_end,
ChunkedArrayResolver(arrays), null_count,
null_placement_);
PartitionNullsOnly<StablePartitioner>(nulls_begin, nulls_end, arrays,
null_count, null_placement_);
}
};
auto merge_non_nulls = [&](uint64_t* range_begin, uint64_t* range_middle,
uint64_t* range_end, uint64_t* temp_indices) {
MergeNonNulls<ArrayType>(range_begin, range_middle, range_end, arrays,
temp_indices);
};

MergeImpl merge_impl{null_placement_, std::move(merge_nulls),
std::move(merge_non_nulls)};
auto merge_non_nulls =
[&](CompressedChunkLocation* range_begin, CompressedChunkLocation* range_middle,
CompressedChunkLocation* range_end, CompressedChunkLocation* temp_indices) {
MergeNonNulls<ArrayType>(range_begin, range_middle, range_end, arrays,
temp_indices);
};

ChunkedMergeImpl merge_impl{null_placement_, std::move(merge_nulls),
std::move(merge_non_nulls)};
// std::merge is only called on non-null values, so size temp indices accordingly
RETURN_NOT_OK(merge_impl.Init(ctx_, indices_end_ - indices_begin_ - null_count));
RETURN_NOT_OK(merge_impl.Init(ctx_, num_indices - null_count));

while (sorted.size() > 1) {
auto out_it = sorted.begin();
auto it = sorted.begin();
while (it < sorted.end() - 1) {
while (chunk_sorted.size() > 1) {
// Merge all pairs of chunks
auto out_it = chunk_sorted.begin();
auto it = chunk_sorted.begin();
while (it < chunk_sorted.end() - 1) {
const auto& left = *it++;
const auto& right = *it++;
DCHECK_EQ(left.overall_end(), right.overall_begin());
const auto merged = merge_impl.Merge(left, right, null_count);
*out_it++ = merged;
}
if (it < sorted.end()) {
if (it < chunk_sorted.end()) {
*out_it++ = *it++;
}
sorted.erase(out_it, sorted.end());
chunk_sorted.erase(out_it, chunk_sorted.end());
}

// Translate the final merged result back to logical indices
sorted.resize(1);
sorted[0] = NullPartitionResult::TranslateFrom(
chunk_sorted[0], chunked_indices_begin, indices_begin_);

RETURN_NOT_OK(chunked_mapper.PhysicalToLogical());
}

DCHECK_EQ(sorted.size(), 1);
@@ -154,34 +177,39 @@ class ChunkedArraySorter : public TypeVisitor {
}

template <typename ArrayType>
void MergeNonNulls(uint64_t* range_begin, uint64_t* range_middle, uint64_t* range_end,
const std::vector<const Array*>& arrays, uint64_t* temp_indices) {
void MergeNonNulls(CompressedChunkLocation* range_begin,
CompressedChunkLocation* range_middle,
CompressedChunkLocation* range_end, span<const Array* const> arrays,
CompressedChunkLocation* temp_indices) {
using ArrowType = typename ArrayType::TypeClass;
const ChunkedArrayResolver left_resolver(arrays);
const ChunkedArrayResolver right_resolver(arrays);

if (order_ == SortOrder::Ascending) {
std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
[&](uint64_t left, uint64_t right) {
const auto chunk_left = left_resolver.Resolve(left);
const auto chunk_right = right_resolver.Resolve(right);
return chunk_left.Value<ArrowType>() < chunk_right.Value<ArrowType>();
[&](CompressedChunkLocation left, CompressedChunkLocation right) {
return ChunkValue<ArrowType>(arrays, left) <
ChunkValue<ArrowType>(arrays, right);
});
} else {
std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
[&](uint64_t left, uint64_t right) {
const auto chunk_left = left_resolver.Resolve(left);
const auto chunk_right = right_resolver.Resolve(right);
[&](CompressedChunkLocation left, CompressedChunkLocation right) {
// Compare 'right < left' rather than 'left > right' so
// that the value type only needs to provide operator<.
return chunk_right.Value<ArrowType>() < chunk_left.Value<ArrowType>();
return ChunkValue<ArrowType>(arrays, right) <
ChunkValue<ArrowType>(arrays, left);
});
}
// Copy back temp area into main buffer
std::copy(temp_indices, temp_indices + (range_end - range_begin), range_begin);
}

template <typename ArrowType>
auto ChunkValue(span<const Array* const> arrays, CompressedChunkLocation loc) const {
return ResolvedChunk(arrays[loc.chunk_index],
static_cast<int64_t>(loc.index_in_chunk))
.template Value<ArrowType>();
}

uint64_t* indices_begin_;
uint64_t* indices_end_;
const std::shared_ptr<DataType>& physical_type_;
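
The while loop over chunk_sorted above repeatedly merges adjacent pairs of sorted runs until a single run remains, carrying any odd run forward unmerged; this is the classic bottom-up merge cascade, O(n log k) for k runs of n total elements. A minimal sketch over std::vector<int> runs, in place of ChunkedNullPartitionResult (illustrative only):

```cpp
#include <algorithm>
#include <iterator>
#include <vector>

std::vector<int> MergeAllRuns(std::vector<std::vector<int>> runs) {
  while (runs.size() > 1) {
    std::vector<std::vector<int>> next;
    // Merge all pairs of adjacent runs.
    for (size_t i = 0; i + 1 < runs.size(); i += 2) {
      std::vector<int> merged;
      std::merge(runs[i].begin(), runs[i].end(),
                 runs[i + 1].begin(), runs[i + 1].end(),
                 std::back_inserter(merged));
      next.push_back(std::move(merged));
    }
    // An odd run at the end is carried into the next round unmerged.
    if (runs.size() % 2 == 1) {
      next.push_back(std::move(runs.back()));
    }
    runs = std::move(next);
  }
  return runs.empty() ? std::vector<int>{} : std::move(runs.front());
}
// e.g. {{3,9},{1,4},{2}} -> {{1,3,4,9},{2}} -> {{1,2,3,4,9}}
```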
