From 275871b37c4adb31089461dab865c5ea3a906888 Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Wed, 25 Sep 2024 10:24:33 +0200
Subject: [PATCH] Also improve Table sorting

---
 .../arrow/compute/kernels/chunked_internal.cc |  62 +++--
 .../arrow/compute/kernels/chunked_internal.h  |  50 ++--
 cpp/src/arrow/compute/kernels/vector_sort.cc  | 214 +++++++++---------
 .../compute/kernels/vector_sort_internal.h    |  15 +-
 4 files changed, 187 insertions(+), 154 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/chunked_internal.cc b/cpp/src/arrow/compute/kernels/chunked_internal.cc
index d69aa3b953681..d1b234bfc48ef 100644
--- a/cpp/src/arrow/compute/kernels/chunked_internal.cc
+++ b/cpp/src/arrow/compute/kernels/chunked_internal.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 
+#include "arrow/record_batch.h"
 #include "arrow/util/logging.h"
 
 namespace arrow::compute::internal {
@@ -32,25 +33,37 @@ std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays) {
   return pointers;
 }
 
-ChunkedIndexMapper::ChunkedIndexMapper(util::span<const Array* const> chunks,
-                                       uint64_t* indices_begin, uint64_t* indices_end)
-    : resolver_(chunks),
-      chunks_(chunks),
-      indices_begin_(indices_begin),
-      indices_end_(indices_end) {}
+std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
+    util::span<const Array* const> chunks) {
+  std::vector<int64_t> chunk_lengths(chunks.size());
+  for (int64_t i = 0; i < static_cast<int64_t>(chunks.size()); ++i) {
+    chunk_lengths[i] = chunks[i]->length();
+  }
+  return chunk_lengths;
+}
+
+std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
+    const RecordBatchVector& chunks) {
+  std::vector<int64_t> chunk_lengths(chunks.size());
+  for (int64_t i = 0; i < static_cast<int64_t>(chunks.size()); ++i) {
+    chunk_lengths[i] = chunks[i]->num_rows();
+  }
+  return chunk_lengths;
+}
 
-Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
+Result<std::pair<ResolvedChunkIndex*, ResolvedChunkIndex*>>
 ChunkedIndexMapper::LogicalToPhysical() {
-  // Check that indices would fall in bounds for CompressedChunkLocation
-  if (ARROW_PREDICT_FALSE(chunks_.size() > CompressedChunkLocation::kMaxChunkIndex + 1)) {
+  // Check that indices would fall in bounds for ResolvedChunkIndex
+  if (ARROW_PREDICT_FALSE(chunk_lengths_.size() >
+                          ResolvedChunkIndex::kMaxChunkIndex + 1)) {
     return Status::NotImplemented("Chunked array has more than ",
-                                  CompressedChunkLocation::kMaxChunkIndex + 1, " chunks");
+                                  ResolvedChunkIndex::kMaxChunkIndex + 1, " chunks");
   }
-  for (const Array* chunk : chunks_) {
-    if (ARROW_PREDICT_FALSE(static_cast<uint64_t>(chunk->length()) >
-                            CompressedChunkLocation::kMaxIndexInChunk + 1)) {
+  for (int64_t chunk_length : chunk_lengths_) {
+    if (ARROW_PREDICT_FALSE(static_cast<uint64_t>(chunk_length) >
+                            ResolvedChunkIndex::kMaxIndexInChunk + 1)) {
       return Status::NotImplemented("Individual chunk in chunked array has more than ",
-                                    CompressedChunkLocation::kMaxIndexInChunk + 1,
+                                    ResolvedChunkIndex::kMaxIndexInChunk + 1,
                                     " elements");
     }
   }
@@ -59,10 +72,10 @@ ChunkedIndexMapper::LogicalToPhysical() {
   std::array<TypedChunkLocation<uint64_t>, kMaxBatchSize> batch;
 
   const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
-  CompressedChunkLocation* physical_begin =
-      reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
+  ResolvedChunkIndex* physical_begin =
+      reinterpret_cast<ResolvedChunkIndex*>(indices_begin_);
   DCHECK_EQ(physical_begin + num_indices,
-            reinterpret_cast<CompressedChunkLocation*>(indices_end_));
+            reinterpret_cast<ResolvedChunkIndex*>(indices_end_));
 
   for (int64_t i = 0; i < num_indices; i += kMaxBatchSize) {
     const int64_t batch_size = std::min(kMaxBatchSize, num_indices - i);
     const bool ok =
         resolver_.ResolveMany(batch_size, indices_begin_ + i, batch.data());
     DCHECK(ok) << "ResolveMany unexpectedly failed (invalid logical index?)";
     for (int64_t j = 0; j < batch_size; ++j) {
       const auto loc = batch[j];
-      physical_begin[i + j] =
-          CompressedChunkLocation{loc.chunk_index, loc.index_in_chunk};
+      physical_begin[i + j] = ResolvedChunkIndex{loc.chunk_index, loc.index_in_chunk};
     }
   }
 
@@ -80,23 +92,23 @@ ChunkedIndexMapper::LogicalToPhysical() {
 }
 
 Status ChunkedIndexMapper::PhysicalToLogical() {
-  std::vector<int64_t> chunk_offsets(chunks_.size());
+  std::vector<int64_t> chunk_offsets(chunk_lengths_.size());
   {
     int64_t offset = 0;
-    for (int64_t i = 0; i < static_cast<int64_t>(chunks_.size()); ++i) {
+    for (int64_t i = 0; i < static_cast<int64_t>(chunk_lengths_.size()); ++i) {
       chunk_offsets[i] = offset;
-      offset += chunks_[i]->length();
+      offset += chunk_lengths_[i];
     }
   }
 
   const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
-  CompressedChunkLocation* physical_begin =
-      reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
+  ResolvedChunkIndex* physical_begin =
+      reinterpret_cast<ResolvedChunkIndex*>(indices_begin_);
   for (int64_t i = 0; i < num_indices; ++i) {
     const auto loc = physical_begin[i];
     DCHECK_LT(loc.chunk_index(), chunk_offsets.size());
     DCHECK_LT(loc.index_in_chunk(),
-              static_cast<uint64_t>(chunks_[loc.chunk_index()]->length()));
+              static_cast<uint64_t>(chunk_lengths_[loc.chunk_index()]));
     indices_begin_[i] =
         chunk_offsets[loc.chunk_index()] + static_cast<int64_t>(loc.index_in_chunk());
   }
diff --git a/cpp/src/arrow/compute/kernels/chunked_internal.h b/cpp/src/arrow/compute/kernels/chunked_internal.h
index 391a8a6af15b3..b254540f5e4b6 100644
--- a/cpp/src/arrow/compute/kernels/chunked_internal.h
+++ b/cpp/src/arrow/compute/kernels/chunked_internal.h
@@ -30,6 +30,9 @@
 
 namespace arrow::compute::internal {
 
+using ::arrow::internal::ChunkResolver;
+using ::arrow::internal::TypedChunkLocation;
+
 // The target chunk in a chunked array.
 struct ResolvedChunk {
   // The target array in chunked array.
@@ -56,33 +59,38 @@ struct ResolvedChunk {
   }
 };
 
-// TODO rename to something shorter? (e.g. ResolvedChunkIndex, CompressedChunkLoc...)
-struct CompressedChunkLocation {
+struct ResolvedChunkIndex {
   static constexpr int kChunkIndexBits = 24;
   static constexpr int KIndexInChunkBits = 64 - kChunkIndexBits;
   static constexpr uint64_t kMaxChunkIndex = (1ULL << kChunkIndexBits) - 1;
   static constexpr uint64_t kMaxIndexInChunk = (1ULL << KIndexInChunkBits) - 1;
 
-  CompressedChunkLocation() = default;
+  ResolvedChunkIndex() = default;
 
   constexpr uint64_t chunk_index() const { return data_ & kMaxChunkIndex; }
   constexpr uint64_t index_in_chunk() const { return data_ >> kChunkIndexBits; }
 
-  explicit constexpr CompressedChunkLocation(uint64_t chunk_index,
-                                             uint64_t index_in_chunk)
+  explicit constexpr ResolvedChunkIndex(uint64_t chunk_index, uint64_t index_in_chunk)
       : data_((index_in_chunk << kChunkIndexBits) | chunk_index) {}
 
+  template <typename IndexType>
+  explicit operator TypedChunkLocation<IndexType>() {
+    return {static_cast<IndexType>(chunk_index()),
+            static_cast<IndexType>(index_in_chunk())};
+  }
+
  private:
   uint64_t data_;
 };
 
-// CompressedChunkLocation must be the same size of a logical index, to
-// enable in-place resolution.
-static_assert(sizeof(uint64_t) == sizeof(CompressedChunkLocation));
+// ResolvedChunkIndex must be the same size as a logical index, to
+// enable in-place resolution in ChunkedIndexMapper.
+static_assert(sizeof(uint64_t) == sizeof(ResolvedChunkIndex));
 
 class ChunkedArrayResolver {
  private:
-  ::arrow::internal::ChunkResolver resolver_;
+  ChunkResolver resolver_;
   util::span<const Array* const> chunks_;
   std::vector<const Array*> owned_chunks_;
@@ -116,20 +124,32 @@ std::vector<const Array*> GetArrayPointers(const ArrayVector& arrays);
 
 class ChunkedIndexMapper {
  public:
-  ChunkedIndexMapper(util::span<const Array* const> chunks, uint64_t* indices_begin,
-                     uint64_t* indices_end);
   ChunkedIndexMapper(const std::vector<const Array*>& chunks, uint64_t* indices_begin,
                      uint64_t* indices_end)
       : ChunkedIndexMapper(util::span(chunks), indices_begin, indices_end) {}
+  ChunkedIndexMapper(util::span<const Array* const> chunks, uint64_t* indices_begin,
+                     uint64_t* indices_end)
+      : resolver_(chunks),
+        chunk_lengths_(GetChunkLengths(chunks)),
+        indices_begin_(indices_begin),
+        indices_end_(indices_end) {}
+  ChunkedIndexMapper(const RecordBatchVector& chunks, uint64_t* indices_begin,
+                     uint64_t* indices_end)
+      : resolver_(chunks),
+        chunk_lengths_(GetChunkLengths(chunks)),
+        indices_begin_(indices_begin),
+        indices_end_(indices_end) {}
 
-  Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
-  LogicalToPhysical();
+  Result<std::pair<ResolvedChunkIndex*, ResolvedChunkIndex*>> LogicalToPhysical();
   Status PhysicalToLogical();
 
  private:
-  ::arrow::internal::ChunkResolver resolver_;
-  util::span<const Array* const> chunks_;
+  static std::vector<int64_t> GetChunkLengths(util::span<const Array* const> chunks);
+  static std::vector<int64_t> GetChunkLengths(const RecordBatchVector& chunks);
+
+  ChunkResolver resolver_;
+  std::vector<int64_t> chunk_lengths_;
   uint64_t* indices_begin_;
   uint64_t* indices_end_;
 };
diff --git a/cpp/src/arrow/compute/kernels/vector_sort.cc b/cpp/src/arrow/compute/kernels/vector_sort.cc
index 290814dac1b11..5cf17d1af41d7 100644
--- a/cpp/src/arrow/compute/kernels/vector_sort.cc
+++ b/cpp/src/arrow/compute/kernels/vector_sort.cc
@@ -120,18 +120,18 @@ class ChunkedArraySorter : public TypeVisitor {
           sorted[i], indices_begin_, chunked_indices_begin);
     }
 
-    auto merge_nulls = [&](CompressedChunkLocation* nulls_begin,
-                           CompressedChunkLocation* nulls_middle,
-                           CompressedChunkLocation* nulls_end,
-                           CompressedChunkLocation* temp_indices, int64_t null_count) {
+    auto merge_nulls = [&](ResolvedChunkIndex* nulls_begin,
+                           ResolvedChunkIndex* nulls_middle,
+                           ResolvedChunkIndex* nulls_end,
+                           ResolvedChunkIndex* temp_indices, int64_t null_count) {
      if (has_null_like_values<typename ArrayType::TypeClass>::value) {
        PartitionNullsOnly<StablePartitioner>(nulls_begin, nulls_end, arrays, null_count,
                                              null_placement_);
      }
    };
    auto merge_non_nulls =
-        [&](CompressedChunkLocation* range_begin, CompressedChunkLocation* range_middle,
-            CompressedChunkLocation* range_end, CompressedChunkLocation* temp_indices) {
+        [&](ResolvedChunkIndex* range_begin, ResolvedChunkIndex* range_middle,
+            ResolvedChunkIndex* range_end, ResolvedChunkIndex* temp_indices) {
          MergeNonNulls<ArrayType>(range_begin, range_middle, range_end, arrays,
                                   temp_indices);
        };
@@ -177,21 +177,20 @@ class ChunkedArraySorter : public TypeVisitor {
   }
 
   template <typename ArrayType>
-  void MergeNonNulls(CompressedChunkLocation* range_begin,
-                     CompressedChunkLocation* range_middle,
-                     CompressedChunkLocation* range_end, span<const Array* const> arrays,
-                     CompressedChunkLocation* temp_indices) {
+  void MergeNonNulls(ResolvedChunkIndex* range_begin, ResolvedChunkIndex* range_middle,
+                     ResolvedChunkIndex* range_end, span<const Array* const> arrays,
+                     ResolvedChunkIndex* temp_indices) {
     using ArrowType = typename ArrayType::TypeClass;
 
     if (order_ == SortOrder::Ascending) {
       std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
-                 [&](CompressedChunkLocation left, CompressedChunkLocation right) {
+                 [&](ResolvedChunkIndex left, ResolvedChunkIndex right) {
                    return ChunkValue<ArrayType>(arrays, left) <
                           ChunkValue<ArrayType>(arrays, right);
                  });
     } else {
       std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
-                 [&](CompressedChunkLocation left, CompressedChunkLocation right) {
+                 [&](ResolvedChunkIndex left, ResolvedChunkIndex right) {
                    // We don't use 'left > right' here to reduce required
                    // operator. If we use 'right < left' here, '<' is only
                    // required.
@@ -204,7 +203,7 @@ class ChunkedArraySorter : public TypeVisitor {
   }
 
   template <typename ArrayType>
-  auto ChunkValue(span<const Array* const> arrays, CompressedChunkLocation loc) const {
+  auto ChunkValue(span<const Array* const> arrays, ResolvedChunkIndex loc) const {
     return ResolvedChunk(arrays[loc.chunk_index()],
                          static_cast<int64_t>(loc.index_in_chunk()))
        .template Value<typename ArrayType::TypeClass>();
@@ -639,8 +638,6 @@ class TableSorter {
         batches_(MakeBatches(table, &status_)),
         options_(options),
         null_placement_(options.null_placement),
-        left_resolver_(batches_),
-        right_resolver_(batches_),
         sort_keys_(ResolveSortKeys(table, batches_, options.sort_keys, &status_)),
         indices_begin_(indices_begin),
         indices_end_(indices_end),
@@ -703,14 +700,25 @@ class TableSorter {
 
     // Then merge them by pairs, recursively
     if (sorted.size() > 1) {
+      ChunkedIndexMapper chunked_mapper(batches_, indices_begin_, indices_end_);
+      ARROW_ASSIGN_OR_RAISE(auto chunked_indices_pair,
+                            chunked_mapper.LogicalToPhysical());
+      auto [chunked_indices_begin, chunked_indices_end] = chunked_indices_pair;
+
+      std::vector<ChunkedNullPartitionResult> chunk_sorted(num_batches);
+      for (int64_t i = 0; i < num_batches; ++i) {
+        chunk_sorted[i] = ChunkedNullPartitionResult::TranslateFrom(
+            sorted[i], indices_begin_, chunked_indices_begin);
+      }
+
       struct Visitor {
         TableSorter* sorter;
-        std::vector<NullPartitionResult>* sorted;
+        std::vector<ChunkedNullPartitionResult>* chunk_sorted;
         int64_t null_count;
 
-#define VISIT(TYPE)                                                      \
-  Status Visit(const TYPE& type) {                                       \
-    return sorter->MergeInternal<TYPE>(std::move(*sorted), null_count); \
-  }
+#define VISIT(TYPE)                                                \
+  Status Visit(const TYPE& type) {                                 \
+    return sorter->MergeInternal<TYPE>(chunk_sorted, null_count); \
+  }
 
   VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
@@ -722,104 +730,99 @@ class TableSorter {
                       type.ToString());
     }
   };
-      Visitor visitor{this, &sorted, null_count};
+      Visitor visitor{this, &chunk_sorted, null_count};
       RETURN_NOT_OK(VisitTypeInline(*sort_keys_[0].type, &visitor));
+
+      DCHECK_EQ(chunk_sorted.size(), 1);
+      DCHECK_EQ(chunk_sorted[0].overall_begin(), chunked_indices_begin);
+      DCHECK_EQ(chunk_sorted[0].overall_end(), chunked_indices_end);
+
+      RETURN_NOT_OK(chunked_mapper.PhysicalToLogical());
     }
     return Status::OK();
   }
 
   // Recursive merge routine, typed on the first sort key
-  template <typename Type>
-  Status MergeInternal(std::vector<NullPartitionResult> sorted, int64_t null_count) {
-    auto merge_nulls = [&](uint64_t* nulls_begin, uint64_t* nulls_middle,
-                           uint64_t* nulls_end, uint64_t* temp_indices,
-                           int64_t null_count) {
-      MergeNulls<Type>(nulls_begin, nulls_middle, nulls_end, temp_indices, null_count);
-    };
-    auto merge_non_nulls = [&](uint64_t* range_begin, uint64_t* range_middle,
-                               uint64_t* range_end, uint64_t* temp_indices) {
-      MergeNonNulls<Type>(range_begin, range_middle, range_end, temp_indices);
+  template <typename Type>
+  Status MergeInternal(std::vector<ChunkedNullPartitionResult>* sorted,
+                       int64_t null_count) {
+    auto merge_nulls = [&](ResolvedChunkIndex* nulls_begin,
+                           ResolvedChunkIndex* nulls_middle,
+                           ResolvedChunkIndex* nulls_end,
+                           ResolvedChunkIndex* temp_indices, int64_t null_count) {
+      MergeNulls<Type>(nulls_begin, nulls_middle, nulls_end, temp_indices,
+                       null_count);
     };
-
-    MergeImpl merge_impl(options_.null_placement, std::move(merge_nulls),
-                         std::move(merge_non_nulls));
+    auto merge_non_nulls =
+        [&](ResolvedChunkIndex* range_begin, ResolvedChunkIndex* range_middle,
+            ResolvedChunkIndex* range_end, ResolvedChunkIndex* temp_indices) {
+          MergeNonNulls<Type>(range_begin, range_middle, range_end, temp_indices);
+        };
+
+    ChunkedMergeImpl merge_impl(options_.null_placement, std::move(merge_nulls),
+                                std::move(merge_non_nulls));
     RETURN_NOT_OK(merge_impl.Init(ctx_, table_.num_rows()));
 
-    while (sorted.size() > 1) {
-      auto out_it = sorted.begin();
-      auto it = sorted.begin();
-      while (it < sorted.end() - 1) {
+    while (sorted->size() > 1) {
+      auto out_it = sorted->begin();
+      auto it = sorted->begin();
+      while (it < sorted->end() - 1) {
         const auto& left = *it++;
         const auto& right = *it++;
         DCHECK_EQ(left.overall_end(), right.overall_begin());
         *out_it++ = merge_impl.Merge(left, right, null_count);
       }
-      if (it < sorted.end()) {
+      if (it < sorted->end()) {
         *out_it++ = *it++;
       }
-      sorted.erase(out_it, sorted.end());
+      sorted->erase(out_it, sorted->end());
     }
-    DCHECK_EQ(sorted.size(), 1);
-    DCHECK_EQ(sorted[0].overall_begin(), indices_begin_);
-    DCHECK_EQ(sorted[0].overall_end(), indices_end_);
     return comparator_.status();
   }
 
-  // Merge rows with a null or a null-like in the first sort key
-  template <typename Type>
-  enable_if_t<has_null_like_values<Type>::value> MergeNulls(uint64_t* nulls_begin,
-                                                            uint64_t* nulls_middle,
-                                                            uint64_t* nulls_end,
-                                                            uint64_t* temp_indices,
-                                                            int64_t null_count) {
-    auto& comparator = comparator_;
-    const auto& first_sort_key = sort_keys_[0];
-
-    ChunkLocation left_loc;
-    ChunkLocation right_loc;
-    std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
-               [&](uint64_t left, uint64_t right) {
-                 // First column is either null or nan
-                 left_loc = left_resolver_.ResolveWithHint(left, /*hint=*/left_loc);
-                 right_loc = right_resolver_.ResolveWithHint(right, /*hint=*/right_loc);
-                 auto chunk_left = first_sort_key.GetChunk(left_loc);
-                 auto chunk_right = first_sort_key.GetChunk(right_loc);
-                 const auto left_is_null = chunk_left.IsNull();
-                 const auto right_is_null = chunk_right.IsNull();
-                 if (left_is_null == right_is_null) {
-                   return comparator.Compare(left_loc, right_loc, 1);
-                 } else if (options_.null_placement == NullPlacement::AtEnd) {
-                   return right_is_null;
-                 } else {
-                   return left_is_null;
-                 }
-               });
-    // Copy back temp area into main buffer
-    std::copy(temp_indices, temp_indices + (nulls_end - nulls_begin), nulls_begin);
-  }
-
-  template <typename Type>
-  enable_if_t<!has_null_like_values<Type>::value> MergeNulls(uint64_t* nulls_begin,
-                                                             uint64_t* nulls_middle,
-                                                             uint64_t* nulls_end,
-                                                             uint64_t* temp_indices,
-                                                             int64_t null_count) {
-    MergeNullsOnly(nulls_begin, nulls_middle, nulls_end, temp_indices, null_count);
+  template <typename Type>
+  void MergeNulls(ResolvedChunkIndex* nulls_begin, ResolvedChunkIndex* nulls_middle,
+                  ResolvedChunkIndex* nulls_end, ResolvedChunkIndex* temp_indices,
+                  int64_t null_count) {
+    if constexpr (has_null_like_values<Type>::value) {
+      // Merge rows with a null or a null-like in the first sort key
+      auto& comparator = comparator_;
+      const auto& first_sort_key = sort_keys_[0];
+
+      std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
+                 [&](ResolvedChunkIndex left, ResolvedChunkIndex right) {
+                   // First column is either null or nan
+                   const auto left_loc = ChunkLocation{left};
+                   const auto right_loc = ChunkLocation{right};
+                   const auto chunk_left = first_sort_key.GetChunk(left_loc);
+                   const auto chunk_right = first_sort_key.GetChunk(right_loc);
+                   const auto left_is_null = chunk_left.IsNull();
+                   const auto right_is_null = chunk_right.IsNull();
+                   if (left_is_null == right_is_null) {
+                     return comparator.Compare(left_loc, right_loc, 1);
+                   } else if (options_.null_placement == NullPlacement::AtEnd) {
+                     return right_is_null;
+                   } else {
+                     return left_is_null;
+                   }
+                 });
+      // Copy back temp area into main buffer
+      std::copy(temp_indices, temp_indices + (nulls_end - nulls_begin), nulls_begin);
+    } else {
+      MergeNullsOnly(nulls_begin, nulls_middle, nulls_end, temp_indices, null_count);
+    }
   }
 
-  void MergeNullsOnly(uint64_t* nulls_begin, uint64_t* nulls_middle, uint64_t* nulls_end,
-                      uint64_t* temp_indices, int64_t null_count) {
+  void MergeNullsOnly(ResolvedChunkIndex* nulls_begin, ResolvedChunkIndex* nulls_middle,
+                      ResolvedChunkIndex* nulls_end, ResolvedChunkIndex* temp_indices,
+                      int64_t null_count) {
     // Untyped implementation
     auto& comparator = comparator_;
 
-    ChunkLocation left_loc;
-    ChunkLocation right_loc;
     std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
-               [&](uint64_t left, uint64_t right) {
+               [&](ResolvedChunkIndex left, ResolvedChunkIndex right) {
                  // First column is always null
-                 left_loc = left_resolver_.ResolveWithHint(left, /*hint=*/left_loc);
-                 right_loc = right_resolver_.ResolveWithHint(right, /*hint=*/right_loc);
-                 return comparator.Compare(left_loc, right_loc, 1);
+                 return comparator.Compare(ChunkLocation{left}, ChunkLocation{right}, 1);
               });
     // Copy back temp area into main buffer
     std::copy(temp_indices, temp_indices + (nulls_end - nulls_begin), nulls_begin);
   }
@@ -828,27 +831,24 @@ class TableSorter {
   //
   // Merge rows with a non-null in the first sort key
   //
-  template <typename Type>
-  enable_if_t<!is_null_type<Type>::value> MergeNonNulls(uint64_t* range_begin,
-                                                        uint64_t* range_middle,
-                                                        uint64_t* range_end,
-                                                        uint64_t* temp_indices) {
+  template <typename Type>
+  enable_if_t<!is_null_type<Type>::value> MergeNonNulls(
+      ResolvedChunkIndex* range_begin, ResolvedChunkIndex* range_middle,
+      ResolvedChunkIndex* range_end, ResolvedChunkIndex* temp_indices) {
     auto& comparator = comparator_;
     const auto& first_sort_key = sort_keys_[0];
 
-    ChunkLocation left_loc;
-    ChunkLocation right_loc;
     std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
-               [&](uint64_t left, uint64_t right) {
+               [&](ResolvedChunkIndex left, ResolvedChunkIndex right) {
                  // Both values are never null nor NaN.
-                 left_loc = left_resolver_.ResolveWithHint(left, /*hint=*/left_loc);
-                 right_loc = right_resolver_.ResolveWithHint(right, /*hint=*/right_loc);
+                 const auto left_loc = ChunkLocation{left};
+                 const auto right_loc = ChunkLocation{right};
                  auto chunk_left = first_sort_key.GetChunk(left_loc);
                  auto chunk_right = first_sort_key.GetChunk(right_loc);
                  DCHECK(!chunk_left.IsNull());
                  DCHECK(!chunk_right.IsNull());
-                 auto value_left = chunk_left.Value<Type>();
-                 auto value_right = chunk_right.Value<Type>();
+                 const auto value_left = chunk_left.Value<Type>();
+                 const auto value_right = chunk_right.Value<Type>();
                  if (value_left == value_right) {
                    // If the left value equals to the right value,
                    // we need to compare the second and following
@@ -863,13 +863,16 @@ class TableSorter {
                    }
                  }
               });
+
     // Copy back temp area into main buffer
     std::copy(temp_indices, temp_indices + (range_end - range_begin), range_begin);
   }
 
-  template <typename Type>
-  enable_if_null<Type> MergeNonNulls(uint64_t* range_begin, uint64_t* range_middle,
-                                     uint64_t* range_end, uint64_t* temp_indices) {
+  template <typename Type>
+  enable_if_null<Type> MergeNonNulls(ResolvedChunkIndex* range_begin,
+                                     ResolvedChunkIndex* range_middle,
+                                     ResolvedChunkIndex* range_end,
+                                     ResolvedChunkIndex* temp_indices) {
     const int64_t null_count = range_end - range_begin;
     MergeNullsOnly(range_begin, range_middle, range_end, temp_indices, null_count);
   }
@@ -880,7 +883,6 @@ class TableSorter {
   const RecordBatchVector batches_;
   const SortOptions& options_;
   const NullPlacement null_placement_;
-  const ::arrow::internal::ChunkResolver left_resolver_, right_resolver_;
   const std::vector<ResolvedTableSortKey> sort_keys_;
   uint64_t* indices_begin_;
   uint64_t* indices_end_;
diff --git a/cpp/src/arrow/compute/kernels/vector_sort_internal.h b/cpp/src/arrow/compute/kernels/vector_sort_internal.h
index d9ff78f9f053c..3854b9c3a857f 100644
--- a/cpp/src/arrow/compute/kernels/vector_sort_internal.h
+++ b/cpp/src/arrow/compute/kernels/vector_sort_internal.h
@@ -209,7 +209,7 @@ struct GenericNullPartitionResult {
 };
 
 using NullPartitionResult = GenericNullPartitionResult<uint64_t>;
-using ChunkedNullPartitionResult = GenericNullPartitionResult<CompressedChunkLocation>;
+using ChunkedNullPartitionResult = GenericNullPartitionResult<ResolvedChunkIndex>;
 
 // Move nulls (not null-like values) to end of array.
 //
@@ -289,7 +289,7 @@ NullPartitionResult PartitionNulls(uint64_t* indices_begin, uint64_t* indices_end,
 //
 // Null partitioning on chunked arrays, in two flavors:
 // 1) with uint64_t indices and ChunkedArrayResolver
-// 2) with CompressedChunkLocation and span of chunks
+// 2) with ResolvedChunkIndex and span of chunks
 //
 
 template <typename Partitioner>
@@ -316,8 +316,8 @@ NullPartitionResult PartitionNullsOnly(uint64_t* indices_begin, uint64_t* indices_end,
 }
 
 template <typename Partitioner>
-ChunkedNullPartitionResult PartitionNullsOnly(CompressedChunkLocation* locations_begin,
-                                              CompressedChunkLocation* locations_end,
+ChunkedNullPartitionResult PartitionNullsOnly(ResolvedChunkIndex* locations_begin,
+                                              ResolvedChunkIndex* locations_end,
                                               util::span<const Array* const> chunks,
                                               int64_t null_count,
                                               NullPlacement null_placement) {
@@ -328,7 +328,7 @@ ChunkedNullPartitionResult PartitionNullsOnly(CompressedChunkLocation* locations_begin,
   Partitioner partitioner;
   if (null_placement == NullPlacement::AtStart) {
     auto nulls_end =
-        partitioner(locations_begin, locations_end, [&](CompressedChunkLocation loc) {
+        partitioner(locations_begin, locations_end, [&](ResolvedChunkIndex loc) {
          return chunks[loc.chunk_index()]->IsNull(
              static_cast<int64_t>(loc.index_in_chunk()));
        });
@@ -336,7 +336,7 @@ ChunkedNullPartitionResult PartitionNullsOnly(CompressedChunkLocation* locations_begin,
                                                     nulls_end);
   } else {
     auto nulls_begin =
-        partitioner(locations_begin, locations_end, [&](CompressedChunkLocation loc) {
+        partitioner(locations_begin, locations_end, [&](ResolvedChunkIndex loc) {
          return !chunks[loc.chunk_index()]->IsNull(
              static_cast<int64_t>(loc.index_in_chunk()));
        });
@@ -501,8 +501,7 @@ struct GenericMergeImpl {
 };
 
 using MergeImpl = GenericMergeImpl<uint64_t, NullPartitionResult>;
-using ChunkedMergeImpl =
-    GenericMergeImpl<CompressedChunkLocation, ChunkedNullPartitionResult>;
+using ChunkedMergeImpl = GenericMergeImpl<ResolvedChunkIndex, ChunkedNullPartitionResult>;
 
 // TODO make this usable if indices are non trivial on input
 // (see ConcreteRecordBatchColumnSorter)
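
Illustration (not part of the patch): the renamed ResolvedChunkIndex packs a
chunk number into the low 24 bits of a 64-bit word and the row offset inside
that chunk into the high 40 bits, which is why it can overwrite a uint64_t
logical index in place. The standalone sketch below mirrors that scheme;
PackedLocation is an invented name for the example, not an Arrow type:

    // Minimal sketch of the 24/40-bit packing, assuming the layout shown in
    // the chunked_internal.h hunk above.
    #include <cassert>
    #include <cstdint>

    struct PackedLocation {
      static constexpr int kChunkIndexBits = 24;
      static constexpr uint64_t kMaxChunkIndex = (1ULL << kChunkIndexBits) - 1;

      constexpr PackedLocation(uint64_t chunk_index, uint64_t index_in_chunk)
          : data_((index_in_chunk << kChunkIndexBits) | chunk_index) {}

      // Unpack by masking the low bits (chunk) or shifting them away (offset).
      constexpr uint64_t chunk_index() const { return data_ & kMaxChunkIndex; }
      constexpr uint64_t index_in_chunk() const { return data_ >> kChunkIndexBits; }

     private:
      uint64_t data_;
    };

    // Same size as a logical uint64_t index: this is what makes the in-place
    // reinterpretation in ChunkedIndexMapper possible.
    static_assert(sizeof(PackedLocation) == sizeof(uint64_t));

    int main() {
      PackedLocation loc(/*chunk_index=*/3, /*index_in_chunk=*/12345);
      assert(loc.chunk_index() == 3);
      assert(loc.index_in_chunk() == 12345);
      return 0;
    }

The 24-bit chunk field is what the NotImplemented checks in
LogicalToPhysical() guard: at most 2^24 chunks and 2^40 rows per chunk.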
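Illustration (not part of the patch): the point of ChunkedIndexMapper is to
resolve every logical index into a packed (chunk, offset) location once, up
front, so the pairwise merges no longer call ResolveWithHint per comparison,
and then to translate back afterwards. The self-contained sketch below walks
through that round trip with a linear scan where Arrow uses ChunkResolver's
batched binary search; all names and values are illustrative:

    #include <cassert>
    #include <cstdint>
    #include <vector>

    constexpr int kChunkIndexBits = 24;
    constexpr uint64_t kMaxChunkIndex = (1ULL << kChunkIndexBits) - 1;

    int main() {
      // Three chunks of lengths 4, 2 and 3; like the patch, only the chunk
      // lengths are needed, not the chunks themselves.
      const std::vector<int64_t> chunk_lengths = {4, 2, 3};

      // Prefix sums give each chunk's starting logical offset: 0, 4, 6.
      std::vector<int64_t> chunk_offsets(chunk_lengths.size());
      int64_t offset = 0;
      for (size_t i = 0; i < chunk_lengths.size(); ++i) {
        chunk_offsets[i] = offset;
        offset += chunk_lengths[i];
      }

      // "LogicalToPhysical": overwrite each logical index with a packed
      // (chunk, index-in-chunk) location, in the same buffer.
      std::vector<uint64_t> indices = {5, 0, 8};
      for (auto& index : indices) {
        uint64_t chunk = 0;
        while (chunk + 1 < chunk_offsets.size() &&
               static_cast<uint64_t>(chunk_offsets[chunk + 1]) <= index) {
          ++chunk;
        }
        const uint64_t in_chunk = index - static_cast<uint64_t>(chunk_offsets[chunk]);
        index = (in_chunk << kChunkIndexBits) | chunk;
      }

      // ... a merge would now compare packed locations directly ...

      // "PhysicalToLogical": add each chunk's starting offset back.
      for (auto& index : indices) {
        const uint64_t chunk = index & kMaxChunkIndex;
        const uint64_t in_chunk = index >> kChunkIndexBits;
        index = static_cast<uint64_t>(chunk_offsets[chunk]) + in_chunk;
      }
      assert((indices == std::vector<uint64_t>{5, 0, 8}));
      return 0;
    }

This round trip is exactly what TableSorter::SortAndMergeInternal gains in
this patch: the two per-side ChunkResolver members disappear, and the merge
lambdas construct ChunkLocation values directly from the packed indices.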