Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): sync bulk load from remote storage #553

Merged
merged 6 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 76 additions & 36 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ bulk_load_service::bulk_load_service(meta_service *meta_svc, const std::string &
// ThreadPool: THREAD_POOL_META_SERVER
void bulk_load_service::initialize_bulk_load_service()
{
task_tracker tracker;
error_code err = ERR_OK;
_sync_bulk_load_storage =
make_unique<mss::meta_storage>(_meta_svc->get_remote_storage(), &_sync_tracker);

create_bulk_load_root_dir(err, tracker);
tracker.wait_outstanding_tasks();
create_bulk_load_root_dir();
_sync_tracker.wait_outstanding_tasks();

if (err == ERR_OK) {
try_to_continue_bulk_load();
}
try_to_continue_bulk_load();
}

// ThreadPool: THREAD_POOL_META_SERVER
Expand Down Expand Up @@ -487,7 +485,7 @@ void bulk_load_service::handle_app_downloading(const bulk_load_response &respons

// if replica report metadata, update metadata on remote storage
if (response.__isset.metadata && is_partition_metadata_not_updated(pid)) {
update_partition_metadata_on_remote_stroage(app_name, pid, response.metadata);
update_partition_metadata_on_remote_storage(app_name, pid, response.metadata);
}

// update download progress
Expand Down Expand Up @@ -744,7 +742,7 @@ void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::update_partition_metadata_on_remote_stroage(
void bulk_load_service::update_partition_metadata_on_remote_storage(
const std::string &app_name, const gpid &pid, const bulk_load_metadata &metadata)
{
zauto_read_lock l(_lock);
Expand Down Expand Up @@ -1215,39 +1213,81 @@ void bulk_load_service::on_control_bulk_load(control_bulk_load_rpc rpc)
}
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::create_bulk_load_root_dir(error_code &err, task_tracker &tracker)
// ThreadPool: THREAD_POOL_META_SERVER
void bulk_load_service::create_bulk_load_root_dir()
{
blob value = blob();
_meta_svc->get_remote_storage()->create_node(
_bulk_load_root,
LPC_META_CALLBACK,
[this, &err, &tracker](error_code ec) {
if (ERR_OK == ec || ERR_NODE_ALREADY_EXIST == ec) {
ddebug_f("create bulk load root({}) succeed", _bulk_load_root);
sync_apps_bulk_load_from_remote_stroage(err, tracker);
} else if (ERR_TIMEOUT == ec) {
dwarn_f("create bulk load root({}) failed, retry later", _bulk_load_root);
tasking::enqueue(
LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::create_bulk_load_root_dir, this, err, tracker),
0,
std::chrono::seconds(1));
} else {
err = ec;
dfatal_f(
"create bulk load root({}) failed, error={}", _bulk_load_root, ec.to_string());
std::string path = _bulk_load_root;
_sync_bulk_load_storage->create_node(std::move(path), std::move(value), [this]() {
ddebug_f("create bulk load root({}) succeed", _bulk_load_root);
sync_apps_from_remote_storage();
});
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::sync_apps_from_remote_storage()
{
std::string path = _bulk_load_root;
_sync_bulk_load_storage->get_children(
std::move(path), [this](bool flag, const std::vector<std::string> &children) {
if (flag && children.size() > 0) {
ddebug_f("There are {} apps need to sync bulk load status", children.size());
for (const auto &elem : children) {
int32_t app_id = boost::lexical_cast<int32_t>(elem);
ddebug_f("start to sync app({}) bulk load status", app_id);
do_sync_app(app_id);
}
}
},
value,
&tracker);
});
}

void bulk_load_service::sync_apps_bulk_load_from_remote_stroage(error_code &err,
task_tracker &tracker)
// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::do_sync_app(int32_t app_id)
{
// TODO(heyuchen): TBD
std::string app_path = get_app_bulk_load_path(app_id);
_sync_bulk_load_storage->get_data(std::move(app_path), [this, app_id](const blob &value) {
app_bulk_load_info ainfo;
dsn::json::json_forwarder<app_bulk_load_info>::decode(value, ainfo);
{
zauto_write_lock l(_lock);
_bulk_load_app_id.insert(app_id);
_app_bulk_load_info[app_id] = ainfo;
}
sync_partitions_from_remote_storage(ainfo.app_id, ainfo.app_name);
});
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::sync_partitions_from_remote_storage(int32_t app_id,
const std::string &app_name)
{
std::string app_path = get_app_bulk_load_path(app_id);
_sync_bulk_load_storage->get_children(
std::move(app_path),
[this, app_path, app_id, app_name](bool flag, const std::vector<std::string> &children) {
ddebug_f("app(name={},app_id={}) has {} partition bulk load info to be synced",
app_name,
app_id,
children.size());
for (const auto &child_pidx : children) {
int32_t pidx = boost::lexical_cast<int32_t>(child_pidx);
std::string partition_path = get_partition_bulk_load_path(app_path, pidx);
do_sync_partition(gpid(app_id, pidx), partition_path);
}
});
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::do_sync_partition(const gpid &pid, std::string &partition_path)
{
_sync_bulk_load_storage->get_data(std::move(partition_path), [this, pid](const blob &value) {
partition_bulk_load_info pinfo;
dsn::json::json_forwarder<partition_bulk_load_info>::decode(value, pinfo);
{
zauto_write_lock l(_lock);
_partition_bulk_load_info[pid] = pinfo;
}
});
}

void bulk_load_service::try_to_continue_bulk_load()
Expand Down
21 changes: 15 additions & 6 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class bulk_load_service

// Called by `handle_app_downloading`
// update partition bulk load metadata reported by replica server on remote storage
void update_partition_metadata_on_remote_stroage(const std::string &app_name,
void update_partition_metadata_on_remote_storage(const std::string &app_name,
const gpid &pid,
const bulk_load_metadata &metadata);

Expand Down Expand Up @@ -209,12 +209,18 @@ class bulk_load_service
/// sync bulk load states from remote storage
/// called when service initialized or meta server leader switch
///
void create_bulk_load_root_dir(error_code &err, task_tracker &tracker);
void create_bulk_load_root_dir();

void sync_apps_bulk_load_from_remote_stroage(error_code &err, task_tracker &tracker);
void sync_apps_from_remote_storage();

void do_sync_app(int32_t app_id);

void sync_partitions_from_remote_storage(int32_t app_id, const std::string &app_name);

void do_sync_partition(const gpid &pid, std::string &partition_path);

///
/// try to continue bulk load according to states from remote stroage
/// try to continue bulk load according to states from remote storage
/// called when service initialized or meta server leader switch
///
void try_to_continue_bulk_load();
Expand All @@ -233,7 +239,7 @@ class bulk_load_service
return oss.str();
}

// get app_bulk_load_info path on remote stroage
// get app_bulk_load_info path on remote storage
// <_bulk_load_root>/<app_id>
inline std::string get_app_bulk_load_path(int32_t app_id) const
{
Expand All @@ -242,7 +248,7 @@ class bulk_load_service
return oss.str();
}

// get partition_bulk_load_info path on remote stroage
// get partition_bulk_load_info path on remote storage
// <_bulk_load_root>/<app_id>/<partition_id>
inline std::string get_partition_bulk_load_path(const std::string &app_bulk_load_path,
int partition_id) const
Expand Down Expand Up @@ -312,6 +318,9 @@ class bulk_load_service
meta_service *_meta_svc;
server_state *_state;

std::unique_ptr<mss::meta_storage> _sync_bulk_load_storage;
task_tracker _sync_tracker;

zrwlock_nr &app_lock() const { return _state->_lock; }
zrwlock_nr _lock; // bulk load states lock

Expand Down
Loading