Skip to content

Commit

Permalink
[SKV-627] check leader status (apache#1406)
Browse files Browse the repository at this point in the history
  • Loading branch information
王浩 authored and acelyc111 committed Apr 27, 2023
1 parent 59ae514 commit 1c17f15
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 29 deletions.
17 changes: 9 additions & 8 deletions src/rdsn/src/meta/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,27 +483,28 @@ void meta_service::register_rpc_handlers()
&meta_service::on_set_max_replica_count);
}

int meta_service::check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address)
meta_leader_state meta_service::check_leader(dsn::message_ex *req,
dsn::rpc_address *forward_address)
{
dsn::rpc_address leader;
if (!_failure_detector->get_leader(&leader)) {
if (!req->header->context.u.is_forward_supported) {
if (forward_address != nullptr)
*forward_address = leader;
return -1;
return meta_leader_state::kNotLeaderAndCannotForwardRpc;
}

dinfo("leader address: %s", leader.to_string());
if (!leader.is_invalid()) {
dsn_rpc_forward(req, leader);
return 0;
return meta_leader_state::kNotLeaderAndCanForwardRpc;
} else {
if (forward_address != nullptr)
forward_address->set_invalid();
return -1;
return meta_leader_state::kNotLeaderAndCannotForwardRpc;
}
}
return 1;
return meta_leader_state::kIsLeader;
}

// table operations
Expand Down Expand Up @@ -749,13 +750,13 @@ void meta_service::on_start_recovery(configuration_recovery_rpc rpc)
{
configuration_recovery_response &response = rpc.response();
ddebug("got start recovery request, start to do recovery");
int result = check_leader(rpc, nullptr);
auto result = check_leader(rpc, nullptr);
// request has been forwarded to others
if (result == 0) {
if (result == meta_leader_state::kNotLeaderAndCanForwardRpc) {
return;
}

if (result == -1) {
if (result == meta_leader_state::kNotLeaderAndCannotForwardRpc) {
response.err = ERR_FORWARD_TO_OTHERS;
} else {
zauto_write_lock l(_meta_lock);
Expand Down
46 changes: 25 additions & 21 deletions src/rdsn/src/meta/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ ENUM_REG(meta_op_status::RESTORE)
ENUM_REG(meta_op_status::MANUAL_COMPACT)
ENUM_END(meta_op_status)

// The leader status of meta server
enum class meta_leader_state : int
{
kIsLeader, // the meta is leader
kNotLeaderAndCanForwardRpc, // meta isn't leader, and rpc-msg can forward to others
kNotLeaderAndCannotForwardRpc, // meta isn't leader, and rpc-msg can't forward to others
};

class meta_service : public serverlet<meta_service>
{
public:
Expand Down Expand Up @@ -254,15 +262,11 @@ class meta_service : public serverlet<meta_service>
void on_get_max_replica_count(configuration_get_max_replica_count_rpc rpc);
void on_set_max_replica_count(configuration_set_max_replica_count_rpc rpc);

// common routines
// ret:
// 1. the meta is leader
// 0. meta isn't leader, and rpc-msg can forward to others
// -1. meta isn't leader, and rpc-msg can't forward to others
// if return -1 and `forward_address' != nullptr, then return leader by `forward_address'.
int check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address);
// if return 'kNotLeaderAndCannotForwardRpc' and 'forward_address' != nullptr, then return
// leader by 'forward_address'.
meta_leader_state check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address);
template <typename TRpcHolder>
int check_leader(TRpcHolder rpc, /*out*/ rpc_address *forward_address);
meta_leader_state check_leader(TRpcHolder rpc, /*out*/ rpc_address *forward_address);
// ret:
// false: check failed
// true: check succeed
Expand Down Expand Up @@ -344,27 +348,27 @@ class meta_service : public serverlet<meta_service>
};

template <typename TRpcHolder>
int meta_service::check_leader(TRpcHolder rpc, rpc_address *forward_address)
meta_leader_state meta_service::check_leader(TRpcHolder rpc, rpc_address *forward_address)
{
dsn::rpc_address leader;
if (!_failure_detector->get_leader(&leader)) {
if (!rpc.dsn_request()->header->context.u.is_forward_supported) {
if (forward_address != nullptr)
*forward_address = leader;
return -1;
return meta_leader_state::kNotLeaderAndCannotForwardRpc;
}

dinfo("leader address: %s", leader.to_string());
if (!leader.is_invalid()) {
rpc.forward(leader);
return 0;
return meta_leader_state::kNotLeaderAndCanForwardRpc;
} else {
if (forward_address != nullptr)
forward_address->set_invalid();
return -1;
return meta_leader_state::kNotLeaderAndCannotForwardRpc;
}
}
return 1;
return meta_leader_state::kIsLeader;
}

template <typename TRpcHolder>
Expand All @@ -376,11 +380,11 @@ bool meta_service::check_status(TRpcHolder rpc, rpc_address *forward_address)
return false;
}

int result = check_leader(rpc, forward_address);
if (result == 0)
auto result = check_leader(rpc, forward_address);
if (result == meta_leader_state::kNotLeaderAndCanForwardRpc)
return false;
if (result == -1 || !_started) {
if (result == -1) {
if (result == meta_leader_state::kNotLeaderAndCannotForwardRpc || !_started) {
if (result == meta_leader_state::kNotLeaderAndCannotForwardRpc) {
rpc.response().err = ERR_FORWARD_TO_OTHERS;
} else if (_recovering) {
rpc.response().err = ERR_UNDER_RECOVERY;
Expand All @@ -404,12 +408,12 @@ bool meta_service::check_status_with_msg(message_ex *req, TRespType &response_st
return false;
}

int result = check_leader(req, nullptr);
if (result == 0) {
auto result = check_leader(req, nullptr);
if (result == meta_leader_state::kNotLeaderAndCanForwardRpc) {
return false;
}
if (result == -1 || !_started) {
if (result == -1) {
if (result == meta_leader_state::kNotLeaderAndCannotForwardRpc || !_started) {
if (result == meta_leader_state::kNotLeaderAndCannotForwardRpc) {
response_struct.err = ERR_FORWARD_TO_OTHERS;
} else if (_recovering) {
response_struct.err = ERR_UNDER_RECOVERY;
Expand Down

0 comments on commit 1c17f15

Please sign in to comment.