Skip to content

Commit

Permalink
recover job
Browse files Browse the repository at this point in the history
  • Loading branch information
panda-sheep committed Apr 6, 2022
1 parent 8ca6583 commit 1228f9b
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/meta/processors/job/AdminJobProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
break;
}
case nebula::meta::cpp2::JobOp::RECOVER: {
// Note that the last parameter is no longer spaceId
// Note that the last parameter is no longer spaceName
std::vector<int32_t> jobIds;
jobIds.reserve(paras.size());
for (size_t i = 0; i < paras.size(); i++) {
Expand Down
95 changes: 94 additions & 1 deletion src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
} else {
std::vector<std::string> jobKeys;
jobKeys.reserve(jobIds.size());
std::vector<std::pair<std::string, std::string>> totalJobKVs;
for (int jobId : jobIds) {
jobKeys.emplace_back(MetaKeyUtils::jobKey(spaceId, jobId));
}
Expand All @@ -699,7 +700,99 @@ ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
return retCode.first;
}
for (size_t i = 0; i < jobKeys.size(); i++) {
jobKVs.emplace_back(std::make_pair(jobKeys[i], jobVals[i]));
totalJobKVs.emplace_back(std::make_pair(jobKeys[i], jobVals[i]));
}

// For DATA_BALANCE and ZONE_BALANCE, jobs with status stopped or failed
// If only one stopped jobId is specified, there will not be finished job or failed jobId after
// the job. If multiple jobIds are specified, only one jobId will be recovered. The failed jobid
// will be executed first. Otherwise recover the first stop jobId
std::unordered_map<cpp2::JobType, std::tuple<JobID, int64_t, cpp2::JobStatus>> dupResult;
std::unordered_map<JobID, std::pair<std::string, std::string>> dupkeyVal;

for (auto& jobkv : totalJobKVs) {
auto optJobRet = JobDescription::makeJobDescription(jobkv.first, jobkv.second);
if (nebula::ok(optJobRet)) {
auto optJob = nebula::value(optJobRet);
auto jobStatus = optJob.getStatus();
auto jobId = optJob.getJobId();
auto jobType = optJob.getJobType();
auto jobStartTime = optJob.getStartTime();
if (jobStatus != cpp2::JobStatus::QUEUE && jobStatus != cpp2::JobStatus::FAILED &&
jobStatus != cpp2::JobStatus::STOPPED) {
continue;
}

// handle DATA_BALANCE and ZONE_BALANCE
if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) {
// queue and failed, will not exist at the same time
if (jobStatus == cpp2::JobStatus::FAILED || jobStatus == cpp2::JobStatus::QUEUE) {
dupResult[jobType] = std::make_tuple(jobId, jobStartTime, jobStatus);
dupkeyVal.emplace(jobId, std::make_pair(jobkv.first, jobkv.second));
continue;
}

// current recover job status is stopped
// Prioritize the recovery of not stopped jobs
auto findJobIter = dupResult.find(jobType);
if (findJobIter != dupResult.end()) {
continue;
}

// For a stopped job, check whether there is a finished or failed job after it.
std::unique_ptr<kvstore::KVIterator> iter;
auto jobPre = MetaKeyUtils::jobPrefix(spaceId);
auto code = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobPre, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Fetch jobs failed, error: " << apache::thrift::util::enumNameSafe(code);
return code;
}

bool findRest = false;
for (; iter->valid(); iter->next()) {
if (!MetaKeyUtils::isJobKey(iter->key())) {
continue;
}

// eliminate oneself
auto keyPair = MetaKeyUtils::parseJobKey(iter->key());
auto destJobId = keyPair.second;
if (destJobId == jobId) {
continue;
}
auto tup = MetaKeyUtils::parseJobVal(iter->val());
auto destJobType = std::get<0>(tup);
auto destJobStatus = std::get<2>(tup);
auto destJobStartTime = std::get<3>(tup);
if (jobType == destJobType) {
// There is a specific type of failed job that does not allow recovery for the type of
// stopped job
if (destJobStatus == cpp2::JobStatus::FAILED) {
LOG(ERROR) << "There is a specific type of failed job that does not allow recovery "
"for the type of stopped job";
findRest = true;
break;
} else if (destJobStatus == cpp2::JobStatus::FINISHED) {
// Compare the start time of the job
if (destJobStartTime > jobStartTime) {
findRest = true;
break;
}
}
}
}
if (!findRest) {
dupResult[jobType] = std::make_tuple(jobId, jobStartTime, jobStatus);
dupkeyVal.emplace(jobId, std::make_pair(jobkv.first, jobkv.second));
}
} else {
jobKVs.emplace_back(std::make_pair(jobkv.first, jobkv.second));
}
}
}
for (auto& key : dupResult) {
auto jId = std::get<0>(key.second);
jobKVs.emplace_back(dupkeyVal[jId]);
}
}

Expand Down

0 comments on commit 1228f9b

Please sign in to comment.