Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): add child copy mutation synchronously #727

Merged
merged 7 commits into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/common/metadata.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ struct replica_configuration
// Used for bulk load
// secondary will pop all committed mutations even if buffer is not full
6:optional bool pop_all = false;
// Used for partition split when primary send prepare message to secondary
// 1. true - secondary should copy mutation in this prepare message synchronously,
// and _is_sync_to_child in mutation structure should set true
// 2. false - secondary copy mutation in this prepare message asynchronously
// NOTICE: it should always be false when update_local_configuration
7:optional bool split_sync_to_child = false;
}

struct replica_info
Expand Down
1 change: 1 addition & 0 deletions src/replica/mutation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ mutation::mutation()
_appro_data_bytes = sizeof(mutation_header);
_create_ts_ns = dsn_now_ns();
_tid = ++s_tid;
_is_sync_to_child = false;
tracer =
std::make_shared<dsn::utils::latency_tracer>(fmt::format("{}[{}]", "mutation", _tid),
false,
Expand Down
15 changes: 14 additions & 1 deletion src/replica/mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class mutation : public ref_counter
{
return _left_potential_secondary_ack_count;
}
bool is_child_acked() const { return !_wait_child; }
bool is_error_acked() const { return _is_error_acked; }
::dsn::task_ptr &log_task() { return _log_task; }
node_tasks &remote_tasks() { return _prepare_or_commit_tasks; }
bool is_prepare_close_to_timeout(int gap_ms, int timeout_ms)
Expand Down Expand Up @@ -112,6 +114,9 @@ class mutation : public ref_counter
{
_left_potential_secondary_ack_count = count;
}
void wait_child() { _wait_child = true; }
void child_acked() { _wait_child = false; }
void set_error_acked() { _is_error_acked = true; }
int clear_prepare_or_commit_tasks();
void wait_log_task() const;
uint64_t prepare_ts_ms() const { return _prepare_ts_ms; }
Expand Down Expand Up @@ -155,7 +160,15 @@ class mutation : public ref_counter
{
unsigned int _not_logged : 1;
unsigned int _left_secondary_ack_count : 15;
unsigned int _left_potential_secondary_ack_count : 16;
unsigned int _left_potential_secondary_ack_count : 14;
// Used for partition split
// _wait_child = true : child prepare mutation synchronously, its parent should wait for
// child ack
bool _wait_child : 1;
// Used for partition split
// when prepare failed when child prepare mutation synchronously, secondary may try to
// ack to primary twice, we use _is_error_acked to restrict only ack once
hycdong marked this conversation as resolved.
Show resolved Hide resolved
bool _is_error_acked : 1;
};
uint32_t _private0;
};
Expand Down
40 changes: 34 additions & 6 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
mu->name(),
mu->tid());

// child should prepare mutation synchronously
mu->set_is_sync_to_child(_primary_states.sync_send_write_request);

// check bounded staleness
if (mu->data.header.decree > last_committed_decree() + _options->staleness_for_commit) {
err = ERR_CAPACITY_EXCEEDED;
Expand Down Expand Up @@ -298,6 +301,9 @@ void replica::send_prepare_message(::dsn::rpc_address addr,
replica_configuration rconfig;
_primary_states.get_replica_config(status, rconfig, learn_signature);
rconfig.__set_pop_all(pop_all_committed_mutations);
if (status == partition_status::PS_SECONDARY && _primary_states.sync_send_write_request) {
rconfig.__set_split_sync_to_child(true);
}

{
rpc_write_stream writer(msg);
Expand Down Expand Up @@ -349,6 +355,8 @@ void replica::on_prepare(dsn::message_ex *request)
rpc_read_stream reader(request);
unmarshall(reader, rconfig, DSF_THRIFT_BINARY);
mu = mutation::read_from(reader, request);
mu->set_is_sync_to_child(rconfig.split_sync_to_child);
rconfig.split_sync_to_child = false;
}

ADD_POINT(mu->tracer);
Expand Down Expand Up @@ -538,6 +546,12 @@ void replica::on_append_log_completed(mutation_ptr &mu, error_code err, size_t s
// always ack
ack_prepare_message(err, mu);
break;
case partition_status::PS_PARTITION_SPLIT:
if (err != ERR_OK) {
handle_local_failure(err);
}
_split_mgr->ack_parent(err, mu);
break;
case partition_status::PS_ERROR:
break;
default:
Expand Down Expand Up @@ -738,14 +752,28 @@ void replica::ack_prepare_message(error_code err, mutation_ptr &mu)

const std::vector<dsn::message_ex *> &prepare_requests = mu->prepare_requests();
dassert(!prepare_requests.empty(), "mutation = %s", mu->name());
for (auto &request : prepare_requests) {
reply(request, resp);
}

if (err == ERR_OK) {
dinfo("%s: mutation %s ack_prepare_message, err = %s", name(), mu->name(), err.to_string());
} else {
dwarn("%s: mutation %s ack_prepare_message, err = %s", name(), mu->name(), err.to_string());
if (mu->is_child_acked()) {
dinfo_replica("mutation {} ack_prepare_message, err = {}", mu->name(), err);
for (auto &request : prepare_requests) {
reply(request, resp);
}
}
return;
}
// only happened when prepare failed during partition split child copy mutation synchronously
if (mu->is_error_acked()) {
dwarn_replica("mutation {} has been ack_prepare_message, err = {}", mu->name(), err);
return;
}

dwarn_replica("mutation {} ack_prepare_message, err = {}", mu->name(), err);
if (mu->is_sync_to_child()) {
mu->set_error_acked();
}
for (auto &request : prepare_requests) {
reply(request, resp);
}
}

Expand Down
79 changes: 72 additions & 7 deletions src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,9 @@ void replica_split_manager::copy_mutation(mutation_ptr &mu) // on parent partiti
{
dassert_replica(_child_gpid.get_app_id() > 0, "child_gpid({}) is invalid", _child_gpid);

// TODO(hyc): if copy mutation synchronously, add flags
if (mu->is_sync_to_child()) {
mu->wait_child();
}

mutation_ptr new_mu = mutation::copy_no_reply(mu);
error_code ec = _stub->split_replica_exec(
Expand Down Expand Up @@ -1253,25 +1255,88 @@ void replica_split_manager::on_copy_mutation(mutation_ptr &mu) // on child parti
mu, LPC_WRITE_REPLICATION_LOG, tracker(), nullptr, get_gpid().thread_hash());
_replica->_private_log->append(
mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr, get_gpid().thread_hash());
} else {
// TODO(heyuchen): child copy mutation synchronously
} else { // child sync copy mutation
mu->log_task() = _stub->_log->append(mu,
LPC_WRITE_REPLICATION_LOG,
tracker(),
std::bind(&replica::on_append_log_completed,
_replica,
mu,
std::placeholders::_1,
std::placeholders::_2),
get_gpid().thread_hash());
}
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::ack_parent(error_code ec, mutation_ptr &mu) // on child partition
{
// TODO(heyuchen): when child copy mutation synchronously, child replica send ack to its parent
// TBD
dassert_replica(mu->is_sync_to_child(), "mutation({}) should be copied synchronously");
_stub->split_replica_exec(LPC_PARTITION_SPLIT,
_replica->_split_states.parent_gpid,
std::bind(&replica_split_manager::on_copy_mutation_reply,
std::placeholders::_1,
ec,
mu->data.header.ballot,
mu->data.header.decree));
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::on_copy_mutation_reply(error_code ec,
ballot b,
decree d) // on parent partition
{
// TODO(heyuchen): when child copy mutation synchronously, parent replica handle child ack
// TBD
_replica->_checker.only_one_thread_access();

auto mu = _replica->_prepare_list->get_mutation_by_decree(d);
if (mu == nullptr) {
derror_replica("failed to get mutation in prepare list, decree = {}", d);
return;
}

if (mu->data.header.ballot != b) {
derror_replica("ballot not match, mutation ballot({}) vs child mutation ballot({})",
mu->data.header.ballot,
b);
return;
}

// set child prepare mutation flag
if (ec == ERR_OK) {
mu->child_acked();
} else {
derror_replica("child({}) copy mutation({}) failed, ballot={}, decree={}, error={}",
_child_gpid,
mu->name(),
b,
d,
ec);
}

// handle child ack
if (mu->data.header.ballot >= get_ballot() && status() != partition_status::PS_INACTIVE) {
switch (status()) {
case partition_status::PS_PRIMARY:
if (ec != ERR_OK) {
_replica->handle_local_failure(ec);
} else {
_replica->do_possible_commit_on_primary(mu);
}
break;
case partition_status::PS_SECONDARY:
case partition_status::PS_POTENTIAL_SECONDARY:
if (ec != ERR_OK) {
_replica->handle_local_failure(ec);
}
_replica->ack_prepare_message(ec, mu);
break;
case partition_status::PS_ERROR:
break;
default:
dassert_replica(false, "wrong status({})", enum_to_string(status()));
break;
}
}
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down