-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add prefix sort #7230
add prefix sort #7230
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
#include "PrefixSort.h" | ||
|
||
namespace facebook::velox::exec { | ||
void PrefixSort::extractRowToPrefix( | ||
char* FOLLY_NONNULL row, | ||
char* FOLLY_NONNULL prefix) { | ||
// extract key | ||
for (int32_t index = 0; index < sortLayout_.numPrefixKeys_; index++) { | ||
VELOX_DYNAMIC_TYPE_DISPATCH( | ||
rowToPrefix, | ||
rowContainer_->keyTypes()[index]->kind(), | ||
index, | ||
rowContainer_->columnAt(index), | ||
row, | ||
prefix); | ||
} | ||
// Set address of row. | ||
*reinterpret_cast<char**>(prefix + sortLayout_.keySize) = row; | ||
} | ||
|
||
void PrefixSort::preparePrefix() { | ||
// Compute prefix offsets for sort columns. | ||
uint32_t offset = 0; | ||
for (int i = 0; i < sortLayout_.numPrefixKeys_; i++) { | ||
prefixOffsets.push_back(offset); | ||
offset += prefixKeySize(rowContainer_->keyTypes()[i]->kind()); | ||
} | ||
int32_t numRows = numInputRows_; | ||
// Allocate prefixes_ data. | ||
constexpr auto kPageSize = memory::AllocationTraits::kPageSize; | ||
auto numPages = | ||
bits::roundUp(numRows * sortLayout_.entrySize, kPageSize) / kPageSize; | ||
rowContainer_->pool()->allocateContiguous(numPages, prefixAllocation); | ||
prefixes_ = prefixAllocation.data<char>(); | ||
} | ||
|
||
void PrefixSort::sort(std::vector<char*>& rows) { | ||
RowContainerIterator iter; | ||
rowContainer_->listRows(&iter, numInputRows_, rows.data()); | ||
for (uint64_t i = 0; i < rows.size(); ++i) { | ||
extractRowToPrefix(rows[i], prefixes_ + sortLayout_.entrySize * i); | ||
} | ||
auto swapBuffer = AlignedBuffer::allocate<char>( | ||
sortLayout_.entrySize, rowContainer_->pool()); | ||
PrefixSortRunner sortRunner( | ||
sortLayout_.entrySize, swapBuffer->asMutable<char>()); | ||
auto start = prefixes_; | ||
auto end = prefixes_ + numInputRows_ * sortLayout_.entrySize; | ||
|
||
sortRunner.quickSort( | ||
start, end, [&](char* a, char* b) { return compare(a, b); }); | ||
|
||
for (int i = 0; i < rows.size(); i++) { | ||
rows[i] = *reinterpret_cast<char**>( | ||
prefixes_ + i * sortLayout_.entrySize + sortLayout_.keySize); | ||
} | ||
} | ||
|
||
} // namespace facebook::velox::exec |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
/* | ||
* Copyright (c) Facebook, Inc. and its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
#pragma once | ||
|
||
#include "RowContainer.h" | ||
#include "string.h" | ||
#include "velox/common/memory/Allocation.h" | ||
#include "velox/common/memory/AllocationPool.h" | ||
#include "velox/common/memory/HashStringAllocator.h" | ||
#include "velox/common/memory/MemoryAllocator.h" | ||
#include "velox/exec/prefixsort/PrefixSortAlgorithm.h" | ||
#include "velox/exec/prefixsort/PrefixSortEncoder.h" | ||
|
||
using namespace facebook::velox::exec::prefixsort; | ||
|
||
namespace facebook::velox::exec { | ||
|
||
struct PrefixSortConfig { | ||
PrefixSortConfig( | ||
const uint32_t maxPrefixKeyLength) | ||
: maxPrefixKeyLength(maxPrefixKeyLength) {} | ||
uint32_t maxPrefixKeyLength; | ||
}; | ||
|
||
struct PrefixSortLayout { | ||
PrefixSortLayout( | ||
const std::vector<TypePtr>& types, | ||
const std::vector<CompareFlags>& keyCompareFlags, | ||
const uint32_t maxPrefixKeyLength) | ||
: keySize(0), | ||
numPrefixKeys_(0), | ||
numSortKeys_(types.size()), | ||
keyCompareFlags_(keyCompareFlags) { | ||
VELOX_CHECK(types.size() > 0); | ||
for (TypePtr type : types) { | ||
if (numPrefixKeys_ > maxPrefixKeyLength) { | ||
break; | ||
} | ||
if (type->kind() == TypeKind::BIGINT) { | ||
numPrefixKeys_++; | ||
keySize += sizeof(TypeTraits<TypeKind::BIGINT>::NativeType); | ||
} else { | ||
break; | ||
} | ||
} | ||
entrySize = keySize + sizeof(char*); | ||
if (numPrefixKeys_ < numSortKeys_) { | ||
needSortData = true; | ||
} | ||
} | ||
|
||
// prefix size is fixed. | ||
uint32_t keySize; | ||
uint64_t entrySize; | ||
int32_t numPrefixKeys_; | ||
const int32_t numSortKeys_; | ||
std::vector<CompareFlags> keyCompareFlags_; | ||
bool needSortData = false; | ||
}; | ||
|
||
class PrefixSort { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps, this can be another PR with tests. |
||
public: | ||
PrefixSort( | ||
RowContainer* FOLLY_NONNULL rowContainer, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do not use FOLLY_NONNULL |
||
const std::vector<CompareFlags>& keyCompareFlags, | ||
size_t numInputRows, | ||
const PrefixSortConfig& config) | ||
: sortLayout_( | ||
rowContainer->keyTypes(), | ||
keyCompareFlags, | ||
config.maxPrefixKeyLength), | ||
rowContainer_(rowContainer), | ||
numInputRows_(numInputRows) {} | ||
|
||
// Implement the prepare and sort methods separately to | ||
// facilitate the collection of metrics. | ||
void preparePrefix(); | ||
|
||
void sort(std::vector<char*>& rows); | ||
|
||
int compare(char* left, char* right) { | ||
if (!sortLayout_.needSortData) { | ||
return memcmp(left, right, (size_t)sortLayout_.keySize); | ||
} else { | ||
int result = memcmp(left, right, (size_t)sortLayout_.keySize); | ||
if (result != 0) { | ||
return result; | ||
} | ||
char* leftAddress = getAddressFromPrefix(left); | ||
char* rightAddress = getAddressFromPrefix(right); | ||
for (int i = sortLayout_.numPrefixKeys_; i < sortLayout_.numSortKeys_; | ||
i++) { | ||
result = rowContainer_->compare( | ||
leftAddress, rightAddress, i, sortLayout_.keyCompareFlags_[i]); | ||
if (result != 0) { | ||
return result; | ||
} | ||
} | ||
} | ||
return 0; | ||
} | ||
|
||
private: | ||
void extractRowToPrefix(char* row, char* prefix); | ||
|
||
template <TypeKind typeKind> | ||
inline void rowToPrefix( | ||
uint32_t index, | ||
const RowColumn& rowColumn, | ||
char* FOLLY_NONNULL row, | ||
char* FOLLY_NONNULL prefix) { | ||
VELOX_UNSUPPORTED("prefix sort not support the type."); | ||
} | ||
|
||
uint32_t prefixKeySize(const TypeKind& typeKind) { | ||
if (typeKind == TypeKind::BIGINT) { | ||
return sizeof(TypeTraits<TypeKind::BIGINT>::NativeType); | ||
} | ||
// TODO support varchar later | ||
VELOX_UNSUPPORTED("prefix sort not support the type."); | ||
} | ||
|
||
inline char* getAddressFromPrefix(char* prefix) { | ||
return *reinterpret_cast<char**>(prefix + sortLayout_.keySize); | ||
} | ||
|
||
// Store prefix and address for sort data. | ||
memory::ContiguousAllocation prefixAllocation; | ||
char* prefixes_; | ||
PrefixSortLayout sortLayout_; | ||
std::vector<int32_t> prefixOffsets; | ||
RowContainer* rowContainer_; | ||
size_t numInputRows_; | ||
}; | ||
|
||
template <> | ||
inline void PrefixSort::rowToPrefix<TypeKind::BIGINT>( | ||
uint32_t index, | ||
const RowColumn& rowColumn, | ||
char* FOLLY_NONNULL row, | ||
char* FOLLY_NONNULL prefix) { | ||
using T = TypeTraits<TypeKind::BIGINT>::NativeType; | ||
// store null as min/max value according compare flags. | ||
if (RowContainer::isNullAt(row, rowColumn.nullByte(), rowColumn.nullMask())) { | ||
CompareFlags compareFlags = sortLayout_.keyCompareFlags_[index]; | ||
PrefixSortEncoder::encode( | ||
((compareFlags.ascending && compareFlags.nullsFirst) || | ||
(!compareFlags.ascending && !compareFlags.nullsFirst)) | ||
? std::numeric_limits<T>::min() | ||
: std::numeric_limits<T>::max(), | ||
prefix + prefixOffsets[index]); | ||
} else { | ||
PrefixSortEncoder::encode( | ||
*(reinterpret_cast<T*>(row + rowColumn.offset())), | ||
prefix + prefixOffsets[index]); | ||
} | ||
// invert bits if desc | ||
if (!sortLayout_.keyCompareFlags_[index].ascending) { | ||
for (uint64_t s = 0; s < sizeof(T); s++) { | ||
*(prefix + prefixOffsets[index] + s) = | ||
~*(prefix + prefixOffsets[index] + s); | ||
} | ||
} | ||
} | ||
} // namespace facebook::velox::exec |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,13 +26,15 @@ SortBuffer::SortBuffer( | |
velox::memory::MemoryPool* pool, | ||
tsan_atomic<bool>* nonReclaimableSection, | ||
const common::SpillConfig* spillConfig, | ||
uint64_t spillMemoryThreshold) | ||
uint64_t spillMemoryThreshold, | ||
const std::optional<PrefixSortConfig>& prefixSortConfig) | ||
: input_(input), | ||
sortCompareFlags_(sortCompareFlags), | ||
pool_(pool), | ||
nonReclaimableSection_(nonReclaimableSection), | ||
spillConfig_(spillConfig), | ||
spillMemoryThreshold_(spillMemoryThreshold) { | ||
spillMemoryThreshold_(spillMemoryThreshold), | ||
prefixSortConfig_(prefixSortConfig) { | ||
VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size()); | ||
VELOX_CHECK_GT(sortCompareFlags_.size(), 0); | ||
VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size()); | ||
|
@@ -111,20 +113,30 @@ void SortBuffer::noMoreInput() { | |
// the rows. | ||
sortedRows_.resize(numInputRows_); | ||
RowContainerIterator iter; | ||
data_->listRows(&iter, numInputRows_, sortedRows_.data()); | ||
std::sort( | ||
sortedRows_.begin(), | ||
sortedRows_.end(), | ||
[this](const char* leftRow, const char* rightRow) { | ||
for (vector_size_t index = 0; index < sortCompareFlags_.size(); | ||
++index) { | ||
if (auto result = data_->compare( | ||
leftRow, rightRow, index, sortCompareFlags_[index])) { | ||
return result < 0; | ||
if (prefixSortConfig_.has_value()) { | ||
auto prefixSort = PrefixSort( | ||
data_.get(), | ||
sortCompareFlags_, | ||
numInputRows_, | ||
prefixSortConfig_.value()); | ||
prefixSort.preparePrefix(); | ||
prefixSort.sort(sortedRows_); | ||
} else { | ||
data_->listRows(&iter, numInputRows_, sortedRows_.data()); | ||
std::sort( | ||
sortedRows_.begin(), | ||
sortedRows_.end(), | ||
[this](const char* leftRow, const char* rightRow) { | ||
for (vector_size_t index = 0; index < sortCompareFlags_.size(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For a simple case sorting by a few integers, we can optimize this comparison to use memcmp, no? I wonder how would this optimization compare with using PrefixSort. Specifically, I'm curious whether it will give us the most benefits with PrefixSort adding just a little bit of perf boost on top. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, this is the code velox use now see the if body that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's right and I assume this is very inefficient. I wonder what happens if we just optimize this comparator. I expect it will be speed things up quite a bit. |
||
++index) { | ||
if (auto result = data_->compare( | ||
leftRow, rightRow, index, sortCompareFlags_[index])) { | ||
return result < 0; | ||
} | ||
} | ||
} | ||
return false; | ||
}); | ||
return false; | ||
}); | ||
} | ||
} else { | ||
// Spill the remaining in-memory state to disk if spilling has been | ||
// triggered on this sort buffer. This is to simplify query OOM prevention | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you add documentation for these properties to config.rst?