From 0193758e85b375e46537bc747e64b3119acba427 Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Wed, 13 Mar 2024 11:06:19 -0700 Subject: [PATCH] Deprecate threshold based spill in HashAggregation (#9058) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/9058 Reviewed By: xiaoxmeng Differential Revision: D54839084 Pulled By: tanjialiang fbshipit-source-id: a4dab80c5a8301f9e1b5e914097aec6b68d5e3fb --- velox/exec/GroupingSet.cpp | 8 -- velox/exec/GroupingSet.h | 4 - velox/exec/tests/AggregationTest.cpp | 116 +-------------------------- 3 files changed, 3 insertions(+), 125 deletions(-) diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 15efe8cbaf0e..fe160dbe9ee9 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -62,9 +62,6 @@ GroupingSet::GroupingSet( aggregates_(std::move(aggregates)), masks_(extractMaskChannels(aggregates_)), ignoreNullKeys_(ignoreNullKeys), - spillMemoryThreshold_(operatorCtx->driverCtx() - ->queryConfig() - .aggregationSpillMemoryThreshold()), globalGroupingSets_(globalGroupingSets), groupIdChannel_(groupIdChannel), spillConfig_(spillConfig), @@ -829,11 +826,6 @@ void GroupingSet::ensureInputFits(const RowVectorPtr& input) { } const auto currentUsage = pool_.currentBytes(); - if (spillMemoryThreshold_ != 0 && currentUsage > spillMemoryThreshold_) { - spill(); - return; - } - const auto minReservationBytes = currentUsage * spillConfig_->minSpillableReservationPct / 100; const auto availableReservationBytes = pool_.availableReservation(); diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index db42f6f4190a..bf489c4621af 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -282,10 +282,6 @@ class GroupingSet { uint64_t numInputRows_ = 0; - // The maximum memory usage that a final aggregation can hold before spilling. - // If it is zero, then there is no such limit. - const uint64_t spillMemoryThreshold_; - // List of global grouping set numbers, if being used with a GROUPING SET. const std::vector globalGroupingSets_; // Column for groupId for a GROUPING SET. diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 9687f5011ab8..9d5688ffaf04 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -1085,58 +1085,6 @@ TEST_F(AggregationTest, partialAggregationMaybeReservationReleaseCheck) { EXPECT_GT(kMaxPartialMemoryUsage, task->pool()->currentBytes()); } -TEST_F(AggregationTest, spillWithMemoryLimit) { - constexpr int32_t kNumDistinct = 2000; - constexpr int64_t kMaxBytes = 1LL << 30; // 1GB - rng_.seed(1); - rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()}); - auto batches = makeVectors(rowType_, 100, 5); - - core::PlanNodeId aggrNodeId; - const auto plan = PlanBuilder() - .values(batches) - .singleAggregation({"c0"}, {}, {}) - .capturePlanNodeId(aggrNodeId) - .planNode(); - const auto expectedResults = - AssertQueryBuilder(plan).copyResults(pool_.get()); - - struct { - uint64_t aggregationMemLimit; - bool expectSpill; - - std::string debugString() const { - return fmt::format( - "aggregationMemLimit:{}, expectSpill:{}", - aggregationMemLimit, - expectSpill); - } - } testSettings[] = {// Memory limit is disabled so spilling is not triggered. - {0, false}, - // Memory limit is too small so always trigger spilling. - {1, true}, - // Memory limit is too large so spilling is not triggered. - {1'000'000'000, false}}; - for (const auto& testData : testSettings) { - SCOPED_TRACE(testData.debugString()); - - auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto task = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) - .config(QueryConfig::kSpillEnabled, true) - .config(QueryConfig::kAggregationSpillEnabled, true) - .config( - QueryConfig::kAggregationSpillMemoryThreshold, - std::to_string(testData.aggregationMemLimit)) - .assertResults(expectedResults); - - auto taskStats = exec::toPlanStats(task->taskStats()); - auto& stats = taskStats.at(aggrNodeId); - checkSpillStats(stats, testData.expectSpill); - OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); - } -} - TEST_F(AggregationTest, spillAll) { auto inputs = makeVectors(rowType_, 100, 10); @@ -1157,13 +1105,11 @@ TEST_F(AggregationTest, spillAll) { auto tempDirectory = exec::test::TempDirectoryPath::create(); auto queryCtx = std::make_shared(executor_.get()); + TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(plan) .spillDirectory(tempDirectory->path) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) - // Set one spill partition to avoid the test flakiness. - // Set the memory trigger limit to be a very small value. - .config(QueryConfig::kAggregationSpillMemoryThreshold, "1024") .assertResults(results); auto stats = task->taskStats().pipelineStats; @@ -1608,13 +1554,12 @@ TEST_F(AggregationTest, outputBatchSizeCheckWithSpill) { createDuckDbTable(inputs); auto tempDirectory = exec::test::TempDirectoryPath::create(); core::PlanNodeId aggrNodeId; + TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(duckDbQueryRunner_) .spillDirectory(tempDirectory->path) .config(QueryConfig::kSpillEnabled, true) .config(QueryConfig::kAggregationSpillEnabled, true) - // Set the memory trigger limit to be a very small value. - .config(QueryConfig::kAggregationSpillMemoryThreshold, "1") .config( QueryConfig::kPreferredOutputBatchBytes, std::to_string(testData.maxOutputBytes)) @@ -1903,60 +1848,6 @@ TEST_F(AggregationTest, spillingForAggrsWithSorting) { plan, "SELECT c0 % 7, array_agg(c1 ORDER BY c1) FROM tmp GROUP BY 1"); } -TEST_F(AggregationTest, distinctSpillWithMemoryLimit) { - rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()}); - VectorFuzzer fuzzer({}, pool()); - auto batches = makeVectors(rowType_, 100, 5); - - core::PlanNodeId aggrNodeId; - const auto plan = PlanBuilder() - .values(batches) - .singleAggregation({"c0"}, {}, {}) - .capturePlanNodeId(aggrNodeId) - .planNode(); - const auto expectedResults = - AssertQueryBuilder(PlanBuilder() - .values(batches) - .singleAggregation({"c0"}, {}, {}) - .planNode()) - .copyResults(pool_.get()); - - struct { - uint64_t aggregationMemLimit; - bool expectSpill; - - std::string debugString() const { - return fmt::format( - "aggregationMemLimit:{}, expectSpill:{}", - aggregationMemLimit, - expectSpill); - } - } testSettings[] = {// Memory limit is disabled so spilling is not triggered. - {0, false}, - // Memory limit is too small so always trigger spilling. - {1, true}, - // Memory limit is too large so spilling is not triggered. - {1'000'000'000, false}}; - for (const auto& testData : testSettings) { - SCOPED_TRACE(testData.debugString()); - - auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto task = AssertQueryBuilder(plan) - .spillDirectory(spillDirectory->path) - .config(QueryConfig::kSpillEnabled, true) - .config(QueryConfig::kAggregationSpillEnabled, true) - .config( - QueryConfig::kAggregationSpillMemoryThreshold, - std::to_string(testData.aggregationMemLimit)) - .assertResults(expectedResults); - - auto taskStats = exec::toPlanStats(task->taskStats()); - auto& stats = taskStats.at(aggrNodeId); - checkSpillStats(stats, testData.expectSpill); - OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); - } -} - TEST_F(AggregationTest, preGroupedAggregationWithSpilling) { std::vector vectors; int64_t val = 0; @@ -3276,13 +3167,12 @@ TEST_F(AggregationTest, maxSpillBytes) { for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); try { + TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) .spillDirectory(spillDirectory->path) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kAggregationSpillEnabled, true) - // Set a small capacity to trigger threshold based spilling - .config(QueryConfig::kAggregationSpillMemoryThreshold, 5 << 20) .config(QueryConfig::kMaxSpillBytes, testData.maxSpilledBytes) .copyResults(pool_.get()); ASSERT_FALSE(testData.expectedExceedLimit);