diff --git a/pkg/ddl/db_integration_test.go b/pkg/ddl/db_integration_test.go
index 6821d74e62956..91808b4fee99f 100644
--- a/pkg/ddl/db_integration_test.go
+++ b/pkg/ddl/db_integration_test.go
@@ -48,6 +48,7 @@ import (
 	"github.com/pingcap/tidb/pkg/tablecodec"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/pingcap/tidb/pkg/testkit/external"
+	"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
 	"github.com/pingcap/tidb/pkg/util/collate"
 	contextutil "github.com/pingcap/tidb/pkg/util/context"
 	"github.com/pingcap/tidb/pkg/util/dbterror"
@@ -2944,10 +2945,11 @@ func TestDDLLastInfo(t *testing.T) {
 	tk := testkit.NewTestKit(t, store)
 	tk.MustExec(`use test;`)
-	tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')").Check(testkit.Rows("\"\" 0"))
+	lastDDLSQL := "select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')"
+	tk.MustQuery(lastDDLSQL).Check(testkit.Rows("\"\" 0"))
 
 	tk.MustExec("create table t(a int)")
 	firstSequence := 0
-	res := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')")
+	res := tk.MustQuery(lastDDLSQL)
 	require.Len(t, res.Rows(), 1)
 	require.Equal(t, "\"create table t(a int)\"", res.Rows()[0][0])
 	var err error
@@ -2957,10 +2959,22 @@ func TestDDLLastInfo(t *testing.T) {
 	tk2 := testkit.NewTestKit(t, store)
 	tk2.MustExec(`use test;`)
 	tk.MustExec("create table t2(a int)")
-	tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')").Check(testkit.Rows(fmt.Sprintf("\"create table t2(a int)\" %d", firstSequence+1)))
+	tk.MustQuery(lastDDLSQL).Check(testkit.Rows(fmt.Sprintf("\"create table t2(a int)\" %d", firstSequence+1)))
 
 	tk.MustExec("drop table t, t2")
-	tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')").Check(testkit.Rows(fmt.Sprintf("\"drop table t, t2\" %d", firstSequence+3)))
+	tk.MustQuery(lastDDLSQL).Check(testkit.Rows(fmt.Sprintf("\"drop table t, t2\" %d", firstSequence+3)))
+
+	// on owner change, the sequence number will be reset
+	ch := make(chan struct{})
+	testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterSchedulerClose", func() {
+		close(ch)
+	})
+	dom, err := session.GetDomain(store)
+	require.NoError(t, err)
+	require.NoError(t, dom.DDL().OwnerManager().ResignOwner(context.Background()))
+	<-ch
+	tk.MustExec("create table t(a int)")
+	tk.MustQuery(lastDDLSQL).Check(testkit.Rows(fmt.Sprintf(`"create table t(a int)" %d`, 1)))
 }
 
 func TestDefaultCollationForUTF8MB4(t *testing.T) {
diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go
index fc2d2bcb22039..700158de475d9 100644
--- a/pkg/ddl/ddl.go
+++ b/pkg/ddl/ddl.go
@@ -428,12 +428,6 @@ type ddlCtx struct {
 		hook        Callback
 		interceptor Interceptor
 	}
-
-	// TODO merge with *waitSchemaSyncedController into another new struct.
-	ddlSeqNumMu struct {
-		sync.Mutex
-		seqNum uint64
-	}
 }
 
 // the schema synchronization mechanism now requires strict incremental schema versions.
@@ -832,6 +826,7 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager {
 }
 
 func (d *ddl) prepareLocalModeWorkers() {
+	var idAllocator atomic.Uint64
 	workerFactory := func(tp workerType) func() (pools.Resource, error) {
 		return func() (pools.Resource, error) {
 			wk := newWorker(d.ctx, tp, d.sessPool, d.delRangeMgr, d.ddlCtx)
@@ -841,6 +836,7 @@ func (d *ddl) prepareLocalModeWorkers() {
 			}
 			sessForJob.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
 			wk.sess = sess.NewSession(sessForJob)
+			wk.seqAllocator = &idAllocator
 			metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc()
 			return wk, nil
 		}
@@ -969,19 +965,6 @@ func (d *ddl) DisableDDL() error {
 	return nil
 }
 
-// GetNextDDLSeqNum return the next DDL seq num.
-func (s *jobScheduler) GetNextDDLSeqNum() (uint64, error) {
-	var count uint64
-	ctx := kv.WithInternalSourceType(s.schCtx, kv.InternalTxnDDL)
-	err := kv.RunInNewTxn(ctx, s.store, true, func(_ context.Context, txn kv.Transaction) error {
-		t := meta.NewMeta(txn)
-		var err error
-		count, err = t.GetHistoryDDLCount()
-		return err
-	})
-	return count, err
-}
-
 func (d *ddl) close() {
 	if d.ctx.Err() != nil {
 		return
diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go
index 153d9bfff1fad..4cb45668efd45 100644
--- a/pkg/ddl/ddl_worker.go
+++ b/pkg/ddl/ddl_worker.go
@@ -108,7 +108,7 @@ type worker struct {
 	sess            *sess.Session // sess is used and only used in running DDL job.
 	delRangeManager delRangeManager
 	logCtx          context.Context
-	seqNumLocked    bool
+	seqAllocator    *atomic.Uint64
 
 	*ddlCtx
 }
@@ -942,20 +942,13 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 		// Notice: warnings is used to support non-strict mode.
 		updateRawArgs = false
 	}
-	w.writeDDLSeqNum(job)
+	job.SeqNum = w.seqAllocator.Add(1)
 	w.removeJobCtx(job)
 	failpoint.InjectCall("afterFinishDDLJob", job)
 	err = AddHistoryDDLJob(w.ctx, w.sess, t, job, updateRawArgs)
 	return errors.Trace(err)
 }
 
-func (w *worker) writeDDLSeqNum(job *model.Job) {
-	w.ddlSeqNumMu.Lock()
-	w.ddlSeqNumMu.seqNum++
-	w.seqNumLocked = true
-	job.SeqNum = w.ddlSeqNumMu.seqNum
-}
-
 func finishRecoverTable(w *worker, job *model.Job) error {
 	var (
 		recoverInfo *RecoverInfo
@@ -1006,17 +999,6 @@ func (w *JobContext) setDDLLabelForTopSQL(jobQuery string) {
 	}
 }
 
-func (w *worker) unlockSeqNum(err error) {
-	if w.seqNumLocked {
-		if err != nil {
-			// if meet error, we should reset seqNum.
-			w.ddlSeqNumMu.seqNum--
-		}
-		w.seqNumLocked = false
-		w.ddlSeqNumMu.Unlock()
-	}
-}
-
 // DDLBackfillers contains the DDL need backfill step.
 var DDLBackfillers = map[model.ActionType]string{
 	model.ActionAddIndex: "add_index",
@@ -1099,9 +1081,6 @@ func (w *worker) transitOneJobStep(d *ddlCtx, job *model.Job) (int64, error) {
 	var (
 		err error
 	)
-	defer func() {
-		w.unlockSeqNum(err)
-	}()
 
 	txn, err := w.prepareTxn(job)
 	if err != nil {
@@ -1229,10 +1208,6 @@ func (w *worker) checkBeforeCommit() error {
 // 2. no need to wait schema version(only support create table now).
 // 3. no register mdl info(only support create table now).
 func (w *worker) HandleLocalDDLJob(d *ddlCtx, job *model.Job) (err error) {
-	defer func() {
-		w.unlockSeqNum(err)
-	}()
-
 	txn, err := w.prepareTxn(job)
 	if err != nil {
 		return err
diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go
index d84e151f4238e..57aab57015424 100644
--- a/pkg/ddl/job_table.go
+++ b/pkg/ddl/job_table.go
@@ -24,6 +24,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/ngaut/pools"
@@ -132,9 +133,10 @@ type jobScheduler struct {
 	schemaLoader      SchemaLoader
 	minJobIDRefresher *systable.MinJobIDRefresher
 
-	// those fields are created on start
+	// those fields are created or initialized on start
 	reorgWorkerPool      *workerPool
 	generalDDLWorkerPool *workerPool
+	seqAllocator         atomic.Uint64
 
 	// those fields are shared with 'ddl' instance
 	// TODO ddlCtx is too large for here, we should remove dependency on it.
@@ -145,14 +147,6 @@ type jobScheduler struct {
 }
 
 func (s *jobScheduler) start() {
-	var err error
-	s.ddlCtx.ddlSeqNumMu.Lock()
-	defer s.ddlCtx.ddlSeqNumMu.Unlock()
-	s.ddlCtx.ddlSeqNumMu.seqNum, err = s.GetNextDDLSeqNum()
-	if err != nil {
-		logutil.DDLLogger().Error("error when getting the ddl history count", zap.Error(err))
-	}
-
 	workerFactory := func(tp workerType) func() (pools.Resource, error) {
 		return func() (pools.Resource, error) {
 			wk := newWorker(s.schCtx, tp, s.sessPool, s.delRangeMgr, s.ddlCtx)
@@ -160,6 +154,7 @@ func (s *jobScheduler) start() {
 			if err != nil {
 				return nil, err
 			}
+			wk.seqAllocator = &s.seqAllocator
 			sessForJob.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
 			wk.sess = sess.NewSession(sessForJob)
 			metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc()
@@ -185,6 +180,7 @@ func (s *jobScheduler) close() {
 	if s.generalDDLWorkerPool != nil {
 		s.generalDDLWorkerPool.close()
 	}
+	failpoint.InjectCall("afterSchedulerClose")
 }
 
 func hasSysDB(job *model.Job) bool {
diff --git a/pkg/parser/model/ddl.go b/pkg/parser/model/ddl.go
index 84a2af24814d7..224385bc0c79f 100644
--- a/pkg/parser/model/ddl.go
+++ b/pkg/parser/model/ddl.go
@@ -547,10 +547,15 @@ type Job struct {
 	// Priority is only used to set the operation priority of adding indices.
 	Priority int `json:"priority"`
 
-	// SeqNum is the total order in all DDLs, it's used to identify the order of
-	// moving the job into DDL history, not the order of the job execution.
-	// fast create table doesn't honor this field, there might duplicate seq_num in this case.
-	// TODO: deprecated it, as it forces 'moving jobs into DDL history' part to be serial.
+	// SeqNum indicates the order in which jobs are moved into the DDL history,
+	// not the order of job execution. For jobs with dependencies, or jobs run in
+	// the same session, SeqNum is in increasing order.
+	// When using fast create table, seq_num may be duplicated, since any TiDB
+	// node can execute the DDL in that case.
+	// Since v8.3, this ordering only holds while the DDL owner is unchanged; on
+	// owner change, the new owner restarts the sequence from 1. The previous
+	// semantics forced the 'move jobs into DDL history' step to be serial, which
+	// hurt performance and had very limited usage scenarios.
 	SeqNum uint64 `json:"seq_num"`
 
 	// Charset is the charset when the DDL Job is created.
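
For orientation, here is a minimal, self-contained Go sketch of the allocation pattern this diff adopts. It is not part of the patch; miniScheduler, miniWorker, and finishJob are hypothetical names standing in for jobScheduler, worker, and finishDDLJob. The idea: one atomic.Uint64 owned by the scheduler, handed to every worker as a pointer, and reset implicitly because a new scheduler is created when DDL ownership changes.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// miniScheduler stands in for jobScheduler: it owns the counter, and a new
// scheduler (new owner) implicitly restarts the sequence from 1.
type miniScheduler struct {
	seqAllocator atomic.Uint64
}

// miniWorker stands in for worker: it only holds a pointer to the scheduler's
// counter, so all workers draw from one monotonic sequence without a mutex.
type miniWorker struct {
	seqAllocator *atomic.Uint64
}

func (s *miniScheduler) newWorker() *miniWorker {
	return &miniWorker{seqAllocator: &s.seqAllocator}
}

// finishJob mirrors finishDDLJob: the sequence number is taken exactly once,
// when the job is moved into history; there is nothing to lock or roll back.
func (w *miniWorker) finishJob(query string) uint64 {
	seq := w.seqAllocator.Add(1)
	fmt.Printf("%q -> seq_num %d\n", query, seq)
	return seq
}

func main() {
	sched := &miniScheduler{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		w := sched.newWorker()
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			w.finishJob(fmt.Sprintf("create table t%d(a int)", id))
		}(i)
	}
	wg.Wait()

	// "Owner change": a fresh scheduler means the next job gets seq_num 1 again,
	// which is what TestDDLLastInfo asserts after ResignOwner in this patch.
	sched = &miniScheduler{}
	sched.newWorker().finishJob("create table t(a int)")
}

Compared with the removed mutex-based writeDDLSeqNum/unlockSeqNum pair, the atomic counter needs no rollback on error and no seeding from GetHistoryDDLCount, at the cost of the sequence restarting on owner change, which the updated Job.SeqNum comment documents.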