diff --git a/presto-native-execution/presto_cpp/main/CMakeLists.txt b/presto-native-execution/presto_cpp/main/CMakeLists.txt index e5cf1835dd82..14d288ebca64 100644 --- a/presto-native-execution/presto_cpp/main/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/CMakeLists.txt @@ -30,6 +30,7 @@ add_library( ServerOperation.cpp SignalHandler.cpp SystemConnector.cpp + SessionProperties.cpp TaskManager.cpp TaskResource.cpp PeriodicHeartbeatManager.cpp diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 21384cf218b5..9c587fcc084e 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -466,6 +466,19 @@ void PrestoServer::run() { taskManager_ = std::make_unique( driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get()); + if (systemConfig->prestoNativeSidecar()) { + httpServer_->registerGet( + "/v1/properties/session", + [this]( + proxygen::HTTPMessage* /*message*/, + const std::vector>& /*body*/, + proxygen::ResponseHandler* downstream) { + auto sessionProperties = + taskManager_->getQueryContextManager()->getSessionProperties(); + http::sendOkResponse(downstream, sessionProperties.serialize()); + }); + } + std::string taskUri; if (httpsPort.has_value()) { taskUri = fmt::format(kTaskUriFormat, kHttps, address_, httpsPort.value()); diff --git a/presto-native-execution/presto_cpp/main/QueryContextManager.cpp b/presto-native-execution/presto_cpp/main/QueryContextManager.cpp index eb41ce33c088..e6c0d171f183 100644 --- a/presto-native-execution/presto_cpp/main/QueryContextManager.cpp +++ b/presto-native-execution/presto_cpp/main/QueryContextManager.cpp @@ -28,36 +28,6 @@ using facebook::presto::protocol::TaskId; namespace facebook::presto { namespace { -// Utility function to translate a config name in Presto to its equivalent in -// Velox. Returns 'name' as is if there is no mapping. -std::string toVeloxConfig(const std::string& name) { - using velox::core::QueryConfig; - static const folly::F14FastMap - kPrestoToVeloxMapping = { - {"native_simplified_expression_evaluation_enabled", - QueryConfig::kExprEvalSimplified}, - {"native_max_spill_level", QueryConfig::kMaxSpillLevel}, - {"native_max_spill_file_size", QueryConfig::kMaxSpillFileSize}, - {"native_spill_compression_codec", - QueryConfig::kSpillCompressionKind}, - {"native_spill_write_buffer_size", - QueryConfig::kSpillWriteBufferSize}, - {"native_spill_file_create_config", - QueryConfig::kSpillFileCreateConfig}, - {"native_join_spill_enabled", QueryConfig::kJoinSpillEnabled}, - {"native_window_spill_enabled", QueryConfig::kWindowSpillEnabled}, - {"native_writer_spill_enabled", QueryConfig::kWriterSpillEnabled}, - {"native_row_number_spill_enabled", - QueryConfig::kRowNumberSpillEnabled}, - {"native_spiller_num_partition_bits", - QueryConfig::kSpillNumPartitionBits}, - {"native_topn_row_number_spill_enabled", - QueryConfig::kTopNRowNumberSpillEnabled}, - {"native_debug_validate_output_from_operators", - QueryConfig::kValidateOutputFromOperators}}; - auto it = kPrestoToVeloxMapping.find(name); - return it == kPrestoToVeloxMapping.end() ? name : it->second; -} // Update passed in query session configs with system configs. For any pairing // system/session configs if session config is present, it overrides system @@ -89,26 +59,6 @@ void updateFromSystemConfigs( } } -std::unordered_map toVeloxConfigs( - const protocol::SessionRepresentation& session) { - // Use base velox query config as the starting point and add Presto session - // properties on top of it. - auto configs = BaseVeloxQueryConfig::instance()->values(); - for (const auto& it : session.systemProperties) { - configs[toVeloxConfig(it.first)] = it.second; - } - - // If there's a timeZoneKey, convert to timezone name and add to the - // configs. Throws if timeZoneKey can't be resolved. - if (session.timeZoneKey != 0) { - configs.emplace( - velox::core::QueryConfig::kSessionTimezone, - velox::tz::getTimeZoneName(session.timeZoneKey)); - } - updateFromSystemConfigs(configs); - return configs; -} - std::unordered_map> toConnectorConfigs(const protocol::SessionRepresentation& session) { std::unordered_map> @@ -167,7 +117,9 @@ void updateVeloxConnectorConfigs( QueryContextManager::QueryContextManager( folly::Executor* driverExecutor, folly::Executor* spillerExecutor) - : driverExecutor_(driverExecutor), spillerExecutor_(spillerExecutor) {} + : driverExecutor_(driverExecutor), + spillerExecutor_(spillerExecutor), + sessionProperties_(SessionProperties()) {} std::shared_ptr QueryContextManager::findOrCreateQueryCtx( @@ -246,4 +198,26 @@ void QueryContextCache::testingClear() { queryIds_.clear(); } +std::unordered_map +QueryContextManager::toVeloxConfigs( + const protocol::SessionRepresentation& session) { + // Use base velox query config as the starting point and add Presto session + // properties on top of it. + auto configs = BaseVeloxQueryConfig::instance()->values(); + for (const auto& it : session.systemProperties) { + configs[sessionProperties_.toVeloxConfig(it.first)] = it.second; + sessionProperties_.updateVeloxConfig(it.first, it.second); + } + + // If there's a timeZoneKey, convert to timezone name and add to the + // configs. Throws if timeZoneKey can't be resolved. + if (session.timeZoneKey != 0) { + configs.emplace( + velox::core::QueryConfig::kSessionTimezone, + velox::tz::getTimeZoneName(session.timeZoneKey)); + } + updateFromSystemConfigs(configs); + return configs; +} + } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/QueryContextManager.h b/presto-native-execution/presto_cpp/main/QueryContextManager.h index 12ac3fb29e30..49ee7f2e072b 100644 --- a/presto-native-execution/presto_cpp/main/QueryContextManager.h +++ b/presto-native-execution/presto_cpp/main/QueryContextManager.h @@ -19,6 +19,7 @@ #include #include +#include "presto_cpp/main/SessionProperties.h" #include "presto_cpp/presto_protocol/presto_protocol.h" #include "velox/core/QueryCtx.h" @@ -116,6 +117,10 @@ class QueryContextManager { /// Test method to clear the query context cache. void testingClearCache(); + const SessionProperties& getSessionProperties() const { + return sessionProperties_; + } + private: std::shared_ptr findOrCreateQueryCtx( const protocol::TaskId& taskId, @@ -125,10 +130,14 @@ class QueryContextManager { std::unordered_map>&& connectorConfigStrings); + std::unordered_map toVeloxConfigs( + const protocol::SessionRepresentation& session); + folly::Executor* const driverExecutor_{nullptr}; folly::Executor* const spillerExecutor_{nullptr}; folly::Synchronized queryContextCache_; + SessionProperties sessionProperties_; }; } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.cpp b/presto-native-execution/presto_cpp/main/SessionProperties.cpp new file mode 100644 index 000000000000..8c0464669d19 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/SessionProperties.cpp @@ -0,0 +1,243 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "presto_cpp/main/SessionProperties.h" +#include "velox/core/QueryConfig.h" + +using namespace facebook::velox; + +namespace facebook::presto { + +namespace { +const std::string boolToString(bool value) { + return value ? "true" : "false"; +} +} // namespace + +json SessionProperty::serialize() { + json j; + j["name"] = name_; + j["description"] = description_; + j["typeSignature"] = type_; + j["defaultValue"] = defaultValue_; + j["hidden"] = hidden_; + return j; +} + +void SessionProperties::addSessionProperty( + const std::string& name, + const std::string& description, + const TypePtr& type, + bool isHidden, + const std::string& veloxConfigName, + const std::string& veloxDefault) { + sessionProperties_[name] = std::make_shared( + name, + description, + type->toString(), + isHidden, + veloxConfigName, + veloxDefault); +} + +// List of native session properties is kept as the source of truth here. +SessionProperties::SessionProperties() { + using velox::core::QueryConfig; + // Use empty instance to get default property values. + QueryConfig c{{}}; + + addSessionProperty( + kExprEvalSimplified, + "Native Execution only. Enable simplified path in expression evaluation", + BOOLEAN(), + false, + QueryConfig::kExprEvalSimplified, + boolToString(c.exprEvalSimplified())); + + addSessionProperty( + kMaxSpillLevel, + "Native Execution only. The maximum allowed spilling level for hash join " + "build. 0 is the initial spilling level, -1 means unlimited.", + INTEGER(), + false, + QueryConfig::kMaxSpillLevel, + std::to_string(c.maxSpillLevel())); + + addSessionProperty( + kMaxSpillFileSize, + "The max allowed spill file size. If it is zero, then there is no limit.", + INTEGER(), + false, + QueryConfig::kMaxSpillFileSize, + std::to_string(c.maxSpillFileSize())); + + addSessionProperty( + kSpillCompressionCodec, + "Native Execution only. The compression algorithm type to compress the " + "spilled data.\n Supported compression codecs are: ZLIB, SNAPPY, LZO, " + "ZSTD, LZ4 and GZIP. NONE means no compression.", + VARCHAR(), + false, + QueryConfig::kSpillCompressionKind, + c.spillCompressionKind()); + + addSessionProperty( + kSpillWriteBufferSize, + "Native Execution only. The maximum size in bytes to buffer the serialized " + "spill data before writing to disk for IO efficiency. If set to zero, " + "buffering is disabled.", + BIGINT(), + false, + QueryConfig::kSpillWriteBufferSize, + std::to_string(c.spillWriteBufferSize())); + + addSessionProperty( + kSpillFileCreateConfig, + "Native Execution only. Config used to create spill files. This config is " + "provided to underlying file system and the config is free form. The form should be " + "defined by the underlying file system.", + VARCHAR(), + false, + QueryConfig::kSpillFileCreateConfig, + c.spillFileCreateConfig()); + + addSessionProperty( + kJoinSpillEnabled, + "Native Execution only. Enable join spilling on native engine", + BOOLEAN(), + false, + QueryConfig::kJoinSpillEnabled, + boolToString(c.joinSpillEnabled())); + + addSessionProperty( + kWindowSpillEnabled, + "Native Execution only. Enable window spilling on native engine", + BOOLEAN(), + false, + QueryConfig::kWindowSpillEnabled, + boolToString(c.windowSpillEnabled())); + + addSessionProperty( + kWriterSpillEnabled, + "Native Execution only. Enable writer spilling on native engine", + BOOLEAN(), + false, + QueryConfig::kWriterSpillEnabled, + boolToString(c.writerSpillEnabled())); + + addSessionProperty( + kRowNumberSpillEnabled, + "Native Execution only. Enable row number spilling on native engine", + BOOLEAN(), + false, + QueryConfig::kRowNumberSpillEnabled, + boolToString(c.rowNumberSpillEnabled())); + + addSessionProperty( + kJoinSpillPartitionBits, + "Native Execution only. The number of bits (N) used to calculate the " + "spilling partition number for hash join and RowNumber: 2 ^ N", + INTEGER(), + false, + QueryConfig::kJoinSpillPartitionBits, + std::to_string(c.rowNumberSpillEnabled())); + + addSessionProperty( + kNativeSpillerNumPartitionBits, + "none", + TINYINT(), + false, + QueryConfig::kSpillNumPartitionBits, + std::to_string(c.spillNumPartitionBits())), + + addSessionProperty( + kTopNRowNumberSpillEnabled, + "Native Execution only. Enable topN row number spilling on native engine", + BOOLEAN(), + false, + QueryConfig::kTopNRowNumberSpillEnabled, + boolToString(c.topNRowNumberSpillEnabled())); + + addSessionProperty( + kValidateOutputFromOperators, + "If set to true, then during execution of tasks, the output vectors of " + "every operator are validated for consistency. This is an expensive check " + "so should only be used for debugging. It can help debug issues where " + "malformed vector cause failures or crashes by helping identify which " + "operator is generating them.", + BOOLEAN(), + false, + QueryConfig::kValidateOutputFromOperators, + boolToString(c.validateOutputFromOperators())); + + // If `legacy_timestamp` is true, the coordinator expects timestamp + // conversions without a timezone to be converted to the user's + // session_timezone. + addSessionProperty( + kLegacyTimestamp, + "Native Execution only. Use legacy TIME & TIMESTAMP semantics. Warning: " + "this will be removed", + BOOLEAN(), + false, + QueryConfig::kAdjustTimestampToTimezone, + // Overrides velox default value. legacy_timestamp default value is true + // in the coordinator. + "true"); + + // TODO: remove this once cpu driver slicing config is turned on by default in + // Velox. + addSessionProperty( + kDriverCpuTimeSliceLimitMs, + "Native Execution only. The cpu time slice limit in ms that a driver thread. " + "If not zero, can continuously run without yielding. If it is zero, then " + "there is no limit.", + INTEGER(), + false, + QueryConfig::kDriverCpuTimeSliceLimitMs, + // Overrides velox default value. Set it to 1 second to be aligned with + // Presto Java. + std::to_string(1000)); +} + +const std::unordered_map>& +SessionProperties::getSessionProperties() { + return sessionProperties_; +} + +const std::string SessionProperties::toVeloxConfig(const std::string& name) { + auto it = sessionProperties_.find(name); + return it == sessionProperties_.end() ? name + : it->second->getVeloxConfigName(); +} + +void SessionProperties::updateVeloxConfig( + const std::string& name, + const std::string& value) { + auto it = sessionProperties_.find(name); + // Velox config value is updated only for presto session properties. + if (it == sessionProperties_.end()) { + return; + } + it->second->updateValue(value); +} + +json SessionProperties::serialize() { + json j = json::array(); + const auto sessionProperties = getSessionProperties(); + for (const auto& entry : sessionProperties) { + j.push_back(entry.second->serialize()); + } + return j; +} + +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/SessionProperties.h b/presto-native-execution/presto_cpp/main/SessionProperties.h new file mode 100644 index 000000000000..09e2b0506b5e --- /dev/null +++ b/presto-native-execution/presto_cpp/main/SessionProperties.h @@ -0,0 +1,167 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "presto_cpp/external/json/nlohmann/json.hpp" +#include "velox/type/Type.h" + +using json = nlohmann::json; + +namespace facebook::presto { + +/// This is the interface of the session property. +/// Note: This interface should align with java coordinator. +class SessionProperty { + public: + SessionProperty( + const std::string& name, + const std::string& description, + const std::string& type, + bool hidden, + const std::string& veloxConfigName, + const std::string& defaultValue) + : name_(name), + description_(description), + type_(type), + hidden_(hidden), + veloxConfigName_(veloxConfigName), + defaultValue_(defaultValue), + value_(defaultValue) {} + + const std::string getVeloxConfigName() { + return veloxConfigName_; + } + + void updateValue(const std::string& value) { + value_ = value; + } + + bool operator==(const SessionProperty& other) const { + return name_ == other.name_ && description_ == other.description_ && + type_ == other.type_ && hidden_ == other.hidden_ && + veloxConfigName_ == other.veloxConfigName_ && + defaultValue_ == other.defaultValue_; + } + + json serialize(); + + private: + const std::string name_; + const std::string description_; + + // Datatype of presto native property. + const std::string type_; + const bool hidden_; + const std::string veloxConfigName_; + const std::string defaultValue_; + std::string value_; +}; + +/// Defines all system session properties supported by native worker to ensure +/// that they are the source of truth and to differentiate them from Java based +/// session properties. Also maps the native session properties to velox. +class SessionProperties { + public: + /// Enable simplified path in expression evaluation. + static constexpr const char* kExprEvalSimplified = + "native_simplified_expression_evaluation_enabled"; + + /// Enable join spilling on native engine. + static constexpr const char* kJoinSpillEnabled = "native_join_spill_enabled"; + + /// The maximum allowed spilling level for hash join build. + static constexpr const char* kMaxSpillLevel = "native_max_spill_level"; + + /// The maximum allowed spill file size. + static constexpr const char* kMaxSpillFileSize = "native_max_spill_file_size"; + + /// Enable row number spilling on native engine. + static constexpr const char* kRowNumberSpillEnabled = + "native_row_number_spill_enabled"; + + /// The compression algorithm type to compress the spilled data. + static constexpr const char* kSpillCompressionCodec = + "native_spill_compression_codec"; + + /// The maximum size in bytes to buffer the serialized spill data before + /// writing to disk for IO efficiency. + static constexpr const char* kSpillWriteBufferSize = + "native_spill_write_buffer_size"; + + /// Config used to create spill files. This config is provided to underlying + /// file system and the config is free form. + static constexpr const char* kSpillFileCreateConfig = + "native_spill_file_create_config"; + + /// Enable window spilling on native engine. + static constexpr const char* kWindowSpillEnabled = + "native_window_spill_enabled"; + + /// Enable writer spilling on native engine. + static constexpr const char* kWriterSpillEnabled = + "native_writer_spill_enabled"; + + /// The number of bits (N) used to calculate the spilling + /// partition number for hash join and RowNumber: 2 ^ N + static constexpr const char* kJoinSpillPartitionBits = + "native_join_spiller_partition_bits"; + + static constexpr const char* kNativeSpillerNumPartitionBits = + "native_spiller_num_partition_bits"; + + /// Enable topN row number spilling on native engine. + static constexpr const char* kTopNRowNumberSpillEnabled = + "native_topn_row_number_spill_enabled"; + + /// If set to true, then during execution of tasks, the output vectors of + /// every operator are validated for consistency. This is an expensive check + /// so should only be used for debugging. + static constexpr const char* kValidateOutputFromOperators = + "native_debug_validate_output_from_operators"; + + /// Enable timezone-less timestamp conversions. + static constexpr const char* kLegacyTimestamp = "legacy_timestamp"; + + /// Specifies the cpu time slice limit in ms that a driver thread + /// can continuously run without yielding. + static constexpr const char* kDriverCpuTimeSliceLimitMs = + "driver_cpu_time_slice_limit_ms"; + + SessionProperties(); + + const std::unordered_map>& + getSessionProperties(); + + /// Utility function to translate a config name in Presto to its equivalent in + /// Velox. Returns 'name' as is if there is no mapping. + const std::string toVeloxConfig(const std::string& name); + + void updateVeloxConfig(const std::string& name, const std::string& value); + + json serialize(); + + protected: + void addSessionProperty( + const std::string& name, + const std::string& description, + const velox::TypePtr& type, + bool isHidden, + const std::string& veloxConfigName, + const std::string& veloxDefault); + + std::unordered_map> + sessionProperties_; +}; + +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt b/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt index 004c5929eed6..9635db335915 100644 --- a/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt @@ -20,6 +20,7 @@ add_executable( PrestoTaskTest.cpp QueryContextCacheTest.cpp ServerOperationTest.cpp + SessionPropertiesTest.cpp TaskManagerTest.cpp QueryContextManagerTest.cpp) diff --git a/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp new file mode 100644 index 000000000000..5da79aac1081 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include "presto_cpp/main/SessionProperties.h" +#include "velox/core/QueryConfig.h" +#include "velox/type/Type.h" + +using namespace facebook::presto; +using namespace facebook::velox; + +class SessionPropertiesTest : public testing::Test {}; + +TEST_F(SessionPropertiesTest, validateMapping) { + const std::vector names = { + SessionProperties::kLegacyTimestamp, + SessionProperties::kDriverCpuTimeSliceLimitMs, + SessionProperties::kSpillCompressionCodec}; + const std::vector veloxConfigNames = { + core::QueryConfig::kAdjustTimestampToTimezone, + core::QueryConfig::kDriverCpuTimeSliceLimitMs, + core::QueryConfig::kSpillCompressionKind}; + auto sessionProperties = SessionProperties().getSessionProperties(); + const auto len = names.size(); + for (auto i = 0; i < len; i++) { + EXPECT_EQ( + veloxConfigNames[i], + sessionProperties.at(names[i])->getVeloxConfigName()); + } +} + +TEST_F(SessionPropertiesTest, serializeProperty) { + auto properties = SessionProperties(); + auto j = properties.serialize(); + for (const auto& property : j) { + auto name = property["name"]; + json expectedProperty = + properties.getSessionProperties().at(name)->serialize(); + EXPECT_EQ(property, expectedProperty); + } +}