Skip to content

Commit

Permalink
Prometheus serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
Karteekmurthys committed Jan 31, 2024
1 parent c739d83 commit 5c510e7
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 30 deletions.
10 changes: 8 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ void PrestoServer::run() {
exit(EXIT_FAILURE);
}

if(systemConfig->enableRuntimeStatsCollection()) {
if (systemConfig->enableRuntimeMetricsCollection()) {
// This flag must be set to register the counters.
facebook::velox::BaseStatsReporter::registered = true;
}
Expand Down Expand Up @@ -1109,7 +1109,13 @@ void PrestoServer::reportServerInfo(proxygen::ResponseHandler* downstream) {
void PrestoServer::reportHealthMetrics(proxygen::ResponseHandler* downstream) {
auto reporter = std::dynamic_pointer_cast<StatsReporterImpl>(
folly::Singleton<facebook::velox::BaseStatsReporter>::try_get());
http::sendOkResponse(downstream, reporter->getMetricsForPrometheus());
auto nodeConfig = facebook::presto::NodeConfig::instance();
std::string cluster = nodeConfig->nodeEnvironment();
char* hostName = std::getenv("HOSTNAME");
std::string worker = !hostName ? "" : hostName;
prometheus::PrometheusSerializer serializer(
prometheus::Labels{{"cluster", cluster}, {"worker", worker}});
http::sendOkResponse(downstream, reporter->getMetrics(serializer));
}
void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(fetchNodeStatus()));
Expand Down
3 changes: 2 additions & 1 deletion presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,8 @@ std::chrono::duration<double> SystemConfig::cacheVeloxTtlCheckInterval() const {
}

bool SystemConfig::enableRuntimeMetricsCollection() const {
return optionalProperty<bool>(kEnableRuntimeMetricsCollection).value();
return optionalProperty<bool>(kEnableRuntimeMetricsCollection)
.value_or(false);
}

NodeConfig::NodeConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ void StatsReporterImpl::registerMetricExportType(
metricsMap_.emplace(key, 0);
}

const uint64_t getCurrentEpochTimestamp() {
auto p1 = std::chrono::system_clock::now();
return std::chrono::duration_cast<std::chrono::seconds>(p1.time_since_epoch())
.count();
}

void StatsReporterImpl::addMetricValue(const char* key, size_t value) const {
std::lock_guard<std::mutex> lock(mutex_);
auto it = registeredStats_.find(key);
Expand All @@ -62,23 +56,10 @@ void StatsReporterImpl::addMetricValue(folly::StringPiece key, size_t value)
addMetricValue(key.start(), value);
}

const std::string StatsReporterImpl::getMetricsForPrometheus() {
const std::string StatsReporterImpl::getMetrics(
const MetricsSerializer& serializer) {
std::lock_guard<std::mutex> lock(mutex_);
std::stringstream ss;
for (const auto metric : metricsMap_) {
auto metricName = metric.first;
std::replace(metricName.begin(), metricName.end(), '.', '_');
auto statType = registeredStats_[metric.first];
ss << "# HELP " << metricName << std::endl;
std::string statTypeStr = "gauge";
if (statType == facebook::velox::StatType::COUNT) {
statTypeStr = "counter";
}
ss << "# TYPE " << metricName << " " << statTypeStr << std::endl;
ss << metricName << "{cluster=\"" << cluster_ << "\""
<< ",worker=\"" << workerPod_ << "\"} " << metric.second << std::endl;
}
return ss.str();
return serializer.serialize(registeredStats_, metricsMap_);
}

// Initialize singleton for the reporter
Expand Down
58 changes: 55 additions & 3 deletions presto-native-execution/presto_cpp/main/common/StatsReporterImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,60 @@
namespace facebook::presto {

class MetricsSerializer {
facebook::velox::StringView getMetrics();
public:
virtual std::string serialize(
const std::unordered_map<std::string, facebook::velox::StatType>&
metricStatTypes,
const std::unordered_map<std::string, size_t>& metricValues) const = 0;
};

namespace prometheus {
using Labels = std::unordered_map<std::string, std::string>;
class PrometheusSerializer : public MetricsSerializer {
public:
explicit PrometheusSerializer(const Labels& labels) : labels_(labels) {}

std::string serialize(
const std::unordered_map<std::string, facebook::velox::StatType>&
metricStatTypes,
const std::unordered_map<std::string, size_t>& metricValues) const {
std::stringstream ss;
for (const auto metric : metricValues) {
auto metricName = metric.first;
std::replace(metricName.begin(), metricName.end(), '.', '_');
auto statType = metricStatTypes.find(metric.first)->second;
ss << "# HELP " << metricName << std::endl;
std::string statTypeStr = "gauge";
if (statType == facebook::velox::StatType::COUNT) {
statTypeStr = "counter";
}
ss << "# TYPE " << metricName << " " << statTypeStr << std::endl;
int i = 0;
ss << metricName << "{";
for (auto& label : labels_) {
ss << label.first << "=\"" << label.second << "\"";
if (i < labels_.size() - 1) {
// Comma separated labels.
ss << ",";
}
++i;
}
ss << "} " << metric.second << std::endl;
}
return ss.str();
}

private:
// A map of labels assigned to each metric which helps in filtering at client
// end.
const Labels labels_;
};
} // namespace prometheus.

/// An implementation of BaseStatsReporter which gathers runtime metrics and
/// and maintains them in-memory. Users can call
/// StatsReporterImpl::getMetrics(MetricSerializer) to get metrics in custom
/// formatted string.
class StatsReporterImpl : public facebook::velox::BaseStatsReporter {
public:
StatsReporterImpl(
Expand Down Expand Up @@ -106,14 +158,14 @@ class StatsReporterImpl : public facebook::velox::BaseStatsReporter {
* Above info is from:
* https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
*/
const std::string getMetricsForPrometheus();
const std::string getMetrics(const MetricsSerializer& serializer);

private:
/// Mapping of registered stats key to StatType.
mutable std::unordered_map<std::string, facebook::velox::StatType>
registeredStats_;
/// A mapping from stats key of type COUNT to value.
mutable std::unordered_map<std::string, int64_t> metricsMap_;
mutable std::unordered_map<std::string, size_t> metricsMap_;
// Mutex to control access to registeredStats_ and metricMap_ members.
mutable std::mutex mutex_;
std::string cluster_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class StatsReporterImplTest : public testing::Test {
};

/// Tests addStatType and addStats functions.
TEST_F(StatsReporterImplTest, addStats) {
TEST_F(StatsReporterImplTest, prometheusSerializer) {
auto reporter =
std::make_shared<StatsReporterImpl>("test_cluster", "test_worker_pod");

Expand All @@ -45,7 +45,9 @@ TEST_F(StatsReporterImplTest, addStats) {
}
// Uses default value of 1 for second parameter.
reporter->addMetricValue("key1");
auto prometheusFormat = reporter->getMetricsForPrometheus();
prometheus::PrometheusSerializer serializer(prometheus::Labels{
{"cluster", "test_cluster"}, {"worker", "test_worker_pod"}});
auto prometheusFormat = reporter->getMetrics(serializer);
const std::string expected[] = {
"# HELP key2",
"# TYPE key2 gauge",
Expand Down

0 comments on commit 5c510e7

Please sign in to comment.