Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: adapt to rdsn dist cmd refactoring #544

Merged
merged 3 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rdsn
Submodule rdsn updated 32 files
+5 −12 compile_thrift.py
+0 −97 include/dsn/dist/cli/cli.client.h
+0 −8 include/dsn/dist/cli/cli.code.definition.h
+0 −60 include/dsn/dist/cli/cli.server.h
+26 −0 include/dsn/dist/remote_command.h
+1 −1 src/dist/CMakeLists.txt
+0 −14 src/dist/cli/CMakeLists.txt
+0 −13 src/dist/cli/cli.thrift
+0 −21 src/dist/cli/cli_server_impl.cpp
+0 −22 src/dist/cli/shell/CMakeLists.txt
+0 −45 src/dist/cli/shell/cli.main.cpp
+0 −131 src/dist/cli/shell/cli_app.cpp
+0 −57 src/dist/cli/shell/cli_app.h
+0 −63 src/dist/cli/shell/config.ini
+7 −0 src/dist/cmd/CMakeLists.txt
+7 −0 src/dist/cmd/command.thrift
+6 −3 src/dist/cmd/command_types.cpp
+7 −4 src/dist/cmd/command_types.h
+49 −0 src/dist/cmd/remote_command.cpp
+1 −1 src/dist/replication/lib/CMakeLists.txt
+82 −2 src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
+2 −0 src/dist/replication/lib/bulk_load/replica_bulk_loader.h
+83 −2 src/dist/replication/lib/bulk_load/test/replica_bulk_loader_test.cpp
+2 −0 src/dist/replication/lib/replica_context.cpp
+2 −0 src/dist/replication/lib/replica_context.h
+2 −2 src/dist/replication/lib/replica_stub.cpp
+0 −4 src/dist/replication/lib/replica_stub.h
+1 −1 src/dist/replication/meta_server/CMakeLists.txt
+3 −3 src/dist/replication/meta_server/meta_service.cpp
+0 −4 src/dist/replication/meta_server/meta_service.h
+1 −1 src/dist/replication/test/meta_test/unit_test/CMakeLists.txt
+9 −2 src/dist/replication/test/replica_test/unit_test/mock_utils.h
55 changes: 22 additions & 33 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <rocksdb/env.h>
#include <rocksdb/statistics.h>
#include <dsn/cpp/json_helper.h>
#include <dsn/dist/cli/cli.client.h>
#include <dsn/dist/remote_command.h>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/dist/replication/mutation_log_tool.h>
#include <dsn/perf_counter/perf_counter_utils.h>
Expand Down Expand Up @@ -464,32 +464,33 @@ inline bool fill_nodes(shell_context *sc, const std::string &type, std::vector<n
return true;
}

inline void call_remote_command(shell_context *sc,
const std::vector<node_desc> &nodes,
const ::dsn::command &cmd,
std::vector<std::pair<bool, std::string>> &results)
inline std::vector<std::pair<bool, std::string>>
call_remote_command(shell_context *sc,
const std::vector<node_desc> &nodes,
const std::string &cmd,
const std::vector<std::string> &arguments)
{
dsn::cli_client cli;
std::vector<std::pair<bool, std::string>> results;
std::vector<dsn::task_ptr> tasks;
tasks.resize(nodes.size());
results.resize(nodes.size());
for (int i = 0; i < nodes.size(); ++i) {
auto callback = [&results,
i](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) {
auto callback = [&results, i](::dsn::error_code err, const std::string &resp) {
if (err == ::dsn::ERR_OK) {
results[i].first = true;
::dsn::unmarshall(resp, results[i].second);
results[i].second = resp;
} else {
results[i].first = false;
results[i].second = err.to_string();
}
};
tasks[i] =
cli.call(cmd, callback, std::chrono::milliseconds(5000), 0, 0, 0, nodes[i].address);
tasks[i] = dsn::dist::cmd::async_call_remote(
nodes[i].address, cmd, arguments, callback, std::chrono::milliseconds(5000));
}
for (int i = 0; i < nodes.size(); ++i) {
tasks[i]->wait();
}
return results;
}

inline bool parse_app_pegasus_perf_counter_name(const std::string &name,
Expand Down Expand Up @@ -771,13 +772,8 @@ inline bool get_app_partition_stat(shell_context *sc,
}

// get all of the perf counters with format ".*@.*"
::dsn::command command;
command.cmd = "perf-counters";
char tmp[256];
sprintf(tmp, ".*@.*");
command.arguments.emplace_back(tmp);
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
std::vector<std::pair<bool, std::string>> results =
call_remote_command(sc, nodes, "perf-counters", {".*@.*"});

for (int i = 0; i < nodes.size(); ++i) {
// decode info of perf-counters on node i
Expand Down Expand Up @@ -841,17 +837,16 @@ get_app_stat(shell_context *sc, const std::string &app_name, std::vector<row_dat
}
}

::dsn::command command;
command.cmd = "perf-counters";
std::vector<std::string> arguments;
char tmp[256];
if (app_name.empty()) {
sprintf(tmp, ".*@.*");
} else {
sprintf(tmp, ".*@%d\\..*", app_info->app_id);
}
command.arguments.emplace_back(tmp);
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
arguments.emplace_back(tmp);
std::vector<std::pair<bool, std::string>> results =
call_remote_command(sc, nodes, "perf-counters", arguments);

if (app_name.empty()) {
std::map<int32_t, std::vector<dsn::partition_configuration>> app_partitions;
Expand Down Expand Up @@ -964,11 +959,8 @@ inline bool get_capacity_unit_stat(shell_context *sc,
return false;
}

::dsn::command command;
command.cmd = "perf-counters-by-substr";
command.arguments.emplace_back(".cu@");
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
std::vector<std::pair<bool, std::string>> results =
call_remote_command(sc, nodes, "perf-counters-by-substr", {".cu@"});

nodes_stat.resize(nodes.size());
for (int i = 0; i < nodes.size(); ++i) {
Expand Down Expand Up @@ -1036,11 +1028,8 @@ inline bool get_storage_size_stat(shell_context *sc, app_storage_size_stat &st_s
}
}

::dsn::command command;
command.cmd = "perf-counters-by-prefix";
command.arguments.emplace_back("replica*app.pegasus*disk.storage.sst(MB)");
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
std::vector<std::pair<bool, std::string>> results = call_remote_command(
sc, nodes, "perf-counters-by-prefix", {"replica*app.pegasus*disk.storage.sst(MB)"});

for (int i = 0; i < nodes.size(); ++i) {
dsn::rpc_address node_addr = nodes[i].address;
Expand Down
1 change: 0 additions & 1 deletion src/shell/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <dsn/utility/output_utils.h>
#include <dsn/utility/string_conv.h>
#include <dsn/utility/string_view.h>
#include <dsn/dist/cli/cli.client.h>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/dist/replication/mutation_log_tool.h>

Expand Down
71 changes: 35 additions & 36 deletions src/shell/commands/node_management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,16 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
return true;
}

::dsn::command command;
command.cmd = "perf-counters-by-prefix";
command.arguments.push_back("replica*server*memused.res(MB)");
command.arguments.push_back("replica*app.pegasus*rdb.block_cache.memory_usage");
command.arguments.push_back("replica*eon.replica_stub*disk.available.total.ratio");
command.arguments.push_back("replica*eon.replica_stub*disk.available.min.ratio");
command.arguments.push_back("replica*app.pegasus*rdb.memtable.memory_usage");
command.arguments.push_back("replica*app.pegasus*rdb.index_and_filter_blocks.memory_usage");
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
std::vector<std::pair<bool, std::string>> results =
call_remote_command(sc,
nodes,
"perf-counters-by-prefix",
{"replica*server*memused.res(MB)",
"replica*app.pegasus*rdb.block_cache.memory_usage",
"replica*eon.replica_stub*disk.available.total.ratio",
"replica*eon.replica_stub*disk.available.min.ratio",
"replica*app.pegasus*rdb.memtable.memory_usage",
"replica*app.pegasus*rdb.index_and_filter_blocks.memory_usage"});

for (int i = 0; i < nodes.size(); ++i) {
dsn::rpc_address node_addr = nodes[i].address;
Expand Down Expand Up @@ -242,17 +242,16 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
return true;
}

::dsn::command command;
command.cmd = "perf-counters-by-prefix";
command.arguments.push_back("replica*app.pegasus*get_qps");
command.arguments.push_back("replica*app.pegasus*multi_get_qps");
command.arguments.push_back("replica*app.pegasus*put_qps");
command.arguments.push_back("replica*app.pegasus*multi_put_qps");
command.arguments.push_back("replica*app.pegasus*recent.read.cu");
command.arguments.push_back("replica*app.pegasus*recent.write.cu");

std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
std::vector<std::pair<bool, std::string>> results =
call_remote_command(sc,
nodes,
"perf-counters-by-prefix",
{"replica*app.pegasus*get_qps",
"replica*app.pegasus*multi_get_qps",
"replica*app.pegasus*put_qps",
"replica*app.pegasus*multi_put_qps",
"replica*app.pegasus*recent.read.cu",
"replica*app.pegasus*recent.write.cu"});

for (int i = 0; i < nodes.size(); ++i) {
dsn::rpc_address node_addr = nodes[i].address;
Expand Down Expand Up @@ -301,14 +300,14 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
return true;
}

::dsn::command command;
command.cmd = "perf-counters-by-postfix";
command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_GET.latency.server");
command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_PUT.latency.server");
command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server");
command.arguments.push_back("zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server");
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
std::vector<std::pair<bool, std::string>> results =
call_remote_command(sc,
nodes,
"perf-counters-by-postfix",
{"zion*profiler*RPC_RRDB_RRDB_GET.latency.server",
"zion*profiler*RPC_RRDB_RRDB_PUT.latency.server",
"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server",
"zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server"});

for (int i = 0; i < nodes.size(); ++i) {
dsn::rpc_address node_addr = nodes[i].address;
Expand Down Expand Up @@ -505,10 +504,10 @@ bool remote_command(command_executor *e, shell_context *sc, arguments args)
return false;
}

::dsn::command cmd;
cmd.cmd = args.argv[optind];
std::string cmd = args.argv[optind];
std::vector<std::string> arguments;
for (int i = optind + 1; i < args.argc; i++) {
cmd.arguments.push_back(args.argv[i]);
arguments.push_back(args.argv[i]);
}

std::vector<node_desc> node_list;
Expand All @@ -535,14 +534,14 @@ bool remote_command(command_executor *e, shell_context *sc, arguments args)
}
}

fprintf(stderr, "COMMAND: %s", cmd.cmd.c_str());
for (auto &s : cmd.arguments) {
fprintf(stderr, "COMMAND: %s", cmd.c_str());
for (auto &s : arguments) {
fprintf(stderr, " %s", s.c_str());
}
fprintf(stderr, "\n\n");

std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, node_list, cmd, results);
std::vector<std::pair<bool, std::string>> results =
call_remote_command(sc, node_list, cmd, arguments);

int succeed = 0;
int failed = 0;
Expand Down
16 changes: 7 additions & 9 deletions src/shell/commands/table_management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,13 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args)
return true;
}

::dsn::command command;
command.cmd = "perf-counters-by-prefix";
char tmp[256];
sprintf(tmp, "replica*app.pegasus*disk.storage.sst(MB)@%d.", app_id);
command.arguments.push_back(tmp);
sprintf(tmp, "replica*app.pegasus*disk.storage.sst.count@%d.", app_id);
command.arguments.push_back(tmp);
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
std::vector<std::pair<bool, std::string>> results = call_remote_command(
sc,
nodes,
"perf-counters-by-prefix",
{fmt::format("replica*app.pegasus*disk.storage.sst(MB)@{}.", app_id),
fmt::format("replica*app.pegasus*disk.storage.sst.count@{}.", app_id)});

std::map<dsn::rpc_address, std::map<int32_t, double>> disk_map;
std::map<dsn::rpc_address, std::map<int32_t, double>> count_map;
for (int i = 0; i < nodes.size(); ++i) {
Expand Down
2 changes: 1 addition & 1 deletion src/test/kill_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ set(MY_PROJ_LIBS
pegasus_client_static
dsn.replication.ddlclient
dsn_replication_common
dsn_cli
dsn_dist_cmd
dsn_runtime
)
set(MY_BINPLACES "${CMAKE_CURRENT_SOURCE_DIR}/config.ini")
Expand Down
24 changes: 12 additions & 12 deletions src/test/kill_test/partition_kill_testor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@
#include <atomic>
#include <memory>
#include <sys/time.h>
#include <dsn/dist/remote_command.h>

#include "partition_kill_testor.h"

namespace pegasus {
namespace test {
partition_kill_testor::partition_kill_testor(const char *config_file) : kill_testor(config_file)
{
cmd.cmd = "replica.kill_partition";
cmd.arguments.resize(2);
}
partition_kill_testor::partition_kill_testor(const char *config_file) : kill_testor(config_file) {}

void partition_kill_testor::Run()
{
Expand All @@ -49,28 +46,31 @@ void partition_kill_testor::run()
std::vector<int> random_indexs;
generate_random(random_indexs, random_num, 0, partitions.size() - 1);

dsn::cli_client cli;
std::vector<dsn::task_ptr> tasks(random_num);
std::vector<std::pair<bool, std::string>> results(random_num);

std::vector<std::string> arguments(2);
for (int i = 0; i < random_indexs.size(); ++i) {
int index = random_indexs[i];
const auto &p = partitions[index];

cmd.arguments[0] = to_string(p.pid.get_app_id());
cmd.arguments[1] = to_string(p.pid.get_partition_index());
arguments[0] = to_string(p.pid.get_app_id());
arguments[1] = to_string(p.pid.get_partition_index());

auto callback = [&results,
i](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) {
auto callback = [&results, i](::dsn::error_code err, const std::string &resp) {
if (err == ::dsn::ERR_OK) {
results[i].first = true;
::dsn::unmarshall(resp, results[i].second);
results[i].second = resp;
} else {
results[i].first = false;
results[i].second = err.to_string();
}
};
tasks[i] = cli.call(cmd, callback, std::chrono::milliseconds(5000), 0, 0, 0, p.primary);
tasks[i] = dsn::dist::cmd::async_call_remote(p.primary,
"replica.kill_partition",
arguments,
callback,
std::chrono::milliseconds(5000));
}

for (int i = 0; i < tasks.size(); ++i) {
Expand Down
3 changes: 0 additions & 3 deletions src/test/kill_test/partition_kill_testor.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <dsn/dist/cli/cli.client.h>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <pegasus/client.h>

Expand All @@ -21,8 +20,6 @@ class partition_kill_testor : public kill_testor

private:
void run();

::dsn::command cmd;
};
} // namespace test
} // namespace pegasus