This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor: move rpc handler on_query_configuration_by_index from meta_service to server_state #597

Draft · wants to merge 24 commits into base: master

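At a high level, the patch moves both the registration and the body of the query_configuration_by_index handler out of meta_service and into server_state, which now derives from serverlet<server_state> so it can register RPC handlers itself; the leader/status check is delegated back to meta_service through the now-public check_status(). A rough sketch of the registration change, condensed from the diff below rather than quoted verbatim:

// Before: meta_service registered and implemented the handler.
void meta_service::register_rpc_handlers()
{
    register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX,
                                         "query_configuration_by_index",
                                         &meta_service::on_query_configuration_by_index);
    // ... other handlers stay in meta_service ...
}

// After: server_state owns both the registration and the handler,
// calling back into _meta_svc->check_status() for the leader check.
void server_state::register_rpc_handlers()
{
    register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX,
                                         "query_configuration_by_index",
                                         &server_state::on_query_configuration_by_index);
}
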
71 changes: 0 additions & 71 deletions src/meta/meta_service.cpp
@@ -93,51 +93,6 @@ bool meta_service::check_freeze() const
return _alive_set.size() * 100 < _node_live_percentage_threshold_for_update * total;
}

template <typename TRpcHolder>
int meta_service::check_leader(TRpcHolder rpc, rpc_address *forward_address)
{
dsn::rpc_address leader;
if (!_failure_detector->get_leader(&leader)) {
if (!rpc.dsn_request()->header->context.u.is_forward_supported) {
if (forward_address != nullptr)
*forward_address = leader;
return -1;
}

dinfo("leader address: %s", leader.to_string());
if (!leader.is_invalid()) {
rpc.forward(leader);
return 0;
} else {
if (forward_address != nullptr)
forward_address->set_invalid();
return -1;
}
}
return 1;
}

template <typename TRpcHolder>
bool meta_service::check_status(TRpcHolder rpc, rpc_address *forward_address)
{
int result = check_leader(rpc, forward_address);
if (result == 0)
return false;
if (result == -1 || !_started) {
if (result == -1) {
rpc.response().err = ERR_FORWARD_TO_OTHERS;
} else if (_recovering) {
rpc.response().err = ERR_UNDER_RECOVERY;
} else {
rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
}
ddebug("reject request with %s", rpc.response().err.to_string());
return false;
}

return true;
}

template <typename TRespType>
bool meta_service::check_status_with_msg(message_ex *req, TRespType &response_struct)
{
@@ -412,9 +367,6 @@ void meta_service::register_rpc_handlers()
{
register_rpc_handler_with_rpc_holder(
RPC_CM_CONFIG_SYNC, "config_sync", &meta_service::on_config_sync);
register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX,
"query_configuration_by_index",
&meta_service::on_query_configuration_by_index);
register_rpc_handler(RPC_CM_UPDATE_PARTITION_CONFIGURATION,
"update_configuration",
&meta_service::on_update_configuration);
@@ -612,29 +564,6 @@ void meta_service::on_query_cluster_info(configuration_cluster_info_rpc rpc)
response.err = dsn::ERR_OK;
}

// client => meta server
void meta_service::on_query_configuration_by_index(configuration_query_by_index_rpc rpc)
{
configuration_query_by_index_response &response = rpc.response();
rpc_address forward_address;
if (!check_status(rpc, &forward_address)) {
if (!forward_address.is_invalid()) {
partition_configuration config;
config.primary = forward_address;
response.partitions.push_back(std::move(config));
}
return;
}

_state->query_configuration_by_index(rpc.request(), response);
if (ERR_OK == response.err) {
ddebug_f("client {} queried an available app {} with appid {}",
rpc.dsn_request()->header->from_address.to_string(),
rpc.request().app_name,
response.app_id);
}
}

// partition server => meta server
// as get stale configuration is not allowed for partition server, we need to dispatch it to the
// meta state thread pool
63 changes: 53 additions & 10 deletions src/meta/meta_service.h
@@ -47,12 +47,12 @@
#include "meta_backup_service.h"
#include "meta_state_service_utils.h"
#include "block_service/block_service_manager.h"
#include "meta_server_failure_detector.h"

namespace dsn {
namespace replication {

class server_state;
class meta_server_failure_detector;
class server_load_balancer;
class meta_duplication_service;
class meta_split_service;
@@ -128,14 +128,14 @@ class meta_service : public serverlet<meta_service>

dsn::task_tracker *tracker() { return &_tracker; }

template <typename TRpcHolder>
bool check_status(TRpcHolder rpc, /*out*/ rpc_address *forward_address = nullptr);

private:
void register_rpc_handlers();
void register_ctrl_commands();
void unregister_ctrl_commands();

// client => meta server
void on_query_configuration_by_index(configuration_query_by_index_rpc rpc);

// partition server => meta server
void on_config_sync(configuration_query_by_node_rpc rpc);

@@ -193,14 +193,9 @@ class meta_service : public serverlet<meta_service>
// 0. meta isn't leader, and rpc-msg can forward to others
// -1. meta isn't leader, and rpc-msg can't forward to others
// if return -1 and `forward_address' != nullptr, then return leader by `forward_address'.
int check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address);
int check_leader(dsn::message_ex *req, /*out*/ dsn::rpc_address *forward_address);
template <typename TRpcHolder>
int check_leader(TRpcHolder rpc, /*out*/ rpc_address *forward_address);
// ret:
// false: check failed
// true: check succeed
template <typename TRpcHolder>
bool check_status(TRpcHolder rpc, /*out*/ rpc_address *forward_address = nullptr);
template <typename TRespType>
bool check_status_with_msg(message_ex *req, TRespType &response_struct);

@@ -263,5 +258,53 @@ class meta_service : public serverlet<meta_service>
dsn::task_tracker _tracker;
};

template <typename TRpcHolder>
bool meta_service::check_status(TRpcHolder rpc, rpc_address *forward_address)
{
int result = check_leader(rpc, forward_address);
if (result == 0) {
return false;
}
if (result == -1 || !_started) {
if (result == -1) {
rpc.response().err = ERR_FORWARD_TO_OTHERS;
} else if (_recovering) {
rpc.response().err = ERR_UNDER_RECOVERY;
} else {
rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
}
ddebug("reject request with %s", rpc.response().err.to_string());
return false;
}

return true;
}

template <typename TRpcHolder>
int meta_service::check_leader(TRpcHolder rpc, rpc_address *forward_address)
{
dsn::rpc_address leader;
if (!_failure_detector->get_leader(&leader)) {
if (!rpc.dsn_request()->header->context.u.is_forward_supported) {
if (forward_address != nullptr) {
*forward_address = leader;
}
return -1;
}

dinfo("leader address: %s", leader.to_string());
if (!leader.is_invalid()) {
rpc.forward(leader);
return 0;
} else {
if (forward_address != nullptr) {
forward_address->set_invalid();
}
return -1;
}
}
return 1;
}

} // namespace replication
} // namespace dsn
41 changes: 40 additions & 1 deletion src/meta/server_state.cpp
@@ -60,7 +60,8 @@ static const char *lock_state = "lock";
static const char *unlock_state = "unlock";

server_state::server_state()
: _meta_svc(nullptr),
: serverlet("server_state"),
_meta_svc(nullptr),
_add_secondary_enable_flow_control(false),
_add_secondary_max_count_for_one_node(0),
_cli_dump_handle(nullptr),
@@ -86,6 +87,8 @@ server_state::~server_state()
_ctrl_add_secondary_max_count_for_one_node);
_ctrl_add_secondary_max_count_for_one_node = nullptr;
}

unregister_rpc_handlers();
}

void server_state::register_cli_commands()
@@ -194,6 +197,8 @@ void server_state::initialize(meta_service *meta_svc, const std::string &apps_ro
"recent_partition_change_writable_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"partition change to writable count in the recent period");

register_rpc_handlers();
}

bool server_state::spin_wait_staging(int timeout_seconds)
@@ -2830,5 +2835,39 @@ void server_state::clear_app_envs(const app_env_rpc &env_rpc)
new_envs.c_str());
});
}

void server_state::register_rpc_handlers()
{
register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX,
"query_configuration_by_index",
&server_state::on_query_configuration_by_index);
}

void server_state::unregister_rpc_handlers()
{
unregister_rpc_handler(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX);
}

void server_state::on_query_configuration_by_index(configuration_query_by_index_rpc rpc)
{
configuration_query_by_index_response &response = rpc.response();
rpc_address forward_address;
if (!_meta_svc->check_status(rpc, &forward_address)) {
if (!forward_address.is_invalid()) {
partition_configuration config;
config.primary = forward_address;
response.partitions.push_back(std::move(config));
}
return;
}

query_configuration_by_index(rpc.request(), response);
if (ERR_OK == response.err) {
ddebug_f("client {} queried an available app {} with appid {}",
rpc.dsn_request()->header->from_address.to_string(),
rpc.request().app_name,
response.app_id);
}
}
} // namespace replication
} // namespace dsn
8 changes: 7 additions & 1 deletion src/meta/server_state.h
@@ -106,7 +106,7 @@ class meta_service;
// D. thread-model of meta server
// E. load balancer

class server_state
class server_state : public serverlet<server_state>
{
public:
static const int sStateHash = 0;
Expand Down Expand Up @@ -291,6 +291,12 @@ class server_state
void process_one_partition(std::shared_ptr<app_state> &app);
void transition_staging_state(std::shared_ptr<app_state> &app);

void register_rpc_handlers();
void unregister_rpc_handlers();

// client => meta server
void on_query_configuration_by_index(configuration_query_by_index_rpc rpc);

private:
friend class test::test_checker;
friend class meta_service_test_app;
1 change: 1 addition & 0 deletions src/meta/test/config-ddl-test.ini
@@ -15,6 +15,7 @@ pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER,THREAD_POOL_DLOCK,THREAD_POO
[core]
;tool = simulator
tool = nativerun
enable_register_rpc_handler = false

;toollets = tracer, profiler
toollets = fault_injector
1 change: 1 addition & 0 deletions src/meta/test/config-test.ini
@@ -15,6 +15,7 @@ pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER,THREAD_POOL_DLOCK,THREAD_POO
[core]
;tool = simulator
tool = nativerun
enable_register_rpc_handler = false

;toollets = tracer, profiler
;fault_injector
10 changes: 10 additions & 0 deletions src/runtime/rpc/rpc_engine.cpp
@@ -38,12 +38,18 @@
#include <dsn/tool-api/async_calls.h>
#include <dsn/cpp/serialization.h>
#include <dsn/utility/rand.h>
#include <dsn/utility/flags.h>
#include <set>

namespace dsn {

DEFINE_TASK_CODE(LPC_RPC_TIMEOUT, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)

DSN_DEFINE_bool("core",
enable_register_rpc_handler,
true,
"whether enable the register of rpc handler");

Member: What is this config item used for? When should it be configured to true, and when to false?

Contributor (author): This configuration is used by unit tests to disable registering RPC handlers. In some unit tests the same RPC handler is registered more than once, which produces a core dump, so the configuration can be set to false to avoid that situation.

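As the answer notes, the meta-server tests in this PR simply turn the flag off in their configs; a minimal excerpt of such a test config (mirroring the config-test.ini / config-ddl-test.ini changes above, not a new option):

[core]
tool = nativerun
; some unit tests register the same RPC handler more than once, which would
; trigger a core dump, so skip handler registration entirely in these tests
enable_register_rpc_handler = false
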
class rpc_timeout_task : public task
{
public:
Expand Down Expand Up @@ -335,6 +341,10 @@ bool rpc_server_dispatcher::register_rpc_handler(dsn::task_code code,
const char *extra_name,
const rpc_request_handler &h)
{
if (!FLAGS_enable_register_rpc_handler) {
return false;
}

std::unique_ptr<handler_entry> ctx(new handler_entry{code, extra_name, h});

utils::auto_write_lock l(_handlers_lock);
Expand Down