Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limits for add and recover balance data or zone balance jobs #4104

Merged
merged 10 commits into from
Apr 7, 2022
Merged
2 changes: 2 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,8 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("No valid job!");
case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE:
return Status::Error("Job not existed in chosen space!");
case nebula::cpp2::ErrorCode::E_JOB_MAYBE_RECOVER:
return Status::Error("There is not finished data balance or zone balance job!");
panda-sheep marked this conversation as resolved.
Show resolved Hide resolved
case nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE:
return Status::Error("Backup empty table!");
case nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED:
Expand Down
2 changes: 2 additions & 0 deletions src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
X(E_BALANCER_FAILURE, -2047) \
X(E_JOB_NOT_FINISHED, -2048) \
X(E_TASK_REPORT_OUT_DATE, -2049) \
X(E_JOB_NOT_IN_SPACE, -2050) \
X(E_JOB_MAYBE_RECOVER, -2051) \
X(E_INVALID_JOB, -2065) \
\
/* Backup Failure */ \
Expand Down
1 change: 1 addition & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ enum ErrorCode {
E_JOB_NOT_FINISHED = -2048,
E_TASK_REPORT_OUT_DATE = -2049,
E_JOB_NOT_IN_SPACE = -2050,
E_JOB_MAYBE_RECOVER = -2051,
E_INVALID_JOB = -2065,

// Backup Failure
Expand Down
11 changes: 9 additions & 2 deletions src/meta/processors/job/AdminJobProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,19 @@ nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq

// Check if job not exists
JobID jId = 0;
auto jobExist = jobMgr_->checkJobExist(spaceId_, type, paras, jId);
if (jobExist) {
auto runningJobExist = jobMgr_->checkOnRunningJobExist(spaceId_, type, paras, jId);
if (runningJobExist) {
LOG(INFO) << "Job has already exists: " << jId;
result.job_id_ref() = jId;
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
if (type == cpp2::JobType::DATA_BALANCE || type == cpp2::JobType::ZONE_BALANCE) {
panda-sheep marked this conversation as resolved.
Show resolved Hide resolved
auto retCode = jobMgr_->checkNotFinishedJobExist(spaceId_, type);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "There are not finished data balance jobs or zone balance jobs.";
panda-sheep marked this conversation as resolved.
Show resolved Hide resolved
return retCode;
}
}

folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto jobId = autoIncrementId();
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/job/JobDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ class JobDescription {
cpp2::JobDesc toJobDesc();

bool operator==(const JobDescription& that) const {
return space_ == that.space_ && type_ == that.type_ && paras_ == that.paras_ &&
status_ == that.status_;
panda-sheep marked this conversation as resolved.
Show resolved Hide resolved
return space_ == that.space_ && type_ == that.type_ && paras_ == that.paras_;
}

bool operator!=(const JobDescription& that) const {
Expand Down
41 changes: 36 additions & 5 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,10 +571,10 @@ nebula::cpp2::ErrorCode JobManager::removeExpiredJobs(
return ret;
}

bool JobManager::checkJobExist(GraphSpaceID spaceId,
const cpp2::JobType& jobType,
const std::vector<std::string>& paras,
JobID& jobId) {
bool JobManager::checkOnRunningJobExist(GraphSpaceID spaceId,
const cpp2::JobType& jobType,
const std::vector<std::string>& paras,
JobID& jobId) {
JobDescription jobDesc(spaceId, 0, jobType, paras);
auto it = inFlightJobs_.begin();
while (it != inFlightJobs_.end()) {
Expand All @@ -587,6 +587,36 @@ bool JobManager::checkJobExist(GraphSpaceID spaceId,
return false;
}

/**
 * @brief Check whether the space already holds a DATA_BALANCE or ZONE_BALANCE job whose
 * status is neither FINISHED nor INVALID.
 *
 * The check only applies when jobType itself is DATA_BALANCE or ZONE_BALANCE; for any other
 * job type the kv store is not consulted and SUCCEEDED is returned.
 *
 * @param spaceId Space whose stored jobs are scanned
 * @param jobType Type of the job about to be added
 * @return SUCCEEDED if no such job is found, E_JOB_MAYBE_RECOVER if one is found, or the
 * error code returned by the kv store when the prefix scan fails
 */
nebula::cpp2::ErrorCode JobManager::checkNotFinishedJobExist(GraphSpaceID spaceId,
                                                             const cpp2::JobType& jobType) {
  // Only balance jobs are restricted; everything else passes straight through.
  const bool balanceRequested =
      jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE;
  if (!balanceRequested) {
    return nebula::cpp2::ErrorCode::SUCCEEDED;
  }

  std::unique_ptr<kvstore::KVIterator> jobIter;
  auto jobKeyPrefix = MetaKeyUtils::jobPrefix(spaceId);
  auto scanCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobKeyPrefix, &jobIter);
  if (scanCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
    LOG(INFO) << "Fetch jobs failed, error: " << apache::thrift::util::enumNameSafe(scanCode);
    return scanCode;
  }

  while (jobIter->valid()) {
    // Entries under the prefix that are not job keys are ignored.
    if (MetaKeyUtils::isJobKey(jobIter->key())) {
      auto parsed = MetaKeyUtils::parseJobVal(jobIter->val());
      auto storedType = std::get<0>(parsed);
      auto storedStatus = std::get<2>(parsed);

      const bool storedIsBalance = storedType == cpp2::JobType::DATA_BALANCE ||
                                   storedType == cpp2::JobType::ZONE_BALANCE;
      const bool storedIsSettled = storedStatus == cpp2::JobStatus::FINISHED ||
                                   storedStatus == cpp2::JobStatus::INVALID;
      if (storedIsBalance && !storedIsSettled) {
        LOG(INFO) << "There are some data balance or zone balance jobs that did not complete "
                     "successfully";
        return nebula::cpp2::ErrorCode::E_JOB_MAYBE_RECOVER;
      }
    }
    jobIter->next();
  }
  return nebula::cpp2::ErrorCode::SUCCEEDED;
}

ErrorOr<nebula::cpp2::ErrorCode, std::pair<cpp2::JobDesc, std::vector<cpp2::TaskDesc>>>
JobManager::showJob(GraphSpaceID spaceId, JobID jobId) {
auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId);
Expand Down Expand Up @@ -684,7 +714,8 @@ ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
optJob.getStatus() == cpp2::JobStatus::STOPPED))) {
// Check if the job exists
JobID jId = 0;
auto jobExist = checkJobExist(spaceId, optJob.getJobType(), optJob.getParas(), jId);
auto jobExist =
checkOnRunningJobExist(spaceId, optJob.getJobType(), optJob.getParas(), jId);
if (!jobExist) {
auto jobId = optJob.getJobId();
enqueue(spaceId, jobId, JbOp::RECOVER, optJob.getJobType());
Expand Down
22 changes: 16 additions & 6 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,28 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
nebula::cpp2::ErrorCode addJob(JobDescription& jobDesc, AdminClient* client);

/**
* @brief The same job is in jobMap
* @brief Check whether the same job is already queued or running.
*
* @param spaceId
* @param type
* @param paras
* @param jobId If the job exists, jobId is the id of the existing job
* @return
* @return True if job exists.
*/
bool checkOnRunningJobExist(GraphSpaceID spaceId,
const cpp2::JobType& type,
const std::vector<std::string>& paras,
JobID& jobId);
/**
* @brief Within a space, at most one DATA_BALANCE or ZONE_BALANCE job may have a status other
* than FINISHED or INVALID; this checks whether such an unfinished balance job already exists.
*
* @param spaceId
* @param jobType
* @return nebula::cpp2::ErrorCode
*/
bool checkJobExist(GraphSpaceID spaceId,
const cpp2::JobType& type,
const std::vector<std::string>& paras,
JobID& jobId);
nebula::cpp2::ErrorCode checkNotFinishedJobExist(GraphSpaceID spaceId,
const cpp2::JobType& jobType);

/**
* @brief Load all jobs of the space from kvStore and convert to cpp2::JobDesc
Expand Down
22 changes: 20 additions & 2 deletions src/meta/test/JobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ TEST_F(JobManagerTest, AddJob) {
JobDescription jobDesc(spaceId, jobId, cpp2::JobType::COMPACT);
auto rc = jobMgr->addJob(jobDesc, adminClient_.get());
ASSERT_EQ(rc, nebula::cpp2::ErrorCode::SUCCEEDED);

// If there is a failed data balance job, a new job cannot be added
JobID jobId3 = 3;
JobDescription jobDesc3(spaceId, jobId3, cpp2::JobType::DATA_BALANCE);
jobDesc3.setStatus(cpp2::JobStatus::FAILED);
auto jobKey = MetaKeyUtils::jobKey(jobDesc3.getSpace(), jobDesc3.getJobId());
auto jobVal = MetaKeyUtils::jobVal(jobDesc3.getJobType(),
jobDesc3.getParas(),
jobDesc3.getStatus(),
jobDesc3.getStartTime(),
jobDesc3.getStopTime(),
jobDesc3.getErrorCode());
jobMgr->save(std::move(jobKey), std::move(jobVal));

rc = jobMgr->checkNotFinishedJobExist(spaceId, jobDesc3.getJobType());
ASSERT_EQ(rc, nebula::cpp2::ErrorCode::E_JOB_MAYBE_RECOVER);
}

TEST_F(JobManagerTest, AddRebuildTagIndexJob) {
Expand Down Expand Up @@ -287,7 +303,8 @@ TEST_F(JobManagerTest, JobDeduplication) {
JobID jobId3 = 17;
JobDescription jobDesc3(spaceId, jobId3, cpp2::JobType::LEADER_BALANCE);
JobID jId3 = 0;
auto jobExist = jobMgr->checkJobExist(spaceId, jobDesc3.getJobType(), jobDesc3.getParas(), jId3);
auto jobExist =
jobMgr->checkOnRunningJobExist(spaceId, jobDesc3.getJobType(), jobDesc3.getParas(), jId3);
if (!jobExist) {
auto rc3 = jobMgr->addJob(jobDesc3, adminClient_.get());
ASSERT_EQ(rc3, nebula::cpp2::ErrorCode::SUCCEEDED);
Expand All @@ -296,7 +313,8 @@ TEST_F(JobManagerTest, JobDeduplication) {
JobID jobId4 = 18;
JobDescription jobDesc4(spaceId, jobId4, cpp2::JobType::COMPACT);
JobID jId4 = 0;
jobExist = jobMgr->checkJobExist(spaceId, jobDesc4.getJobType(), jobDesc4.getParas(), jId4);
jobExist =
jobMgr->checkOnRunningJobExist(spaceId, jobDesc4.getJobType(), jobDesc4.getParas(), jId4);
if (!jobExist) {
auto rc4 = jobMgr->addJob(jobDesc4, adminClient_.get());
ASSERT_NE(rc4, nebula::cpp2::ErrorCode::SUCCEEDED);
Expand Down