Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
binwei committed Sep 29, 2024
1 parent 589a891 commit 3830f04
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 12 deletions.
6 changes: 5 additions & 1 deletion velox/exec/PrefixSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ void PrefixSort::extractRowToPrefix(char* row, char* prefix) {
getAddressFromPrefix(prefix) = row;
}

void PrefixSort::sortInternal(std::vector<char*>& rows) {
template <typename VectorType>
void PrefixSort::sortInternal(VectorType& rows) {
const auto numRows = rows.size();
const auto entrySize = sortLayout_.entrySize;
memory::ContiguousAllocation prefixAllocation;
Expand Down Expand Up @@ -254,4 +255,7 @@ void PrefixSort::sortInternal(std::vector<char*>& rows) {
}
}

// Explicit instantiations of the templated sortInternal(), whose body is
// defined in this file, for the row vector types listed below.
template void PrefixSort::sortInternal(std::vector<char*>& rows);
template void PrefixSort::sortInternal(std::vector<char*, memory::StlAllocator<char*>>& rows);

} // namespace facebook::velox::exec
9 changes: 6 additions & 3 deletions velox/exec/PrefixSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ namespace facebook::velox::exec {

namespace detail {

template <typename VectorType>
FOLLY_ALWAYS_INLINE void stdSort(
std::vector<char*>& rows,
VectorType& rows,
RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags) {
std::sort(
Expand Down Expand Up @@ -119,8 +120,9 @@ class PrefixSort {
///
/// @param rows The result of RowContainer::listRows(), assuming that the
/// caller (SortBuffer etc.) has already got the result.
template <typename VectorType>
FOLLY_ALWAYS_INLINE static void sort(
std::vector<char*>& rows,
VectorType& rows,
memory::MemoryPool* pool,
RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags,
Expand All @@ -144,7 +146,8 @@ class PrefixSort {
}

private:
void sortInternal(std::vector<char*>& rows);
template <typename VectorType>
void sortInternal(VectorType& rows);

int compareAllNormalizedKeys(char* left, char* right);

Expand Down
16 changes: 13 additions & 3 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ SortBuffer::SortBuffer(
nonReclaimableSection_(nonReclaimableSection),
prefixSortConfig_(prefixSortConfig),
spillConfig_(spillConfig),
spillStats_(spillStats) {
spillStats_(spillStats),
sortedRows_(0, memory::StlAllocator<char*>(*pool)) {
VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size());
VELOX_CHECK_GT(sortCompareFlags_.size(), 0);
VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size());
Expand Down Expand Up @@ -106,6 +107,14 @@ void SortBuffer::noMoreInput() {
velox::common::testutil::TestValue::adjust(
"facebook::velox::exec::SortBuffer::noMoreInput", this);
VELOX_CHECK(!noMoreInput_);

// This may trigger a spill; make sure it is triggered before noMoreInput_ is set.
if (spiller_ == nullptr) {
// Force a spill to be triggered.
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
pool_->maybeReserve(1*1024*1024*1024L);
}

noMoreInput_ = true;

// No data.
Expand Down Expand Up @@ -317,11 +326,12 @@ void SortBuffer::spillOutput() {
spillerStoreType_,
spillConfig_,
spillStats_);
auto spillRows = std::vector<char*>(
sortedRows_.begin() + numOutputRows_, sortedRows_.end());
auto spillRows = std::vector<char*, memory::StlAllocator<char*>>(
sortedRows_.begin() + numOutputRows_, sortedRows_.end(), memory::StlAllocator<char*>(*memory::spillMemoryPool()));
spiller_->spill(spillRows);
data_->clear();
sortedRows_.clear();
sortedRows_.shrink_to_fit();
// Finish right after spilling as the output spiller only spills at most
// once.
finishSpill();
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class SortBuffer {
uint64_t numInputRows_ = 0;
// Used to store the input data in row format.
std::unique_ptr<RowContainer> data_;
std::vector<char*> sortedRows_;
std::vector<char*, memory::StlAllocator<char*>> sortedRows_;

// The data type of the rows stored in 'data_' and spilled on disk. The
// sort key columns are stored first then the non-sorted data columns.
Expand Down
13 changes: 11 additions & 2 deletions velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,8 @@ void Spiller::spill(const RowContainerIterator* startRowIter) {
checkEmptySpillRuns();
}

void Spiller::spill(std::vector<char*>& rows) {
template <typename VectorType>
void Spiller::spill(VectorType& rows) {
CHECK_NOT_FINALIZED();
VELOX_CHECK_EQ(type_, Type::kOrderByOutput);
VELOX_CHECK(!rows.empty());
Expand Down Expand Up @@ -705,7 +706,8 @@ bool Spiller::fillSpillRuns(RowContainerIterator* iterator) {
return lastRun;
}

void Spiller::fillSpillRun(std::vector<char*>& rows) {
template <typename VectorType>
void Spiller::fillSpillRun(VectorType& rows) {
VELOX_CHECK_EQ(bits_.numPartitions(), 1);
checkEmptySpillRuns();
uint64_t execTimeNs{0};
Expand Down Expand Up @@ -754,4 +756,11 @@ std::string Spiller::typeName(Type type) {
common::SpillStats Spiller::stats() const {
return spillStats_->copy();
}

// Explicit instantiations of the templated fillSpillRun(), whose body is
// defined in this file, for the row vector types listed below.
template void Spiller::fillSpillRun(std::vector<char*>&);
template void Spiller::fillSpillRun(std::vector<char*, memory::StlAllocator<char*>>&);

// Explicit instantiations of the templated spill() for the same vector types.
template void Spiller::spill(std::vector<char*>&);
template void Spiller::spill(std::vector<char*, memory::StlAllocator<char*>>&);

} // namespace facebook::velox::exec
6 changes: 4 additions & 2 deletions velox/exec/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ class Spiller {
/// 'kOrderByOutput' spiller type to spill during the order by
/// output processing. Similarly, the spilled rows still stays in the row
/// container. The caller needs to erase them from the row container.
void spill(std::vector<char*>& rows);
template <typename VectorType>
void spill(VectorType& rows);

/// Append 'spillVector' into the spill file of given 'partition'. It is now
/// only used by the spilling operator which doesn't need data sort, such as
Expand Down Expand Up @@ -297,7 +298,8 @@ class Spiller {

// Prepares spill run of a single partition for the spillable data from the
// rows.
void fillSpillRun(std::vector<char*>& rows);
template <typename VectorType>
void fillSpillRun(VectorType& rows);

// Writes out all the rows collected in spillRuns_.
void runSpill(bool lastRun);
Expand Down

0 comments on commit 3830f04

Please sign in to comment.