diff --git a/velox/exec/OrderBy.cpp b/velox/exec/OrderBy.cpp index cdf96545b757..4d9a96955359 100644 --- a/velox/exec/OrderBy.cpp +++ b/velox/exec/OrderBy.cpp @@ -65,8 +65,7 @@ OrderBy::OrderBy( sortCompareFlags, pool(), &nonReclaimableSection_, - spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr, - operatorCtx_->driverCtx()->queryConfig().orderBySpillMemoryThreshold()); + spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr); } void OrderBy::addInput(RowVectorPtr input) { diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index 801aef97cc31..f9358f99f3b7 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -25,14 +25,12 @@ SortBuffer::SortBuffer( const std::vector& sortCompareFlags, velox::memory::MemoryPool* pool, tsan_atomic* nonReclaimableSection, - const common::SpillConfig* spillConfig, - uint64_t spillMemoryThreshold) + const common::SpillConfig* spillConfig) : input_(input), sortCompareFlags_(sortCompareFlags), pool_(pool), nonReclaimableSection_(nonReclaimableSection), - spillConfig_(spillConfig), - spillMemoryThreshold_(spillMemoryThreshold) { + spillConfig_(spillConfig) { VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size()); VELOX_CHECK_GT(sortCompareFlags_.size(), 0); VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size()); @@ -199,14 +197,7 @@ void SortBuffer::ensureInputFits(const VectorPtr& input) { return; } - // If current memory usage exceeds spilling threshold, trigger spilling. const auto currentMemoryUsage = pool_->currentBytes(); - if (spillMemoryThreshold_ != 0 && - currentMemoryUsage > spillMemoryThreshold_) { - spill(); - return; - } - const auto minReservationBytes = currentMemoryUsage * spillConfig_->minSpillableReservationPct / 100; const auto availableReservationBytes = pool_->availableReservation(); diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index 791cc8b03636..ca9930e3e95e 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -36,8 +36,7 @@ class SortBuffer { const std::vector& sortCompareFlags, velox::memory::MemoryPool* pool, tsan_atomic* nonReclaimableSection, - const common::SpillConfig* spillConfig = nullptr, - uint64_t spillMemoryThreshold = 0); + const common::SpillConfig* spillConfig = nullptr); void addInput(const VectorPtr& input); @@ -96,11 +95,6 @@ class SortBuffer { // execution section or not. tsan_atomic* const nonReclaimableSection_; const common::SpillConfig* const spillConfig_; - // The maximum size that an SortBuffer can hold in memory before spilling. - // Zero indicates no limit. - // - // NOTE: 'spillMemoryThreshold_' only applies if disk spilling is enabled. - const uint64_t spillMemoryThreshold_; // The column projection map between 'input_' and 'spillerStoreType_' as sort // buffer stores the sort columns first in 'data_'. diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index beacf8cb4811..d7e419f50bbf 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -493,12 +493,11 @@ TEST_F(OrderByTest, spill) { const auto expectedResult = AssertQueryBuilder(plan).copyResults(pool_.get()); auto spillDirectory = exec::test::TempDirectoryPath::create(); + TestScopedSpillInjection scopedSpillInjection(100); auto task = AssertQueryBuilder(plan) .spillDirectory(spillDirectory->path) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kOrderBySpillEnabled, true) - // Set a small capacity to trigger threshold based spilling - .config(QueryConfig::kOrderBySpillMemoryThreshold, 32 << 20) .assertResults(expectedResult); auto taskStats = exec::toPlanStats(task->taskStats()); auto& planStats = taskStats.at(orderNodeId); @@ -524,67 +523,6 @@ TEST_F(OrderByTest, spill) { OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } -TEST_F(OrderByTest, spillWithMemoryLimit) { - constexpr int32_t kNumRows = 2000; - constexpr int64_t kMaxBytes = 1LL << 30; // 1GB - auto rowType = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()}); - VectorFuzzer fuzzer({}, pool()); - const int32_t numBatches = 5; - std::vector batches; - for (int32_t i = 0; i < numBatches; ++i) { - batches.push_back(fuzzer.fuzzRow(rowType)); - } - struct { - uint64_t orderByMemLimit; - bool expectSpill; - - std::string debugString() const { - return fmt::format( - "orderByMemLimit:{}, expectSpill:{}", orderByMemLimit, expectSpill); - } - } testSettings[] = {// Memory limit is disabled so spilling is not triggered. - {0, false}, - // Memory limit is too small so always trigger spilling. - {1, true}, - // Memory limit is too large so spilling is not triggered. - {1'000'000'000, false}}; - for (const auto& testData : testSettings) { - SCOPED_TRACE(testData.debugString()); - auto tempDirectory = exec::test::TempDirectoryPath::create(); - auto queryCtx = std::make_shared(executor_.get()); - queryCtx->testingOverrideMemoryPool( - memory::memoryManager()->addRootPool(queryCtx->queryId(), kMaxBytes)); - auto results = - AssertQueryBuilder( - PlanBuilder() - .values(batches) - .orderBy({fmt::format("{} ASC NULLS LAST", "c0")}, false) - .planNode()) - .queryCtx(queryCtx) - .copyResults(pool_.get()); - auto task = - AssertQueryBuilder( - PlanBuilder() - .values(batches) - .orderBy({fmt::format("{} ASC NULLS LAST", "c0")}, false) - .planNode()) - .queryCtx(queryCtx) - .spillDirectory(tempDirectory->path) - .config(core::QueryConfig::kSpillEnabled, true) - .config(core::QueryConfig::kOrderBySpillEnabled, true) - .config( - QueryConfig::kOrderBySpillMemoryThreshold, - testData.orderByMemLimit) - .assertResults(results); - - auto stats = task->taskStats().pipelineStats; - ASSERT_EQ( - testData.expectSpill, stats[0].operatorStats[1].spilledInputBytes > 0); - ASSERT_EQ(testData.expectSpill, stats[0].operatorStats[1].spilledBytes > 0); - OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); - } -} - DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringInputProcessing) { constexpr int64_t kMaxBytes = 1LL << 30; // 1GB auto rowType = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()}); @@ -1371,13 +1309,12 @@ TEST_F(OrderByTest, maxSpillBytes) { for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); try { + TestScopedSpillInjection scopedSpillInjection(100); AssertQueryBuilder(plan) .spillDirectory(spillDirectory->path) .queryCtx(queryCtx) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kOrderBySpillEnabled, true) - // Set a small capacity to trigger threshold based spilling - .config(QueryConfig::kOrderBySpillMemoryThreshold, 5 << 20) .config(QueryConfig::kMaxSpillBytes, testData.maxSpilledBytes) .copyResults(pool_.get()); ASSERT_FALSE(testData.expectedExceedLimit); diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index df4209bafa74..f965652651ac 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -307,8 +307,7 @@ TEST_F(SortBufferTest, batchOutput) { sortCompareFlags_, pool_.get(), &nonReclaimableSection_, - testData.triggerSpill ? &spillConfig : nullptr, - 0); + testData.triggerSpill ? &spillConfig : nullptr); ASSERT_EQ(sortBuffer->canSpill(), testData.triggerSpill); const std::shared_ptr fuzzerPool = @@ -353,15 +352,15 @@ TEST_F(SortBufferTest, spill) { struct { bool spillEnabled; bool memoryReservationFailure; - uint64_t spillMemoryThreshold; + bool triggerSpill; bool spillTriggered; std::string debugString() const { return fmt::format( - "spillEnabled:{}, memoryReservationFailure:{}, spillMemoryThreshold:{}, spillTriggered:{}", + "spillEnabled:{}, memoryReservationFailure:{}, triggerSpill:{}, spillTriggered:{}", spillEnabled, memoryReservationFailure, - spillMemoryThreshold, + triggerSpill, spillTriggered); } } testSettings[] = { @@ -370,9 +369,8 @@ TEST_F(SortBufferTest, spill) { true, 0, false}, // memory reservation failure won't trigger spilling. - {true, false, 1000, true}, // threshold is small, spilling is triggered. - {true, false, 1000000, false} // threshold is too large, not triggered - }; + {true, false, true, true}, + {true, false, false, false}}; for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); @@ -404,8 +402,7 @@ TEST_F(SortBufferTest, spill) { sortCompareFlags_, pool_.get(), &nonReclaimableSection_, - testData.spillEnabled ? &spillConfig : nullptr, - testData.spillMemoryThreshold); + testData.spillEnabled ? &spillConfig : nullptr); const std::shared_ptr fuzzerPool = memory::memoryManager()->addLeafPool("spillSource"); @@ -416,6 +413,8 @@ TEST_F(SortBufferTest, spill) { const auto peakSpillMemoryUsage = memory::spillMemoryPool()->stats().peakBytes; + TestScopedSpillInjection scopedSpillInjection( + testData.triggerSpill ? 100 : 0); for (int i = 0; i < 3; ++i) { sortBuffer->addInput(fuzzer.fuzzRow(inputType_)); totalNumInput += 1024; @@ -463,8 +462,7 @@ TEST_F(SortBufferTest, emptySpill) { sortCompareFlags_, pool_.get(), &nonReclaimableSection_, - &spillConfig, - 0); + &spillConfig); sortBuffer->spill(); if (hasPostSpillData) {