Skip to content

Commit

Permalink
modify jobmanager ut (#5175)
Browse files Browse the repository at this point in the history
* modify jobmanager ut

* add expired ut

* avoid recover expired job

* add ut

* address review

* move status

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
SuperYoko and Sophie-Xie authored Jan 6, 2023
1 parent 9a7d2d8 commit 8b3c832
Show file tree
Hide file tree
Showing 7 changed files with 1,210 additions and 56 deletions.
6 changes: 6 additions & 0 deletions src/meta/processors/job/JobDescription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <stdexcept>

#include "common/utils/MetaKeyUtils.h"
#include "interface/gen-cpp2/meta_types.h"
#include "kvstore/KVIterator.h"
#include "meta/processors/Common.h"

Expand Down Expand Up @@ -83,6 +84,11 @@ cpp2::JobDesc JobDescription::toJobDesc() {
}

bool JobDescription::setStatus(Status newStatus, bool force) {
if (JobStatus::notSetable(status_)) {
// no need to change time.
return status_ == newStatus;
}

if (JobStatus::laterThan(status_, newStatus) && !force) {
return false;
}
Expand Down
9 changes: 5 additions & 4 deletions src/meta/processors/job/JobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace meta {
class JobExecutor {
public:
JobExecutor() = default;
explicit JobExecutor(kvstore::KVStore* kv, GraphSpaceID space) : kvstore_(kv), space_(space) {}
JobExecutor(kvstore::KVStore* kv, GraphSpaceID space) : kvstore_(kv), space_(space) {}
virtual ~JobExecutor() = default;

/**
Expand Down Expand Up @@ -99,9 +99,10 @@ class JobExecutor {

/**
 * @brief Factory that builds a JobExecutor for a given job description.
 *
 * The class has a virtual destructor and a virtual createJobExecutor, so it can
 * be subclassed and used through a base-class pointer (presumably so that tests
 * can substitute their own factory -- confirm against the callers).
 *
 * NOTE(review): two declarations of createJobExecutor with an identical
 * parameter list appear below, one `static` and one `virtual`. C++ does not
 * allow a static and a non-static member function with the same signature to
 * be overloaded, so only one of them can be kept -- confirm which is intended.
 */
class JobExecutorFactory {
public:
/**
 * @brief Create the executor for the job described by `jd`.
 *
 * @param jd     description of the job to execute
 * @param store  pointer to the KV store handed to the executor
 * @param client pointer to the admin client handed to the executor
 * @return owning pointer to the created executor
 */
static std::unique_ptr<JobExecutor> createJobExecutor(const JobDescription& jd,
kvstore::KVStore* store,
AdminClient* client);
// Virtual destructor: instances may be destroyed through a base-class pointer.
virtual ~JobExecutorFactory() = default;
/**
 * @brief Overridable variant of createJobExecutor; takes the same parameters
 *        and returns the same type as the static declaration above.
 */
virtual std::unique_ptr<JobExecutor> createJobExecutor(const JobDescription& jd,
kvstore::KVStore* store,
AdminClient* client);
};

} // namespace meta
Expand Down
31 changes: 21 additions & 10 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <thrift/lib/cpp/util/EnumUtils.h>

#include <boost/stacktrace.hpp>
#include <memory>

#include "common/stats/StatsManager.h"
#include "common/time/WallClock.h"
Expand Down Expand Up @@ -42,7 +43,10 @@ JobManager* JobManager::getInstance() {
return &inst;
}

bool JobManager::init(nebula::kvstore::KVStore* store, AdminClient* adminClient) {
bool JobManager::init(nebula::kvstore::KVStore* store,
AdminClient* adminClient,
std::shared_ptr<JobExecutorFactory> factory) {
executorFactory_ = factory;
adminClient_ = adminClient;
if (store == nullptr) {
return false;
Expand Down Expand Up @@ -97,7 +101,7 @@ nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() {
if (nebula::ok(optJobRet)) {
auto optJob = nebula::value(optJobRet);
std::unique_ptr<JobExecutor> je =
JobExecutorFactory::createJobExecutor(optJob, kvStore_, adminClient_);
executorFactory_->createJobExecutor(optJob, kvStore_, adminClient_);
// Only balance would change
if (optJob.getStatus() == cpp2::JobStatus::RUNNING && je->isMetaJob()) {
jds.emplace_back(std::move(optJob));
Expand Down Expand Up @@ -235,7 +239,7 @@ folly::Future<nebula::cpp2::ErrorCode> JobManager::runJobInternal(const JobDescr
iter = this->muJobFinished_.emplace(spaceId, std::make_unique<std::recursive_mutex>()).first;
}
std::lock_guard<std::recursive_mutex> lk(*(iter->second));
auto je = JobExecutorFactory::createJobExecutor(jobDesc, kvStore_, adminClient_);
auto je = executorFactory_->createJobExecutor(jobDesc, kvStore_, adminClient_);
jobExec = je.get();

runningJobs_.emplace(jobDesc.getJobId(), std::move(je));
Expand Down Expand Up @@ -440,7 +444,7 @@ nebula::cpp2::ErrorCode JobManager::saveTaskStatus(TaskDescription& td,
}

auto optJobDesc = nebula::value(optJobDescRet);
auto jobExec = JobExecutorFactory::createJobExecutor(optJobDesc, kvStore_, adminClient_);
auto jobExec = executorFactory_->createJobExecutor(optJobDesc, kvStore_, adminClient_);

if (!jobExec) {
LOG(INFO) << folly::sformat("createJobExecutor failed(), jobId={}", jobId);
Expand Down Expand Up @@ -682,6 +686,11 @@ bool JobManager::isExpiredJob(JobDescription& jobDesc) {
return false;
}
auto jobStart = jobDesc.getStartTime();
if (jobStart == 0) {
// should not happen, but just in case, keep this job
LOG(INFO) << "Job " << jobDesc.getJobId() << " start time is not set, keep it for now";
return false;
}
auto duration = std::difftime(nebula::time::WallClock::fastNowInSec(), jobStart);
return duration > FLAGS_job_expired_secs;
}
Expand Down Expand Up @@ -848,7 +857,9 @@ ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
for (auto& [id, job] : allJobs) {
auto status = job.getStatus();
if (status == cpp2::JobStatus::FAILED || status == cpp2::JobStatus::STOPPED) {
jobsMaybeRecover.emplace(id);
if (!isExpiredJob(job)) {
jobsMaybeRecover.emplace(id);
}
}
}
std::set<JobID>::reverse_iterator lastBalaceJobRecoverIt = jobsMaybeRecover.rend();
Expand All @@ -869,7 +880,8 @@ ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
JobID jid;
bool jobExist = checkOnRunningJobExist(spaceId, job.getJobType(), job.getParas(), jid);
if (!jobExist) {
job.setStatus(cpp2::JobStatus::QUEUE, true);
job.setStatus(cpp2::JobStatus::QUEUE, true); // which causes the job to execute again
job.setErrorCode(nebula::cpp2::ErrorCode::E_JOB_SUBMITTED);
auto jobKey = MetaKeyUtils::jobKey(job.getSpace(), jobId);
auto jobVal = MetaKeyUtils::jobVal(job.getJobType(),
job.getParas(),
Expand Down Expand Up @@ -919,9 +931,8 @@ ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
auto jobType = allJobs[*it].getJobType();
if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) {
if (jobIdSet.empty() || jobIdSet.count(*it)) {
LOG(INFO) << "can't recover a balance job " << *lastBalaceJobRecoverIt
<< " when there's a newer balance job " << *lastBalaceJobRecoverIt
<< " stopped or failed";
LOG(INFO) << "can't recover a balance job " << *it << " when there's a newer balance job "
<< *lastBalaceJobRecoverIt << " stopped or failed";
}
it = jobsMaybeRecover.erase(it);
} else {
Expand All @@ -931,7 +942,7 @@ ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
if (*lastBalaceJobRecoverIt < lastBalanceJobFinished) {
if (jobIdSet.empty() || jobIdSet.count(*lastBalaceJobRecoverIt)) {
LOG(INFO) << "can't recover a balance job " << *lastBalaceJobRecoverIt
<< " that before a finished balance job " << lastBalanceJobFinished;
<< " when there's a newer balance job " << lastBalanceJobFinished << " finished";
}
jobsMaybeRecover.erase(*lastBalaceJobRecoverIt);
}
Expand Down
7 changes: 6 additions & 1 deletion src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
#include <gtest/gtest_prod.h>

#include <boost/core/noncopyable.hpp>
#include <memory>

#include "common/base/Base.h"
#include "common/base/ErrorOr.h"
#include "common/stats/StatsManager.h"
#include "interface/gen-cpp2/meta_types.h"
#include "kvstore/NebulaStore.h"
#include "meta/processors/job/JobDescription.h"
#include "meta/processors/job/JobExecutor.h"
#include "meta/processors/job/JobStatus.h"
#include "meta/processors/job/StorageJobExecutor.h"
#include "meta/processors/job/TaskDescription.h"
Expand Down Expand Up @@ -80,7 +82,9 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
* @param adminClient
* @return true if the init is successful
*/
bool init(nebula::kvstore::KVStore* store, AdminClient* adminClient);
bool init(nebula::kvstore::KVStore* store,
AdminClient* adminClient,
std::shared_ptr<JobExecutorFactory> factory = std::make_shared<JobExecutorFactory>());

/**
* @brief Called when receive a system signal
Expand Down Expand Up @@ -331,6 +335,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
std::atomic<JbmgrStatus> status_ = JbmgrStatus::NOT_START;

std::unique_ptr<folly::Executor> executor_;
std::shared_ptr<JobExecutorFactory> executorFactory_;
};

} // namespace meta
Expand Down
4 changes: 4 additions & 0 deletions src/meta/processors/job/JobStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ bool JobStatus::laterThan(Status lhs, Status rhs) {
return phaseNumber(lhs) > phaseNumber(rhs);
}

/**
 * @brief Report whether `st` is a status that is treated as not settable.
 *
 * @param st the status to inspect
 * @return true only when `st` is Status::FINISHED; false for every other status
 */
bool JobStatus::notSetable(Status st) {
  if (Status::FINISHED == st) {
    return true;
  }
  return false;
}

std::string JobStatus::toString(Status st) {
switch (st) {
case Status::QUEUE:
Expand Down
2 changes: 2 additions & 0 deletions src/meta/processors/job/JobStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <string>
#include <vector>

#include "common/base/Status.h"
#include "interface/gen-cpp2/meta_types.h"

namespace nebula {
Expand All @@ -20,6 +21,7 @@ class JobStatus {
public:
static std::string toString(Status st);
static bool laterThan(Status lhs, Status rhs);
static bool notSetable(Status st);

private:
static int phaseNumber(Status st);
Expand Down
Loading

0 comments on commit 8b3c832

Please sign in to comment.