Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(split): add child copy mutation synchronously (#727)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Feb 26, 2021
1 parent c057d35 commit a1a6c2f
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 14 deletions.
6 changes: 6 additions & 0 deletions src/common/metadata.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ struct replica_configuration
// Used for bulk load
// secondary will pop all committed mutations even if buffer is not full
6:optional bool pop_all = false;
// Used for partition split when primary send prepare message to secondary
// 1. true - secondary should copy mutation in this prepare message synchronously,
// and _is_sync_to_child in mutation structure should set true
// 2. false - secondary copy mutation in this prepare message asynchronously
// NOTICE: it should always be false when update_local_configuration
7:optional bool split_sync_to_child = false;
}

struct replica_info
Expand Down
1 change: 1 addition & 0 deletions src/replica/mutation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ mutation::mutation()
_appro_data_bytes = sizeof(mutation_header);
_create_ts_ns = dsn_now_ns();
_tid = ++s_tid;
_is_sync_to_child = false;
tracer =
std::make_shared<dsn::utils::latency_tracer>(fmt::format("{}[{}]", "mutation", _tid),
false,
Expand Down
15 changes: 14 additions & 1 deletion src/replica/mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class mutation : public ref_counter
{
return _left_potential_secondary_ack_count;
}
bool is_child_acked() const { return !_wait_child; }
bool is_error_acked() const { return _is_error_acked; }
::dsn::task_ptr &log_task() { return _log_task; }
node_tasks &remote_tasks() { return _prepare_or_commit_tasks; }
bool is_prepare_close_to_timeout(int gap_ms, int timeout_ms)
Expand Down Expand Up @@ -112,6 +114,9 @@ class mutation : public ref_counter
{
_left_potential_secondary_ack_count = count;
}
void wait_child() { _wait_child = true; }
void child_acked() { _wait_child = false; }
void set_error_acked() { _is_error_acked = true; }
int clear_prepare_or_commit_tasks();
void wait_log_task() const;
uint64_t prepare_ts_ms() const { return _prepare_ts_ms; }
Expand Down Expand Up @@ -155,7 +160,15 @@ class mutation : public ref_counter
{
unsigned int _not_logged : 1;
unsigned int _left_secondary_ack_count : 15;
unsigned int _left_potential_secondary_ack_count : 16;
unsigned int _left_potential_secondary_ack_count : 14;
// Used for partition split
// _wait_child = true : child prepare mutation synchronously, its parent should wait for
// child ack
bool _wait_child : 1;
// Used for partition split
// when prepare failed when child prepare mutation synchronously, secondary may try to
// ack to primary twice, we use _is_error_acked to restrict only ack once
bool _is_error_acked : 1;
};
uint32_t _private0;
};
Expand Down
40 changes: 34 additions & 6 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
mu->name(),
mu->tid());

// child should prepare mutation synchronously
mu->set_is_sync_to_child(_primary_states.sync_send_write_request);

// check bounded staleness
if (mu->data.header.decree > last_committed_decree() + _options->staleness_for_commit) {
err = ERR_CAPACITY_EXCEEDED;
Expand Down Expand Up @@ -298,6 +301,9 @@ void replica::send_prepare_message(::dsn::rpc_address addr,
replica_configuration rconfig;
_primary_states.get_replica_config(status, rconfig, learn_signature);
rconfig.__set_pop_all(pop_all_committed_mutations);
if (status == partition_status::PS_SECONDARY && _primary_states.sync_send_write_request) {
rconfig.__set_split_sync_to_child(true);
}

{
rpc_write_stream writer(msg);
Expand Down Expand Up @@ -349,6 +355,8 @@ void replica::on_prepare(dsn::message_ex *request)
rpc_read_stream reader(request);
unmarshall(reader, rconfig, DSF_THRIFT_BINARY);
mu = mutation::read_from(reader, request);
mu->set_is_sync_to_child(rconfig.split_sync_to_child);
rconfig.split_sync_to_child = false;
}

ADD_POINT(mu->tracer);
Expand Down Expand Up @@ -538,6 +546,12 @@ void replica::on_append_log_completed(mutation_ptr &mu, error_code err, size_t s
// always ack
ack_prepare_message(err, mu);
break;
case partition_status::PS_PARTITION_SPLIT:
if (err != ERR_OK) {
handle_local_failure(err);
}
_split_mgr->ack_parent(err, mu);
break;
case partition_status::PS_ERROR:
break;
default:
Expand Down Expand Up @@ -738,14 +752,28 @@ void replica::ack_prepare_message(error_code err, mutation_ptr &mu)

const std::vector<dsn::message_ex *> &prepare_requests = mu->prepare_requests();
dassert(!prepare_requests.empty(), "mutation = %s", mu->name());
for (auto &request : prepare_requests) {
reply(request, resp);
}

if (err == ERR_OK) {
dinfo("%s: mutation %s ack_prepare_message, err = %s", name(), mu->name(), err.to_string());
} else {
dwarn("%s: mutation %s ack_prepare_message, err = %s", name(), mu->name(), err.to_string());
if (mu->is_child_acked()) {
dinfo_replica("mutation {} ack_prepare_message, err = {}", mu->name(), err);
for (auto &request : prepare_requests) {
reply(request, resp);
}
}
return;
}
// only happened when prepare failed during partition split child copy mutation synchronously
if (mu->is_error_acked()) {
dwarn_replica("mutation {} has been ack_prepare_message, err = {}", mu->name(), err);
return;
}

dwarn_replica("mutation {} ack_prepare_message, err = {}", mu->name(), err);
if (mu->is_sync_to_child()) {
mu->set_error_acked();
}
for (auto &request : prepare_requests) {
reply(request, resp);
}
}

Expand Down
79 changes: 72 additions & 7 deletions src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,9 @@ void replica_split_manager::copy_mutation(mutation_ptr &mu) // on parent partiti
{
dassert_replica(_child_gpid.get_app_id() > 0, "child_gpid({}) is invalid", _child_gpid);

// TODO(hyc): if copy mutation synchronously, add flags
if (mu->is_sync_to_child()) {
mu->wait_child();
}

mutation_ptr new_mu = mutation::copy_no_reply(mu);
error_code ec = _stub->split_replica_exec(
Expand Down Expand Up @@ -1253,25 +1255,88 @@ void replica_split_manager::on_copy_mutation(mutation_ptr &mu) // on child parti
mu, LPC_WRITE_REPLICATION_LOG, tracker(), nullptr, get_gpid().thread_hash());
_replica->_private_log->append(
mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr, get_gpid().thread_hash());
} else {
// TODO(heyuchen): child copy mutation synchronously
} else { // child sync copy mutation
mu->log_task() = _stub->_log->append(mu,
LPC_WRITE_REPLICATION_LOG,
tracker(),
std::bind(&replica::on_append_log_completed,
_replica,
mu,
std::placeholders::_1,
std::placeholders::_2),
get_gpid().thread_hash());
}
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::ack_parent(error_code ec, mutation_ptr &mu) // on child partition
{
// TODO(heyuchen): when child copy mutation synchronously, child replica send ack to its parent
// TBD
dassert_replica(mu->is_sync_to_child(), "mutation({}) should be copied synchronously");
_stub->split_replica_exec(LPC_PARTITION_SPLIT,
_replica->_split_states.parent_gpid,
std::bind(&replica_split_manager::on_copy_mutation_reply,
std::placeholders::_1,
ec,
mu->data.header.ballot,
mu->data.header.decree));
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::on_copy_mutation_reply(error_code ec,
ballot b,
decree d) // on parent partition
{
// TODO(heyuchen): when child copy mutation synchronously, parent replica handle child ack
// TBD
_replica->_checker.only_one_thread_access();

auto mu = _replica->_prepare_list->get_mutation_by_decree(d);
if (mu == nullptr) {
derror_replica("failed to get mutation in prepare list, decree = {}", d);
return;
}

if (mu->data.header.ballot != b) {
derror_replica("ballot not match, mutation ballot({}) vs child mutation ballot({})",
mu->data.header.ballot,
b);
return;
}

// set child prepare mutation flag
if (ec == ERR_OK) {
mu->child_acked();
} else {
derror_replica("child({}) copy mutation({}) failed, ballot={}, decree={}, error={}",
_child_gpid,
mu->name(),
b,
d,
ec);
}

// handle child ack
if (mu->data.header.ballot >= get_ballot() && status() != partition_status::PS_INACTIVE) {
switch (status()) {
case partition_status::PS_PRIMARY:
if (ec != ERR_OK) {
_replica->handle_local_failure(ec);
} else {
_replica->do_possible_commit_on_primary(mu);
}
break;
case partition_status::PS_SECONDARY:
case partition_status::PS_POTENTIAL_SECONDARY:
if (ec != ERR_OK) {
_replica->handle_local_failure(ec);
}
_replica->ack_prepare_message(ec, mu);
break;
case partition_status::PS_ERROR:
break;
default:
dassert_replica(false, "wrong status({})", enum_to_string(status()));
break;
}
}
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down

0 comments on commit a1a6c2f

Please sign in to comment.