From ab802601825eabafc3978a179386046e3eebbf6a Mon Sep 17 00:00:00 2001 From: ciscoxll Date: Wed, 20 Feb 2019 17:55:21 +0800 Subject: [PATCH] fix cancel add/drop partitioned table ddl job --- ddl/db_test.go | 88 +++++++++++++++++++++++++++++++++++++++++++++ ddl/rollingback.go | 32 +++++++++++++++++ ddl/table.go | 39 ++++++++++++++++++++ util/admin/admin.go | 4 +++ 4 files changed, 163 insertions(+) diff --git a/ddl/db_test.go b/ddl/db_test.go index 9adeba750916d..4abf60028dd3d 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -890,6 +890,94 @@ func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) { c.Assert(handles, HasLen, 0, Commentf("take time %v", time.Since(startTime))) } +// TestCancelAddTableAndDropTablePartition tests cancel ddl job which type is add/drop table partition. +func (s *testDBSuite) TestCancelAddTableAndDropTablePartition(c *C) { + s.tk = testkit.NewTestKit(c, s.store) + s.mustExec(c, "create database if not exists test_partition_table") + s.mustExec(c, "use test_partition_table") + s.mustExec(c, "drop table if exists t_part") + s.mustExec(c, `create table t_part (a int key) + partition by range(a) ( + partition p0 values less than (10), + partition p1 values less than (20) + );`) + defer s.mustExec(c, "drop table t_part;") + for i := 0; i < 10; i++ { + s.mustExec(c, "insert into t_part values (?)", i) + } + + testCases := []struct { + action model.ActionType + jobState model.JobState + JobSchemaState model.SchemaState + cancelSucc bool + }{ + {model.ActionAddTablePartition, model.JobStateNone, model.StateNone, true}, + {model.ActionDropTablePartition, model.JobStateNone, model.StateNone, true}, + {model.ActionAddTablePartition, model.JobStateRunning, model.StatePublic, false}, + {model.ActionDropTablePartition, model.JobStateRunning, model.StatePublic, false}, + } + var checkErr error + hook := &ddl.TestDDLCallback{} + testCase := &testCases[0] + var jobID int64 + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == testCase.action && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState { + jobIDs := []int64{job.ID} + jobID = job.ID + hookCtx := mock.NewContext() + hookCtx.Store = s.store + err := hookCtx.NewTxn() + if err != nil { + checkErr = errors.Trace(err) + return + } + txn, err := hookCtx.Txn(true) + if err != nil { + checkErr = errors.Trace(err) + return + } + errs, err := admin.CancelJobs(txn, jobIDs) + if err != nil { + checkErr = errors.Trace(err) + return + } + if errs[0] != nil { + checkErr = errors.Trace(errs[0]) + return + } + checkErr = txn.Commit(context.Background()) + } + s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + var err error + sql := "" + for i := range testCases { + testCase = &testCases[i] + if testCase.action == model.ActionAddTablePartition { + sql = `alter table t_part add partition ( + partition p2 values less than (30) + );` + } else if testCase.action == model.ActionDropTablePartition { + sql = "alter table t_part drop partition p1;" + } + _, err = s.tk.Exec(sql) + if testCase.cancelSucc { + c.Assert(checkErr, IsNil) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job") + s.mustExec(c, "insert into t_part values (?)", i) + } else { + c.Assert(err, IsNil) + c.Assert(checkErr, NotNil) + c.Assert(checkErr.Error(), Equals, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error()) + _, err = s.tk.Exec("insert into t_part values (?)", i) + c.Assert(err, NotNil) + } + } + } + s.dom.DDL().(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{}) +} + // TestCancelDropColumn tests cancel ddl job which type is drop column. func (s *testDBSuite) TestCancelDropColumn(c *C) { s.tk = testkit.NewTestKit(c, s.store) diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 4492ea2d11654..887d7a6f2bdcd 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -75,6 +75,18 @@ func convertNotStartAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, occuredE return convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, occuredErr) } +func cancelOnlyNotHandledJob(job *model.Job) (ver int64, err error) { + // We can only cancel the not handled job. + if job.SchemaState == model.StateNone { + job.State = model.JobStateCancelled + return ver, errCancelledDDLJob + } + + job.State = model.JobStateRunning + + return ver, nil +} + func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) { job.State = model.JobStateRollingback col := &model.ColumnInfo{} @@ -131,6 +143,14 @@ func rollingbackAddindex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve return } +func rollingbackAddTablePartition(t *meta.Meta, job *model.Job) (ver int64, err error) { + _, err = getTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + return cancelOnlyNotHandledJob(job) +} + func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) { tblInfo, err := getTableInfo(t, job, job.SchemaID) if err != nil { @@ -163,14 +183,26 @@ func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) return ver, errors.Trace(errCancelledDDLJob) } +func rollingbackDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, err error) { + _, err = getTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + return cancelOnlyNotHandledJob(job) +} + func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { switch job.Type { case model.ActionAddColumn: ver, err = rollingbackAddColumn(t, job) case model.ActionAddIndex: ver, err = rollingbackAddindex(w, d, t, job) + case model.ActionAddTablePartition: + ver, err = rollingbackAddTablePartition(t, job) case model.ActionDropColumn: ver, err = rollingbackDropColumn(t, job) + case model.ActionDropTablePartition: + ver, err = rollingbackDropTablePartition(t, job) default: job.State = model.JobStateCancelled err = errCancelledDDLJob diff --git a/ddl/table.go b/ddl/table.go index 6879ef554d166..c990862395c3d 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -159,6 +159,45 @@ func getTable(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) (table return tbl, errors.Trace(err) } +func getTableInfoAndCancelFaultJob(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) { + tblInfo, err := checkTableExistAndCancelNonExistJob(t, job, schemaID) + if err != nil { + return nil, errors.Trace(err) + } + + if tblInfo.State != model.StatePublic { + job.State = model.JobStateCancelled + return nil, ErrInvalidTableState.GenWithStack("table %s is not in public, but %s", tblInfo.Name, tblInfo.State) + } + + return tblInfo, nil +} + +func checkTableExistAndCancelNonExistJob(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) { + tableID := job.TableID + // Check this table's database. + tblInfo, err := t.GetTable(schemaID, tableID) + if err != nil { + if meta.ErrDBNotExists.Equal(err) { + job.State = model.JobStateCancelled + return nil, errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs( + fmt.Sprintf("(Schema ID %d)", schemaID), + )) + } + return nil, errors.Trace(err) + } + + // Check the table. + if tblInfo == nil { + job.State = model.JobStateCancelled + return nil, errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs( + fmt.Sprintf("(Schema ID %d)", schemaID), + fmt.Sprintf("(Table ID %d)", tableID), + )) + } + return tblInfo, nil +} + func getTableInfo(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) { tableID := job.TableID tblInfo, err := t.GetTable(schemaID, tableID) diff --git a/util/admin/admin.go b/util/admin/admin.go index eed8a58d7c88c..d8f8b0bafcb54 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -89,6 +89,10 @@ func isJobRollbackable(job *model.Job, id int64) error { if job.SchemaState != model.StateNone { return ErrCannotCancelDDLJob.GenWithStackByArgs(id) } + case model.ActionDropTablePartition, model.ActionAddTablePartition: + if job.SchemaState != model.StateNone { + return ErrCannotCancelDDLJob.GenWithStackByArgs(id) + } } return nil }