Add Job manager #1424

Merged
merged 28 commits into from
Feb 14, 2020

Conversation

liuyu85cn
Contributor

@liuyu85cn liuyu85cn commented Dec 11, 2019

Add a job manager to support admin jobs.

Usage:

submit job [flush/compact]

    Adds a "flush" or "compact" job in the chosen space and returns a job id.
    The job runs asynchronously.
    Use "show job <id>" to check its status.
    There are two job types for now (flush and compact).

show jobs

    Lists all jobs that have not expired*.

show job [id]

    Shows a job and all its tasks.

stop job [id]

    Stops a job if it has not finished.

recover job

    After a failover, use this command to put all jobs in the "queue" status back into the queue.

Comments

  • Job:
    A command entered by the user, sent from graph to Meta.

  • Task:
    After a job arrives at Meta, Meta splits the job into tasks and sends them to storage.

  • Expired jobs:
    Jobs expire after 1 week by default, controlled by the meta flag "job_expired_secs".
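
To make the Job/Task relationship above concrete, here is a minimal sketch of how Meta might split one job into tasks. The types `Job` and `Task`, the function `splitJob`, and the "one task per storage host" rule are assumptions for illustration only; they are not the classes added by this PR.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical types for illustration; not the classes from this PR.
struct Job {
    int32_t id;
    std::string cmd;  // "flush" or "compact"
};

struct Task {
    int32_t jobId;     // the job this task belongs to
    int32_t taskId;    // index of the task inside the job
    std::string host;  // storage host that is asked to run the task
};

// Assumption: Meta creates one task per storage host.
std::vector<Task> splitJob(const Job& job, const std::vector<std::string>& hosts) {
    std::vector<Task> tasks;
    tasks.reserve(hosts.size());
    int32_t taskId = 0;
    for (const auto& host : hosts) {
        tasks.push_back(Task{job.id, taskId++, host});
    }
    return tasks;
}
```

With three storage hosts, `splitJob` returns three tasks that all carry the same `jobId`, which is what "show job [id]" would list under the job.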

@nebula-community-bot
Member

Unit testing failed.

@nebula-community-bot
Member

Unit testing failed.

@nebula-community-bot
Member

Unit testing failed.

@nebula-community-bot
Member

Unit testing passed.

src/meta/JobStatus.cpp (outdated, resolved)
src/meta/JobStatus.h (outdated, resolved)
src/meta/SimpleBlockingQueue.h (outdated, resolved)
src/meta/KVJobManager.cpp (outdated, resolved)
src/meta/KVJobManager.cpp (outdated, resolved)
src/meta/KVJobManager.cpp (outdated, resolved)
src/meta/KVJobManager.h (outdated, resolved)
src/graph/AdminExecutor.cpp (outdated, resolved)
@nebula-community-bot
Member

Unit testing passed.

@nebula-community-bot
Member

Unit testing passed.

@nebula-community-bot
Member

Unit testing passed.

@nebula-community-bot
Member

Unit testing passed.

@nebula-community-bot
Member

Unit testing passed.

src/common/http/HttpClient.cpp (resolved)
src/daemons/MetaDaemon.cpp (outdated, resolved)
src/graph/AdminJobExecutor.cpp (outdated, resolved)
src/graph/AdminJobExecutor.cpp (outdated, resolved)
src/graph/AdminJobExecutor.cpp (resolved)
src/graph/AdminJobExecutor.cpp (resolved)
src/graph/AdminJobExecutor.cpp (outdated, resolved)
src/graph/AdminJobExecutor.cpp (outdated, resolved)
src/meta/client/MetaClient.cpp (outdated, resolved)
src/graph/AdminJobExecutor.h (outdated, resolved)
@bright-starry-sky
Contributor

Please resolve the conflicts. Thanks.

@nebula-community-bot
Member

Unit testing passed.

@nebula-community-bot
Member

Unit testing passed.


struct JobDetails {
1: string id
2: string typeAndParas
Contributor

If so, why not use AdminJobOp?

}

struct AdminJobResult {
// used in "admin flush" and "admin compact"
Contributor

Is jobId only used in "flush" and "compact"?

Contributor Author

Actually, this field is used when a new job is added.
Other job types that also need to return a job id will use another field,
for example JobDesc::id.

Contributor

Update the comments.


private:
int32_t id_;
std::string type_;
Contributor

Why not AdminJobOp?

Contributor Author

They are about two different levels.
Level 1 is AdminJobOp, which contains (add, show, stop ...).
Level 2 is flush and compact (and more if supported in the future).

For example, the AdminJobOp of "compact" is "ADD", and it has two parameters: 1. compact and 2.
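
The two levels described in this reply could be sketched as follows. `JobOp`, `JobRequest`, and `submitJob` are hypothetical names used only for illustration (the real operation enum is AdminJobOp); the point is that the job type travels as the first parameter of an ADD request rather than as its own operation.

```cpp
#include <string>
#include <vector>

// Level 1: what to do with a job. Hypothetical mirror of AdminJobOp.
enum class JobOp { ADD, SHOW, STOP, RECOVER };

// A request carries the level-1 operation plus its parameters.
struct JobRequest {
    JobOp op;
    std::vector<std::string> paras;
};

// Level 2: the job type ("flush", "compact") is the first parameter of ADD.
// Any further parameters are appended after it.
JobRequest submitJob(const std::string& type,
                     const std::vector<std::string>& extra = {}) {
    JobRequest req{JobOp::ADD, {type}};
    req.paras.insert(req.paras.end(), extra.begin(), extra.end());
    return req;
}
```

Under this layout, supporting a new job type means accepting a new first parameter, while the set of operations stays unchanged.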

if (offset + sizeof(size_t) < rawKey.size()) {
std::stringstream oss;
oss << __func__ << ", offset=" << offset << ", rawKey.size()=" << rawKey.size();
throw std::range_error(oss.str().c_str());
Contributor

Do you catch the exception outside?
Unless it is necessary, we don't use exceptions to pass errors to the caller.
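
One way to follow this suggestion is to report the failure through the return value. The sketch below is an assumption about what that could look like, not the code that was merged; `readSize` is a hypothetical helper that reads a `size_t` from a raw key and returns `false` when the key is too short.

```cpp
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical helper, not the function from this PR.
// Copies the size_t stored at `offset` in `rawKey` into `*out`.
// Returns false (instead of throwing) when fewer than sizeof(size_t)
// bytes are available at `offset`.
bool readSize(const std::string& rawKey, std::size_t offset, std::size_t* out) {
    if (offset > rawKey.size() || rawKey.size() - offset < sizeof(std::size_t)) {
        return false;
    }
    std::memcpy(out, rawKey.data() + offset, sizeof(std::size_t));
    return true;
}
```

The caller then checks the boolean and logs or propagates an error code, so no try/catch is needed at the call site.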

int32_t JobManager::recoverJob() {
int32_t recoveredJobNum = 0;
std::unique_ptr<kvstore::KVIterator> iter;
kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter);
Contributor

Check the return code.
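
A sketch of what checking the return code might look like. `FakeStore`, `ResultCode`, the key prefix `"__job_mgr_"`, and the `-1` error value are all stand-ins invented for this example; they are not the real kvstore API or the merged implementation.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-ins for the kvstore types; not the real Nebula API.
enum class ResultCode { SUCCEEDED, ERR_UNKNOWN };

struct FakeStore {
    bool healthy = true;
    std::map<std::string, std::string> data;

    // Copies every entry whose key starts with `pfx` into `out`.
    ResultCode prefix(const std::string& pfx,
                      std::map<std::string, std::string>* out) const {
        if (!healthy) {
            return ResultCode::ERR_UNKNOWN;
        }
        for (auto it = data.lower_bound(pfx);
             it != data.end() && it->first.compare(0, pfx.size(), pfx) == 0;
             ++it) {
            out->insert(*it);
        }
        return ResultCode::SUCCEEDED;
    }
};

// Returns the number of jobs found, or -1 if the prefix scan failed.
int32_t recoverJobs(const FakeStore& store) {
    std::map<std::string, std::string> jobs;
    auto rc = store.prefix("__job_mgr_", &jobs);
    if (rc != ResultCode::SUCCEEDED) {
        return -1;  // do not touch `jobs` when the scan failed
    }
    return static_cast<int32_t>(jobs.size());
}
```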

result.set_backupResult(backupRes);
break;
}
case nebula::meta::cpp2::AdminJobOp::RECOVER:
Contributor

So after a failover, users have to call "recover jobs" themselves?


struct JobDesc {
1: i32 id
2: string cmdAndParas
Contributor

Use two fields:

1.  string cmd
2.  list<string> params

And add some comments for each field.
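
On the C++ side, the suggested two-field layout could look roughly like this. `JobDescSketch` and `makeJobDesc` are hypothetical names, and the whitespace-separated input format is an assumption made only so the example has something to parse.

```cpp
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical C++ view of the suggested layout; not the generated Thrift type.
struct JobDescSketch {
    int32_t id;                       // job id returned to the user
    std::string cmd;                  // job command, e.g. "flush" or "compact"
    std::vector<std::string> params;  // remaining parameters, in order
};

// Assumption: the combined string is whitespace-separated, command first.
JobDescSketch makeJobDesc(int32_t id, const std::string& cmdAndParas) {
    JobDescSketch desc{id, "", {}};
    std::istringstream in(cmdAndParas);
    in >> desc.cmd;
    std::string param;
    while (in >> param) {
        desc.params.push_back(param);
    }
    return desc;
}
```

Keeping the command and its parameters separate means readers of the struct never have to re-parse a combined string.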

break;
}
}
}).thenError([&](auto&& e){
Contributor

extra space after ")"

LOG(ERROR) << "admin Failed: " << e.what();
successfully = false;
}).wait();
LOG(INFO) << "admin tasks have finished";
Contributor

Add more information about the current job.

}
status_ = newStatus;
if (newStatus == Status::RUNNING) {
startTime_ = std::time(nullptr);
Contributor

Use WallClock::fastNowInSecondes()

}

if (JobStatus::laterThan(newStatus, Status::RUNNING)) {
stopTime_ = std::time(nullptr);
Contributor

ditto
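
The status update discussed in these two threads can be sketched with the current time passed in as a parameter, so a single clock source is used for both timestamps. The status names, the `phase` ordering, and the rule that only moves to a later phase are accepted are assumptions for illustration, not the merged JobStatus logic.

```cpp
#include <cstdint>

// Hypothetical status set; the real enum lives in the meta service.
enum class Status { QUEUE, RUNNING, FINISHED, FAILED, STOPPED };

// Assumed ordering: QUEUE < RUNNING < any terminal status.
int phase(Status s) {
    switch (s) {
        case Status::QUEUE:   return 1;
        case Status::RUNNING: return 2;
        default:              return 3;  // FINISHED, FAILED, STOPPED
    }
}

bool laterThan(Status lhs, Status rhs) { return phase(lhs) > phase(rhs); }

struct JobTimes {
    Status status = Status::QUEUE;
    int64_t startTime = 0;
    int64_t stopTime = 0;

    // `now` is supplied by the caller (seconds), so every timestamp
    // comes from the same clock.
    bool setStatus(Status newStatus, int64_t now) {
        if (!laterThan(newStatus, status)) {
            return false;  // assumption: only moves to a later phase are allowed
        }
        status = newStatus;
        if (newStatus == Status::RUNNING) {
            startTime = now;
        }
        if (laterThan(newStatus, Status::RUNNING)) {
            stopTime = now;
        }
        return true;
    }
};
```

Passing `now` in also makes the transition logic testable without touching the system clock.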

// skip expired job, default 1 week
auto jobDesc = optJob->toJobDesc();
auto jobStart = jobDesc.get_startTime();
auto duration = std::difftime(std::time(nullptr), jobStart);
Contributor

Use fastNowInSeconds
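
The expiry filter behind "show jobs" can be written against plain integer seconds, whichever clock supplies them. `JobInfo` and `notExpired` are hypothetical names; the default of one week matches the PR description, where the value is controlled by "job_expired_secs".

```cpp
#include <cstdint>
#include <vector>

// Hypothetical summary of a stored job.
struct JobInfo {
    int32_t id;
    int64_t startTime;  // seconds since epoch
};

// Keeps the jobs that started no more than `expiredSecs` before `nowSecs`.
// Default expiry: one week, as stated in the PR description.
std::vector<JobInfo> notExpired(const std::vector<JobInfo>& jobs,
                                int64_t nowSecs,
                                int64_t expiredSecs = 7 * 24 * 60 * 60) {
    std::vector<JobInfo> kept;
    for (const auto& job : jobs) {
        if (nowSecs - job.startTime <= expiredSecs) {
            kept.push_back(job);
        }
    }
    return kept;
}
```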

Contributor

@dangleptr dangleptr left a comment

Well done. The PR LGTM now.
It has been a long journey, thanks for your effort.

@liuyu85cn liuyu85cn requested a review from dangleptr February 12, 2020 03:59
Contributor

@critical27 critical27 left a comment

Clean code!

@dangleptr dangleptr merged commit 6ba0dc2 into vesoft-inc:master Feb 14, 2020
@jude-zhu
Contributor

relate to #762

tong-hao pushed a commit to tong-hao/nebula that referenced this pull request Jun 1, 2021
Co-authored-by: dangleptr <37216992+dangleptr@users.noreply.github.com>
yixinglu pushed a commit to yixinglu/nebula that referenced this pull request Jan 31, 2023
Labels
ready-for-testing PR: ready for the CI test
8 participants