Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dup): support shell set fail_mode and collector duplication ops #520

Merged
merged 18 commits into from
Apr 26, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rdsn
Submodule rdsn updated 49 files
+2 −0 include/dsn/dist/replication/duplication_common.h
+1 −0 include/dsn/dist/replication/replication.codes.h
+2 −0 include/dsn/dist/replication/replication_ddl_client.h
+99 −9 include/dsn/dist/replication/replication_types.h
+1 −1 scripts/linux/build.sh
+10 −0 src/dist/replication/common/duplication_common.cpp
+8 −0 src/dist/replication/common/replication_common.cpp
+1 −0 src/dist/replication/common/replication_common.h
+1,054 −924 src/dist/replication/common/replication_types.cpp
+14 −0 src/dist/replication/ddl_lib/replication_ddl_client.cpp
+10 −1 src/dist/replication/lib/CMakeLists.txt
+186 −0 src/dist/replication/lib/backup/replica_backup_manager.cpp
+35 −0 src/dist/replication/lib/backup/replica_backup_manager.h
+22 −0 src/dist/replication/lib/backup/test/CMakeLists.txt
+34 −0 src/dist/replication/lib/backup/test/config-test.ini
+39 −0 src/dist/replication/lib/backup/test/main.cpp
+34 −0 src/dist/replication/lib/backup/test/replica_backup_manager_test.cpp
+11 −0 src/dist/replication/lib/backup/test/run.sh
+1 −0 src/dist/replication/lib/duplication/duplication_sync_timer.cpp
+1 −0 src/dist/replication/lib/duplication/duplication_sync_timer.h
+47 −0 src/dist/replication/lib/duplication/load_from_private_log.cpp
+11 −0 src/dist/replication/lib/duplication/load_from_private_log.h
+10 −0 src/dist/replication/lib/duplication/replica_duplicator.h
+4 −0 src/dist/replication/lib/duplication/replica_duplicator_manager.cpp
+1 −0 src/dist/replication/lib/duplication/replica_duplicator_manager.h
+126 −34 src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp
+1 −1 src/dist/replication/lib/duplication/test/replica_http_service_test.cpp
+20 −0 src/dist/replication/lib/log_block.cpp
+53 −0 src/dist/replication/lib/log_block.h
+117 −155 src/dist/replication/lib/mutation_log.cpp
+17 −11 src/dist/replication/lib/mutation_log.h
+5 −6 src/dist/replication/lib/replica.cpp
+11 −4 src/dist/replication/lib/replica.h
+11 −132 src/dist/replication/lib/replica_backup.cpp
+1 −1 src/dist/replication/lib/replica_context.h
+1 −0 src/dist/replication/lib/replica_http_service.cpp
+2 −8 src/dist/replication/lib/replica_init.cpp
+58 −12 src/dist/replication/lib/replica_stub.cpp
+8 −0 src/dist/replication/lib/replica_stub.h
+34 −2 src/dist/replication/meta_server/duplication/duplication_info.cpp
+13 −4 src/dist/replication/meta_server/duplication/duplication_info.h
+4 −2 src/dist/replication/meta_server/duplication/meta_duplication_service.cpp
+15 −0 src/dist/replication/meta_server/meta_backup_service.cpp
+25 −4 src/dist/replication/replication.thrift
+26 −3 src/dist/replication/test/meta_test/unit_test/backup_test.cpp
+64 −0 src/dist/replication/test/meta_test/unit_test/meta_duplication_service_test.cpp
+5 −5 src/dist/replication/test/meta_test/unit_test/meta_http_service_test.cpp
+157 −0 src/dist/replication/test/replica_test/unit_test/log_block_test.cpp
+73 −0 src/dist/replication/test/replica_test/unit_test/log_file_test.cpp
5 changes: 5 additions & 0 deletions src/server/config.min.ini
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@

[pegasus.clusters]
onebox = @LOCAL_IP@:34601,@LOCAL_IP@:34602,@LOCAL_IP@:34603
onebox2 = 0.0.0.0:35601

[zookeeper]
hosts_list = 127.0.0.1:22181
Expand Down Expand Up @@ -205,3 +206,7 @@
[task.RPC_RRDB_RRDB_MULTI_GET]
is_profile = true
profiler::size.response.server = true

[duplication-group]
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
onebox = 1
onebox2 = 2
3 changes: 3 additions & 0 deletions src/server/info_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ info_collector::app_stat_counters *info_collector::get_app_counters(const std::s
INIT_COUNTER(check_and_set_qps);
INIT_COUNTER(check_and_mutate_qps);
INIT_COUNTER(scan_qps);
INIT_COUNTER(dup_qps);
INIT_COUNTER(dup_shipped_ops);
INIT_COUNTER(dup_failed_shipping_ops);
INIT_COUNTER(recent_read_cu);
INIT_COUNTER(recent_write_cu);
INIT_COUNTER(recent_expire_count);
Expand Down
6 changes: 6 additions & 0 deletions src/server/info_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class info_collector
check_and_set_qps->set(row_stats.total_check_and_set_qps);
check_and_mutate_qps->set(row_stats.total_check_and_mutate_qps);
scan_qps->set(row_stats.total_scan_qps);
dup_qps->set(row_stats.total_dup_qps);
dup_shipped_ops->set(row_stats.total_dup_shipped_ops);
dup_failed_shipping_ops->set(row_stats.total_dup_failed_shipping_ops);
recent_read_cu->set(row_stats.total_recent_read_cu);
recent_write_cu->set(row_stats.total_recent_write_cu);
recent_expire_count->set(row_stats.total_recent_expire_count);
Expand Down Expand Up @@ -87,6 +90,9 @@ class info_collector
::dsn::perf_counter_wrapper check_and_set_qps;
::dsn::perf_counter_wrapper check_and_mutate_qps;
::dsn::perf_counter_wrapper scan_qps;
::dsn::perf_counter_wrapper dup_qps;
::dsn::perf_counter_wrapper dup_shipped_ops;
::dsn::perf_counter_wrapper dup_failed_shipping_ops;
::dsn::perf_counter_wrapper recent_read_cu;
::dsn::perf_counter_wrapper recent_write_cu;
::dsn::perf_counter_wrapper recent_expire_count;
Expand Down
5 changes: 3 additions & 2 deletions src/server/pegasus_mutation_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void pegasus_mutation_duplicator::on_duplicate_reply(uint64_t hash,
// errors are acceptable.
// TODO(wutao1): print the entire request for future debugging.
if (dsn::rand::next_double01() <= 0.01) {
derror_replica("duplicate_rpc failed: {} [code:{}, timestamp:{}]",
derror_replica("duplicate_rpc failed: {} [timestamp:{}]",
err == dsn::ERR_OK ? _client->get_error_string(perr) : err.to_string(),
rpc.request().timestamp);
}
Expand Down Expand Up @@ -175,17 +175,18 @@ void pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
dsn::task_code rpc_code = std::get<1>(mut);
dsn::blob raw_message = std::get<2>(mut);
auto dreq = dsn::make_unique<dsn::apps::duplicate_request>();
uint64_t hash = get_hash_from_request(rpc_code, raw_message);

if (rpc_code == dsn::apps::RPC_RRDB_RRDB_DUPLICATE) {
// ignore if it is a DUPLICATE
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just can't understand your updates in this function, could you please explain to me?

Copy link
Contributor Author

@neverchanje neverchanje Apr 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I left a comment here. It was a bug. DUPLICATE can not participate in duplication like other normal writes. Because duplication is designed to be a directed edge flowing from master to slave. Forwarding will unintentionally add edges in this topology.

} else {
dreq->__set_raw_message(raw_message);
dreq->__set_task_code(rpc_code);
dreq->__set_timestamp(std::get<0>(mut));
dreq->__set_cluster_id(get_current_cluster_id());
}

uint64_t hash = get_hash_from_request(rpc_code, raw_message);
duplicate_rpc rpc(std::move(dreq),
dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
10_s, // TODO(wutao1): configurable timeout.
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_mutation_duplicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class pegasus_mutation_duplicator : public dsn::replication::mutation_duplicator

void duplicate(mutation_tuple_set muts, callback cb) override;

~pegasus_mutation_duplicator() override { _env.__conf.tracker->wait_outstanding_tasks(); }
~pegasus_mutation_duplicator() override { _env.__conf.tracker->cancel_outstanding_tasks(); }

private:
void send(uint64_t hash, callback cb);
Expand Down
12 changes: 11 additions & 1 deletion src/server/table_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ struct table_stats
double get_total_write_qps() const
{
return total_put_qps + total_multi_put_qps + total_remove_qps + total_multi_remove_qps +
total_incr_qps + total_check_and_set_qps + total_check_and_mutate_qps;
total_incr_qps + total_check_and_set_qps + total_check_and_mutate_qps +
total_dup_qps;
}

double get_total_read_bytes() const
Expand All @@ -42,6 +43,9 @@ struct table_stats
total_check_and_set_qps += row.check_and_set_qps;
total_check_and_mutate_qps += row.check_and_mutate_qps;
total_scan_qps += row.scan_qps;
total_dup_qps += row.dup_qps;
total_dup_shipped_ops += row.dup_shipped_ops;
total_dup_failed_shipping_ops += row.dup_failed_shipping_ops;
total_recent_read_cu += row.recent_read_cu;
total_recent_write_cu += row.recent_write_cu;
total_recent_expire_count += row.recent_expire_count;
Expand Down Expand Up @@ -78,6 +82,9 @@ struct table_stats
total_check_and_set_qps += row_stats.total_check_and_set_qps;
total_check_and_mutate_qps += row_stats.total_check_and_mutate_qps;
total_scan_qps += row_stats.total_scan_qps;
total_dup_qps += row_stats.total_dup_qps;
total_dup_shipped_ops += row_stats.total_dup_shipped_ops;
total_dup_failed_shipping_ops += row_stats.total_dup_failed_shipping_ops;
total_recent_read_cu += row_stats.total_recent_read_cu;
total_recent_write_cu += row_stats.total_recent_write_cu;
total_recent_expire_count += row_stats.total_recent_expire_count;
Expand Down Expand Up @@ -116,6 +123,9 @@ struct table_stats
double total_check_and_set_qps = 0;
double total_check_and_mutate_qps = 0;
double total_scan_qps = 0;
double total_dup_qps = 0;
double total_dup_shipped_ops = 0;
double total_dup_failed_shipping_ops = 0;
double total_recent_read_cu = 0;
double total_recent_write_cu = 0;
double total_recent_expire_count = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/server/test/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -503,5 +503,5 @@ onebox = 1
onebox2 = 2

[pegasus.clusters]
onebox = @LOCAL_IP@:34701,@LOCAL_IP@:34702,@LOCAL_IP@:34703
onebox2 = @LOCAL_IP@:34701,@LOCAL_IP@:34702,@LOCAL_IP@:34703
onebox = 0.0.0.0:34701
onebox2 = 0.0.0.0:35701
35 changes: 34 additions & 1 deletion src/server/test/pegasus_mutation_duplicator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ using namespace dsn::replication;

class pegasus_mutation_duplicator_test : public pegasus_server_test_base
{
protected:
dsn::task_tracker _tracker;
dsn::pipeline::environment _env;

Expand Down Expand Up @@ -258,7 +259,7 @@ TEST_F(pegasus_mutation_duplicator_test, get_hash_from_request)

// Verifies that calls on `get_hash_key_from_request` won't make
// message unable to read. (if `get_hash_key_from_request` doesn't
// use copy the message internally, it will.)
// copy the message internally, it will.)
TEST_F(pegasus_mutation_duplicator_test, read_after_get_hash_key)
{
std::string hash_key("hash");
Expand Down Expand Up @@ -292,5 +293,37 @@ TEST_F(pegasus_mutation_duplicator_test, duplicate_isolated_hashkeys)

TEST_F(pegasus_mutation_duplicator_test, create_duplicator) { test_create_duplicator(); }

TEST_F(pegasus_mutation_duplicator_test, duplicate_duplicate)
{
replica_base replica(dsn::gpid(1, 1), "fake_replica", "temp");
auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
duplicator->set_task_environment(&_env);

dsn::apps::update_request request;
pegasus::pegasus_generate_key(request.key, std::string("hash"), std::string("sort"));
dsn::message_ptr msg =
dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT);
auto data = dsn::move_message_to_blob(msg.get());

// a duplicate from onebox2
dsn::apps::duplicate_request duplicate;
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
duplicate.cluster_id = 2;
duplicate.raw_message = data;
duplicate.timestamp = 200;
msg = dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_DUPLICATE);
data = dsn::move_message_to_blob(msg.get());

mutation_tuple_set muts;
muts.insert(std::make_tuple(200, dsn::apps::RPC_RRDB_RRDB_DUPLICATE, data));
RPC_MOCKING(duplicate_rpc)
{
duplicator->duplicate(muts, [&](size_t sz) {
// ensure no DUPLICATE is duplicated
ASSERT_EQ(sz, 0);
});
}
_tracker.wait_outstanding_tasks();
}

} // namespace server
} // namespace pegasus
11 changes: 10 additions & 1 deletion src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ struct row_data
double get_total_qps() const
{
return get_qps + multi_get_qps + scan_qps + put_qps + multi_put_qps + remove_qps +
multi_remove_qps + incr_qps + check_and_set_qps + check_and_mutate_qps;
multi_remove_qps + incr_qps + check_and_set_qps + check_and_mutate_qps + dup_qps;
}

double get_total_cu() const { return recent_read_cu + recent_write_cu; }
Expand All @@ -560,6 +560,9 @@ struct row_data
double check_and_set_qps = 0;
double check_and_mutate_qps = 0;
double scan_qps = 0;
double dup_qps = 0;
double dup_shipped_ops = 0;
double dup_failed_shipping_ops = 0;
double recent_read_cu = 0;
double recent_write_cu = 0;
double recent_expire_count = 0;
Expand Down Expand Up @@ -607,6 +610,12 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name,
row.check_and_mutate_qps += value;
else if (counter_name == "scan_qps")
row.scan_qps += value;
else if (counter_name == "dup_qps")
row.dup_qps += value;
else if (counter_name == "dup_shipped_ops")
row.dup_shipped_ops += value;
else if (counter_name == "dup_failed_shipping_ops")
row.dup_failed_shipping_ops += value;
else if (counter_name == "recent.read.cu")
row.recent_read_cu += value;
else if (counter_name == "recent.write.cu")
Expand Down
2 changes: 2 additions & 0 deletions src/shell/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ bool start_dup(command_executor *e, shell_context *sc, arguments args);

bool pause_dup(command_executor *e, shell_context *sc, arguments args);

bool set_dup_fail_mode(command_executor *e, shell_context *sc, arguments args);

// == disk rebalance (see 'commands/disk_rebalance.cpp') == //

bool query_disk_capacity(command_executor *e, shell_context *sc, arguments args);
Expand Down
76 changes: 58 additions & 18 deletions src/shell/commands/duplication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ bool add_dup(command_executor *e, shell_context *sc, arguments args)

auto err_resp = sc->ddl_client->add_dup(app_name, remote_cluster_name, freeze);
dsn::error_s err = err_resp.get_error();
std::string hint;
if (err.is_ok()) {
err = dsn::error_s::make(err_resp.get_value().err);
hint = err_resp.get_value().hint;
}
if (!err.is_ok()) {
fmt::print(stderr,
Expand All @@ -62,6 +64,9 @@ bool add_dup(command_executor *e, shell_context *sc, arguments args)
remote_cluster_name,
freeze,
err.description());
if (!hint.empty()) {
fmt::print(stderr, "detail:\n {}\n", hint);
}
} else {
const auto &resp = err_resp.get_value();
fmt::print("adding duplication succeed [app: {}, remote: {}, appid: {}, dupid: "
Expand Down Expand Up @@ -122,10 +127,7 @@ bool query_dup(command_executor *e, shell_context *sc, arguments args)
err.description());
} else if (detail) {
fmt::print("duplications of app [{}] in detail:\n", app_name);
const auto &resp = err_resp.get_value();
for (auto info : resp.entry_list) {
fmt::print("{}\n\n", duplication_entry_to_string(info));
}
fmt::print("{}", duplication_query_response_to_string(err_resp.get_value()) + "\n\n");
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
} else {
const auto &resp = err_resp.get_value();
fmt::print("duplications of app [{}] are listed as below:\n", app_name);
Expand All @@ -152,6 +154,24 @@ bool query_dup(command_executor *e, shell_context *sc, arguments args)
return true;
}

void handle_duplication_modify_response(
const std::string &operation, const dsn::error_with<duplication_modify_response> &err_resp)
{
dsn::error_s err = err_resp.get_error();
if (err.is_ok()) {
err = dsn::error_s::make(err_resp.get_value().err);
}
std::string hint;
if (err.code() == dsn::ERR_OBJECT_NOT_FOUND) {
hint = " [duplication not found]";
}
if (err.is_ok()) {
fmt::print("{} succeed\n", operation);
} else {
fmt::print(stderr, "{} failed, error={}{}\n", operation, err.description(), hint);
}
}

bool change_dup_status(command_executor *e,
shell_context *sc,
const arguments &args,
Expand Down Expand Up @@ -184,20 +204,8 @@ bool change_dup_status(command_executor *e,
}

auto err_resp = sc->ddl_client->change_dup_status(app_name, dup_id, status);
dsn::error_s err = err_resp.get_error();
if (err.is_ok()) {
err = dsn::error_s::make(err_resp.get_value().err);
}
if (err.is_ok()) {
fmt::print("{}({}) for app [{}] succeed\n", operation, dup_id, app_name);
} else {
fmt::print(stderr,
"{}({}) for app [{}] failed, error={}\n",
operation,
dup_id,
app_name,
err.description());
}
handle_duplication_modify_response(
fmt::format("{}({}) for app {}", operation, dup_id, app_name), err_resp);
return true;
}

Expand All @@ -215,3 +223,35 @@ bool pause_dup(command_executor *e, shell_context *sc, arguments args)
{
return change_dup_status(e, sc, args, duplication_status::DS_PAUSE);
}

bool set_dup_fail_mode(command_executor *e, shell_context *sc, arguments args)
{
// set_dup_fail_mode <app_name> <dupid> <slow|skip>
using namespace dsn::replication;

argh::parser cmd(args.argc, args.argv);
if (cmd.pos_args().size() > 4) {
fmt::print(stderr, "too many params\n");
return false;
}
std::string app_name = cmd(1).str();
std::string dupid_str = cmd(2).str();
dupid_t dup_id;
if (!dsn::buf2int32(dupid_str, dup_id)) {
fmt::print(stderr, "invalid dup_id {}\n", dupid_str);
return false;
}
std::string fail_mode_str = cmd(3).str();
if (fail_mode_str != "slow" && fail_mode_str != "skip") {
fmt::print(stderr, "fail_mode must be \"slow\" or \"skip\": {}\n", fail_mode_str);
return false;
}
auto fmode = fail_mode_str == "slow" ? duplication_fail_mode::FAIL_SLOW
: duplication_fail_mode::FAIL_SKIP;

auto err_resp = sc->ddl_client->update_dup_fail_mode(app_name, dup_id, fmode);
auto operation = fmt::format(
"set duplication({}) fail_mode ({}) for app {}", dup_id, fail_mode_str, app_name);
handle_duplication_modify_response(operation, err_resp);
return true;
}
4 changes: 4 additions & 0 deletions src/shell/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,10 @@ static command_executor commands[] = {
"query disk replica count info",
"[-n|--node replica_server(ip:port)][-a|-app app_name][-o|--out file_name][-j|--json]",
query_disk_replica},
{"set_dup_fail_mode",
"set fail_mode of duplication",
"<app_name> <dup_id> <slow|skip>",
set_dup_fail_mode},
{
"exit", "exit shell", "", exit_shell,
},
Expand Down