diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index cd7a012b2050..5f2970213f68 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -742,7 +742,7 @@ bool GroupingSet::getOutput( : 0; if (numGroups == 0) { if (table_ != nullptr) { - table_->clear(); + table_->clear(/*freeTable=*/true); } return false; } @@ -789,9 +789,9 @@ void GroupingSet::extractGroups( } } -void GroupingSet::resetTable() { +void GroupingSet::resetTable(bool freeTable) { if (table_ != nullptr) { - table_->clear(); + table_->clear(freeTable); } } @@ -1012,7 +1012,7 @@ void GroupingSet::spill() { if (sortedAggregations_) { sortedAggregations_->clear(); } - table_->clear(); + table_->clear(/*freeTable=*/true); } void GroupingSet::spill(const RowContainerIterator& rowIterator) { @@ -1038,7 +1038,7 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) { // guarantee we don't accidentally enter an unsafe situation. rows->stringAllocator().freezeAndExecute( [&]() { spiller_->spill(rowIterator); }); - table_->clear(); + table_->clear(/*freeTable=*/true); } bool GroupingSet::getOutputWithSpill( diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index bd88b7d1077f..f9e1da45085b 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -78,8 +78,9 @@ class GroupingSet { /// Resets the hash table inside the grouping set when partial aggregation /// is full or reclaims memory from distinct aggregation after it has received - /// all the inputs. - void resetTable(); + /// all the inputs. If 'freeTable' is false, then the hash table itself is + /// not freed, only the table content. + void resetTable(bool freeTable = false); /// Returns true if 'this' should start producing partial /// aggregation results. 
Checks the memory consumption against diff --git a/velox/exec/HashAggregation.cpp b/velox/exec/HashAggregation.cpp index 05c2f73c5fd2..73a5b4863d75 100644 --- a/velox/exec/HashAggregation.cpp +++ b/velox/exec/HashAggregation.cpp @@ -412,7 +412,7 @@ void HashAggregation::reclaim( } if (isDistinct_) { // Since we have seen all the input, we can safely reset the hash table. - groupingSet_->resetTable(); + groupingSet_->resetTable(/*freeTable=*/true); // Release the minimum reserved memory. pool()->release(); return; diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp index 6abede7ccafc..dc349a445091 100644 --- a/velox/exec/HashTable.cpp +++ b/velox/exec/HashTable.cpp @@ -728,7 +728,7 @@ void HashTable::allocateTables( memory::AllocationTraits::numPages(size * tableSlotSize()); rows_->pool()->allocateContiguous(numPages, tableAllocation_); table_ = tableAllocation_.data(); - memset(table_, 0, capacity_ * sizeof(char*)); + ::memset(table_, 0, capacity_ * sizeof(char*)); } template @@ -743,6 +743,7 @@ void HashTable::clear(bool freeTable) { } else { rows_->pool()->freeContiguous(tableAllocation_); table_ = nullptr; + capacity_ = 0; } } numDistinct_ = 0; diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index 69ffa567e779..d70fc1bf8722 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -643,6 +643,10 @@ class HashTable : public BaseHashTable { return rehashSize(); } + char** testingTable() const { + return table_; + } + void extractColumn( folly::Range rows, int32_t columnIndex, diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp index 4836a1f8b836..3c4429d9156f 100644 --- a/velox/exec/RowNumber.cpp +++ b/velox/exec/RowNumber.cpp @@ -327,7 +327,7 @@ RowVectorPtr RowNumber::getOutput() { addInput(std::move(unspilledInput)); } else { spillInputReader_ = nullptr; - table_->clear(); + table_->clear(/*freeTable=*/true); restoreNextSpillPartition(); } } @@ -412,7 +412,7 @@ SpillPartitionNumSet RowNumber::spillHashTable() { 
hashTableSpiller->spill(); hashTableSpiller->finishSpill(spillHashTablePartitionSet_); - table_->clear(); + table_->clear(/*freeTable=*/true); pool()->release(); return hashTableSpiller->state().spilledPartitionSet(); } @@ -455,6 +455,10 @@ void RowNumber::spill() { spillInput(input_, memory::spillMemoryPool()); input_ = nullptr; } + if (generateRowNumber_) { + results_.clear(); + results_.resize(1); + } } void RowNumber::spillInput( diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 1d1ef1597763..cf15d69d06c1 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -2028,6 +2028,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { } } testSettings[] = { {0, true, true}, {0, false, false}, {1, true, true}, {1, false, false}}; + for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); @@ -2045,12 +2046,12 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { .copyResults(pool_.get()); folly::EventCount driverWait; - auto driverWaitKey = driverWait.prepareWait(); + std::atomic_bool driverWaitFlag{true}; folly::EventCount testWait; - auto testWaitKey = testWait.prepareWait(); + std::atomic_bool testWaitFlag{true}; std::atomic_int numInputs{0}; - Operator* op; + Operator* op{nullptr}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::addInput", std::function(([&](Operator* testOp) { @@ -2079,8 +2080,9 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { } else { ASSERT_EQ(reclaimableBytes, 0); } - testWait.notify(); - driverWait.wait(driverWaitKey); + testWaitFlag = false; + testWait.notifyAll(); + driverWait.await([&] { return !driverWaitFlag.load(); }); }))); std::thread taskThread([&]() { @@ -2108,11 +2110,13 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { } }); - testWait.wait(testWaitKey); + testWait.await([&]() { return !testWaitFlag.load(); }); ASSERT_TRUE(op != 
nullptr); auto task = op->testingOperatorCtx()->task(); auto taskPauseWait = task->requestPause(); - driverWait.notify(); + + driverWaitFlag = false; + driverWait.notifyAll(); taskPauseWait.wait(); uint64_t reclaimableBytes{0}; @@ -2126,7 +2130,6 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { } if (testData.expectedReclaimable) { - const auto usedMemory = op->pool()->usedBytes(); { memory::ScopedMemoryArbitrationContext ctx(op->pool()); op->pool()->reclaim( @@ -2137,9 +2140,8 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) { ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); reclaimerStats_.reset(); - // The hash table itself in the grouping set is not cleared so it still - // uses some memory. - ASSERT_LT(op->pool()->usedBytes(), usedMemory); + // We expect all the memory has been freed from the hash table. + ASSERT_EQ(op->pool()->usedBytes(), 0); } else { { memory::ScopedMemoryArbitrationContext ctx(op->pool()); @@ -3140,7 +3142,8 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyOutput) { task->pool()->reclaim(kMaxBytes, 0, stats); ASSERT_EQ(stats.numNonReclaimableAttempts, 0); ASSERT_GT(stats.reclaimExecTimeUs, 0); - ASSERT_EQ(stats.reclaimedBytes, 0); + // We expect to reclaim the memory from the hash table. 
+ ASSERT_GT(stats.reclaimedBytes, 0); ASSERT_GT(stats.reclaimWaitTimeUs, 0); } }))); diff --git a/velox/exec/tests/HashTableTest.cpp b/velox/exec/tests/HashTableTest.cpp index 5bde20a135a5..afd51b6d6e71 100644 --- a/velox/exec/tests/HashTableTest.cpp +++ b/velox/exec/tests/HashTableTest.cpp @@ -632,7 +632,7 @@ TEST_P(HashTableTest, mixed6Sparse) { } // It should be safe to call clear() before we insert any data into HashTable -TEST_P(HashTableTest, clear) { +TEST_P(HashTableTest, clearBeforeInsert) { std::vector> keyHashers; keyHashers.push_back(std::make_unique(BIGINT(), 0 /*channel*/)); core::QueryConfig config({}); @@ -644,9 +644,46 @@ TEST_P(HashTableTest, clear) { config); for (const bool clearTable : {false, true}) { - auto table = HashTable::createForAggregation( + const auto table = HashTable::createForAggregation( std::move(keyHashers), {Accumulator{aggregate.get(), nullptr}}, pool()); ASSERT_NO_THROW(table->clear(clearTable)); + if (clearTable) { + ASSERT_EQ(reinterpret_cast(table->testingTable()), 0); + ASSERT_EQ(table->capacity(), 0); + } else { + ASSERT_EQ(reinterpret_cast(table->testingTable()), 0); + ASSERT_EQ(table->capacity(), 0); + } + } +} + +TEST_P(HashTableTest, clearAfterInsert) { + const auto rowType = + ROW({"a", "b", "c", "d"}, {BIGINT(), BIGINT(), BIGINT(), BIGINT()}); + const auto numKeys = 4; + + const int numBatches = 5; + std::vector inputBatches; + for (int i = 0; i < numBatches; ++i) { + VectorFuzzer fuzzer({}, pool()); + inputBatches.push_back(fuzzer.fuzzRow(rowType)); + } + for (const bool clearTable : {false, true}) { + const auto table = createHashTableForAggregation(rowType, numKeys); + auto lookup = std::make_unique(table->hashers()); + for (const auto& batch : inputBatches) { + lookup->reset(batch->size()); + insertGroups(*batch, *lookup, *table); + } + const uint64_t capacityBeforeInsert = table->capacity(); + ASSERT_NO_THROW(table->clear(clearTable)); + if (clearTable) { + ASSERT_EQ(reinterpret_cast(table->testingTable()), 
0); + ASSERT_EQ(table->capacity(), 0); + } else { + ASSERT_NE(reinterpret_cast(table->testingTable()), 0); + ASSERT_EQ(table->capacity(), capacityBeforeInsert); + } } } diff --git a/velox/exec/tests/RowNumberTest.cpp b/velox/exec/tests/RowNumberTest.cpp index 635263d169cc..2b7b74c6b5ff 100644 --- a/velox/exec/tests/RowNumberTest.cpp +++ b/velox/exec/tests/RowNumberTest.cpp @@ -390,6 +390,8 @@ DEBUG_ONLY_TEST_F(RowNumberTest, spillOnlyDuringInputOrOutput) { } testingRunArbitration(op->pool(), 0); + // We expect all the memory to be freed after the spill. + ASSERT_EQ(op->pool()->usedBytes(), 0); }))); core::PlanNodeId rowNumberPlanNodeId;