diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 23440ffcaef7..dbc9a48f11f4 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -357,6 +357,15 @@ class QueryConfig { /// Maximum number of splits to preload. Set to 0 to disable preloading. static constexpr const char* kMaxSplitPreloadPerDriver = "max_split_preload_per_driver"; + static constexpr const char* kEnablePrefixSort = "enable_prefix_sort"; + + static constexpr const char* kPrefixSortMaxKeyLength = + "prefix_sort_max_key_length"; + + // TODO: for testing , remove as follow-up + static constexpr const char* kEnablePrefixSortWithIterator = + "enable_prefix_sort_with_iterater"; + uint64_t queryMaxMemoryPerNode() const { return toCapacity( @@ -714,6 +723,19 @@ class QueryConfig { return get(kMaxSplitPreloadPerDriver, 2); } + bool isPrefixSortEnabled() const { + return get(kEnablePrefixSort, false); + } + + bool isPrefixSortEnabledWithIterator() const { + return get(kEnablePrefixSortWithIterator, false); + } + + uint32_t prefixSortMaxKeyLength() const { + return get( + kPrefixSortMaxKeyLength, std::numeric_limits::max()); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 44cae04081a9..f4ce2af12bae 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -53,6 +53,7 @@ add_library( NestedLoopJoinProbe.cpp Operator.cpp OperatorUtils.cpp + PrefixSort.cpp OrderBy.cpp PartitionedOutput.cpp OutputBuffer.cpp diff --git a/velox/exec/OrderBy.cpp b/velox/exec/OrderBy.cpp index f78b4517e671..2098f69be322 100644 --- a/velox/exec/OrderBy.cpp +++ b/velox/exec/OrderBy.cpp @@ -66,7 +66,11 @@ OrderBy::OrderBy( pool(), &nonReclaimableSection_, spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr, - operatorCtx_->driverCtx()->queryConfig().orderBySpillMemoryThreshold()); + operatorCtx_->driverCtx()->queryConfig().orderBySpillMemoryThreshold(), + driverCtx->queryConfig().isPrefixSortEnabled() + ? std::make_optional( + driverCtx->queryConfig().prefixSortMaxKeyLength()) + : std::nullopt); } void OrderBy::addInput(RowVectorPtr input) { diff --git a/velox/exec/PrefixSort.cpp b/velox/exec/PrefixSort.cpp new file mode 100644 index 000000000000..a886fde6c2bf --- /dev/null +++ b/velox/exec/PrefixSort.cpp @@ -0,0 +1,74 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "PrefixSort.h" + +namespace facebook::velox::exec { +void PrefixSort::extractRowToPrefix( + char* FOLLY_NONNULL row, + char* FOLLY_NONNULL prefix) { + // extract key + for (int32_t index = 0; index < sortLayout_.numPrefixKeys_; index++) { + VELOX_DYNAMIC_TYPE_DISPATCH( + rowToPrefix, + rowContainer_->keyTypes()[index]->kind(), + index, + rowContainer_->columnAt(index), + row, + prefix); + } + // Set address of row. + *reinterpret_cast(prefix + sortLayout_.keySize) = row; +} + +void PrefixSort::preparePrefix() { + // Compute prefix offsets for sort columns. + uint32_t offset = 0; + for (int i = 0; i < sortLayout_.numPrefixKeys_; i++) { + prefixOffsets.push_back(offset); + offset += prefixKeySize(rowContainer_->keyTypes()[i]->kind()); + } + int32_t numRows = numInputRows_; + // Allocate prefixes_ data. + constexpr auto kPageSize = memory::AllocationTraits::kPageSize; + auto numPages = + bits::roundUp(numRows * sortLayout_.entrySize, kPageSize) / kPageSize; + rowContainer_->pool()->allocateContiguous(numPages, prefixAllocation); + prefixes_ = prefixAllocation.data(); +} + +void PrefixSort::sort(std::vector& rows) { + RowContainerIterator iter; + rowContainer_->listRows(&iter, numInputRows_, rows.data()); + for (uint64_t i = 0; i < rows.size(); ++i) { + extractRowToPrefix(rows[i], prefixes_ + sortLayout_.entrySize * i); + } + auto swapBuffer = AlignedBuffer::allocate( + sortLayout_.entrySize, rowContainer_->pool()); + PrefixSortRunner sortRunner( + sortLayout_.entrySize, swapBuffer->asMutable()); + auto start = prefixes_; + auto end = prefixes_ + numInputRows_ * sortLayout_.entrySize; + + sortRunner.quickSort( + start, end, [&](char* a, char* b) { return compare(a, b); }); + + for (int i = 0; i < rows.size(); i++) { + rows[i] = *reinterpret_cast( + prefixes_ + i * sortLayout_.entrySize + sortLayout_.keySize); + } +} + +} // namespace facebook::velox::exec \ No newline at end of file diff --git a/velox/exec/PrefixSort.h b/velox/exec/PrefixSort.h new file mode 100644 index 000000000000..4aa6fcb23a85 --- /dev/null +++ b/velox/exec/PrefixSort.h @@ -0,0 +1,178 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "RowContainer.h" +#include "string.h" +#include "velox/common/memory/Allocation.h" +#include "velox/common/memory/AllocationPool.h" +#include "velox/common/memory/HashStringAllocator.h" +#include "velox/common/memory/MemoryAllocator.h" +#include "velox/exec/prefixsort/PrefixSortAlgorithm.h" +#include "velox/exec/prefixsort/PrefixSortEncoder.h" + +using namespace facebook::velox::exec::prefixsort; + +namespace facebook::velox::exec { + +struct PrefixSortConfig { + PrefixSortConfig( + const uint32_t maxPrefixKeyLength) + : maxPrefixKeyLength(maxPrefixKeyLength) {} + uint32_t maxPrefixKeyLength; +}; + +struct PrefixSortLayout { + PrefixSortLayout( + const std::vector& types, + const std::vector& keyCompareFlags, + const uint32_t maxPrefixKeyLength) + : keySize(0), + numPrefixKeys_(0), + numSortKeys_(types.size()), + keyCompareFlags_(keyCompareFlags) { + VELOX_CHECK(types.size() > 0); + for (TypePtr type : types) { + if (numPrefixKeys_ > maxPrefixKeyLength) { + break; + } + if (type->kind() == TypeKind::BIGINT) { + numPrefixKeys_++; + keySize += sizeof(TypeTraits::NativeType); + } else { + break; + } + } + entrySize = keySize + sizeof(char*); + if (numPrefixKeys_ < numSortKeys_) { + needSortData = true; + } + } + + // prefix size is fixed. + uint32_t keySize; + uint64_t entrySize; + int32_t numPrefixKeys_; + const int32_t numSortKeys_; + std::vector keyCompareFlags_; + bool needSortData = false; +}; + +class PrefixSort { + public: + PrefixSort( + RowContainer* FOLLY_NONNULL rowContainer, + const std::vector& keyCompareFlags, + size_t numInputRows, + const PrefixSortConfig& config) + : sortLayout_( + rowContainer->keyTypes(), + keyCompareFlags, + config.maxPrefixKeyLength), + rowContainer_(rowContainer), + numInputRows_(numInputRows) {} + + // Implement the prepare and sort methods separately to + // facilitate the collection of metrics. + void preparePrefix(); + + void sort(std::vector& rows); + + int compare(char* left, char* right) { + if (!sortLayout_.needSortData) { + return memcmp(left, right, (size_t)sortLayout_.keySize); + } else { + int result = memcmp(left, right, (size_t)sortLayout_.keySize); + if (result != 0) { + return result; + } + char* leftAddress = getAddressFromPrefix(left); + char* rightAddress = getAddressFromPrefix(right); + for (int i = sortLayout_.numPrefixKeys_; i < sortLayout_.numSortKeys_; + i++) { + result = rowContainer_->compare( + leftAddress, rightAddress, i, sortLayout_.keyCompareFlags_[i]); + if (result != 0) { + return result; + } + } + } + return 0; + } + + private: + void extractRowToPrefix(char* row, char* prefix); + + template + inline void rowToPrefix( + uint32_t index, + const RowColumn& rowColumn, + char* FOLLY_NONNULL row, + char* FOLLY_NONNULL prefix) { + VELOX_UNSUPPORTED("prefix sort not support the type."); + } + + uint32_t prefixKeySize(const TypeKind& typeKind) { + if (typeKind == TypeKind::BIGINT) { + return sizeof(TypeTraits::NativeType); + } + // TODO support varchar later + VELOX_UNSUPPORTED("prefix sort not support the type."); + } + + inline char* getAddressFromPrefix(char* prefix) { + return *reinterpret_cast(prefix + sortLayout_.keySize); + } + + // Store prefix and address for sort data. + memory::ContiguousAllocation prefixAllocation; + char* prefixes_; + PrefixSortLayout sortLayout_; + std::vector prefixOffsets; + RowContainer* rowContainer_; + size_t numInputRows_; +}; + +template <> +inline void PrefixSort::rowToPrefix( + uint32_t index, + const RowColumn& rowColumn, + char* FOLLY_NONNULL row, + char* FOLLY_NONNULL prefix) { + using T = TypeTraits::NativeType; + // store null as min/max value according compare flags. + if (RowContainer::isNullAt(row, rowColumn.nullByte(), rowColumn.nullMask())) { + CompareFlags compareFlags = sortLayout_.keyCompareFlags_[index]; + PrefixSortEncoder::encode( + ((compareFlags.ascending && compareFlags.nullsFirst) || + (!compareFlags.ascending && !compareFlags.nullsFirst)) + ? std::numeric_limits::min() + : std::numeric_limits::max(), + prefix + prefixOffsets[index]); + } else { + PrefixSortEncoder::encode( + *(reinterpret_cast(row + rowColumn.offset())), + prefix + prefixOffsets[index]); + } + // invert bits if desc + if (!sortLayout_.keyCompareFlags_[index].ascending) { + for (uint64_t s = 0; s < sizeof(T); s++) { + *(prefix + prefixOffsets[index] + s) = + ~*(prefix + prefixOffsets[index] + s); + } + } +} +} // namespace facebook::velox::exec diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index dcd78ffeca34..061043904742 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -26,13 +26,15 @@ SortBuffer::SortBuffer( velox::memory::MemoryPool* pool, tsan_atomic* nonReclaimableSection, const common::SpillConfig* spillConfig, - uint64_t spillMemoryThreshold) + uint64_t spillMemoryThreshold, + const std::optional& prefixSortConfig) : input_(input), sortCompareFlags_(sortCompareFlags), pool_(pool), nonReclaimableSection_(nonReclaimableSection), spillConfig_(spillConfig), - spillMemoryThreshold_(spillMemoryThreshold) { + spillMemoryThreshold_(spillMemoryThreshold), + prefixSortConfig_(prefixSortConfig) { VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size()); VELOX_CHECK_GT(sortCompareFlags_.size(), 0); VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size()); @@ -111,20 +113,30 @@ void SortBuffer::noMoreInput() { // the rows. sortedRows_.resize(numInputRows_); RowContainerIterator iter; - data_->listRows(&iter, numInputRows_, sortedRows_.data()); - std::sort( - sortedRows_.begin(), - sortedRows_.end(), - [this](const char* leftRow, const char* rightRow) { - for (vector_size_t index = 0; index < sortCompareFlags_.size(); - ++index) { - if (auto result = data_->compare( - leftRow, rightRow, index, sortCompareFlags_[index])) { - return result < 0; + if (prefixSortConfig_.has_value()) { + auto prefixSort = PrefixSort( + data_.get(), + sortCompareFlags_, + numInputRows_, + prefixSortConfig_.value()); + prefixSort.preparePrefix(); + prefixSort.sort(sortedRows_); + } else { + data_->listRows(&iter, numInputRows_, sortedRows_.data()); + std::sort( + sortedRows_.begin(), + sortedRows_.end(), + [this](const char* leftRow, const char* rightRow) { + for (vector_size_t index = 0; index < sortCompareFlags_.size(); + ++index) { + if (auto result = data_->compare( + leftRow, rightRow, index, sortCompareFlags_[index])) { + return result < 0; + } } - } - return false; - }); + return false; + }); + } } else { // Spill the remaining in-memory state to disk if spilling has been // triggered on this sort buffer. This is to simplify query OOM prevention diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index fa62460d203a..8fe72864b7f1 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -16,6 +16,7 @@ #pragma once +#include "PrefixSort.h" #include "velox/exec/ContainerRowSerde.h" #include "velox/exec/Operator.h" #include "velox/exec/OperatorUtils.h" @@ -37,7 +38,8 @@ class SortBuffer { velox::memory::MemoryPool* pool, tsan_atomic* nonReclaimableSection, const common::SpillConfig* spillConfig = nullptr, - uint64_t spillMemoryThreshold = 0); + uint64_t spillMemoryThreshold = 0, + const std::optional& prefixSortConfig = std::nullopt); void addInput(const VectorPtr& input); @@ -134,6 +136,8 @@ class SortBuffer { std::optional estimatedOutputRowSize_{}; // The number of rows that has been returned. size_t numOutputRows_{0}; + + const std::optional prefixSortConfig_; }; } // namespace facebook::velox::exec diff --git a/velox/exec/benchmarks/CMakeLists.txt b/velox/exec/benchmarks/CMakeLists.txt index 92fd47a6bb60..699c1889e09b 100644 --- a/velox/exec/benchmarks/CMakeLists.txt +++ b/velox/exec/benchmarks/CMakeLists.txt @@ -49,3 +49,9 @@ if(${VELOX_ENABLE_PARQUET}) arrow thrift) endif() + +add_executable(velox_prefix_sort_benchmark PrefixSortBenchmark.cpp) + +target_link_libraries( + velox_prefix_sort_benchmark velox_exec velox_vector_test_lib + velox_exec_test_lib ${FOLLY_BENCHMARK}) diff --git a/velox/exec/benchmarks/PrefixSortBenchmark.cpp b/velox/exec/benchmarks/PrefixSortBenchmark.cpp new file mode 100644 index 000000000000..cc7d06ea3228 --- /dev/null +++ b/velox/exec/benchmarks/PrefixSortBenchmark.cpp @@ -0,0 +1,173 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include "velox/dwio/common/tests/utils/BatchMaker.h" +#include "velox/exec/PlanNodeStats.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" +#include "velox/parse/TypeResolver.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::test; + +namespace { +struct TestCase { + // Dataset to be processed by the below plans. + std::vector rows; + + std::shared_ptr _1key; + std::shared_ptr _2key; + std::shared_ptr _3key; +}; + +class PrefixSortBenchmark : public VectorTestBase { + public: + std::vector + makeRows(RowTypePtr type, int32_t numVectors, int32_t rowsPerVector) { + std::vector vectors; + for (int32_t i = 0; i < numVectors; ++i) { + auto vector = std::dynamic_pointer_cast( + BatchMaker::createBatch(type, rowsPerVector, *pool_)); + vectors.push_back(vector); + } + return vectors; + } + + template + void + setRandomInts(int32_t column, int32_t max, std::vector rows) { + for (auto& r : rows) { + auto values = r->childAt(column)->as>(); + for (auto i = 0; i < values->size(); ++i) { + values->set(i, folly::Random::rand32(rng_) % max); + } + } + } + + std::shared_ptr makeOrderByPlan( + std::vector keys, + std::vector data) { + assert(!data.empty()); + exec::test::PlanBuilder builder; + + auto& type = data[0]->type()->as(); + builder.values(data); + builder.orderBy(keys, false); + + return builder.planNode(); + } + + void makeBenchmark( + std::string name, + RowTypePtr type, + int64_t numVectors, + int32_t numPerVector) { + auto test = std::make_unique(); + test->rows = makeRows(type, numVectors, numPerVector); + // low selectivity for full compare + setRandomInts(0, 1, test->rows); + setRandomInts(1, 1, test->rows); + setRandomInts(2, 10000000, test->rows); + + test->_1key = makeOrderByPlan({"c2"}, test->rows); + folly::addBenchmark( + __FILE__, name + "_1key_base", [plan = &test->_1key, this]() { + run(*plan, "false"); + return 1; + }); + folly::addBenchmark( + __FILE__, name + "_1key_prefix_sort", [plan = &test->_1key, + this] + () { + run(*plan, "true"); + return 1; + }); + test->_2key = makeOrderByPlan({"c1", "c2"}, test->rows); + folly::addBenchmark( + __FILE__, name + "_2key_base", [plan = &test->_2key, this]() { + run(*plan, "false"); + return 1; + }); + + folly::addBenchmark( + __FILE__, name + "_2key_prefix_sort", [plan = &test->_2key, + this]() { + run(*plan, "true"); + return 1; + }); + test->_3key = makeOrderByPlan({"c0", "c1", "c2"}, test->rows); + folly::addBenchmark( + __FILE__, name + "_3key_base", [plan = &test->_3key, this]() { + run(*plan, "false"); + return 1; + }); + folly::addBenchmark( + __FILE__, name + "_3key_prefix_sort", [plan = &test->_3key, + this]() { + run(*plan, "true"); + return 1; + }); + + cases_.push_back(std::move(test)); + } + + int64_t run( + std::shared_ptr plan, + const std::string& enablePrefixSort) { + auto start = getCurrentTimeMicro(); + int32_t numRows = 0; + auto result = exec::test::AssertQueryBuilder(plan) + .config( + facebook::velox::core::QueryConfig::kEnablePrefixSort, + enablePrefixSort) + .copyResults(pool_.get()); + numRows += result->childAt(0)->as>()->valueAt(0); + auto elapsedMicros = getCurrentTimeMicro() - start; + return elapsedMicros; + } + + std::vector> cases_; + folly::Random::DefaultGenerator rng_; +}; +} // namespace + +int main(int argc, char** argv) { + folly::init(&argc, &argv); + memory::MemoryManager::initialize({}); + + functions::prestosql::registerAllScalarFunctions(); + aggregate::prestosql::registerAllAggregateFunctions(); + parse::registerTypeResolver(); + + PrefixSortBenchmark bm; + + auto bigint3 = ROW( + {{"c0", BIGINT()}, {"c1", BIGINT()}, {"c2", BIGINT()}, {"c3", BIGINT()}}); + + // Integers. + bm.makeBenchmark("Bigint_100K", bigint3, 10, 10000); + bm.makeBenchmark("Bigint_1000K", bigint3, 100, 10000); + bm.makeBenchmark("Bigint_10000K", bigint3, 1000, 10000); + + folly::runBenchmarks(); + return 0; +} diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index fd8f8a6db47f..40e205e9e386 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -169,6 +169,32 @@ class OrderByTest : public OperatorTestBase { } } + void runWithPrefixSort( + core::PlanNodePtr planNode, + const std::string& duckDbSql, + const std::vector& sortingKeys, + const std::optional& maxPrefixLength) { + auto queryCtx = std::make_shared(executor_.get()); + if (maxPrefixLength.has_value()) { + queryCtx->testingOverrideConfigUnsafe({ + {core::QueryConfig::kEnablePrefixSort, "true"}, + {core::QueryConfig::kPrefixSortMaxKeyLength, maxPrefixLength.value()}, + }); + std::string straceMessage = + "run with prefix sort with max prefix length limit " + + maxPrefixLength.value(); + SCOPED_TRACE(straceMessage); + } else { + queryCtx->testingOverrideConfigUnsafe( + {{core::QueryConfig::kEnablePrefixSort, "true"}}); + SCOPED_TRACE("run with prefix sort without max prefix length limit "); + } + CursorParameters params; + params.planNode = planNode; + params.queryCtx = queryCtx; + assertQueryOrdered(params, duckDbSql, sortingKeys); + } + void runTest( core::PlanNodePtr planNode, const core::PlanNodeId& orderById, @@ -178,6 +204,14 @@ class OrderByTest : public OperatorTestBase { SCOPED_TRACE("run without spilling"); assertQueryOrdered(planNode, duckDbSql, sortingKeys); } + { + SCOPED_TRACE("run with prefix sort"); + runWithPrefixSort( + planNode, duckDbSql, sortingKeys, std::optional("0")); + runWithPrefixSort( + planNode, duckDbSql, sortingKeys, std::optional("1")); + runWithPrefixSort(planNode, duckDbSql, sortingKeys, std::nullopt); + } { SCOPED_TRACE("run with spilling"); auto spillDirectory = exec::test::TempDirectoryPath::create();