Skip to content

Commit

Permalink
replica-server: remove checkpoint dirs when cold backup completed (#216
Browse files Browse the repository at this point in the history
)
  • Loading branch information
qinzuoyan authored Jan 14, 2019
1 parent 813824f commit 3a2a054
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/core/tests/service_api_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ TEST(core, dsn_file)
ASSERT_NE(nullptr, tin);

if (dsn::tools::get_current_tool()->name() != "simulator") {
// 1 for tin, 1 for disk_engine
ASSERT_EQ(2, tin->get_count());
// at least 1 for tin, but if already read completed, then only 1
ASSERT_LE(1, tin->get_count());
}

tin->wait();
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ void mutation_log_private::write_pending_mutations(bool release_lock_required)
// FIXME : the file could have been closed
lf->flush();

// update _private_max_commit_on_disk after writen into log file done
// update _private_max_commit_on_disk after written into log file done
update_max_commit_on_disk(max_commit);
} else {
derror("write private log failed, err = %s", err.to_string());
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/lib/mutation_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ class log_file : public ref_counter
static log_block *prepare_log_block();

// async write log entry into the file
// 'block' is the date to be writen
// 'block' is the date to be written
// 'offset' is start offset of the entry in the global space
// 'evt' is to indicate which thread pool to execute the callback
// 'callback_host' is used to get tracer
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

/////////////////////////////////////////////////////////////////
// cold backup
void clear_backup_checkpoint(const std::string &policy_name);
void generate_backup_checkpoint(cold_backup_context_ptr backup_context);
void trigger_async_checkpoint_for_backup(cold_backup_context_ptr backup_context);
void wait_async_checkpoint_for_backup(cold_backup_context_ptr backup_context);
Expand Down
139 changes: 117 additions & 22 deletions src/dist/replication/lib/replica_backup.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <boost/lexical_cast.hpp>

#include <dsn/utility/filesystem.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>

#include "dist/replication/common/block_service_manager.h"
Expand All @@ -13,6 +14,7 @@
namespace dsn {
namespace replication {

// backup_id == 0 means clear backup context and checkpoint dirs of the policy.
void replica::on_cold_backup(const backup_request &request, /*out*/ backup_response &response)
{
_checker.only_one_thread_access();
Expand All @@ -22,13 +24,30 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo
cold_backup_context_ptr new_context(
new cold_backup_context(this, request, _options->max_concurrent_uploading_file_count));

ddebug("%s: received cold backup request, partition_status = %s%s",
new_context->name,
enum_to_string(status()),
backup_id == 0 ? ", this is a clear request" : "");

if (status() == partition_status::type::PS_PRIMARY ||
status() == partition_status::type::PS_SECONDARY) {
cold_backup_context_ptr backup_context = nullptr;
auto find = _cold_backup_contexts.find(policy_name);
if (find != _cold_backup_contexts.end()) {
backup_context = find->second;
} else {
if (backup_id == 0) {
if (status() == partition_status::type::PS_PRIMARY) {
// send clear request to secondaries
send_backup_request_to_secondary(request);
}
// clear local checkpoint dirs in background thread
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() {
clear_backup_checkpoint(policy_name);
});
return;
}

/// TODO: policy may change provider
dist::block_service::block_filesystem *block_service =
_stub->_block_service_manager.get_block_filesystem(
Expand All @@ -55,17 +74,29 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo
policy_name.c_str());
cold_backup_status backup_status = backup_context->status();

if (backup_context->request.backup_id < backup_id || backup_status == ColdBackupCanceled) {
// clear obsoleted backup firstly
/// TODO: clear dir
ddebug("%s: clear obsoleted cold backup, old_backup_id = %" PRId64
if (backup_id == 0 || backup_context->request.backup_id < backup_id ||
backup_status == ColdBackupCanceled) {
// clear obsoleted backup context firstly
ddebug("%s: clear obsoleted cold backup context, old_backup_id = %" PRId64
", old_backup_status = %s",
new_context->name,
backup_context->request.backup_id,
cold_backup_status_to_string(backup_status));
backup_context->cancel();
_cold_backup_contexts.erase(policy_name);
on_cold_backup(request, response);
if (backup_id != 0) {
// go to another round
on_cold_backup(request, response);
} else { // backup_id == 0
if (status() == partition_status::type::PS_PRIMARY) {
// send clear request to secondaries
send_backup_request_to_secondary(request);
}
// clear local checkpoint dirs in background thread
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() {
clear_backup_checkpoint(policy_name);
});
}
return;
}

Expand Down Expand Up @@ -144,6 +175,14 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo
_cold_backup_contexts.erase(policy_name);
} else if (backup_status == ColdBackupCompleted) {
ddebug("%s: upload checkpoint completed, response ERR_OK", backup_context->name);
// send clear request to secondaries
backup_request new_request = request;
new_request.backup_id = 0;
send_backup_request_to_secondary(new_request);
// clear local checkpoint dirs in background thread
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() {
clear_backup_checkpoint(policy_name);
});
response.err = ERR_OK;
} else {
dwarn(
Expand Down Expand Up @@ -201,6 +240,39 @@ backup_get_tmp_dir_name(const std::string &policy_name, int64_t backup_id, int64
return std::string(buffer);
}

// returns true if this checkpoint dir belongs to the policy
static bool is_policy_checkpoint(const std::string &chkpt_dirname, const std::string &policy_name)
{
std::vector<std::string> strs;
utils::split_args(chkpt_dirname.c_str(), strs, '.');
// backup_tmp.<policy_name>.* or backup.<policy_name>.*
return strs.size() >= 2 &&
(strs[0] == std::string("backup_tmp") || strs[0] == std::string("backup")) &&
strs[1] == policy_name;
}

// get all backup checkpoint dirs which belong to the policy
static bool get_policy_checkpoint_dirs(const std::string &dir,
const std::string &policy,
/*out*/ std::vector<std::string> &chkpt_dirs)
{
chkpt_dirs.clear();
// list sub dirs
std::vector<std::string> sub_dirs;
if (!utils::filesystem::get_subdirectories(dir, sub_dirs, false)) {
derror("list sub dirs of dir %s failed", dir.c_str());
return false;
}

for (std::string &d : sub_dirs) {
std::string dirname = utils::filesystem::get_file_name(d);
if (is_policy_checkpoint(dirname, policy)) {
chkpt_dirs.emplace_back(std::move(dirname));
}
}
return true;
}

// returns:
// 0 : not related
// 1 : related (belong to this policy but not belong to this backup_context)
Expand Down Expand Up @@ -263,13 +335,13 @@ static bool filter_checkpoint(const std::string &dir,
related_chkpt_dirs.clear();
// list sub dirs
std::vector<std::string> sub_dirs;
if (!dsn::utils::filesystem::get_subdirectories(dir, sub_dirs, false)) {
if (!utils::filesystem::get_subdirectories(dir, sub_dirs, false)) {
derror("%s: list sub dirs of dir %s failed", backup_context->name, dir.c_str());
return false;
}

for (std::string &d : sub_dirs) {
std::string dirname = ::dsn::utils::filesystem::get_file_name(d);
std::string dirname = utils::filesystem::get_file_name(d);
int ret = is_related_or_valid_checkpoint(dirname, backup_context);
if (ret == 1) {
related_chkpt_dirs.emplace_back(std::move(dirname));
Expand All @@ -291,7 +363,7 @@ statistic_file_infos_under_dir(const std::string &dir,
/*out*/ int64_t &total_size)
{
std::vector<std::string> sub_files;
if (!dsn::utils::filesystem::get_subfiles(dir, sub_files, false)) {
if (!utils::filesystem::get_subfiles(dir, sub_files, false)) {
derror("list sub files of dir %s failed", dir.c_str());
return false;
}
Expand All @@ -302,11 +374,11 @@ statistic_file_infos_under_dir(const std::string &dir,
for (std::string &file : sub_files) {
std::pair<std::string, int64_t> file_info;

if (!::dsn::utils::filesystem::file_size(file, file_info.second)) {
if (!utils::filesystem::file_size(file, file_info.second)) {
derror("get file size of %s failed", file.c_str());
return false;
}
file_info.first = ::dsn::utils::filesystem::get_file_name(file);
file_info.first = utils::filesystem::get_file_name(file);
total_size += file_info.second;

file_infos.emplace_back(std::move(file_info));
Expand Down Expand Up @@ -334,6 +406,29 @@ static bool backup_parse_dir_name(const char *name,
}
}

// clear all checkpoint dirs of the policy
void replica::clear_backup_checkpoint(const std::string &policy_name)
{
ddebug_replica("clear all checkpoint dirs of policy({})", policy_name);
auto backup_dir = _app->backup_dir();
if (!utils::filesystem::directory_exists(backup_dir)) {
return;
}
std::vector<std::string> chkpt_dirs;
if (!get_policy_checkpoint_dirs(backup_dir, policy_name, chkpt_dirs)) {
dwarn_replica("get checkpoint dirs in backup dir({}) failed", backup_dir);
return;
}
for (const std::string &dirname : chkpt_dirs) {
std::string full_path = utils::filesystem::path_combine(backup_dir, dirname);
if (utils::filesystem::remove_path(full_path)) {
ddebug_replica("remove backup checkpoint dir({}) succeed", full_path);
} else {
dwarn_replica("remove backup checkpoint dir({}) failed", full_path);
}
}
}

// run in REPLICATION_LONG thread
// Effection:
// - may ignore_checkpoint() if in invalid status
Expand All @@ -353,8 +448,8 @@ void replica::generate_backup_checkpoint(cold_backup_context_ptr backup_context)

// prepare back dir
auto backup_dir = _app->backup_dir();
if (!dsn::utils::filesystem::directory_exists(backup_dir) &&
!dsn::utils::filesystem::create_directory(backup_dir)) {
if (!utils::filesystem::directory_exists(backup_dir) &&
!utils::filesystem::create_directory(backup_dir)) {
derror("%s: create backup dir %s failed", backup_context->name, backup_dir.c_str());
backup_context->fail_checkpoint("create backup dir failed");
return;
Expand All @@ -372,7 +467,7 @@ void replica::generate_backup_checkpoint(cold_backup_context_ptr backup_context)
std::vector<std::pair<std::string, int64_t>> file_infos;
int64_t total_size = 0;
std::string valid_chkpt_full_path =
::dsn::utils::filesystem::path_combine(backup_dir, valid_backup_chkpt_dirname);
utils::filesystem::path_combine(backup_dir, valid_backup_chkpt_dirname);
// parse checkpoint dirname
std::string policy_name;
int64_t backup_id = 0, decree = 0, timestamp = 0;
Expand Down Expand Up @@ -423,11 +518,11 @@ void replica::generate_backup_checkpoint(cold_backup_context_ptr backup_context)

// clear related but not valid checkpoint
for (const std::string &dirname : related_backup_chkpt_dirname) {
std::string full_path = ::dsn::utils::filesystem::path_combine(backup_dir, dirname);
std::string full_path = utils::filesystem::path_combine(backup_dir, dirname);
ddebug("%s: found obsolete backup checkpoint dir(%s), remove it",
backup_context->name,
full_path.c_str());
if (!dsn::utils::filesystem::remove_path(full_path)) {
if (!utils::filesystem::remove_path(full_path)) {
dwarn("%s: remove obsolete backup checkpoint dir(%s) failed",
backup_context->name,
full_path.c_str());
Expand Down Expand Up @@ -578,7 +673,7 @@ void replica::local_create_backup_checkpoint(cold_backup_context_ptr backup_cont

// the real checkpoint decree may be larger than backup_context->checkpoint_decree,
// so we need copy checkpoint to backup_checkpoint_tmp_dir_path, and then rename it.
std::string backup_checkpoint_tmp_dir_path = ::dsn::utils::filesystem::path_combine(
std::string backup_checkpoint_tmp_dir_path = utils::filesystem::path_combine(
_app->backup_dir(),
backup_get_tmp_dir_name(backup_context->request.policy.policy_name,
backup_context->request.backup_id,
Expand All @@ -592,7 +687,7 @@ void replica::local_create_backup_checkpoint(cold_backup_context_ptr backup_cont
"local_create_backup_checkpoint 10s later",
backup_context->name,
err.to_string());
dsn::utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path);
utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path);
tasking::enqueue(
LPC_BACKGROUND_COLD_BACKUP,
&_tracker,
Expand All @@ -605,20 +700,20 @@ void replica::local_create_backup_checkpoint(cold_backup_context_ptr backup_cont
last_decree,
backup_context->checkpoint_decree);
backup_context->checkpoint_decree = last_decree; // update to real decree
std::string backup_checkpoint_dir_path = ::dsn::utils::filesystem::path_combine(
std::string backup_checkpoint_dir_path = utils::filesystem::path_combine(
_app->backup_dir(),
backup_get_dir_name(backup_context->request.policy.policy_name,
backup_context->request.backup_id,
backup_context->checkpoint_decree,
backup_context->checkpoint_timestamp));
if (!dsn::utils::filesystem::rename_path(backup_checkpoint_tmp_dir_path,
backup_checkpoint_dir_path)) {
if (!utils::filesystem::rename_path(backup_checkpoint_tmp_dir_path,
backup_checkpoint_dir_path)) {
derror("%s: rename checkpoint dir(%s) to dir(%s) failed",
backup_context->name,
backup_checkpoint_tmp_dir_path.c_str(),
backup_checkpoint_dir_path.c_str());
dsn::utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path);
dsn::utils::filesystem::remove_path(backup_checkpoint_dir_path);
utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path);
utils::filesystem::remove_path(backup_checkpoint_dir_path);
backup_context->fail_checkpoint("rename checkpoint dir failed");
return;
}
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/replication.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ struct configuration_restore_request
8:bool skip_bad_partition;
}

// if backup_id == 0, means clear all backup resources (including backup contexts and
// checkpoint dirs) of this policy.
struct backup_request
{
1:dsn.gpid pid;
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/test/simple_kv/case-402.act
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ client:begin_write:id=169,key=k169,value=v169,timeout=0
inject:on_aio_call:node=r2,task_code=LPC_WRITE_REPLICATION_LOG_SHARED
config:{4,r1,[r3]}

state:{{r1,pri,4,23},{r3,sec,4,20}}
state:{{r1,pri,4,20},{r3,sec,4,11}}

set:disable_load_balance=0

Expand Down

0 comments on commit 3a2a054

Please sign in to comment.