diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index cd7e5738885ae..878f853eb1df2 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -23,11 +23,14 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/admin" log "github.com/sirupsen/logrus" ) @@ -542,10 +545,30 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, job.Error = toTError(err) job.ErrorCount++ + // Load global ddl variables. + if err1 := loadDDLVars(w); err1 != nil { + log.Errorf("[ddl-%s] load ddl global variable error: %v", w, err1) + } + // Check error limit to avoid falling into an infinite loop. + if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) { + log.Warnf("[ddl-%s] the job id %v error count exceed the limit: %v, cancelling it now", w, job.ID, variable.GetDDLErrorCountLimit()) + job.State = model.JobStateCancelling + } } return } +func loadDDLVars(w *worker) error { + // Get sessionctx from context resource pool. + var ctx sessionctx.Context + ctx, err := w.sessPool.get() + if err != nil { + return errors.Trace(err) + } + defer w.sessPool.put(ctx) + return util.LoadDDLVars(ctx) +} + func toTError(err error) *terror.Error { originErr := errors.Cause(err) tErr, ok := originErr.(*terror.Error) diff --git a/ddl/serial_test.go b/ddl/serial_test.go index b7910aac3d254..4384a9a2491c0 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -458,3 +458,14 @@ func (s *testSerialSuite) TestRestoreTableByTableNameFail(c *C) { tk.MustExec("insert into t_recover values (4),(5),(6)") tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6")) } + +func (s *testSerialSuite) TestCancelJobByErrorCountLimit(c *C) { + tk := testkit.NewTestKit(c, s.store) + gofail.Enable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit", `return(true)`) + defer gofail.Disable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + _, err := tk.Exec("create table t (a int)") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job") +} diff --git a/ddl/table.go b/ddl/table.go index 49bdda0e53474..a1e99a50ca647 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -33,6 +33,11 @@ import ( ) func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { + // gofail: var mockExceedErrorLimit bool + // if mockExceedErrorLimit { + // return ver, errors.New("mock do job error") + // } + schemaID := job.SchemaID tbInfo := &model.TableInfo{} if err := job.DecodeArgs(tbInfo); err != nil { diff --git a/ddl/util/util.go b/ddl/util/util.go index 2a85b95ba0077..b47a711fc170b 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -135,14 +135,30 @@ func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, old return errors.Trace(err) } -const loadDDLReorgVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in ('" + - variable.TiDBDDLReorgWorkerCount + "', '" + - variable.TiDBDDLReorgBatchSize + "')" - // LoadDDLReorgVars loads ddl reorg variable from mysql.global_variables. func LoadDDLReorgVars(ctx sessionctx.Context) error { + return LoadGlobalVars(ctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize}) +} + +// LoadDDLVars loads ddl variable from mysql.global_variables. +func LoadDDLVars(ctx sessionctx.Context) error { + return LoadGlobalVars(ctx, []string{variable.TiDBDDLErrorCountLimit}) +} + +const loadGlobalVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%s)" + +// LoadGlobalVars loads global variable from mysql.global_variables. +func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error { if sctx, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok { - rows, _, err := sctx.ExecRestrictedSQL(ctx, loadDDLReorgVarsSQL) + nameList := "" + for i, name := range varNames { + if i > 0 { + nameList += ", " + } + nameList += fmt.Sprintf("'%s'", name) + } + sql := fmt.Sprintf(loadGlobalVarsSQL, nameList) + rows, _, err := sctx.ExecRestrictedSQL(ctx, sql) if err != nil { return errors.Trace(err) } diff --git a/executor/ddl_test.go b/executor/ddl_test.go index d3d2f49800259..8e57c4590566f 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -595,6 +595,33 @@ func (s *testSuite3) TestSetDDLReorgBatchSize(c *C) { res.Check(testkit.Rows("1000")) } +func (s *testSuite3) TestSetDDLErrorCountLimit(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + err := ddlutil.LoadDDLVars(tk.Se) + c.Assert(err, IsNil) + c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(variable.DefTiDBDDLErrorCountLimit)) + + tk.MustExec("set @@global.tidb_ddl_error_count_limit = -1") + tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_error_count_limit value: '-1'")) + err = ddlutil.LoadDDLVars(tk.Se) + c.Assert(err, IsNil) + c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(0)) + tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_error_count_limit = %v", uint64(math.MaxInt64)+1)) + tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_ddl_error_count_limit value: '%d'", uint64(math.MaxInt64)+1))) + err = ddlutil.LoadDDLVars(tk.Se) + c.Assert(err, IsNil) + c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(math.MaxInt64)) + _, err = tk.Exec("set @@global.tidb_ddl_error_count_limit = invalid_val") + c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err)) + tk.MustExec("set @@global.tidb_ddl_error_count_limit = 100") + err = ddlutil.LoadDDLVars(tk.Se) + c.Assert(err, IsNil) + c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(100)) + res := tk.MustQuery("select @@global.tidb_ddl_error_count_limit") + res.Check(testkit.Rows("100")) +} + // Test issue #9205, fix the precision problem for time type default values // See https://github.com/pingcap/tidb/issues/9205 for details func (s *testSuite3) TestIssue9205(c *C) { diff --git a/session/session.go b/session/session.go index 2174dd2f4b889..093aaece383f4 100644 --- a/session/session.go +++ b/session/session.go @@ -1484,6 +1484,7 @@ var builtinGlobalVariable = []string{ variable.TiDBConstraintCheckInPlace, variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize, + variable.TiDBDDLErrorCountLimit, variable.TiDBOptInSubqToJoinAndAgg, variable.TiDBDistSQLScanConcurrency, variable.TiDBInitChunkSize, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index d28d9d9cffae2..15f350d2160b6 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -703,6 +703,8 @@ func SetLocalSystemVar(name string, val string) { SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount))) case TiDBDDLReorgBatchSize: SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize))) + case TiDBDDLErrorCountLimit: + SetDDLErrorCountLimit(tidbOptInt64(val, DefTiDBDDLErrorCountLimit)) } } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index aa6f39b226d78..fd454c1b52334 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -679,6 +679,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBConfig, ""}, {ScopeGlobal, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)}, {ScopeGlobal, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)}, + {ScopeGlobal, TiDBDDLErrorCountLimit, strconv.Itoa(DefTiDBDDLErrorCountLimit)}, {ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"}, {ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]}, {ScopeSession, TiDBEnableRadixJoin, BoolToIntStr(DefTiDBUseRadixJoin)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 869189e6ee444..e0eeb79a3a5d4 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -213,6 +213,9 @@ const ( // tidb_ddl_reorg_batch_size defines the transaction batch size of ddl reorg workers. TiDBDDLReorgBatchSize = "tidb_ddl_reorg_batch_size" + // tidb_ddl_error_count_limit defines the count of ddl error limit. + TiDBDDLErrorCountLimit = "tidb_ddl_error_count_limit" + // tidb_ddl_reorg_priority defines the operations priority of adding indices. // It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH TiDBDDLReorgPriority = "tidb_ddl_reorg_priority" @@ -276,6 +279,7 @@ const ( DefTiDBOptimizerSelectivityLevel = 0 DefTiDBDDLReorgWorkerCount = 16 DefTiDBDDLReorgBatchSize = 1024 + DefTiDBDDLErrorCountLimit = 512 DefTiDBHashAggPartialConcurrency = 4 DefTiDBHashAggFinalConcurrency = 4 DefTiDBForcePriority = mysql.NoPriority @@ -289,6 +293,7 @@ var ( ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount maxDDLReorgWorkerCount int32 = 128 ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize + ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit // Export for testing. MaxDDLReorgBatchSize int32 = 10240 MinDDLReorgBatchSize int32 = 32 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 63b6cd29e04cf..7c0887577de54 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -63,6 +63,16 @@ func GetDDLReorgBatchSize() int32 { return atomic.LoadInt32(&ddlReorgBatchSize) } +// SetDDLErrorCountLimit sets ddlErrorCountlimit size. +func SetDDLErrorCountLimit(cnt int64) { + atomic.StoreInt64(&ddlErrorCountlimit, cnt) +} + +// GetDDLErrorCountLimit gets ddlErrorCountlimit size. +func GetDDLErrorCountLimit() int64 { + return atomic.LoadInt64(&ddlErrorCountlimit) +} + // GetSessionSystemVar gets a system variable. // If it is a session only variable, use the default value defined in code. // Returns error if there is no such variable. @@ -363,6 +373,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) case TiDBDDLReorgBatchSize: return checkUInt64SystemVar(name, value, uint64(MinDDLReorgBatchSize), uint64(MaxDDLReorgBatchSize), vars) + case TiDBDDLErrorCountLimit: + return checkUInt64SystemVar(name, value, uint64(0), math.MaxInt64, vars) case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency, TiDBIndexJoinBatchSize, TiDBIndexLookupSize, TiDBHashJoinConcurrency, diff --git a/util/admin/admin.go b/util/admin/admin.go index c258c15a3337d..5bbd37d1ef029 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -83,19 +83,20 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) { return info, nil } -func isJobRollbackable(job *model.Job, id int64) error { +// IsJobRollbackable checks whether the job can be rollback. +func IsJobRollbackable(job *model.Job) bool { switch job.Type { case model.ActionDropIndex: // We can't cancel if index current state is in StateDeleteOnly or StateDeleteReorganization, otherwise will cause inconsistent between record and index. if job.SchemaState == model.StateDeleteOnly || job.SchemaState == model.StateDeleteReorganization { - return ErrCannotCancelDDLJob.GenWithStackByArgs(id) + return false } case model.ActionDropSchema, model.ActionDropTable: // To simplify the rollback logic, cannot be canceled in the following states. if job.SchemaState == model.StateWriteOnly || job.SchemaState == model.StateDeleteOnly { - return ErrCannotCancelDDLJob.GenWithStackByArgs(id) + return false } case model.ActionDropColumn, model.ActionModifyColumn, model.ActionDropTablePartition, model.ActionAddTablePartition, @@ -103,10 +104,10 @@ func isJobRollbackable(job *model.Job, id int64) error { model.ActionTruncateTable, model.ActionAddForeignKey, model.ActionDropForeignKey: if job.SchemaState != model.StateNone { - return ErrCannotCancelDDLJob.GenWithStackByArgs(id) + return false } } - return nil + return true } // CancelJobs cancels the DDL jobs. @@ -139,8 +140,8 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) { if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() { continue } - errs[i] = isJobRollbackable(job, id) - if errs[i] != nil { + if !IsJobRollbackable(job) { + errs[i] = ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID) continue }