Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): continue bulk load while meta server start part2 #563

Merged
merged 2 commits into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 121 additions & 2 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1538,8 +1538,127 @@ void bulk_load_service::do_continue_app_bulk_load(
const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map,
const std::unordered_set<int32_t> &different_status_pidx_set)
{
// TODO(heyuchen): TBD
// continue bulk load
const int32_t app_id = ainfo.app_id;
const int32_t partition_count = ainfo.partition_count;
const auto app_status = ainfo.status;
const int32_t different_count = different_status_pidx_set.size();
const int32_t same_count = pinfo_map.size() - different_count;
const int32_t invalid_count = partition_count - pinfo_map.size();

ddebug_f(
"app({}) continue bulk load, app_id = {}, partition_count = {}, status = {}, there are {} "
"partitions have bulk_load_info, {} partitions have same status with app, {} "
"partitions different",
ainfo.app_name,
app_id,
partition_count,
dsn::enum_to_string(app_status),
pinfo_map.size(),
same_count,
different_count);

// _apps_in_progress_count is used for updating app bulk load, when _apps_in_progress_count = 0
// means app bulk load status can transfer to next stage, for example, when app status is
// downloaded, and _apps_in_progress_count = 0, app status can turn to ingesting
// see more in function `update_partition_status_on_remote_storage_reply`
int32_t in_progress_partition_count = partition_count;
hycdong marked this conversation as resolved.
Show resolved Hide resolved
if (app_status == bulk_load_status::BLS_DOWNLOADING) {
if (invalid_count > 0) {
// create missing partition, so the in_progress_count should be invalid_count
in_progress_partition_count = invalid_count;
} else if (different_count > 0) {
// it is hard to distinguish that bulk load is normal downloading or rollback to
// downloading before meta server crash, when app status is downloading, we consider
// bulk load as rolling back to downloading for convenience, for partitions whose status
// is not downloading, update them to downloading, so the in_progress_count should be
// different_count
in_progress_partition_count = different_count;
}
} else if (app_status == bulk_load_status::BLS_DOWNLOADED ||
app_status == bulk_load_status::BLS_INGESTING ||
app_status == bulk_load_status::BLS_SUCCEED) {
// for app status is downloaded, when all partitions turn to ingesting, app partition will
// turn to ingesting, so the in_progress_count should be same_count, ingesting and succeed
// are same
in_progress_partition_count = same_count;
} // for other cases, in_progress_count should be partition_count
{
zauto_write_lock l(_lock);
_apps_in_progress_count[app_id] = in_progress_partition_count;
}

// if app is paused, no need to send bulk_load_request, just return
if (app_status == bulk_load_status::BLS_PAUSED) {
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
return;
}

// create all missing partitions then send request to all partitions
if (app_status == bulk_load_status::BLS_DOWNLOADING && invalid_count > 0) {
for (auto i = 0; i < partition_count; ++i) {
if (pinfo_map.find(i) == pinfo_map.end()) {
create_missing_partition_dir(ainfo.app_name, gpid(app_id, i), partition_count);
}
}
return;
}

// update all partition status to app_status
if ((app_status == bulk_load_status::BLS_FAILED ||
app_status == bulk_load_status::BLS_CANCELED ||
app_status == bulk_load_status::BLS_PAUSING ||
app_status == bulk_load_status::BLS_DOWNLOADING) &&
different_count > 0) {
for (auto pidx : different_status_pidx_set) {
update_partition_status_on_remote_storage(
ainfo.app_name, gpid(app_id, pidx), app_status);
}
}

// send bulk_load_request to all partitions
for (auto i = 0; i < partition_count; ++i) {
gpid pid = gpid(app_id, i);
partition_bulk_load(ainfo.app_name, pid);
if (app_status == bulk_load_status::BLS_INGESTING) {
tasking::enqueue(
LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion, this, ainfo.app_name, pid));
}
}
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::create_missing_partition_dir(const std::string &app_name,
const gpid &pid,
int32_t partition_count)
{
partition_bulk_load_info pinfo;
pinfo.status = bulk_load_status::BLS_DOWNLOADING;
blob value = dsn::json::json_forwarder<partition_bulk_load_info>::encode(pinfo);

_meta_svc->get_meta_storage()->create_node(
get_partition_bulk_load_path(pid),
std::move(value),
[app_name, pid, partition_count, pinfo, this]() {
const int32_t app_id = pid.get_app_id();
bool send_request = false;
ddebug_f("app({}) create partition({}) bulk_load_info", app_name, pid);
{
zauto_write_lock l(_lock);
_partition_bulk_load_info[pid] = pinfo;

if (--_apps_in_progress_count[app_id] == 0) {
_apps_in_progress_count[app_id] = partition_count;
send_request = true;
}
}
if (send_request) {
ddebug_f("app({}) start to bulk load", app_name);
for (auto i = 0; i < partition_count; ++i) {
partition_bulk_load(app_name, gpid(app_id, i));
}
}
});
}

} // namespace replication
Expand Down
7 changes: 7 additions & 0 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,13 @@ class bulk_load_service
const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map,
const std::unordered_set<int32_t> &different_status_pidx_set);

// called by `do_continue_app_bulk_load`
// only used when app status is downloading and some partition bulk load info not existed on
// remote storage
void create_missing_partition_dir(const std::string &app_name,
const gpid &pid,
int32_t partition_count);

///
/// helper functions
///
Expand Down
Loading