Skip to content

Commit

Permalink
add prefix sort
Browse files Browse the repository at this point in the history
  • Loading branch information
skadilover committed Nov 8, 2023
1 parent 42b604b commit 8de29fd
Show file tree
Hide file tree
Showing 13 changed files with 2,010 additions and 17 deletions.
6 changes: 6 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ class QueryConfig {
static constexpr const char* kEnableExpressionEvaluationCache =
"enable_expression_evaluation_cache";

static constexpr const char* kEnablePrefixSort = "enable_prefix_sort";

uint64_t queryMaxMemoryPerNode() const {
return toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE);
Expand Down Expand Up @@ -664,6 +666,10 @@ class QueryConfig {
return get<bool>(kEnableExpressionEvaluationCache, true);
}

/// Reads the 'enable_prefix_sort' query config entry. Evaluates to false when
/// the entry is not present, i.e. prefix sort is opt-in.
bool isPrefixSortEnabled() const {
  constexpr bool kDisabledByDefault = false;
  return get<bool>(kEnablePrefixSort, kDisabledByDefault);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ add_library(
NestedLoopJoinProbe.cpp
Operator.cpp
OperatorUtils.cpp
PrefixSort.cpp
OrderBy.cpp
PartitionedOutput.cpp
OutputBuffer.cpp
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ OrderBy::OrderBy(
&nonReclaimableSection_,
&numSpillRuns_,
spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr,
operatorCtx_->driverCtx()->queryConfig().orderBySpillMemoryThreshold());
operatorCtx_->driverCtx()->queryConfig().orderBySpillMemoryThreshold(),
driverCtx->queryConfig().isPrefixSortEnabled());
}

void OrderBy::addInput(RowVectorPtr input) {
Expand Down
120 changes: 120 additions & 0 deletions velox/exec/PrefixSort.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include "PrefixSort.h"

namespace facebook::velox::exec {
// Fills one prefix entry for 'row': every prefix key column is encoded into
// 'prefix' through the type-dispatched rowToPrefix(), and the address of the
// row is then written right behind the encoded keys (at offset
// sortLayout_.keySize).
void PrefixSort::extractRowToPrefix(
    char* FOLLY_NONNULL row,
    char* FOLLY_NONNULL prefix) {
  const int32_t numPrefixKeys = sortLayout_.numPrefixKeys_;
  int32_t keyIndex = 0;
  while (keyIndex < numPrefixKeys) {
    // Dispatch on the key's type kind; kinds without a rowToPrefix
    // specialization end up in the generic template, which throws.
    VELOX_DYNAMIC_TYPE_DISPATCH(
        rowToPrefix,
        rowContainer_->keyTypes()[keyIndex]->kind(),
        keyIndex,
        rowContainer_->columnAt(keyIndex),
        row,
        prefix);
    ++keyIndex;
  }

  // The trailing pointer slot lets the sorted entry be mapped back to its row.
  char** rowSlot = reinterpret_cast<char**>(prefix + sortLayout_.keySize);
  *rowSlot = row;
}

void PrefixSort::preparePrefix() {
// Compute prefix offsets for sort columns.
uint32_t offset = 0;
for (int i = 0; i < sortLayout_.numPrefixKeys_; i++) {
prefixOffsets.push_back(offset);
offset += prefixKeySize(rowContainer_->keyTypes()[i]->kind());
}
int32_t numRows = numInputRows_;
// Allocate prefixes_ data.
constexpr auto kPageSize = memory::AllocationTraits::kPageSize;
auto numPages =
bits::roundUp(numRows * sortLayout_.entrySize, kPageSize) /
kPageSize;
rowContainer_->pool()->allocateContiguous(numPages, prefixAllocation);
prefixes_ = prefixAllocation.data<char>();

RowContainerIterator tmp;
RowContainerIterator* iter = &tmp;
int32_t count = 0;
auto numAllocations = rowContainer_->rows_.numRanges();
if (iter->allocationIndex == 0 && iter->rowOffset == 0) {
iter->normalizedKeysLeft = rowContainer_->numRowsWithNormalizedKey_;
iter->normalizedKeySize = rowContainer_->originalNormalizedKeySize_;
}
int32_t rowSize = rowContainer_->fixedRowSize_ +
(iter->normalizedKeysLeft > 0 ? rowContainer_->originalNormalizedKeySize_ : 0);
char* prefix = prefixes_;
char* address = nullptr;
for (auto i = iter->allocationIndex; i < numAllocations; ++i) {
auto range = rowContainer_->rows_.rangeAt(i);
auto* data =
range.data() + memory::alignmentPadding(range.data(), rowContainer_->alignment_);
auto limit = range.size() -
(reinterpret_cast<uintptr_t>(data) -
reinterpret_cast<uintptr_t>(range.data()));
auto row = iter->rowOffset;
while (row + rowSize <= limit) {
address = data + row +
(iter->normalizedKeysLeft > 0 ? rowContainer_->originalNormalizedKeySize_ : 0);
VELOX_DCHECK_EQ(
reinterpret_cast<uintptr_t>(address) % rowContainer_->alignment_, 0);
row += rowSize;
if (--iter->normalizedKeysLeft == 0) {
rowSize -= rowContainer_->originalNormalizedKeySize_;
}
if (bits::isBitSet(address, rowContainer_->freeFlagOffset_)) {
continue;
}
prefix = prefixes_ + sortLayout_.entrySize * count;
extractRowToPrefix(address, prefix);
*(reinterpret_cast<char**>(prefix + sortLayout_.keySize)) = address;
count++;
}
iter->rowOffset = 0;
}
}

// Three-way comparison of two prefix entries.
//
// The encoded prefix keys are compared bytewise first. When they differ, or
// when every sort key is part of the prefix, that result is final. Otherwise
// the remaining sort keys are compared through the RowContainer using the row
// addresses stored in the entries. Returns 0 when all keys compare equal.
int PrefixSort::compare(
    const PrefixSortIterator& left,
    const PrefixSortIterator& right) {
  const int prefixResult =
      FastMemcmp(*left, *right, static_cast<size_t>(sortLayout_.keySize));
  if (prefixResult != 0 || !sortLayout_.needSortData) {
    return prefixResult;
  }

  // Prefixes tie: fall back to the keys that are not encoded in the prefix.
  char* const leftRow = getAddressFromPrefix(left);
  char* const rightRow = getAddressFromPrefix(right);
  int key = sortLayout_.numPrefixKeys_;
  while (key < sortLayout_.numSortKeys_) {
    const int keyResult = rowContainer_->compare(
        leftRow, rightRow, key, sortLayout_.keyCompareFlags_[key]);
    if (keyResult != 0) {
      return keyResult;
    }
    ++key;
  }
  return 0;
}

void PrefixSort::sort(std::vector<char*>& rows) {
auto start = PrefixSortIterator(prefixes_, sortLayout_.entrySize);
auto end = start + numInputRows_;
auto prefixSortContext = PrefixSortContext(sortLayout_.entrySize, *end);
PrefixQuickSort(
prefixSortContext,
start,
end,
[&](const PrefixSortIterator& a, const PrefixSortIterator& b) {
return compare(a, b);
});
// copy address from prefix tail to returnRows
for (int i = 0; i < end - start; i++) {
rows[i] = getAddressFromPrefix(start + i);
}
}

} // namespace facebook::velox::exec
117 changes: 117 additions & 0 deletions velox/exec/PrefixSort.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#pragma once

#include "PrefixSortAlgorithm.h"
#include "RowContainer.h"
#include "string.h"
#include "velox/common/memory/Allocation.h"
#include "velox/common/memory/AllocationPool.h"
#include "velox/common/memory/HashStringAllocator.h"
#include "velox/common/memory/MemoryAllocator.h"

namespace facebook::velox::exec {

struct PrefixSortLayout {
PrefixSortLayout(
const std::vector<TypePtr>& types,
const std::vector<CompareFlags>& keyCompareFlags)
: keySize(0), numPrefixKeys_(0), numSortKeys_(types.size()), keyCompareFlags_(keyCompareFlags) {
VELOX_CHECK(types.size() > 0);
for (TypePtr type : types) {
if (type->kind() == TypeKind::BIGINT) {
numPrefixKeys_++;
keySize += sizeof(TypeTraits<TypeKind::BIGINT>::NativeType);
} else {
break;
}
}
entrySize = keySize + sizeof(char*);
if (numPrefixKeys_ < numSortKeys_) {
needSortData = true;
}
}

// prefix size is fixed.
uint32_t keySize;
uint32_t entrySize;
int32_t numPrefixKeys_;
const int32_t numSortKeys_;
std::vector<CompareFlags> keyCompareFlags_;
bool needSortData = false;
};

/// Sorts the rows of a RowContainer by first materializing, per row, a
/// fixed-size entry made of the encoded leading sort keys plus the row
/// address, and then sorting those entries.
///
/// Usage: construct, call preparePrefix(), then call sort().
class PrefixSort {
 public:
  /// @param rowContainer Container holding the rows to sort. Not owned; the
  /// pointer is stored and used by preparePrefix() and compare().
  /// @param keyCompareFlags Compare flags per sort key.
  /// @param numInputRows Number of rows to sort.
  PrefixSort(
      RowContainer* FOLLY_NONNULL rowContainer,
      const std::vector<CompareFlags>& keyCompareFlags,
      size_t numInputRows)
      // Listed in member declaration order, which is the order in which the
      // members are actually initialized.
      : numInputRows_(numInputRows),
        sortLayout_(rowContainer->keyTypes(), keyCompareFlags),
        rowContainer_(rowContainer) {}

  // Implement the prepare and sort methods separately to
  // facilitate the collection of metrics.

  /// Allocates the prefix buffer and fills one entry per row.
  void preparePrefix();

  /// Sorts the prepared entries and writes the row addresses into 'rows' in
  /// sorted order.
  void sort(std::vector<char*>& rows);

  /// Three-way comparison of two prefix entries; returns 0 when equal.
  int compare(const PrefixSortIterator& left, const PrefixSortIterator& right);

 private:
  // Encodes the prefix keys of 'row' into 'prefix' and stores the row address
  // behind them.
  void extractRowToPrefix(char* row, char* prefix);

  // Generic fallback for key kinds without a specialization: always throws.
  // Supported kinds are provided as explicit specializations.
  template <TypeKind typeKind>
  inline void rowToPrefix(
      uint32_t index,
      const RowColumn& rowColumn,
      char* FOLLY_NONNULL row,
      char* FOLLY_NONNULL prefix) {
    VELOX_UNSUPPORTED("prefix sort not support the type.");
  }

  // Returns the number of bytes a key of 'typeKind' occupies in the prefix.
  // Throws for kinds other than BIGINT.
  uint32_t prefixKeySize(const TypeKind& typeKind) {
    if (typeKind == TypeKind::BIGINT) {
      return sizeof(TypeTraits<TypeKind::BIGINT>::NativeType);
    }
    // TODO support varchar later
    VELOX_UNSUPPORTED("prefix sort not support the type.");
  }

  // Reads the row address stored behind the encoded keys of an entry.
  inline char* getAddressFromPrefix(const PrefixSortIterator& iter) {
    return *reinterpret_cast<char**>((*iter) + sortLayout_.keySize);
  }

  // Number of rows to sort.
  size_t numInputRows_;
  // Backing memory of the prefix entries.
  memory::ContiguousAllocation prefixAllocation;
  // Start of the prefix entries; null until preparePrefix() has run.
  char* prefixes_{nullptr};
  // Sizes and key counts of a prefix entry.
  PrefixSortLayout sortLayout_;
  // Byte offset of each prefix key inside an entry.
  std::vector<int32_t> prefixOffsets;
  // Source of the rows; not owned.
  RowContainer* rowContainer_;
};

// Encodes the BIGINT key at position 'index' of 'row' into its slot of
// 'prefix'.
//
// A null key is stored as the smallest or the largest representable value,
// chosen from the compare flags so that nulls land on the requested side. For
// descending keys all bytes of the encoded value are inverted afterwards.
//
// NOTE(review): a null is encoded exactly like a real min()/max() value, so
// the prefix alone cannot tell the two apart - confirm this is acceptable.
template <>
inline void PrefixSort::rowToPrefix<TypeKind::BIGINT>(
    uint32_t index,
    const RowColumn& rowColumn,
    char* FOLLY_NONNULL row,
    char* FOLLY_NONNULL prefix) {
  using T = TypeTraits<TypeKind::BIGINT>::NativeType;
  char* const dest = prefix + prefixOffsets[index];
  const CompareFlags& flags = sortLayout_.keyCompareFlags_[index];

  if (RowContainer::isNullAt(
          row, rowColumn.nullByte(), rowColumn.nullMask())) {
    // min() for (ascending, nulls first) and (descending, nulls last);
    // max() for the other two combinations.
    const bool useMin = flags.ascending ? flags.nullsFirst : !flags.nullsFirst;
    EncodeData(
        dest,
        useMin ? std::numeric_limits<T>::min()
               : std::numeric_limits<T>::max());
  } else {
    EncodeData(dest, *reinterpret_cast<T*>(row + rowColumn.offset()));
  }

  if (flags.ascending) {
    return;
  }
  // Descending: invert every byte of the encoded key.
  for (idx_t s = 0; s < sizeof(T); s++) {
    dest[s] = ~dest[s];
  }
}
} // namespace facebook::velox::exec
Loading

0 comments on commit 8de29fd

Please sign in to comment.