Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(bulk-load): continue bulk load while meta server start part2 (#563)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Jul 17, 2020
1 parent f13589e commit 0c424fb
Show file tree
Hide file tree
Showing 3 changed files with 388 additions and 80 deletions.
123 changes: 121 additions & 2 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1538,8 +1538,127 @@ void bulk_load_service::do_continue_app_bulk_load(
const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map,
const std::unordered_set<int32_t> &different_status_pidx_set)
{
    // Resume an app's bulk load from the state handed in by the caller.
    //   pinfo_map                 - partitions that already have a bulk_load_info entry
    //   different_status_pidx_set - partitions among them whose status differs from the app's
    const int32_t app_id = ainfo.app_id;
    const int32_t partition_count = ainfo.partition_count;
    const auto app_status = ainfo.status;
    const int32_t different_count = different_status_pidx_set.size();
    const int32_t same_count = pinfo_map.size() - different_count;
    // partitions that have no bulk_load_info entry at all
    const int32_t invalid_count = partition_count - pinfo_map.size();

    ddebug_f(
        "app({}) continue bulk load, app_id = {}, partition_count = {}, status = {}, there are {} "
        "partitions have bulk_load_info, {} partitions have same status with app, {} "
        "partitions different",
        ainfo.app_name,
        app_id,
        partition_count,
        dsn::enum_to_string(app_status),
        pinfo_map.size(),
        same_count,
        different_count);

    // _apps_in_progress_count[app_id] counts the partitions that still have to finish before
    // the app bulk load status may move to its next stage (e.g. downloaded -> ingesting once it
    // drops to 0); see `update_partition_status_on_remote_storage_reply` for how it is consumed.
    // Unless one of the cases below applies, every partition is considered in progress.
    int32_t in_progress_partition_count = partition_count;
    switch (app_status) {
    case bulk_load_status::BLS_DOWNLOADING:
        if (invalid_count > 0) {
            // missing partitions are created first, so only those are pending
            in_progress_partition_count = invalid_count;
        } else if (different_count > 0) {
            // A normal download cannot easily be told apart from a rollback to downloading that
            // was interrupted by a meta server crash, so it is always handled as a rollback:
            // every partition that is not downloading gets updated to downloading, which makes
            // exactly those partitions the pending ones.
            in_progress_partition_count = different_count;
        }
        break;
    case bulk_load_status::BLS_DOWNLOADED:
    case bulk_load_status::BLS_INGESTING:
    case bulk_load_status::BLS_SUCCEED:
        // The app leaves these states once all partitions have moved on, so the partitions that
        // still share the app status are the pending ones.
        in_progress_partition_count = same_count;
        break;
    default:
        // all remaining statuses keep partition_count
        break;
    }
    {
        zauto_write_lock l(_lock);
        _apps_in_progress_count[app_id] = in_progress_partition_count;
    }

    // A paused app sends no bulk_load_request.
    if (app_status == bulk_load_status::BLS_PAUSED) {
        return;
    }

    // Downloading with missing partitions: create each missing partition and stop here.
    // `create_missing_partition_dir` sends the requests to all partitions once the last missing
    // one has been created.
    if (app_status == bulk_load_status::BLS_DOWNLOADING && invalid_count > 0) {
        for (int32_t pidx = 0; pidx < partition_count; ++pidx) {
            if (pinfo_map.count(pidx) != 0) {
                continue;
            }
            create_missing_partition_dir(ainfo.app_name, gpid(app_id, pidx), partition_count);
        }
        return;
    }

    // For these app statuses, partitions whose status differs are overwritten with the app
    // status on remote storage.
    const bool force_partition_status = app_status == bulk_load_status::BLS_FAILED ||
                                        app_status == bulk_load_status::BLS_CANCELED ||
                                        app_status == bulk_load_status::BLS_PAUSING ||
                                        app_status == bulk_load_status::BLS_DOWNLOADING;
    if (force_partition_status && different_count > 0) {
        for (auto pidx : different_status_pidx_set) {
            update_partition_status_on_remote_storage(
                ainfo.app_name, gpid(app_id, pidx), app_status);
        }
    }

    // Send bulk_load_request to every partition; while ingesting, also schedule the ingestion
    // task of each partition.
    const bool is_ingesting = (app_status == bulk_load_status::BLS_INGESTING);
    for (int32_t pidx = 0; pidx < partition_count; ++pidx) {
        const gpid pid(app_id, pidx);
        partition_bulk_load(ainfo.app_name, pid);
        if (!is_ingesting) {
            continue;
        }
        tasking::enqueue(
            LPC_BULK_LOAD_INGESTION,
            _meta_svc->tracker(),
            std::bind(&bulk_load_service::partition_ingestion, this, ainfo.app_name, pid));
    }
}

// ThreadPool: THREAD_POOL_META_STATE
//
// Creates the bulk_load_info node of partition `pid` on the meta storage with status
// BLS_DOWNLOADING. When the node has been created, the info is cached in
// `_partition_bulk_load_info` and the app's entry in `_apps_in_progress_count` is decremented;
// once that entry reaches 0 it is reset to `partition_count` and a bulk load request is sent to
// every partition of the app.
void bulk_load_service::create_missing_partition_dir(const std::string &app_name,
                                                     const gpid &pid,
                                                     int32_t partition_count)
{
    partition_bulk_load_info pinfo;
    pinfo.status = bulk_load_status::BLS_DOWNLOADING;
    blob value = dsn::json::json_forwarder<partition_bulk_load_info>::encode(pinfo);

    // Invoked after the node has been created on the meta storage.
    auto on_node_created = [app_name, pid, partition_count, pinfo, this]() {
        ddebug_f("app({}) create partition({}) bulk_load_info", app_name, pid);

        const int32_t app_id = pid.get_app_id();
        bool all_missing_created = false;
        {
            zauto_write_lock l(_lock);
            _partition_bulk_load_info[pid] = pinfo;

            auto &remaining = _apps_in_progress_count[app_id];
            --remaining;
            if (remaining == 0) {
                // last missing partition: from now on every partition is in progress
                remaining = partition_count;
                all_missing_created = true;
            }
        }
        if (!all_missing_created) {
            return;
        }

        // requests are sent outside of the lock
        ddebug_f("app({}) start to bulk load", app_name);
        for (int32_t pidx = 0; pidx < partition_count; ++pidx) {
            partition_bulk_load(app_name, gpid(app_id, pidx));
        }
    };

    _meta_svc->get_meta_storage()->create_node(
        get_partition_bulk_load_path(pid), std::move(value), std::move(on_node_created));
}

} // namespace replication
Expand Down
7 changes: 7 additions & 0 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,13 @@ class bulk_load_service
const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map,
const std::unordered_set<int32_t> &different_status_pidx_set);

// Called by `do_continue_app_bulk_load`.
// Only used when the app status is downloading and the bulk load info of some partitions does
// not exist on remote storage.
// Creates the bulk load info node of partition `pid` with status downloading; after the last
// missing partition of the app has been created, a bulk load request is sent to all
// `partition_count` partitions of the app.
void create_missing_partition_dir(const std::string &app_name,
const gpid &pid,
int32_t partition_count);

///
/// helper functions
///
Expand Down
Loading

0 comments on commit 0c424fb

Please sign in to comment.