diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index d56b64a715..77046ab7c3 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -93,51 +93,6 @@ bool meta_service::check_freeze() const
     return _alive_set.size() * 100 < _node_live_percentage_threshold_for_update * total;
 }
 
-template <typename TRpcHolder>
-int meta_service::check_leader(TRpcHolder rpc, rpc_address *forward_address)
-{
-    dsn::rpc_address leader;
-    if (!_failure_detector->get_leader(&leader)) {
-        if (!rpc.dsn_request()->header->context.u.is_forward_supported) {
-            if (forward_address != nullptr)
-                *forward_address = leader;
-            return -1;
-        }
-
-        dinfo("leader address: %s", leader.to_string());
-        if (!leader.is_invalid()) {
-            rpc.forward(leader);
-            return 0;
-        } else {
-            if (forward_address != nullptr)
-                forward_address->set_invalid();
-            return -1;
-        }
-    }
-    return 1;
-}
-
-template <typename TRpcHolder>
-bool meta_service::check_status(TRpcHolder rpc, rpc_address *forward_address)
-{
-    int result = check_leader(rpc, forward_address);
-    if (result == 0)
-        return false;
-    if (result == -1 || !_started) {
-        if (result == -1) {
-            rpc.response().err = ERR_FORWARD_TO_OTHERS;
-        } else if (_recovering) {
-            rpc.response().err = ERR_UNDER_RECOVERY;
-        } else {
-            rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
-        }
-        ddebug("reject request with %s", rpc.response().err.to_string());
-        return false;
-    }
-
-    return true;
-}
-
 template <typename TRespType>
 bool meta_service::check_status_with_msg(message_ex *req, TRespType &response_struct)
 {
@@ -412,9 +367,6 @@ void meta_service::register_rpc_handlers()
 {
     register_rpc_handler_with_rpc_holder(
         RPC_CM_CONFIG_SYNC, "config_sync", &meta_service::on_config_sync);
-    register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX,
-                                         "query_configuration_by_index",
-                                         &meta_service::on_query_configuration_by_index);
     register_rpc_handler(RPC_CM_UPDATE_PARTITION_CONFIGURATION,
                          "update_configuration",
                          &meta_service::on_update_configuration);
@@ -612,29 +564,6 @@ void meta_service::on_query_cluster_info(configuration_cluster_info_rpc rpc)
     response.err = dsn::ERR_OK;
 }
 
-// client => meta server
-void meta_service::on_query_configuration_by_index(configuration_query_by_index_rpc rpc)
-{
-    configuration_query_by_index_response &response = rpc.response();
-    rpc_address forward_address;
-    if (!check_status(rpc, &forward_address)) {
-        if (!forward_address.is_invalid()) {
-            partition_configuration config;
-            config.primary = forward_address;
-            response.partitions.push_back(std::move(config));
-        }
-        return;
-    }
-
-    _state->query_configuration_by_index(rpc.request(), response);
-    if (ERR_OK == response.err) {
-        ddebug_f("client {} queried an available app {} with appid {}",
-                 rpc.dsn_request()->header->from_address.to_string(),
-                 rpc.request().app_name,
-                 response.app_id);
-    }
-}
-
 // partition sever => meta sever
 // as get stale configuration is not allowed for partition server, we need to dispatch it to the
 // meta state thread pool
diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h
index 900ab55fc0..8fa7f4a920 100644
--- a/src/meta/meta_service.h
+++ b/src/meta/meta_service.h
@@ -47,12 +47,12 @@
 #include "meta_backup_service.h"
 #include "meta_state_service_utils.h"
 #include "block_service/block_service_manager.h"
+#include "meta_server_failure_detector.h"
 
 namespace dsn {
 namespace replication {
 
 class server_state;
-class meta_server_failure_detector;
 class server_load_balancer;
 class meta_duplication_service;
 class meta_split_service;
@@ -128,14 +128,14 @@ class meta_service : public serverlet<meta_service>
 
     dsn::task_tracker *tracker() { return &_tracker; }
 
+    template <typename TRpcHolder>
+    bool check_status(TRpcHolder rpc, /*out*/ rpc_address *forward_address = nullptr);
+
 private:
     void register_rpc_handlers();
     void register_ctrl_commands();
     void unregister_ctrl_commands();
 
-    // client => meta server
-    void on_query_configuration_by_index(configuration_query_by_index_rpc rpc);
-
     // partition server => meta server
     void on_config_sync(configuration_query_by_node_rpc rpc);
 
@@ -193,14 +193,9 @@ class meta_service : public serverlet<meta_service>
     //   0. meta isn't leader, and rpc-msg can forward to others
     //   -1. meta isn't leader, and rpc-msg can't forward to others
     // if return -1 and `forward_address' != nullptr, then return leader by `forward_address'.
-    int check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address);
+    int check_leader(dsn::message_ex *req, /*out*/ dsn::rpc_address *forward_address);
     template <typename TRpcHolder>
     int check_leader(TRpcHolder rpc, /*out*/ rpc_address *forward_address);
-    // ret:
-    //   false: check failed
-    //   true:  check succeed
-    template <typename TRpcHolder>
-    bool check_status(TRpcHolder rpc, /*out*/ rpc_address *forward_address = nullptr);
 
     template <typename TRespType>
     bool check_status_with_msg(message_ex *req, TRespType &response_struct);
@@ -263,5 +258,53 @@ class meta_service : public serverlet<meta_service>
     dsn::task_tracker _tracker;
 };
 
+template <typename TRpcHolder>
+bool meta_service::check_status(TRpcHolder rpc, rpc_address *forward_address)
+{
+    int result = check_leader(rpc, forward_address);
+    if (result == 0) {
+        return false;
+    }
+    if (result == -1 || !_started) {
+        if (result == -1) {
+            rpc.response().err = ERR_FORWARD_TO_OTHERS;
+        } else if (_recovering) {
+            rpc.response().err = ERR_UNDER_RECOVERY;
+        } else {
+            rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
+        }
+        ddebug("reject request with %s", rpc.response().err.to_string());
+        return false;
+    }
+
+    return true;
+}
+
+template <typename TRpcHolder>
+int meta_service::check_leader(TRpcHolder rpc, rpc_address *forward_address)
+{
+    dsn::rpc_address leader;
+    if (!_failure_detector->get_leader(&leader)) {
+        if (!rpc.dsn_request()->header->context.u.is_forward_supported) {
+            if (forward_address != nullptr) {
+                *forward_address = leader;
+            }
+            return -1;
+        }
+
+        dinfo("leader address: %s", leader.to_string());
+        if (!leader.is_invalid()) {
+            rpc.forward(leader);
+            return 0;
+        } else {
+            if (forward_address != nullptr) {
+                forward_address->set_invalid();
+            }
+            return -1;
+        }
+    }
+    return 1;
+}
+
 } // namespace replication
 } // namespace dsn
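The two member templates above are deleted from the .cpp only to reappear, behaviorally unchanged, at the bottom of meta_service.h (with braces added around single-statement branches, and check_status promoted to the public section). The move is forced by the C++ template model rather than by style: server_state.cpp below starts calling _meta_svc->check_status(rpc, ...), and a member-function template can only be instantiated in a translation unit that can see its definition; leaving the body in meta_service.cpp would make that call an undefined symbol at link time. A minimal sketch of the constraint, using a hypothetical widget class that is not project code:

```cpp
// widget.h -- hypothetical minimal reproduction, not project code
#pragma once

#include <string>

class widget
{
public:
    // A member-function template: every translation unit that calls
    // describe<T>() must see this definition, so it stays in the header.
    // If the body lived only in widget.cpp, another .cpp calling
    // w.describe(42) would compile but fail to link with an
    // undefined-symbol error.
    template <typename T>
    std::string describe(const T &value) const
    {
        return _name + ": " + std::to_string(value);
    }

private:
    std::string _name = "widget";
};
```

With the definitions in the header, both meta_service.cpp and server_state.cpp can instantiate check_status for configuration_query_by_index_rpc and the linker merges the instantiations.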
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index 7314fa88c5..ae948c31b1 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -60,7 +60,8 @@ static const char *lock_state = "lock";
 static const char *unlock_state = "unlock";
 
 server_state::server_state()
-    : _meta_svc(nullptr),
+    : serverlet<server_state>("server_state"),
+      _meta_svc(nullptr),
       _add_secondary_enable_flow_control(false),
       _add_secondary_max_count_for_one_node(0),
       _cli_dump_handle(nullptr),
@@ -86,6 +87,8 @@ server_state::~server_state()
             _ctrl_add_secondary_max_count_for_one_node);
         _ctrl_add_secondary_max_count_for_one_node = nullptr;
     }
+
+    unregister_rpc_handlers();
 }
 
 void server_state::register_cli_commands()
@@ -194,6 +197,8 @@ void server_state::initialize(meta_service *meta_svc, const std::string &apps_ro
         "recent_partition_change_writable_count",
         COUNTER_TYPE_VOLATILE_NUMBER,
         "partition change to writable count in the recent period");
+
+    register_rpc_handlers();
 }
 
 bool server_state::spin_wait_staging(int timeout_seconds)
@@ -2830,5 +2835,39 @@ void server_state::clear_app_envs(const app_env_rpc &env_rpc)
                 new_envs.c_str());
     });
 }
+
+void server_state::register_rpc_handlers()
+{
+    register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX,
+                                         "query_configuration_by_index",
+                                         &server_state::on_query_configuration_by_index);
+}
+
+void server_state::unregister_rpc_handlers()
+{
+    unregister_rpc_handler(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX);
+}
+
+void server_state::on_query_configuration_by_index(configuration_query_by_index_rpc rpc)
+{
+    configuration_query_by_index_response &response = rpc.response();
+    rpc_address forward_address;
+    if (!_meta_svc->check_status(rpc, &forward_address)) {
+        if (!forward_address.is_invalid()) {
+            partition_configuration config;
+            config.primary = forward_address;
+            response.partitions.push_back(std::move(config));
+        }
+        return;
+    }
+
+    query_configuration_by_index(rpc.request(), response);
+    if (ERR_OK == response.err) {
+        ddebug_f("client {} queried an available app {} with appid {}",
+                 rpc.dsn_request()->header->from_address.to_string(),
+                 rpc.request().app_name,
+                 response.app_id);
+    }
+}
 } // namespace replication
 } // namespace dsn
diff --git a/src/meta/server_state.h b/src/meta/server_state.h
index a3320e35ba..95e90a85fb 100644
--- a/src/meta/server_state.h
+++ b/src/meta/server_state.h
@@ -106,7 +106,7 @@ class meta_service;
 // D. thread-model of meta server
 // E. load balancer
 
-class server_state
+class server_state : public serverlet<server_state>
 {
 public:
     static const int sStateHash = 0;
@@ -291,6 +291,12 @@ class server_state
     void process_one_partition(std::shared_ptr<app_state> &app);
     void transition_staging_state(std::shared_ptr<app_state> &app);
 
+    void register_rpc_handlers();
+    void unregister_rpc_handlers();
+
+    // client => meta server
+    void on_query_configuration_by_index(configuration_query_by_index_rpc rpc);
+
 private:
     friend class test::test_checker;
     friend class meta_service_test_app;
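server_state now derives from serverlet<server_state>, names itself in the constructor's initializer list, registers its RPC handler in initialize(), and unregisters it in the destructor, so the query_configuration_by_index handler moves wholesale from meta_service. Note the reply convention it preserves: when this node is not the meta leader and the rpc cannot be forwarded, check_status fails with ERR_FORWARD_TO_OTHERS and the handler returns a single partition_configuration whose primary field carries the leader's address, a redirect hint rather than real partition data. A sketch of the caller's side of that contract; handle_response and retry_against are illustrative names, not actual client code:

```cpp
// Sketch only: assumes the project's configuration_query_by_index_response
// type; handle_response and retry_against are hypothetical helpers.
void handle_response(const configuration_query_by_index_response &resp)
{
    if (resp.err == ERR_FORWARD_TO_OTHERS) {
        // By the convention above, a meta server that cannot forward the
        // rpc itself smuggles the leader's address into partitions[0].primary.
        if (!resp.partitions.empty() && !resp.partitions[0].primary.is_invalid()) {
            retry_against(resp.partitions[0].primary);
        }
        return;
    }
    // On ERR_OK, resp.partitions holds the app's real partition
    // configurations and resp.app_id identifies the table.
}
```

The empty-partitions check matters because check_leader may also report that no leader is known (forward_address->set_invalid()), in which case the response carries the error code but no redirect hint.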
diff --git a/src/meta/test/config-ddl-test.ini b/src/meta/test/config-ddl-test.ini
index d455d80a57..73a5412933 100644
--- a/src/meta/test/config-ddl-test.ini
+++ b/src/meta/test/config-ddl-test.ini
@@ -15,6 +15,7 @@ pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER,THREAD_POOL_DLOCK,THREAD_POO
 [core]
 ;tool = simulator
 tool = nativerun
+enable_register_rpc_handler = false
 
 ;toollets = tracer, profiler
 toollets = fault_injector
diff --git a/src/meta/test/config-test.ini b/src/meta/test/config-test.ini
index b2215bec42..2553ddeaa2 100644
--- a/src/meta/test/config-test.ini
+++ b/src/meta/test/config-test.ini
@@ -15,6 +15,7 @@ pools = THREAD_POOL_DEFAULT,THREAD_POOL_META_SERVER,THREAD_POOL_DLOCK,THREAD_POO
 [core]
 ;tool = simulator
 tool = nativerun
+enable_register_rpc_handler = false
 
 ;toollets = tracer, profiler
 ;fault_injector
diff --git a/src/runtime/rpc/rpc_engine.cpp b/src/runtime/rpc/rpc_engine.cpp
index 2bf70dec71..6f70dc3dfe 100644
--- a/src/runtime/rpc/rpc_engine.cpp
+++ b/src/runtime/rpc/rpc_engine.cpp
@@ -38,12 +38,18 @@
 #include
 #include
 #include
+#include <dsn/utility/flags.h>
 #include
 
 namespace dsn {
 
 DEFINE_TASK_CODE(LPC_RPC_TIMEOUT, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)
 
+DSN_DEFINE_bool("core",
+                enable_register_rpc_handler,
+                true,
+                "whether enable the register of rpc handler");
+
 class rpc_timeout_task : public task
 {
 public:
@@ -335,6 +341,10 @@ bool rpc_server_dispatcher::register_rpc_handler(dsn::task_code code,
                                                  const char *extra_name,
                                                  const rpc_request_handler &h)
 {
+    if (!FLAGS_enable_register_rpc_handler) {
+        return false;
+    }
+
    std::unique_ptr<handler_entry> ctx(new handler_entry{code, extra_name, h});

    utils::auto_write_lock l(_handlers_lock);
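The rpc_engine.cpp hunks explain the two ini changes: register_rpc_handler is now gated by a [core] flag that defaults to true, presumably so the meta unit tests, which construct server_state (and hence its new handler registration in initialize()) without a full RPC engine, can turn registration into a no-op; both test configs set it to false. DSN_DEFINE_bool generates a FLAGS_enable_register_rpc_handler variable overridable from the matching ini section. A minimal sketch of the pattern, with the "my_module" section and enable_fast_path flag as hypothetical examples, assuming the flags header and macro behave as in the hunk above:

```cpp
#include <dsn/utility/flags.h>

// Generates a FLAGS_enable_fast_path variable: default true, overridable
// by `enable_fast_path = false` under the [my_module] ini section.
DSN_DEFINE_bool("my_module", enable_fast_path, true, "whether to take the fast path");

bool do_work()
{
    if (!FLAGS_enable_fast_path) {
        return false; // feature disabled via configuration
    }
    // ... fast path ...
    return true;
}
```

The ini override has exactly the shape the two test configs use: `enable_fast_path = false` under `[my_module]` mirrors `enable_register_rpc_handler = false` under `[core]`.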