diff --git a/rdsn b/rdsn index 88dd590d94..0fa841a781 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 88dd590d94f6417b9f42cd322f70ab5f2add837d +Subproject commit 0fa841a781111c5950e543f232181c23961202ec diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index bf91441c75..8f2f5ec66c 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -464,32 +464,33 @@ inline bool fill_nodes(shell_context *sc, const std::string &type, std::vector &nodes, - const ::dsn::command &cmd, - std::vector> &results) +inline std::vector> +call_remote_command(shell_context *sc, + const std::vector &nodes, + const std::string &cmd, + const std::vector &arguments) { - dsn::cli_client cli; + std::vector> results; std::vector tasks; tasks.resize(nodes.size()); results.resize(nodes.size()); for (int i = 0; i < nodes.size(); ++i) { - auto callback = [&results, - i](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) { + auto callback = [&results, i](::dsn::error_code err, const std::string &resp) { if (err == ::dsn::ERR_OK) { results[i].first = true; - ::dsn::unmarshall(resp, results[i].second); + results[i].second = resp; } else { results[i].first = false; results[i].second = err.to_string(); } }; - tasks[i] = - cli.call(cmd, callback, std::chrono::milliseconds(5000), 0, 0, 0, nodes[i].address); + tasks[i] = dsn::dist::cmd::async_call_remote( + nodes[i].address, cmd, arguments, callback, std::chrono::milliseconds(5000)); } for (int i = 0; i < nodes.size(); ++i) { tasks[i]->wait(); } + return results; } inline bool parse_app_pegasus_perf_counter_name(const std::string &name, @@ -771,13 +772,8 @@ inline bool get_app_partition_stat(shell_context *sc, } // get all of the perf counters with format ".*@.*" - ::dsn::command command; - command.cmd = "perf-counters"; - char tmp[256]; - sprintf(tmp, ".*@.*"); - command.arguments.emplace_back(tmp); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = + call_remote_command(sc, nodes, "perf-counters", {".*@.*"}); for (int i = 0; i < nodes.size(); ++i) { // decode info of perf-counters on node i @@ -841,17 +837,16 @@ get_app_stat(shell_context *sc, const std::string &app_name, std::vector arguments; char tmp[256]; if (app_name.empty()) { sprintf(tmp, ".*@.*"); } else { sprintf(tmp, ".*@%d\\..*", app_info->app_id); } - command.arguments.emplace_back(tmp); - std::vector> results; - call_remote_command(sc, nodes, command, results); + arguments.emplace_back(tmp); + std::vector> results = + call_remote_command(sc, nodes, "perf-counters", arguments); if (app_name.empty()) { std::map> app_partitions; @@ -964,11 +959,8 @@ inline bool get_capacity_unit_stat(shell_context *sc, return false; } - ::dsn::command command; - command.cmd = "perf-counters-by-substr"; - command.arguments.emplace_back(".cu@"); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = + call_remote_command(sc, nodes, "perf-counters-by-substr", {".cu@"}); nodes_stat.resize(nodes.size()); for (int i = 0; i < nodes.size(); ++i) { @@ -1036,11 +1028,8 @@ inline bool get_storage_size_stat(shell_context *sc, app_storage_size_stat &st_s } } - ::dsn::command command; - command.cmd = "perf-counters-by-prefix"; - command.arguments.emplace_back("replica*app.pegasus*disk.storage.sst(MB)"); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = call_remote_command( + sc, nodes, "perf-counters-by-prefix", {"replica*app.pegasus*disk.storage.sst(MB)"}); for (int i = 0; i < nodes.size(); ++i) { dsn::rpc_address node_addr = nodes[i].address; diff --git a/src/shell/commands.h b/src/shell/commands.h index c42199148d..5f5a80dc62 100644 --- a/src/shell/commands.h +++ b/src/shell/commands.h @@ -15,7 +15,6 @@ #include #include #include -#include #include #include diff --git a/src/shell/commands/node_management.cpp b/src/shell/commands/node_management.cpp index 54ec5473a8..4869981927 100644 --- a/src/shell/commands/node_management.cpp +++ b/src/shell/commands/node_management.cpp @@ -183,16 +183,16 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args) return true; } - ::dsn::command command; - command.cmd = "perf-counters-by-prefix"; - command.arguments.push_back("replica*server*memused.res(MB)"); - command.arguments.push_back("replica*app.pegasus*rdb.block_cache.memory_usage"); - command.arguments.push_back("replica*eon.replica_stub*disk.available.total.ratio"); - command.arguments.push_back("replica*eon.replica_stub*disk.available.min.ratio"); - command.arguments.push_back("replica*app.pegasus*rdb.memtable.memory_usage"); - command.arguments.push_back("replica*app.pegasus*rdb.index_and_filter_blocks.memory_usage"); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = + call_remote_command(sc, + nodes, + "perf-counters-by-prefix", + {"replica*server*memused.res(MB)", + "replica*app.pegasus*rdb.block_cache.memory_usage", + "replica*eon.replica_stub*disk.available.total.ratio", + "replica*eon.replica_stub*disk.available.min.ratio", + "replica*app.pegasus*rdb.memtable.memory_usage", + "replica*app.pegasus*rdb.index_and_filter_blocks.memory_usage"}); for (int i = 0; i < nodes.size(); ++i) { dsn::rpc_address node_addr = nodes[i].address; @@ -242,17 +242,16 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args) return true; } - ::dsn::command command; - command.cmd = "perf-counters-by-prefix"; - command.arguments.push_back("replica*app.pegasus*get_qps"); - command.arguments.push_back("replica*app.pegasus*multi_get_qps"); - command.arguments.push_back("replica*app.pegasus*put_qps"); - command.arguments.push_back("replica*app.pegasus*multi_put_qps"); - command.arguments.push_back("replica*app.pegasus*recent.read.cu"); - command.arguments.push_back("replica*app.pegasus*recent.write.cu"); - - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = + call_remote_command(sc, + nodes, + "perf-counters-by-prefix", + {"replica*app.pegasus*get_qps", + "replica*app.pegasus*multi_get_qps", + "replica*app.pegasus*put_qps", + "replica*app.pegasus*multi_put_qps", + "replica*app.pegasus*recent.read.cu", + "replica*app.pegasus*recent.write.cu"}); for (int i = 0; i < nodes.size(); ++i) { dsn::rpc_address node_addr = nodes[i].address; @@ -301,14 +300,14 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args) return true; } - ::dsn::command command; - command.cmd = "perf-counters-by-postfix"; - command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_GET.latency.server"); - command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_PUT.latency.server"); - command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server"); - command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server"); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = + call_remote_command(sc, + nodes, + "perf-counters-by-postfix", + {"zion*profiler*RPC_RRDB_RRDB_GET.latency.server", + "zion*profiler*RPC_RRDB_RRDB_PUT.latency.server", + "zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server", + "zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server"}); for (int i = 0; i < nodes.size(); ++i) { dsn::rpc_address node_addr = nodes[i].address; @@ -505,10 +504,10 @@ bool remote_command(command_executor *e, shell_context *sc, arguments args) return false; } - ::dsn::command cmd; - cmd.cmd = args.argv[optind]; + std::string cmd = args.argv[optind]; + std::vector arguments; for (int i = optind + 1; i < args.argc; i++) { - cmd.arguments.push_back(args.argv[i]); + arguments.push_back(args.argv[i]); } std::vector node_list; @@ -535,14 +534,14 @@ bool remote_command(command_executor *e, shell_context *sc, arguments args) } } - fprintf(stderr, "COMMAND: %s", cmd.cmd.c_str()); - for (auto &s : cmd.arguments) { + fprintf(stderr, "COMMAND: %s", cmd.c_str()); + for (auto &s : arguments) { fprintf(stderr, " %s", s.c_str()); } fprintf(stderr, "\n\n"); - std::vector> results; - call_remote_command(sc, node_list, cmd, results); + std::vector> results = + call_remote_command(sc, node_list, cmd, arguments); int succeed = 0; int failed = 0; diff --git a/src/shell/commands/table_management.cpp b/src/shell/commands/table_management.cpp index 152c1980c1..41e226f23a 100644 --- a/src/shell/commands/table_management.cpp +++ b/src/shell/commands/table_management.cpp @@ -210,15 +210,13 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args) return true; } - ::dsn::command command; - command.cmd = "perf-counters-by-prefix"; - char tmp[256]; - sprintf(tmp, "replica*app.pegasus*disk.storage.sst(MB)@%d.", app_id); - command.arguments.push_back(tmp); - sprintf(tmp, "replica*app.pegasus*disk.storage.sst.count@%d.", app_id); - command.arguments.push_back(tmp); - std::vector> results; - call_remote_command(sc, nodes, command, results); + std::vector> results = call_remote_command( + sc, + nodes, + "perf-counters-by-prefix", + {fmt::format("replica*app.pegasus*disk.storage.sst(MB)@{}.", app_id), + fmt::format("replica*app.pegasus*disk.storage.sst.count@{}.", app_id)}); + std::map> disk_map; std::map> count_map; for (int i = 0; i < nodes.size(); ++i) { diff --git a/src/test/kill_test/CMakeLists.txt b/src/test/kill_test/CMakeLists.txt index e1e6217b3a..6246d209e2 100644 --- a/src/test/kill_test/CMakeLists.txt +++ b/src/test/kill_test/CMakeLists.txt @@ -15,7 +15,7 @@ set(MY_PROJ_LIBS pegasus_client_static dsn.replication.ddlclient dsn_replication_common - dsn_cli + dsn_dist_cmd dsn_runtime ) set(MY_BINPLACES "${CMAKE_CURRENT_SOURCE_DIR}/config.ini") diff --git a/src/test/kill_test/partition_kill_testor.cpp b/src/test/kill_test/partition_kill_testor.cpp index 4efa609f95..2162c01f16 100644 --- a/src/test/kill_test/partition_kill_testor.cpp +++ b/src/test/kill_test/partition_kill_testor.cpp @@ -13,16 +13,13 @@ #include #include #include +#include #include "partition_kill_testor.h" namespace pegasus { namespace test { -partition_kill_testor::partition_kill_testor(const char *config_file) : kill_testor(config_file) -{ - cmd.cmd = "replica.kill_partition"; - cmd.arguments.resize(2); -} +partition_kill_testor::partition_kill_testor(const char *config_file) : kill_testor(config_file) {} void partition_kill_testor::Run() { @@ -49,28 +46,31 @@ void partition_kill_testor::run() std::vector random_indexs; generate_random(random_indexs, random_num, 0, partitions.size() - 1); - dsn::cli_client cli; std::vector tasks(random_num); std::vector> results(random_num); + std::vector arguments(2); for (int i = 0; i < random_indexs.size(); ++i) { int index = random_indexs[i]; const auto &p = partitions[index]; - cmd.arguments[0] = to_string(p.pid.get_app_id()); - cmd.arguments[1] = to_string(p.pid.get_partition_index()); + arguments[0] = to_string(p.pid.get_app_id()); + arguments[1] = to_string(p.pid.get_partition_index()); - auto callback = [&results, - i](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) { + auto callback = [&results, i](::dsn::error_code err, const std::string &resp) { if (err == ::dsn::ERR_OK) { results[i].first = true; - ::dsn::unmarshall(resp, results[i].second); + results[i].second = resp; } else { results[i].first = false; results[i].second = err.to_string(); } }; - tasks[i] = cli.call(cmd, callback, std::chrono::milliseconds(5000), 0, 0, 0, p.primary); + tasks[i] = dsn::dist::cmd::async_call_remote(p.primary, + "replica.kill_partition", + arguments, + callback, + std::chrono::milliseconds(5000)); } for (int i = 0; i < tasks.size(); ++i) { diff --git a/src/test/kill_test/partition_kill_testor.h b/src/test/kill_test/partition_kill_testor.h index 141419e302..519812ca4b 100644 --- a/src/test/kill_test/partition_kill_testor.h +++ b/src/test/kill_test/partition_kill_testor.h @@ -2,7 +2,6 @@ // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. -#include #include #include @@ -21,8 +20,6 @@ class partition_kill_testor : public kill_testor private: void run(); - - ::dsn::command cmd; }; } // namespace test } // namespace pegasus