Skip to content

Commit

Permalink
#2389: Per service gauge counter for number of enqueued rpcs
Browse files Browse the repository at this point in the history
Summary:
  Example Output: curl -s 127.0.0.1:9000/prometheus-metrics | grep rpcs_in_queue
			rpcs_in_queue_yb_tserver_TabletServerBackupService{exported_instance="DEFAULT_NODE_NAME",metric_id="yb.tabletserver",metric_type="server"} 0 1575514867781
			rpcs_in_queue_yb_tserver_TabletServerService{exported_instance="DEFAULT_NODE_NAME",metric_id="yb.tabletserver",metric_type="server"} 0 1575514867781
			rpcs_in_queue_yb_tserver_TabletServerAdminService{exported_instance="DEFAULT_NODE_NAME",metric_id="yb.tabletserver",metric_type="server"} 0 1575514867781
			rpcs_in_queue_yb_consensus_ConsensusService{exported_instance="DEFAULT_NODE_NAME",metric_id="yb.tabletserver",metric_type="server"} 1 1575514867781
			rpcs_in_queue_yb_tserver_RemoteBootstrapService{exported_instance="DEFAULT_NODE_NAME",metric_id="yb.tabletserver",metric_type="server"} 0 1575514867781
			rpcs_in_queue_yb_cdc_CDCService{exported_instance="DEFAULT_NODE_NAME",metric_id="yb.tabletserver",metric_type="server"} 0 1575514867781
			rpcs_in_queue_yb_server_GenericService{exported_instance="DEFAULT_NODE_NAME",metric_id="yb.tabletserver",metric_type="server"} 0 1575514867781

Test Plan: Jenkins

Reviewers: bogdan, mikhail, sergei

Reviewed By: sergei

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D7658
  • Loading branch information
rajukumaryb committed Dec 17, 2019
1 parent 13c6a0d commit d21147a
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 7 deletions.
15 changes: 15 additions & 0 deletions src/yb/rpc/service_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,18 @@ class ServicePoolImpl final : public InboundCallHandler {
rpcs_queue_overflow_(METRIC_rpcs_queue_overflow.Instantiate(entity)),
check_timeout_strand_(scheduler->io_service()),
log_prefix_(Format("$0: ", service_->service_name())) {

// Create per service counter for rpcs_in_queue_.
auto id = Format("rpcs_in_queue_$0", service_->service_name());
EscapeMetricNameForPrometheus(&id);
string description = id + " metric for ServicePoolImpl";
rpcs_in_queue_ = entity->FindOrCreateGauge(
std::unique_ptr<GaugePrototype<int64_t>>(new OwningGaugePrototype<int64_t>(
entity->prototype().name(), std::move(id),
description, MetricUnit::kRequests, description)),
static_cast<int64>(0) /* initial_value */);

LOG_WITH_PREFIX(INFO) << "yb::rpc::ServicePoolImpl created at " << this;
}

~ServicePoolImpl() {
Expand Down Expand Up @@ -378,11 +390,13 @@ class ServicePoolImpl final : public InboundCallHandler {
return false;
}

rpcs_in_queue_->Increment();
return true;
}

void CallDequeued() override {
queued_calls_.fetch_sub(1, std::memory_order_relaxed);
rpcs_in_queue_->Decrement();
}

const size_t max_queued_calls_;
Expand All @@ -393,6 +407,7 @@ class ServicePoolImpl final : public InboundCallHandler {
scoped_refptr<Counter> rpcs_timed_out_in_queue_;
scoped_refptr<Counter> rpcs_timed_out_early_in_queue_;
scoped_refptr<Counter> rpcs_queue_overflow_;
scoped_refptr<AtomicGauge<int64_t>> rpcs_in_queue_;
// Have to use CoarseDuration here, since CoarseTimePoint does not work with clang + libstdc++
std::atomic<CoarseDuration> last_backpressure_at_{CoarseTimePoint().time_since_epoch()};
std::atomic<int64_t> queued_calls_{0};
Expand Down
3 changes: 3 additions & 0 deletions src/yb/rpc/service_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@

namespace yb {

template<class T>
class AtomicGauge;

class Counter;
class Histogram;
class MetricEntity;
Expand Down
4 changes: 3 additions & 1 deletion src/yb/server/rpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ RpcServer::RpcServer(const std::string& name, RpcServerOptions opts,
: name_(name),
server_state_(UNINITIALIZED),
options_(std::move(opts)),
connection_context_factory_(std::move(connection_context_factory)) {}
connection_context_factory_(std::move(connection_context_factory)) {
LOG(INFO) << "yb::server::RpcServer created at " << this;
}

RpcServer::~RpcServer() {
Shutdown();
Expand Down
5 changes: 5 additions & 0 deletions src/yb/tserver/tablet_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,19 @@ void TabletServer::AutoInitServiceFlags() {

Status TabletServer::RegisterServices() {
tablet_server_service_ = new TabletServiceImpl(this);
LOG(INFO) << "yb::tserver::TabletServiceImpl created at " << tablet_server_service_;
std::unique_ptr<ServiceIf> ts_service(tablet_server_service_);
RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_tablet_server_svc_queue_length,
std::move(ts_service)));

std::unique_ptr<ServiceIf> admin_service(new TabletServiceAdminImpl(this));
LOG(INFO) << "yb::tserver::TabletServiceAdminImpl created at " << admin_service.get();
RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_admin_svc_queue_length,
std::move(admin_service)));

std::unique_ptr<ServiceIf> consensus_service(new ConsensusServiceImpl(metric_entity(),
tablet_manager_.get()));
LOG(INFO) << "yb::tserver::ConsensusServiceImpl created at " << consensus_service.get();
RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_consensus_svc_queue_length,
std::move(consensus_service),
rpc::ServicePriority::kHigh));
Expand All @@ -301,6 +304,8 @@ Status TabletServer::RegisterServices() {
std::make_unique<enterprise::RemoteBootstrapServiceImpl>(fs_manager_.get(),
tablet_manager_.get(),
metric_entity());
LOG(INFO) << "yb::tserver::RemoteBootstrapServiceImpl created at " <<
remote_bootstrap_service.get();
RETURN_NOT_OK(RpcAndWebServerBase::RegisterService(FLAGS_ts_remote_bootstrap_svc_queue_length,
std::move(remote_bootstrap_service)));
return Status::OK();
Expand Down
4 changes: 1 addition & 3 deletions src/yb/util/mem_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,7 @@ std::string CreateMetricName(const MemTracker& mem_tracker) {
return "mem_tracker";
}
std::string id = mem_tracker.id();
std::replace(id.begin(), id.end(), ' ', '_');
std::replace(id.begin(), id.end(), '.', '_');
std::replace(id.begin(), id.end(), '-', '_');
EscapeMetricNameForPrometheus(&id);
if (mem_tracker.parent()) {
return CreateMetricName(*mem_tracker.parent()) + "_" + id;
} else {
Expand Down
7 changes: 7 additions & 0 deletions src/yb/util/metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -899,4 +899,11 @@ void ScopedLatencyMetric::Finish() {
}
}

// Replace specific chars with underscore to pass PrometheusNameRegex().
void EscapeMetricNameForPrometheus(std::string *id) {
std::replace(id->begin(), id->end(), ' ', '_');
std::replace(id->begin(), id->end(), '.', '_');
std::replace(id->begin(), id->end(), '-', '_');
}

} // namespace yb
3 changes: 3 additions & 0 deletions src/yb/util/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,9 @@ class OwningGaugePrototype : public OwningMetricCtorArgs, public GaugePrototype<
flags)) {}
};

// Replace specific chars with underscore to pass PrometheusNameRegex().
void EscapeMetricNameForPrometheus(std::string *id);

} // namespace yb

#endif // YB_UTIL_METRICS_H
4 changes: 1 addition & 3 deletions src/yb/util/thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ scoped_refptr<AtomicGauge<uint64>> ThreadCategoryTracker::FindOrCreateGauge(
const string& category) {
if (gauges_.find(category) == gauges_.end()) {
string id = name_ + "_" + category;
std::replace(id.begin(), id.end(), ' ', '_');
std::replace(id.begin(), id.end(), '.', '_');
std::replace(id.begin(), id.end(), '-', '_');
EscapeMetricNameForPrometheus(&id);
const string description = id + " metric in ThreadCategoryTracker";
std::unique_ptr<GaugePrototype<uint64>> gauge = std::make_unique<OwningGaugePrototype<uint64>>(
"server", id, description, yb::MetricUnit::kThreads, description, yb::EXPOSE_AS_COUNTER);
Expand Down

0 comments on commit d21147a

Please sign in to comment.