Skip to content

Commit

Permalink
Add prefix-sort with support for fixed width sorting keys (facebookin…
Browse files Browse the repository at this point in the history
…cubator#8146)

Summary:
Add PrefixSort and ut & benchmarks for it :
1. PrefixSort is used to improve in-memory sort performance, using memcmp to compare binary string (normalized encoded sort keys called 'prefix') when sorting. This PR adds 'extract rows to prefix'  and 'sort' for fixed width types (int32, int64, float, double, timestamp etc.) as a basic PR, more types will add as a follow-up.
2. As a basic work, this PR add ut & benchmarks for scalar types includes single/multi keys and fuzz cases etc.

3. Benchmarks, std-sort vs prefix-sort :
`
============================================================================
[...]ec/benchmarks/PrefixSortBenchmark.cpp     relative  time/iter   iters/s
============================================================================
StdSort_Bigint100k                                         34.30ms     29.16
PrefixSort_Bigint100k                                      27.16ms     36.81
StdSort_Bigint1M                                          465.00ms      2.15
PrefixSort_Bigint1M                                       330.97ms      3.02
StdSort_Bigint10M                                            7.96s   125.68m
PrefixSort_Bigint10M                                         3.87s   258.28m
StdSort_Varchar100k                                        62.02ms     16.12
PrefixSort_Varchar100k                                     61.64ms     16.22
StdSort_Varchar1M                                            1.31s   763.74m
PrefixSort_Varchar1M                                         1.27s   785.51m
StdSort_Varchar10M                                          23.06s    43.36m
PrefixSort_Varchar10M                                       22.67s    44.11m
StdSort_BigintBigint100k                                   48.61ms     20.57
PrefixSort_BigintBigint100k                                30.70ms     32.58
StdSort_BigintBigint1M                                    631.11ms      1.58
PrefixSort_BigintBigint1M                                 379.40ms      2.64
StdSort_BigintBigint10M                                     11.38s    87.86m
PrefixSort_BigintBigint10M                                   4.42s   226.42m
StdSort_BigintVarchar100k                                  65.36ms     15.30
PrefixSort_BigintVarchar100k                               56.52ms     17.69
StdSort_BigintVarchar1M                                      1.29s   777.33m
PrefixSort_BigintVarchar1M                                   1.11s   901.87m
StdSort_BigintVarchar10M                                    22.03s    45.39m
PrefixSort_BigintVarchar10M                                 18.00s    55.56m
`

For the case that all keys can normlized, Bigint, BiginBigint, when row numbers bigger than 1M, you can see obvious optimization effects  about 1x-2x up.

For the case that can not normlized, Varchar , as binary string opt logic is skipped , the performance keep same.

One more thing is that we use std::memcmp in this PR,  if the cost is worth it, we can align the prefix-buffer to 4 or 8, so that we can use int compare or long compare, and the performance will be much more obvious.

Part of facebookincubator#6766

Pull Request resolved: facebookincubator#8146

Differential Revision: D54247347

Pulled By: mbasmanova
  • Loading branch information
skadilover authored and facebook-github-bot committed Feb 27, 2024
1 parent 62b8cfd commit 9b035a5
Show file tree
Hide file tree
Showing 8 changed files with 974 additions and 1 deletion.
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ add_library(
NestedLoopJoinProbe.cpp
Operator.cpp
OperatorUtils.cpp
PrefixSort.cpp
OrderBy.cpp
PartitionedOutput.cpp
OutputBuffer.cpp
Expand Down
230 changes: 230 additions & 0 deletions velox/exec/PrefixSort.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/exec/PrefixSort.h"

using namespace facebook::velox::exec::prefixsort;

namespace facebook::velox::exec {

namespace {

template <typename T>
FOLLY_ALWAYS_INLINE void encodeRowColumn(
const PrefixSortLayout& prefixSortLayout,
const uint32_t index,
const RowColumn& rowColumn,
char* const row,
char* const prefix) {
std::optional<T> value;
if (RowContainer::isNullAt(row, rowColumn.nullByte(), rowColumn.nullMask())) {
value = std::nullopt;
} else {
value = *(reinterpret_cast<T*>(row + rowColumn.offset()));
}
prefixSortLayout.encoders[index].encode(
value, prefix + prefixSortLayout.prefixOffsets[index]);
}

FOLLY_ALWAYS_INLINE void extractRowColumnToPrefix(
TypeKind typeKind,
const PrefixSortLayout& prefixSortLayout,
const uint32_t index,
const RowColumn& rowColumn,
char* const row,
char* const prefix) {
switch (typeKind) {
case TypeKind::INTEGER: {
encodeRowColumn<int32_t>(prefixSortLayout, index, rowColumn, row, prefix);
return;
}
case TypeKind::BIGINT: {
encodeRowColumn<int64_t>(prefixSortLayout, index, rowColumn, row, prefix);
return;
}
case TypeKind::REAL: {
encodeRowColumn<float>(prefixSortLayout, index, rowColumn, row, prefix);
return;
}
case TypeKind::DOUBLE: {
encodeRowColumn<double>(prefixSortLayout, index, rowColumn, row, prefix);
return;
}
case TypeKind::TIMESTAMP: {
encodeRowColumn<Timestamp>(
prefixSortLayout, index, rowColumn, row, prefix);
return;
}
default:
VELOX_UNSUPPORTED(
"prefix-sort does not support type kind: {}",
mapTypeKindToName(typeKind));
}
}

} // namespace

PrefixSortLayout PrefixSortLayout::makeSortLayout(
const std::vector<TypePtr>& types,
const std::vector<CompareFlags>& compareFlags,
uint32_t maxNormalizedKeySize) {
uint32_t normalizedKeySize = 0;
uint32_t numNormalizedKeys = 0;
const uint32_t numKeys = types.size();
std::vector<uint32_t> prefixOffsets;
std::vector<PrefixSortEncoder> encoders;

// Calculate encoders and prefix-offsets, and stop the loop if a key that
// cannot be normalized is encountered.
for (auto i = 0; i < numKeys; ++i) {
if (normalizedKeySize > maxNormalizedKeySize) {
break;
}
std::optional<uint32_t> encodedSize =
PrefixSortEncoder::encodedSize(types[i]->kind());
if (encodedSize.has_value()) {
prefixOffsets.push_back(normalizedKeySize);
encoders.push_back(
{compareFlags[i].ascending, compareFlags[i].nullsFirst});
normalizedKeySize += encodedSize.value();
numNormalizedKeys++;
} else {
break;
}
}
return PrefixSortLayout{
normalizedKeySize + sizeof(char*),
normalizedKeySize,
numNormalizedKeys,
numKeys,
compareFlags,
numNormalizedKeys < numKeys,
std::move(prefixOffsets),
std::move(encoders)};
}

FOLLY_ALWAYS_INLINE int PrefixSort::compareAllNormalizedKeys(
char* left,
char* right) {
return memcmp(left, right, sortLayout_.normalizedKeySize);
}

int PrefixSort::comparePartNormalizedKeys(char* left, char* right) {
int result = compareAllNormalizedKeys(left, right);
if (result != 0) {
return result;
}
// If prefixes are equal, compare the left sort keys with rowContainer.
char* leftAddress = getAddressFromPrefix(left);
char* rightAddress = getAddressFromPrefix(right);
for (auto i = sortLayout_.numNormalizedKeys; i < sortLayout_.numKeys; ++i) {
result = rowContainer_->compare(
leftAddress, rightAddress, i, sortLayout_.compareFlags[i]);
if (result != 0) {
return result;
}
}
return 0;
}

PrefixSort::PrefixSort(
memory::MemoryPool* pool,
RowContainer* rowContainer,
const std::vector<CompareFlags>& keyCompareFlags,
const PrefixSortConfig& config)
: pool_(pool),
sortLayout_(PrefixSortLayout::makeSortLayout(
rowContainer->keyTypes(),
keyCompareFlags,
config.maxNormalizedKeySize)),
rowContainer_(rowContainer) {}

void PrefixSort::extractRowToPrefix(char* row, char* prefix) {
for (auto i = 0; i < sortLayout_.numNormalizedKeys; i++) {
extractRowColumnToPrefix(
rowContainer_->keyTypes()[i]->kind(),
sortLayout_,
i,
rowContainer_->columnAt(i),
row,
prefix);
}
// Set row address.
getAddressFromPrefix(prefix) = row;
}

void PrefixSort::sort(std::vector<char*>& rows) {
VELOX_CHECK_GT(rows.size(), 0);
// All keys can not normalize, skip the binary string compare opt.
// Although benchmark time cost is nearly the same, it could avoid
// prefixAllocation.
if (sortLayout_.numNormalizedKeys == 0) {
std::sort(
rows.begin(),
rows.end(),
[&](const char* leftRow, const char* rightRow) {
for (vector_size_t index = 0; index < sortLayout_.numKeys; ++index) {
if (auto result = rowContainer_->compare(
leftRow,
rightRow,
index,
sortLayout_.compareFlags[index])) {
return result < 0;
}
}
return false;
});
return;
}

const auto numRows = rows.size();
const auto entrySize = sortLayout_.entrySize;
memory::ContiguousAllocation prefixAllocation;

// 1. Allocate prefixes data.
{
const auto numPages =
memory::AllocationTraits::numPages(numRows * entrySize);
pool_->allocateContiguous(numPages, prefixAllocation);
}
char* const prefixes = prefixAllocation.data<char>();

// 2. Extract rows to prefixes with row address.
for (auto i = 0; i < rows.size(); ++i) {
extractRowToPrefix(rows[i], prefixes + entrySize * i);
}

// 3. Sort prefixes with row address.
{
const auto swapBuffer = AlignedBuffer::allocate<char>(entrySize, pool_);
PrefixSortRunner sortRunner(entrySize, swapBuffer->asMutable<char>());
const auto start = prefixes;
const auto end = prefixes + numRows * entrySize;
if (sortLayout_.hasNonNormalizedKey) {
sortRunner.quickSort(start, end, [&](char* a, char* b) {
return comparePartNormalizedKeys(a, b);
});
} else {
sortRunner.quickSort(start, end, [&](char* a, char* b) {
return compareAllNormalizedKeys(a, b);
});
}
}
// 4. Output sorted row addresses.
for (int i = 0; i < rows.size(); i++) {
rows[i] = getAddressFromPrefix(prefixes + i * entrySize);
}
}
} // namespace facebook::velox::exec
121 changes: 121 additions & 0 deletions velox/exec/PrefixSort.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "velox/common/memory/MemoryAllocator.h"
#include "velox/exec/RowContainer.h"
#include "velox/exec/prefixsort/PrefixSortAlgorithm.h"
#include "velox/exec/prefixsort/PrefixSortEncoder.h"

namespace facebook::velox::exec {

struct PrefixSortConfig {
PrefixSortConfig(const uint32_t maxNormalizedKeySize)
: maxNormalizedKeySize(maxNormalizedKeySize) {}
/// Max number of bytes can store normalized keys in prefix-sort buffer per
/// entry.
const uint32_t maxNormalizedKeySize;
};

/// The layout of prefix-sort buffer, a prefix entry includes:
/// 1. normalized keys
/// 2. non-normalized data ptr for semi-normalized types such as
/// string_view`s ptr, it will be filled when support Varchar.
/// 3. the row address ptr point to RowContainer`s rows is added at the end of
// prefix.
struct PrefixSortLayout {
/// Number of bytes to store a prefix, it equals to
/// normalizedKeySize_ + 8 (non-normalized-ptr) + sizeof(char*).
const uint64_t entrySize;

/// If a sort key supports normalization and can be added to the prefix
/// sort buffer, it is called a normalized key.
const uint32_t normalizedKeySize;

const uint32_t numNormalizedKeys;

/// The num of sort keys include normalized and non-normalized.
const uint32_t numKeys;

/// CompareFlags of all sort keys.
const std::vector<CompareFlags> compareFlags;

/// Whether the sort keys contains non-normalized key.
const bool hasNonNormalizedKey;

/// Offsets of normalized keys, used to find write locations when
/// extracting columns
const std::vector<uint32_t> prefixOffsets;

/// The encoders for normalized keys.
const std::vector<prefixsort::PrefixSortEncoder> encoders;

static PrefixSortLayout makeSortLayout(
const std::vector<TypePtr>& types,
const std::vector<CompareFlags>& compareFlags,
uint32_t maxNormalizedKeySize);
};

class PrefixSort {
public:
PrefixSort(
memory::MemoryPool* pool,
RowContainer* rowContainer,
const std::vector<CompareFlags>& keyCompareFlags,
const PrefixSortConfig& config);

/// Follow the steps below to sort the data in RowContainer:
/// 1. Allocate a contiguous block of memory to store normalized keys.
/// 2. Extract the sort keys from the RowContainer. If the key can be
/// normalized, normalize it. For this kind of keys can be normalized,we
/// combine them with the original row address ptr and store them
/// together into a buffer, called 'Prefix'.
/// 3. Sort the prefixes data we got in step 2.
/// For keys can normalized(All fixed width types), we use 'memcmp' to compare
/// the normalized binary string.
/// For keys can not normalized, we use RowContainer`s compare method to
/// compare value.
/// For keys can part-normalized(Varchar, Row etc.), we will store the
/// normalized part and points to raw data in prefix, and custom the points
/// compare. The compare strategy will be defined in PrefixSortLayout as
/// follow-up, we treat this part as non-normalized until we implement all
/// fixed width types.
/// For complex types, e.g. ROW that can be converted to scalar types will be
/// supported.
/// 4. Extract the original row address ptr from prefixes (previously stored
/// them in the prefix buffer) into the input rows vector.
///
/// @param rows The result of RowContainer::listRows(), assuming that the
/// caller (SortBuffer etc.) has already got the result.
void sort(std::vector<char*>& rows);

private:
int compareAllNormalizedKeys(char* left, char* right);

int comparePartNormalizedKeys(char* left, char* right);

void extractRowToPrefix(char* row, char* prefix);

// Return the reference of row address ptr for read/write.
FOLLY_ALWAYS_INLINE char*& getAddressFromPrefix(char* prefix) {
return *reinterpret_cast<char**>(prefix + sortLayout_.normalizedKeySize);
}

memory::MemoryPool* pool_;
const PrefixSortLayout sortLayout_;
RowContainer* rowContainer_;
};
} // namespace facebook::velox::exec
5 changes: 5 additions & 0 deletions velox/exec/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ if(${VELOX_ENABLE_PARQUET})
arrow
thrift)
endif()

add_executable(velox_prefixsort_benchmark PrefixSortBenchmark.cpp)

target_link_libraries(velox_prefixsort_benchmark velox_exec velox_vector_fuzzer
velox_vector_test_lib ${FOLLY_BENCHMARK})
Loading

0 comments on commit 9b035a5

Please sign in to comment.