Skip to content

Commit

Permalink
Expose REST API to fetch runtime metrics in prometheus format
Browse files Browse the repository at this point in the history
Co-authored-by:jaystarshot <jay.narale@uber.com>
  • Loading branch information
karteekmurthys committed Apr 27, 2024
1 parent 62d9641 commit 22c1820
Show file tree
Hide file tree
Showing 13 changed files with 536 additions and 7 deletions.
7 changes: 7 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ option(PRESTO_ENABLE_TESTING "Enable tests" ON)

option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)

option(PRESTO_ENABLE_PROMETHEUS_REPORTER
"Enables capturing of runtime metrics using prometheus client" OFF)

# Set all Velox options below
add_compile_definitions(FOLLY_HAVE_INT128_T=1)

Expand Down Expand Up @@ -201,6 +204,10 @@ if(PRESTO_ENABLE_JWT)
add_compile_definitions(PRESTO_ENABLE_JWT)
endif()

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
add_compile_definitions(PRESTO_ENABLE_PROMETHEUS_REPORTER)
endif()

if("${MAX_LINK_JOBS}")
set_property(GLOBAL APPEND PROPERTY JOB_POOLS
"presto_link_job_pool=${MAX_LINK_JOBS}")
Expand Down
3 changes: 2 additions & 1 deletion presto-native-execution/etc/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ discovery.uri=http://127.0.0.1:58215
presto.version=testversion
http-server.http.port=7777
shutdown-onset-sec=1
register-test-functions=true
register-test-functions=true
runtime-metrics-collection-enabled=true
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,7 @@ set_property(TARGET presto_server PROPERTY JOB_POOL_LINK presto_link_job_pool)
if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
message(STATUS "Linking prometheus metrics reporter")
target_link_libraries(presto_server_lib prometheus_reporter)
endif()
23 changes: 18 additions & 5 deletions presto-native-execution/presto_cpp/main/PrestoMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,25 @@
#include <glog/logging.h>
#include "presto_cpp/main/PrestoServer.h"
#include "presto_cpp/main/common/Utils.h"

#ifdef PRESTO_ENABLE_PROMETHEUS_REPORTER
#include "presto_cpp/main/common/prometheus-metrics/PrometheusReporter.h"
// Initialize singleton for the reporter
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
auto nodeConfig = facebook::presto::NodeConfig::instance();
const std::string cluster = nodeConfig->nodeEnvironment();
const char* hostName = std::getenv("HOSTNAME");
const std::string worker = !hostName ? "" : hostName;
return new facebook::presto::prometheus::PrometheusReporter(
::prometheus::Labels{{"cluster", cluster}, {"worker", worker}});
});
#else
#include "velox/common/base/StatsReporter.h"
// Initialize singleton for the reporter.
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
return new facebook::velox::DummyStatsReporter();
});
#endif

DEFINE_string(etc_dir, ".", "etc directory for presto configuration");

Expand All @@ -30,8 +48,3 @@ int main(int argc, char* argv[]) {
presto.run();
PRESTO_SHUTDOWN_LOG(INFO) << "Exiting main()";
}

// Initialize singleton for the reporter.
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
return new facebook::velox::DummyStatsReporter();
});
31 changes: 30 additions & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@
#include "velox/functions/prestosql/window/WindowFunctionsRegistration.h"
#include "velox/serializers/PrestoSerializer.h"

#ifdef PRESTO_ENABLE_PROMETHEUS_REPORTER
#include "presto_cpp/main/common/prometheus-metrics/PrometheusReporter.h"
#endif

#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
#include "presto_cpp/main/RemoteFunctionRegisterer.h"
#endif
Expand Down Expand Up @@ -217,6 +221,10 @@ void PrestoServer::run() {
exit(EXIT_FAILURE);
}

if (systemConfig->enableRuntimeMetricsCollection()) {
// This flag must be set to register the counters.
facebook::velox::BaseStatsReporter::registered = true;
}
registerStatsCounters();
registerFileSinks();
registerFileSystems();
Expand Down Expand Up @@ -318,6 +326,14 @@ void PrestoServer::run() {
proxygen::ResponseHandler* downstream) {
server->reportServerInfo(downstream);
});
httpServer_->registerGet(
"/v1/info/metrics",
[server = this](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream) {
server->reportWorkerMetrics(downstream);
});
httpServer_->registerGet(
"/v1/info/state",
[server = this](
Expand Down Expand Up @@ -1097,7 +1113,6 @@ void PrestoServer::populateMemAndCPUInfo() {
poolInfo.queryMemoryAllocations.insert(
{queryId, {protocol::MemoryAllocation{"total", bytes}}});
++numContexts;
poolInfo.reservedBytes += bytes;
});
RECORD_METRIC_VALUE(kCounterNumQueryContexts, numContexts);
cpuMon_.update();
Expand Down Expand Up @@ -1136,6 +1151,20 @@ void PrestoServer::reportServerInfo(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(serverInfo));
}

void PrestoServer::reportWorkerMetrics(proxygen::ResponseHandler* downstream) {
#ifdef PRESTO_ENABLE_PROMETHEUS_REPORTER
const auto nodeConfig = facebook::presto::NodeConfig::instance();
const std::string cluster = nodeConfig->nodeEnvironment();
const char* hostName = std::getenv("HOSTNAME");
const std::string worker = !hostName ? "" : hostName;
auto reporter = std::dynamic_pointer_cast<
facebook::presto::prometheus::PrometheusReporter>(
folly::Singleton<facebook::velox::BaseStatsReporter>::try_get());
http::sendOkResponse(downstream, reporter->getSerializedMetrics());
#else
return;
#endif
}
void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(fetchNodeStatus()));
}
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ class PrestoServer {

void reportNodeStatus(proxygen::ResponseHandler* downstream);

void reportWorkerMetrics(proxygen::ResponseHandler* downstream);

protocol::NodeStatus fetchNodeStatus();

void populateMemAndCPUInfo();
Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@ set_property(TARGET presto_common PROPERTY JOB_POOL_LINK presto_link_job_pool)
if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
message(STATUS "Adding prometheus-metrics")
add_subdirectory(prometheus-metrics)
endif()
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
find_package(prometheus-cpp CONFIG REQUIRED)

add_library(prometheus_reporter PrometheusReporter.h PrometheusReporter.cpp)

target_link_libraries(prometheus_reporter presto_common prometheus-cpp::core)

set_property(TARGET prometheus_reporter PROPERTY JOB_POOL_LINK
presto_link_job_pool)
add_subdirectory(tests)
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "presto_cpp/main/common/prometheus-metrics/PrometheusReporter.h"
namespace facebook::presto::prometheus {

using namespace ::prometheus;

void PrometheusReporter::registerMetricExportType(
const char* key,
facebook::velox::StatType statType) const {
if (registeredMetrics_.count(key)) {
// Already registered;
VLOG(1) << "Trying to register already registered metric " << key;
return;
}
// Prometheus format requires '.' to be replaced.
std::string displayKey = std::string(key);
std::replace(displayKey.begin(), displayKey.end(), '.', '_');
switch (statType) {
case facebook::velox::StatType::COUNT: {
auto& counterFamily =
BuildCounter().Name(displayKey).Register(*registry_);
auto& counter = counterFamily.Add(labels_);
countersMap_.wlock()->emplace(std::string(key), counter);
break;
}
case facebook::velox::StatType::SUM:
case facebook::velox::StatType::AVG:
case facebook::velox::StatType::RATE: {
auto& gaugeFamily = BuildGauge().Name(displayKey).Register(*registry_);
auto& gauge = gaugeFamily.Add(labels_);
gaugeMap_.wlock()->emplace(std::string(key), gauge);
break;
}
default:
VELOX_UNSUPPORTED(
"Unsupported metric type {}", std::to_string((int)statType));
}
registeredMetrics_.emplace(key, statType);
}

void PrometheusReporter::registerMetricExportType(
folly::StringPiece key,
facebook::velox::StatType statType) const {
registerMetricExportType(key.start(), statType);
}

void PrometheusReporter::registerHistogramMetricExportType(
const char* key,
int64_t bucketWidth,
int64_t min,
int64_t max,
const std::vector<int32_t>& pcts) const {
if (registeredHistogramMetrics_.count(key)) {
// Already registered;
VLOG(1) << "Trying to register already registered metric " << key;
return;
}
int numBuckets = (max - min) / bucketWidth;
auto bound = min + bucketWidth;
std::string keyStr = std::string(key);
std::replace(keyStr.begin(), keyStr.end(), '.', '_');

auto& histogramFamily = BuildHistogram().Name(keyStr).Register(*registry_);
Histogram::BucketBoundaries bucketBoundaries;
while (numBuckets) {
bucketBoundaries.push_back(bound);
bound += bucketWidth;
numBuckets--;
}
VELOX_CHECK_GE(bucketBoundaries.size(), 1);
auto& histogramMetric = histogramFamily.Add(labels_, bucketBoundaries);
histogramMap_.wlock()->emplace(key, histogramMetric);
// If percentiles are provided, create a Summary type metric and register.
if (pcts.size() > 0) {
auto& summaryFamily =
BuildSummary().Name(keyStr.append(kSummarySuffix)).Register(*registry_);
Summary::Quantiles quantiles;
for (auto pct : pcts) {
quantiles.push_back(
detail::CKMSQuantiles::Quantile(pct / (double)100, 0));
}
auto& summaryMetric = summaryFamily.Add({labels_}, quantiles);
summaryMap_.wlock()->emplace(std::string(key), summaryMetric);
}
registeredHistogramMetrics_.insert(key);
}

void PrometheusReporter::registerHistogramMetricExportType(
folly::StringPiece key,
int64_t bucketWidth,
int64_t min,
int64_t max,
const std::vector<int32_t>& pcts) const {
registerHistogramMetricExportType(key.begin(), bucketWidth, min, max, pcts);
}

void PrometheusReporter::addMetricValue(const std::string& key, size_t value)
const {
addMetricValue(key.c_str(), value);
}

void PrometheusReporter::addMetricValue(const char* key, size_t value) const {
if (!registeredMetrics_.count(key)) {
VLOG(1) << "addMetricValue for unregistred metric " << key;
return;
}
auto statType = registeredMetrics_.find(key)->second;
switch (statType) {
case velox::StatType::COUNT:
countersMap_.wlock()->find(key)->second.Increment(value);
break;
case velox::StatType::SUM:
case velox::StatType::AVG:
case velox::StatType::RATE:
// Overrides the existing state.
gaugeMap_.wlock()->find(key)->second.Set(value);
break;
};
}

void PrometheusReporter::addMetricValue(folly::StringPiece key, size_t value)
const {
addMetricValue(key.begin(), value);
}

void PrometheusReporter::addHistogramMetricValue(
const std::string& key,
size_t value) const {
addHistogramMetricValue(key.c_str(), value);
}

void PrometheusReporter::addHistogramMetricValue(const char* key, size_t value)
const {
if (!registeredHistogramMetrics_.count(key)) {
VLOG(1) << "addMetricValue for unregistered metric " << key;
return;
}
histogramMap_.wlock()->find(key)->second.Observe(value);
if (summaryMap_.rlock()->count(key)) {
summaryMap_.wlock()->find(key)->second.Observe(value);
}
}

void PrometheusReporter::addHistogramMetricValue(
folly::StringPiece key,
size_t value) const {
addHistogramMetricValue(key.begin(), value);
}

std::string PrometheusReporter::getSerializedMetrics() const {
if (registeredMetrics_.empty() && registeredHistogramMetrics_.empty()) {
return "";
}
TextSerializer serializer;
// Registry::Collect() acquires lock on a mutex.
return serializer.Serialize(registry_->Collect());
}
}; // namespace facebook::presto::prometheus
Loading

0 comments on commit 22c1820

Please sign in to comment.