Skip to content

Commit

Permalink
Deprecate threshold based spill in HashAggregation (facebookincubator…
Browse files Browse the repository at this point in the history
…#9058)

Summary: Pull Request resolved: facebookincubator#9058

Reviewed By: xiaoxmeng

Differential Revision: D54839084

Pulled By: tanjialiang

fbshipit-source-id: a4dab80c5a8301f9e1b5e914097aec6b68d5e3fb
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Mar 13, 2024
1 parent 81c609a commit 0193758
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 125 deletions.
8 changes: 0 additions & 8 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ GroupingSet::GroupingSet(
aggregates_(std::move(aggregates)),
masks_(extractMaskChannels(aggregates_)),
ignoreNullKeys_(ignoreNullKeys),
spillMemoryThreshold_(operatorCtx->driverCtx()
->queryConfig()
.aggregationSpillMemoryThreshold()),
globalGroupingSets_(globalGroupingSets),
groupIdChannel_(groupIdChannel),
spillConfig_(spillConfig),
Expand Down Expand Up @@ -829,11 +826,6 @@ void GroupingSet::ensureInputFits(const RowVectorPtr& input) {
}

const auto currentUsage = pool_.currentBytes();
if (spillMemoryThreshold_ != 0 && currentUsage > spillMemoryThreshold_) {
spill();
return;
}

const auto minReservationBytes =
currentUsage * spillConfig_->minSpillableReservationPct / 100;
const auto availableReservationBytes = pool_.availableReservation();
Expand Down
4 changes: 0 additions & 4 deletions velox/exec/GroupingSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,6 @@ class GroupingSet {

uint64_t numInputRows_ = 0;

// The maximum memory usage that a final aggregation can hold before spilling.
// If it is zero, then there is no such limit.
const uint64_t spillMemoryThreshold_;

// List of global grouping set numbers, if being used with a GROUPING SET.
const std::vector<vector_size_t> globalGroupingSets_;
// Column for groupId for a GROUPING SET.
Expand Down
116 changes: 3 additions & 113 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1085,58 +1085,6 @@ TEST_F(AggregationTest, partialAggregationMaybeReservationReleaseCheck) {
EXPECT_GT(kMaxPartialMemoryUsage, task->pool()->currentBytes());
}

// Runs a singleAggregation plan grouped on 'c0' under several values of
// QueryConfig::kAggregationSpillMemoryThreshold. For every setting the query
// results must equal those produced by the same plan run without any spill
// configuration, and the aggregation node's spill stats are checked against
// the per-setting expectation.
TEST_F(AggregationTest, spillWithMemoryLimit) {
  rng_.seed(1);
  rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()});
  auto batches = makeVectors(rowType_, 100, 5);

  core::PlanNodeId aggrNodeId;
  const auto plan = PlanBuilder()
                        .values(batches)
                        .singleAggregation({"c0"}, {}, {})
                        .capturePlanNodeId(aggrNodeId)
                        .planNode();
  // Baseline: the same plan executed without a spill directory or spill
  // configuration.
  const auto expectedResults =
      AssertQueryBuilder(plan).copyResults(pool_.get());

  struct {
    // Value passed as QueryConfig::kAggregationSpillMemoryThreshold.
    uint64_t aggregationMemLimit;
    // Expectation forwarded to checkSpillStats().
    bool expectSpill;

    std::string debugString() const {
      return fmt::format(
          "aggregationMemLimit:{}, expectSpill:{}",
          aggregationMemLimit,
          expectSpill);
    }
  } testSettings[] = {
      // Memory limit is disabled so spilling is not triggered.
      {0, false},
      // Memory limit is too small so always trigger spilling.
      {1, true},
      // Memory limit is too large so spilling is not triggered.
      {1'000'000'000, false}};

  for (const auto& testData : testSettings) {
    SCOPED_TRACE(testData.debugString());

    // Each setting gets its own temporary spill directory.
    auto spillDirectory = exec::test::TempDirectoryPath::create();
    auto task = AssertQueryBuilder(plan)
                    .spillDirectory(spillDirectory->path)
                    .config(QueryConfig::kSpillEnabled, true)
                    .config(QueryConfig::kAggregationSpillEnabled, true)
                    .config(
                        QueryConfig::kAggregationSpillMemoryThreshold,
                        std::to_string(testData.aggregationMemLimit))
                    .assertResults(expectedResults);

    // Look up the stats of the aggregation node captured while building the
    // plan.
    auto taskStats = exec::toPlanStats(task->taskStats());
    auto& stats = taskStats.at(aggrNodeId);
    checkSpillStats(stats, testData.expectSpill);
    OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
  }
}

TEST_F(AggregationTest, spillAll) {
auto inputs = makeVectors(rowType_, 100, 10);

Expand All @@ -1157,13 +1105,11 @@ TEST_F(AggregationTest, spillAll) {

auto tempDirectory = exec::test::TempDirectoryPath::create();
auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
TestScopedSpillInjection scopedSpillInjection(100);
auto task = AssertQueryBuilder(plan)
.spillDirectory(tempDirectory->path)
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
// Set one spill partition to avoid the test flakiness.
// Set the memory trigger limit to be a very small value.
.config(QueryConfig::kAggregationSpillMemoryThreshold, "1024")
.assertResults(results);

auto stats = task->taskStats().pipelineStats;
Expand Down Expand Up @@ -1608,13 +1554,12 @@ TEST_F(AggregationTest, outputBatchSizeCheckWithSpill) {
createDuckDbTable(inputs);
auto tempDirectory = exec::test::TempDirectoryPath::create();
core::PlanNodeId aggrNodeId;
TestScopedSpillInjection scopedSpillInjection(100);
auto task =
AssertQueryBuilder(duckDbQueryRunner_)
.spillDirectory(tempDirectory->path)
.config(QueryConfig::kSpillEnabled, true)
.config(QueryConfig::kAggregationSpillEnabled, true)
// Set the memory trigger limit to be a very small value.
.config(QueryConfig::kAggregationSpillMemoryThreshold, "1")
.config(
QueryConfig::kPreferredOutputBatchBytes,
std::to_string(testData.maxOutputBytes))
Expand Down Expand Up @@ -1903,60 +1848,6 @@ TEST_F(AggregationTest, spillingForAggrsWithSorting) {
plan, "SELECT c0 % 7, array_agg(c1 ORDER BY c1) FROM tmp GROUP BY 1");
}

// Runs a singleAggregation plan that has grouping key 'c0' and no aggregate
// functions under several values of
// QueryConfig::kAggregationSpillMemoryThreshold. For every setting the query
// results must equal those produced by the same plan run without any spill
// configuration, and the aggregation node's spill stats are checked against
// the per-setting expectation.
TEST_F(AggregationTest, distinctSpillWithMemoryLimit) {
  rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()});
  auto batches = makeVectors(rowType_, 100, 5);

  core::PlanNodeId aggrNodeId;
  const auto plan = PlanBuilder()
                        .values(batches)
                        .singleAggregation({"c0"}, {}, {})
                        .capturePlanNodeId(aggrNodeId)
                        .planNode();
  // Baseline: the same plan executed without a spill directory or spill
  // configuration.
  const auto expectedResults =
      AssertQueryBuilder(plan).copyResults(pool_.get());

  struct {
    // Value passed as QueryConfig::kAggregationSpillMemoryThreshold.
    uint64_t aggregationMemLimit;
    // Expectation forwarded to checkSpillStats().
    bool expectSpill;

    std::string debugString() const {
      return fmt::format(
          "aggregationMemLimit:{}, expectSpill:{}",
          aggregationMemLimit,
          expectSpill);
    }
  } testSettings[] = {
      // Memory limit is disabled so spilling is not triggered.
      {0, false},
      // Memory limit is too small so always trigger spilling.
      {1, true},
      // Memory limit is too large so spilling is not triggered.
      {1'000'000'000, false}};

  for (const auto& testData : testSettings) {
    SCOPED_TRACE(testData.debugString());

    // Each setting gets its own temporary spill directory.
    auto spillDirectory = exec::test::TempDirectoryPath::create();
    auto task = AssertQueryBuilder(plan)
                    .spillDirectory(spillDirectory->path)
                    .config(QueryConfig::kSpillEnabled, true)
                    .config(QueryConfig::kAggregationSpillEnabled, true)
                    .config(
                        QueryConfig::kAggregationSpillMemoryThreshold,
                        std::to_string(testData.aggregationMemLimit))
                    .assertResults(expectedResults);

    // Look up the stats of the aggregation node captured while building the
    // plan.
    auto taskStats = exec::toPlanStats(task->taskStats());
    auto& stats = taskStats.at(aggrNodeId);
    checkSpillStats(stats, testData.expectSpill);
    OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
  }
}

TEST_F(AggregationTest, preGroupedAggregationWithSpilling) {
std::vector<RowVectorPtr> vectors;
int64_t val = 0;
Expand Down Expand Up @@ -3276,13 +3167,12 @@ TEST_F(AggregationTest, maxSpillBytes) {
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
try {
TestScopedSpillInjection scopedSpillInjection(100);
AssertQueryBuilder(plan)
.spillDirectory(spillDirectory->path)
.queryCtx(queryCtx)
.config(core::QueryConfig::kSpillEnabled, true)
.config(core::QueryConfig::kAggregationSpillEnabled, true)
// Set a small capacity to trigger threshold based spilling
.config(QueryConfig::kAggregationSpillMemoryThreshold, 5 << 20)
.config(QueryConfig::kMaxSpillBytes, testData.maxSpilledBytes)
.copyResults(pool_.get());
ASSERT_FALSE(testData.expectedExceedLimit);
Expand Down

0 comments on commit 0193758

Please sign in to comment.