From 6d15c10371f80570305973649e623872357df5a8 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Mon, 7 Oct 2024 12:54:09 -0700 Subject: [PATCH] Free hash table after grouping set/row number spill to release memory plus a hash table fix (#11180) Summary: Found in shadow testing that hash aggregation can use non-trivial amount of memory like a couple hundred MB after reclaim because the hash table held by grouping set. Currently we only clear the hash table in grouping set but not free the table inside (only free groups). Similar for row number operator. This PR change includes (1) free table after spill for both row number and grouping set to make memory reclamation or arbitration efficient and see significant improvement in global arbitration shadow testing. (2) free row number result vector in row number spill to have more strict test check and we assume a single vector is small and just free 1MB per operator in real workload. (3) fix free table in hash table which doesn't reset capacity and add unit test to cover Reviewed By: oerling Differential Revision: D63964822 --- velox/exec/GroupingSet.cpp | 10 +++---- velox/exec/GroupingSet.h | 5 ++-- velox/exec/HashAggregation.cpp | 2 +- velox/exec/HashTable.cpp | 3 +- velox/exec/HashTable.h | 4 +++ velox/exec/RowNumber.cpp | 8 ++++-- velox/exec/tests/AggregationTest.cpp | 27 ++++++++++-------- velox/exec/tests/HashTableTest.cpp | 41 ++++++++++++++++++++++++++-- velox/exec/tests/RowNumberTest.cpp | 2 ++ 9 files changed, 77 insertions(+), 25 deletions(-) diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index cd7a012b2050..5f2970213f68 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -742,7 +742,7 @@ bool GroupingSet::getOutput( : 0; if (numGroups == 0) { if (table_ != nullptr) { - table_->clear(); + table_->clear(/*freeTable=*/true); } return false; } @@ -789,9 +789,9 @@ void GroupingSet::extractGroups( } } -void GroupingSet::resetTable() { +void GroupingSet::resetTable(bool freeTable) { if (table_ != nullptr) { - table_->clear(); + table_->clear(freeTable); } } @@ -1012,7 +1012,7 @@ void GroupingSet::spill() { if (sortedAggregations_) { sortedAggregations_->clear(); } - table_->clear(); + table_->clear(/*freeTable=*/true); } void GroupingSet::spill(const RowContainerIterator& rowIterator) { @@ -1038,7 +1038,7 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) { // guarantee we don't accidentally enter an unsafe situation. rows->stringAllocator().freezeAndExecute( [&]() { spiller_->spill(rowIterator); }); - table_->clear(); + table_->clear(/*freeTable=*/true); } bool GroupingSet::getOutputWithSpill( diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index bd88b7d1077f..f9e1da45085b 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -78,8 +78,9 @@ class GroupingSet { /// Resets the hash table inside the grouping set when partial aggregation /// is full or reclaims memory from distinct aggregation after it has received - /// all the inputs. - void resetTable(); + /// all the inputs. If 'freeTable' is false, then hash table itself is not + /// freed but only table content. + void resetTable(bool freeTable = false); /// Returns true if 'this' should start producing partial /// aggregation results. Checks the memory consumption against diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index 05c2f73c5fd2..73a5b4863d75 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -412,7 +412,7 @@ void HashAggregation::reclaim( } if (isDistinct_) { // Since we have seen all the input, we can safely reset the hash table. - groupingSet_->resetTable(); + groupingSet_->resetTable(/*freeTable=*/true); // Release the minimum reserved memory. pool()->release(); return; diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp index 6abede7ccafc..dc349a445091 100644 --- a/velox/exec/HashTable.cpp +++ b/velox/exec/HashTable.cpp @@ -728,7 +728,7 @@ void HashTable::allocateTables( memory::AllocationTraits::numPages(size * tableSlotSize()); rows_->pool()->allocateContiguous(numPages, tableAllocation_); table_ = tableAllocation_.data(); - memset(table_, 0, capacity_ * sizeof(char*)); + ::memset(table_, 0, capacity_ * sizeof(char*)); } template @@ -743,6 +743,7 @@ void HashTable::clear(bool freeTable) { } else { rows_->pool()->freeContiguous(tableAllocation_); table_ = nullptr; + capacity_ = 0; } } numDistinct_ = 0; diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index 69ffa567e779..d70fc1bf8722 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -643,6 +643,10 @@ class HashTable : public BaseHashTable { return rehashSize(); } + char** testingTable() const { + return table_; + } + void extractColumn( folly::Range rows, int32_t columnIndex, diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp index 4836a1f8b836..3c4429d9156f 100644 --- a/velox/exec/RowNumber.cpp +++ b/velox/exec/RowNumber.cpp @@ -327,7 +327,7 @@ RowVectorPtr RowNumber::getOutput() { addInput(std::move(unspilledInput)); } else { spillInputReader_ = nullptr; - table_->clear(); + table_->clear(/*freeTable=*/true); restoreNextSpillPartition(); } } @@ -412,7 +412,7 @@ SpillPartitionNumSet RowNumber::spillHashTable() { hashTableSpiller->spill(); hashTableSpiller->finishSpill(spillHashTablePartitionSet_); - table_->clear(); + table_->clear(/*freeTable=*/true); pool()->release(); return hashTableSpiller->state().spilledPartitionSet(); } @@ -455,6 +455,10 @@ void RowNumber::spill() { spillInput(input_, memory::spillMemoryPool()); input_ = nullptr; } + if (generateRowNumber_) { + results_.clear(); + results_.resize(1); + } } void RowNumber::spillInput( diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 1d1ef1597763..cf15d69d06c1 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -2028,6 +2028,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { } } testSettings[] = { {0, true, true}, {0, false, false}, {1, true, true}, {1, false, false}}; + for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); @@ -2045,12 +2046,12 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { .copyResults(pool_.get()); folly::EventCount driverWait; - auto driverWaitKey = driverWait.prepareWait(); + std::atomic_bool driverWaitFlag{true}; folly::EventCount testWait; - auto testWaitKey = testWait.prepareWait(); + std::atomic_bool testWaitFlag{true}; std::atomic_int numInputs{0}; - Operator* op; + Operator* op{nullptr}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::addInput", std::function(([&](Operator* testOp) { @@ -2079,8 +2080,9 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { } else { ASSERT_EQ(reclaimableBytes, 0); } - testWait.notify(); - driverWait.wait(driverWaitKey); + testWaitFlag = false; + testWait.notifyAll(); + driverWait.await([&] { return !driverWaitFlag.load(); }); }))); std::thread taskThread([&]() { @@ -2108,11 +2110,13 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { } }); - testWait.wait(testWaitKey); + testWait.await([&]() { return !testWaitFlag.load(); }); ASSERT_TRUE(op != nullptr); auto task = op->testingOperatorCtx()->task(); auto taskPauseWait = task->requestPause(); - driverWait.notify(); + + driverWaitFlag = false; + driverWait.notifyAll(); taskPauseWait.wait(); uint64_t reclaimableBytes{0}; @@ -2126,7 +2130,6 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { } if (testData.expectedReclaimable) { - const auto usedMemory = op->pool()->usedBytes(); { memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim( @@ -2137,9 +2140,8 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); reclaimerStats_.reset(); - // The hash table itself in the grouping set is not cleared so it still - // uses some memory. - ASSERT_LT(op->pool()->usedBytes(), usedMemory); + // We expect all the memory has been freed from the hash table. + ASSERT_EQ(op->pool()->usedBytes(), 0); } else { { memory::ScopedMemoryArbitrationContext ctx(op->pool()); @@ -3140,7 +3142,8 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyOutput) { task->pool()->reclaim(kMaxBytes, 0, stats); ASSERT_EQ(stats.numNonReclaimableAttempts, 0); ASSERT_GT(stats.reclaimExecTimeUs, 0); - ASSERT_EQ(stats.reclaimedBytes, 0); + // We expect to reclaim the memory from the hash table. + ASSERT_GT(stats.reclaimedBytes, 0); ASSERT_GT(stats.reclaimWaitTimeUs, 0); } }))); diff --git a/velox/exec/tests/HashTableTest.cpp b/velox/exec/tests/HashTableTest.cpp index 5bde20a135a5..afd51b6d6e71 100644 --- a/velox/exec/tests/HashTableTest.cpp +++ b/velox/exec/tests/HashTableTest.cpp @@ -632,7 +632,7 @@ TEST_P(HashTableTest, mixed6Sparse) { } // It should be safe to call clear() before we insert any data into HashTable -TEST_P(HashTableTest, clear) { +TEST_P(HashTableTest, clearBeforeInsert) { std::vector> keyHashers; keyHashers.push_back(std::make_unique(BIGINT(), 0 /*channel*/)); core::QueryConfig config({}); @@ -644,9 +644,46 @@ TEST_P(HashTableTest, clear) { config); for (const bool clearTable : {false, true}) { - auto table = HashTable::createForAggregation( + const auto table = HashTable::createForAggregation( std::move(keyHashers), {Accumulator{aggregate.get(), nullptr}}, pool()); ASSERT_NO_THROW(table->clear(clearTable)); + if (clearTable) { + ASSERT_EQ(reinterpret_cast(table->testingTable()), 0); + ASSERT_EQ(table->capacity(), 0); + } else { + ASSERT_EQ(reinterpret_cast(table->testingTable()), 0); + ASSERT_EQ(table->capacity(), 0); + } + } +} + +TEST_P(HashTableTest, clearAfterInsert) { + const auto rowType = + ROW({"a", "b", "c", "d"}, {BIGINT(), BIGINT(), BIGINT(), BIGINT()}); + const auto numKeys = 4; + + const int numBatches = 5; + std::vector inputBatches; + for (int i = 0; i < numBatches; ++i) { + VectorFuzzer fuzzer({}, pool()); + inputBatches.push_back(fuzzer.fuzzRow(rowType)); + } + for (const bool clearTable : {false, true}) { + const auto table = createHashTableForAggregation(rowType, numKeys); + auto lookup = std::make_unique(table->hashers()); + for (const auto& batch : inputBatches) { + lookup->reset(batch->size()); + insertGroups(*batch, *lookup, *table); + } + const uint64_t capacityBeforeInsert = table->capacity(); + ASSERT_NO_THROW(table->clear(clearTable)); + if (clearTable) { + ASSERT_EQ(reinterpret_cast(table->testingTable()), 0); + ASSERT_EQ(table->capacity(), 0); + } else { + ASSERT_NE(reinterpret_cast(table->testingTable()), 0); + ASSERT_EQ(table->capacity(), capacityBeforeInsert); + } } } diff --git a/velox/exec/tests/RowNumberTest.cpp b/velox/exec/tests/RowNumberTest.cpp index 635263d169cc..2b7b74c6b5ff 100644 --- a/velox/exec/tests/RowNumberTest.cpp +++ b/velox/exec/tests/RowNumberTest.cpp @@ -390,6 +390,8 @@ DEBUG_ONLY_TEST_F(RowNumberTest, spillOnlyDuringInputOrOutput) { } testingRunArbitration(op->pool(), 0); + // We expect all the memory to be freed after the spill. + ASSERT_EQ(op->pool()->usedBytes(), 0); }))); core::PlanNodeId rowNumberPlanNodeId;