-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Job manager #1424
Add Job manager #1424
Conversation
Unit testing failed. |
2 similar comments
Unit testing failed. |
Unit testing failed. |
Unit testing passed. |
Unit testing passed. |
2 similar comments
Unit testing passed. |
Unit testing passed. |
Unit testing passed. |
1 similar comment
Unit testing passed. |
Please resolve conflicting . Thanks . |
Unit testing passed. |
1 similar comment
Unit testing passed. |
…2. handle error if read wrong input form kv store
src/interface/meta.thrift
Outdated
|
||
struct JobDetails { | ||
1: string id | ||
2: string typeAndParas |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If so, why not use AdminJobOp
src/interface/meta.thrift
Outdated
} | ||
|
||
struct AdminJobResult { | ||
// used in "admin flush" and "admin compact" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jobId only be used in "flush" and "compact" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually this field used when one new job added.
other job types, which also need a job id in return will use other field.
for example: JobDesc::id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update the comments
|
||
private: | ||
int32_t id_; | ||
std::string type_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not AdminJobOp
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
they talk about two different levels.
level 1 is : AdminJobOp which contains (add, show, stop ...)
level 2 is : flush and compact (and more if support in the future).
for example. The AdminJobOp of "compact" is "ADD" , and has two paras 1. compact and 2.
if (offset + sizeof(size_t) < rawKey.size()) { | ||
std::stringstream oss; | ||
oss << __func__ << ", offset=" << offset << ", rawKey.size()=" << rawKey.size(); | ||
throw std::range_error(oss.str().c_str()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you catch up the exceptions outside?
If there is no necessary, we don't use exception to pass errors to outside.
int32_t JobManager::recoverJob() { | ||
int32_t recoveredJobNum = 0; | ||
std::unique_ptr<kvstore::KVIterator> iter; | ||
kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check the returned code.
result.set_backupResult(backupRes); | ||
break; | ||
} | ||
case nebula::meta::cpp2::AdminJobOp::RECOVER: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So after FO, the users should call "recover jobs" by themself?
src/interface/meta.thrift
Outdated
|
||
struct JobDesc { | ||
1: i32 id | ||
2: string cmdAndParas |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use two fields
1. string cmd
2 list<string> params
And add some comments for each field.
break; | ||
} | ||
} | ||
}).thenError([&](auto&& e){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra space after ")"
LOG(ERROR) << "admin Failed: " << e.what(); | ||
successfully = false; | ||
}).wait(); | ||
LOG(INFO) << "admin tasks have finished"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add more information about current job
} | ||
status_ = newStatus; | ||
if (newStatus == Status::RUNNING) { | ||
startTime_ = std::time(nullptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use WallClock::fastNowInSecondes()
} | ||
|
||
if (JobStatus::laterThan(newStatus, Status::RUNNING)) { | ||
stopTime_ = std::time(nullptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
// skip expired job, default 1 week | ||
auto jobDesc = optJob->toJobDesc(); | ||
auto jobStart = jobDesc.get_startTime(); | ||
auto duration = std::difftime(std::time(nullptr), jobStart); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use fastNowInSeconds
…emove expired jobs 4. change some return type of JobManager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well done. The pr LGTM now.
It is a long journey, thanks for your effort.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clean code!
relate to #762 |
# add job manager to support admin jobs. ## usage: ### submit job [flush/compact] add a"flush" job in the chosen space, return a job id. this job will run in async. use "show job <id>" to check the status there are two job type now (flush and compact) ### show jobs list all the not expired jobs* ### show job [id] show a job and all its tasks ### stop job [id] stop a job if not finished ### recover job after fail over, use this command to put all the "queue" back into queue again. ### Comments * Job: a user input command. Send from graph to Meta * Task. after a 'job' arrived to Meta. Meta will split the job to tasks, and send them to storage. * expired jobs default 1 week, controlled by meta FLAGs "job_expired_secs" Co-authored-by: dangleptr <37216992+dangleptr@users.noreply.github.com>
add job manager to support admin jobs.
usage:
submit job [flush/compact]
show jobs
show job [id]
stop job [id]
recover job
Comments
Job:
a user input command. Send from graph to Meta
Task.
after a 'job' arrived to Meta. Meta will split the job to tasks, and send them to storage.
expired jobs
default 1 week, controlled by meta FLAGs "job_expired_secs"