Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](meta-service) Support querying and adjusting rpc qps limit on meta service #42413

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 120 additions & 1 deletion cloud/src/meta-service/meta_service_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <brpc/uri.h>
#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>
#include <glog/logging.h>
#include <google/protobuf/message.h>
#include <google/protobuf/service.h>
#include <google/protobuf/util/json_util.h>
Expand All @@ -30,8 +31,14 @@
#include <rapidjson/prettywriter.h>
#include <rapidjson/stringbuffer.h>

#include <algorithm>
#include <array>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <type_traits>
#include <variant>
#include <vector>
Expand All @@ -42,6 +49,7 @@
#include "meta-service/txn_kv.h"
#include "meta-service/txn_kv_error.h"
#include "meta_service.h"
#include "rate-limiter/rate_limiter.h"

namespace doris::cloud {

Expand Down Expand Up @@ -333,6 +341,113 @@ static HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller
return http_json_reply(resp.status());
}

static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) {
const auto& uri = cntl->http_request().uri();
auto qps_limit_str = std::string {http_query(uri, "qps_limit")};
auto rpc_name = std::string {http_query(uri, "rpc_name")};
auto instance_id = std::string {http_query(uri, "instance_id")};

auto process_set_qps_limit = [&](std::function<bool(int64_t)> cb) -> HttpResponse {
DCHECK(!qps_limit_str.empty());
int64_t qps_limit = -1;
try {
qps_limit = std::stoll(qps_limit_str);
} catch (const std::exception& ex) {
return http_json_reply(
MetaServiceCode::INVALID_ARGUMENT,
fmt::format("param `qps_limit` is not a legal int64 type:{}", ex.what()));
}
if (qps_limit < 0) {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
"`qps_limit` should not be less than 0");
}
if (cb(qps_limit)) {
return http_json_reply(MetaServiceCode::OK, "sucess to adjust rate limit");
}
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
fmt::format("failed to adjust rate limit for qps_limit={}, "
"rpc_name={}, instance_id={}, plz ensure correct "
"rpc/instance name",
qps_limit_str, rpc_name, instance_id));
};

auto set_global_qps_limit = [process_set_qps_limit, service]() {
return process_set_qps_limit([service](int64_t qps_limit) {
return service->rate_limiter()->set_rate_limit(qps_limit);
});
};

auto set_rpc_qps_limit = [&]() {
return process_set_qps_limit([&](int64_t qps_limit) {
return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name);
});
};

auto set_instance_qps_limit = [&]() {
return process_set_qps_limit([&](int64_t qps_limit) {
return service->rate_limiter()->set_instance_rate_limit(qps_limit, instance_id);
});
};

auto set_instance_rpc_qps_limit = [&]() {
return process_set_qps_limit([&](int64_t qps_limit) {
return service->rate_limiter()->set_rate_limit(qps_limit, rpc_name, instance_id);
});
};

auto process_invalid_arguments = [&]() -> HttpResponse {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
fmt::format("invalid argument: qps_limit(required)={}, "
"rpc_name(optional)={}, instance_id(optional)={}",
qps_limit_str, rpc_name, instance_id));
};

// We have 3 optional params and 2^3 combination, and 4 of them are illegal.
// We register callbacks for them in porcessors accordings to the level, represented by 3 bits.
std::array<std::function<HttpResponse()>, 8> processors;
std::fill_n(processors.begin(), 8, std::move(process_invalid_arguments));
processors[0b001] = std::move(set_global_qps_limit);
processors[0b011] = std::move(set_rpc_qps_limit);
processors[0b101] = std::move(set_instance_qps_limit);
processors[0b111] = std::move(set_instance_rpc_qps_limit);

uint8_t level = (0x01 & qps_limit_str.empty()) | ((0x01 & rpc_name.empty()) << 1) |
((0x01 & instance_id.empty()) << 2);

DCHECK_LT(level, 8);

return processors[level]();
}

static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) {
auto rate_limiter = service->rate_limiter();
rapidjson::Document d;
auto get_qps_limit = [&d](std::string_view rpc_name,
std::shared_ptr<RpcRateLimiter> rpc_limiter) {
rapidjson::Document node;
rapidjson::Document sub;
auto get_qps_token_limit = [&](std::string_view instance_id,
std::shared_ptr<RpcRateLimiter::QpsToken> qps_token) {
sub.AddMember(rapidjson::StringRef(instance_id.data(), instance_id.size()),
qps_token->max_qps_limit(), d.GetAllocator());
};
rpc_limiter->for_each_qps_token(std::move(get_qps_token_limit));

auto max_qps_limit = std::to_string(rpc_limiter->max_qps_limit());
node.AddMember("RPC qps limit",
rapidjson::StringRef(max_qps_limit.data(), max_qps_limit.size()),
d.GetAllocator());
node.AddMember("instance specific qps limit", sub, d.GetAllocator());
d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()), node, d.GetAllocator());
};
rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit));

rapidjson::StringBuffer sb;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(sb);
d.Accept(writer);
return http_json_reply(MetaServiceCode::OK, sb.GetString());
}

static HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl) {
auto& uri = ctrl->http_request().uri();
std::string_view key = http_query(uri, "key");
Expand Down Expand Up @@ -615,13 +730,17 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"abort_tablet_job", process_abort_tablet_job},
{"alter_ram_user", process_alter_ram_user},
{"alter_iam", process_alter_iam},
{"adjust_rate_limit", process_adjust_rate_limit},
{"list_rate_limit", process_query_rate_limit},
{"v1/abort_txn", process_abort_txn},
{"v1/abort_tablet_job", process_abort_tablet_job},
{"v1/alter_ram_user", process_alter_ram_user},
{"v1/alter_iam", process_alter_iam},
{"v1/adjust_rate_limit", process_adjust_rate_limit},
{"v1/list_rate_limit", process_query_rate_limit},
};

auto cntl = static_cast<brpc::Controller*>(controller);
auto* cntl = static_cast<brpc::Controller*>(controller);
brpc::ClosureGuard closure_guard(done);

// Prepare input request info
Expand Down
115 changes: 105 additions & 10 deletions cloud/src/rate-limiter/rate_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@

#include "rate_limiter.h"

#include <bthread/mutex.h>
#include <butil/strings/string_split.h>

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ranges>
#include <shared_mutex>

#include "common/bvars.h"
#include "common/config.h"
#include "common/configbase.h"

namespace doris::cloud {

void RateLimiter::init(google::protobuf::Service* service) {
std::map<std::string, int64_t> rpc_name_to_max_qps_limit;
std::unordered_map<std::string, int64_t> parse_specific_qps_limit(const std::string& list_str) {
std::unordered_map<std::string, int64_t> rpc_name_to_max_qps_limit;
std::vector<std::string> max_qps_limit_list;
butil::SplitString(config::specific_max_qps_limit, ';', &max_qps_limit_list);
butil::SplitString(list_str, ';', &max_qps_limit_list);
for (const auto& v : max_qps_limit_list) {
auto p = v.find(':');
if (p != std::string::npos && p != (v.size() - 1)) {
Expand All @@ -41,36 +46,92 @@ void RateLimiter::init(google::protobuf::Service* service) {
int64_t max_qps_limit = std::stoll(v.substr(p + 1));
if (max_qps_limit > 0) {
rpc_name_to_max_qps_limit[rpc_name] = max_qps_limit;
LOG(INFO) << "set rpc: " << rpc_name << " max_qps_limit: " << max_qps_limit;
}
} catch (...) {
LOG(WARNING) << "failed to set max_qps_limit to rpc: " << rpc_name
LOG(WARNING) << "failed to parse max_qps_limit to rpc: " << rpc_name
<< " config: " << v;
}
}
}
return rpc_name_to_max_qps_limit;
}

template <typename Callable>
void for_each_rpc_name(google::protobuf::Service* service, Callable cb) {
auto method_size = service->GetDescriptor()->method_count();
for (auto i = 0; i < method_size; ++i) {
std::string rpc_name = service->GetDescriptor()->method(i)->name();
int64_t max_qps_limit = config::default_max_qps_limit;
cb(rpc_name);
}
}

auto it = rpc_name_to_max_qps_limit.find(rpc_name);
if (it != rpc_name_to_max_qps_limit.end()) {
void RateLimiter::init(google::protobuf::Service* service) {
auto rpc_name_to_specific_limit = parse_specific_qps_limit(config::specific_max_qps_limit);
std::unique_lock write_lock(mutex_);
for_each_rpc_name(service, [&](const std::string& rpc_name) {
auto it = rpc_name_to_specific_limit.find(rpc_name);
int64_t max_qps_limit = config::default_max_qps_limit;
if (it != rpc_name_to_specific_limit.end()) {
max_qps_limit = it->second;
}
limiters_[rpc_name] = std::make_shared<RpcRateLimiter>(rpc_name, max_qps_limit);
});
for (const auto& [k, _] : rpc_name_to_specific_limit) {
rpc_with_specific_limit_.insert(k);
}
}

std::shared_ptr<RpcRateLimiter> RateLimiter::get_rpc_rate_limiter(const std::string& rpc_name) {
// no need to be locked, because it is only modified during initialization
auto it = limiters_.find(rpc_name);
if (it == limiters_.end()) {
return nullptr;
}
return it->second;
}

bool RateLimiter::set_rate_limit(int64_t qps_limit) {
std::lock_guard lock(mutex_);
auto filter = [this](const auto& kv) { return !rpc_with_specific_limit_.contains(kv.first); };
for (const auto& [_, v] : limiters_ | std::views::filter(std::move(filter))) {
v->set_max_qps_limit(qps_limit);
}
return true;
}

bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& rpc_name) {
if (!limiters_.contains(rpc_name)) {
return false;
}
auto limiter = limiters_.at(rpc_name);
std::lock_guard lock(mutex_);
limiter->set_max_qps_limit(qps_limit);
rpc_with_specific_limit_.insert(rpc_name);
return true;
}

bool RateLimiter::set_rate_limit(int64_t qps_limit, const std::string& rpc_name,
const std::string& instance_id) {
if (!limiters_.contains(rpc_name)) {
return false;
}
auto limiter = limiters_.at(rpc_name);
return limiter->set_max_qps_limit(qps_limit, instance_id);
}

bool RateLimiter::set_instance_rate_limit(int64_t qps_limit, const std::string& instance_id) {
return std::ranges::all_of(limiters_, [&](const auto& kv) {
return kv.second->set_max_qps_limit(qps_limit, instance_id);
});
return true;
}

void RateLimiter::for_each_rpc_limiter(
std::function<void(std::string_view, std::shared_ptr<RpcRateLimiter>)> cb) {
for (const auto& [rpc_name, rpc_limiter] : limiters_) {
cb(rpc_name, rpc_limiter);
}
}

bool RpcRateLimiter::get_qps_token(const std::string& instance_id,
std::function<int()>& get_bvar_qps) {
if (!config::use_detailed_metrics || instance_id.empty()) {
Expand All @@ -93,6 +154,35 @@ bool RpcRateLimiter::get_qps_token(const std::string& instance_id,
return qps_token->get_token(get_bvar_qps);
}

void RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit) {
std::lock_guard<bthread::Mutex> l(mutex_);
max_qps_limit_ = max_qps_limit;
auto filter = [this](const auto& kv) {
return !instance_with_specific_limit_.contains(kv.first);
};
for (auto& [k, v] : qps_limiter_ | std::views::filter(std::move(filter))) {
v->set_max_qps_limit(max_qps_limit);
}
}

bool RpcRateLimiter::set_max_qps_limit(int64_t max_qps_limit, const std::string& instance_id) {
std::lock_guard<bthread::Mutex> l(mutex_);
if (!qps_limiter_.contains(instance_id)) {
qps_limiter_[instance_id] = std::make_shared<QpsToken>(max_qps_limit);
} else {
qps_limiter_.at(instance_id)->set_max_qps_limit(max_qps_limit);
}
instance_with_specific_limit_.insert(instance_id);
return true;
}

void RpcRateLimiter::for_each_qps_token(
std::function<void(std::string_view, std::shared_ptr<QpsToken>)> cb) {
for (const auto& [instance_id, qps_token] : qps_limiter_) {
cb(instance_id, qps_token);
}
}

bool RpcRateLimiter::QpsToken::get_token(std::function<int()>& get_bvar_qps) {
using namespace std::chrono;
auto now = steady_clock::now();
Expand All @@ -110,4 +200,9 @@ bool RpcRateLimiter::QpsToken::get_token(std::function<int()>& get_bvar_qps) {
return current_qps_ < max_qps_limit_;
}

} // namespace doris::cloud
void RpcRateLimiter::QpsToken::set_max_qps_limit(int64_t max_qps_limit) {
std::lock_guard<bthread::Mutex> l(mutex_);
max_qps_limit_ = max_qps_limit;
}

} // namespace doris::cloud
Loading
Loading