Skip to content

Commit

Permalink
reserve estimate memory
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Oct 12, 2024
1 parent 55f103c commit d5835c3
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 15 deletions.
11 changes: 11 additions & 0 deletions velox/exec/PrefixSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,17 @@ void PrefixSort::extractRowToPrefix(char* row, char* prefix) {
getAddressFromPrefix(prefix) = row;
}

uint32_t PrefixSort::maxRequiredBytes() {
const auto numRows = rowContainer_->numRows();
const auto numPages =
memory::AllocationTraits::numPages(numRows * sortLayout_.entrySize);
// Prefix data size + swap buffer size.
return memory::AllocationTraits::pageBytes(numPages) +
pool_->preferredSize(checkedPlus<size_t>(
sortLayout_.entrySize, AlignedBuffer::kPaddedSize)) +
2 * pool_->alignment();
}

void PrefixSort::sortInternal(
std::vector<char*, memory::StlAllocator<char*>>& rows) {
const auto numRows = rows.size();
Expand Down
25 changes: 25 additions & 0 deletions velox/exec/PrefixSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,32 @@ class PrefixSort {
prefixSort.sortInternal(rows);
}

/// The stdsort won't require bytes while prefixsort may require buffers
/// such as prefix data. The logic is similar to the above function
/// PrefixSort::sort but returns the maxmium buffer the sort may need.
static uint32_t maxRequiredBytes(
memory::MemoryPool* pool,
RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags,
const velox::common::PrefixSortConfig& config) {
if (rowContainer->numRows() < config.threshold) {
return 0;
}
VELOX_DCHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size());
const auto sortLayout = PrefixSortLayout::makeSortLayout(
rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeySize);
if (sortLayout.noNormalizedKeys) {
return 0;
}

PrefixSort prefixSort(pool, rowContainer, sortLayout);
return prefixSort.maxRequiredBytes();
}

private:
// The bytes to allocate in the prefix sort process such as prefix buffer and
// swap buffer.
uint32_t maxRequiredBytes();
void sortInternal(std::vector<char*, memory::StlAllocator<char*>>& rows);

int compareAllNormalizedKeys(char* left, char* right);
Expand Down
42 changes: 36 additions & 6 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,8 @@ void SortBuffer::noMoreInput() {
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::SortBuffer::noMoreInput", this);
VELOX_CHECK(!noMoreInput_);

// It may trigger spill, make sure it's triggered before noMoreInput_ is set.
if (spiller_ == nullptr) {
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
pool_->maybeReserve(1 * 1024 * 1024 * 1024L);
}
ensureSortFits();

noMoreInput_ = true;

Expand Down Expand Up @@ -279,6 +275,40 @@ void SortBuffer::ensureOutputFits() {
<< ", reservation: " << succinctBytes(pool_->reservedBytes());
}

void SortBuffer::ensureSortFits() {
// Check if spilling is enabled or not.
if (spillConfig_ == nullptr) {
return;
}

// Test-only spill path.
if (testingTriggerSpill(pool_->name())) {
spill();
return;
}

if (numInputRows_ == 0 || spiller_ == nullptr) {
return;
}

// The memory for std::vector sorted rows and prefix sort required buffer.
uint64_t sortBufferToReserve =
numInputRows_ * sizeof(char*) +
PrefixSort::maxRequiredBytes(
pool_, data_.get(), sortCompareFlags_, prefixSortConfig_);
{
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (pool_->maybeReserve(sortBufferToReserve)) {
return;
}
}

LOG(WARNING) << "Failed to reserve " << succinctBytes(sortBufferToReserve)
<< " for memory pool " << pool_->name()
<< ", usage: " << succinctBytes(pool_->usedBytes())
<< ", reservation: " << succinctBytes(pool_->reservedBytes());
}

void SortBuffer::updateEstimatedOutputRowSize() {
const auto optionalRowSize = data_->estimateRowSize();
if (!optionalRowSize.has_value() || optionalRowSize.value() == 0) {
Expand Down Expand Up @@ -325,7 +355,7 @@ void SortBuffer::spillOutput() {
spillerStoreType_,
spillConfig_,
spillStats_);
auto spillRows = std::vector<char*, memory::StlAllocator<char*>>(
auto spillRows = Spiller::SpillRows(
sortedRows_.begin() + numOutputRows_,
sortedRows_.end(),
*memory::spillMemoryPool());
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class SortBuffer {
// Reserves memory for output processing. If reservation cannot be increased,
// spills enough to make output fit.
void ensureOutputFits();
// Reserves memory for sort. If reservation cannot be increased,
// spills enough to make output fit.
void ensureSortFits();
void updateEstimatedOutputRowSize();
// Invoked to initialize or reset the reusable output buffer to get output.
void prepareOutput(vector_size_t maxOutputRows);
Expand Down Expand Up @@ -110,7 +113,7 @@ class SortBuffer {
uint64_t numInputRows_ = 0;
// Used to store the input data in row format.
std::unique_ptr<RowContainer> data_;
std::vector<char*, memory::StlAllocator<char*>> sortedRows_;
std::vector<char*, memory::StlAllocator<char*>> sortedRows_;

// The data type of the rows stored in 'data_' and spilled on disk. The
// sort key columns are stored first then the non-sorted data columns.
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ void Spiller::spill(const RowContainerIterator* startRowIter) {
checkEmptySpillRuns();
}

void Spiller::spill(std::vector<char*, memory::StlAllocator<char*>>& rows) {
void Spiller::spill(SpillRows& rows) {
CHECK_NOT_FINALIZED();
VELOX_CHECK_EQ(type_, Type::kOrderByOutput);
VELOX_CHECK(!rows.empty());
Expand Down Expand Up @@ -705,7 +705,7 @@ bool Spiller::fillSpillRuns(RowContainerIterator* iterator) {
return lastRun;
}

void Spiller::fillSpillRun(std::vector<char*, memory::StlAllocator<char*>>& rows) {
void Spiller::fillSpillRun(SpillRows& rows) {
VELOX_CHECK_EQ(bits_.numPartitions(), 1);
checkEmptySpillRuns();
uint64_t execTimeNs{0};
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class Spiller {
/// 'kOrderByOutput' spiller type to spill during the order by
/// output processing. Similarly, the spilled rows still stays in the row
/// container. The caller needs to erase them from the row container.
void spill(std::vector<char*, memory::StlAllocator<char*>>& rows);
void spill(SpillRows& rows);

/// Append 'spillVector' into the spill file of given 'partition'. It is now
/// only used by the spilling operator which doesn't need data sort, such as
Expand Down Expand Up @@ -297,7 +297,7 @@ class Spiller {

// Prepares spill run of a single partition for the spillable data from the
// rows.
void fillSpillRun(std::vector<char*, memory::StlAllocator<char*>>& rows);
void fillSpillRun(SpillRows& rows);

// Writes out all the rows collected in spillRuns_.
void runSpill(bool lastRun);
Expand Down
8 changes: 4 additions & 4 deletions velox/exec/tests/SpillerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1548,11 +1548,11 @@ TEST_P(OrderByOutputOnly, basic) {
"Unexpected spiller type: ORDER_BY_OUTPUT");
}
{
std::vector<char*, memory::StlAllocator<char*>> emptyRows(*pool_);
Spiller::SpillRows emptyRows(*pool_);
VELOX_ASSERT_THROW(spiller_->spill(emptyRows), "");
}
auto spillRows = std::vector<char*, memory::StlAllocator<char*>>(
rows.begin(), rows.begin() + numListedRows, *pool_);
auto spillRows =
Spiller::SpillRows(rows.begin(), rows.begin() + numListedRows, *pool_);
spiller_->spill(spillRows);
ASSERT_EQ(rowContainer_->numRows(), numRows);
rowContainer_->clear();
Expand Down Expand Up @@ -1630,7 +1630,7 @@ TEST_P(MaxSpillRunTest, basic) {
testData.maxSpillRunRows);
if (type_ == Spiller::Type::kOrderByOutput) {
RowContainerIterator rowIter;
std::vector<char*, memory::StlAllocator<char*>> rows(numRows, *pool_);
Spiller::SpillRows rows(numRows, *pool_);
int numListedRows{0};
numListedRows = rowContainer_->listRows(&rowIter, numRows, rows.data());
ASSERT_EQ(numListedRows, numRows);
Expand Down

0 comments on commit d5835c3

Please sign in to comment.