Skip to content

Commit 453770f

Browse files
[feature](meta-service) Support querying and adjusting rpc qps limit on meta service
1 parent 6dcc221 commit 453770f

File tree

5 files changed

+320
-41
lines changed

5 files changed

+320
-41
lines changed

cloud/src/meta-service/meta_service_http.cpp

+69-1
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@
3030
#include <rapidjson/prettywriter.h>
3131
#include <rapidjson/stringbuffer.h>
3232

33+
#include <cstdint>
3334
#include <memory>
3435
#include <optional>
36+
#include <string>
37+
#include <string_view>
3538
#include <type_traits>
3639
#include <variant>
3740
#include <vector>
@@ -42,6 +45,7 @@
4245
#include "meta-service/txn_kv.h"
4346
#include "meta-service/txn_kv_error.h"
4447
#include "meta_service.h"
48+
#include "rate-limiter/rate_limiter.h"
4549

4650
namespace doris::cloud {
4751

@@ -331,6 +335,66 @@ static HttpResponse process_alter_iam(MetaServiceImpl* service, brpc::Controller
331335
return http_json_reply(resp.status());
332336
}
333337

338+
// Handles the `adjust_rate_limit` HTTP endpoint.
//
// Query params (at least one of them is required):
//   default_qps_limit      - non-negative int64.
//   specific_max_qps_limit - list of `rpcname:qps(int64);` entries, forwarded as-is to
//                            RateLimiter::reset_rate_limit.
//
// Replies INVALID_ARGUMENT when neither param is present, or when `default_qps_limit` is
// not parsable as int64 or is negative; replies OK otherwise.
static HttpResponse process_adjust_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) {
    const auto& uri = cntl->http_request().uri();
    bool is_attr_set = false;
    // Stays negative when the caller did not supply `default_qps_limit`.
    int64_t default_qps_limit = -1;
    if (auto default_qps_limit_str = std::string(http_query(uri, "default_qps_limit"));
        !default_qps_limit_str.empty()) {
        try {
            default_qps_limit = std::stoll(default_qps_limit_str);
        } catch (const std::exception& ex) {
            // Covers both std::invalid_argument and std::out_of_range.
            return http_json_reply(
                    MetaServiceCode::INVALID_ARGUMENT,
                    fmt::format("param `default_qps_limit` is not a legal int64 type:{}",
                                ex.what()));
        }
        if (default_qps_limit < 0) {
            return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
                                   "`default_qps_limit` should not be less than 0");
        }
        is_attr_set = true;
    }
    std::string_view specific_max_qps_limit = http_query(uri, "specific_max_qps_limit");
    is_attr_set |= (!specific_max_qps_limit.empty());
    if (!is_attr_set) {
        return http_json_reply(
                MetaServiceCode::INVALID_ARGUMENT,
                "default_qps_limit(int64) or "
                "specific_max_qps_limit(list of[rpcname:qps(int64);]) is required as query param");
    }
    auto rate_limiter = service->rate_limiter();
    // Build the std::string from the view's (pointer, length) pair. Passing `.data()` would
    // go through std::string(const char*), which requires a non-null, NUL-terminated
    // pointer -- neither is guaranteed by std::string_view (an empty view may hold nullptr).
    rate_limiter->reset_rate_limit(service, default_qps_limit,
                                   std::string(specific_max_qps_limit));
    return http_json_reply(MetaServiceCode::OK, "success to adjust rate limit");
}
370+
371+
// Handles the `query_rate_limit` HTTP endpoint.
//
// Query params:
//   rpc_name - optional. When empty, the max qps limit of every rpc limiter is reported;
//              otherwise only the named rpc is reported.
//
// Replies INVALID_ARGUMENT when `rpc_name` is given but no limiter exists for it.
// On success the reply message is a pretty-printed JSON object mapping rpc name to its
// max qps limit (rendered as a string).
static HttpResponse process_query_rate_limit(MetaServiceImpl* service, brpc::Controller* cntl) {
    const auto& uri = cntl->http_request().uri();
    auto rpc_name = std::string(http_query(uri, "rpc_name"));
    auto rate_limiter = service->rate_limiter();
    rapidjson::Document d;
    // A default-constructed Document is a Null value; AddMember() is only valid on an
    // Object, so switch the type before adding any member.
    d.SetObject();
    if (rpc_name.empty()) {
        // StringRef() does not copy the name: the characters behind `rpc_name` must stay
        // alive until `d` has been serialized below.
        auto get_qps_limit = [&d](std::string_view rpc_name,
                                  const std::shared_ptr<RpcRateLimiter>& rpc_limiter) {
            d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()),
                        std::to_string(rpc_limiter->max_qps_limit()), d.GetAllocator());
        };
        rate_limiter->for_each_rpc_limiter(std::move(get_qps_limit));
    } else {
        auto rpc_limiter = rate_limiter->get_rpc_rate_limiter(rpc_name);
        if (rpc_limiter == nullptr) [[unlikely]] {
            return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
                                   fmt::format("rpc_name={} is not exists", rpc_name));
        }
        // The local `rpc_name` string outlives `d`, so referencing it is safe here.
        d.AddMember(rapidjson::StringRef(rpc_name.data(), rpc_name.size()),
                    std::to_string(rpc_limiter->max_qps_limit()), d.GetAllocator());
    }
    rapidjson::StringBuffer sb;
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(sb);
    d.Accept(writer);
    return http_json_reply(MetaServiceCode::OK, sb.GetString());
}
397+
334398
static HttpResponse process_decode_key(MetaServiceImpl*, brpc::Controller* ctrl) {
335399
auto& uri = ctrl->http_request().uri();
336400
std::string_view key = http_query(uri, "key");
@@ -598,13 +662,17 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller,
598662
{"abort_tablet_job", process_abort_tablet_job},
599663
{"alter_ram_user", process_alter_ram_user},
600664
{"alter_iam", process_alter_iam},
665+
{"adjust_rate_limit", process_adjust_rate_limit},
666+
{"query_rate_limit", process_query_rate_limit},
601667
{"v1/abort_txn", process_abort_txn},
602668
{"v1/abort_tablet_job", process_abort_tablet_job},
603669
{"v1/alter_ram_user", process_alter_ram_user},
604670
{"v1/alter_iam", process_alter_iam},
671+
{"v1/adjust_rate_limit", process_adjust_rate_limit},
672+
{"v1/query_rate_limit", process_query_rate_limit},
605673
};
606674

607-
auto cntl = static_cast<brpc::Controller*>(controller);
675+
auto* cntl = static_cast<brpc::Controller*>(controller);
608676
brpc::ClosureGuard closure_guard(done);
609677

610678
// Prepare input request info

cloud/src/rate-limiter/rate_limiter.cpp

+72-10
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,21 @@
2020
#include <butil/strings/string_split.h>
2121

2222
#include <chrono>
23+
#include <cstdint>
2324
#include <memory>
2425
#include <mutex>
26+
#include <shared_mutex>
2527

2628
#include "common/bvars.h"
2729
#include "common/config.h"
2830
#include "common/configbase.h"
2931

3032
namespace doris::cloud {
3133

32-
void RateLimiter::init(google::protobuf::Service* service) {
33-
std::map<std::string, int64_t> rpc_name_to_max_qps_limit;
34+
std::unordered_map<std::string, int64_t> parse_specific_qps_limit(const std::string& list_str) {
35+
std::unordered_map<std::string, int64_t> rpc_name_to_max_qps_limit;
3436
std::vector<std::string> max_qps_limit_list;
35-
butil::SplitString(config::specific_max_qps_limit, ';', &max_qps_limit_list);
37+
butil::SplitString(list_str, ';', &max_qps_limit_list);
3638
for (const auto& v : max_qps_limit_list) {
3739
auto p = v.find(':');
3840
if (p != std::string::npos && p != (v.size() - 1)) {
@@ -41,36 +43,96 @@ void RateLimiter::init(google::protobuf::Service* service) {
4143
int64_t max_qps_limit = std::stoll(v.substr(p + 1));
4244
if (max_qps_limit > 0) {
4345
rpc_name_to_max_qps_limit[rpc_name] = max_qps_limit;
44-
LOG(INFO) << "set rpc: " << rpc_name << " max_qps_limit: " << max_qps_limit;
4546
}
4647
} catch (...) {
47-
LOG(WARNING) << "failed to set max_qps_limit to rpc: " << rpc_name
48+
LOG(WARNING) << "failed to parse max_qps_limit to rpc: " << rpc_name
4849
<< " config: " << v;
4950
}
5051
}
5152
}
53+
return rpc_name_to_max_qps_limit;
54+
}
55+
56+
template <typename Callable>
57+
void for_each_rpc_name(google::protobuf::Service* service, Callable cb) {
5258
auto method_size = service->GetDescriptor()->method_count();
5359
for (auto i = 0; i < method_size; ++i) {
5460
std::string rpc_name = service->GetDescriptor()->method(i)->name();
55-
int64_t max_qps_limit = config::default_max_qps_limit;
61+
cb(rpc_name);
62+
}
63+
}
5664

57-
auto it = rpc_name_to_max_qps_limit.find(rpc_name);
58-
if (it != rpc_name_to_max_qps_limit.end()) {
65+
// Creates one RpcRateLimiter per rpc method of `service`.
//
// A method listed in config::specific_max_qps_limit gets its own limit; every other
// method gets config::default_max_qps_limit. All names parsed from the specific-limit
// config are remembered in rpc_with_specific_limit_. Runs under the exclusive lock.
void RateLimiter::init(google::protobuf::Service* service) {
    const auto specific_limits = parse_specific_qps_limit(config::specific_max_qps_limit);

    std::unique_lock write_lock(shared_mtx_);
    auto install_limiter = [&](const std::string& rpc_name) {
        int64_t qps = config::default_max_qps_limit;
        if (const auto found = specific_limits.find(rpc_name); found != specific_limits.end()) {
            qps = found->second;
        }
        limiters_[rpc_name] = std::make_shared<RpcRateLimiter>(rpc_name, qps);
    };
    for_each_rpc_name(service, install_limiter);

    for (const auto& entry : specific_limits) {
        rpc_with_specific_limit_.insert(entry.first);
    }
}
6480

6581
std::shared_ptr<RpcRateLimiter> RateLimiter::get_rpc_rate_limiter(const std::string& rpc_name) {
66-
// no need to be locked, because it is only modified during initialization
82+
std::shared_lock read_lock(shared_mtx_);
6783
auto it = limiters_.find(rpc_name);
6884
if (it == limiters_.end()) {
6985
return nullptr;
7086
}
7187
return it->second;
7288
}
7389

90+
// Replaces rpc limiters according to the supplied limits.
//
// For every rpc method of `service`:
//   * if `specific_max_qps_limit` names it, a new limiter with that limit is installed;
//   * otherwise, if `default_qps_limit` is non-negative and the rpc has never been given a
//     specific limit (see rpc_with_specific_limit_), a new limiter with the default limit
//     is installed;
//   * otherwise the existing limiter is left untouched.
// All names parsed from `specific_max_qps_limit` are added to rpc_with_specific_limit_.
void RateLimiter::reset_rate_limit(google::protobuf::Service* service, int64_t default_qps_limit,
                                   const std::string& specific_max_qps_limit) {
    // TODO: merge specific_max_qps_limit
    const auto overrides = parse_specific_qps_limit(specific_max_qps_limit);
    // A negative default means "no default supplied".
    const bool apply_default = default_qps_limit >= 0;

    std::unique_lock write_lock(shared_mtx_);
    for (const auto& entry : overrides) {
        rpc_with_specific_limit_.insert(entry.first);
    }

    for_each_rpc_name(service, [&](const std::string& rpc_name) {
        if (const auto hit = overrides.find(rpc_name); hit != overrides.end()) {
            limiters_[rpc_name] = std::make_shared<RpcRateLimiter>(rpc_name, hit->second);
            return;
        }
        if (!apply_default || rpc_with_specific_limit_.contains(rpc_name)) {
            return;
        }
        limiters_[rpc_name] = std::make_shared<RpcRateLimiter>(rpc_name, default_qps_limit);
    });
}
128+
129+
void RateLimiter::for_each_rpc_limiter(
130+
std::function<void(std::string_view, std::shared_ptr<RpcRateLimiter>)> cb) {
131+
for (const auto& [rpc_name, rpc_limiter] : limiters_) {
132+
cb(rpc_name, rpc_limiter);
133+
}
134+
}
135+
74136
bool RpcRateLimiter::get_qps_token(const std::string& instance_id,
75137
std::function<int()>& get_bvar_qps) {
76138
if (!config::use_detailed_metrics || instance_id.empty()) {
@@ -110,4 +172,4 @@ bool RpcRateLimiter::QpsToken::get_token(std::function<int()>& get_bvar_qps) {
110172
return current_qps_ < max_qps_limit_;
111173
}
112174

113-
} // namespace doris::cloud
175+
} // namespace doris::cloud

cloud/src/rate-limiter/rate_limiter.h

+18-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919

2020
#include <brpc/server.h>
2121
#include <bthread/mutex.h>
22+
#include <google/protobuf/service.h>
2223

2324
#include <cstdint>
2425
#include <memory>
26+
#include <shared_mutex>
2527
#include <string>
28+
#include <string_view>
2629
#include <unordered_map>
2730

2831
#include "common/config.h"
@@ -35,12 +38,22 @@ class RateLimiter {
3538
public:
3639
RateLimiter() = default;
3740
~RateLimiter() = default;
41+
3842
void init(google::protobuf::Service* service);
43+
3944
std::shared_ptr<RpcRateLimiter> get_rpc_rate_limiter(const std::string& rpc_name);
4045

46+
void reset_rate_limit(google::protobuf::Service* service, int64_t default_qps_limit,
47+
const std::string& specific_max_qps_limit);
48+
49+
void for_each_rpc_limiter(
50+
std::function<void(std::string_view, std::shared_ptr<RpcRateLimiter>)> cb);
51+
4152
private:
4253
// rpc_name -> RpcRateLimiter
4354
std::unordered_map<std::string, std::shared_ptr<RpcRateLimiter>> limiters_;
55+
std::unordered_set<std::string> rpc_with_specific_limit_;
56+
std::shared_mutex shared_mtx_;
4457
};
4558

4659
class RpcRateLimiter {
@@ -58,6 +71,10 @@ class RpcRateLimiter {
5871
*/
5972
bool get_qps_token(const std::string& instance_id, std::function<int()>& get_bvar_qps);
6073

74+
std::string_view rpc_name() const { return rpc_name_; }
75+
76+
int64_t max_qps_limit() const { return max_qps_limit_; }
77+
6178
// Todo: Recycle outdated instance_id
6279

6380
private:
@@ -75,12 +92,11 @@ class RpcRateLimiter {
7592
int64_t max_qps_limit_;
7693
};
7794

78-
private:
7995
bthread::Mutex mutex_;
8096
// instance_id -> QpsToken
8197
std::unordered_map<std::string, std::shared_ptr<QpsToken>> qps_limiter_;
8298
std::string rpc_name_;
8399
int64_t max_qps_limit_;
84100
};
85101

86-
} // namespace doris::cloud
102+
} // namespace doris::cloud

cloud/test/meta_service_http_test.cpp

+83
Original file line numberDiff line numberDiff line change
@@ -1454,4 +1454,87 @@ TEST(MetaServiceHttpTest, TxnLazyCommit) {
14541454
}
14551455
}
14561456

1457+
// Exercises the `adjust_rate_limit` HTTP endpoint: accepted parameter combinations,
// the "missing parameter" rejection and the "malformed default_qps_limit" rejection.
TEST(MetaServiceHttpTest, AdjustRateLimit) {
    HttpContext ctx;
    {
        // Both parameters supplied.
        auto [status_code, content] = ctx.query<std::string>(
                "adjust_rate_limit",
                "default_qps_limit=10000&specific_max_qps_limit=get_cluster:10000");
        ASSERT_EQ(status_code, 200);
    }
    {
        // Only the default limit.
        auto [status_code, content] =
                ctx.query<std::string>("adjust_rate_limit", "default_qps_limit=10000");
        ASSERT_EQ(status_code, 200);
    }
    {
        // Only a specific limit.
        auto [status_code, content] = ctx.query<std::string>(
                "adjust_rate_limit", "specific_max_qps_limit=get_cluster:10000");
        ASSERT_EQ(status_code, 200);
    }
    {
        // No parameter at all.
        auto [status_code, content] = ctx.query<std::string>("adjust_rate_limit", "");
        ASSERT_EQ(status_code, 400);
        std::string msg =
                "default_qps_limit(int64) or specific_max_qps_limit(list of[rpcname:qps(int64);]) "
                "is required as query param";
        ASSERT_TRUE(content.find(msg) != std::string::npos);
    }
    {
        // Only an unrelated parameter.
        auto [status_code, content] = ctx.query<std::string>("adjust_rate_limit", "key=abc");
        ASSERT_EQ(status_code, 400);
        std::string msg =
                "default_qps_limit(int64) or specific_max_qps_limit(list of[rpcname:qps(int64);]) "
                "is required as query param";
        ASSERT_TRUE(content.find(msg) != std::string::npos);
    }
    {
        // Malformed default limit. The expected text must match the handler's message,
        // which names the parameter `default_qps_limit`.
        auto [status_code, content] =
                ctx.query<std::string>("adjust_rate_limit", "default_qps_limit=invalid");
        ASSERT_EQ(status_code, 400);
        std::string msg = "param `default_qps_limit` is not a legal int64 type:";
        ASSERT_TRUE(content.find(msg) != std::string::npos);
    }
    {
        // Malformed default limit is rejected even when a valid specific limit is present.
        auto [status_code, content] = ctx.query<std::string>(
                "adjust_rate_limit",
                "specific_max_qps_limit=get_cluster:10000&default_qps_limit=invalid");
        ASSERT_EQ(status_code, 400);
        std::string msg = "param `default_qps_limit` is not a legal int64 type:";
        ASSERT_TRUE(content.find(msg) != std::string::npos);
    }
    {
        auto [status_code, content] = ctx.query<std::string>(
                "adjust_rate_limit",
                "specific_max_qps_limit=get_cluster:invalid&default_qps_limit=10000");
        // note: invalid so will not take effect, but return ok, by design
        ASSERT_EQ(status_code, 200);
    }
    {
        auto [status_code, content] = ctx.query<std::string>(
                "adjust_rate_limit", "specific_max_qps_limit=xxx:10000&default_qps_limit=10000");
        // note: invalid so will not take effect, but return ok, by design
        ASSERT_EQ(status_code, 200);
    }
}
1520+
1521+
// Exercises the `query_rate_limit` HTTP endpoint: listing every rpc, querying one known
// rpc, and querying an rpc name that has no limiter.
TEST(MetaServiceHttpTest, QueryRateLimit) {
    HttpContext ctx;

    // No rpc_name: the whole table is returned.
    {
        auto [http_status, body] = ctx.query<std::string>("query_rate_limit", "");
        ASSERT_EQ(http_status, 200);
    }

    // A known rpc name.
    {
        auto [http_status, body] =
                ctx.query<std::string>("query_rate_limit", "rpc_name=get_cluster");
        ASSERT_EQ(http_status, 200);
    }

    // An unknown rpc name is rejected with an explanatory message.
    {
        auto [http_status, body] = ctx.query<std::string>("query_rate_limit", "rpc_name=xxx");
        ASSERT_EQ(http_status, 400);
        const std::string expected = "rpc_name=xxx is not exists";
        const bool mentioned = body.find(expected) != std::string::npos;
        ASSERT_TRUE(mentioned);
    }
}
1539+
14571540
} // namespace doris::cloud

0 commit comments

Comments
 (0)