Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Free hash table after grouping set/row number spill to release memory plus a hash table fix #11180

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ bool GroupingSet::getOutput(
: 0;
if (numGroups == 0) {
if (table_ != nullptr) {
table_->clear();
table_->clear(/*freeTable=*/true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also check if HashBuild needs this change (putting true to clear() method)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is more intuitive to have HashTable::clear() take true as default instead of false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used by partial aggregation. And hash build doesn't need it as we haven't build table until the final stage and probe side always clear the entire table.

}
return false;
}
Expand Down Expand Up @@ -789,9 +789,9 @@ void GroupingSet::extractGroups(
}
}

void GroupingSet::resetTable() {
void GroupingSet::resetTable(bool freeTable) {
if (table_ != nullptr) {
table_->clear();
table_->clear(freeTable);
}
}

Expand Down Expand Up @@ -1012,7 +1012,7 @@ void GroupingSet::spill() {
if (sortedAggregations_) {
sortedAggregations_->clear();
}
table_->clear();
table_->clear(/*freeTable=*/true);
}

void GroupingSet::spill(const RowContainerIterator& rowIterator) {
Expand All @@ -1038,7 +1038,7 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) {
// guarantee we don't accidentally enter an unsafe situation.
rows->stringAllocator().freezeAndExecute(
[&]() { spiller_->spill(rowIterator); });
table_->clear();
table_->clear(/*freeTable=*/true);
}

bool GroupingSet::getOutputWithSpill(
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/GroupingSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ class GroupingSet {

/// Resets the hash table inside the grouping set when partial aggregation
/// is full or reclaims memory from distinct aggregation after it has received
/// all the inputs.
void resetTable();
/// all the inputs. If 'freeTable' is false, then hash table itself is not
/// freed but only table content.
void resetTable(bool freeTable = false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since there very few instances of its use, would it make sense to get rid of the default value?, so that the caller makes an explicit decision and future uses do not inadvertently skip freeing the table if required.


/// Returns true if 'this' should start producing partial
/// aggregation results. Checks the memory consumption against
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ void HashAggregation::reclaim(
}
if (isDistinct_) {
// Since we have seen all the input, we can safely reset the hash table.
groupingSet_->resetTable();
groupingSet_->resetTable(/*freeTable=*/true);
// Release the minimum reserved memory.
pool()->release();
return;
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ void HashTable<ignoreNullKeys>::allocateTables(
memory::AllocationTraits::numPages(size * tableSlotSize());
rows_->pool()->allocateContiguous(numPages, tableAllocation_);
table_ = tableAllocation_.data<char*>();
memset(table_, 0, capacity_ * sizeof(char*));
::memset(table_, 0, capacity_ * sizeof(char*));
}

template <bool ignoreNullKeys>
Expand All @@ -743,6 +743,7 @@ void HashTable<ignoreNullKeys>::clear(bool freeTable) {
} else {
rows_->pool()->freeContiguous(tableAllocation_);
table_ = nullptr;
capacity_ = 0;
}
}
numDistinct_ = 0;
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,10 @@ class HashTable : public BaseHashTable {
return rehashSize();
}

char** testingTable() const {
return table_;
}

void extractColumn(
folly::Range<char* const*> rows,
int32_t columnIndex,
Expand Down
8 changes: 6 additions & 2 deletions velox/exec/RowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ RowVectorPtr RowNumber::getOutput() {
addInput(std::move(unspilledInput));
} else {
spillInputReader_ = nullptr;
table_->clear();
table_->clear(/*freeTable=*/true);
restoreNextSpillPartition();
}
}
Expand Down Expand Up @@ -412,7 +412,7 @@ SpillPartitionNumSet RowNumber::spillHashTable() {
hashTableSpiller->spill();
hashTableSpiller->finishSpill(spillHashTablePartitionSet_);

table_->clear();
table_->clear(/*freeTable=*/true);
pool()->release();
return hashTableSpiller->state().spilledPartitionSet();
}
Expand Down Expand Up @@ -455,6 +455,10 @@ void RowNumber::spill() {
spillInput(input_, memory::spillMemoryPool());
input_ = nullptr;
}
if (generateRowNumber_) {
results_.clear();
results_.resize(1);
}
}

void RowNumber::spillInput(
Expand Down
27 changes: 15 additions & 12 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2028,6 +2028,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) {
}
} testSettings[] = {
{0, true, true}, {0, false, false}, {1, true, true}, {1, false, false}};

for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());

Expand All @@ -2045,12 +2046,12 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) {
.copyResults(pool_.get());

folly::EventCount driverWait;
auto driverWaitKey = driverWait.prepareWait();
std::atomic_bool driverWaitFlag{true};
folly::EventCount testWait;
auto testWaitKey = testWait.prepareWait();
std::atomic_bool testWaitFlag{true};

std::atomic_int numInputs{0};
Operator* op;
Operator* op{nullptr};
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Driver::runInternal::addInput",
std::function<void(Operator*)>(([&](Operator* testOp) {
Expand Down Expand Up @@ -2079,8 +2080,9 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) {
} else {
ASSERT_EQ(reclaimableBytes, 0);
}
testWait.notify();
driverWait.wait(driverWaitKey);
testWaitFlag = false;
testWait.notifyAll();
driverWait.await([&] { return !driverWaitFlag.load(); });
})));

std::thread taskThread([&]() {
Expand Down Expand Up @@ -2108,11 +2110,13 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) {
}
});

testWait.wait(testWaitKey);
testWait.await([&]() { return !testWaitFlag.load(); });
ASSERT_TRUE(op != nullptr);
auto task = op->testingOperatorCtx()->task();
auto taskPauseWait = task->requestPause();
driverWait.notify();

driverWaitFlag = false;
driverWait.notifyAll();
taskPauseWait.wait();

uint64_t reclaimableBytes{0};
Expand All @@ -2126,7 +2130,6 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) {
}

if (testData.expectedReclaimable) {
const auto usedMemory = op->pool()->usedBytes();
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
op->pool()->reclaim(
Expand All @@ -2137,9 +2140,8 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) {
ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
reclaimerStats_.reset();
// The hash table itself in the grouping set is not cleared so it still
// uses some memory.
ASSERT_LT(op->pool()->usedBytes(), usedMemory);
// We expect all the memory has been freed from the hash table.
ASSERT_EQ(op->pool()->usedBytes(), 0);
} else {
{
memory::ScopedMemoryArbitrationContext ctx(op->pool());
Expand Down Expand Up @@ -3140,7 +3142,8 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyOutput) {
task->pool()->reclaim(kMaxBytes, 0, stats);
ASSERT_EQ(stats.numNonReclaimableAttempts, 0);
ASSERT_GT(stats.reclaimExecTimeUs, 0);
ASSERT_EQ(stats.reclaimedBytes, 0);
// We expect to reclaim the memory from the hash table.
ASSERT_GT(stats.reclaimedBytes, 0);
ASSERT_GT(stats.reclaimWaitTimeUs, 0);
}
})));
Expand Down
41 changes: 39 additions & 2 deletions velox/exec/tests/HashTableTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ TEST_P(HashTableTest, mixed6Sparse) {
}

// It should be safe to call clear() before we insert any data into HashTable
TEST_P(HashTableTest, clear) {
TEST_P(HashTableTest, clearBeforeInsert) {
std::vector<std::unique_ptr<VectorHasher>> keyHashers;
keyHashers.push_back(std::make_unique<VectorHasher>(BIGINT(), 0 /*channel*/));
core::QueryConfig config({});
Expand All @@ -644,9 +644,46 @@ TEST_P(HashTableTest, clear) {
config);

for (const bool clearTable : {false, true}) {
auto table = HashTable<true>::createForAggregation(
const auto table = HashTable<true>::createForAggregation(
std::move(keyHashers), {Accumulator{aggregate.get(), nullptr}}, pool());
ASSERT_NO_THROW(table->clear(clearTable));
if (clearTable) {
ASSERT_EQ(reinterpret_cast<uint64_t>(table->testingTable()), 0);
ASSERT_EQ(table->capacity(), 0);
} else {
ASSERT_EQ(reinterpret_cast<uint64_t>(table->testingTable()), 0);
ASSERT_EQ(table->capacity(), 0);
}
}
}

TEST_P(HashTableTest, clearAfterInsert) {
const auto rowType =
ROW({"a", "b", "c", "d"}, {BIGINT(), BIGINT(), BIGINT(), BIGINT()});
const auto numKeys = 4;

const int numBatches = 5;
std::vector<RowVectorPtr> inputBatches;
for (int i = 0; i < numBatches; ++i) {
VectorFuzzer fuzzer({}, pool());
inputBatches.push_back(fuzzer.fuzzRow(rowType));
}
for (const bool clearTable : {false, true}) {
const auto table = createHashTableForAggregation(rowType, numKeys);
auto lookup = std::make_unique<HashLookup>(table->hashers());
for (const auto& batch : inputBatches) {
lookup->reset(batch->size());
insertGroups(*batch, *lookup, *table);
}
const uint64_t capacityBeforeInsert = table->capacity();
ASSERT_NO_THROW(table->clear(clearTable));
if (clearTable) {
ASSERT_EQ(reinterpret_cast<uint64_t>(table->testingTable()), 0);
ASSERT_EQ(table->capacity(), 0);
} else {
ASSERT_NE(reinterpret_cast<uint64_t>(table->testingTable()), 0);
ASSERT_EQ(table->capacity(), capacityBeforeInsert);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions velox/exec/tests/RowNumberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ DEBUG_ONLY_TEST_F(RowNumberTest, spillOnlyDuringInputOrOutput) {
}

testingRunArbitration(op->pool(), 0);
// We expect all the memory to be freed after the spill.
ASSERT_EQ(op->pool()->usedBytes(), 0);
})));

core::PlanNodeId rowNumberPlanNodeId;
Expand Down
Loading