Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[native] Expose REST API to fetch worker stats in Prometheus format #22360

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/continue_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ jobs:
-DPRESTO_ENABLE_PARQUET=ON \
-DPRESTO_ENABLE_REMOTE_FUNCTIONS=ON \
-DPRESTO_ENABLE_JWT=ON \
-DPRESTO_ENABLE_PROMETHEUS_REPORTER=ON \
karteekmurthys marked this conversation as resolved.
Show resolved Hide resolved
-DCMAKE_PREFIX_PATH=/usr/local \
-DCMAKE_CXX_COMPILER_LAUNCHER=ccache \
-DMAX_LINK_JOBS=2
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ option(PRESTO_ENABLE_TESTING "Enable tests" ON)

option(PRESTO_ENABLE_JWT "Enable JWT (JSON Web Token) authentication" OFF)

option(PRESTO_ENABLE_PROMETHEUS_REPORTER
karteekmurthys marked this conversation as resolved.
Show resolved Hide resolved
"Enables capturing of runtime metrics using prometheus client" OFF)

# Set all Velox options below
add_compile_definitions(FOLLY_HAVE_INT128_T=1)

Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ endif
ifeq ($(PRESTO_ENABLE_JWT), ON)
EXTRA_CMAKE_FLAGS += -DPRESTO_ENABLE_JWT=ON
endif
ifeq ($(PRESTO_ENABLE_PROMETHEUS_REPORTER), ON)
EXTRA_CMAKE_FLAGS += -DPRESTO_ENABLE_PROMETHEUS_REPORTER=ON
endif

CMAKE_FLAGS := -DTREAT_WARNINGS_AS_ERRORS=${TREAT_WARNINGS_AS_ERRORS}
CMAKE_FLAGS += -DENABLE_ALL_WARNINGS=${ENABLE_WALL}
Expand Down
3 changes: 2 additions & 1 deletion presto-native-execution/etc/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ discovery.uri=http://127.0.0.1:58215
presto.version=testversion
http-server.http.port=7777
shutdown-onset-sec=1
register-test-functions=true
register-test-functions=true
runtime-metrics-collection-enabled=true
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,8 @@ set_property(TARGET presto_server PROPERTY JOB_POOL_LINK presto_link_job_pool)
if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()

if(PRESTO_ENABLE_PROMETHEUS_REPORTER)
add_subdirectory(runtime-metrics)
target_link_libraries(presto_server prometheus_reporter)
endif()
6 changes: 0 additions & 6 deletions presto-native-execution/presto_cpp/main/PrestoMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <glog/logging.h>
#include "presto_cpp/main/PrestoServer.h"
#include "presto_cpp/main/common/Utils.h"
#include "velox/common/base/StatsReporter.h"

DEFINE_string(etc_dir, ".", "etc directory for presto configuration");

Expand All @@ -30,8 +29,3 @@ int main(int argc, char* argv[]) {
presto.run();
PRESTO_SHUTDOWN_LOG(INFO) << "Exiting main()";
}

// Initialize singleton for the reporter.
folly::Singleton<facebook::velox::BaseStatsReporter> reporter([]() {
majetideepak marked this conversation as resolved.
Show resolved Hide resolved
return new facebook::velox::DummyStatsReporter();
});
21 changes: 16 additions & 5 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,6 @@ void PrestoServer::run() {
baseVeloxQueryConfig->initialize(
fmt::format("{}/velox.properties", configDirectoryPath_), true);

if (systemConfig->enableRuntimeMetricsCollection()) {
enableRuntimeMetricReporting();
}

httpPort = systemConfig->httpServerHttpPort();
if (systemConfig->httpServerHttpsEnabled()) {
httpsPort = systemConfig->httpServerHttpsPort();
Expand Down Expand Up @@ -372,6 +368,21 @@ void PrestoServer::run() {
.sendWithEOM();
});

if (systemConfig->enableRuntimeMetricsCollection()) {
majetideepak marked this conversation as resolved.
Show resolved Hide resolved
enableWorkerStatsReporting();
if (folly::Singleton<velox::BaseStatsReporter>::try_get()) {
httpServer_->registerGet(
"/v1/info/metrics",
[](proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream) {
http::sendOkResponse(
downstream,
folly::Singleton<velox::BaseStatsReporter>::try_get()
->fetchMetrics());
});
}
}
registerFunctions();
registerRemoteFunctions();
registerVectorSerdes();
Expand Down Expand Up @@ -1171,7 +1182,7 @@ std::string PrestoServer::getBaseSpillDirectory() const {
return SystemConfig::instance()->spillerSpillPath().value_or("");
}

void PrestoServer::enableRuntimeMetricReporting() {
void PrestoServer::enableWorkerStatsReporting() {
// This flag must be set to register the counters.
facebook::velox::BaseStatsReporter::registered = true;
registerStatsCounters();
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class PrestoServer {
virtual std::string getBaseSpillDirectory() const;

/// Invoked to enable stats reporting and register counters.
virtual void enableRuntimeMetricReporting();
virtual void enableWorkerStatsReporting();
majetideepak marked this conversation as resolved.
Show resolved Hide resolved

/// Invoked to get the list of filters passed to the http server.
std::vector<std::unique_ptr<proxygen::RequestHandlerFactory>>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Add as a source to presto_server to allow global reporter Singleton
# initialization.

find_package(prometheus-cpp CONFIG REQUIRED)

# Prepare a static library to link with prometheus_reporter_test
add_library(prometheus_reporter OBJECT PrometheusStatsReporter.cpp)
target_link_libraries(prometheus_reporter presto_common prometheus-cpp::core)

if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "presto_cpp/main/runtime-metrics/PrometheusStatsReporter.h"

#include <prometheus/collectable.h>
#include <prometheus/counter.h>
#include <prometheus/gauge.h>
#include <prometheus/histogram.h>
#include <prometheus/registry.h>
#include <prometheus/summary.h>
#include <prometheus/text_serializer.h>

namespace facebook::presto::prometheus {

// Initialize singleton for the reporter
folly::Singleton<facebook::velox::BaseStatsReporter> reporter(
[]() -> facebook::velox::BaseStatsReporter* {
return facebook::presto::prometheus::PrometheusStatsReporter::
createPrometheusReporter()
.release();
});

static constexpr std::string_view kSummarySuffix("_summary");

struct PrometheusStatsReporter::PrometheusImpl {
explicit PrometheusImpl(const ::prometheus::Labels& labels) {
registry = std::make_shared<::prometheus::Registry>();
for (const auto& itr : labels) {
this->labels[itr.first] = itr.second;
}
}

::prometheus::Labels labels;
std::shared_ptr<::prometheus::Registry> registry;
};

PrometheusStatsReporter::PrometheusStatsReporter(
const std::map<std::string, std::string>& labels) {
impl_ = std::make_shared<PrometheusImpl>(labels);
}

void PrometheusStatsReporter::registerMetricExportType(
const char* key,
facebook::velox::StatType statType) const {
if (registeredMetricsMap_.count(key)) {
VLOG(1) << "Trying to register already registered metric " << key;
majetideepak marked this conversation as resolved.
Show resolved Hide resolved
return;
}
// '.' is replaced with '_'.
std::string sanitizedMetricKey = std::string(key);
std::replace(sanitizedMetricKey.begin(), sanitizedMetricKey.end(), '.', '_');
switch (statType) {
case facebook::velox::StatType::COUNT: {
// A new MetricFamily object is built for every new metric key.
auto& counterFamily = ::prometheus::BuildCounter()
.Name(sanitizedMetricKey)
.Register(*impl_->registry);
auto& counter = counterFamily.Add(impl_->labels);
registeredMetricsMap_.emplace(
std::string(key), StatsInfo{statType, &counter});
} break;
majetideepak marked this conversation as resolved.
Show resolved Hide resolved
case facebook::velox::StatType::SUM:
case facebook::velox::StatType::AVG:
case facebook::velox::StatType::RATE: {
auto& gaugeFamily = ::prometheus::BuildGauge()
.Name(sanitizedMetricKey)
.Register(*impl_->registry);
auto& gauge = gaugeFamily.Add(impl_->labels);
registeredMetricsMap_.emplace(
std::string(key), StatsInfo{statType, &gauge});
} break;
default:
VELOX_UNSUPPORTED(
"Unsupported metric type {}", velox::statTypeString(statType));
}
}

void PrometheusStatsReporter::registerMetricExportType(
folly::StringPiece key,
facebook::velox::StatType statType) const {
registerMetricExportType(key.toString().c_str(), statType);
}

void PrometheusStatsReporter::registerHistogramMetricExportType(
const char* key,
int64_t bucketWidth,
int64_t min,
int64_t max,
const std::vector<int32_t>& pcts) const {
if (registeredMetricsMap_.count(key)) {
// Already registered;
VLOG(1) << "Trying to register already registered metric " << key;
majetideepak marked this conversation as resolved.
Show resolved Hide resolved
return;
}
auto numBuckets = (max - min) / bucketWidth;
auto bound = min + bucketWidth;
std::string sanitizedMetricKey = std::string(key);
// '.' is replaced with '_'.
std::replace(sanitizedMetricKey.begin(), sanitizedMetricKey.end(), '.', '_');

auto& histogramFamily = ::prometheus::BuildHistogram()
.Name(sanitizedMetricKey)
.Register(*impl_->registry);

::prometheus::Histogram::BucketBoundaries bucketBoundaries;
while (numBuckets > 0) {
bucketBoundaries.push_back(bound);
bound += bucketWidth;
numBuckets--;
}
VELOX_CHECK_GE(bucketBoundaries.size(), 1);
auto& histogramMetric = histogramFamily.Add(impl_->labels, bucketBoundaries);

registeredMetricsMap_.emplace(
key, StatsInfo{velox::StatType::HISTOGRAM, &histogramMetric});
// If percentiles are provided, create a Summary type metric and register.
if (pcts.size() > 0) {
auto summaryMetricKey = sanitizedMetricKey + std::string(kSummarySuffix);
auto& summaryFamily = ::prometheus::BuildSummary()
.Name(summaryMetricKey)
.Register(*impl_->registry);
::prometheus::Summary::Quantiles quantiles;
for (auto pct : pcts) {
quantiles.push_back(
::prometheus::detail::CKMSQuantiles::Quantile(pct / (double)100, 0));
}
auto& summaryMetric = summaryFamily.Add({impl_->labels}, quantiles);
registeredMetricsMap_.emplace(
std::string(key).append(kSummarySuffix),
StatsInfo{velox::StatType::HISTOGRAM, &summaryMetric});
}
}

void PrometheusStatsReporter::registerHistogramMetricExportType(
folly::StringPiece key,
int64_t bucketWidth,
int64_t min,
int64_t max,
const std::vector<int32_t>& pcts) const {
registerHistogramMetricExportType(
key.toString().c_str(), bucketWidth, min, max, pcts);
}

void PrometheusStatsReporter::addMetricValue(
const std::string& key,
size_t value) const {
addMetricValue(key.c_str(), value);
}

void PrometheusStatsReporter::addMetricValue(const char* key, size_t value)
const {
auto metricIterator = registeredMetricsMap_.find(key);
if (metricIterator == registeredMetricsMap_.end()) {
VLOG(1) << "addMetricValue called for unregistered metric " << key;
majetideepak marked this conversation as resolved.
Show resolved Hide resolved
return;
}
auto statsInfo = metricIterator->second;
switch (statsInfo.statType) {
case velox::StatType::COUNT: {
auto counter =
reinterpret_cast<::prometheus::Counter*>(statsInfo.metricPtr);
counter->Increment(value);
} break;
case velox::StatType::SUM:
case velox::StatType::AVG:
case velox::StatType::RATE: {
// Overrides the existing state.
auto gauge = reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
gauge->Set(value);
} break;
default:
VELOX_UNSUPPORTED(
"Unsupported metric type {}",
velox::statTypeString(statsInfo.statType));
};
}

void PrometheusStatsReporter::addMetricValue(
folly::StringPiece key,
size_t value) const {
addMetricValue(key.toString().c_str(), value);
}

void PrometheusStatsReporter::addHistogramMetricValue(
const std::string& key,
size_t value) const {
addHistogramMetricValue(key.c_str(), value);
}

void PrometheusStatsReporter::addHistogramMetricValue(
const char* key,
size_t value) const {
auto metricIterator = registeredMetricsMap_.find(key);
if (metricIterator == registeredMetricsMap_.end()) {
VLOG(1) << "addMetricValue for unregistered metric " << key;
majetideepak marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addMetricValue -> addHistogramMetricValue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return;
}
auto histogram = reinterpret_cast<::prometheus::Histogram*>(
metricIterator->second.metricPtr);
histogram->Observe(value);

std::string summaryKey = std::string(key).append(kSummarySuffix);
metricIterator = registeredMetricsMap_.find(summaryKey);
if (metricIterator != registeredMetricsMap_.end()) {
auto summary = reinterpret_cast<::prometheus::Summary*>(
metricIterator->second.metricPtr);
summary->Observe(value);
}
}

void PrometheusStatsReporter::addHistogramMetricValue(
folly::StringPiece key,
size_t value) const {
addHistogramMetricValue(key.toString().c_str(), value);
}

std::string PrometheusStatsReporter::fetchMetrics() {
if (registeredMetricsMap_.empty()) {
return "";
}
::prometheus::TextSerializer serializer;
// Registry::Collect() acquires lock on a mutex.
return serializer.Serialize(impl_->registry->Collect());
}

}; // namespace facebook::presto::prometheus
Loading
Loading