Skip to content

Commit

Permalink
[Native] Prometheus exporter using prometheus-cpp client
Browse files Browse the repository at this point in the history
  • Loading branch information
Karteekmurthys committed Jan 31, 2024
1 parent 5c510e7 commit 3ce8d6c
Show file tree
Hide file tree
Showing 13 changed files with 484 additions and 16 deletions.
7 changes: 7 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ option(PRESTO_ENABLE_TESTING "Enable tests" ON)

option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)

option(PRESTO_ENABLE_PROMETHEUS_REPORTER
"Enables capturing of runtime metrics using prometheus client" OFF)

# Set all Velox options below
add_compile_definitions(FOLLY_HAVE_INT128_T=1)

Expand Down Expand Up @@ -201,6 +204,10 @@ if(PRESTO_ENABLE_JWT)
add_compile_definitions(PRESTO_ENABLE_JWT)
endif()

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
add_compile_definitions(PRESTO_ENABLE_PROMETHEUS_REPORTER)
endif()

if("${MAX_LINK_JOBS}")
set_property(GLOBAL APPEND PROPERTY JOB_POOLS
"presto_link_job_pool=${MAX_LINK_JOBS}")
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,7 @@ set_property(TARGET presto_server PROPERTY JOB_POOL_LINK presto_link_job_pool)
if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
message(STATUS "Linking prometheus metrics reporter")
target_link_libraries(presto_server_lib prometheus_reporter)
endif()
31 changes: 28 additions & 3 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "presto_cpp/main/TaskResource.h"
#include "presto_cpp/main/common/ConfigReader.h"
#include "presto_cpp/main/common/Counters.h"
#include "presto_cpp/main/common/StatsReporterImpl.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/http/filters/AccessLogFilter.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
Expand Down Expand Up @@ -53,6 +52,25 @@
#include "velox/functions/prestosql/window/WindowFunctionsRegistration.h"
#include "velox/serializers/PrestoSerializer.h"

#ifdef PRESTO_ENABLE_PROMETHEUS_REPORTER
#include "presto_cpp/main/common/prometheus-metrics/PrometheusReporter.h"
// Initialize singleton for the reporter
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
auto nodeConfig = facebook::presto::NodeConfig::instance();
std::string cluster = nodeConfig->nodeEnvironment();
char* hostName = std::getenv("HOSTNAME");
std::string worker = !hostName ? "" : hostName;
return new facebook::presto::prometheus::PrometheusReporter(
::prometheus::Labels{{"cluster", cluster}, {"worker", worker}});
});
#else
#include "presto_cpp/main/common/StatsReporterImpl.h"
// Initialize singleton for the reporter
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
return new facebook::presto::StatsReporterImpl();
});
#endif

#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
#include "presto_cpp/main/RemoteFunctionRegisterer.h"
#endif
Expand Down Expand Up @@ -1107,15 +1125,22 @@ void PrestoServer::reportServerInfo(proxygen::ResponseHandler* downstream) {
}

void PrestoServer::reportHealthMetrics(proxygen::ResponseHandler* downstream) {
auto reporter = std::dynamic_pointer_cast<StatsReporterImpl>(
folly::Singleton<facebook::velox::BaseStatsReporter>::try_get());
auto nodeConfig = facebook::presto::NodeConfig::instance();
std::string cluster = nodeConfig->nodeEnvironment();
char* hostName = std::getenv("HOSTNAME");
std::string worker = !hostName ? "" : hostName;
#ifdef PRESTO_ENABLE_PROMETHEUS_REPORTER
auto reporter = std::dynamic_pointer_cast<
facebook::presto::prometheus::PrometheusReporter>(
folly::Singleton<facebook::velox::BaseStatsReporter>::try_get());
http::sendOkResponse(downstream, reporter->getSerializedMetrics());
#else
auto reporter = std::dynamic_pointer_cast<StatsReporterImpl>(
folly::Singleton<facebook::velox::BaseStatsReporter>::try_get());
prometheus::PrometheusSerializer serializer(
prometheus::Labels{{"cluster", cluster}, {"worker", worker}});
http::sendOkResponse(downstream, reporter->getMetrics(serializer));
#endif
}
void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(fetchNodeStatus()));
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ set_property(TARGET presto_common PROPERTY JOB_POOL_LINK presto_link_job_pool)
if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
add_subdirectory(prometheus-metrics)
endif()
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,4 @@ const std::string StatsReporterImpl::getMetrics(
std::lock_guard<std::mutex> lock(mutex_);
return serializer.serialize(registeredStats_, metricsMap_);
}

// Initialize singleton for the reporter
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
return new StatsReporterImpl();
});
} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include <iostream>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/common/Counters.h"
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/velox/common/base/Exceptions.h"

namespace facebook::presto {

Expand Down Expand Up @@ -123,13 +123,12 @@ class StatsReporterImpl : public facebook::velox::BaseStatsReporter {

void addMetricValue(folly::StringPiece key, size_t value = 1) const override;

virtual void addHistogramMetricValue(const std::string& key, size_t value)
void addHistogramMetricValue(const std::string& key, size_t value)
const override {}

virtual void addHistogramMetricValue(const char* key, size_t value)
const override {}
void addHistogramMetricValue(const char* key, size_t value) const override {}

virtual void addHistogramMetricValue(folly::StringPiece key, size_t value)
void addHistogramMetricValue(folly::StringPiece key, size_t value)
const override {}

const facebook::velox::StatType getRegisteredStatType(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
find_package(prometheus-cpp CONFIG REQUIRED)
add_library(prometheus_reporter PrometheusReporter.cpp PrometheusReporter.h)
target_link_libraries(prometheus_reporter presto_common prometheus-cpp::core)
set_property(TARGET prometheus_reporter PROPERTY JOB_POOL_LINK
presto_link_job_pool)
add_executable(prometheus_reporter_test PrometheusReporterTest.cpp)
target_link_libraries(prometheus_reporter_test presto_server_lib
velox_exec_test_lib prometheus_reporter gtest gtest_main)
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "PrometheusReporter.h"
namespace facebook::presto::prometheus {
void PrometheusReporter::registerMetricExportType(
const char* key,
facebook::velox::StatType statType) const {
if (registeredMetrics_.count(key)) {
// Already registered;
VLOG(1) << "Trying to register already registered metric " << key;
return;
}
// Prometheus format requires '.' to be replaced.
std::string keyStr = std::string(key);
std::replace(keyStr.begin(), keyStr.end(), '.', '_');
switch (statType) {
case facebook::velox::StatType::COUNT: {
auto& counterFamily =
::prometheus::BuildCounter().Name(keyStr).Register(*registry_);
auto& counter = counterFamily.Add(labels_);
countersMap_.emplace(std::string(key), counter);
break;
}
case facebook::velox::StatType::SUM:
case facebook::velox::StatType::AVG:
case facebook::velox::StatType::RATE: {
auto& gaugeFamily =
::prometheus::BuildGauge().Name(keyStr).Register(*registry_);
auto& gauge = gaugeFamily.Add(labels_);
gaugeMap_.emplace(std::string(key), gauge);
break;
}
default:
VELOX_UNSUPPORTED(
"Unsupported metric type {}", std::to_string((int)statType));
}
registeredMetrics_.emplace(key, statType);
}

void PrometheusReporter::registerMetricExportType(
folly::StringPiece key,
facebook::velox::StatType statType) const {
registerMetricExportType(key.start(), statType);
}

void PrometheusReporter::registerHistogramMetricExportType(
const char* key,
int64_t bucketWidth,
int64_t min,
int64_t max,
const std::vector<int32_t>& pcts) const {
std::lock_guard<std::mutex> l(mutex_);
if (registeredHistogramMetrics_.count(key)) {
// Already registered;
VLOG(1) << "Trying to register already registered metric " << key;
return;
}
int numBuckets = (max - min) / bucketWidth;
auto bound = min + bucketWidth;
std::string keyStr = std::string(key);
std::replace(keyStr.begin(), keyStr.end(), '.', '_');
auto& histogramFamily =
::prometheus::BuildHistogram().Name(keyStr).Register(*registry_);
::prometheus::Histogram::BucketBoundaries bucketBoundaries;
while (numBuckets) {
bucketBoundaries.push_back(bound);
bound += bucketWidth;
numBuckets--;
}
VELOX_CHECK_GE(bucketBoundaries.size(), 1);
auto& histogramMetric = histogramFamily.Add(labels_, bucketBoundaries);
histogramMap_.emplace(key, histogramMetric);
// If percentiles are provided, create a Summary type metric and register.
if (pcts.size() > 0) {
auto& summaryFamily = ::prometheus::BuildSummary()
.Name(keyStr.append(kSummarySuffix))
.Register(*registry_);
::prometheus::Summary::Quantiles quantiles;
for (auto pct : pcts) {
quantiles.push_back(
::prometheus::detail::CKMSQuantiles::Quantile(pct / (double)100, 0));
}
auto& summaryMetric = summaryFamily.Add({labels_}, quantiles);
summaryMap_.emplace(std::string(key), summaryMetric);
}
registeredHistogramMetrics_.insert(key);
}

void PrometheusReporter::registerHistogramMetricExportType(
folly::StringPiece key,
int64_t bucketWidth,
int64_t min,
int64_t max,
const std::vector<int32_t>& pcts) const {
registerHistogramMetricExportType(key.begin(), bucketWidth, min, max, pcts);
}

void PrometheusReporter::addMetricValue(const std::string& key, size_t value)
const {
addMetricValue(key.c_str(), value);
}

void PrometheusReporter::addMetricValue(const char* key, size_t value) const {
std::lock_guard<std::mutex> l(mutex_);
if (!registeredMetrics_.count(key)) {
VLOG(1) << "addMetricValue for unregistred metric " << key;
return;
}
auto statType = registeredMetrics_.find(key)->second;
switch (statType) {
case velox::StatType::COUNT:
countersMap_.find(key)->second.Increment(value);
break;
case velox::StatType::SUM:
case velox::StatType::AVG:
case velox::StatType::RATE:
gaugeMap_.find(key)->second.Set(value);
break;
};
}

void PrometheusReporter::addMetricValue(folly::StringPiece key, size_t value)
const {
addMetricValue(key.begin(), value);
}

void PrometheusReporter::addHistogramMetricValue(
const std::string& key,
size_t value) const {
addHistogramMetricValue(key.c_str(), value);
}

void PrometheusReporter::addHistogramMetricValue(const char* key, size_t value)
const {
std::lock_guard<std::mutex> l(mutex_);
if (!registeredHistogramMetrics_.count(key)) {
VLOG(1) << "addMetricValue for unregistered metric " << key;
return;
}
histogramMap_.find(key)->second.Observe(value);
if (summaryMap_.count(key)) {
summaryMap_.find(key)->second.Observe(value);
}
}

void PrometheusReporter::addHistogramMetricValue(
folly::StringPiece key,
size_t value) const {
addHistogramMetricValue(key.begin(), value);
}

std::string PrometheusReporter::getSerializedMetrics() const {
if (registeredMetrics_.empty() && registeredHistogramMetrics_.empty()) {
return "";
}
::prometheus::TextSerializer serializer;
return serializer.Serialize(registry_->Collect());
}
}; // namespace facebook::presto::prometheus
Loading

0 comments on commit 3ce8d6c

Please sign in to comment.