Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: backport of (#38738) tidb_ddl_reorg commit conflicts #41115

Merged
merged 9 commits into from
Apr 11, 2023
10 changes: 6 additions & 4 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ const (
// Backfilling is time consuming, to accelerate this process, TiDB has built some sub
// workers to do this in the DDL owner node.
//
// DDL owner thread
// DDL owner thread (also see comments before runReorgJob func)
// ^
// | (reorgCtx.doneCh)
// |
Expand Down Expand Up @@ -453,9 +453,10 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
err = dc.isReorgRunnable(reorgInfo.Job)
}

// Update the reorg handle that has been processed.
err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool)

if err != nil {
// Update the reorg handle that has been processed.
err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",
zap.ByteString("element type", reorgInfo.currElement.TypeKey),
Expand All @@ -480,7 +481,8 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
zap.String("start key", hex.EncodeToString(startKey)),
zap.String("next key", hex.EncodeToString(nextKey)),
zap.Int64("batch added count", taskAddedCount),
zap.String("take time", elapsedTime.String()))
zap.String("take time", elapsedTime.String()),
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
zap.NamedError("updateHandleError", err1))
return nil
}

Expand Down
34 changes: 29 additions & 5 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,30 @@ func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, j
func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
var rh *reorgHandler
if w.concurrentDDL {
sctx, err1 := w.sessPool.get()
if err1 != nil {
err = errors.Trace(err1)
return
}
defer w.sessPool.put(sctx)
sess := newSession(sctx)
err = sess.begin()
if err != nil {
return
}
defer sess.rollback()
txn, err1 := sess.txn()
if err1 != nil {
err = errors.Trace(err1)
return
}
newMeta := meta.NewMeta(txn)
rh = newReorgHandler(newMeta, newSession(sctx), w.concurrentDDL)
} else {
rh = newReorgHandler(t, w.sess, w.concurrentDDL)
}
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs), false)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
Expand Down Expand Up @@ -1273,8 +1296,8 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
if err != nil {
return w.reformatErrors(err)
}
if w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() != nil && len(w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()) != 0 {
warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
if len(warn) != 0 {
//nolint:forcetypeassert
recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error)
}
Expand Down Expand Up @@ -1358,8 +1381,9 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
taskCtx.nextKey = nextKey
taskCtx.done = taskDone

warningsMap := make(map[errors.ErrorID]*terror.Error, len(rowRecords))
warningsCountMap := make(map[errors.ErrorID]int64, len(rowRecords))
// Optimize for few warnings!
warningsMap := make(map[errors.ErrorID]*terror.Error, 2)
warningsCountMap := make(map[errors.ErrorID]int64, 2)
for _, rowRecord := range rowRecords {
taskCtx.scanCount++

Expand Down
37 changes: 37 additions & 0 deletions ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2426,3 +2426,40 @@ func TestColumnTypeChangeTimestampToInt(t *testing.T) {
tk.MustExec("alter table t add index idx1(id, c1);")
tk.MustExec("admin check table t")
}

func TestFixDDLTxnWillConflictWithReorgTxnNotConcurrent(t *testing.T) {
store := testkit.CreateMockStore(t)
tk0 := testkit.NewTestKit(t, store)
tk0.MustExec("set @@global.tidb_enable_metadata_lock=0")
defer tk0.MustExec("set @@global.tidb_enable_metadata_lock = default")
tk0.MustExec("set @@global.tidb_enable_concurrent_ddl = off")
defer tk0.MustExec("set @@global.tidb_enable_concurrent_ddl = default")
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("create table t (a int)")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF")
defer tk.MustExec("set global tidb_ddl_enable_fast_reorg = default")
tk.MustExec("alter table t add index(a)")
tk.MustExec("set @@sql_mode=''")
tk.MustExec("insert into t values(128),(129)")
tk.MustExec("alter table t modify column a tinyint")

tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1690 2 warnings with this error code, first warning: constant 128 overflows tinyint"))
}

func TestFixDDLTxnWillConflictWithReorgTxn(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("create table t (a int)")
tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF")
defer tk.MustExec("set global tidb_ddl_enable_fast_reorg = default")
tk.MustExec("alter table t add index(a)")
tk.MustExec("set @@sql_mode=''")
tk.MustExec("insert into t values(128),(129)")
tk.MustExec("alter table t modify column a tinyint")

tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1690 2 warnings with this error code, first warning: constant 128 overflows tinyint"))
}
19 changes: 19 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4537,6 +4537,25 @@ func TestPartitionTableWithAnsiQuotes(t *testing.T) {
` PARTITION "pMax" VALUES LESS THAN (MAXVALUE,MAXVALUE))`))
}

func TestAlterModifyPartitionColTruncateWarning(t *testing.T) {
t.Skip("waiting for supporting Modify Partition Column again")
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
schemaName := "truncWarn"
tk.MustExec("create database " + schemaName)
tk.MustExec("use " + schemaName)
tk.MustExec(`set sql_mode = default`)
tk.MustExec(`create table t (a varchar(255)) partition by range columns (a) (partition p1 values less than ("0"), partition p2 values less than ("zzzz"))`)
tk.MustExec(`insert into t values ("123456"),(" 654321")`)
tk.MustContainErrMsg(`alter table t modify a varchar(5)`, "[types:1265]Data truncated for column 'a', value is '")
tk.MustExec(`set sql_mode = ''`)
tk.MustExec(`alter table t modify a varchar(5)`)
// Fix the duplicate warning, see https://github.com/pingcap/tidb/issues/38699
tk.MustQuery(`show warnings`).Check(testkit.Rows(""+
"Warning 1265 Data truncated for column 'a', value is ' 654321'",
"Warning 1265 Data truncated for column 'a', value is ' 654321'"))
}

func TestIssue40135Ver2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
13 changes: 13 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,6 +1806,19 @@ func (s *session) session() sessionctx.Context {
return s.Context
}

func (s *session) runInTxn(f func(*session) error) (err error) {
err = s.begin()
if err != nil {
return err
}
err = f(s)
if err != nil {
s.rollback()
return
}
return errors.Trace(s.commit())
}

// GetAllHistoryDDLJobs get all the done DDL jobs.
func GetAllHistoryDDLJobs(m *meta.Meta) ([]*model.Job, error) {
iterator, err := GetLastHistoryDDLJobsIterator(m)
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
if err != nil {
return err
}
CleanupDDLReorgHandles(job, w, t)
asyncNotify(d.ddlJobDoneCh)
return nil
}
Expand Down
25 changes: 24 additions & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,30 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m
func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
var rh *reorgHandler
if w.concurrentDDL {
sctx, err1 := w.sessPool.get()
if err1 != nil {
err = err1
return
}
defer w.sessPool.put(sctx)
sess := newSession(sctx)
err = sess.begin()
if err != nil {
return
}
defer sess.rollback()
txn, err1 := sess.txn()
if err1 != nil {
err = err1
return
}
newMeta := meta.NewMeta(txn)
rh = newReorgHandler(newMeta, sess, w.concurrentDDL)
} else {
rh = newReorgHandler(t, w.sess, w.concurrentDDL)
}

failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) {
//nolint:forcetypeassert
Expand All @@ -905,7 +929,6 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
}
})

rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, elements, mergingTmpIdx)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
Expand Down
76 changes: 39 additions & 37 deletions ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,15 +430,8 @@ func getDDLReorgHandle(sess *session, job *model.Job) (element *meta.Element, st
return
}

// updateDDLReorgStartHandle update the startKey of the handle.
func updateDDLReorgStartHandle(sess *session, job *model.Job, element *meta.Element, startKey kv.Key) error {
sql := fmt.Sprintf("update mysql.tidb_ddl_reorg set ele_id = %d, ele_type = %s, start_key = %s where job_id = %d",
element.ID, wrapKey2String(element.TypeKey), wrapKey2String(startKey), job.ID)
_, err := sess.execute(context.Background(), sql, "update_start_handle")
return err
}

// updateDDLReorgHandle update startKey, endKey physicalTableID and element of the handle.
// Caller should wrap this in a separate transaction, to avoid conflicts.
func updateDDLReorgHandle(sess *session, jobID int64, startKey kv.Key, endKey kv.Key, physicalTableID int64, element *meta.Element) error {
sql := fmt.Sprintf("update mysql.tidb_ddl_reorg set ele_id = %d, ele_type = %s, start_key = %s, end_key = %s, physical_id = %d where job_id = %d",
element.ID, wrapKey2String(element.TypeKey), wrapKey2String(startKey), wrapKey2String(endKey), physicalTableID, jobID)
Expand All @@ -447,28 +440,48 @@ func updateDDLReorgHandle(sess *session, jobID int64, startKey kv.Key, endKey kv
}

// initDDLReorgHandle initializes the handle for ddl reorg.
func initDDLReorgHandle(sess *session, jobID int64, startKey kv.Key, endKey kv.Key, physicalTableID int64, element *meta.Element) error {
sql := fmt.Sprintf("insert into mysql.tidb_ddl_reorg(job_id, ele_id, ele_type, start_key, end_key, physical_id) values (%d, %d, %s, %s, %s, %d)",
func initDDLReorgHandle(s *session, jobID int64, startKey kv.Key, endKey kv.Key, physicalTableID int64, element *meta.Element) error {
del := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", jobID)
ins := fmt.Sprintf("insert into mysql.tidb_ddl_reorg(job_id, ele_id, ele_type, start_key, end_key, physical_id) values (%d, %d, %s, %s, %s, %d)",
jobID, element.ID, wrapKey2String(element.TypeKey), wrapKey2String(startKey), wrapKey2String(endKey), physicalTableID)
_, err := sess.execute(context.Background(), sql, "update_handle")
return err
return s.runInTxn(func(se *session) error {
_, err := se.execute(context.Background(), del, "init_handle")
if err != nil {
logutil.BgLogger().Info("initDDLReorgHandle failed to delete", zap.Int64("jobID", jobID), zap.Error(err))
}
_, err = se.execute(context.Background(), ins, "init_handle")
return err
})
}

// deleteDDLReorgHandle deletes the handle for ddl reorg.
func removeDDLReorgHandle(sess *session, job *model.Job, elements []*meta.Element) error {
func removeDDLReorgHandle(s *session, job *model.Job, elements []*meta.Element) error {
if len(elements) == 0 {
return nil
}
sql := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", job.ID)
_, err := sess.execute(context.Background(), sql, "remove_handle")
return err
return s.runInTxn(func(se *session) error {
_, err := se.execute(context.Background(), sql, "remove_handle")
return err
})
}

// removeReorgElement removes the element from ddl reorg, it is the same with removeDDLReorgHandle, only used in failpoint
func removeReorgElement(sess *session, job *model.Job) error {
func removeReorgElement(s *session, job *model.Job) error {
sql := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", job.ID)
_, err := sess.execute(context.Background(), sql, "remove_handle")
return err
return s.runInTxn(func(se *session) error {
_, err := se.execute(context.Background(), sql, "remove_handle")
return err
})
}

// cleanDDLReorgHandles removes handles that are no longer needed.
func cleanDDLReorgHandles(s *session, job *model.Job) error {
sql := "delete from mysql.tidb_ddl_reorg where job_id = " + strconv.FormatInt(job.ID, 10)
return s.runInTxn(func(se *session) error {
_, err := se.execute(context.Background(), sql, "clean_handle")
return err
})
}

func wrapKey2String(key []byte) string {
Expand Down Expand Up @@ -498,12 +511,13 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) {

// MoveJobFromQueue2Table move existing DDLs in queue to table.
func (d *ddl) MoveJobFromQueue2Table(inBootstrap bool) error {
sess, err := d.sessPool.get()
sctx, err := d.sessPool.get()
if err != nil {
return err
}
defer d.sessPool.put(sess)
return runInTxn(newSession(sess), func(se *session) error {
defer d.sessPool.put(sctx)
sess := newSession(sctx)
return sess.runInTxn(func(se *session) error {
txn, err := se.txn()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -562,12 +576,13 @@ func (d *ddl) MoveJobFromQueue2Table(inBootstrap bool) error {

// MoveJobFromTable2Queue move existing DDLs in table to queue.
func (d *ddl) MoveJobFromTable2Queue() error {
sess, err := d.sessPool.get()
sctx, err := d.sessPool.get()
if err != nil {
return err
}
defer d.sessPool.put(sess)
return runInTxn(newSession(sess), func(se *session) error {
defer d.sessPool.put(sctx)
sess := newSession(sctx)
return sess.runInTxn(func(se *session) error {
txn, err := se.txn()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -614,16 +629,3 @@ func (d *ddl) MoveJobFromTable2Queue() error {
return t.SetConcurrentDDL(false)
})
}

func runInTxn(se *session, f func(*session) error) (err error) {
err = se.begin()
if err != nil {
return err
}
err = f(se)
if err != nil {
se.rollback()
return
}
return errors.Trace(se.commit())
}
Loading