Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: initial support for parallel DDL #6955

Merged
merged 20 commits into from
Jul 25, 2018
Merged

Conversation

zimulala
Copy link
Contributor

@zimulala zimulala commented Jul 2, 2018

What have you changed? (mandatory)

ddl: remove cleanAddIndexQueueJobs and initial support for parallel DDL.
Initial support for parallel DDL is as follows:

  • The DDL of "add index" and the other types of DDL can be executed parallelly when they are on the different tables. We use two queues to save the "add index" and other DDLs in storage. And we have two workers to handle these DDL jobs. The "add index" worker handles the "add index" queue. Another worker handles another queue.

  • If the DDL of "add index" and the other types of DDL are on the same table, we need to perform these two operations serially.

What are the type of the changes (mandatory)?

The currently defined types are listed below, please pick one of the types for this PR by removing the others:

  • Improvement

How has this PR been tested (mandatory)?

unit test

Does this PR affect documentation (docs/docs-cn) update? (optional)

Yes.

if err != nil {
return nil, errors.Trace(err)
}
return append(generalJobs, addIdxJobs...), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return jobs may be not sorted by job ID, should we return sorted jobs? Because the older function naturally return a sorted jobs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can be done in the next PR. I added a "TODO" now.

ddl/ddl.go Outdated
for _, worker := range d.workers {
worker.wg.Add(1)
go worker.start(d.ddlCtx)
// TODO: Add the type of DDL worker.
metrics.DDLCounter.WithLabelValues(metrics.CreateDDLWorker).Inc()

// For every start, we will send a fake job to let worker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is For every start?

Copy link
Contributor Author

@zimulala zimulala Jul 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's original comment. I think it means "For each call to the start function".

ddl/ddl.go Outdated
@@ -427,6 +423,19 @@ func checkJobMaxInterval(job *model.Job) time.Duration {
return 1 * time.Second
}

func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {
// If the workes don't run, we needn't to notice workers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workers or works?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notify is better than notice.

@@ -282,41 +293,56 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
return errors.Trace(err)
}

func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there be multiple job DDL dependencies?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only record the maximum job ID in multiple dependent jobs.

@zimulala
Copy link
Contributor Author

zimulala commented Jul 7, 2018

/run-all-tests

1 similar comment
@zimulala
Copy link
Contributor Author

zimulala commented Jul 9, 2018

/run-all-tests

@zimulala
Copy link
Contributor Author

zimulala commented Jul 9, 2018

/run-common-test
/run-integration-common-test

@zimulala
Copy link
Contributor Author

zimulala commented Jul 9, 2018

/run-common-test

@zimulala
Copy link
Contributor Author

ddl/ddl.go Outdated
@@ -427,6 +423,19 @@ func checkJobMaxInterval(job *model.Job) time.Duration {
return 1 * time.Second
}

func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {
// If the workers don't run, we needn't to notice workers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/notice/notify/

wg sync.WaitGroup
id int
tp workerType
ddlJobCh chan struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this used for?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ddlJobCh used to be member of ddl, but now, we have 2 kinds of workers, we need two ddlJobCh for every kind of worker. So move ddlJobCh from ddl to worker

@@ -134,6 +134,22 @@ func GetDDLJobs(txn kv.Transaction) ([]*model.Job, error) {
return jobs, nil
}

// GetDDLJobs returns the DDL jobs and an error.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not need to mention the error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other functions also mention the error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment provides nothing more than the function name.

crazycs520
crazycs520 previously approved these changes Jul 12, 2018
@shenli shenli self-assigned this Jul 12, 2018
Copy link
Contributor

@ciscoxll ciscoxll left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reset

@crazycs520 crazycs520 dismissed their stale review July 13, 2018 04:12

misoperation

ddl/ddl.go Outdated
d.workers[0] = newWorker(generalWorker, 0, d.store, ctxPool)
d.workers = make(map[workerType]*worker, 2)
d.workers[generalWorker] = newWorker(generalWorker, 0, d.store, ctxPool)
d.workers[addIdxWorker] = newWorker(addIdxWorker, 0, d.store, ctxPool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worker id is always 0 ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worker id is always 0 ?

@@ -61,6 +61,7 @@ func newWorker(tp workerType, id int, store kv.Storage, ctxPool *pools.ResourceP
worker := &worker{
id: id,
tp: tp,
ddlJobCh: make(chan struct{}, 1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it need to be make(chan struct{}, 1 ) rather than make(chan struct{})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to push info to the channel.

for {
select {
case <-ticker.C:
log.Debugf("[ddl] wait %s to check DDL status again", checkTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

%s to print time? what's the result looks like

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not print worker type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the old code, I will handle it.

@@ -149,6 +146,15 @@ func asyncNotify(ch chan struct{}) {
// buildJobDependence sets the curjob's dependency-ID.
// The dependency-job's ID must less than the current job's ID, and we need the largest one in the list.
func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
switch curJob.Type {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to understand here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's also hard to understand for me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think add comment helps @shenli
meta.Meta should not store the information about queue key.
If the caller has to modify status in meta.Meta, before calling its method, why not provide that status as argument?

t.SetJobListKey(meta.AddIndexJobListKey)
defer t.SetJobListKey(meta.DefaultJobListKey)
}

jobs, err := t.GetAllDDLJobs()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest rename GetAllDDLJobs to GetDDLJobsInQueue and pass the queue, rather than change meta's jobListKey status, it's very tricky.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetAllDDLJobs definitely get jobs from mDDLJobListKey? Is that correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mDDLJobListKey will change between two queue, I don't know which one, that's the problem.

So I suggest:

GetDDLJobsInQueue(general)
GetDDLJobsInQueue(addindex)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, I changed the name of the function.

return true, nil
}

historyJob, err := t.GetHistoryDDLJob(job.DependencyID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will t.GetHistoryDDLJob select the right job queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to find the job in the waiting job list. Because the history job list maybe long.

@@ -149,6 +146,15 @@ func asyncNotify(ch chan struct{}) {
// buildJobDependence sets the curjob's dependency-ID.
// The dependency-job's ID must less than the current job's ID, and we need the largest one in the list.
func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be read/write conflict in GetALLDDLJobs, because each worker check the other worker's queue to dependence.
Is the error retryable and properly handled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be retried.

once := sync.Once{}
var checkErr error
tc.onJobRunBefore = func(job *model.Job) {
// TODO: extract a unified function for use by other tests.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/for use by other tests/for other tests

if lastJob != nil {
finishedJobs, err := m.GetAllHistoryDDLJobs()
c.Assert(err, IsNil)
// get the last 11 jobs completed。
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find a strange char here

@@ -104,6 +104,9 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
errs[i] = errors.Trace(err)
continue
}
if job.Type == model.ActionAddIndex {
t.SetJobListKey(meta.AddIndexJobListKey)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an else branch so that the code is more robust without the assumption about job.Type default value.

ddl/ddl.go Outdated
@@ -440,7 +449,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
}

// Notice worker that we push a new job and wait the job done.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Notice/Notify/g

ddl/ddl_test.go Outdated
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func buildRebaseAutoID(dbInfo *model.DBInfo, tblInfo *model.TableInfo, newBaseID int64) *model.Job {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buildRebaseAutoIDJob

@@ -149,6 +146,15 @@ func asyncNotify(ch chan struct{}) {
// buildJobDependence sets the curjob's dependency-ID.
// The dependency-job's ID must less than the current job's ID, and we need the largest one in the list.
func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
switch curJob.Type {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments.

@@ -162,6 +168,7 @@ func buildJobDependence(t *meta.Meta, curJob *model.Job) error {
return errors.Trace(err)
}
if isDependent {
log.Infof("[ddl] current DDL job %v is dependent job %v", curJob, job)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about "current DDL job %v depends on job %v"?

@@ -348,7 +374,8 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx, shouldCleanJobs bool) error {
return errors.Trace(w.handleUpdateJobError(t, job, err))
})

if runJobErr != nil {
waitDependencyJob := job != nil && job.DependencyID != 0
if runJobErr != nil || waitDependencyJob {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If waitDependencyJob is true, it is not an error.

Copy link
Contributor Author

@zimulala zimulala Jul 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we'd better wait a moment. I will add a comment for it.

@shenli
Copy link
Member

shenli commented Jul 16, 2018

/run-all-tests

@@ -306,7 +305,6 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
uuid: id,
store: store,
lease: lease,
ddlJobCh: make(chan struct{}, 1),
ddlJobDoneCh: make(chan struct{}, 1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move the ddlJobDoneCh to the different worker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

if historyJob == nil {
return false, nil
}
log.Infof("[ddl] DDL job %v isn't dependent on job ID %d", job, job.DependencyID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DDL job %v isn't dependent on job ID %d ? What about DDL job %v dependent job ID %d is finished ?

@@ -104,7 +104,11 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
errs[i] = errors.Trace(err)
continue
}
err = t.UpdateDDLJob(int64(j), job, true)
if job.Type == model.ActionAddIndex {
err = t.UpdateDDLJob(int64(j), job, true, meta.AddIndexJobListKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just new a meta with meta.AddIndexJobListKey?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a meta here, I think it's OK.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we can use a new meta to avoid add a param to the function.

ddl/ddl.go Outdated
@@ -413,11 +411,9 @@ func (d *ddl) genGlobalID() (int64, error) {

// generalWorker returns the first worker. The ddl structure has only one worker before we implement the parallel worker.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to update the comment.

@zimulala
Copy link
Contributor Author

PTAL @winkyao @shenli

@shenli
Copy link
Member

shenli commented Jul 23, 2018

@zimulala Please resolve the conflicts.

@@ -282,41 +299,61 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
return errors.Trace(err)
}

func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) {
if job.DependencyID == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a constant for 0 or add comment for the if statement.

return true, nil
}

func newMetaWithQueueTp(txn kv.Transaction, tp string) *meta.Meta {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address it.

meta/meta.go Outdated
@@ -86,11 +86,20 @@ type Meta struct {
}

// NewMeta creates a Meta in transaction txn.
func NewMeta(txn kv.Transaction) *Meta {
// If the current Meta needs to handle a job, jobListKey is the type of the job's list.
// We don't change the value of the jobListKey in a Meta.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can not understand the comment.

for {
kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
// Get the number of jobs from the adding index queue.
addIdxLen, err1 := m.DDLJobQueueLen(meta.AddIndexJobListKey)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use GetDDLJobs and get the length of the return value?

@@ -175,14 +190,18 @@ func (d *ddl) addDDLJob(ctx sessionctx.Context, job *model.Job) error {
job.Version = currentVersion
job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
t := newMetaWithQueueTp(txn, job.Type.String())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use meta.AddIndexJobListKey as the second parameter?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The job may not be add index, So has to according to the job type to create meta?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost. I want to put the check of job type or worker type into newMetaWithQueueTp.

@@ -362,6 +401,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx, shouldCleanJobs bool) error {
// No job now, return and retry getting later.
return nil
}
w.waitDependencyJobFinished(job, &waitDependencyJobCnt)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If its dependencyJob is not done yet, it would return at line 357. So why we need to wait here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line 357 return is in a txn func, not return in handleDDLJobQueue func

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @crazycs520 said. And if put it in line357, we need wait for 200ms. I am afraid this txn is easy to conflict. So I put it here.

@@ -86,11 +86,19 @@ type Meta struct {
}

// NewMeta creates a Meta in transaction txn.
func NewMeta(txn kv.Transaction) *Meta {
// If the current Meta needs to handle a job, jobListKey is the type of the job's list.
func NewMeta(txn kv.Transaction, jobListKeys ...JobListKeyType) *Meta {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we always specify the JobListKey?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of places use this function, so I use this method to handle it.
And In other packages, I think we needn't distinguish the type of jobListKeys.

Copy link
Member

@shenli shenli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@shenli shenli added status/LGT2 Indicates that a PR has LGTM 2. and removed status/LGT1 Indicates that a PR has LGTM 1. labels Jul 24, 2018
Copy link
Contributor

@winkyao winkyao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@zimulala
Copy link
Contributor Author

/run-all-tests

@zimulala
Copy link
Contributor Author

/run-common-test
/run-integration-ddl-test

@zimulala
Copy link
Contributor Author

/run-common-test -tidb-test=pr/592
/run-integration-common-test -tidb-test=pr/592

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
sig/sql-infra SIG: SQL Infra status/LGT2 Indicates that a PR has LGTM 2.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants