Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor(backup): move collect_backup_info to replica_backup_manager #434

Merged
merged 37 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
632d4f0
change fds to https
Mar 24, 2020
e58daf1
delay to delete cold backup checkpoint
Mar 24, 2020
3d532f4
delay clear checkpoint
Mar 25, 2020
55b89ad
fix
Mar 25, 2020
28135fc
fix
Mar 25, 2020
a0dc220
Merge remote-tracking branch 'upstream/master' into delay-clear-check…
Mar 25, 2020
3e52a1b
fix
Mar 25, 2020
0c718d9
clear cold backup
Mar 26, 2020
948a09c
develop
Mar 26, 2020
449fa06
develop
Mar 27, 2020
c23c5b6
refactor
Mar 30, 2020
e5cd104
fix
Mar 30, 2020
f50914e
add unit test
Mar 31, 2020
e530ce6
unit test
Mar 31, 2020
8938fce
Merge remote-tracking branch 'origin/master' into refactor-cold-backup
Mar 31, 2020
38dad93
format
Mar 31, 2020
d7b3059
refactor
Mar 31, 2020
2b9b2c2
fix
Mar 31, 2020
a270ce6
collect-backup-info
Mar 31, 2020
3dc3882
fix
Mar 31, 2020
8913e09
refactor
Mar 31, 2020
c6c8a9a
fix
Mar 31, 2020
6365696
Merge branch 'refactor-cold-backup' into collect-backup-info
Mar 31, 2020
a15d8d7
fix
Mar 31, 2020
699e04b
collect
Apr 1, 2020
4cbc209
fix
Apr 1, 2020
c8a2930
Merge branch 'refactor-cold-backup' into collect-backup-info
Apr 1, 2020
99c2116
fix
Apr 1, 2020
ad3ed96
Merge remote-tracking branch 'origin/master' into collect-backup-info
Apr 7, 2020
d82850d
fix
Apr 7, 2020
4c8ac8f
fix
Apr 7, 2020
d73fa7c
fix
Apr 7, 2020
aaff11c
Update src/dist/replication/lib/backup/replica_backup_manager.cpp
levy5307 Apr 7, 2020
05b1e8d
Update src/dist/replication/lib/backup/replica_backup_manager.h
levy5307 Apr 7, 2020
0ddaf0e
Update src/dist/replication/lib/backup/replica_backup_manager.cpp
levy5307 Apr 7, 2020
5b3473c
format
Apr 7, 2020
7e47e6e
fix
Apr 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion src/dist/replication/lib/backup/replica_backup_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ static bool get_policy_checkpoint_dirs(const std::string &dir,
// list sub dirs
std::vector<std::string> sub_dirs;
if (!utils::filesystem::get_subdirectories(dir, sub_dirs, false)) {
derror("list sub dirs of dir {} failed", dir.c_str());
derror_f("list sub dirs of dir {} failed", dir.c_str());
return false;
}

Expand All @@ -44,6 +44,15 @@ static bool get_policy_checkpoint_dirs(const std::string &dir,
return true;
}

// Builds the backup manager for replica `r`: the replica_base sub-object is
// initialized from `r`, and the same pointer is kept in `_replica` for later use.
replica_backup_manager::replica_backup_manager(replica *r) : replica_base(r), _replica(r) {}

// Stops the periodic backup-info collection before the manager goes away.
// The timer callback captures `this`, so it must not fire after destruction.
replica_backup_manager::~replica_backup_manager()
{
    // The timer only exists if start_collect_backup_info() was called.
    if (_collect_info_timer != nullptr) {
        // NOTE(review): the `true` argument presumably makes cancel() wait for a
        // running callback to finish -- confirm against the task API.
        _collect_info_timer->cancel(true);
    }
}

void replica_backup_manager::on_clear_cold_backup(const backup_clear_request &request)
{
_replica->_checker.only_one_thread_access();
Expand Down Expand Up @@ -73,6 +82,53 @@ void replica_backup_manager::on_clear_cold_backup(const backup_clear_request &re
background_clear_backup_checkpoint(request.policy_name);
}

// Creates the timer task that periodically calls collect_backup_info().
// Calling this again after the timer has been created is a no-op.
void replica_backup_manager::start_collect_backup_info()
{
    if (_collect_info_timer != nullptr) {
        return;
    }

    // The collection interval is taken from the replica's gc_interval_ms option.
    const auto interval = std::chrono::milliseconds(_replica->options()->gc_interval_ms);
    _collect_info_timer = tasking::enqueue_timer(LPC_PER_REPLICA_COLLECT_INFO_TIMER,
                                                 &_replica->_tracker,
                                                 [this]() { collect_backup_info(); },
                                                 interval,
                                                 get_gpid().thread_hash());
}

void replica_backup_manager::collect_backup_info()
{
uint64_t cold_backup_running_count = 0;
uint64_t cold_backup_max_duration_time_ms = 0;
uint64_t cold_backup_max_upload_file_size = 0;
uint64_t now_ms = dsn_now_ms();

// collect backup info from all of the cold backup contexts
for (const auto &p : _replica->_cold_backup_contexts) {
const cold_backup_context_ptr &backup_context = p.second;
cold_backup_status backup_status = backup_context->status();
if (_replica->status() == partition_status::type::PS_PRIMARY) {
if (backup_status > ColdBackupInvalid && backup_status < ColdBackupCanceled) {
cold_backup_running_count++;
}
} else if (_replica->status() == partition_status::type::PS_SECONDARY) {
// secondary end backup with status ColdBackupCheckpointed
if (backup_status > ColdBackupInvalid && backup_status < ColdBackupCheckpointed) {
cold_backup_running_count++;
}
}

if (backup_status == ColdBackupUploading) {
cold_backup_max_duration_time_ms = std::max(
cold_backup_max_duration_time_ms, now_ms - backup_context->get_start_time_ms());
cold_backup_max_upload_file_size =
std::max(cold_backup_max_upload_file_size, backup_context->get_upload_file_size());
}
}

_replica->_cold_backup_running_count.store(cold_backup_running_count);
_replica->_cold_backup_max_duration_time_ms.store(cold_backup_max_duration_time_ms);
_replica->_cold_backup_max_upload_file_size.store(cold_backup_max_upload_file_size);
}

void replica_backup_manager::background_clear_backup_checkpoint(const std::string &policy_name)
{
ddebug_replica("schedule to clear all checkpoint dirs of policy({}) after {} minutes",
Expand Down
7 changes: 6 additions & 1 deletion src/dist/replication/lib/backup/replica_backup_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@ namespace replication {
class replica_backup_manager : replica_base
{
public:
replica_backup_manager(replica *r) : replica_base(r), _replica(r) {}
explicit replica_backup_manager(replica *r);
~replica_backup_manager();

void on_clear_cold_backup(const backup_clear_request &request);
void start_collect_backup_info();

private:
void clear_backup_checkpoint(const std::string &policy_name);
void send_clear_request_to_secondaries(const gpid &pid, const std::string &policy_name);
void background_clear_backup_checkpoint(const std::string &policy_name);
void collect_backup_info();

replica *_replica;
dsn::task_ptr _collect_info_timer;

friend class replica;
friend class replica_backup_manager_test;
Expand Down
7 changes: 1 addition & 6 deletions src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include "duplication/replica_duplicator_manager.h"
#include "dist/replication/lib/backup/replica_backup_manager.h"
#include "backup/replica_backup_manager.h"

#include <dsn/cpp/json_helper.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -368,11 +368,6 @@ void replica::close()
_checkpoint_timer = nullptr;
}

if (_collect_info_timer != nullptr) {
_collect_info_timer->cancel(true);
_collect_info_timer = nullptr;
}

if (_app != nullptr) {
_app->cancel_background_work(true);
}
Expand Down
3 changes: 0 additions & 3 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
void set_backup_context_pause();
void clear_cold_backup_state();

void collect_backup_info();

/////////////////////////////////////////////////////////////////
// replica restore from backup
bool read_cold_backup_metadata(const std::string &file, cold_backup_metadata &backup_metadata);
Expand Down Expand Up @@ -477,7 +475,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
partition_split_context _split_states;

// timer task that runs in the replication thread
dsn::task_ptr _collect_info_timer;
std::atomic<uint64_t> _cold_backup_running_count;
std::atomic<uint64_t> _cold_backup_max_duration_time_ms;
std::atomic<uint64_t> _cold_backup_max_upload_file_size;
Expand Down
37 changes: 0 additions & 37 deletions src/dist/replication/lib/replica_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,42 +703,5 @@ void replica::set_backup_context_pause()
}

void replica::clear_cold_backup_state() { _cold_backup_contexts.clear(); }

void replica::collect_backup_info()
{
uint64_t cold_backup_running_count = 0;
uint64_t cold_backup_max_duration_time_ms = 0;
uint64_t cold_backup_max_upload_file_size = 0;
if (!_cold_backup_contexts.empty()) {
for (const auto &p : _cold_backup_contexts) {
const cold_backup_context_ptr &backup_context = p.second;
cold_backup_status backup_status = backup_context->status();
if (status() == partition_status::type::PS_PRIMARY) {
if (backup_status != ColdBackupInvalid && backup_status != ColdBackupCompleted &&
backup_status != ColdBackupCanceled && backup_status != ColdBackupFailed) {
cold_backup_running_count++;
}
} else if (status() == partition_status::type::PS_SECONDARY) {
if (backup_status != ColdBackupInvalid && backup_status != ColdBackupFailed &&
backup_status != ColdBackupCanceled &&
backup_status != ColdBackupCheckpointed) {
// secondary end backup with status ColdBackupCheckpointed
cold_backup_running_count++;
}
}

if (backup_status == ColdBackupUploading) {
cold_backup_max_duration_time_ms =
std::max(cold_backup_max_duration_time_ms,
(dsn_now_ms() - backup_context->get_start_time_ms()));
cold_backup_max_upload_file_size = std::max(cold_backup_max_upload_file_size,
backup_context->get_upload_file_size());
}
}
}
_cold_backup_running_count.store(cold_backup_running_count);
_cold_backup_max_duration_time_ms.store(cold_backup_max_duration_time_ms);
_cold_backup_max_upload_file_size.store(cold_backup_max_upload_file_size);
}
}
} // namespace
10 changes: 2 additions & 8 deletions src/dist/replication/lib/replica_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "mutation.h"
#include "mutation_log.h"
#include "replica_stub.h"
#include "backup/replica_backup_manager.h"
#include <dsn/utility/factory_store.h>
#include <dsn/utility/filesystem.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -351,14 +352,7 @@ error_code replica::init_app_and_prepare_list(bool create_new)
get_gpid().thread_hash());
}

if (_collect_info_timer == nullptr) {
_collect_info_timer =
tasking::enqueue_timer(LPC_PER_REPLICA_COLLECT_INFO_TIMER,
&_tracker,
[this]() { collect_backup_info(); },
std::chrono::milliseconds(_options->gc_interval_ms),
get_gpid().thread_hash());
}
_backup_mgr->start_collect_backup_info();
}
}

Expand Down