Skip to content

Commit

Permalink
[native] Expose REST API to fetch worker stats in Prometheus format
Browse files Browse the repository at this point in the history
Add Prometheus Reporter using the prometheus-cpp library.
Add a CMake flag PRESTO_ENABLE_PROMETHEUS_REPORTER to enable Prometheus Reporter.
Add REST API '/v1/info/metrics' to fetch worker metrics from Prometheus Reporter in prometheus format.
This endpoint is only enabled if the Prometheus Reporter is enabled.

Co-authored-by: jaystarshot <jay.narale@uber.com>
  • Loading branch information
karteekmurthys committed Jul 1, 2024
1 parent dcb4ed5 commit c73c5aa
Show file tree
Hide file tree
Showing 15 changed files with 565 additions and 20 deletions.
1 change: 1 addition & 0 deletions .circleci/continue_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ jobs:
-DPRESTO_ENABLE_PARQUET=ON \
-DPRESTO_ENABLE_REMOTE_FUNCTIONS=ON \
-DPRESTO_ENABLE_JWT=ON \
-DPRESTO_ENABLE_PROMETHEUS_REPORTER=ON \
-DCMAKE_PREFIX_PATH=/usr/local \
-DCMAKE_CXX_COMPILER_LAUNCHER=ccache \
-DMAX_LINK_JOBS=2
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ option(PRESTO_ENABLE_TESTING "Enable tests" ON)

option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)

option(PRESTO_ENABLE_PROMETHEUS_REPORTER
"Enables capturing of runtime metrics using prometheus client" OFF)

# Set all Velox options below
add_compile_definitions(FOLLY_HAVE_INT128_T=1)

Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ endif
ifeq ($(PRESTO_ENABLE_JWT), ON)
EXTRA_CMAKE_FLAGS += -DPRESTO_ENABLE_JWT=ON
endif
ifeq ($(PRESTO_ENABLE_PROMETHEUS_REPORTER), ON)
EXTRA_CMAKE_FLAGS += -DPRESTO_ENABLE_PROMETHEUS_REPORTER=ON
endif

CMAKE_FLAGS := -DTREAT_WARNINGS_AS_ERRORS=${TREAT_WARNINGS_AS_ERRORS}
CMAKE_FLAGS += -DENABLE_ALL_WARNINGS=${ENABLE_WALL}
Expand Down
3 changes: 2 additions & 1 deletion presto-native-execution/etc/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ discovery.uri=http://127.0.0.1:58215
presto.version=testversion
http-server.http.port=7777
shutdown-onset-sec=1
register-test-functions=true
register-test-functions=true
runtime-metrics-collection-enabled=true
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,8 @@ set_property(TARGET presto_server PROPERTY JOB_POOL_LINK presto_link_job_pool)
if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
add_subdirectory(runtime-metrics)
target_link_libraries(presto_server prometheus_reporter)
endif()
6 changes: 0 additions & 6 deletions presto-native-execution/presto_cpp/main/PrestoMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <glog/logging.h>
#include "presto_cpp/main/PrestoServer.h"
#include "presto_cpp/main/common/Utils.h"
#include "velox/common/base/StatsReporter.h"

DEFINE_string(etc_dir, ".", "etc directory for presto configuration");

Expand All @@ -30,8 +29,3 @@ int main(int argc, char* argv[]) {
presto.run();
PRESTO_SHUTDOWN_LOG(INFO) << "Exiting main()";
}

// Initialize singleton for the reporter.
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
return new facebook::velox::DummyStatsReporter();
});
21 changes: 16 additions & 5 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,6 @@ void PrestoServer::run() {
baseVeloxQueryConfig->initialize(
fmt::format("{}/velox.properties", configDirectoryPath_), true);

if (systemConfig->enableRuntimeMetricsCollection()) {
enableRuntimeMetricReporting();
}

httpPort = systemConfig->httpServerHttpPort();
if (systemConfig->httpServerHttpsEnabled()) {
httpsPort = systemConfig->httpServerHttpsPort();
Expand Down Expand Up @@ -372,6 +368,21 @@ void PrestoServer::run() {
.sendWithEOM();
});

if (systemConfig->enableRuntimeMetricsCollection()) {
enableWorkerStatsReporting();
if (folly::Singleton<velox::BaseStatsReporter>::try_get()) {
httpServer_->registerGet(
"/v1/info/metrics",
[](proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream) {
http::sendOkResponse(
downstream,
folly::Singleton<velox::BaseStatsReporter>::try_get()
->fetchMetrics());
});
}
}
registerFunctions();
registerRemoteFunctions();
registerVectorSerdes();
Expand Down Expand Up @@ -1171,7 +1182,7 @@ std::string PrestoServer::getBaseSpillDirectory() const {
return SystemConfig::instance()->spillerSpillPath().value_or("");
}

void PrestoServer::enableRuntimeMetricReporting() {
void PrestoServer::enableWorkerStatsReporting() {
// This flag must be set to register the counters.
facebook::velox::BaseStatsReporter::registered = true;
registerStatsCounters();
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class PrestoServer {
virtual std::string getBaseSpillDirectory() const;

/// Invoked to enable stats reporting and register counters.
virtual void enableRuntimeMetricReporting();
virtual void enableWorkerStatsReporting();

/// Invoked to get the list of filters passed to the http server.
std::vector<std::unique_ptr<proxygen::RequestHandlerFactory>>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Add as a source to presto_server to allow global reporter Singleton
# initialization.

find_package(prometheus-cpp CONFIG REQUIRED)

# Build an OBJECT library (not a static archive) that presto_server and
# prometheus_reporter_test link against.
add_library(prometheus_reporter OBJECT PrometheusStatsReporter.cpp)
target_link_libraries(prometheus_reporter presto_common prometheus-cpp::core)

if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "presto_cpp/main/runtime-metrics/PrometheusStatsReporter.h"

#include <prometheus/collectable.h>
#include <prometheus/counter.h>
#include <prometheus/gauge.h>
#include <prometheus/histogram.h>
#include <prometheus/registry.h>
#include <prometheus/summary.h>
#include <prometheus/text_serializer.h>

namespace facebook::presto::prometheus {

// Registers the global stats reporter singleton. The factory creates a
// Prometheus reporter, releases it from its owning handle and returns the raw
// pointer to folly::Singleton.
folly::Singleton<facebook::velox::BaseStatsReporter> reporter(
    []() -> facebook::velox::BaseStatsReporter* {
      auto prometheusReporter =
          PrometheusStatsReporter::createPrometheusReporter();
      return prometheusReporter.release();
    });

// Suffix appended to a histogram's key to form the key of its companion
// summary metric.
static constexpr std::string_view kSummarySuffix{"_summary"};

// Prometheus state held by the reporter: the label set attached to every
// metric it adds and the registry the metric families are registered in.
struct PrometheusStatsReporter::PrometheusImpl {
  // Copies 'labels' and creates a fresh, empty registry.
  explicit PrometheusImpl(const ::prometheus::Labels& labels)
      : labels(labels),
        registry(std::make_shared<::prometheus::Registry>()) {}

  ::prometheus::Labels labels;
  std::shared_ptr<::prometheus::Registry> registry;
};

// Creates a reporter whose metrics all carry the given 'labels'.
PrometheusStatsReporter::PrometheusStatsReporter(
    const std::map<std::string, std::string>& labels)
    : impl_(std::make_shared<PrometheusImpl>(labels)) {}

// Registers 'key' as a Prometheus counter (COUNT) or gauge (SUM, AVG, RATE).
// Registering an already known key is a logged no-op; any other stat type
// raises VELOX_UNSUPPORTED.
void PrometheusStatsReporter::registerMetricExportType(
    const char* key,
    facebook::velox::StatType statType) const {
  const std::string metricKey(key);
  if (registeredMetricsMap_.count(metricKey) != 0) {
    VLOG(1) << "Trying to register already registered metric " << key;
    return;
  }

  // The name handed to Prometheus has every '.' replaced with '_'; the map
  // keeps the original key.
  std::string prometheusName(metricKey);
  std::replace(prometheusName.begin(), prometheusName.end(), '.', '_');

  if (statType == velox::StatType::COUNT) {
    // Every new metric key gets its own MetricFamily.
    auto& family = ::prometheus::BuildCounter()
                       .Name(prometheusName)
                       .Register(*impl_->registry);
    auto& metric = family.Add(impl_->labels);
    registeredMetricsMap_.emplace(metricKey, StatsInfo{statType, &metric});
    return;
  }

  const bool exportedAsGauge = statType == velox::StatType::SUM ||
      statType == velox::StatType::AVG || statType == velox::StatType::RATE;
  if (exportedAsGauge) {
    auto& family = ::prometheus::BuildGauge()
                       .Name(prometheusName)
                       .Register(*impl_->registry);
    auto& metric = family.Add(impl_->labels);
    registeredMetricsMap_.emplace(metricKey, StatsInfo{statType, &metric});
    return;
  }

  VELOX_UNSUPPORTED(
      "Unsupported metric type {}", velox::statTypeString(statType));
}

// StringPiece overload: copies the key into a std::string and forwards to the
// const char* overload.
void PrometheusStatsReporter::registerMetricExportType(
    folly::StringPiece key,
    facebook::velox::StatType statType) const {
  const std::string ownedKey = key.toString();
  registerMetricExportType(ownedKey.c_str(), statType);
}

void PrometheusStatsReporter::registerHistogramMetricExportType(
const char* key,
int64_t bucketWidth,
int64_t min,
int64_t max,
const std::vector<int32_t>& pcts) const {
if (registeredMetricsMap_.count(key)) {
// Already registered;
VLOG(1) << "Trying to register already registered metric " << key;
return;
}
auto numBuckets = (max - min) / bucketWidth;
auto bound = min + bucketWidth;
std::string sanitizedMetricKey = std::string(key);
// '.' is replaced with '_'.
std::replace(sanitizedMetricKey.begin(), sanitizedMetricKey.end(), '.', '_');

auto& histogramFamily = ::prometheus::BuildHistogram()
.Name(sanitizedMetricKey)
.Register(*impl_->registry);

::prometheus::Histogram::BucketBoundaries bucketBoundaries;
while (numBuckets > 0) {
bucketBoundaries.push_back(bound);
bound += bucketWidth;
numBuckets--;
}
VELOX_CHECK_GE(bucketBoundaries.size(), 1);
auto& histogramMetric = histogramFamily.Add(impl_->labels, bucketBoundaries);

registeredMetricsMap_.emplace(
key, StatsInfo{velox::StatType::HISTOGRAM, &histogramMetric});
// If percentiles are provided, create a Summary type metric and register.
if (pcts.size() > 0) {
auto summaryMetricKey = sanitizedMetricKey + std::string(kSummarySuffix);
auto& summaryFamily = ::prometheus::BuildSummary()
.Name(summaryMetricKey)
.Register(*impl_->registry);
::prometheus::Summary::Quantiles quantiles;
for (auto pct : pcts) {
quantiles.push_back(
::prometheus::detail::CKMSQuantiles::Quantile(pct / (double)100, 0));
}
auto& summaryMetric = summaryFamily.Add({impl_->labels}, quantiles);
registeredMetricsMap_.emplace(
std::string(key).append(kSummarySuffix),
StatsInfo{velox::StatType::HISTOGRAM, &summaryMetric});
}
}

// StringPiece overload: copies the key into a std::string and forwards to the
// const char* overload.
void PrometheusStatsReporter::registerHistogramMetricExportType(
    folly::StringPiece key,
    int64_t bucketWidth,
    int64_t min,
    int64_t max,
    const std::vector<int32_t>& pcts) const {
  const std::string ownedKey = key.toString();
  registerHistogramMetricExportType(
      ownedKey.c_str(), bucketWidth, min, max, pcts);
}

// std::string overload: forwards to the const char* overload.
void PrometheusStatsReporter::addMetricValue(
    const std::string& key,
    size_t value) const {
  const char* rawKey = key.c_str();
  addMetricValue(rawKey, value);
}

void PrometheusStatsReporter::addMetricValue(const char* key, size_t value)
const {
auto metricIterator = registeredMetricsMap_.find(key);
if (metricIterator == registeredMetricsMap_.end()) {
VLOG(1) << "addMetricValue called for unregistered metric " << key;
return;
}
auto statsInfo = metricIterator->second;
switch (statsInfo.statType) {
case velox::StatType::COUNT: {
auto counter =
reinterpret_cast<::prometheus::Counter*>(statsInfo.metricPtr);
counter->Increment(value);
} break;
case velox::StatType::SUM:
case velox::StatType::AVG:
case velox::StatType::RATE: {
// Overrides the existing state.
auto gauge = reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
gauge->Set(value);
} break;
default:
VELOX_UNSUPPORTED(
"Unsupported metric type {}",
velox::statTypeString(statsInfo.statType));
};
}

// StringPiece overload: copies the key into a std::string and forwards to the
// const char* overload.
void PrometheusStatsReporter::addMetricValue(
    folly::StringPiece key,
    size_t value) const {
  const std::string ownedKey = key.toString();
  addMetricValue(ownedKey.c_str(), value);
}

// std::string overload: forwards to the const char* overload.
void PrometheusStatsReporter::addHistogramMetricValue(
    const std::string& key,
    size_t value) const {
  const char* rawKey = key.c_str();
  addHistogramMetricValue(rawKey, value);
}

void PrometheusStatsReporter::addHistogramMetricValue(
const char* key,
size_t value) const {
auto metricIterator = registeredMetricsMap_.find(key);
if (metricIterator == registeredMetricsMap_.end()) {
VLOG(1) << "addMetricValue for unregistered metric " << key;
return;
}
auto histogram = reinterpret_cast<::prometheus::Histogram*>(
metricIterator->second.metricPtr);
histogram->Observe(value);

std::string summaryKey = std::string(key).append(kSummarySuffix);
metricIterator = registeredMetricsMap_.find(summaryKey);
if (metricIterator != registeredMetricsMap_.end()) {
auto summary = reinterpret_cast<::prometheus::Summary*>(
metricIterator->second.metricPtr);
summary->Observe(value);
}
}

// StringPiece overload: copies the key into a std::string and forwards to the
// const char* overload.
void PrometheusStatsReporter::addHistogramMetricValue(
    folly::StringPiece key,
    size_t value) const {
  const std::string ownedKey = key.toString();
  addHistogramMetricValue(ownedKey.c_str(), value);
}

std::string PrometheusStatsReporter::fetchMetrics() {
if (registeredMetricsMap_.empty()) {
return "";
}
::prometheus::TextSerializer serializer;
// Registry::Collect() acquires lock on a mutex.
return serializer.Serialize(impl_->registry->Collect());
}

}; // namespace facebook::presto::prometheus
Loading

0 comments on commit c73c5aa

Please sign in to comment.