Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve row segmenter #10

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 75 additions & 9 deletions cpp/src/arrow/acero/aggregate_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/array/array_primitive.h"
#include "arrow/array/concatenate.h"
#include "arrow/compute/api.h"
#include "arrow/table.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/benchmark_util.h"
Expand All @@ -34,6 +36,9 @@

namespace arrow {

using arrow::Concatenate;
using arrow::ConstantArrayGenerator;
using arrow::gen::Constant;
using compute::Count;
using compute::MinMax;
using compute::Mode;
Expand Down Expand Up @@ -325,7 +330,8 @@ BENCHMARK_TEMPLATE(ReferenceSum, SumBitmapVectorizeUnroll<int64_t>)

std::shared_ptr<RecordBatch> RecordBatchFromArrays(
const std::vector<std::shared_ptr<Array>>& arguments,
const std::vector<std::shared_ptr<Array>>& keys) {
const std::vector<std::shared_ptr<Array>>& keys,
const std::vector<std::shared_ptr<Array>>& segment_keys) {
std::vector<std::shared_ptr<Field>> fields;
std::vector<std::shared_ptr<Array>> all_arrays;
int64_t length = -1;
Expand All @@ -347,35 +353,53 @@ std::shared_ptr<RecordBatch> RecordBatchFromArrays(
fields.push_back(field("key" + ToChars(key_idx), key->type()));
all_arrays.push_back(key);
}
for (std::size_t segment_key_idx = 0; segment_key_idx < segment_keys.size();
segment_key_idx++) {
const auto& segment_key = segment_keys[segment_key_idx];
DCHECK_EQ(segment_key->length(), length);
fields.push_back(
field("segment_key" + ToChars(segment_key_idx), segment_key->type()));
all_arrays.push_back(segment_key);
}
return RecordBatch::Make(schema(std::move(fields)), length, std::move(all_arrays));
}

// Runs a single-batch group-by: wraps `batch` in a table, builds a
// table_source -> aggregate plan with the given aggregates, grouping keys and
// segment keys, and materializes the result as a Table.
//
// Fix: removes interleaved pre-change diff lines (old two-key signature and
// old AggregateNodeOptions call) that were pasted alongside the new code,
// leaving only the coherent post-change version.
Result<std::shared_ptr<Table>> BatchGroupBy(
    std::shared_ptr<RecordBatch> batch, std::vector<Aggregate> aggregates,
    std::vector<FieldRef> keys, std::vector<FieldRef> segment_keys,
    bool use_threads = false, MemoryPool* memory_pool = default_memory_pool()) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
                        Table::FromRecordBatches({std::move(batch)}));
  Declaration plan = Declaration::Sequence(
      {{"table_source", TableSourceNodeOptions(std::move(table))},
       {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys),
                                          std::move(segment_keys))}});
  return DeclarationToTable(std::move(plan), use_threads, memory_pool);
}

// Benchmark driver: assembles a record batch from argument, key and
// (optional) segment-key arrays, wires up FieldRefs by column position
// (arguments first, then keys, then segment keys), and times repeated
// BatchGroupBy calls, reporting bytes processed.
//
// Fix: removes interleaved pre-change diff lines (old four-line signature and
// old two-argument RecordBatchFromArrays/BatchGroupBy calls) that were pasted
// alongside the new code, leaving only the coherent post-change version.
static void BenchmarkGroupBy(
    benchmark::State& state, std::vector<Aggregate> aggregates,
    const std::vector<std::shared_ptr<Array>>& arguments,
    const std::vector<std::shared_ptr<Array>>& keys,
    const std::vector<std::shared_ptr<Array>>& segment_keys = {}) {
  std::shared_ptr<RecordBatch> batch =
      RecordBatchFromArrays(arguments, keys, segment_keys);
  // Keys follow the argument columns in the batch.
  std::vector<FieldRef> key_refs;
  for (std::size_t key_idx = 0; key_idx < keys.size(); key_idx++) {
    key_refs.emplace_back(static_cast<int>(key_idx + arguments.size()));
  }
  // Segment keys follow the key columns.
  std::vector<FieldRef> segment_key_refs;
  for (std::size_t segment_key_idx = 0; segment_key_idx < segment_keys.size();
       segment_key_idx++) {
    segment_key_refs.emplace_back(
        static_cast<int>(segment_key_idx + arguments.size() + keys.size()));
  }
  // Point each aggregate at its argument column.
  for (std::size_t arg_idx = 0; arg_idx < arguments.size(); arg_idx++) {
    aggregates[arg_idx].target = {FieldRef(static_cast<int>(arg_idx))};
  }
  int64_t total_bytes = TotalBufferSize(*batch);
  for (auto _ : state) {
    ABORT_NOT_OK(BatchGroupBy(batch, aggregates, key_refs, segment_key_refs));
  }
  state.SetBytesProcessed(total_bytes * state.iterations());
}
Expand Down Expand Up @@ -866,5 +890,47 @@ BENCHMARK(TDigestKernelDoubleMedian)->Apply(QuantileKernelArgs);
BENCHMARK(TDigestKernelDoubleDeciles)->Apply(QuantileKernelArgs);
BENCHMARK(TDigestKernelDoubleCentiles)->Apply(QuantileKernelArgs);

//
// RowSegmenter
//

// Benchmarks the row segmenter through a keyless "count" aggregation whose
// segment-key columns partition the input into equal-sized runs.
// Benchmark axes: range(0) = requested rows, range(1) = segment count,
// range(2) = number of segment-key columns.
template <typename... Args>
static void BenchmarkRowSegmenter(benchmark::State& state, Args&&...) {
  const int64_t requested_rows = state.range(0);
  const int64_t segment_count = state.range(1);
  ASSERT_NE(segment_count, 0);
  ASSERT_GE(requested_rows, segment_count);
  const int64_t segment_key_count = state.range(2);
  // Round the row count down to an exact multiple of the segment count.
  const int64_t rows_per_segment = requested_rows / segment_count;
  const int64_t total_rows = rows_per_segment * segment_count;

  // A trivial all-zero column for the "count" aggregation to consume.
  auto count_column = ConstantArrayGenerator::Zeroes(total_rows, int64());
  // One chunk per segment, each filled with its segment id.
  ArrayVector segment_chunks(segment_count);
  for (int id = 0; id < segment_count; ++id) {
    ASSERT_OK_AND_ASSIGN(
        segment_chunks[id],
        Constant(std::make_shared<Int64Scalar>(id))->Generate(rows_per_segment));
  }
  // Stitch the chunks into a single segment-key column.
  ASSERT_OK_AND_ASSIGN(auto segment_key_column, Concatenate(segment_chunks));
  // Every requested segment key reuses the same column.
  ArrayVector segment_keys(segment_key_count, segment_key_column);

  BenchmarkGroupBy(state, {{"count", ""}}, {count_column}, /*keys=*/{}, segment_keys);

  state.SetItemsProcessed(total_rows * state.iterations());
}

// Benchmark argument axes: total row count, number of segments, and number of
// segment-key columns (0..3; 0 exercises the keyless segmenter path).
std::vector<std::string> row_segmenter_argnames = {"Rows", "Segments", "SegmentKeys"};
std::vector<std::vector<int64_t>> row_segmenter_args = {
    {32 * 1024}, benchmark::CreateRange(1, 256, 4), benchmark::CreateDenseRange(0, 3, 1)};

// Registers the full cross product of the argument axes above.
BENCHMARK(BenchmarkRowSegmenter)
    ->ArgNames(row_segmenter_argnames)
    ->ArgsProduct(row_segmenter_args);

} // namespace acero
} // namespace arrow
9 changes: 3 additions & 6 deletions cpp/src/arrow/acero/aggregate_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,14 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
// Segments `batch` by the segment-key columns selected via `ids` and invokes
// `handle_batch(batch, segment)` once per segment, in order, stopping at the
// first error.
//
// Fix: removes interleaved pre-change diff lines (the old offset-driven
// GetNextSegment loop) that were pasted alongside the new GetSegments-based
// code, leaving only the coherent post-change version.
template <typename BatchHandler>
Status HandleSegments(RowSegmenter* segmenter, const ExecBatch& batch,
                      const std::vector<int>& ids, const BatchHandler& handle_batch) {
  ARROW_ASSIGN_OR_RAISE(auto segment_exec_batch, batch.SelectValues(ids));
  ExecSpan segment_batch(segment_exec_batch);

  ARROW_ASSIGN_OR_RAISE(auto segments, segmenter->GetSegments(segment_batch));
  for (const auto& segment : segments) {
    ARROW_RETURN_NOT_OK(handle_batch(batch, segment));
  }

  return Status::OK();
}

Expand Down
66 changes: 20 additions & 46 deletions cpp/src/arrow/acero/hash_aggregate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -585,19 +585,12 @@ void TestGroupClassSupportedKeys(

// Runs `segmenter` over `batch` and asserts that the produced segments match
// `expected_segments` exactly: same count, same contents, same order.
//
// Fix: removes interleaved pre-change diff lines (the old offset-driven
// GetNextSegment loop and the trailing empty-segment assertions) that were
// pasted alongside the new GetSegments-based code, leaving only the coherent
// post-change version.
void TestSegments(std::unique_ptr<RowSegmenter>& segmenter, const ExecSpan& batch,
                  std::vector<Segment> expected_segments) {
  ASSERT_OK_AND_ASSIGN(auto actual_segments, segmenter->GetSegments(batch));
  ASSERT_EQ(actual_segments.size(), expected_segments.size());
  for (size_t i = 0; i < actual_segments.size(); ++i) {
    SCOPED_TRACE("segment #" + ToChars(i));
    ASSERT_EQ(actual_segments[i], expected_segments[i]);
  }
}

Result<std::unique_ptr<Grouper>> MakeGrouper(const std::vector<TypeHolder>& key_types) {
Expand Down Expand Up @@ -629,61 +622,47 @@ TEST(RowSegmenter, Basics) {
auto batch2 = ExecBatchFromJSON(types2, "[[1, 1], [1, 2], [2, 2]]");
auto batch1 = ExecBatchFromJSON(types1, "[[1], [1], [2]]");
ExecBatch batch0({}, 3);
{
SCOPED_TRACE("offset");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types0));
ExecSpan span0(batch0);
for (int64_t offset : {-1, 4}) {
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
HasSubstr("invalid grouping segmenter offset"),
segmenter->GetNextSegment(span0, offset));
}
}
{
SCOPED_TRACE("types0 segmenting of batch2");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types0));
ExecSpan span2(batch2);
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 0 "),
segmenter->GetNextSegment(span2, 0));
segmenter->GetSegments(span2));
ExecSpan span0(batch0);
TestSegments(segmenter, span0, {{0, 3, true, true}, {3, 0, true, true}});
TestSegments(segmenter, span0, {{0, 3, true, true}});
}
{
SCOPED_TRACE("bad_types1 segmenting of batch1");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(bad_types1));
ExecSpan span1(batch1);
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch value 0 of type "),
segmenter->GetNextSegment(span1, 0));
segmenter->GetSegments(span1));
}
{
SCOPED_TRACE("types1 segmenting of batch2");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types1));
ExecSpan span2(batch2);
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 1 "),
segmenter->GetNextSegment(span2, 0));
segmenter->GetSegments(span2));
ExecSpan span1(batch1);
TestSegments(segmenter, span1,
{{0, 2, false, true}, {2, 1, true, false}, {3, 0, true, true}});
TestSegments(segmenter, span1, {{0, 2, false, true}, {2, 1, true, false}});
}
{
SCOPED_TRACE("bad_types2 segmenting of batch2");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(bad_types2));
ExecSpan span2(batch2);
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch value 1 of type "),
segmenter->GetNextSegment(span2, 0));
segmenter->GetSegments(span2));
}
{
SCOPED_TRACE("types2 segmenting of batch1");
ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types2));
ExecSpan span1(batch1);
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 2 "),
segmenter->GetNextSegment(span1, 0));
segmenter->GetSegments(span1));
ExecSpan span2(batch2);
TestSegments(segmenter, span2,
{{0, 1, false, true},
{1, 1, false, false},
{2, 1, true, false},
{3, 0, true, true}});
{{0, 1, false, true}, {1, 1, false, false}, {2, 1, true, false}});
}
}

Expand All @@ -696,8 +675,7 @@ TEST(RowSegmenter, NonOrdered) {
{{0, 2, false, true},
{2, 1, false, false},
{3, 1, false, false},
{4, 1, true, false},
{5, 0, true, true}});
{4, 1, true, false}});
}
{
std::vector<TypeHolder> types = {int32(), int32()};
Expand All @@ -707,8 +685,7 @@ TEST(RowSegmenter, NonOrdered) {
{{0, 2, false, true},
{2, 1, false, false},
{3, 1, false, false},
{4, 1, true, false},
{5, 0, true, true}});
{4, 1, true, false}});
}
}

Expand Down Expand Up @@ -767,8 +744,7 @@ TEST(RowSegmenter, MultipleSegments) {
{3, 1, false, false},
{4, 2, false, false},
{6, 2, false, false},
{8, 1, true, false},
{9, 0, true, true}});
{8, 1, true, false}});
}
{
std::vector<TypeHolder> types = {int32(), int32()};
Expand All @@ -782,8 +758,7 @@ TEST(RowSegmenter, MultipleSegments) {
{3, 1, false, false},
{4, 2, false, false},
{6, 2, false, false},
{8, 1, true, false},
{9, 0, true, true}});
{8, 1, true, false}});
}
}

Expand Down Expand Up @@ -845,7 +820,7 @@ void TestRowSegmenterConstantBatch(
std::vector<TypeHolder> key_types(types.begin(), types.begin() + size);
ARROW_ASSIGN_OR_RAISE(auto segmenter, make_segmenter(key_types));
for (size_t i = 0; i < repetitions; i++) {
TestSegments(segmenter, ExecSpan(batch), {{0, 3, true, true}, {3, 0, true, true}});
TestSegments(segmenter, ExecSpan(batch), {{0, 3, true, true}});
ARROW_RETURN_NOT_OK(segmenter->Reset());
}
return Status::OK();
Expand Down Expand Up @@ -893,10 +868,9 @@ TEST(RowSegmenter, RowConstantBatch) {
constexpr size_t n = 3;
std::vector<TypeHolder> types = {int32(), int32(), int32()};
auto full_batch = ExecBatchFromJSON(types, "[[1, 1, 1], [2, 2, 2], [3, 3, 3]]");
std::vector<Segment> expected_segments_for_size_0 = {{0, 3, true, true},
{3, 0, true, true}};
std::vector<Segment> expected_segments_for_size_0 = {{0, 3, true, true}};
std::vector<Segment> expected_segments = {
{0, 1, false, true}, {1, 1, false, false}, {2, 1, true, false}, {3, 0, true, true}};
{0, 1, false, true}, {1, 1, false, false}, {2, 1, true, false}};
auto test_by_size = [&](size_t size) -> Status {
SCOPED_TRACE("constant-batch with " + ToChars(size) + " key(s)");
std::vector<Datum> values(full_batch.values.begin(),
Expand Down
Loading
Loading