Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: initial support for parallel DDL #6955

Merged
merged 20 commits into from
Jul 25, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (s *testColumnSuite) TearDownSuite(c *C) {
testleak.AfterTest(c)()
}

func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
colName string, pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
func buildCreateColumnJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is only used once?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, It's also used in ddl_worker_test.go

pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
col := &model.ColumnInfo{
Name: model.NewCIStr(colName),
Offset: len(tblInfo.Columns),
Expand All @@ -79,22 +79,31 @@ func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{col, pos, 0},
}
return job
}

func testCreateColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
colName string, pos *ast.ColumnPosition, defaultValue interface{}) *model.Job {
job := buildCreateColumnJob(dbInfo, tblInfo, colName, pos, defaultValue)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func testDropColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string, isError bool) *model.Job {
job := &model.Job{
func buildDropColumnJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionDropColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{model.NewCIStr(colName)},
}
}

func testDropColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, colName string, isError bool) *model.Job {
job := buildDropColumnJob(dbInfo, tblInfo, colName)
err := d.doDDLJob(ctx, job)
if isError {
c.Assert(err, NotNil)
Expand Down
39 changes: 24 additions & 15 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ type ddl struct {
quitCh chan struct{}

*ddlCtx
workers []*worker
workers map[workerType]*worker
}

// ddlCtx is the context when we use worker to handle DDL jobs.
Expand All @@ -221,7 +221,6 @@ type ddlCtx struct {
store kv.Storage
ownerManager owner.Manager
schemaSyncer SchemaSyncer
ddlJobCh chan struct{}
ddlJobDoneCh chan struct{}
ddlEventCh chan<- *util.Event
lease time.Duration // lease is schema lease.
Expand Down Expand Up @@ -299,7 +298,6 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
uuid: id,
store: store,
lease: lease,
ddlJobCh: make(chan struct{}, 1),
ddlJobDoneCh: make(chan struct{}, 1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move the ddlJobDoneCh to the different worker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

ownerManager: manager,
schemaSyncer: syncer,
Expand Down Expand Up @@ -340,20 +338,20 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
err := d.ownerManager.CampaignOwner(ctx)
terror.Log(errors.Trace(err))

d.workers = make([]*worker, 1)
// TODO: Add addIdxWorker.
d.workers[0] = newWorker(generalWorker, 0, d.store, ctxPool)
d.workers = make(map[workerType]*worker, 2)
d.workers[generalWorker] = newWorker(generalWorker, 0, d.store, ctxPool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The worker.id here doesn't seem to be particularly useful, does it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. But it will be used in the future.

d.workers[addIdxWorker] = newWorker(addIdxWorker, 0, d.store, ctxPool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worker id is always 0 ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worker id is always 0 ?

for _, worker := range d.workers {
worker.wg.Add(1)
go worker.start(d.ddlCtx)
// TODO: Add the type of DDL worker.
metrics.DDLCounter.WithLabelValues(metrics.CreateDDLWorker).Inc()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add worker type in the metrics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there are a TODO: in line 357


// For every start, we will send a fake job to let worker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is For every start?

Copy link
Contributor Author

@zimulala zimulala Jul 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's original comment. I think it means "For each call to the start function".

// checks owner firstly and try to find whether a job exists and run.
asyncNotify(worker.ddlJobCh)
}
}

// For every start, we will send a fake job to let worker
// check owner firstly and try to find whether a job exists and run.
asyncNotify(d.ddlJobCh)
}

func (d *ddl) close() {
Expand Down Expand Up @@ -401,11 +399,9 @@ func (d *ddl) genGlobalID() (int64, error) {

// generalWorker returns the first worker. The ddl structure has only one worker before we implement the parallel worker.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to update the comment.

// It's used for testing.
// TODO: Remove this function.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it's only used in tests, and we can use d.workers[generalWorker] directly.

func (d *ddl) generalWorker() *worker {
if len(d.workers) == 0 {
return nil
}
return d.workers[0]
return d.workers[generalWorker]
}

// SchemaSyncer implements DDL.SchemaSyncer interface.
Expand All @@ -427,6 +423,19 @@ func checkJobMaxInterval(job *model.Job) time.Duration {
return 1 * time.Second
}

func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {
// If the workes don't run, we needn't to notice workers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workers or works?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notify is better than notice.

if !RunWorker {
return
}

if jobTp == model.ActionAddIndex {
asyncNotify(d.workers[addIdxWorker].ddlJobCh)
} else {
asyncNotify(d.workers[generalWorker].ddlJobCh)
}
}

func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
// For every DDL, we must commit current transaction.
if err := ctx.NewTxn(); err != nil {
Expand All @@ -440,7 +449,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
}

// Notice worker that we push a new job and wait the job done.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Notice/Notify/g

asyncNotify(d.ddlJobCh)
d.asyncNotifyWorker(job.Type)
log.Infof("[ddl] start DDL job %s, Query:%s", job, job.Query)

var historyJob *model.Job
Expand Down
24 changes: 20 additions & 4 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func checkHistoryJobArgs(c *C, ctx sessionctx.Context, id int64, args *historyJo
}
}

func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
job := &model.Job{
func buildCreateIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAddIndex,
Expand All @@ -135,26 +135,42 @@ func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo,
Column: &ast.ColumnName{Name: model.NewCIStr(colName)},
Length: types.UnspecifiedLength}}},
}
}

func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
job := buildCreateIdxJob(dbInfo, tblInfo, unique, indexName, colName)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func testDropIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
job := &model.Job{
func buildDropIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionDropIndex,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{model.NewCIStr(indexName)},
}
}

func testDropIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
job := buildDropIdxJob(dbInfo, tblInfo, indexName)
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func buildRebaseAutoID(dbInfo *model.DBInfo, tblInfo *model.TableInfo, newBaseID int64) *model.Job {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buildRebaseAutoIDJob

return &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionRebaseAutoID,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newBaseID},
}
}
Loading