Skip to content

Commit

Permalink
ddl: fix canceling add index and add column, port from #8171 (#8513)
Browse files Browse the repository at this point in the history
  • Loading branch information
winkyao authored Dec 13, 2018
1 parent 7a7bb0e commit ffc8f6c
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 68 deletions.
16 changes: 14 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,15 @@ func (d *ddl) createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnIn
return colInfo, position, nil
}

func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = d.onDropColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
Expand Down Expand Up @@ -256,7 +264,11 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
} else {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
}
default:
err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
}
Expand Down
55 changes: 55 additions & 0 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,61 @@ LOOP:
ddl.ReorgWaitTimeout = oldReorgWaitTimeout
}

// TestCancelAddIndex1 tests canceling ddl job when the add index worker is not started.
func (s *testDBSuite) TestCancelAddIndex1(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t")
s.mustExec(c, "create table t(c1 int, c2 int)")
defer s.mustExec(c, "drop table t;")
for i := 0; i < 50; i++ {
s.mustExec(c, "insert into t values (?, ?)", i, i)
}
var checkErr error
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 50 * time.Millisecond
defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }()
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer == 0 {
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(hookCtx.Txn(true), jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
checkErr = hookCtx.Txn(true).Commit(context.Background())
}
}
s.dom.DDL().SetHook(hook)
rs, err := s.tk.Exec("alter table t add index idx_c2(c2)")
if rs != nil {
rs.Close()
}
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
s.dom.DDL().SetHook(&ddl.TestDDLCallback{})
t := s.testGetTable(c, "t")
for _, idx := range t.Indices() {
c.Assert(strings.EqualFold(idx.Meta().Name.L, "idx_c2"), IsFalse)
}

s.mustExec(c, "alter table t add index idx_c2(c2)")
s.mustExec(c, "alter table t drop index idx_c2")
}

func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
Expand Down
15 changes: 15 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,18 @@ func testDropIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, t
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func testAddColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, args []interface{}) *model.Job {
job := &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAddColumn,
Args: args,
BinlogInfo: &model.HistoryInfo{},
}
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}
13 changes: 2 additions & 11 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,16 +348,7 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64, err error) {
}
// The cause of this job state is that the job is cancelled by client.
if job.IsCancelling() {
// If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes.
if job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
log.Infof("[ddl] run the cancelling DDL job %s", job)
d.reorgCtx.notifyReorgCancel()
} else {
job.State = model.JobStateCancelled
job.Error = errCancelledDDLJob
job.ErrorCount++
return
}
return convertJob2RollbackJob(d, t, job)
}

if !job.IsRollingback() && !job.IsCancelling() {
Expand Down Expand Up @@ -581,7 +572,7 @@ func (d *ddl) cleanAddIndexQueueJobs(txn kv.Transaction) error {
return errors.Trace(err)
}
indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
_, err = d.convert2RollbackJob(m, job, tblInfo, indexInfo, nil)
_, err = convertAddIdxJob2RollbackJob(m, job, tblInfo, indexInfo, nil)
if err == nil {
_, err = m.DeQueueDDLJob()
}
Expand Down
115 changes: 88 additions & 27 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -439,10 +440,7 @@ func doDDLJobErr(c *C, schemaID, tableID int64, tp model.ActionType, args []inte

func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) error {
var checkErr error
addIndexFirstReorg := test.act == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer == 0
// If the action is adding index and the state is writing reorganization, it wants to test the case of cancelling the job when backfilling indexes.
// When the job satisfies this case of addIndexFirstReorg, the worker hasn't started to backfill indexes.
if test.cancelState == job.SchemaState && !addIndexFirstReorg {
if test.cancelState == job.SchemaState {
if job.SchemaState == model.StateNone && job.State != model.JobStateDone {
// If the schema state is none, we only test the job is finished.
} else {
Expand All @@ -452,7 +450,7 @@ func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) e
return checkErr
}
// It only tests cancel one DDL job.
if errs[0] != test.cancelRetErrs[0] {
if !terror.ErrorEqual(errs[0], test.cancelRetErrs[0]) {
checkErr = errors.Trace(errs[0])
return checkErr
}
Expand All @@ -474,22 +472,51 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
errs := []error{err}
noErrs := []error{nil}
tests := []testCancelJob{
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: errs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 4)}, cancelState: model.StatePublic, ddlRetErr: err},

// TODO: after fix drop index and create table rollback bug, the below test cases maybe need to change.
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 5}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 6}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 7}, cancelRetErrs: errs, cancelState: model.StateDeleteReorganization, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err},

{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 8)}, cancelState: model.StateNone, ddlRetErr: err},

// TODO: add create table back after we fix the cancel bug.
//{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 12)}, cancelState: model.StatePublic, ddlRetErr: err},
}

return tests
}

func (s *testDDLSuite) checkAddIdx(c *C, d *ddl, schemaID int64, tableID int64, idxName string, success bool) {
changedTable := testGetTable(c, d, schemaID, tableID)
var found bool
for _, idxInfo := range changedTable.Meta().Indices {
if idxInfo.Name.O == idxName {
found = true
break
}
}
c.Assert(found, Equals, success)
}
func (s *testDDLSuite) checkAddColumn(c *C, d *ddl, schemaID int64, tableID int64, colName string, success bool) {
changedTable := testGetTable(c, d, schemaID, tableID)
var found bool
for _, colInfo := range changedTable.Meta().Columns {
if colInfo.Name.O == colName {
found = true
break
}
}
c.Assert(found, Equals, success)
}

func (s *testDDLSuite) TestCancelJob(c *C) {
store := testCreateStore(c, "test_cancel_job")
defer store.Close()
Expand Down Expand Up @@ -519,47 +546,60 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
var checkErr error
var test *testCancelJob
tc.onJobUpdated = func(job *model.Job) {
if job.State == model.JobStateSynced || job.State == model.JobStateCancelled || job.State == model.JobStateCancelling {
return
}

if checkErr != nil {
return
}
hookCtx := mock.NewContext()
hookCtx.Store = store
var err error
err = hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
var err1 error
err1 = hookCtx.NewTxn()
if err1 != nil {
checkErr = errors.Trace(err1)
return
}
checkErr = checkCancelState(hookCtx.Txn(true), job, test)
if checkErr != nil {
return
}
checkCancelState(hookCtx.Txn(true), job, test)
err = hookCtx.Txn(true).Commit(context.Background())
if err != nil {
checkErr = errors.Trace(err)
err1 = hookCtx.Txn(true).Commit(context.Background())
if err1 != nil {
checkErr = errors.Trace(err1)
return
}
}
d.SetHook(tc)

// for adding index
test = &tests[0]
validArgs := []interface{}{false, model.NewCIStr("idx"),
idxOrigName := "idx"
validArgs := []interface{}{false, model.NewCIStr(idxOrigName),
[]*ast.IndexColName{{
Column: &ast.ColumnName{Name: model.NewCIStr("c1")},
Length: -1,
}}, nil}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &test.cancelState)
// When the job satisfies this test case, the option will be rollback, so the job's schema state is none.
cancelState := model.StateNone
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[1]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &test.cancelState)
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[2]
// When the job satisfies this test case, the option will be rollback, so the job's schema state is none.
cancelState := model.StateNone
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[3]
testCreateIndex(c, ctx, d, dbInfo, tblInfo, false, "idx", "c2")
c.Check(errors.ErrorStack(checkErr), Equals, "")
c.Assert(ctx.Txn(true).Commit(context.Background()), IsNil)
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true)

// for dropping index
idxName := []interface{}{model.NewCIStr("idx")}
Expand All @@ -576,10 +616,31 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
testDropIndex(c, ctx, d, dbInfo, tblInfo, "idx")
c.Check(errors.ErrorStack(checkErr), Equals, "")

// for creating table
// for add column
test = &tests[8]
tblInfo = testTableInfo(c, d, "t1", 3)
testCreateTable(c, ctx, d, dbInfo, tblInfo)
addingColName := "colA"
newColumnDef := &ast.ColumnDef{
Name: &ast.ColumnName{Name: model.NewCIStr(addingColName)},
Tp: &types.FieldType{Tp: mysql.TypeLonglong},
Options: []*ast.ColumnOption{},
}
col, _, err := buildColumnAndConstraint(ctx, 2, newColumnDef)
addColumnArgs := []interface{}{col, &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 0}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)
test = &tests[9]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)
test = &tests[10]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)
test = &tests[11]
testAddColumn(c, ctx, d, dbInfo, tblInfo, addColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true)
}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
Expand Down
28 changes: 3 additions & 25 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
}
if kv.ErrKeyExists.Equal(err) || errCancelledDDLJob.Equal(err) {
log.Warnf("[ddl] run DDL job %v err %v, convert job to rollback job", job, err)
ver, err = d.convert2RollbackJob(t, job, tblInfo, indexInfo, err)
ver, err = convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
d.reorgCtx.cleanNotifyReorgCancel()
Expand All @@ -300,29 +300,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
default:
err = ErrInvalidIndexState.Gen("invalid index state %v", tblInfo.State)
}

return ver, errors.Trace(err)
}

func (d *ddl) convert2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (int64, error) {
job.State = model.JobStateRollingback
job.Args = []interface{}{indexInfo.Name}
// If add index job rollbacks in write reorganization state, its need to delete all keys which has been added.
// Its work is the same as drop index job do.
// The write reorganization state in add index job that likes write only state in drop index job.
// So the next state is delete only state.
indexInfo.State = model.StateDeleteOnly
originalState := indexInfo.State
job.SchemaState = model.StateDeleteOnly
ver, err1 := updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State)
if err1 != nil {
return ver, errors.Trace(err1)
}

if kv.ErrKeyExists.Equal(err) {
return ver, kv.ErrKeyExists.Gen("Duplicate for key %s", indexInfo.Name.O)
err = ErrInvalidIndexState.Gen("invalid index state %v", indexInfo.State)
}

return ver, errors.Trace(err)
Expand Down Expand Up @@ -390,7 +368,7 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.Args = append(job.Args, indexInfo.ID)
}
default:
err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
err = ErrInvalidTableState.Gen("invalid index state %v", indexInfo.State)
}
return ver, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit ffc8f6c

Please sign in to comment.