Skip to content

Commit

Permalink
OrderBy threshold spill deprecate (facebookincubator#9059)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: facebookincubator#9059

Reviewed By: xiaoxmeng

Differential Revision: D54839512

Pulled By: tanjialiang

fbshipit-source-id: 712797512c11e935c38e638b24eba51f0807d7f4
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Mar 13, 2024
1 parent 0193758 commit 58c89fc
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 97 deletions.
3 changes: 1 addition & 2 deletions velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ OrderBy::OrderBy(
sortCompareFlags,
pool(),
&nonReclaimableSection_,
spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr,
operatorCtx_->driverCtx()->queryConfig().orderBySpillMemoryThreshold());
spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr);
}

void OrderBy::addInput(RowVectorPtr input) {
Expand Down
13 changes: 2 additions & 11 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ SortBuffer::SortBuffer(
const std::vector<CompareFlags>& sortCompareFlags,
velox::memory::MemoryPool* pool,
tsan_atomic<bool>* nonReclaimableSection,
const common::SpillConfig* spillConfig,
uint64_t spillMemoryThreshold)
const common::SpillConfig* spillConfig)
: input_(input),
sortCompareFlags_(sortCompareFlags),
pool_(pool),
nonReclaimableSection_(nonReclaimableSection),
spillConfig_(spillConfig),
spillMemoryThreshold_(spillMemoryThreshold) {
spillConfig_(spillConfig) {
VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size());
VELOX_CHECK_GT(sortCompareFlags_.size(), 0);
VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size());
Expand Down Expand Up @@ -199,14 +197,7 @@ void SortBuffer::ensureInputFits(const VectorPtr& input) {
return;
}

// If current memory usage exceeds spilling threshold, trigger spilling.
const auto currentMemoryUsage = pool_->currentBytes();
if (spillMemoryThreshold_ != 0 &&
currentMemoryUsage > spillMemoryThreshold_) {
spill();
return;
}

const auto minReservationBytes =
currentMemoryUsage * spillConfig_->minSpillableReservationPct / 100;
const auto availableReservationBytes = pool_->availableReservation();
Expand Down
8 changes: 1 addition & 7 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class SortBuffer {
const std::vector<CompareFlags>& sortCompareFlags,
velox::memory::MemoryPool* pool,
tsan_atomic<bool>* nonReclaimableSection,
const common::SpillConfig* spillConfig = nullptr,
uint64_t spillMemoryThreshold = 0);
const common::SpillConfig* spillConfig = nullptr);

void addInput(const VectorPtr& input);

Expand Down Expand Up @@ -96,11 +95,6 @@ class SortBuffer {
// execution section or not.
tsan_atomic<bool>* const nonReclaimableSection_;
const common::SpillConfig* const spillConfig_;
// The maximum size that an SortBuffer can hold in memory before spilling.
// Zero indicates no limit.
//
// NOTE: 'spillMemoryThreshold_' only applies if disk spilling is enabled.
const uint64_t spillMemoryThreshold_;

// The column projection map between 'input_' and 'spillerStoreType_' as sort
// buffer stores the sort columns first in 'data_'.
Expand Down
67 changes: 2 additions & 65 deletions velox/exec/tests/OrderByTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,12 +493,11 @@ TEST_F(OrderByTest, spill) {
const auto expectedResult = AssertQueryBuilder(plan).copyResults(pool_.get());

auto spillDirectory = exec::test::TempDirectoryPath::create();
TestScopedSpillInjection scopedSpillInjection(100);
auto task = AssertQueryBuilder(plan)
.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, true)
.config(core::QueryConfig::kOrderBySpillEnabled, true)
// Set a small capacity to trigger threshold based spilling
.config(QueryConfig::kOrderBySpillMemoryThreshold, 32 << 20)
.assertResults(expectedResult);
auto taskStats = exec::toPlanStats(task->taskStats());
auto& planStats = taskStats.at(orderNodeId);
Expand All @@ -524,67 +523,6 @@ TEST_F(OrderByTest, spill) {
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}

TEST_F(OrderByTest, spillWithMemoryLimit) {
constexpr int32_t kNumRows = 2000;
constexpr int64_t kMaxBytes = 1LL << 30; // 1GB
auto rowType = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()});
VectorFuzzer fuzzer({}, pool());
const int32_t numBatches = 5;
std::vector<RowVectorPtr> batches;
for (int32_t i = 0; i < numBatches; ++i) {
batches.push_back(fuzzer.fuzzRow(rowType));
}
struct {
uint64_t orderByMemLimit;
bool expectSpill;

std::string debugString() const {
return fmt::format(
"orderByMemLimit:{}, expectSpill:{}", orderByMemLimit, expectSpill);
}
} testSettings[] = {// Memory limit is disabled so spilling is not triggered.
{0, false},
// Memory limit is too small so always trigger spilling.
{1, true},
// Memory limit is too large so spilling is not triggered.
{1'000'000'000, false}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
auto tempDirectory = exec::test::TempDirectoryPath::create();
auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
queryCtx->testingOverrideMemoryPool(
memory::memoryManager()->addRootPool(queryCtx->queryId(), kMaxBytes));
auto results =
AssertQueryBuilder(
PlanBuilder()
.values(batches)
.orderBy({fmt::format("{} ASC NULLS LAST", "c0")}, false)
.planNode())
.queryCtx(queryCtx)
.copyResults(pool_.get());
auto task =
AssertQueryBuilder(
PlanBuilder()
.values(batches)
.orderBy({fmt::format("{} ASC NULLS LAST", "c0")}, false)
.planNode())
.queryCtx(queryCtx)
.spillDirectory(tempDirectory->path)
.config(core::QueryConfig::kSpillEnabled, true)
.config(core::QueryConfig::kOrderBySpillEnabled, true)
.config(
QueryConfig::kOrderBySpillMemoryThreshold,
testData.orderByMemLimit)
.assertResults(results);

auto stats = task->taskStats().pipelineStats;
ASSERT_EQ(
testData.expectSpill, stats[0].operatorStats[1].spilledInputBytes > 0);
ASSERT_EQ(testData.expectSpill, stats[0].operatorStats[1].spilledBytes > 0);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}
}

DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringInputProcessing) {
constexpr int64_t kMaxBytes = 1LL << 30; // 1GB
auto rowType = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()});
Expand Down Expand Up @@ -1371,13 +1309,12 @@ TEST_F(OrderByTest, maxSpillBytes) {
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
try {
TestScopedSpillInjection scopedSpillInjection(100);
AssertQueryBuilder(plan)
.spillDirectory(spillDirectory->path)
.queryCtx(queryCtx)
.config(core::QueryConfig::kSpillEnabled, true)
.config(core::QueryConfig::kOrderBySpillEnabled, true)
// Set a small capacity to trigger threshold based spilling
.config(QueryConfig::kOrderBySpillMemoryThreshold, 5 << 20)
.config(QueryConfig::kMaxSpillBytes, testData.maxSpilledBytes)
.copyResults(pool_.get());
ASSERT_FALSE(testData.expectedExceedLimit);
Expand Down
22 changes: 10 additions & 12 deletions velox/exec/tests/SortBufferTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,7 @@ TEST_F(SortBufferTest, batchOutput) {
sortCompareFlags_,
pool_.get(),
&nonReclaimableSection_,
testData.triggerSpill ? &spillConfig : nullptr,
0);
testData.triggerSpill ? &spillConfig : nullptr);
ASSERT_EQ(sortBuffer->canSpill(), testData.triggerSpill);

const std::shared_ptr<memory::MemoryPool> fuzzerPool =
Expand Down Expand Up @@ -353,15 +352,15 @@ TEST_F(SortBufferTest, spill) {
struct {
bool spillEnabled;
bool memoryReservationFailure;
uint64_t spillMemoryThreshold;
bool triggerSpill;
bool spillTriggered;

std::string debugString() const {
return fmt::format(
"spillEnabled:{}, memoryReservationFailure:{}, spillMemoryThreshold:{}, spillTriggered:{}",
"spillEnabled:{}, memoryReservationFailure:{}, triggerSpill:{}, spillTriggered:{}",
spillEnabled,
memoryReservationFailure,
spillMemoryThreshold,
triggerSpill,
spillTriggered);
}
} testSettings[] = {
Expand All @@ -370,9 +369,8 @@ TEST_F(SortBufferTest, spill) {
true,
0,
false}, // memory reservation failure won't trigger spilling.
{true, false, 1000, true}, // threshold is small, spilling is triggered.
{true, false, 1000000, false} // threshold is too large, not triggered
};
{true, false, true, true},
{true, false, false, false}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
Expand Down Expand Up @@ -404,8 +402,7 @@ TEST_F(SortBufferTest, spill) {
sortCompareFlags_,
pool_.get(),
&nonReclaimableSection_,
testData.spillEnabled ? &spillConfig : nullptr,
testData.spillMemoryThreshold);
testData.spillEnabled ? &spillConfig : nullptr);

const std::shared_ptr<memory::MemoryPool> fuzzerPool =
memory::memoryManager()->addLeafPool("spillSource");
Expand All @@ -416,6 +413,8 @@ TEST_F(SortBufferTest, spill) {
const auto peakSpillMemoryUsage =
memory::spillMemoryPool()->stats().peakBytes;

TestScopedSpillInjection scopedSpillInjection(
testData.triggerSpill ? 100 : 0);
for (int i = 0; i < 3; ++i) {
sortBuffer->addInput(fuzzer.fuzzRow(inputType_));
totalNumInput += 1024;
Expand Down Expand Up @@ -463,8 +462,7 @@ TEST_F(SortBufferTest, emptySpill) {
sortCompareFlags_,
pool_.get(),
&nonReclaimableSection_,
&spillConfig,
0);
&spillConfig);

sortBuffer->spill();
if (hasPostSpillData) {
Expand Down

0 comments on commit 58c89fc

Please sign in to comment.