Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add prefix sort #7230

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,15 @@ class QueryConfig {
/// Maximum number of splits to preload. Set to 0 to disable preloading.
static constexpr const char* kMaxSplitPreloadPerDriver =
"max_split_preload_per_driver";
/// Boolean config key read by isPrefixSortEnabled(), which falls back to false
/// when the key is not set.
static constexpr const char* kEnablePrefixSort = "enable_prefix_sort";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you add documentation for these properties to config.rst?


/// Config key for the prefix-sort key length limit. Read by
/// prefixSortMaxKeyLength(), which falls back to the maximum uint32_t value
/// when the key is not set.
static constexpr const char* kPrefixSortMaxKeyLength =
"prefix_sort_max_key_length";

// TODO: Used for testing only; remove in a follow-up.
// NOTE(review): the key string below is spelled "iterater", not "iterator".
// Confirm whether that spelling is intentional before anything depends on it.
static constexpr const char* kEnablePrefixSortWithIterator =
"enable_prefix_sort_with_iterater";


uint64_t queryMaxMemoryPerNode() const {
return toCapacity(
Expand Down Expand Up @@ -714,6 +723,19 @@ class QueryConfig {
return get<int32_t>(kMaxSplitPreloadPerDriver, 2);
}

/// Returns the boolean stored under kEnablePrefixSort, or false when the key
/// is not set.
bool isPrefixSortEnabled() const {
  constexpr bool kDefaultValue = false;
  return get<bool>(kEnablePrefixSort, kDefaultValue);
}

/// Returns the boolean stored under kEnablePrefixSortWithIterator, or false
/// when the key is not set.
bool isPrefixSortEnabledWithIterator() const {
  constexpr bool kDefaultValue = false;
  return get<bool>(kEnablePrefixSortWithIterator, kDefaultValue);
}

/// Returns the value stored under kPrefixSortMaxKeyLength, or the maximum
/// uint32_t value when the key is not set.
uint32_t prefixSortMaxKeyLength() const {
  constexpr uint32_t kDefaultValue = std::numeric_limits<uint32_t>::max();
  return get<uint32_t>(kPrefixSortMaxKeyLength, kDefaultValue);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ add_library(
NestedLoopJoinProbe.cpp
Operator.cpp
OperatorUtils.cpp
PrefixSort.cpp
OrderBy.cpp
PartitionedOutput.cpp
OutputBuffer.cpp
Expand Down
6 changes: 5 additions & 1 deletion velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ OrderBy::OrderBy(
pool(),
&nonReclaimableSection_,
spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr,
operatorCtx_->driverCtx()->queryConfig().orderBySpillMemoryThreshold());
operatorCtx_->driverCtx()->queryConfig().orderBySpillMemoryThreshold(),
driverCtx->queryConfig().isPrefixSortEnabled()
? std::make_optional<PrefixSortConfig>(
driverCtx->queryConfig().prefixSortMaxKeyLength())
: std::nullopt);
}

void OrderBy::addInput(RowVectorPtr input) {
Expand Down
74 changes: 74 additions & 0 deletions velox/exec/PrefixSort.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PrefixSort.h"

namespace facebook::velox::exec {
// Fills one prefix entry for 'row'.
//
// Every prefix key column of 'row' is encoded into 'prefix' via the
// type-dispatched rowToPrefix<Kind>() helper, and the row pointer itself is
// then stored right after the key bytes (at offset sortLayout_.keySize).
//
// @param row Row to read the key columns from.
// @param prefix Start of the entry to write; must have room for
// sortLayout_.keySize bytes plus one pointer.
void PrefixSort::extractRowToPrefix(char* row, char* prefix) {
  // Loop-invariant, so look it up once.
  const auto& keyTypes = rowContainer_->keyTypes();
  for (int32_t index = 0; index < sortLayout_.numPrefixKeys_; ++index) {
    VELOX_DYNAMIC_TYPE_DISPATCH(
        rowToPrefix,
        keyTypes[index]->kind(),
        index,
        rowContainer_->columnAt(index),
        row,
        prefix);
  }
  // Remember which row this prefix belongs to.
  *reinterpret_cast<char**>(prefix + sortLayout_.keySize) = row;
}

// Computes the byte offset of every prefix key inside an entry and allocates
// the contiguous buffer that holds one entry per input row.
//
// After this call 'prefixes_' points at the start of that buffer, whose size
// is numInputRows_ * sortLayout_.entrySize rounded up to whole pages.
void PrefixSort::preparePrefix() {
  // Start from a clean slate so a repeated call does not append a second set
  // of offsets behind the first one.
  prefixOffsets.clear();
  const auto& keyTypes = rowContainer_->keyTypes();
  uint32_t offset = 0;
  for (int32_t i = 0; i < sortLayout_.numPrefixKeys_; ++i) {
    prefixOffsets.push_back(offset);
    offset += prefixKeySize(keyTypes[i]->kind());
  }

  // Do the size arithmetic in 64 bits. Narrowing the row count to int32_t
  // first would truncate (or go negative) for very large inputs.
  const uint64_t numBytes =
      static_cast<uint64_t>(numInputRows_) * sortLayout_.entrySize;
  constexpr auto kPageSize = memory::AllocationTraits::kPageSize;
  const auto numPages = bits::roundUp(numBytes, kPageSize) / kPageSize;
  rowContainer_->pool()->allocateContiguous(numPages, prefixAllocation);
  prefixes_ = prefixAllocation.data<char>();
}

// Sorts the rows of the row container and returns them, in sorted order, in
// 'rows'.
//
// Steps:
//  1. List up to numInputRows_ row pointers from the container into 'rows'.
//  2. Build one prefix entry per row in 'prefixes_' (see extractRowToPrefix).
//  3. Run PrefixSortRunner::quickSort over the entries, using compare().
//  4. Copy the row pointer stored at the tail of each entry back into 'rows'.
//
// preparePrefix() must have been called first so that 'prefixes_' is
// allocated.
// NOTE(review): this assumes rows.size() == numInputRows_; the listing and the
// sorted range use numInputRows_ while the loops use rows.size(). Confirm
// against the caller.
void PrefixSort::sort(std::vector<char*>& rows) {
  RowContainerIterator iter;
  rowContainer_->listRows(&iter, numInputRows_, rows.data());

  const uint64_t entrySize = sortLayout_.entrySize;
  const size_t numRows = rows.size();
  for (size_t i = 0; i < numRows; ++i) {
    extractRowToPrefix(rows[i], prefixes_ + entrySize * i);
  }

  // Scratch space for one entry, handed to the sort runner for swapping.
  auto swapBuffer =
      AlignedBuffer::allocate<char>(entrySize, rowContainer_->pool());
  PrefixSortRunner sortRunner(entrySize, swapBuffer->asMutable<char>());
  auto start = prefixes_;
  auto end = prefixes_ + numInputRows_ * entrySize;

  sortRunner.quickSort(
      start, end, [&](char* a, char* b) { return compare(a, b); });

  // Unsigned index: a signed 'int' here mixes signedness with rows.size() and
  // overflows for inputs larger than INT_MAX.
  for (size_t i = 0; i < numRows; ++i) {
    rows[i] = *reinterpret_cast<char**>(
        prefixes_ + i * entrySize + sortLayout_.keySize);
  }
}

} // namespace facebook::velox::exec
178 changes: 178 additions & 0 deletions velox/exec/PrefixSort.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "RowContainer.h"
#include "string.h"
#include "velox/common/memory/Allocation.h"
#include "velox/common/memory/AllocationPool.h"
#include "velox/common/memory/HashStringAllocator.h"
#include "velox/common/memory/MemoryAllocator.h"
#include "velox/exec/prefixsort/PrefixSortAlgorithm.h"
#include "velox/exec/prefixsort/PrefixSortEncoder.h"

using namespace facebook::velox::exec::prefixsort;

namespace facebook::velox::exec {

/// Settings handed to PrefixSort.
struct PrefixSortConfig {
  /// @param keyLengthLimit Value stored in 'maxPrefixKeyLength'.
  PrefixSortConfig(uint32_t keyLengthLimit)
      : maxPrefixKeyLength{keyLengthLimit} {}

  /// Limit forwarded to PrefixSortLayout when it decides how many sort keys
  /// go into the prefix.
  uint32_t maxPrefixKeyLength;
};

struct PrefixSortLayout {
PrefixSortLayout(
const std::vector<TypePtr>& types,
const std::vector<CompareFlags>& keyCompareFlags,
const uint32_t maxPrefixKeyLength)
: keySize(0),
numPrefixKeys_(0),
numSortKeys_(types.size()),
keyCompareFlags_(keyCompareFlags) {
VELOX_CHECK(types.size() > 0);
for (TypePtr type : types) {
if (numPrefixKeys_ > maxPrefixKeyLength) {
break;
}
if (type->kind() == TypeKind::BIGINT) {
numPrefixKeys_++;
keySize += sizeof(TypeTraits<TypeKind::BIGINT>::NativeType);
} else {
break;
}
}
entrySize = keySize + sizeof(char*);
if (numPrefixKeys_ < numSortKeys_) {
needSortData = true;
}
}

// prefix size is fixed.
uint32_t keySize;
uint64_t entrySize;
int32_t numPrefixKeys_;
const int32_t numSortKeys_;
std::vector<CompareFlags> keyCompareFlags_;
bool needSortData = false;
};

class PrefixSort {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, this can be another PR with tests.

public:
PrefixSort(
RowContainer* FOLLY_NONNULL rowContainer,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not use FOLLY_NONNULL

const std::vector<CompareFlags>& keyCompareFlags,
size_t numInputRows,
const PrefixSortConfig& config)
: sortLayout_(
rowContainer->keyTypes(),
keyCompareFlags,
config.maxPrefixKeyLength),
rowContainer_(rowContainer),
numInputRows_(numInputRows) {}

// Implement the prepare and sort methods separately to
// facilitate the collection of metrics.
void preparePrefix();

void sort(std::vector<char*>& rows);

/// Three-way comparison of two prefix entries.
///
/// The encoded key bytes are compared with memcmp first. If they differ, or
/// if every sort key is part of the prefix, that result is returned.
/// Otherwise the remaining sort keys are compared on the rows the entries
/// point to, using RowContainer::compare with each key's compare flags.
///
/// NOTE(review): rowToPrefix<BIGINT> encodes a null as the type's min or max
/// value, so a null and a real min/max value yield identical prefix bytes.
/// When all sort keys are in the prefix this function reports such a pair as
/// equal. Confirm that this is acceptable.
///
/// @return Negative, zero or positive, like memcmp.
int compare(char* left, char* right) {
  const int prefixResult =
      memcmp(left, right, static_cast<size_t>(sortLayout_.keySize));
  if (prefixResult != 0 || !sortLayout_.needSortData) {
    return prefixResult;
  }

  // Prefixes tie: fall back to the sort keys that are not in the prefix.
  char* leftRow = getAddressFromPrefix(left);
  char* rightRow = getAddressFromPrefix(right);
  for (int keyIndex = sortLayout_.numPrefixKeys_;
       keyIndex < sortLayout_.numSortKeys_;
       keyIndex++) {
    const int keyResult = rowContainer_->compare(
        leftRow, rightRow, keyIndex, sortLayout_.keyCompareFlags_[keyIndex]);
    if (keyResult != 0) {
      return keyResult;
    }
  }
  return 0;
}

private:
void extractRowToPrefix(char* row, char* prefix);

/// Encodes sort key 'index' of 'row' into 'prefix'.
///
/// This generic version is the fallback for type kinds without a
/// specialization and always fails with VELOX_UNSUPPORTED. Supported kinds
/// provide an explicit specialization of this template.
///
/// @param index Position of the sort key.
/// @param rowColumn Location of the key column inside 'row'.
/// @param row Row to read from.
/// @param prefix Start of the prefix entry to write to.
template <TypeKind typeKind>
inline void rowToPrefix(
    uint32_t index,
    const RowColumn& rowColumn,
    char* row,
    char* prefix) {
  VELOX_UNSUPPORTED("prefix sort not support the type.");
}

/// Returns the number of bytes a key of 'typeKind' occupies in the prefix.
/// Only BIGINT is handled; any other kind fails with VELOX_UNSUPPORTED.
uint32_t prefixKeySize(const TypeKind& typeKind) {
  switch (typeKind) {
    case TypeKind::BIGINT:
      return sizeof(TypeTraits<TypeKind::BIGINT>::NativeType);
    default:
      // TODO support varchar later
      VELOX_UNSUPPORTED("prefix sort not support the type.");
  }
}

/// Returns the row pointer stored in a prefix entry, i.e. the pointer value
/// located sortLayout_.keySize bytes after 'prefix'.
inline char* getAddressFromPrefix(char* prefix) {
  char* const addressSlot = prefix + sortLayout_.keySize;
  return *reinterpret_cast<char**>(addressSlot);
}

// Store prefix and address for sort data.
memory::ContiguousAllocation prefixAllocation;
char* prefixes_;
PrefixSortLayout sortLayout_;
std::vector<int32_t> prefixOffsets;
RowContainer* rowContainer_;
size_t numInputRows_;
};

/// BIGINT specialization: encodes sort key 'index' of 'row' into its slot of
/// the prefix entry (prefix + prefixOffsets[index]).
///
/// A null key is encoded as the smallest or largest value of the type,
/// chosen from the key's compare flags. For a descending key every encoded
/// byte is bit-inverted afterwards.
///
/// @param index Position of the sort key; indexes keyCompareFlags_ and
/// prefixOffsets.
/// @param rowColumn Location (offset, null byte and mask) of the key in 'row'.
/// @param row Row to read from.
/// @param prefix Start of the prefix entry to write to.
template <>
inline void PrefixSort::rowToPrefix<TypeKind::BIGINT>(
    uint32_t index,
    const RowColumn& rowColumn,
    char* row,
    char* prefix) {
  using T = TypeTraits<TypeKind::BIGINT>::NativeType;
  const CompareFlags& compareFlags = sortLayout_.keyCompareFlags_[index];
  char* const dest = prefix + prefixOffsets[index];

  T value;
  if (RowContainer::isNullAt(row, rowColumn.nullByte(), rowColumn.nullMask())) {
    // Store null as the min or max value according to the compare flags.
    // Descending keys are bit-inverted below, which flips min and max, so the
    // sentinel is min exactly when 'ascending' and 'nullsFirst' agree.
    value = (compareFlags.ascending == compareFlags.nullsFirst)
        ? std::numeric_limits<T>::min()
        : std::numeric_limits<T>::max();
  } else {
    value = *reinterpret_cast<T*>(row + rowColumn.offset());
  }
  PrefixSortEncoder::encode(value, dest);

  // Invert the encoded bytes for a descending key.
  if (!compareFlags.ascending) {
    for (uint64_t s = 0; s < sizeof(T); ++s) {
      dest[s] = ~dest[s];
    }
  }
}
} // namespace facebook::velox::exec
42 changes: 27 additions & 15 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ SortBuffer::SortBuffer(
velox::memory::MemoryPool* pool,
tsan_atomic<bool>* nonReclaimableSection,
const common::SpillConfig* spillConfig,
uint64_t spillMemoryThreshold)
uint64_t spillMemoryThreshold,
const std::optional<PrefixSortConfig>& prefixSortConfig)
: input_(input),
sortCompareFlags_(sortCompareFlags),
pool_(pool),
nonReclaimableSection_(nonReclaimableSection),
spillConfig_(spillConfig),
spillMemoryThreshold_(spillMemoryThreshold) {
spillMemoryThreshold_(spillMemoryThreshold),
prefixSortConfig_(prefixSortConfig) {
VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size());
VELOX_CHECK_GT(sortCompareFlags_.size(), 0);
VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size());
Expand Down Expand Up @@ -111,20 +113,30 @@ void SortBuffer::noMoreInput() {
// the rows.
sortedRows_.resize(numInputRows_);
RowContainerIterator iter;
data_->listRows(&iter, numInputRows_, sortedRows_.data());
std::sort(
sortedRows_.begin(),
sortedRows_.end(),
[this](const char* leftRow, const char* rightRow) {
for (vector_size_t index = 0; index < sortCompareFlags_.size();
++index) {
if (auto result = data_->compare(
leftRow, rightRow, index, sortCompareFlags_[index])) {
return result < 0;
if (prefixSortConfig_.has_value()) {
auto prefixSort = PrefixSort(
data_.get(),
sortCompareFlags_,
numInputRows_,
prefixSortConfig_.value());
prefixSort.preparePrefix();
prefixSort.sort(sortedRows_);
} else {
data_->listRows(&iter, numInputRows_, sortedRows_.data());
std::sort(
sortedRows_.begin(),
sortedRows_.end(),
[this](const char* leftRow, const char* rightRow) {
for (vector_size_t index = 0; index < sortCompareFlags_.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a simple case sorting by a few integers, we can optimize this comparison to use memcmp, no? I wonder how would this optimization compare with using PrefixSort. Specifically, I'm curious whether it will give us the most benefits with PrefixSort adding just a little bit of perf boost on top.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is the code Velox uses now. See the `if` body; that is the PrefixSort implementation:
`if (prefixSortConfig_.has_value())`

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, and I assume this is very inefficient. I wonder what happens if we just optimize this comparator. I expect it will speed things up quite a bit.

++index) {
if (auto result = data_->compare(
leftRow, rightRow, index, sortCompareFlags_[index])) {
return result < 0;
}
}
}
return false;
});
return false;
});
}
} else {
// Spill the remaining in-memory state to disk if spilling has been
// triggered on this sort buffer. This is to simplify query OOM prevention
Expand Down
Loading
Loading