Skip to content

Commit

Permalink
[native] Retrieve presto native session properties
Browse files Browse the repository at this point in the history
Co-authored-by: Abe Varghese Kodiyan <abe.varghese@ibm.com>
Co-authored-by: Joe Abraham <joe.abraham@ibm.com>
Co-authored-by: Deepthy Davis <deepthy.davis@ibm.com>
  • Loading branch information
4 people authored and aditi-pandit committed Aug 20, 2024
1 parent 56223eb commit fdd2059
Show file tree
Hide file tree
Showing 8 changed files with 511 additions and 51 deletions.
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ add_library(
ServerOperation.cpp
SignalHandler.cpp
SystemConnector.cpp
SessionProperties.cpp
TaskManager.cpp
TaskResource.cpp
PeriodicHeartbeatManager.cpp
Expand Down
13 changes: 13 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,19 @@ void PrestoServer::run() {
taskManager_ = std::make_unique<TaskManager>(
driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get());

if (systemConfig->prestoNativeSidecar()) {
httpServer_->registerGet(
"/v1/properties/session",
[this](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream) {
auto sessionProperties =
taskManager_->getQueryContextManager()->getSessionProperties();
http::sendOkResponse(downstream, sessionProperties.serialize());
});
}

std::string taskUri;
if (httpsPort.has_value()) {
taskUri = fmt::format(kTaskUriFormat, kHttps, address_, httpsPort.value());
Expand Down
76 changes: 25 additions & 51 deletions presto-native-execution/presto_cpp/main/QueryContextManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,6 @@ using facebook::presto::protocol::TaskId;

namespace facebook::presto {
namespace {
// Utility function to translate a config name in Presto to its equivalent in
// Velox. Returns 'name' as is if there is no mapping.
std::string toVeloxConfig(const std::string& name) {
using velox::core::QueryConfig;
static const folly::F14FastMap<std::string, std::string>
kPrestoToVeloxMapping = {
{"native_simplified_expression_evaluation_enabled",
QueryConfig::kExprEvalSimplified},
{"native_max_spill_level", QueryConfig::kMaxSpillLevel},
{"native_max_spill_file_size", QueryConfig::kMaxSpillFileSize},
{"native_spill_compression_codec",
QueryConfig::kSpillCompressionKind},
{"native_spill_write_buffer_size",
QueryConfig::kSpillWriteBufferSize},
{"native_spill_file_create_config",
QueryConfig::kSpillFileCreateConfig},
{"native_join_spill_enabled", QueryConfig::kJoinSpillEnabled},
{"native_window_spill_enabled", QueryConfig::kWindowSpillEnabled},
{"native_writer_spill_enabled", QueryConfig::kWriterSpillEnabled},
{"native_row_number_spill_enabled",
QueryConfig::kRowNumberSpillEnabled},
{"native_spiller_num_partition_bits",
QueryConfig::kSpillNumPartitionBits},
{"native_topn_row_number_spill_enabled",
QueryConfig::kTopNRowNumberSpillEnabled},
{"native_debug_validate_output_from_operators",
QueryConfig::kValidateOutputFromOperators}};
auto it = kPrestoToVeloxMapping.find(name);
return it == kPrestoToVeloxMapping.end() ? name : it->second;
}

// Update passed in query session configs with system configs. For any pairing
// system/session configs if session config is present, it overrides system
Expand Down Expand Up @@ -89,26 +59,6 @@ void updateFromSystemConfigs(
}
}

std::unordered_map<std::string, std::string> toVeloxConfigs(
const protocol::SessionRepresentation& session) {
// Use base velox query config as the starting point and add Presto session
// properties on top of it.
auto configs = BaseVeloxQueryConfig::instance()->values();
for (const auto& it : session.systemProperties) {
configs[toVeloxConfig(it.first)] = it.second;
}

// If there's a timeZoneKey, convert to timezone name and add to the
// configs. Throws if timeZoneKey can't be resolved.
if (session.timeZoneKey != 0) {
configs.emplace(
velox::core::QueryConfig::kSessionTimezone,
velox::tz::getTimeZoneName(session.timeZoneKey));
}
updateFromSystemConfigs(configs);
return configs;
}

std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
toConnectorConfigs(const protocol::SessionRepresentation& session) {
std::unordered_map<std::string, std::unordered_map<std::string, std::string>>
Expand Down Expand Up @@ -167,7 +117,9 @@ void updateVeloxConnectorConfigs(
QueryContextManager::QueryContextManager(
folly::Executor* driverExecutor,
folly::Executor* spillerExecutor)
: driverExecutor_(driverExecutor), spillerExecutor_(spillerExecutor) {}
: driverExecutor_(driverExecutor),
spillerExecutor_(spillerExecutor),
sessionProperties_(SessionProperties()) {}

std::shared_ptr<velox::core::QueryCtx>
QueryContextManager::findOrCreateQueryCtx(
Expand Down Expand Up @@ -246,4 +198,26 @@ void QueryContextCache::testingClear() {
queryIds_.clear();
}

std::unordered_map<std::string, std::string>
QueryContextManager::toVeloxConfigs(
const protocol::SessionRepresentation& session) {
// Use base velox query config as the starting point and add Presto session
// properties on top of it.
auto configs = BaseVeloxQueryConfig::instance()->values();
for (const auto& it : session.systemProperties) {
configs[sessionProperties_.toVeloxConfig(it.first)] = it.second;
sessionProperties_.updateVeloxConfig(it.first, it.second);
}

// If there's a timeZoneKey, convert to timezone name and add to the
// configs. Throws if timeZoneKey can't be resolved.
if (session.timeZoneKey != 0) {
configs.emplace(
velox::core::QueryConfig::kSessionTimezone,
velox::tz::getTimeZoneName(session.timeZoneKey));
}
updateFromSystemConfigs(configs);
return configs;
}

} // namespace facebook::presto
9 changes: 9 additions & 0 deletions presto-native-execution/presto_cpp/main/QueryContextManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <unordered_map>

#include "presto_cpp/main/SessionProperties.h"
#include "presto_cpp/presto_protocol/presto_protocol.h"
#include "velox/core/QueryCtx.h"

Expand Down Expand Up @@ -116,6 +117,10 @@ class QueryContextManager {
/// Test method to clear the query context cache.
void testingClearCache();

const SessionProperties& getSessionProperties() const {
return sessionProperties_;
}

private:
std::shared_ptr<velox::core::QueryCtx> findOrCreateQueryCtx(
const protocol::TaskId& taskId,
Expand All @@ -125,10 +130,14 @@ class QueryContextManager {
std::unordered_map<std::string, std::string>>&&
connectorConfigStrings);

std::unordered_map<std::string, std::string> toVeloxConfigs(
const protocol::SessionRepresentation& session);

folly::Executor* const driverExecutor_{nullptr};
folly::Executor* const spillerExecutor_{nullptr};

folly::Synchronized<QueryContextCache> queryContextCache_;
SessionProperties sessionProperties_;
};

} // namespace facebook::presto
Loading

0 comments on commit fdd2059

Please sign in to comment.