diff --git a/br/pkg/storage/parse_test.go b/br/pkg/storage/parse_test.go index 3d86ccef1c38d..2dbeaabcb4386 100644 --- a/br/pkg/storage/parse_test.go +++ b/br/pkg/storage/parse_test.go @@ -69,15 +69,15 @@ func TestCreateStorage(t *testing.T) { require.Equal(t, "TestKey", s3.SseKmsKeyId) // special character in access keys - s, err = ParseBackend(`s3://bucket4/prefix/path?access-key=NXN7IPIOSAAKDEEOLMAF&secret-access-key=nREY/7Dt+PaIbYKrKlEEMMF/ExCiJEX=XMLPUANw&session-token=FQoDYXdzEPP//////////wEaDPv5GPAhRW8pw6/nsiKsAZu7sZDCXPtEBEurxmvyV1r+nWy1I4VPbdIJV+iDnotwS3PKIyj+yDnOeigMf2yp9y2Dg9D7r51vWUyUQQfceZi9/8Ghy38RcOnWImhNdVP5zl1zh85FHz6ytePo+puHZwfTkuAQHj38gy6VF/14GU17qDcPTfjhbETGqEmh8QX6xfmWlO0ZrTmsAo4ZHav8yzbbl3oYdCLICOjMhOO1oY+B/DiURk3ZLPjaXyoo2Iql2QU=`, nil) + s, err = ParseBackend(`s3://bucket4/prefix/path?access-key=******&secret-access-key=******+&session-token=******`, nil) require.NoError(t, err) s3 = s.GetS3() require.NotNil(t, s3) require.Equal(t, "bucket4", s3.Bucket) require.Equal(t, "prefix/path", s3.Prefix) - require.Equal(t, "NXN7IPIOSAAKDEEOLMAF", s3.AccessKey) - require.Equal(t, "nREY/7Dt+PaIbYKrKlEEMMF/ExCiJEX=XMLPUANw", s3.SecretAccessKey) - require.Equal(t, "FQoDYXdzEPP//////////wEaDPv5GPAhRW8pw6/nsiKsAZu7sZDCXPtEBEurxmvyV1r+nWy1I4VPbdIJV+iDnotwS3PKIyj+yDnOeigMf2yp9y2Dg9D7r51vWUyUQQfceZi9/8Ghy38RcOnWImhNdVP5zl1zh85FHz6ytePo+puHZwfTkuAQHj38gy6VF/14GU17qDcPTfjhbETGqEmh8QX6xfmWlO0ZrTmsAo4ZHav8yzbbl3oYdCLICOjMhOO1oY+B/DiURk3ZLPjaXyoo2Iql2QU=", s3.SessionToken) + require.Equal(t, "******", s3.AccessKey) + require.Equal(t, "******+", s3.SecretAccessKey) + require.Equal(t, "******", s3.SessionToken) require.True(t, s3.ForcePathStyle) // parse role ARN and external ID diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 12d0cd0d313f3..c6bb38297402d 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -93,7 +93,7 @@ func (bT backfillerType) String() string { } } -// BackfillJob is for a tidb_ddl_backfill table's record. +// BackfillJob is for a tidb_background_subtask table's record. type BackfillJob struct { ID int64 JobID int64 @@ -104,15 +104,9 @@ type BackfillJob struct { State model.JobState InstanceID string InstanceLease types.Time - // range info - CurrKey []byte - StartKey []byte - EndKey []byte - - StartTS uint64 - FinishTS uint64 - RowCount int64 - Meta *model.BackfillMeta + StartTS uint64 + StateUpdateTS uint64 + Meta *model.BackfillMeta } // AbbrStr returns the BackfillJob's info without the Meta info. @@ -325,7 +319,7 @@ func (w *backfillWorker) updateLease(execID string, bfJob *BackfillJob, nextKey if err != nil { return err } - bfJob.CurrKey = nextKey + bfJob.Meta.CurrKey = nextKey bfJob.InstanceID = execID bfJob.InstanceLease = GetLeaseGoTime(leaseTime, InstanceLease) return w.backfiller.UpdateTask(bfJob) @@ -479,7 +473,7 @@ func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResul // Change the batch size dynamically. 
w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize()) result = w.handleBackfillTask(w.GetCtx().ddlCtx, task, w.backfiller) - task.bfJob.RowCount = int64(result.addedCount) + task.bfJob.Meta.RowCount = int64(result.addedCount) if result.err != nil { logutil.BgLogger().Warn("[ddl] backfill worker runTask failed", zap.Stringer("worker", w), zap.String("backfillJob", task.bfJob.AbbrStr()), zap.Error(result.err)) @@ -1152,6 +1146,8 @@ func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJob Type: reorgInfo.Job.Type, Query: reorgInfo.Job.Query, }, + StartKey: task.startKey, + EndKey: task.endKey, } bj := &BackfillJob{ ID: sJobCtx.currBackfillJobID.Add(1), @@ -1162,11 +1158,9 @@ func addBatchBackfillJobs(sess *session, reorgInfo *reorgInfo, sJobCtx *splitJob Tp: sJobCtx.bfWorkerType, State: model.JobStateNone, InstanceID: instanceID, - CurrKey: task.startKey, - StartKey: task.startKey, - EndKey: task.endKey, Meta: bm, } + bj.Meta.CurrKey = task.startKey bJobs = append(bJobs, bj) } if err := AddBackfillJobs(sess, bJobs); err != nil { diff --git a/ddl/constant.go b/ddl/constant.go index 8e276e7205e56..99d6498c0cea1 100644 --- a/ddl/constant.go +++ b/ddl/constant.go @@ -25,10 +25,10 @@ const ( ReorgTable = "tidb_ddl_reorg" // HistoryTable stores the history DDL jobs. HistoryTable = "tidb_ddl_history" - // BackfillTable stores the information of backfill jobs. - BackfillTable = "tidb_ddl_backfill" - // BackfillHistoryTable stores the information of history backfill jobs. - BackfillHistoryTable = "tidb_ddl_backfill_history" + // BackgroundSubtaskTable stores the information of backfill jobs. + BackgroundSubtaskTable = "tidb_background_subtask" + // BackgroundSubtaskHistoryTable stores the information of history backfill jobs. + BackgroundSubtaskHistoryTable = "tidb_background_subtask_history" // JobTableID is the table ID of `tidb_ddl_job`. JobTableID = meta.MaxInt48 - 1 @@ -38,10 +38,10 @@ const ( HistoryTableID = meta.MaxInt48 - 3 // MDLTableID is the table ID of `tidb_mdl_info`. MDLTableID = meta.MaxInt48 - 4 - // BackfillTableID is the table ID of `tidb_ddl_backfill`. - BackfillTableID = meta.MaxInt48 - 5 - // BackfillHistoryTableID is the table ID of `tidb_ddl_backfill_history`. - BackfillHistoryTableID = meta.MaxInt48 - 6 + // BackgroundSubtaskTableID is the table ID of `tidb_background_subtask`. + BackgroundSubtaskTableID = meta.MaxInt48 - 5 + // BackgroundSubtaskHistoryTableID is the table ID of `tidb_background_subtask_history`. + BackgroundSubtaskHistoryTableID = meta.MaxInt48 - 6 // JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`. JobTableSQL = "create table " + JobTable + "(job_id bigint not null, reorg int, schema_ids text(65535), table_ids text(65535), job_meta longblob, type int, processing int, primary key(job_id))" @@ -49,42 +49,34 @@ const ( ReorgTableSQL = "create table " + ReorgTable + "(job_id bigint not null, ele_id bigint, ele_type blob, start_key blob, end_key blob, physical_id bigint, reorg_meta longblob, unique key(job_id, ele_id, ele_type(20)))" // HistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_history`. HistoryTableSQL = "create table " + HistoryTable + "(job_id bigint not null, job_meta longblob, db_name char(64), table_name char(64), schema_ids text(65535), table_ids text(65535), create_time datetime, primary key(job_id))" - // BackfillTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill`. 
- BackfillTableSQL = "create table " + BackfillTable + `( - id bigint not null, - ddl_job_id bigint not null, - ele_id bigint not null, - ele_key blob, - ddl_physical_tid bigint, + // BackgroundSubtaskTableSQL is the CREATE TABLE SQL of `tidb_background_subtask`. + BackgroundSubtaskTableSQL = "create table " + BackgroundSubtaskTable + `( + id bigint not null auto_increment primary key, + namespace varchar(256), + task_key varchar(256), + ddl_physical_tid bigint(20), type int, - exec_id blob default null, - exec_lease timestamp, - state int, - curr_key blob, - start_key blob, - end_key blob, - start_ts bigint, - finish_ts bigint, - row_count bigint, - backfill_meta longblob, - unique key(ddl_job_id, ele_id, ele_key(20), id))` - // BackfillHistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill_history`. - BackfillHistoryTableSQL = "create table " + BackfillHistoryTable + `( - id bigint not null, - ddl_job_id bigint not null, - ele_id bigint not null, - ele_key blob, - ddl_physical_tid bigint, - type int, - exec_id blob default null, - exec_lease timestamp, - state int, - curr_key blob, - start_key blob, - end_key blob, - start_ts bigint, - finish_ts bigint, - row_count bigint, - backfill_meta longblob, - unique key(ddl_job_id, ele_id, ele_key(20), id))` + exec_id varchar(256), + exec_expired timestamp, + state varchar(64) not null, + checkpoint longblob not null, + start_time bigint, + state_update_time bigint, + meta longblob, + unique key(namespace, task_key))` + // BackgroundSubtaskHistoryTableSQL is the CREATE TABLE SQL of `tidb_background_subtask_history`. + BackgroundSubtaskHistoryTableSQL = "create table " + BackgroundSubtaskHistoryTable + `( + id bigint not null auto_increment primary key, + namespace varchar(256), + task_key varchar(256), + ddl_physical_tid bigint(20), + type int, + exec_id varchar(256), + exec_expired timestamp, + state varchar(64) not null, + checkpoint longblob not null, + start_time bigint, + state_update_time bigint, + meta longblob, + unique key(namespace, task_key))` ) diff --git a/ddl/ddl.go b/ddl/ddl.go index 42032f5dd8a92..e658547b471d3 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -534,6 +534,7 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, startKey []byte, currElement *meta.El rc.setCurrentElement(currElement) rc.mu.warnings = make(map[errors.ErrorID]*terror.Error) rc.mu.warningsCount = make(map[errors.ErrorID]int64) + rc.references.Add(1) dc.reorgCtx.Lock() defer dc.reorgCtx.Unlock() dc.reorgCtx.reorgCtxMap[jobID] = rc @@ -544,14 +545,22 @@ func (dc *ddlCtx) setReorgCtxForBackfill(bfJob *BackfillJob) { rc := dc.getReorgCtx(bfJob.JobID) if rc == nil { ele := &meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey} - dc.newReorgCtx(bfJob.JobID, bfJob.StartKey, ele, bfJob.RowCount) + dc.newReorgCtx(bfJob.JobID, bfJob.Meta.StartKey, ele, bfJob.Meta.RowCount) + } else { + rc.references.Add(1) } } -func (dc *ddlCtx) removeReorgCtx(job *model.Job) { +func (dc *ddlCtx) removeReorgCtx(jobID int64) { dc.reorgCtx.Lock() defer dc.reorgCtx.Unlock() - delete(dc.reorgCtx.reorgCtxMap, job.ID) + ctx, ok := dc.reorgCtx.reorgCtxMap[jobID] + if ok { + ctx.references.Sub(1) + if ctx.references.Load() == 0 { + delete(dc.reorgCtx.reorgCtxMap, jobID) + } + } } func (dc *ddlCtx) notifyReorgCancel(job *model.Job) { diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index dd0505b58e4db..99e01a8e740c4 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -39,9 +39,14 @@ import ( const testLease = 5 * time.Millisecond +// DDLForTest exports for testing. 
type DDLForTest interface { // SetInterceptor sets the interceptor. SetInterceptor(h Interceptor) + NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx + SetReorgCtxForBackfill(bfJob *BackfillJob) + GetReorgCtx(jobID int64) *reorgCtx + RemoveReorgCtx(id int64) } // SetInterceptor implements DDL.SetInterceptor interface. @@ -52,6 +57,31 @@ func (d *ddl) SetInterceptor(i Interceptor) { d.mu.interceptor = i } +// IsReorgCanceled exports for testing. +func (rc *reorgCtx) IsReorgCanceled() bool { + return rc.isReorgCanceled() +} + +// NewReorgCtx exports for testing. +func (d *ddl) NewReorgCtx(jobID int64, startKey []byte, currElement *meta.Element, rowCount int64) *reorgCtx { + return d.newReorgCtx(jobID, startKey, currElement, rowCount) +} + +// SetReorgCtxForBackfill exports for testing. +func (d *ddl) SetReorgCtxForBackfill(bfJob *BackfillJob) { + d.setReorgCtxForBackfill(bfJob) +} + +// GetReorgCtx exports for testing. +func (d *ddl) GetReorgCtx(jobID int64) *reorgCtx { + return d.getReorgCtx(jobID) +} + +// RemoveReorgCtx exports for testing. +func (d *ddl) RemoveReorgCtx(id int64) { + d.removeReorgCtx(id) +} + // JobNeedGCForTest is only used for test. var JobNeedGCForTest = jobNeedGC diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 0471740d41ddf..b946490e18e25 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -309,3 +309,31 @@ func TestJobNeedGC(t *testing.T) { }}} require.True(t, ddl.JobNeedGCForTest(job)) } + +func TestUsingReorgCtx(t *testing.T) { + _, domain := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease) + d := domain.DDL() + + wg := util.WaitGroupWrapper{} + wg.Run(func() { + jobID := int64(1) + m := &model.BackfillMeta{StartKey: []byte("skey"), RowCount: 1} + bfJob := &ddl.BackfillJob{JobID: jobID, EleID: 1, EleKey: nil, Meta: m} + for i := 0; i < 100; i++ { + d.(ddl.DDLForTest).SetReorgCtxForBackfill(bfJob) + d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled() + d.(ddl.DDLForTest).RemoveReorgCtx(jobID) + } + }) + wg.Run(func() { + jobID := int64(1) + startKey := []byte("skey") + ele := &meta.Element{ID: 1, TypeKey: nil} + for i := 0; i < 100; i++ { + d.(ddl.DDLForTest).NewReorgCtx(jobID, startKey, ele, 0) + d.(ddl.DDLForTest).GetReorgCtx(jobID).IsReorgCanceled() + d.(ddl.DDLForTest).RemoveReorgCtx(jobID) + } + }) + wg.Wait() +} diff --git a/ddl/dist_backfilling.go b/ddl/dist_backfilling.go index 59f9b17506543..9b2524beb86c5 100644 --- a/ddl/dist_backfilling.go +++ b/ddl/dist_backfilling.go @@ -229,8 +229,8 @@ func (dc *ddlCtx) backfillJob2Task(t table.Table, bfJob *BackfillJob) (*reorgBac physicalTable: pt, // TODO: Remove these fields after remove the old logic. 
sqlQuery: bfJob.Meta.Query, - startKey: bfJob.StartKey, - endKey: bfJob.EndKey, + startKey: bfJob.Meta.StartKey, + endKey: bfJob.Meta.EndKey, endInclude: bfJob.Meta.EndInclude, priority: bfJob.Meta.Priority}, nil } diff --git a/ddl/dist_owner.go b/ddl/dist_owner.go index 588ef036ef276..43a6dff87bc97 100644 --- a/ddl/dist_owner.go +++ b/ddl/dist_owner.go @@ -343,7 +343,7 @@ func checkReorgJobFinished(ctx context.Context, sess *session, reorgCtxs *reorgC return errors.Trace(err) } - bfJobs, err := getBackfillJobWithRetry(sess, BackfillTable, ddlJobID, currEle.ID, currEle.TypeKey) + bfJobs, err := getBackfillJobWithRetry(sess, BackgroundSubtaskTable, ddlJobID, currEle.ID, currEle.TypeKey) if err != nil { logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", ddlJobID), zap.Stringer("ele", currEle), zap.Error(err)) return errors.Trace(err) @@ -380,10 +380,10 @@ func checkJobIsFinished(sess *session, ddlJobID int64) (bool, error) { return true, nil } - logutil.BgLogger().Info("[ddl] checkJobIsSynced failed", - zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err)) time.Sleep(RetrySQLInterval) } + logutil.BgLogger().Info("[ddl] checkJobIsSynced failed", + zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", retrySQLTimes), zap.Error(err)) return false, errors.Trace(err) } @@ -393,8 +393,8 @@ func GetBackfillErr(sess *session, ddlJobID, currEleID int64, currEleKey []byte) var err error var metas []*model.BackfillMeta for i := 0; i < retrySQLTimes; i++ { - metas, err = GetBackfillMetas(sess, BackfillHistoryTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", - ddlJobID, currEleID, wrapKey2String(currEleKey)), "get_backfill_job_metas") + metas, err = GetBackfillMetas(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", + ddlJobID, hex.EncodeToString(currEleKey), currEleID), "get_backfill_job_metas") if err == nil { for _, m := range metas { if m.Error != nil { @@ -445,9 +445,9 @@ func checkBackfillJobCount(sess *session, ddlJobID, currEleID int64, currEleKey return 0, errors.Trace(err) } - backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, - fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and ddl_physical_tid = %d", - ddlJobID, currEleID, wrapKey2String(currEleKey), pTblID), "check_backfill_job_count") + backfillJobCnt, err = GetBackfillJobCount(sess, BackgroundSubtaskTable, + fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", + ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_count") if err != nil { return 0, errors.Trace(err) } @@ -459,31 +459,29 @@ func getBackfillJobWithRetry(sess *session, tableName string, ddlJobID, currEleI var err error var bJobs []*BackfillJob for i := 0; i < retrySQLTimes; i++ { - bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s limit 1", - ddlJobID, currEleID, wrapKey2String(currEleKey)), "check_backfill_job_state") + bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("task_key like \"%d_%s_%d_%%\" limit 1", + ddlJobID, hex.EncodeToString(currEleKey), currEleID), "check_backfill_job_state") if err != nil { logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err)) time.Sleep(RetrySQLInterval) continue } - return bJobs, nil } return nil, errors.Trace(err) } -// GetPhysicalTableMetas gets the max backfill metas per physical table in BackfillTable and BackfillHistoryTable. 
+// GetPhysicalTableMetas gets the max backfill metas per physical table in BackgroundSubtaskTable and BackgroundSubtaskHistoryTable. func GetPhysicalTableMetas(sess *session, ddlJobID, currEleID int64, currEleKey []byte) (map[int64]*BackfillJobRangeMeta, error) { - condition := fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", ddlJobID, currEleID, wrapKey2String(currEleKey)) - pTblMs, err := GetBackfillIDAndMetas(sess, BackfillTable, condition, "get_ptbl_metas") + condition := fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", ddlJobID, hex.EncodeToString(currEleKey), currEleID) + pTblMs, err := GetBackfillIDAndMetas(sess, BackgroundSubtaskTable, condition, "get_ptbl_metas") if err != nil { return nil, errors.Trace(err) } - hPTblMs, err := GetBackfillIDAndMetas(sess, BackfillHistoryTable, condition, "get_ptbl_metas") + hPTblMs, err := GetBackfillIDAndMetas(sess, BackgroundSubtaskHistoryTable, condition, "get_ptbl_metas") if err != nil { return nil, errors.Trace(err) } - metaMap := make(map[int64]*BackfillJobRangeMeta, len(pTblMs)+len(hPTblMs)) for _, m := range pTblMs { metaMap[m.PhyTblID] = m @@ -506,8 +504,8 @@ func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) return s.runInTxn(func(se *session) error { // TODO: Consider batch by batch update backfill jobs and insert backfill history jobs. - bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", - bfJob.JobID, bfJob.EleID, wrapKey2String(bfJob.EleKey)), "update_backfill_job") + bJobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", + bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID), "update_backfill_job") if err != nil { return errors.Trace(err) } @@ -524,7 +522,7 @@ func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) if err == nil { for _, bj := range bJobs { bj.State = model.JobStateCancelled - bj.FinishTS = startTS + bj.StateUpdateTS = startTS } err = AddBackfillHistoryJob(se, bJobs) } diff --git a/ddl/index.go b/ddl/index.go index 6adb56142a4e2..f7430cc864cdf 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -101,7 +101,7 @@ func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, inde // The multiple column index and the unique index in which the length sum exceeds the maximum size // will return an error instead of producing a warning. if ctx == nil || ctx.GetSessionVars().StrictSQLMode || mysql.HasUniKeyFlag(col.GetFlag()) || len(indexPartSpecifications) > 1 { - return nil, false, dbterror.ErrTooLongKey.GenWithStackByArgs(maxIndexLength) + return nil, false, dbterror.ErrTooLongKey.GenWithStackByArgs(sumLength, maxIndexLength) } // truncate index length and produce warning message in non-restrict sql mode.
colLenPerUint, err := getIndexColumnLength(col, 1) @@ -110,7 +110,7 @@ func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, inde } indexColLen = maxIndexLength / colLenPerUint // produce warning message - ctx.GetSessionVars().StmtCtx.AppendWarning(dbterror.ErrTooLongKey.FastGenByArgs(maxIndexLength)) + ctx.GetSessionVars().StmtCtx.AppendWarning(dbterror.ErrTooLongKey.FastGenByArgs(sumLength, maxIndexLength)) } idxParts = append(idxParts, &model.IndexColumn{ @@ -149,7 +149,7 @@ func checkIndexPrefixLength(columns []*model.ColumnInfo, idxColumns []*model.Ind return err } if idxLen > config.GetGlobalConfig().MaxIndexLength { - return dbterror.ErrTooLongKey.GenWithStackByArgs(config.GetGlobalConfig().MaxIndexLength) + return dbterror.ErrTooLongKey.GenWithStackByArgs(idxLen, config.GetGlobalConfig().MaxIndexLength) } return nil } @@ -211,7 +211,7 @@ func checkIndexColumn(ctx sessionctx.Context, col *model.ColumnInfo, indexColumn maxIndexLength := config.GetGlobalConfig().MaxIndexLength if indexColumnLen > maxIndexLength && (ctx == nil || ctx.GetSessionVars().StrictSQLMode) { // return error in strict sql mode - return dbterror.ErrTooLongKey.GenWithStackByArgs(maxIndexLength) + return dbterror.ErrTooLongKey.GenWithStackByArgs(indexColumnLen, maxIndexLength) } return nil } @@ -1374,8 +1374,8 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error { s := newSession(w.backfillCtx.sessCtx) return s.runInTxn(func(se *session) error { - jobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and id = %d", - bfJob.JobID, bfJob.EleID, wrapKey2String(bfJob.EleKey), bfJob.ID), "update_backfill_task") + jobs, err := GetBackfillJobs(se, BackgroundSubtaskTable, fmt.Sprintf("task_key = '%d_%s_%d_%d'", + bfJob.JobID, hex.EncodeToString(bfJob.EleKey), bfJob.EleID, bfJob.ID), "update_backfill_task") if err != nil { return err } @@ -1391,7 +1391,7 @@ func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error { return err } bfJob.InstanceLease = GetLeaseGoTime(currTime, InstanceLease) - return updateBackfillJob(se, BackfillTable, bfJob, "update_backfill_task") + return updateBackfillJob(se, BackgroundSubtaskTable, bfJob, "update_backfill_task") }) } @@ -1402,7 +1402,7 @@ func (w *baseIndexWorker) FinishTask(bfJob *BackfillJob) error { if err != nil { return errors.Trace(err) } - bfJob.FinishTS = txn.StartTS() + bfJob.StateUpdateTS = txn.StartTS() err = RemoveBackfillJob(se, false, bfJob) if err != nil { return err diff --git a/ddl/job_table.go b/ddl/job_table.go index f36e5dad2e601..710fc7ad9d0ce 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -17,6 +17,7 @@ package ddl import ( "bytes" "context" + "encoding/hex" "fmt" "math" "strconv" @@ -406,6 +407,7 @@ func (d *ddl) loadBackfillJobAndRun() { defer func() { tidbutil.Recover(metrics.LabelDistReorg, "runBackfillJobs", nil, false) d.removeBackfillCtxJobCtx(bfJob.JobID) + d.removeReorgCtx(bfJob.JobID) d.sessPool.put(se) }() @@ -632,8 +634,8 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) { } func syncBackfillHistoryJobs(sess *session, uuid string, backfillJob *BackfillJob) error { - sql := fmt.Sprintf("update mysql.%s set state = %d where ddl_job_id = %d and ele_id = %d and ele_key = %s and exec_id = '%s' limit 1;", - BackfillHistoryTable, model.JobStateSynced, backfillJob.JobID, backfillJob.EleID, wrapKey2String(backfillJob.EleKey), uuid) + sql := fmt.Sprintf("update mysql.%s set state = '%s' where task_key like \"%d_%s_%d_%%\" and 
exec_id = '%s' limit 1;", + BackgroundSubtaskHistoryTable, model.JobStateSynced.String(), backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, uuid) _, err := sess.execute(context.Background(), sql, "sync_backfill_history_job") return err } @@ -642,7 +644,7 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) sqlBuilder := strings.Builder{} sqlBuilder.WriteString("insert into mysql.") sqlBuilder.WriteString(tableName) - sqlBuilder.WriteString("(id, ddl_job_id, ele_id, ele_key, ddl_physical_tid, type, exec_id, exec_lease, state, curr_key, start_key, end_key, start_ts, finish_ts, row_count, backfill_meta) values") + sqlBuilder.WriteString("(task_key, ddl_physical_tid, type, exec_id, exec_expired, state, checkpoint, start_time, state_update_time, meta) values") for i, bj := range backfillJobs { mateByte, err := bj.Meta.Encode() if err != nil { @@ -652,17 +654,17 @@ func generateInsertBackfillJobSQL(tableName string, backfillJobs []*BackfillJob) if i != 0 { sqlBuilder.WriteString(", ") } - sqlBuilder.WriteString(fmt.Sprintf("(%d, %d, %d, %s, %d, %d, '%s', '%s', %d, %s, %s, %s, %d, %d, %d, %s)", bj.ID, bj.JobID, bj.EleID, - wrapKey2String(bj.EleKey), bj.PhysicalTableID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State, wrapKey2String(bj.CurrKey), - wrapKey2String(bj.StartKey), wrapKey2String(bj.EndKey), bj.StartTS, bj.FinishTS, bj.RowCount, wrapKey2String(mateByte))) + sqlBuilder.WriteString(fmt.Sprintf("('%d_%s_%d_%d', %d, %d, '%s', '%s', '%s', %s, %d, %d, %s)", + bj.JobID, hex.EncodeToString(bj.EleKey), bj.EleID, bj.ID, bj.PhysicalTableID, bj.Tp, bj.InstanceID, bj.InstanceLease, bj.State.String(), wrapKey2String(bj.Meta.CurrKey), + bj.StartTS, bj.StateUpdateTS, wrapKey2String(mateByte))) } return sqlBuilder.String(), nil } -// AddBackfillHistoryJob adds the backfill jobs to the tidb_ddl_backfill_history table. +// AddBackfillHistoryJob adds the backfill jobs to the tidb_background_subtask_history table. func AddBackfillHistoryJob(sess *session, backfillJobs []*BackfillJob) error { - label := fmt.Sprintf("add_%s_job", BackfillHistoryTable) - sql, err := generateInsertBackfillJobSQL(BackfillHistoryTable, backfillJobs) + label := fmt.Sprintf("add_%s_job", BackgroundSubtaskHistoryTable) + sql, err := generateInsertBackfillJobSQL(BackgroundSubtaskHistoryTable, backfillJobs) if err != nil { return err } @@ -670,9 +672,9 @@ func AddBackfillHistoryJob(sess *session, backfillJobs []*BackfillJob) error { return errors.Trace(err) } -// AddBackfillJobs adds the backfill jobs to the tidb_ddl_backfill table. +// AddBackfillJobs adds the backfill jobs to the tidb_background_subtask table. func AddBackfillJobs(s *session, backfillJobs []*BackfillJob) error { - label := fmt.Sprintf("add_%s_job", BackfillTable) + label := fmt.Sprintf("add_%s_job", BackgroundSubtaskTable) // Do runInTxn to get StartTS. return s.runInTxn(func(se *session) error { txn, err := se.txn() @@ -684,7 +686,7 @@ func AddBackfillJobs(s *session, backfillJobs []*BackfillJob) error { bj.StartTS = startTS } - sql, err := generateInsertBackfillJobSQL(BackfillTable, backfillJobs) + sql, err := generateInsertBackfillJobSQL(BackgroundSubtaskTable, backfillJobs) if err != nil { return err } @@ -696,16 +698,8 @@ func AddBackfillJobs(s *session, backfillJobs []*BackfillJob) error { // GetBackfillJobForOneEle gets the backfill jobs in the tblName table that contains only one element. 
func GetBackfillJobForOneEle(s *session, excludedJobIDs []int64, lease time.Duration) (*BackfillJob, error) { eJobIDsBuilder := strings.Builder{} - for i, id := range excludedJobIDs { - if i == 0 { - eJobIDsBuilder.WriteString(" and ddl_job_id not in (") - } - eJobIDsBuilder.WriteString(strconv.Itoa(int(id))) - if i == len(excludedJobIDs)-1 { - eJobIDsBuilder.WriteString(")") - } else { - eJobIDsBuilder.WriteString(", ") - } + for _, id := range excludedJobIDs { + eJobIDsBuilder.WriteString(fmt.Sprintf(" and task_key not like \"%d_%%\"", id)) } var err error @@ -716,9 +710,8 @@ func GetBackfillJobForOneEle(s *session, excludedJobIDs []int64, lease time.Dura return err } leaseStr := currTime.Add(-lease).Format(types.TimeFormat) - - bJobs, err = GetBackfillJobs(se, BackfillTable, - fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') %s order by ddl_job_id, ele_key, ele_id limit 1", + bJobs, err = GetBackfillJobs(se, BackgroundSubtaskTable, + fmt.Sprintf("(exec_id = '' or exec_expired < '%v') %s order by task_key limit 1", leaseStr, eJobIDsBuilder.String()), "get_backfill_job") return err }) @@ -741,13 +734,13 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st } leaseStr := currTime.Add(-lease).Format(types.TimeFormat) - getJobsSQL := fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_id, ele_key limit %d", + getJobsSQL := fmt.Sprintf("(exec_id = '' or exec_expired < '%v') and task_key like \"%d_%%\" order by task_key limit %d", leaseStr, jobID, batch) if pTblID != getJobWithoutPartition { if pTblID == 0 { rows, err := s.execute(context.Background(), - fmt.Sprintf("select ddl_physical_tid from mysql.%s group by ddl_job_id, ele_id, ele_key, ddl_physical_tid having max(length(exec_id)) = 0 or max(exec_lease) < '%s' order by ddl_job_id, ele_key, ele_id, ddl_physical_tid limit 1", - BackfillTable, leaseStr), "get_mark_backfill_job") + fmt.Sprintf("select ddl_physical_tid from mysql.%s group by substring_index(task_key,\"_\",3), ddl_physical_tid having max(length(exec_id)) = 0 or max(exec_expired) < '%s' order by substring_index(task_key,\"_\",3), ddl_physical_tid limit 1", + BackgroundSubtaskTable, leaseStr), "get_mark_backfill_job") if err != nil { return errors.Trace(err) } @@ -758,11 +751,11 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st pTblID = rows[0].GetInt64(0) } - getJobsSQL = fmt.Sprintf("(exec_ID = '' or exec_lease < '%s') and ddl_job_id = %d and ddl_physical_tid = %d order by ddl_job_id, ele_key, ele_id limit %d", + getJobsSQL = fmt.Sprintf("(exec_id = '' or exec_expired < '%s') and task_key like \"%d_%%\" and ddl_physical_tid = %d order by task_key limit %d", leaseStr, jobID, pTblID, batch) } - bJobs, err = GetBackfillJobs(se, BackfillTable, getJobsSQL, "get_mark_backfill_job") + bJobs, err = GetBackfillJobs(se, BackgroundSubtaskTable, getJobsSQL, "get_mark_backfill_job") if err != nil { return err } @@ -781,7 +774,7 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st bJobs[i].InstanceID = uuid bJobs[i].InstanceLease = GetLeaseGoTime(currTime, lease) // TODO: batch update - if err = updateBackfillJob(se, BackfillTable, bJobs[i], "get_mark_backfill_job"); err != nil { + if err = updateBackfillJob(se, BackgroundSubtaskTable, bJobs[i], "get_mark_backfill_job"); err != nil { return err } } @@ -796,8 +789,8 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st // GetInterruptedBackfillJobForOneEle gets an interrupted 
backfill job that contains only one element. func GetInterruptedBackfillJobForOneEle(sess *session, jobID, eleID int64, eleKey []byte) ([]*BackfillJob, error) { - bJobs, err := GetBackfillJobs(sess, BackfillHistoryTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s and state = %d limit 1", - jobID, eleID, wrapKey2String(eleKey), model.JobStateCancelled), "get_interrupt_backfill_job") + bJobs, err := GetBackfillJobs(sess, BackgroundSubtaskHistoryTable, fmt.Sprintf("task_key like \"%d_%s_%d_%%\" and state = \"%s\" limit 1", + jobID, hex.EncodeToString(eleKey), eleID, model.JobStateCancelled.String()), "get_interrupt_backfill_job") if err != nil || len(bJobs) == 0 { return nil, err } @@ -819,7 +812,7 @@ func GetBackfillJobCount(sess *session, tblName, condition string, label string) // GetBackfillMetas gets the backfill metas in the tblName table according to condition. func GetBackfillMetas(sess *session, tblName, condition string, label string) ([]*model.BackfillMeta, error) { - rows, err := sess.execute(context.Background(), fmt.Sprintf("select backfill_meta from mysql.%s where %s", tblName, condition), label) + rows, err := sess.execute(context.Background(), fmt.Sprintf("select meta from mysql.%s where %s", tblName, condition), label) if err != nil { return nil, errors.Trace(err) } @@ -842,9 +835,9 @@ func GetBackfillMetas(sess *session, tblName, condition string, label string) ([ // GetBackfillIDAndMetas gets the backfill IDs and metas in the tblName table according to condition. func GetBackfillIDAndMetas(sess *session, tblName, condition string, label string) ([]*BackfillJobRangeMeta, error) { - sql := "select tbl.id, tbl.curr_key, tbl.end_key, tbl.ddl_physical_tid from (select max(id) max_id, ddl_physical_tid " + + sql := "select tbl.task_key, tbl.meta, tbl.ddl_physical_tid from (select max(task_key) max_id, ddl_physical_tid " + fmt.Sprintf(" from mysql.%s tbl where %s group by ddl_physical_tid) tmp join mysql.%s tbl", - tblName, condition, tblName) + " on tbl.id=tmp.max_id and tbl.ddl_physical_tid=tmp.ddl_physical_tid;" + tblName, condition, tblName) + " on tbl.task_key=tmp.max_id and tbl.ddl_physical_tid=tmp.ddl_physical_tid;" rows, err := sess.execute(context.Background(), sql, label) if err != nil { return nil, errors.Trace(err) @@ -855,21 +848,31 @@ func GetBackfillIDAndMetas(sess *session, tblName, condition string, label strin pTblMetas := make([]*BackfillJobRangeMeta, 0, len(rows)) for _, r := range rows { + key := r.GetString(0) + keySlice := strings.Split(key, "_") + id, err := strconv.ParseInt(keySlice[3], 10, 64) + if err != nil { + return nil, err + } + meta := &model.BackfillMeta{} + err = meta.Decode(r.GetBytes(1)) + if err != nil { + return nil, err + } pTblMeta := BackfillJobRangeMeta{ - ID: r.GetInt64(0), - StartKey: r.GetBytes(1), - EndKey: r.GetBytes(2), - PhyTblID: r.GetInt64(3), + ID: id, + StartKey: meta.StartKey, + EndKey: meta.EndKey, + PhyTblID: r.GetInt64(2), } pTblMetas = append(pTblMetas, &pTblMeta) } - return pTblMetas, nil } func getUnsyncedInstanceIDs(sess *session, jobID int64, label string) ([]string, error) { - sql := fmt.Sprintf("select sum((state=%d) + (state=%d)) as tmp, exec_id from mysql.tidb_ddl_backfill_history where ddl_job_id = %d group by exec_id having tmp = 0;", - model.JobStateSynced, model.JobStateCancelled, jobID) + sql := fmt.Sprintf("select sum((state='%s') + (state='%s')) as tmp, exec_id from mysql.tidb_background_subtask_history where task_key like \"%d_%%\" group by exec_id having tmp = 0;", + 
model.JobStateSynced.String(), model.JobStateCancelled.String(), jobID) rows, err := sess.execute(context.Background(), sql, label) if err != nil { return nil, errors.Trace(err) @@ -890,40 +893,56 @@ func GetBackfillJobs(sess *session, tblName, condition string, label string) ([] } bJobs := make([]*BackfillJob, 0, len(rows)) for _, row := range rows { + key := row.GetString(2) + keySlice := strings.Split(key, "_") + jobID, err := strconv.ParseInt(keySlice[0], 10, 64) + if err != nil { + return nil, err + } + eleKey, err := hex.DecodeString(keySlice[1]) + if err != nil { + return nil, err + } + eleID, err := strconv.ParseInt(keySlice[2], 10, 64) + if err != nil { + return nil, err + } + id, err := strconv.ParseInt(keySlice[3], 10, 64) + if err != nil { + return nil, err + } bfJob := BackfillJob{ - ID: row.GetInt64(0), - JobID: row.GetInt64(1), - EleID: row.GetInt64(2), - EleKey: row.GetBytes(3), - PhysicalTableID: row.GetInt64(4), - Tp: backfillerType(row.GetInt64(5)), - InstanceID: row.GetString(6), - InstanceLease: row.GetTime(7), - State: model.JobState(row.GetInt64(8)), - CurrKey: row.GetBytes(9), - StartKey: row.GetBytes(10), - EndKey: row.GetBytes(11), - StartTS: row.GetUint64(12), - FinishTS: row.GetUint64(13), - RowCount: row.GetInt64(14), + ID: id, + JobID: jobID, + EleID: eleID, + EleKey: eleKey, + PhysicalTableID: row.GetInt64(3), + Tp: backfillerType(row.GetInt64(4)), + InstanceID: row.GetString(5), + InstanceLease: row.GetTime(6), + State: model.StrToJobState(row.GetString(7)), + StartTS: row.GetUint64(9), + StateUpdateTS: row.GetUint64(10), } bfJob.Meta = &model.BackfillMeta{} - err = bfJob.Meta.Decode(row.GetBytes(15)) + err = bfJob.Meta.Decode(row.GetBytes(11)) if err != nil { return nil, errors.Trace(err) } + bfJob.Meta.CurrKey = row.GetBytes(8) bJobs = append(bJobs, &bfJob) } return bJobs, nil } -// RemoveBackfillJob removes the backfill jobs from the tidb_ddl_backfill table. +// RemoveBackfillJob removes the backfill jobs from the tidb_background_subtask table. // If isOneEle is true, removes all jobs with backfillJob's ddl_job_id, ele_id and ele_key. Otherwise, removes the backfillJob. 
func RemoveBackfillJob(sess *session, isOneEle bool, backfillJob *BackfillJob) error { - sql := fmt.Sprintf("delete from mysql.tidb_ddl_backfill where ddl_job_id = %d and ele_id = %d and ele_key = %s", - backfillJob.JobID, backfillJob.EleID, wrapKey2String(backfillJob.EleKey)) + sql := "delete from mysql.tidb_background_subtask" if !isOneEle { - sql += fmt.Sprintf(" and id = %d", backfillJob.ID) + sql += fmt.Sprintf(" where task_key like \"%d_%s_%d_%d\"", backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, backfillJob.ID) + } else { + sql += fmt.Sprintf(" where task_key like \"%d_%s_%d_%%\"", backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID) } _, err := sess.execute(context.Background(), sql, "remove_backfill_job") return err @@ -934,9 +953,9 @@ func updateBackfillJob(sess *session, tableName string, backfillJob *BackfillJob if err != nil { return err } - sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_lease = '%s', state = %d, curr_key = %s, row_count = %d, backfill_meta = %s where ddl_job_id = %d and ele_id = %d and ele_key = %s and id = %d", - tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State, wrapKey2String(backfillJob.CurrKey), backfillJob.RowCount, - wrapKey2String(mate), backfillJob.JobID, backfillJob.EleID, wrapKey2String(backfillJob.EleKey), backfillJob.ID) + sql := fmt.Sprintf("update mysql.%s set exec_id = '%s', exec_expired = '%s', state = '%s', checkpoint = %s, meta = %s where task_key = '%d_%s_%d_%d'", + tableName, backfillJob.InstanceID, backfillJob.InstanceLease, backfillJob.State.String(), wrapKey2String(backfillJob.Meta.CurrKey), + wrapKey2String(mate), backfillJob.JobID, hex.EncodeToString(backfillJob.EleKey), backfillJob.EleID, backfillJob.ID) _, err = sess.execute(context.Background(), sql, label) return err } diff --git a/ddl/job_table_test.go b/ddl/job_table_test.go index ae04c0eb6dd85..2f7a180f7b9ca 100644 --- a/ddl/job_table_test.go +++ b/ddl/job_table_test.go @@ -16,6 +16,7 @@ package ddl_test import ( "context" + "encoding/hex" "fmt" "strconv" "strings" @@ -198,6 +199,8 @@ func makeAddIdxBackfillJobs(schemaID, tblID, jobID, eleID int64, cnt int, query TableID: tblID, Query: query, }, + StartKey: sKey, + EndKey: eKey, } bj := &ddl.BackfillJob{ ID: int64(i), @@ -207,11 +210,9 @@ func makeAddIdxBackfillJobs(schemaID, tblID, jobID, eleID int64, cnt int, query State: model.JobStateNone, PhysicalTableID: 1, InstanceLease: types.ZeroTimestamp, - CurrKey: sKey, - StartKey: sKey, - EndKey: eKey, Meta: bm, } + bj.Meta.CurrKey = sKey bJobs = append(bJobs, bj) } return bJobs @@ -230,8 +231,8 @@ func equalBackfillJob(t *testing.T, a, b *ddl.BackfillJob, lessTime types.Time) } func getIdxConditionStr(jobID, eleID int64) string { - return fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = %s", - jobID, eleID, wrapKey2String(meta.IndexElementKey)) + return fmt.Sprintf("task_key like \"%d_%s_%d_%%\"", + jobID, hex.EncodeToString(meta.IndexElementKey), eleID) } func readInTxn(se sessionctx.Context, f func(sessionctx.Context)) (err error) { @@ -248,8 +249,8 @@ func backfillJob2PTblMetaMap(bJob *ddl.BackfillJob) map[int64]*ddl.BackfillJobRa m := &ddl.BackfillJobRangeMeta{ ID: bJob.ID, PhyTblID: bJob.PhysicalTableID, - StartKey: bJob.StartKey, - EndKey: bJob.EndKey, + StartKey: bJob.Meta.StartKey, + EndKey: bJob.Meta.EndKey, } mMap := make(map[int64]*ddl.BackfillJobRangeMeta) mMap[m.PhyTblID] = m @@ -280,7 +281,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJobs, err := 
ddl.GetAndMarkBackfillJobsForOneEle(se, 1, jobID1, uuid, noPID, instanceLease) require.EqualError(t, err, dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job").Error()) require.Nil(t, bJobs) - allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID2), "check_backfill_job_count") + allCnt, err := ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID2), "check_backfill_job_count") require.NoError(t, err) require.Equal(t, allCnt, 0) // Test some backfill jobs, add backfill jobs to the table. @@ -293,7 +294,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bjTestCases = append(bjTestCases, bJobs2...) bjTestCases = append(bjTestCases, bJobs3...) err = ddl.AddBackfillJobs(se, bjTestCases) - require.Equal(t, err.Error(), "[table:1292]Incorrect timestamp value: '0000-00-00 00:00:00' for column 'exec_lease' at row 1") + require.Equal(t, err.Error(), "[table:1292]Incorrect timestamp value: '0000-00-00 00:00:00' for column 'exec_expired' at row 1") tk.Session().GetSessionVars().SQLMode = mysql.ModeNone err = ddl.AddBackfillJobs(se, bjTestCases) // ID jobID eleID InstanceID PhysicalTableID @@ -336,7 +337,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { }) currGoTime := ddl.GetLeaseGoTime(currTime, instanceLease) require.GreaterOrEqual(t, currGoTime.Compare(bJobs[0].InstanceLease), 0) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, cnt) // test physical table @@ -503,10 +504,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 0 jobID2 eleID3 // 1 jobID2 eleID3 require.NoError(t, err) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 1) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, cnt) // remove all backfill jobs @@ -517,10 +518,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 0 jobID2 eleID3 // 1 jobID2 eleID3 require.NoError(t, err) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 1) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID2), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 0) // clean backfill job @@ -541,11 +542,11 @@ func TestSimpleExecBackfillJobs(t *testing.T) { currTime, err = ddl.GetOracleTimeWithStartTS(se) require.NoError(t, err) }) - condition := fmt.Sprintf("exec_ID = '' or exec_lease < '%v' and ddl_job_id = %d", currTime.Add(-instanceLease), jobID2) - bJobs, err = ddl.GetBackfillJobs(se, ddl.BackfillHistoryTable, condition, "test_get_bj") + condition := 
fmt.Sprintf("exec_id = '' or exec_expired < '%v' and task_key like \"%d_%%\"", currTime.Add(-instanceLease), jobID2) + bJobs, err = ddl.GetBackfillJobs(se, ddl.BackgroundSubtaskHistoryTable, condition, "test_get_bj") require.NoError(t, err) require.Len(t, bJobs, 1) - require.Equal(t, bJobs[0].FinishTS, uint64(0)) + require.Equal(t, bJobs[0].StateUpdateTS, uint64(0)) // test GetMaxBackfillJob pTblMeta, err := ddl.GetPhysicalTableMetas(se, bJobs3[0].JobID, bJobs3[0].EleID, eleKey) @@ -590,10 +591,10 @@ func TestSimpleExecBackfillJobs(t *testing.T) { require.Equal(t, "ID:3, JobID:1, EleID:11, Type:add index, State:cancelled, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs1[1].AbbrStr()) require.Equal(t, "ID:0, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[0].AbbrStr()) require.Equal(t, "ID:1, JobID:2, EleID:33, Type:add index, State:none, InstanceID:, InstanceLease:0000-00-00 00:00:00", bJobs3[1].AbbrStr()) - // test select tidb_ddl_backfill - tk.MustQuery(fmt.Sprintf("select exec_id, exec_lease from mysql.tidb_ddl_backfill where id = %d and %s", bJobs1[0].ID, getIdxConditionStr(jobID1, eleID1))). + // test select tidb_background_subtask + tk.MustQuery(fmt.Sprintf("select exec_id, exec_expired from mysql.tidb_background_subtask where task_key like \"%%%d\" and %s", bJobs1[0].ID, getIdxConditionStr(jobID1, eleID1))). Check(testkit.Rows(fmt.Sprintf("%s 0000-00-00 00:00:00", uuid))) - tk.MustQuery(fmt.Sprintf("select exec_id, exec_lease from mysql.tidb_ddl_backfill where id = %d and %s", bJobs1[1].ID, getIdxConditionStr(jobID1, eleID1))). + tk.MustQuery(fmt.Sprintf("select exec_id, exec_expired from mysql.tidb_background_subtask where task_key like \"%%%d\" and %s", bJobs1[1].ID, getIdxConditionStr(jobID1, eleID1))). Check(testkit.Rows(" 0000-00-00 00:00:00")) // test GetBackfillMetas bfErr := ddl.GetBackfillErr(se, jobID1, eleID1, eleKey) @@ -606,7 +607,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJobs1[1].State = model.JobStateNone bJobs1[1].ID = 4 err = ddl.AddBackfillHistoryJob(se, bJobs1) - // BackfillTable + // BackgroundSubtaskTable // ID jobID eleID state // -------------------------------- // 0 jobID1 eleID1 JobStateNone @@ -618,18 +619,18 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 2 jobID1 eleID1 JobStateRollingback // 3 jobID1 eleID1 JobStateCancelled // - // BackfillHistoryTable + // BackgroundSubtaskHistoryTable // ID jobID eleID state // -------------------------------- // 5 jobID1 eleID1 JobStateNone // 4 jobID1 eleID1 JobStateNone pTblMeta, err = ddl.GetPhysicalTableMetas(se, jobID1, eleID1, eleKey) require.NoError(t, err) - require.Equal(t, backfillJob2PTblMetaMap(bJobs1[0]), pTblMeta) + require.Equal(t, backfillJob2PTblMetaMap(bJobs1[0]), pTblMeta) // ??????????? 
bJobs1[0].ID = 6 bJobs1[1].ID = 7 err = ddl.AddBackfillJobs(se, bJobs1) - // BackfillTable + // BackgroundSubtaskTable // ID jobID eleID state // -------------------------------- // 0 jobID1 eleID1 JobStateNone @@ -643,7 +644,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 6 jobID1 eleID1 JobStateNone // 7 jobID1 eleID1 JobStateNone // - // BackfillHistoryTable + // BackgroundSubtaskHistoryTable // ID jobID eleID state // -------------------------------- // 5 jobID1 eleID1 JobStateNone @@ -653,18 +654,18 @@ func TestSimpleExecBackfillJobs(t *testing.T) { require.Equal(t, backfillJob2PTblMetaMap(bJobs1[1]), pTblMeta) // test MoveBackfillJobsToHistoryTable and GetInterruptedBackfillJobForOneEle - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 2) err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs3[0]) require.NoError(t, err) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 0) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillHistoryTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskHistoryTable, getIdxConditionStr(jobID2, eleID3), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 2) - // BackfillTable + // BackgroundSubtaskTable // ID jobID eleID state // -------------------------------- // 0 jobID1 eleID1 JobStateNone @@ -676,7 +677,7 @@ func TestSimpleExecBackfillJobs(t *testing.T) { // 6 jobID1 eleID1 JobStateNone // 7 jobID1 eleID1 JobStateNone // - // BackfillHistoryTable + // BackgroundSubtaskHistoryTable // ID jobID eleID state // -------------------------------- // 5 jobID1 eleID1 JobStateNone @@ -686,15 +687,15 @@ func TestSimpleExecBackfillJobs(t *testing.T) { bJobs, err = ddl.GetInterruptedBackfillJobForOneEle(se, jobID1, eleID1, eleKey) require.NoError(t, err) require.Len(t, bJobs, 0) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 6) err = ddl.MoveBackfillJobsToHistoryTable(se, bJobs1[0]) require.NoError(t, err) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 0) - allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackfillHistoryTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") + allCnt, err = ddl.GetBackfillJobCount(se, ddl.BackgroundSubtaskHistoryTable, getIdxConditionStr(jobID1, eleID1), "test_get_bj") require.NoError(t, err) require.Equal(t, allCnt, 8) bJobs, err = ddl.GetInterruptedBackfillJobForOneEle(se, jobID2, eleID3, eleKey) @@ -706,13 +707,13 @@ func TestSimpleExecBackfillJobs(t *testing.T) { } expectJob.State = model.JobStateCancelled equalBackfillJob(t, bJobs3[0], bJobs[0], types.ZeroTimestamp) - // BackfillTable + // 
BackgroundSubtaskTable // ID jobID eleID state // -------------------------------- // 0 jobID2 eleID2 JobStateNone // 1 jobID2 eleID2 JobStateNone // - // BackfillHistoryTable + // BackgroundSubtaskHistoryTable // ID jobID eleID state // -------------------------------- // 5 jobID1 eleID1 JobStateNone diff --git a/ddl/reorg.go b/ddl/reorg.go index 0ae356224b629..3bde3fdaa844b 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -48,6 +48,7 @@ import ( "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tipb/go-tipb" + atomicutil "go.uber.org/atomic" "go.uber.org/zap" ) @@ -77,6 +78,8 @@ type reorgCtx struct { warnings map[errors.ErrorID]*terror.Error warningsCount map[errors.ErrorID]int64 } + + references atomicutil.Int32 } // nullableKey can store kv.Key. @@ -229,7 +232,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo case err := <-rc.doneCh: // Since job is cancelled,we don't care about its partial counts. if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) { - d.removeReorgCtx(job) + d.removeReorgCtx(job.ID) return dbterror.ErrCancelledDDLJob } rowCount := rc.getRowCount() @@ -244,7 +247,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo // Update a job's warnings. w.mergeWarningsIntoJob(job) - d.removeReorgCtx(job) + d.removeReorgCtx(job.ID) // For other errors, even err is not nil here, we still wait the partial counts to be collected. // since in the next round, the startKey is brand new which is stored by last time. if err != nil { @@ -254,7 +257,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo updateBackfillProgress(w, reorgInfo, tblInfo, 0) case <-w.ctx.Done(): logutil.BgLogger().Info("[ddl] run reorg job quit") - d.removeReorgCtx(job) + d.removeReorgCtx(job.ID) // We return dbterror.ErrWaitReorgTimeout here too, so that outer loop will break. 
return dbterror.ErrWaitReorgTimeout case <-time.After(waitTimeout): diff --git a/ddl/serial_test.go b/ddl/serial_test.go index 668b675a0b185..4d11db2b3409c 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -102,7 +102,7 @@ func TestChangeMaxIndexLength(t *testing.T) { tk.MustExec("create table t (c1 varchar(3073), index(c1)) charset = ascii") tk.MustExec(fmt.Sprintf("create table t1 (c1 varchar(%d), index(c1)) charset = ascii;", config.DefMaxOfMaxIndexLength)) err := tk.ExecToErr(fmt.Sprintf("create table t2 (c1 varchar(%d), index(c1)) charset = ascii;", config.DefMaxOfMaxIndexLength+1)) - require.EqualError(t, err, "[ddl:1071]Specified key was too long; max key length is 12288 bytes") + require.EqualError(t, err, "[ddl:1071]Specified key was too long (12289 bytes); max key length is 12288 bytes") } func TestCreateTableWithLike(t *testing.T) { diff --git a/errno/errname.go b/errno/errname.go index 39deb486ff2fb..0f6c3db0e1645 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -92,7 +92,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrMultiplePriKey: mysql.Message("Multiple primary key defined", nil), ErrTooManyKeys: mysql.Message("Too many keys specified; max %d keys allowed", nil), ErrTooManyKeyParts: mysql.Message("Too many key parts specified; max %d parts allowed", nil), - ErrTooLongKey: mysql.Message("Specified key was too long; max key length is %d bytes", nil), + ErrTooLongKey: mysql.Message("Specified key was too long (%d bytes); max key length is %d bytes", nil), ErrKeyColumnDoesNotExits: mysql.Message("Key column '%-.192s' doesn't exist in table", nil), ErrBlobUsedAsKey: mysql.Message("BLOB column '%-.192s' can't be used in key specification with the used table type", nil), ErrTooBigFieldlength: mysql.Message("Column length too big for column '%-.192s' (max = %d); use BLOB or TEXT instead", nil), diff --git a/errors.toml b/errors.toml index d7808c34337a1..9b503c113a836 100644 --- a/errors.toml +++ b/errors.toml @@ -668,7 +668,7 @@ Too many keys specified; max %d keys allowed ["ddl:1071"] error = ''' -Specified key was too long; max key length is %d bytes +Specified key was too long (%d bytes); max key length is %d bytes ''' ["ddl:1072"] diff --git a/executor/hash_table.go b/executor/hash_table.go index 57d4519be50b8..2bfdc05c69244 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -241,10 +241,14 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize lastChunkBufPointer *chunk.Chunk = nil memDelta int64 = 0 + needTrackMemUsage = cap(innerPtrs) > signalCheckpointForJoin ) c.chkBuf = nil - c.memTracker.Consume(-c.chkBufSizeForOneProbe + int64(cap(innerPtrs))*rowPtrSize) - defer c.memTracker.Consume(-int64(cap(innerPtrs))*rowPtrSize + memDelta) + c.memTracker.Consume(-c.chkBufSizeForOneProbe) + if needTrackMemUsage { + c.memTracker.Consume(int64(cap(innerPtrs)) * rowPtrSize) + defer c.memTracker.Consume(-int64(cap(innerPtrs))*rowPtrSize + memDelta) + } c.chkBufSizeForOneProbe = 0 for i, ptr := range innerPtrs { @@ -257,13 +261,13 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk if err != nil { return nil, nil, err } - if c.chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil { + if needTrackMemUsage && c.chkBuf != lastChunkBufPointer && lastChunkBufPointer != nil { lastChunkSize := lastChunkBufPointer.MemoryUsage() c.chkBufSizeForOneProbe += lastChunkSize memDelta += lastChunkSize } 
lastChunkBufPointer = c.chkBuf - if i&signalCheckpointForJoin == 0 { + if needTrackMemUsage && (i&signalCheckpointForJoin == (signalCheckpointForJoin - 1)) { // Trigger Consume for checking the OOM Action signal memDelta += int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize - matchedDataSize matchedDataSize = int64(cap(matched))*rowSize + int64(cap(matchedPtrs))*rowPtrSize diff --git a/executor/recover_test.go b/executor/recover_test.go index 806e251f17769..92aa83ec99d3e 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -296,6 +296,43 @@ func TestRecoverTableMeetError(t *testing.T) { tk.MustContainErrMsg("select * from t_recover", "Table 'test_recover.t_recover' doesn't exist") } +func TestRecoverTablePrivilege(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + // Set GC safe point + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t_recover") + tk.MustExec("create table t_recover (a int);") + tk.MustExec("drop table t_recover") + + // Recover without drop/create privilege. + tk.MustExec("CREATE USER 'testrecovertable'@'localhost';") + newTk := testkit.NewTestKit(t, store) + require.NoError(t, newTk.Session().Auth(&auth.UserIdentity{Username: "testrecovertable", Hostname: "localhost"}, nil, nil)) + newTk.MustGetErrCode("recover table t_recover", errno.ErrTableaccessDenied) + newTk.MustGetErrCode("flashback table t_recover", errno.ErrTableaccessDenied) + + // Got drop privilege, still failed. + tk.MustExec("grant drop on *.* to 'testrecovertable'@'localhost';") + newTk.MustGetErrCode("recover table t_recover", errno.ErrTableaccessDenied) + newTk.MustGetErrCode("flashback table t_recover", errno.ErrTableaccessDenied) + + // Got select, create and drop privilege, execute success. + tk.MustExec("grant select,create on *.* to 'testrecovertable'@'localhost';") + newTk.MustExec("use test") + newTk.MustExec("recover table t_recover") + newTk.MustExec("drop table t_recover") + newTk.MustExec("flashback table t_recover") + + tk.MustExec("drop user 'testrecovertable'@'localhost';") +} + func TestRecoverClusterMeetError(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -499,6 +536,26 @@ func TestFlashbackSchema(t *testing.T) { tk.MustExec("use test2") tk.MustQuery("select a from t order by a").Check(testkit.Rows("1", "2", "3")) tk.MustQuery("select a from t1 order by a").Check(testkit.Rows("4", "5", "6")) + + tk.MustExec("drop database if exists t_recover") + tk.MustExec("create database t_recover") + tk.MustExec("drop database t_recover") + + // Recover without drop/create privilege. + tk.MustExec("CREATE USER 'testflashbackschema'@'localhost';") + newTk := testkit.NewTestKit(t, store) + require.NoError(t, newTk.Session().Auth(&auth.UserIdentity{Username: "testflashbackschema", Hostname: "localhost"}, nil, nil)) + newTk.MustGetErrCode("flashback database t_recover", errno.ErrDBaccessDenied) + + // Got drop privilege, still failed. + tk.MustExec("grant drop on *.* to 'testflashbackschema'@'localhost';") + newTk.MustGetErrCode("flashback database t_recover", errno.ErrDBaccessDenied) + + // Got create and drop privilege, execute success. 
+ tk.MustExec("grant create on *.* to 'testflashbackschema'@'localhost';") + newTk.MustExec("flashback schema t_recover") + + tk.MustExec("drop user 'testflashbackschema'@'localhost';") } // MockGC is used to make GC work in the test environment. diff --git a/meta/meta.go b/meta/meta.go index 801273a98cbf9..af02ae7882ac3 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -132,7 +132,7 @@ const ( BaseDDLTableVersion DDLTableVersion = 1 // MDLTableVersion is for support MDL tables. MDLTableVersion DDLTableVersion = 2 - // BackfillTableVersion is for support distributed reorg stage, it added tidb_ddl_backfill, tidb_ddl_backfill_history. + // BackfillTableVersion is for support distributed reorg stage, it added tidb_background_subtask, tidb_background_subtask_history. BackfillTableVersion DDLTableVersion = 3 ) diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 3eec005372384..36ada1ed9b3ea 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -446,8 +446,11 @@ type BackfillMeta struct { WarningsCount map[errors.ErrorID]int64 `json:"warnings_count"` Location *TimeZoneLocation `json:"location"` ReorgTp ReorgType `json:"reorg_tp"` - - *JobMeta `json:"job_meta"` + RowCount int64 `json:"row_count"` + StartKey []byte `json:"start_key"` + EndKey []byte `json:"end_key"` + CurrKey []byte `json:"curr_key"` + *JobMeta `json:"job_meta"` } // Encode encodes BackfillMeta with json format. @@ -952,6 +955,30 @@ func (s JobState) String() string { } } +// StrToJobState converts string to JobState. +func StrToJobState(s string) JobState { + switch s { + case "running": + return JobStateRunning + case "rollingback": + return JobStateRollingback + case "rollback done": + return JobStateRollbackDone + case "done": + return JobStateDone + case "cancelled": + return JobStateCancelled + case "cancelling": + return JobStateCancelling + case "synced": + return JobStateSynced + case "queueing": + return JobStateQueueing + default: + return JobStateNone + } +} + // SchemaDiff contains the schema modification at a particular schema version. // It is used to reduce schema reload cost. 
type SchemaDiff struct { diff --git a/parser/mysql/errname.go b/parser/mysql/errname.go index 3066d9f6fb6b0..5b1cff580f658 100644 --- a/parser/mysql/errname.go +++ b/parser/mysql/errname.go @@ -97,7 +97,7 @@ var MySQLErrName = map[uint16]*ErrMessage{ ErrMultiplePriKey: Message("Multiple primary key defined", nil), ErrTooManyKeys: Message("Too many keys specified; max %d keys allowed", nil), ErrTooManyKeyParts: Message("Too many key parts specified; max %d parts allowed", nil), - ErrTooLongKey: Message("Specified key was too long; max key length is %d bytes", nil), + ErrTooLongKey: Message("Specified key was too long (%d bytes); max key length is %d bytes", nil), ErrKeyColumnDoesNotExits: Message("Key column '%-.192s' doesn't exist in table", nil), ErrBlobUsedAsKey: Message("BLOB column '%-.192s' can't be used in key specification with the used table type", nil), ErrTooBigFieldlength: Message("Column length too big for column '%-.192s' (max = %d); use BLOB or TEXT instead", nil), diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 72e2cc048bf55..fc5ee481aba76 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -4727,9 +4727,39 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err } b.visitInfo = appendVisitInfo(b.visitInfo, mysql.InsertPriv, v.TableToTables[0].NewTable.Schema.L, v.TableToTables[0].NewTable.Name.L, "", authErr) - case *ast.RecoverTableStmt, *ast.FlashBackTableStmt, *ast.FlashBackDatabaseStmt: - // Recover table command can only be executed by administrator. - b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SuperPriv, "", "", "", nil) + case *ast.RecoverTableStmt: + if v.Table == nil { + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SuperPriv, "", "", "", nil) + } else { + if b.ctx.GetSessionVars().User != nil { + authErr = ErrTableaccessDenied.GenWithStackByArgs("CREATE", b.ctx.GetSessionVars().User.AuthUsername, + b.ctx.GetSessionVars().User.AuthHostname, v.Table.Name.L) + } + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.CreatePriv, v.Table.Schema.L, v.Table.Name.L, "", authErr) + if b.ctx.GetSessionVars().User != nil { + authErr = ErrTableaccessDenied.GenWithStackByArgs("DROP", b.ctx.GetSessionVars().User.AuthUsername, + b.ctx.GetSessionVars().User.AuthHostname, v.Table.Name.L) + } + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.DropPriv, v.Table.Schema.L, v.Table.Name.L, "", authErr) + } + case *ast.FlashBackTableStmt: + if b.ctx.GetSessionVars().User != nil { + authErr = ErrTableaccessDenied.GenWithStackByArgs("CREATE", b.ctx.GetSessionVars().User.AuthUsername, + b.ctx.GetSessionVars().User.AuthHostname, v.Table.Name.L) + } + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.CreatePriv, v.Table.Schema.L, v.Table.Name.L, "", authErr) + if b.ctx.GetSessionVars().User != nil { + authErr = ErrTableaccessDenied.GenWithStackByArgs("DROP", b.ctx.GetSessionVars().User.AuthUsername, + b.ctx.GetSessionVars().User.AuthHostname, v.Table.Name.L) + } + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.DropPriv, v.Table.Schema.L, v.Table.Name.L, "", authErr) + case *ast.FlashBackDatabaseStmt: + if b.ctx.GetSessionVars().User != nil { + authErr = ErrDBaccessDenied.GenWithStackByArgs(b.ctx.GetSessionVars().User.AuthUsername, + b.ctx.GetSessionVars().User.AuthHostname, v.DBName.L) + } + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.CreatePriv, v.DBName.L, "", "", authErr) + b.visitInfo = appendVisitInfo(b.visitInfo, mysql.DropPriv, v.DBName.L, "", "", authErr) case *ast.FlashBackToTimestampStmt: 
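		// For context (a reading of the surrounding code, not part of this change):
		// appendVisitInfo only records a privilege requirement while the plan is
		// built; everything accumulated in b.visitInfo is verified later, when the
		// plan is checked against the session's privileges. Empty schema and table
		// names, as below, make the requirement global rather than object-scoped.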
// Flashback cluster can only be executed by user with `super` privilege. b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SuperPriv, "", "", "", nil) diff --git a/resourcemanager/schedule.go b/resourcemanager/schedule.go index a33e0b75a764e..4ba4068ba88c2 100644 --- a/resourcemanager/schedule.go +++ b/resourcemanager/schedule.go @@ -55,14 +55,14 @@ func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) { switch cmd { case scheduler.Downclock: concurrency := con - 1 - log.Info("[resource manager] downclock goroutine pool", + log.Debug("[resource manager] downclock goroutine pool", zap.Int("origin concurrency", con), zap.Int("concurrency", concurrency), zap.String("name", pool.Pool.Name())) pool.Pool.Tune(concurrency) case scheduler.Overclock: concurrency := con + 1 - log.Info("[resource manager] overclock goroutine pool", + log.Debug("[resource manager] overclock goroutine pool", zap.Int("origin concurrency", con), zap.Int("concurrency", concurrency), zap.String("name", pool.Pool.Name())) diff --git a/session/bootstrap_test.go b/session/bootstrap_test.go index e6013d8b150ce..d126c1f7549f9 100644 --- a/session/bootstrap_test.go +++ b/session/bootstrap_test.go @@ -217,8 +217,8 @@ func TestBootstrapWithError(t *testing.T) { require.Equal(t, []byte("True"), row.GetBytes(0)) require.NoError(t, r.Close()) - mustExec(t, se, "SELECT * from mysql.tidb_ddl_backfill") - mustExec(t, se, "SELECT * from mysql.tidb_ddl_backfill_history") + mustExec(t, se, "SELECT * from mysql.tidb_background_subtask") + mustExec(t, se, "SELECT * from mysql.tidb_background_subtask_history") // Check tidb_ttl_table_status table mustExec(t, se, "SELECT * from mysql.tidb_ttl_table_status") @@ -238,8 +238,8 @@ func TestDDLTableCreateBackfillTable(t *testing.T) { // downgrade `mDDLTableVersion` m.SetDDLTables(meta.MDLTableVersion) - mustExec(t, se, "drop table mysql.tidb_ddl_backfill") - mustExec(t, se, "drop table mysql.tidb_ddl_backfill_history") + mustExec(t, se, "drop table mysql.tidb_background_subtask") + mustExec(t, se, "drop table mysql.tidb_background_subtask_history") err = txn.Commit(context.Background()) require.NoError(t, err) @@ -249,8 +249,8 @@ func TestDDLTableCreateBackfillTable(t *testing.T) { require.NoError(t, err) se = createSessionAndSetID(t, store) - mustExec(t, se, "select * from mysql.tidb_ddl_backfill") - mustExec(t, se, "select * from mysql.tidb_ddl_backfill_history") + mustExec(t, se, "select * from mysql.tidb_background_subtask") + mustExec(t, se, "select * from mysql.tidb_background_subtask_history") dom.Close() } diff --git a/session/session.go b/session/session.go index 678081077df70..84c95bdb3ef9c 100644 --- a/session/session.go +++ b/session/session.go @@ -3075,8 +3075,8 @@ var ( } // BackfillTables is a list of tables definitions used in dist reorg DDL. BackfillTables = []tableBasicInfo{ - {ddl.BackfillTableSQL, ddl.BackfillTableID}, - {ddl.BackfillHistoryTableSQL, ddl.BackfillHistoryTableID}, + {ddl.BackgroundSubtaskTableSQL, ddl.BackgroundSubtaskTableID}, + {ddl.BackgroundSubtaskHistoryTableSQL, ddl.BackgroundSubtaskHistoryTableID}, } mdlTable = "create table mysql.tidb_mdl_info(job_id BIGINT NOT NULL PRIMARY KEY, version BIGINT NOT NULL, table_ids text(65535));" ) @@ -3095,7 +3095,7 @@ func splitAndScatterTable(store kv.Storage, tableIDs []int64) { } } -// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history, or tidb_ddl_backfill and tidb_ddl_backfill_history. 
+// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history, or tidb_background_subtask and tidb_background_subtask_history. func InitDDLJobTables(store kv.Storage, targetVer meta.DDLTableVersion) error { targetTables := DDLJobTables if targetVer == meta.BackfillTableVersion { diff --git a/session/session_test.go b/session/session_test.go index 4edb8ddce4c86..1426c689997d6 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -45,11 +45,11 @@ func TestInitMetaTable(t *testing.T) { } tbls := map[string]struct{}{ - "tidb_ddl_job": {}, - "tidb_ddl_reorg": {}, - "tidb_ddl_history": {}, - "tidb_ddl_backfill": {}, - "tidb_ddl_backfill_history": {}, + "tidb_ddl_job": {}, + "tidb_ddl_reorg": {}, + "tidb_ddl_history": {}, + "tidb_background_subtask": {}, + "tidb_background_subtask_history": {}, } for tbl := range tbls { @@ -83,12 +83,12 @@ func TestMetaTableRegion(t *testing.T) { require.NotEqual(t, ddlJobTableRegionID, ddlReorgTableRegionID) - ddlBackfillTableRegionID := tk.MustQuery("show table mysql.tidb_ddl_backfill regions").Rows()[0][0] - ddlBackfillTableRegionStartKey := tk.MustQuery("show table mysql.tidb_ddl_backfill regions").Rows()[0][1] - require.Equal(t, ddlBackfillTableRegionStartKey, fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.BackfillTableID)) - ddlBackfillHistoryTableRegionID := tk.MustQuery("show table mysql.tidb_ddl_backfill_history regions").Rows()[0][0] - ddlBackfillHistoryTableRegionStartKey := tk.MustQuery("show table mysql.tidb_ddl_backfill_history regions").Rows()[0][1] - require.Equal(t, ddlBackfillHistoryTableRegionStartKey, fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.BackfillHistoryTableID)) + ddlBackfillTableRegionID := tk.MustQuery("show table mysql.tidb_background_subtask regions").Rows()[0][0] + ddlBackfillTableRegionStartKey := tk.MustQuery("show table mysql.tidb_background_subtask regions").Rows()[0][1] + require.Equal(t, ddlBackfillTableRegionStartKey, fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.BackgroundSubtaskTableID)) + ddlBackfillHistoryTableRegionID := tk.MustQuery("show table mysql.tidb_background_subtask_history regions").Rows()[0][0] + ddlBackfillHistoryTableRegionStartKey := tk.MustQuery("show table mysql.tidb_background_subtask_history regions").Rows()[0][1] + require.Equal(t, ddlBackfillHistoryTableRegionStartKey, fmt.Sprintf("%s_%d_", tablecodec.TablePrefix(), ddl.BackgroundSubtaskHistoryTableID)) require.NotEqual(t, ddlBackfillTableRegionID, ddlBackfillHistoryTableRegionID) } diff --git a/sessiontxn/BUILD.bazel b/sessiontxn/BUILD.bazel index 4691bfa4ff3af..b37a5e4ec04d3 100644 --- a/sessiontxn/BUILD.bazel +++ b/sessiontxn/BUILD.bazel @@ -27,7 +27,6 @@ go_test( "txn_rc_tso_optimize_test.go", ], flaky = True, - race = "on", shard_count = 50, deps = [ ":sessiontxn", diff --git a/telemetry/BUILD.bazel b/telemetry/BUILD.bazel index 56376f0031109..8b67ae1144e7b 100644 --- a/telemetry/BUILD.bazel +++ b/telemetry/BUILD.bazel @@ -51,7 +51,7 @@ go_library( go_test( name = "telemetry_test", - timeout = "short", + timeout = "moderate", srcs = [ "data_cluster_hardware_test.go", "data_feature_usage_test.go", @@ -62,6 +62,7 @@ go_test( ], embed = [":telemetry"], flaky = True, + shard_count = 30, deps = [ "//autoid_service", "//config", diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index f1a38f81ef0b8..278ef894e3474 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -589,6 +589,7 @@ func 
TestAddIndexAccelerationAndMDL(t *testing.T) { } func TestDistReorgUsage(t *testing.T) { + t.Skip("skip in order to pass the test quickly") store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) usage, err := telemetry.GetFeatureUsage(tk.Session()) diff --git a/ttl/cache/BUILD.bazel b/ttl/cache/BUILD.bazel index c926ee231cd93..1e448cbeccf3f 100644 --- a/ttl/cache/BUILD.bazel +++ b/ttl/cache/BUILD.bazel @@ -35,6 +35,7 @@ go_library( go_test( name = "cache_test", + timeout = "short", srcs = [ "base_test.go", "infoschema_test.go", @@ -46,6 +47,7 @@ go_test( ], embed = [":cache"], flaky = True, + shard_count = 50, deps = [ "//infoschema", "//kv", diff --git a/ttl/client/BUILD.bazel b/ttl/client/BUILD.bazel index e842ad03a887b..31577c1161263 100644 --- a/ttl/client/BUILD.bazel +++ b/ttl/client/BUILD.bazel @@ -20,8 +20,10 @@ go_library( go_test( name = "client_test", + timeout = "short", srcs = ["command_test.go"], embed = [":client"], + shard_count = 5, deps = [ "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/ttl/metrics/BUILD.bazel b/ttl/metrics/BUILD.bazel index f0666b5c59530..f8fbe4ef8030b 100644 --- a/ttl/metrics/BUILD.bazel +++ b/ttl/metrics/BUILD.bazel @@ -13,7 +13,9 @@ go_library( go_test( name = "metrics_test", + timeout = "short", srcs = ["metrics_test.go"], embed = [":metrics"], + shard_count = 5, deps = ["@com_github_stretchr_testify//require"], ) diff --git a/ttl/session/BUILD.bazel b/ttl/session/BUILD.bazel index 2c9dae3fc426f..a98067cff9854 100644 --- a/ttl/session/BUILD.bazel +++ b/ttl/session/BUILD.bazel @@ -21,12 +21,14 @@ go_library( go_test( name = "session_test", + timeout = "short", srcs = [ "main_test.go", "session_test.go", "sysvar_test.go", ], flaky = True, + shard_count = 5, deps = [ ":session", "//sessionctx/variable", diff --git a/ttl/sqlbuilder/BUILD.bazel b/ttl/sqlbuilder/BUILD.bazel index 505e9ffcb3576..5eafc4a7748d3 100644 --- a/ttl/sqlbuilder/BUILD.bazel +++ b/ttl/sqlbuilder/BUILD.bazel @@ -19,11 +19,13 @@ go_library( go_test( name = "sqlbuilder_test", + timeout = "short", srcs = [ "main_test.go", "sql_test.go", ], flaky = True, + shard_count = 5, deps = [ ":sqlbuilder", "//kv", diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index 55fe5dffc2b8a..6e80e4f298145 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -44,6 +44,7 @@ go_library( go_test( name = "ttlworker_test", + timeout = "moderate", srcs = [ "del_test.go", "job_manager_integration_test.go", @@ -56,6 +57,7 @@ go_test( embed = [":ttlworker"], flaky = True, race = "on", + shard_count = 30, deps = [ "//domain", "//infoschema",