diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 44cae04081a9..f4ce2af12bae 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -53,6 +53,7 @@ add_library( NestedLoopJoinProbe.cpp Operator.cpp OperatorUtils.cpp + PrefixSort.cpp OrderBy.cpp PartitionedOutput.cpp OutputBuffer.cpp diff --git a/velox/exec/PrefixSort.cpp b/velox/exec/PrefixSort.cpp new file mode 100644 index 000000000000..ce7858ce4311 --- /dev/null +++ b/velox/exec/PrefixSort.cpp @@ -0,0 +1,230 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/PrefixSort.h" + +using namespace facebook::velox::exec::prefixsort; + +namespace facebook::velox::exec { + +namespace { + +template +FOLLY_ALWAYS_INLINE void encodeRowColumn( + const PrefixSortLayout& prefixSortLayout, + const uint32_t index, + const RowColumn& rowColumn, + char* const row, + char* const prefix) { + std::optional value; + if (RowContainer::isNullAt(row, rowColumn.nullByte(), rowColumn.nullMask())) { + value = std::nullopt; + } else { + value = *(reinterpret_cast(row + rowColumn.offset())); + } + prefixSortLayout.encoders[index].encode( + value, prefix + prefixSortLayout.prefixOffsets[index]); +} + +FOLLY_ALWAYS_INLINE void extractRowColumnToPrefix( + TypeKind typeKind, + const PrefixSortLayout& prefixSortLayout, + const uint32_t index, + const RowColumn& rowColumn, + char* const row, + char* const prefix) { + switch (typeKind) { + case TypeKind::INTEGER: { + encodeRowColumn(prefixSortLayout, index, rowColumn, row, prefix); + return; + } + case TypeKind::BIGINT: { + encodeRowColumn(prefixSortLayout, index, rowColumn, row, prefix); + return; + } + case TypeKind::REAL: { + encodeRowColumn(prefixSortLayout, index, rowColumn, row, prefix); + return; + } + case TypeKind::DOUBLE: { + encodeRowColumn(prefixSortLayout, index, rowColumn, row, prefix); + return; + } + case TypeKind::TIMESTAMP: { + encodeRowColumn( + prefixSortLayout, index, rowColumn, row, prefix); + return; + } + default: + VELOX_UNSUPPORTED( + "prefix-sort does not support type kind: {}", + mapTypeKindToName(typeKind)); + } +} + +} // namespace + +PrefixSortLayout PrefixSortLayout::makeSortLayout( + const std::vector& types, + const std::vector& compareFlags, + uint32_t maxNormalizedKeySize) { + uint32_t normalizedKeySize = 0; + uint32_t numNormalizedKeys = 0; + const uint32_t numKeys = types.size(); + std::vector prefixOffsets; + std::vector encoders; + + // Calculate encoders and prefix-offsets, and stop the loop if a key that + // cannot be normalized is encountered. + for (auto i = 0; i < numKeys; ++i) { + if (normalizedKeySize > maxNormalizedKeySize) { + break; + } + std::optional encodedSize = + PrefixSortEncoder::encodedSize(types[i]->kind()); + if (encodedSize.has_value()) { + prefixOffsets.push_back(normalizedKeySize); + encoders.push_back( + {compareFlags[i].ascending, compareFlags[i].nullsFirst}); + normalizedKeySize += encodedSize.value(); + numNormalizedKeys++; + } else { + break; + } + } + return PrefixSortLayout{ + normalizedKeySize + sizeof(char*), + normalizedKeySize, + numNormalizedKeys, + numKeys, + compareFlags, + numNormalizedKeys < numKeys, + std::move(prefixOffsets), + std::move(encoders)}; +} + +FOLLY_ALWAYS_INLINE int PrefixSort::compareAllNormalizedKeys( + char* left, + char* right) { + return memcmp(left, right, sortLayout_.normalizedKeySize); +} + +int PrefixSort::comparePartNormalizedKeys(char* left, char* right) { + int result = compareAllNormalizedKeys(left, right); + if (result != 0) { + return result; + } + // If prefixes are equal, compare the left sort keys with rowContainer. + char* leftAddress = getAddressFromPrefix(left); + char* rightAddress = getAddressFromPrefix(right); + for (auto i = sortLayout_.numNormalizedKeys; i < sortLayout_.numKeys; ++i) { + result = rowContainer_->compare( + leftAddress, rightAddress, i, sortLayout_.compareFlags[i]); + if (result != 0) { + return result; + } + } + return 0; +} + +PrefixSort::PrefixSort( + memory::MemoryPool* pool, + RowContainer* rowContainer, + const std::vector& keyCompareFlags, + const PrefixSortConfig& config) + : pool_(pool), + sortLayout_(PrefixSortLayout::makeSortLayout( + rowContainer->keyTypes(), + keyCompareFlags, + config.maxNormalizedKeySize)), + rowContainer_(rowContainer) {} + +void PrefixSort::extractRowToPrefix(char* row, char* prefix) { + for (auto i = 0; i < sortLayout_.numNormalizedKeys; i++) { + extractRowColumnToPrefix( + rowContainer_->keyTypes()[i]->kind(), + sortLayout_, + i, + rowContainer_->columnAt(i), + row, + prefix); + } + // Set row address. + getAddressFromPrefix(prefix) = row; +} + +void PrefixSort::sort(std::vector& rows) { + VELOX_CHECK_GT(rows.size(), 0); + // All keys can not normalize, skip the binary string compare opt. + // Although benchmark time cost is nearly the same, it could avoid + // prefixAllocation. + if (sortLayout_.numNormalizedKeys == 0) { + std::sort( + rows.begin(), + rows.end(), + [&](const char* leftRow, const char* rightRow) { + for (vector_size_t index = 0; index < sortLayout_.numKeys; ++index) { + if (auto result = rowContainer_->compare( + leftRow, + rightRow, + index, + sortLayout_.compareFlags[index])) { + return result < 0; + } + } + return false; + }); + return; + } + + const auto numRows = rows.size(); + const auto entrySize = sortLayout_.entrySize; + memory::ContiguousAllocation prefixAllocation; + + // 1. Allocate prefixes data. + { + const auto numPages = + memory::AllocationTraits::numPages(numRows * entrySize); + pool_->allocateContiguous(numPages, prefixAllocation); + } + char* const prefixes = prefixAllocation.data(); + + // 2. Extract rows to prefixes with row address. + for (auto i = 0; i < rows.size(); ++i) { + extractRowToPrefix(rows[i], prefixes + entrySize * i); + } + + // 3. Sort prefixes with row address. + { + const auto swapBuffer = AlignedBuffer::allocate(entrySize, pool_); + PrefixSortRunner sortRunner(entrySize, swapBuffer->asMutable()); + const auto start = prefixes; + const auto end = prefixes + numRows * entrySize; + if (sortLayout_.hasNonNormalizedKey) { + sortRunner.quickSort(start, end, [&](char* a, char* b) { + return comparePartNormalizedKeys(a, b); + }); + } else { + sortRunner.quickSort(start, end, [&](char* a, char* b) { + return compareAllNormalizedKeys(a, b); + }); + } + } + // 4. Output sorted row addresses. + for (int i = 0; i < rows.size(); i++) { + rows[i] = getAddressFromPrefix(prefixes + i * entrySize); + } +} +} // namespace facebook::velox::exec \ No newline at end of file diff --git a/velox/exec/PrefixSort.h b/velox/exec/PrefixSort.h new file mode 100644 index 000000000000..3a4c5a67c16c --- /dev/null +++ b/velox/exec/PrefixSort.h @@ -0,0 +1,121 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/common/memory/MemoryAllocator.h" +#include "velox/exec/RowContainer.h" +#include "velox/exec/prefixsort/PrefixSortAlgorithm.h" +#include "velox/exec/prefixsort/PrefixSortEncoder.h" + +namespace facebook::velox::exec { + +struct PrefixSortConfig { + PrefixSortConfig(const uint32_t maxNormalizedKeySize) + : maxNormalizedKeySize(maxNormalizedKeySize) {} + /// Max number of bytes can store normalized keys in prefix-sort buffer per + /// entry. + const uint32_t maxNormalizedKeySize; +}; + +/// The layout of prefix-sort buffer, a prefix entry includes: +/// 1. normalized keys +/// 2. non-normalized data ptr for semi-normalized types such as +/// string_view`s ptr, it will be filled when support Varchar. +/// 3. the row address ptr point to RowContainer`s rows is added at the end of +// prefix. +struct PrefixSortLayout { + /// Number of bytes to store a prefix, it equals to + /// normalizedKeySize_ + 8 (non-normalized-ptr) + sizeof(char*). + const uint64_t entrySize; + + /// If a sort key supports normalization and can be added to the prefix + /// sort buffer, it is called a normalized key. + const uint32_t normalizedKeySize; + + const uint32_t numNormalizedKeys; + + /// The num of sort keys include normalized and non-normalized. + const uint32_t numKeys; + + /// CompareFlags of all sort keys. + const std::vector compareFlags; + + /// Whether the sort keys contains non-normalized key. + const bool hasNonNormalizedKey; + + /// Offsets of normalized keys, used to find write locations when + /// extracting columns + const std::vector prefixOffsets; + + /// The encoders for normalized keys. + const std::vector encoders; + + static PrefixSortLayout makeSortLayout( + const std::vector& types, + const std::vector& compareFlags, + uint32_t maxNormalizedKeySize); +}; + +class PrefixSort { + public: + PrefixSort( + memory::MemoryPool* pool, + RowContainer* rowContainer, + const std::vector& keyCompareFlags, + const PrefixSortConfig& config); + + /// Follow the steps below to sort the data in RowContainer: + /// 1. Allocate a contiguous block of memory to store normalized keys. + /// 2. Extract the sort keys from the RowContainer. If the key can be + /// normalized, normalize it. For this kind of keys can be normalized,we + /// combine them with the original row address ptr and store them + /// together into a buffer, called 'Prefix'. + /// 3. Sort the prefixes data we got in step 2. + /// For keys can normalized(All fixed width types), we use 'memcmp' to compare + /// the normalized binary string. + /// For keys can not normalized, we use RowContainer`s compare method to + /// compare value. + /// For keys can part-normalized(Varchar, Row etc.), we will store the + /// normalized part and points to raw data in prefix, and custom the points + /// compare. The compare strategy will be defined in PrefixSortLayout as + /// follow-up, we treat this part as non-normalized until we implement all + /// fixed width types. + /// For complex types, e.g. ROW that can be converted to scalar types will be + /// supported. + /// 4. Extract the original row address ptr from prefixes (previously stored + /// them in the prefix buffer) into the input rows vector. + /// + /// @param rows The result of RowContainer::listRows(), assuming that the + /// caller (SortBuffer etc.) has already got the result. + void sort(std::vector& rows); + + private: + int compareAllNormalizedKeys(char* left, char* right); + + int comparePartNormalizedKeys(char* left, char* right); + + void extractRowToPrefix(char* row, char* prefix); + + // Return the reference of row address ptr for read/write. + FOLLY_ALWAYS_INLINE char*& getAddressFromPrefix(char* prefix) { + return *reinterpret_cast(prefix + sortLayout_.normalizedKeySize); + } + + memory::MemoryPool* pool_; + const PrefixSortLayout sortLayout_; + RowContainer* rowContainer_; +}; +} // namespace facebook::velox::exec diff --git a/velox/exec/benchmarks/CMakeLists.txt b/velox/exec/benchmarks/CMakeLists.txt index 92fd47a6bb60..eac595cbadf0 100644 --- a/velox/exec/benchmarks/CMakeLists.txt +++ b/velox/exec/benchmarks/CMakeLists.txt @@ -49,3 +49,8 @@ if(${VELOX_ENABLE_PARQUET}) arrow thrift) endif() + +add_executable(velox_prefixsort_benchmark PrefixSortBenchmark.cpp) + +target_link_libraries(velox_prefixsort_benchmark velox_exec velox_vector_fuzzer + velox_vector_test_lib ${FOLLY_BENCHMARK}) diff --git a/velox/exec/benchmarks/PrefixSortBenchmark.cpp b/velox/exec/benchmarks/PrefixSortBenchmark.cpp new file mode 100644 index 000000000000..e155d123ab74 --- /dev/null +++ b/velox/exec/benchmarks/PrefixSortBenchmark.cpp @@ -0,0 +1,236 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include "glog/logging.h" +#include "velox/exec/PrefixSort.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; + +namespace { + +// Init memoryManager +memory::MemoryPool* rootPool() { + static std::shared_ptr rootPool_ = []() { + memory::MemoryManager::initialize({}); + return memory::memoryManager()->addRootPool("PrefixSortTest", 4L << 30); + }(); + return rootPool_.get(); +} + +class TestCase { + public: + TestCase( + memory::MemoryPool* pool, + const std::string& testName, + size_t numRows, + const RowTypePtr& rowType) + : testName_(testName), numRows_(numRows) { + data_ = std::make_unique(rowType->children(), pool); + RowVectorPtr sortedRows = makeFuzzRows(pool, numRows, rowType); + storeRows(numRows, sortedRows, data_.get(), rows_); + }; + + const std::string& testName() const { + return testName_; + } + + size_t numRows() const { + return numRows_; + } + + const std::vector& rows() const { + return rows_; + } + + RowContainer* rowContainer() const { + return data_.get(); + } + + private: + // Store data into a RowContainer to mock the behavior of SortBuffer. + void storeRows( + int numRows, + const RowVectorPtr& data, + RowContainer* rowContainer, + std::vector& rows) { + rows.resize(numRows); + for (int row = 0; row < numRows; ++row) { + rows[row] = rowContainer->newRow(); + } + for (int column = 0; column < data->childrenSize(); ++column) { + DecodedVector decoded(*data->childAt(column)); + for (int i = 0; i < numRows; ++i) { + char* row = rows[i]; + rowContainer->store(decoded, i, row, column); + } + } + } + + // Fuzz test rows : for front columns (column 0 to size -2) use high + // nullRatio to enforce all columns to be compared. + RowVectorPtr makeFuzzRows( + memory::MemoryPool* pool, + size_t numRows, + const RowTypePtr& rowType) { + VectorFuzzer fuzzer({.vectorSize = numRows}, pool); + VectorFuzzer fuzzerWithNulls( + {.vectorSize = numRows, .nullRatio = 0.7}, pool); + if (rowType->children().size() == 1) { + return fuzzer.fuzzRow(rowType); + } + std::vector children; + for (int column = 0; column < rowType->children().size() - 1; ++column) { + children.push_back(fuzzerWithNulls.fuzz(rowType->childAt(column))); + } + children.push_back( + fuzzer.fuzz(rowType->childAt(rowType->children().size() - 1))); + return std::make_shared( + pool, rowType, nullptr, numRows, std::move(children)); + } + + const std::string testName_; + const size_t numRows_; + // Rows address stored in RowContainer + std::vector rows_; + std::unique_ptr data_; +}; + +class PrefixSortBenchmark { + public: + PrefixSortBenchmark() : pool_(rootPool()->addLeafChild("PrefixSortTest")) {} + + void runPrefixSort( + const std::vector& rows, + RowContainer* rowContainer, + const std::vector& compareFlags) { + PrefixSortConfig prefixSortConfig(1024); + PrefixSort prefixSort( + pool_.get(), rowContainer, compareFlags, prefixSortConfig); + // Copy rows to sortedRows to avoid sort rows already sorted. + std::vector sortedRows = rows; + prefixSort.sort(sortedRows); + } + + void runStdSort( + const std::vector& rows, + RowContainer* rowContainer, + const std::vector& compareFlags) { + std::vector sortedRows = rows; + std::sort( + sortedRows.begin(), + sortedRows.end(), + [&](const char* leftRow, const char* rightRow) { + for (vector_size_t index = 0; index < compareFlags.size(); ++index) { + if (auto result = rowContainer->compare( + leftRow, rightRow, index, compareFlags[index])) { + return result < 0; + } + } + return false; + }); + } + + // Add benchmark manually to avoid writing a lot of BENCHMARK. + void benchMark( + const std::string& testName, + size_t numRows, + const RowTypePtr& rowType, + int iterateTimes = 1) { + // CompareFlags could be same in benchmark. + std::vector compareFlags; + for (int i = 0; i < rowType->children().size(); ++i) { + compareFlags.push_back( + {true, true, false, CompareFlags::NullHandlingMode::kNullAsValue}); + } + auto testCase = + std::make_unique(pool_.get(), testName, numRows, rowType); + // Add benchmarks for std-sort and prefix-sort. + { + folly::addBenchmark( + __FILE__, + "StdSort_" + testCase->testName(), + [rows = testCase->rows(), + container = testCase->rowContainer(), + sortFlags = compareFlags, + iterateTimes = iterateTimes, + this]() { + for (auto i = 0; i < iterateTimes; ++i) { + runStdSort(rows, container, sortFlags); + } + return 1; + }); + folly::addBenchmark( + __FILE__, + "PrefixSort_" + testCase->testName(), + [rows = testCase->rows(), + container = testCase->rowContainer(), + sortFlags = compareFlags, + iterateTimes = iterateTimes, + this]() { + for (auto i = 0; i < iterateTimes; ++i) { + runPrefixSort(rows, container, sortFlags); + } + return 1; + }); + } + testCases_.push_back(std::move(testCase)); + } + + private: + std::vector> testCases_; + const std::shared_ptr pool_; +}; +} // namespace + +int main(int argc, char** argv) { + folly::Init init(&argc, &argv); + + PrefixSortBenchmark bm; + + const RowTypePtr rowBigint = ROW({BIGINT()}); + const RowTypePtr row2Bigints = ROW({BIGINT(), BIGINT()}); + const RowTypePtr row3Bigints = ROW({BIGINT(), BIGINT(), BIGINT()}); + const RowTypePtr row4Bigints = ROW({BIGINT(), BIGINT(), BIGINT(), BIGINT()}); + + bm.benchMark("1_Bigint_1k", 1L << 10, rowBigint, 100); + bm.benchMark("2_Bigints_1k", 1L << 10, row2Bigints, 100); + bm.benchMark("3_Bigints_1k", 1L << 10, row3Bigints, 100); + bm.benchMark("4_Bigints_1k", 1L << 10, row4Bigints, 100); + + bm.benchMark("1_Bigint_10k", 10L << 10, rowBigint, 100); + bm.benchMark("2_Bigints_10k", 10L << 10, row2Bigints, 100); + bm.benchMark("3_Bigints_10k", 10L << 10, row3Bigints, 100); + bm.benchMark("4_Bigints_10k", 10L << 10, row4Bigints, 100); + + bm.benchMark("1_Bigint_100k", 100L << 10, rowBigint, 100); + bm.benchMark("2_Bigints_100k", 100L << 10, row2Bigints, 100); + bm.benchMark("3_Bigints_100k", 100L << 10, row3Bigints, 100); + bm.benchMark("4_Bigints_100k", 100L << 10, row4Bigints, 100); + + // For varchar type is still not support normalization, so the performance of + // prefix-sort and std-sort is the same. + const RowTypePtr rowVarchar = ROW({VARCHAR()}); + bm.benchMark("Varchar_1k", 1L << 10, rowVarchar, 100); + bm.benchMark("Varchar_10k", 10L << 10, rowVarchar, 100); + bm.benchMark("Varchar_100k", 100L << 10, rowVarchar, 100); + + folly::runBenchmarks(); + return 0; +} diff --git a/velox/exec/prefixsort/PrefixSortEncoder.h b/velox/exec/prefixsort/PrefixSortEncoder.h index 7408390ecabf..1323c43a4eb9 100644 --- a/velox/exec/prefixsort/PrefixSortEncoder.h +++ b/velox/exec/prefixsort/PrefixSortEncoder.h @@ -23,6 +23,7 @@ #include "velox/common/base/Exceptions.h" #include "velox/common/base/SimdUtil.h" #include "velox/type/Timestamp.h" +#include "velox/type/Type.h" namespace facebook::velox::exec::prefixsort { @@ -65,6 +66,31 @@ class PrefixSortEncoder { return nullsFirst_; } + /// @return For supported types, returns the encoded size, assume nullable. + /// For not supported types, returns 'std::nullopt'. + FOLLY_ALWAYS_INLINE static std::optional encodedSize( + TypeKind typeKind) { + switch ((typeKind)) { + case ::facebook::velox::TypeKind::INTEGER: { + return 5; + } + case ::facebook::velox::TypeKind::BIGINT: { + return 9; + } + case ::facebook::velox::TypeKind::REAL: { + return 5; + } + case ::facebook::velox::TypeKind::DOUBLE: { + return 9; + } + case ::facebook::velox::TypeKind::TIMESTAMP: { + return 17; + } + default: + return std::nullopt; + } + } + private: const bool ascending_; const bool nullsFirst_; diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 21f1b70609ef..6e213a7607fa 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -83,7 +83,8 @@ add_executable( ValuesTest.cpp WindowFunctionRegistryTest.cpp WindowTest.cpp - SortBufferTest.cpp) + SortBufferTest.cpp + PrefixSortTest.cpp) add_executable( velox_exec_infra_test diff --git a/velox/exec/tests/PrefixSortTest.cpp b/velox/exec/tests/PrefixSortTest.cpp new file mode 100644 index 000000000000..25ba0c7ba3c2 --- /dev/null +++ b/velox/exec/tests/PrefixSortTest.cpp @@ -0,0 +1,353 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "velox/exec/SortBuffer.h" + +#include "velox/exec/PrefixSort.h" +#include "velox/exec/tests/utils/OperatorTestBase.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" + +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox; +using namespace facebook::velox::memory; +using namespace facebook::velox::test; + +namespace facebook::velox::exec::prefixsort::test { + +class TestSetting { + public: + TestSetting( + const std::vector& sortCompareFlags, + const RowVectorPtr& expectedResult) + : sortCompareFlags_(sortCompareFlags), expectedResult_(expectedResult){}; + + const std::vector& sortCompareFlags() const { + return sortCompareFlags_; + } + + const RowVectorPtr& expectedResult() const { + return expectedResult_; + } + + std::string debugString() const { + const auto numRows = expectedResult_->size(); + const std::string expectedResultStr = + expectedResult_->toString(0, numRows > 100 ? 100 : numRows); + std::stringstream sortCompareFlagsStr; + for (const auto sortCompareFlag : sortCompareFlags_) { + sortCompareFlagsStr << sortCompareFlag.toString(); + } + return fmt::format( + "sortCompareFlags:{}, \nexpectedResult:\n{}", + sortCompareFlagsStr.str(), + expectedResultStr); + } + + private: + std::vector sortCompareFlags_; + RowVectorPtr expectedResult_; +}; + +class PrefixSortTest : public OperatorTestBase { + protected: + std::vector + storeRows(int numRows, const RowVectorPtr& sortedRows, RowContainer* data); + + void testSort( + int numRows, + const std::vector testSettings, + const RowVectorPtr& sortedRows); + + // Use std::sort to generate expected result. + const RowVectorPtr generateExpectedResult( + const std::vector compareFlags, + int numRows, + const RowVectorPtr& sortedRows); + + // 20 MB + const int64_t maxBytes_ = 20LL << 20; + const std::shared_ptr rootPool_{ + memory::MemoryManager::getInstance()->addRootPool( + "PrefixSortTest", + maxBytes_)}; + + const std::shared_ptr pool_{ + rootPool_->addLeafChild("PrefixSortTest", maxBytes_)}; + + const std::shared_ptr fuzzerPool = + rootPool_->addLeafChild("VectorFuzzer"); +}; + +std::vector PrefixSortTest::storeRows( + int numRows, + const RowVectorPtr& sortedRows, + RowContainer* data) { + std::vector rows; + SelectivityVector allRows(numRows); + rows.resize(numRows); + for (int row = 0; row < numRows; ++row) { + rows[row] = data->newRow(); + } + for (int column = 0; column < sortedRows->childrenSize(); ++column) { + DecodedVector decoded(*sortedRows->childAt(column), allRows); + for (int i = 0; i < numRows; ++i) { + char* row = rows[i]; + data->store(decoded, i, row, column); + } + } + return rows; +} + +const RowVectorPtr PrefixSortTest::generateExpectedResult( + const std::vector compareFlags, + int numRows, + const RowVectorPtr& sortedRows) { + const auto sortedRowsType = asRowType(sortedRows->type()); + const int sortKeysNum = sortedRows->childrenSize(); + RowContainer rowContainer(sortedRowsType->children(), pool_.get()); + + std::vector rows = storeRows(numRows, sortedRows, &rowContainer); + std::sort( + rows.begin(), rows.end(), [&](const char* leftRow, const char* rightRow) { + for (vector_size_t index = 0; index < sortKeysNum; ++index) { + if (auto result = rowContainer.compare( + leftRow, rightRow, index, compareFlags[index])) { + return result < 0; + } + } + return false; + }); + const RowVectorPtr result = + BaseVector::create(sortedRowsType, numRows, pool_.get()); + for (int column = 0; column < sortedRows->childrenSize(); ++column) { + rowContainer.extractColumn( + rows.data(), numRows, column, result->childAt(column)); + } + return result; +} + +void PrefixSortTest::testSort( + const int numRows, + const std::vector testSettings, + const RowVectorPtr& sortedRows) { + const auto sortedRowsType = asRowType(sortedRows->type()); + for (const auto& setting : testSettings) { + SCOPED_TRACE(setting.debugString()); + // 1. Store test rows into rowContainer. + RowContainer rowContainer(sortedRowsType->children(), pool_.get()); + std::vector rows = storeRows(numRows, sortedRows, &rowContainer); + // 2. Use PrefixSort sort rows. + PrefixSortConfig prefixSortConfig(1024); + PrefixSort prefixSort( + pool_.get(), + &rowContainer, + setting.sortCompareFlags(), + prefixSortConfig); + prefixSort.sort(rows); + + // 3. Extract columns from row container. + const RowVectorPtr actual = + BaseVector::create(sortedRowsType, numRows, pool_.get()); + for (int column = 0; column < sortedRows->childrenSize(); ++column) { + rowContainer.extractColumn( + rows.data(), numRows, column, actual->childAt(column)); + } + // 4. Compare acutal & expected + assertEqualVectors(actual, setting.expectedResult()); + } +} + +TEST_F(PrefixSortTest, singleKey) { + const int numRows = 5; + const int columnsSize = 7; + + // Vectors without nulls. + const std::vector testData = { + makeFlatVector({5, 4, 3, 2, 1}), + makeFlatVector({5, 4, 3, 2, 1}), + makeFlatVector({5, 4, 3, 2, 1}), + makeFlatVector({5.5, 4.4, 3.3, 2.2, 1.1}), + makeFlatVector({5.5, 4.4, 3.3, 2.2, 1.1}), + makeFlatVector( + {Timestamp(5, 5), + Timestamp(4, 4), + Timestamp(3, 3), + Timestamp(2, 2), + Timestamp(1, 1)}), + makeFlatVector({"eee", "ddd", "ccc", "bbb", "aaa"})}; + for (int i = 5; i < columnsSize; ++i) { + const auto sortedRows = makeRowVector({testData[i]}); + std::vector ascFlag = { + {true, true, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + std::vector dscFlag = { + {true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + const auto ascResult = generateExpectedResult(ascFlag, numRows, sortedRows); + const auto dscResult = generateExpectedResult(dscFlag, numRows, sortedRows); + std::vector testSettings = { + TestSetting(ascFlag, ascResult), TestSetting(dscFlag, dscResult)}; + testSort(numRows, testSettings, sortedRows); + } +} + +TEST_F(PrefixSortTest, singleKeyWithNulls) { + const int numRows = 5; + const int columnsSize = 7; + + Timestamp ts = {5, 5}; + // Vectors with nulls. + const std::vector testData = { + makeNullableFlatVector({5, 4, std::nullopt, 2, 1}), + makeNullableFlatVector({5, 4, std::nullopt, 2, 1}), + makeNullableFlatVector({5, 4, std::nullopt, 2, 1}), + makeNullableFlatVector({5.5, 4.4, std::nullopt, 2.2, 1.1}), + makeNullableFlatVector({5.5, 4.4, std::nullopt, 2.2, 1.1}), + makeNullableFlatVector( + {Timestamp(5, 5), + Timestamp(4, 4), + std::nullopt, + Timestamp(2, 2), + Timestamp(1, 1)}), + makeNullableFlatVector( + {"eee", "ddd", std::nullopt, "bbb", "aaa"})}; + + for (int i = 5; i < columnsSize; ++i) { + const auto sortedRows = makeRowVector({testData[i]}); + std::vector ascFlag = { + {true, true, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + std::vector dscFlag = { + {true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + + const auto ascResult = generateExpectedResult(ascFlag, numRows, sortedRows); + const auto dscResult = generateExpectedResult(dscFlag, numRows, sortedRows); + std::vector testSettings = { + TestSetting(ascFlag, ascResult), TestSetting(dscFlag, dscResult)}; + testSort(numRows, testSettings, sortedRows); + } +} + +TEST_F(PrefixSortTest, multipleKeys) { + // Test all keys normalized : bigint, integer + { + const int numRows = 5; + const std::vector testData = { + makeNullableFlatVector({5, 2, std::nullopt, 2, 1}), + makeNullableFlatVector({5, 4, std::nullopt, 2, 1})}; + const auto sortedRows = makeRowVector(testData); + std::vector ascFlag = { + {true, true, false, CompareFlags::NullHandlingMode::kNullAsValue}, + {true, true, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + std::vector dscFlag = { + {true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}, + {true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + + const auto ascResult = generateExpectedResult(ascFlag, numRows, sortedRows); + const auto dscResult = generateExpectedResult(dscFlag, numRows, sortedRows); + std::vector testSettings = { + TestSetting(ascFlag, ascResult), TestSetting(dscFlag, dscResult)}; + testSort(numRows, testSettings, sortedRows); + } + + // Test keys with semi-normalized : bigint, varchar + { + const int numRows = 5; + const std::vector testData = { + makeNullableFlatVector({5, 2, std::nullopt, 2, 1}), + makeNullableFlatVector( + {"eee", "ddd", std::nullopt, "bbb", "aaa"})}; + const auto sortedRows = makeRowVector(testData); + std::vector ascFlag = { + {true, true, false, CompareFlags::NullHandlingMode::kNullAsValue}, + {true, true, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + std::vector dscFlag = { + {true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}, + {true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + + const auto ascResult = generateExpectedResult(ascFlag, numRows, sortedRows); + const auto dscResult = generateExpectedResult(dscFlag, numRows, sortedRows); + std::vector testSettings = { + TestSetting(ascFlag, ascResult), TestSetting(dscFlag, dscResult)}; + testSort(numRows, testSettings, sortedRows); + } +} + +TEST_F(PrefixSortTest, fuzz) { + std::vector allTypes = { + INTEGER(), + BOOLEAN(), + TINYINT(), + SMALLINT(), + BIGINT(), + HUGEINT(), + REAL(), + DOUBLE(), + TIMESTAMP(), + VARCHAR(), + VARBINARY()}; + const int numRows = 10240; + for (const auto& type : allTypes) { + VectorFuzzer fuzzer( + {.vectorSize = numRows, .nullRatio = 0.1}, fuzzerPool.get()); + RowVectorPtr sortedRows = fuzzer.fuzzRow(ROW({type})); + std::vector ascFlag = { + {true, true, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + std::vector dscFlag = { + {true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + const auto ascResult = generateExpectedResult(ascFlag, numRows, sortedRows); + const auto dscResult = generateExpectedResult(dscFlag, numRows, sortedRows); + std::vector testSettings = { + TestSetting(ascFlag, ascResult), TestSetting(dscFlag, dscResult)}; + testSort(numRows, testSettings, sortedRows); + } +} + +TEST_F(PrefixSortTest, fuzzMulti) { + std::vector allTypes = { + INTEGER(), + BOOLEAN(), + TINYINT(), + SMALLINT(), + BIGINT(), + HUGEINT(), + REAL(), + DOUBLE(), + TIMESTAMP(), + VARCHAR(), + VARBINARY()}; + const int numRows = 10240; + VectorFuzzer fuzzer( + {.vectorSize = numRows, .nullRatio = 0.1}, fuzzerPool.get()); + for (const auto& type1 : allTypes) { + for (const auto& type2 : allTypes) { + RowVectorPtr sortedRows = fuzzer.fuzzRow(ROW({type1, type2})); + std::vector ascFlag = { + {true, true, false, CompareFlags::NullHandlingMode::kNullAsValue}, + {true, true, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + std::vector dscFlag = { + {true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}, + {true, false, false, CompareFlags::NullHandlingMode::kNullAsValue}}; + const auto ascResult = + generateExpectedResult(ascFlag, numRows, sortedRows); + const auto dscResult = + generateExpectedResult(dscFlag, numRows, sortedRows); + std::vector testSettings = { + TestSetting(ascFlag, ascResult), TestSetting(dscFlag, dscResult)}; + testSort(numRows, testSettings, sortedRows); + } + } +} +} // namespace facebook::velox::exec::prefixsort::test