Skip to content
This repository was archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): continue bulk load while meta server start part2 #563

Merged
merged 2 commits into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 117 additions & 2 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1538,8 +1538,123 @@ void bulk_load_service::do_continue_app_bulk_load(
const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map,
const std::unordered_set<int32_t> &different_status_pidx_set)
{
// TODO(heyuchen): TBD
// continue bulk load
const int32_t app_id = ainfo.app_id;
const int32_t partition_count = ainfo.partition_count;
const auto app_status = ainfo.status;
const int32_t different_count = different_status_pidx_set.size();
const int32_t same_count = pinfo_map.size() - different_count;
const int32_t invalid_count = partition_count - pinfo_map.size();

ddebug_f(
"app({}) continue bulk load, app_id = {}, partition_count = {}, status = {}, there are {} "
"partitions have bulk_load_info, {} partitions have same status with app, {} "
"partitions different",
ainfo.app_name,
app_id,
partition_count,
dsn::enum_to_string(app_status),
pinfo_map.size(),
same_count,
different_count);

int32_t in_progress_partition_count = partition_count;
if (app_status == bulk_load_status::BLS_DOWNLOADING) {
if (invalid_count > 0) {
// create missing partition, so the in_progress_count should be invalid_count
in_progress_partition_count = invalid_count;
} else if (different_count > 0) {
// it is hard to distinguish that bulk load is normal downloading or rollback to
// downloading before meta server crash, when app status is downloading, we consider
// bulk load is rollback to downloading for convenience, for partitions whose status is
// not downloading, update them to downloading, so the in_progress_count should be
// different_count
in_progress_partition_count = different_count;
}
} else if (app_status == bulk_load_status::BLS_DOWNLOADED ||
app_status == bulk_load_status::BLS_INGESTING ||
app_status == bulk_load_status::BLS_SUCCEED) {
// for app status is downloaded, when all partitions turn to ingesting, app partition will
// turn to ingesting, so the in_progress_count should be same_count, ingesting and succeed
// are same
in_progress_partition_count = same_count;
} // for other cases, in_progress_count should be partition_count
{
zauto_write_lock l(_lock);
_apps_in_progress_count[app_id] = in_progress_partition_count;
}

// if app is paused, no need to send bulk_load_request, just return
if (app_status == bulk_load_status::BLS_PAUSED) {
return;
}

// create all missing partitions then send request to all partitions
if (app_status == bulk_load_status::BLS_DOWNLOADING && invalid_count > 0) {
for (auto i = 0; i < partition_count; ++i) {
if (pinfo_map.find(i) == pinfo_map.end()) {
create_missing_partition_dir(ainfo.app_name, gpid(app_id, i), partition_count);
}
}
return;
}

// update all partition status to app_status
if ((app_status == bulk_load_status::BLS_FAILED ||
app_status == bulk_load_status::BLS_CANCELED ||
app_status == bulk_load_status::BLS_PAUSING ||
app_status == bulk_load_status::BLS_DOWNLOADING) &&
different_count > 0) {
for (auto pidx : different_status_pidx_set) {
update_partition_status_on_remote_storage(
ainfo.app_name, gpid(app_id, pidx), app_status);
}
}

// send bulk_load_request to all partitions
for (auto i = 0; i < partition_count; ++i) {
gpid pid = gpid(app_id, i);
partition_bulk_load(ainfo.app_name, pid);
if (app_status == bulk_load_status::BLS_INGESTING) {
tasking::enqueue(
LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion, this, ainfo.app_name, pid));
}
}
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::create_missing_partition_dir(const std::string &app_name,
const gpid &pid,
int32_t partition_count)
{
partition_bulk_load_info pinfo;
pinfo.status = bulk_load_status::BLS_DOWNLOADING;
blob value = dsn::json::json_forwarder<partition_bulk_load_info>::encode(pinfo);

_meta_svc->get_meta_storage()->create_node(
get_partition_bulk_load_path(pid),
std::move(value),
[app_name, pid, partition_count, pinfo, this]() {
const int32_t app_id = pid.get_app_id();
bool send_request = false;
ddebug_f("app({}) create partition({}) bulk_load_info", app_name, pid);
{
zauto_write_lock l(_lock);
_partition_bulk_load_info[pid] = pinfo;

if (--_apps_in_progress_count[app_id] == 0) {
_apps_in_progress_count[app_id] = partition_count;
send_request = true;
}
}
if (send_request) {
ddebug_f("app({}) start to bulk load", app_name);
for (auto i = 0; i < partition_count; ++i) {
partition_bulk_load(app_name, gpid(app_id, i));
}
}
});
}

} // namespace replication
Expand Down
7 changes: 7 additions & 0 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,13 @@ class bulk_load_service
const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map,
const std::unordered_set<int32_t> &different_status_pidx_set);

// called by `do_continue_app_bulk_load`
// only used when app status is downloading and some partition bulk load info not existed on
// remote storage
void create_missing_partition_dir(const std::string &app_name,
const gpid &pid,
int32_t partition_count);

///
/// helper functions
///
Expand Down
Loading