diff --git a/ddl/column.go b/ddl/column.go index 81a8f0bc400b4..53704c3770742 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -125,7 +125,15 @@ func (d *ddl) createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnIn return colInfo, position, nil } -func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) { +func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) { + // Handle the rolling back job. + if job.IsRollingback() { + ver, err = d.onDropColumn(t, job) + if err != nil { + return ver, errors.Trace(err) + } + return ver, nil + } schemaID := job.SchemaID tblInfo, err := getTableInfo(t, job, schemaID) if err != nil { @@ -256,7 +264,11 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) { } // Finish this job. - job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) + if job.IsRollingback() { + job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo) + } else { + job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) + } default: err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State) } diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go index c62b59e8dd5cc..882fd468cee34 100644 --- a/ddl/ddl_db_test.go +++ b/ddl/ddl_db_test.go @@ -442,6 +442,61 @@ LOOP: ddl.ReorgWaitTimeout = oldReorgWaitTimeout } +// TestCancelAddIndex1 tests canceling ddl job when the add index worker is not started. +func (s *testDBSuite) TestCancelAddIndex1(c *C) { + s.tk = testkit.NewTestKit(c, s.store) + s.mustExec(c, "use test_db") + s.mustExec(c, "drop table if exists t") + s.mustExec(c, "create table t(c1 int, c2 int)") + defer s.mustExec(c, "drop table t;") + for i := 0; i < 50; i++ { + s.mustExec(c, "insert into t values (?, ?)", i, i) + } + var checkErr error + oldReorgWaitTimeout := ddl.ReorgWaitTimeout + ddl.ReorgWaitTimeout = 50 * time.Millisecond + defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }() + hook := &ddl.TestDDLCallback{} + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer == 0 { + jobIDs := []int64{job.ID} + hookCtx := mock.NewContext() + hookCtx.Store = s.store + err := hookCtx.NewTxn() + if err != nil { + checkErr = errors.Trace(err) + return + } + errs, err := admin.CancelJobs(hookCtx.Txn(true), jobIDs) + if err != nil { + checkErr = errors.Trace(err) + return + } + if errs[0] != nil { + checkErr = errors.Trace(errs[0]) + return + } + checkErr = hookCtx.Txn(true).Commit(context.Background()) + } + } + s.dom.DDL().SetHook(hook) + rs, err := s.tk.Exec("alter table t add index idx_c2(c2)") + if rs != nil { + rs.Close() + } + c.Assert(checkErr, IsNil) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job") + s.dom.DDL().SetHook(&ddl.TestDDLCallback{}) + t := s.testGetTable(c, "t") + for _, idx := range t.Indices() { + c.Assert(strings.EqualFold(idx.Meta().Name.L, "idx_c2"), IsFalse) + } + + s.mustExec(c, "alter table t add index idx_c2(c2)") + s.mustExec(c, "alter table t drop index idx_c2") +} + func (s *testDBSuite) TestAddAnonymousIndex(c *C) { s.tk = testkit.NewTestKit(c, s.store) s.tk.MustExec("use " + s.schemaName) diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index fadc496e1c9bc..16bb6dcc12afe 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -145,3 +145,18 @@ func testDropIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, t checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) return job } + +func testAddColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, args []interface{}) *model.Job { + job := &model.Job{ + SchemaID: dbInfo.ID, + TableID: tblInfo.ID, + Type: model.ActionAddColumn, + Args: args, + BinlogInfo: &model.HistoryInfo{}, + } + err := d.doDDLJob(ctx, job) + c.Assert(err, IsNil) + v := getSchemaVer(c, ctx) + checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo}) + return job +} diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 1e0f0faf583f1..2a5ee07d2fdaf 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -348,16 +348,7 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64, err error) { } // The cause of this job state is that the job is cancelled by client. if job.IsCancelling() { - // If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes. - if job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 { - log.Infof("[ddl] run the cancelling DDL job %s", job) - d.reorgCtx.notifyReorgCancel() - } else { - job.State = model.JobStateCancelled - job.Error = errCancelledDDLJob - job.ErrorCount++ - return - } + return convertJob2RollbackJob(d, t, job) } if !job.IsRollingback() && !job.IsCancelling() { @@ -581,7 +572,7 @@ func (d *ddl) cleanAddIndexQueueJobs(txn kv.Transaction) error { return errors.Trace(err) } indexInfo := findIndexByName(indexName.L, tblInfo.Indices) - _, err = d.convert2RollbackJob(m, job, tblInfo, indexInfo, nil) + _, err = convertAddIdxJob2RollbackJob(m, job, tblInfo, indexInfo, nil) if err == nil { _, err = m.DeQueueDDLJob() } diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 946702f1509d0..7a98f0b700312 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/admin" "github.com/pingcap/tidb/util/mock" @@ -439,10 +440,7 @@ func doDDLJobErr(c *C, schemaID, tableID int64, tp model.ActionType, args []inte func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) error { var checkErr error - addIndexFirstReorg := test.act == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer == 0 - // If the action is adding index and the state is writing reorganization, it wants to test the case of cancelling the job when backfilling indexes. - // When the job satisfies this case of addIndexFirstReorg, the worker hasn't started to backfill indexes. - if test.cancelState == job.SchemaState && !addIndexFirstReorg { + if test.cancelState == job.SchemaState { if job.SchemaState == model.StateNone && job.State != model.JobStateDone { // If the schema state is none, we only test the job is finished. } else { @@ -452,7 +450,7 @@ func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) e return checkErr } // It only tests cancel one DDL job. - if errs[0] != test.cancelRetErrs[0] { + if !terror.ErrorEqual(errs[0], test.cancelRetErrs[0]) { checkErr = errors.Trace(errs[0]) return checkErr } @@ -474,22 +472,51 @@ func buildCancelJobTests(firstID int64) []testCancelJob { errs := []error{err} noErrs := []error{nil} tests := []testCancelJob{ - {act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err}, - {act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err}, - {act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: errs, cancelState: model.StateWriteReorganization, ddlRetErr: err}, - {act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err}, + {act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err}, + {act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err}, + {act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err}, + {act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 4)}, cancelState: model.StatePublic, ddlRetErr: err}, + // TODO: after fix drop index and create table rollback bug, the below test cases maybe need to change. {act: model.ActionDropIndex, jobIDs: []int64{firstID + 5}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err}, {act: model.ActionDropIndex, jobIDs: []int64{firstID + 6}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err}, {act: model.ActionDropIndex, jobIDs: []int64{firstID + 7}, cancelRetErrs: errs, cancelState: model.StateDeleteReorganization, ddlRetErr: err}, - {act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err}, - - {act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err}, + {act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 8)}, cancelState: model.StateNone, ddlRetErr: err}, + + // TODO: add create table back after we fix the cancel bug. + //{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err}, + {act: model.ActionAddColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err}, + {act: model.ActionAddColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err}, + {act: model.ActionAddColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err}, + {act: model.ActionAddColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenByArgs(firstID + 12)}, cancelState: model.StatePublic, ddlRetErr: err}, } return tests } +func (s *testDDLSuite) checkAddIdx(c *C, d *ddl, schemaID int64, tableID int64, idxName string, success bool) { + changedTable := testGetTable(c, d, schemaID, tableID) + var found bool + for _, idxInfo := range changedTable.Meta().Indices { + if idxInfo.Name.O == idxName { + found = true + break + } + } + c.Assert(found, Equals, success) +} +func (s *testDDLSuite) checkAddColumn(c *C, d *ddl, schemaID int64, tableID int64, colName string, success bool) { + changedTable := testGetTable(c, d, schemaID, tableID) + var found bool + for _, colInfo := range changedTable.Meta().Columns { + if colInfo.Name.O == colName { + found = true + break + } + } + c.Assert(found, Equals, success) +} + func (s *testDDLSuite) TestCancelJob(c *C) { store := testCreateStore(c, "test_cancel_job") defer store.Close() @@ -519,21 +546,29 @@ func (s *testDDLSuite) TestCancelJob(c *C) { var checkErr error var test *testCancelJob tc.onJobUpdated = func(job *model.Job) { + if job.State == model.JobStateSynced || job.State == model.JobStateCancelled || job.State == model.JobStateCancelling { + return + } + if checkErr != nil { return } hookCtx := mock.NewContext() hookCtx.Store = store - var err error - err = hookCtx.NewTxn() - if err != nil { - checkErr = errors.Trace(err) + var err1 error + err1 = hookCtx.NewTxn() + if err1 != nil { + checkErr = errors.Trace(err1) + return + } + checkErr = checkCancelState(hookCtx.Txn(true), job, test) + if checkErr != nil { return } checkCancelState(hookCtx.Txn(true), job, test) - err = hookCtx.Txn(true).Commit(context.Background()) - if err != nil { - checkErr = errors.Trace(err) + err1 = hookCtx.Txn(true).Commit(context.Background()) + if err1 != nil { + checkErr = errors.Trace(err1) return } } @@ -541,25 +576,30 @@ func (s *testDDLSuite) TestCancelJob(c *C) { // for adding index test = &tests[0] - validArgs := []interface{}{false, model.NewCIStr("idx"), + idxOrigName := "idx" + validArgs := []interface{}{false, model.NewCIStr(idxOrigName), []*ast.IndexColName{{ Column: &ast.ColumnName{Name: model.NewCIStr("c1")}, Length: -1, }}, nil} - doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &test.cancelState) + // When the job satisfies this test case, the option will be rollback, so the job's schema state is none. + cancelState := model.StateNone + doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState) c.Check(errors.ErrorStack(checkErr), Equals, "") + s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false) test = &tests[1] - doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &test.cancelState) + doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState) c.Check(errors.ErrorStack(checkErr), Equals, "") + s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false) test = &tests[2] - // When the job satisfies this test case, the option will be rollback, so the job's schema state is none. - cancelState := model.StateNone doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState) c.Check(errors.ErrorStack(checkErr), Equals, "") + s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false) test = &tests[3] testCreateIndex(c, ctx, d, dbInfo, tblInfo, false, "idx", "c2") c.Check(errors.ErrorStack(checkErr), Equals, "") c.Assert(ctx.Txn(true).Commit(context.Background()), IsNil) + s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true) // for dropping index idxName := []interface{}{model.NewCIStr("idx")} @@ -576,10 +616,31 @@ func (s *testDDLSuite) TestCancelJob(c *C) { testDropIndex(c, ctx, d, dbInfo, tblInfo, "idx") c.Check(errors.ErrorStack(checkErr), Equals, "") - // for creating table + // for add column test = &tests[8] - tblInfo = testTableInfo(c, d, "t1", 3) - testCreateTable(c, ctx, d, dbInfo, tblInfo) + addingColName := "colA" + newColumnDef := &ast.ColumnDef{ + Name: &ast.ColumnName{Name: model.NewCIStr(addingColName)}, + Tp: &types.FieldType{Tp: mysql.TypeLonglong}, + Options: []*ast.ColumnOption{}, + } + col, _, err := buildColumnAndConstraint(ctx, 2, newColumnDef) + addColumnArgs := []interface{}{col, &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 0} + doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState) + c.Check(errors.ErrorStack(checkErr), Equals, "") + s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false) + test = &tests[9] + doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState) + c.Check(errors.ErrorStack(checkErr), Equals, "") + s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false) + test = &tests[10] + doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState) + c.Check(errors.ErrorStack(checkErr), Equals, "") + s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false) + test = &tests[11] + testAddColumn(c, ctx, d, dbInfo, tblInfo, addColumnArgs) + c.Check(errors.ErrorStack(checkErr), Equals, "") + s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true) } func (s *testDDLSuite) TestIgnorableSpec(c *C) { diff --git a/ddl/index.go b/ddl/index.go index b54c40342af70..6c406912d614d 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -281,7 +281,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error) } if kv.ErrKeyExists.Equal(err) || errCancelledDDLJob.Equal(err) { log.Warnf("[ddl] run DDL job %v err %v, convert job to rollback job", job, err) - ver, err = d.convert2RollbackJob(t, job, tblInfo, indexInfo, err) + ver, err = convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, err) } // Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs. d.reorgCtx.cleanNotifyReorgCancel() @@ -300,29 +300,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error) // Finish this job. job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) default: - err = ErrInvalidIndexState.Gen("invalid index state %v", tblInfo.State) - } - - return ver, errors.Trace(err) -} - -func (d *ddl) convert2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (int64, error) { - job.State = model.JobStateRollingback - job.Args = []interface{}{indexInfo.Name} - // If add index job rollbacks in write reorganization state, its need to delete all keys which has been added. - // Its work is the same as drop index job do. - // The write reorganization state in add index job that likes write only state in drop index job. - // So the next state is delete only state. - indexInfo.State = model.StateDeleteOnly - originalState := indexInfo.State - job.SchemaState = model.StateDeleteOnly - ver, err1 := updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State) - if err1 != nil { - return ver, errors.Trace(err1) - } - - if kv.ErrKeyExists.Equal(err) { - return ver, kv.ErrKeyExists.Gen("Duplicate for key %s", indexInfo.Name.O) + err = ErrInvalidIndexState.Gen("invalid index state %v", indexInfo.State) } return ver, errors.Trace(err) @@ -390,7 +368,7 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) { job.Args = append(job.Args, indexInfo.ID) } default: - err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State) + err = ErrInvalidTableState.Gen("invalid index state %v", indexInfo.State) } return ver, errors.Trace(err) } diff --git a/ddl/rollingback.go b/ddl/rollingback.go new file mode 100644 index 0000000000000..a7727ef54b597 --- /dev/null +++ b/ddl/rollingback.go @@ -0,0 +1,142 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "github.com/juju/errors" + "github.com/pingcap/tidb/ast" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/model" + log "github.com/sirupsen/logrus" +) + +func convertAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (int64, error) { + job.State = model.JobStateRollingback + // the second args will be used in onDropIndex. + job.Args = []interface{}{indexInfo.Name} + // If add index job rollbacks in write reorganization state, its need to delete all keys which has been added. + // Its work is the same as drop index job do. + // The write reorganization state in add index job that likes write only state in drop index job. + // So the next state is delete only state. + originalState := indexInfo.State + indexInfo.State = model.StateDeleteOnly + job.SchemaState = model.StateDeleteOnly + ver, err1 := updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State) + if err1 != nil { + return ver, errors.Trace(err1) + } + if kv.ErrKeyExists.Equal(err) { + return ver, kv.ErrKeyExists.Gen("Duplicate for key %s", indexInfo.Name.O) + } + return ver, errors.Trace(err) +} + +// convertNotStartAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob, +// to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started. +func convertNotStartAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) { + schemaID := job.SchemaID + tblInfo, err := getTableInfo(t, job, schemaID) + if err != nil { + return ver, errors.Trace(err) + } + var ( + unique bool + indexName model.CIStr + idxColNames []*ast.IndexColName + indexOption *ast.IndexOption + ) + err = job.DecodeArgs(&unique, &indexName, &idxColNames, &indexOption) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + indexInfo := findIndexByName(indexName.L, tblInfo.Indices) + if indexInfo == nil { + job.State = model.JobStateCancelled + return ver, errCancelledDDLJob + } + return convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, occuredErr) +} +func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) { + job.State = model.JobStateRollingback + col := &model.ColumnInfo{} + pos := &ast.ColumnPosition{} + offset := 0 + err = job.DecodeArgs(col, pos, &offset) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + schemaID := job.SchemaID + tblInfo, err := getTableInfo(t, job, schemaID) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L) + if columnInfo == nil { + job.State = model.JobStateCancelled + return ver, errCancelledDDLJob + } + if columnInfo.State == model.StatePublic { + // We already have a column with the same column name. + job.State = model.JobStateCancelled + return ver, infoschema.ErrColumnExists.GenByArgs(col.Name) + } + originalState := columnInfo.State + columnInfo.State = model.StateDeleteOnly + job.SchemaState = model.StateDeleteOnly + job.Args = []interface{}{col.Name} + ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfo.State) + if err != nil { + return ver, errors.Trace(err) + } + return ver, errCancelledDDLJob +} +func rollingbackAddindex(d *ddl, t *meta.Meta, job *model.Job) (ver int64, err error) { + // If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes. + if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 { + // add index workers are started. need to ask them to exit. + log.Infof("[ddl] run the cancelling DDL job %s", job) + d.reorgCtx.notifyReorgCancel() + ver, err = d.onCreateIndex(t, job) + } else { + // add index workers are not started, remove the indexInfo in tableInfo. + ver, err = convertNotStartAddIdxJob2RollbackJob(t, job, errCancelledDDLJob) + } + return +} +func convertJob2RollbackJob(d *ddl, t *meta.Meta, job *model.Job) (ver int64, err error) { + switch job.Type { + case model.ActionAddColumn: + ver, err = rollingbackAddColumn(t, job) + case model.ActionAddIndex: + ver, err = rollingbackAddindex(d, t, job) + default: + job.State = model.JobStateCancelled + err = errCancelledDDLJob + } + if err != nil { + if job.State != model.JobStateRollingback && job.State != model.JobStateCancelled { + log.Errorf("[ddl] run DDL job err %v", errors.ErrorStack(err)) + } else { + log.Infof("[ddl] the DDL job is normal to cancel because %v", err) + } + job.Error = toTError(err) + job.ErrorCount++ + } + return +} diff --git a/executor/executor_test.go b/executor/executor_test.go index 0ecb04b386f3d..6c124a53cb045 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -145,7 +145,7 @@ func (s *testSuite) TestAdmin(c *C) { row := chk.GetRow(0) c.Assert(row.Len(), Equals, 2) c.Assert(row.GetString(0), Equals, "1") - c.Assert(row.GetString(1), Equals, "error: Can't find this job") + c.Assert(row.GetString(1), Equals, "error: [admin:4]DDL Job:1 not found") r, err = tk.Exec("admin show ddl") c.Assert(err, IsNil) diff --git a/util/admin/admin.go b/util/admin/admin.go index e9660c23866f5..0e1363e509c89 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -89,7 +89,7 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) { found = true // These states can't be cancelled. if job.IsDone() || job.IsSynced() { - errs[i] = errors.New("This job is finished, so can't be cancelled") + errs[i] = ErrCancelFinishedDDLJob.GenByArgs(id) continue } // If the state is rolling back, it means the work is cleaning the data after cancelling the job. @@ -109,7 +109,7 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) { } } if !found { - errs[i] = errors.New("Can't find this job") + errs[i] = ErrDDLJobNotFound.GenByArgs(id) } } return errs, nil @@ -601,10 +601,16 @@ const ( codeDataNotEqual terror.ErrCode = 1 codeRepeatHandle = 2 codeInvalidColumnState = 3 + codeDDLJobNotFound = 4 + codeCancelFinishedJob = 5 ) var ( errDateNotEqual = terror.ClassAdmin.New(codeDataNotEqual, "data isn't equal") errRepeatHandle = terror.ClassAdmin.New(codeRepeatHandle, "handle is repeated") errInvalidColumnState = terror.ClassAdmin.New(codeInvalidColumnState, "invalid column state") + // ErrDDLJobNotFound indicates the job id was not found. + ErrDDLJobNotFound = terror.ClassAdmin.New(codeDDLJobNotFound, "DDL Job:%v not found") + // ErrCancelFinishedDDLJob returns when cancel a finished ddl job. + ErrCancelFinishedDDLJob = terror.ClassAdmin.New(codeCancelFinishedJob, "This job:%v is finished, so can't be cancelled") )