From 532689cb0003148128463da0c00c0f939c82855a Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 10 Feb 2023 11:13:59 +0800 Subject: [PATCH 1/7] lightning: fix panic when user cancel (#41236) close pingcap/tidb#41235 --- br/pkg/lightning/backend/local/engine.go | 10 ++++++++++ br/pkg/lightning/lightning.go | 5 +++++ br/pkg/lightning/restore/table_restore.go | 14 +++++++++++++- br/tests/lightning_checkpoint_chunks/run.sh | 5 +++++ 4 files changed, 33 insertions(+), 1 deletion(-) diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go index 04ac7dce7a7fe..745454a372b18 100644 --- a/br/pkg/lightning/backend/local/engine.go +++ b/br/pkg/lightning/backend/local/engine.go @@ -32,6 +32,7 @@ import ( "github.com/google/btree" "github.com/google/uuid" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" @@ -1224,6 +1225,15 @@ func (w *Writer) flushKVs(ctx context.Context) error { if err != nil { return errors.Trace(err) } + + failpoint.Inject("orphanWriterGoRoutine", func() { + _ = common.KillMySelf() + // mimic we meet context cancel error when `addSST` + <-ctx.Done() + time.Sleep(5 * time.Second) + failpoint.Return(errors.Trace(ctx.Err())) + }) + err = w.addSST(ctx, meta) if err != nil { return errors.Trace(err) diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index ae83d41efd7f6..6ed4b40335b96 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -598,6 +598,11 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti o.logger.Error("restore failed", log.ShortError(err)) return errors.Trace(err) } + + failpoint.Inject("orphanWriterGoRoutine", func() { + // don't exit too quickly to expose panic + defer time.Sleep(time.Second * 10) + }) defer procedure.Close() err = procedure.Run(ctx) diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 6b1372c0bca9e..a3562cb436a5c 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -501,6 +501,7 @@ func (tr *TableRestore) restoreEngine( metrics, _ := metric.FromContext(ctx) // Restore table data +ChunkLoop: for chunkIndex, chunk := range cp.Chunks { if rc.status != nil && rc.status.backend == config.BackendTiDB { rc.status.FinishedFileSize.Add(chunk.Chunk.Offset - chunk.Key.Offset) @@ -524,9 +525,15 @@ func (tr *TableRestore) restoreEngine( } checkFlushLock.Unlock() + failpoint.Inject("orphanWriterGoRoutine", func() { + if chunkIndex > 0 { + <-pCtx.Done() + } + }) + select { case <-pCtx.Done(): - return nil, pCtx.Err() + break ChunkLoop default: } @@ -615,6 +622,11 @@ func (tr *TableRestore) restoreEngine( } wg.Wait() + select { + case <-pCtx.Done(): + return nil, pCtx.Err() + default: + } // Report some statistics into the log for debugging. totalKVSize := uint64(0) diff --git a/br/tests/lightning_checkpoint_chunks/run.sh b/br/tests/lightning_checkpoint_chunks/run.sh index 35cabe0aadfc5..48d24ca405070 100755 --- a/br/tests/lightning_checkpoint_chunks/run.sh +++ b/br/tests/lightning_checkpoint_chunks/run.sh @@ -48,6 +48,11 @@ for i in $(seq "$CHUNK_COUNT"); do done done +PKG="github.com/pingcap/tidb/br/pkg/lightning" +export GO_FAILPOINTS="$PKG/backend/local/orphanWriterGoRoutine=return();$PKG/restore/orphanWriterGoRoutine=return();$PKG/orphanWriterGoRoutine=return()" +# test won't panic +do_run_lightning config + # Set the failpoint to kill the lightning instance as soon as # one file (after writing totally $ROW_COUNT rows) is imported. # If checkpoint does work, this should kill exactly $CHUNK_COUNT instances of lightnings. From 95e660f3f43ac33350039261caa927eca6cc9a7b Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 10 Feb 2023 12:15:59 +0800 Subject: [PATCH 2/7] util/gpool/spmc: add a sleep to a simple `for` (#41240) close pingcap/tidb#41205 --- util/gpool/spmc/spmcpool.go | 1 + 1 file changed, 1 insertion(+) diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index d1a3bbb80c4aa..8afdf9db0c253 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -249,6 +249,7 @@ func (p *Pool[T, U, C, CT, TF]) ReleaseAndWait() { if p.Running() == 0 && p.heartbeatDone.Load() && p.waitingTask.Load() == 0 { return } + time.Sleep(5 * time.Millisecond) } } From 893cdb4495e88d9377bd8b5d3711c0bd053191f1 Mon Sep 17 00:00:00 2001 From: Zhigao Tong Date: Fri, 10 Feb 2023 14:05:59 +0800 Subject: [PATCH 3/7] planner: fix calculating TiFlash stream count (#41221) ref pingcap/tidb#40123 --- planner/core/optimizer.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index a5db249fd2be1..bc298dbc404e0 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "runtime" "strconv" "github.com/pingcap/errors" @@ -40,6 +41,7 @@ import ( "github.com/pingcap/tidb/types" utilhint "github.com/pingcap/tidb/util/hint" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/tracing" "github.com/pingcap/tipb/go-tipb" @@ -747,14 +749,20 @@ func calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx s for _, row := range rows { if row[4].GetString() == "cpu-logical-cores" { logicalCpus, err := strconv.Atoi(row[5].GetString()) - if err == nil && logicalCpus > 0 && uint64(logicalCpus) < minLogicalCores { - minLogicalCores = uint64(logicalCpus) + if err == nil && logicalCpus > 0 { + minLogicalCores = mathutil.Min(minLogicalCores, uint64(logicalCpus)) } } } // No need to check len(serersInfo) == serverCount here, since missing some servers' info won't affect the correctness if minLogicalCores > 1 && minLogicalCores != initialMaxCores { - return true, minLogicalCores / 2 + if runtime.GOARCH == "amd64" { + // In most x86-64 platforms, `Thread(s) per core` is 2 + return true, minLogicalCores / 2 + } + // ARM cpus don't implement Hyper-threading. + return true, minLogicalCores + // Other platforms are too rare to consider } return false, 0 From c6e6d621e250e8198312145badb8bfa93b9ab7ba Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 10 Feb 2023 15:22:00 +0800 Subject: [PATCH 4/7] ddl: check the key existence on original index (#40749) close pingcap/tidb#40730 --- ddl/index.go | 15 ++ ddl/index_merge_tmp.go | 72 +++--- ddl/indexmergetest/BUILD.bazel | 1 + ddl/indexmergetest/merge_test.go | 329 ++++++++++++++++++++++++++ executor/insert.go | 30 +-- executor/replace.go | 17 +- table/tables/index.go | 277 +++++++++++++--------- table/tables/mutation_checker.go | 34 ++- table/tables/mutation_checker_test.go | 4 +- tablecodec/tablecodec.go | 245 +++++++++++++------ tablecodec/tablecodec_test.go | 102 ++++++-- 11 files changed, 850 insertions(+), 276 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 4785b12e41090..833a825e24e30 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -962,6 +962,15 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} + + failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) && indexInfo.BackfillState == model.BackfillStateMerging && + MockDMLExecutionStateMerging != nil { + MockDMLExecutionStateMerging() + } + }) + sctx, err1 := w.sessPool.get() if err1 != nil { err = err1 @@ -1789,6 +1798,12 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC // MockDMLExecution is only used for test. var MockDMLExecution func() +// MockDMLExecutionMerging is only used for test. +var MockDMLExecutionMerging func() + +// MockDMLExecutionStateMerging is only used for test. +var MockDMLExecutionStateMerging func() + func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { if reorgInfo.mergingTmpIdx { logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index 6e3e98b04380b..029c87542de11 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" @@ -198,6 +199,12 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC return nil }) + failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) && MockDMLExecutionMerging != nil { + MockDMLExecutionMerging() + } + }) logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000) return } @@ -252,40 +259,49 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor return false, nil } - originVal, handle, isDelete, unique, keyVer := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle) - if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete { - // For 'm' version kvs, they are double-written. - // For 'd' version kvs, they are written in the delete-only state and can be dropped safely. - return true, nil + tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle) + if err != nil { + return false, err } + tempIdxVal = tempIdxVal.FilterOverwritten() + + // Extract the operations on the original index and replay them later. + for _, elem := range tempIdxVal { + if elem.KeyVer == tables.TempIndexKeyTypeMerge || elem.KeyVer == tables.TempIndexKeyTypeDelete { + // For 'm' version kvs, they are double-written. + // For 'd' version kvs, they are written in the delete-only state and can be dropped safely. + continue + } - if handle == nil { - // If the handle is not found in the value of the temp index, it means - // 1) This is not a deletion marker, the handle is in the key or the origin value. - // 2) This is a deletion marker, but the handle is in the key of temp index. - handle, err = tablecodec.DecodeIndexHandle(indexKey, originVal, len(w.index.Meta().Columns)) - if err != nil { - return false, err + if elem.Handle == nil { + // If the handle is not found in the value of the temp index, it means + // 1) This is not a deletion marker, the handle is in the key or the origin value. + // 2) This is a deletion marker, but the handle is in the key of temp index. + elem.Handle, err = tablecodec.DecodeIndexHandle(indexKey, elem.Value, len(w.index.Meta().Columns)) + if err != nil { + return false, err + } } - } - originIdxKey := make([]byte, len(indexKey)) - copy(originIdxKey, indexKey) - tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey) + originIdxKey := make([]byte, len(indexKey)) + copy(originIdxKey, indexKey) + tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey) - idxRecord := &temporaryIndexRecord{ - handle: handle, - delete: isDelete, - unique: unique, - skip: false, - } - if !isDelete { - idxRecord.vals = originVal - idxRecord.distinct = tablecodec.IndexKVIsUnique(originVal) + idxRecord := &temporaryIndexRecord{ + handle: elem.Handle, + delete: elem.Delete, + unique: elem.Distinct, + skip: false, + } + if !elem.Delete { + idxRecord.vals = elem.Value + idxRecord.distinct = tablecodec.IndexKVIsUnique(elem.Value) + } + w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord) + w.originIdxKeys = append(w.originIdxKeys, originIdxKey) + w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey) } - w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord) - w.originIdxKeys = append(w.originIdxKeys, originIdxKey) - w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey) + lastKey = indexKey return true, nil }) diff --git a/ddl/indexmergetest/BUILD.bazel b/ddl/indexmergetest/BUILD.bazel index 5f6e4215f664e..b70146ae8d461 100644 --- a/ddl/indexmergetest/BUILD.bazel +++ b/ddl/indexmergetest/BUILD.bazel @@ -17,6 +17,7 @@ go_test( "//ddl/internal/callback", "//ddl/testutil", "//domain", + "//errno", "//kv", "//meta/autoid", "//parser/model", diff --git a/ddl/indexmergetest/merge_test.go b/ddl/indexmergetest/merge_test.go index f63164bafe871..f74db4e0b9eb9 100644 --- a/ddl/indexmergetest/merge_test.go +++ b/ddl/indexmergetest/merge_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/ddl/internal/callback" "github.com/pingcap/tidb/ddl/testutil" + "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/tablecodec" @@ -529,3 +530,331 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) { tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("1 2")) } + +func TestAddIndexMergeInsertOnMerging(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0)") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + var err error + switch job.SchemaState { + case model.StateDeleteOnly: + _, err = tk1.Exec("insert into t values (5, 5)") + assert.NoError(t, err) + case model.StateWriteOnly: + _, err = tk1.Exec("insert into t values (5, 7)") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where b = 7") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecutionStateMerging = func() { + _, err := tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) // [kv:1062]Duplicate entry '5' for key 't.idx' + _, err = tk1.Exec("insert into t values (5, 8) on duplicate key update a = 6;") + assert.NoError(t, err) // The row should be normally updated to (6, 5). + ddl.MockDMLExecutionStateMerging = nil + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "return(true)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("6 5")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging")) +} + +func TestAddIndexMergeReplaceOnMerging(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0);") + tk.MustExec("insert into t values (5, 5);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + ddl.MockDMLExecution = func() { + _, err := tk1.Exec("delete from t where b = 5;") + assert.NoError(t, err) + } + + ddl.MockDMLExecutionStateMerging = func() { + _, err := tk1.Exec("replace into t values (5, 8);") + assert.NoError(t, err) + ddl.MockDMLExecutionStateMerging = nil + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "return(true)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging")) +} + +func TestAddIndexMergeInsertToDeletedTempIndex(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0)") + tk.MustExec("insert into t values (5, 5);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + var err error + switch job.SchemaState { + case model.StateWriteOnly: + _, err = tk1.Exec("delete from t where b = 5") + assert.NoError(t, err) + _, err := tk1.Exec("set @@tidb_constraint_check_in_place = true;") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) + _, err = tk1.Exec("set @@tidb_constraint_check_in_place = false;") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) +} + +func TestAddIndexMergeReplaceDelete(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, a int default 0);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + switch job.SchemaState { + case model.StateDeleteOnly: + _, err := tk1.Exec("insert into t values (1, 1);") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecutionMerging = func() { + _, err := tk1.Exec("replace into t values (2, 1);") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where id = 2;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows()) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging")) +} + +func TestAddIndexMergeDeleteDifferentHandle(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, c char(10));") + tk.MustExec("insert into t values (1, 'a');") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runDML := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runDML { + return + } + if job.SnapshotVer == 0 { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert into t values (2, 'a');") + assert.NoError(t, err) + _, err = tk1.Exec("replace into t values (3, 'a');") + assert.NoError(t, err) + runDML = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecution = func() { + // It is too late to remove the duplicated index value. + _, err := tk1.Exec("delete from t where id = 1;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + tk.MustGetErrCode("alter table t add unique index idx(c);", errno.ErrDupEntry) + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("3 a")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) +} + +func TestAddIndexDecodeTempIndexCommonHandle(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id_a bigint, id_b char(20), c char(20), primary key (id_a, id_b));") + tk.MustExec("insert into t values (1, 'id_1', 'char_1');") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runDML := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runDML { + return + } + if job.SnapshotVer == 0 { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert into t values (2, 'id_2', 'char_2');") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (3, 'id_3', 'char_3');") + assert.NoError(t, err) + runDML = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(c);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 id_1 char_1", "2 id_2 char_2", "3 id_3 char_3")) +} + +func TestAddIndexInsertIgnoreOnBackfill(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, b int);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runDML := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runDML { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert ignore into t values (1, 1);") + assert.NoError(t, err) + _, err = tk1.Exec("insert ignore into t values (2, 2);") + assert.NoError(t, err) + _, err = tk1.Exec("update t set b = null where id = 1;") + assert.NoError(t, err) + runDML = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(b);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 ", "2 2")) +} + +func TestAddIndexMultipleDelete(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, b int);") + tk.MustExec("insert into t values (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + switch job.SchemaState { + case model.StateDeleteOnly: + _, err := tk1.Exec("delete from t where id in (4, 5, 6);") + assert.NoError(t, err) + case model.StateWriteOnly: + _, err := tk1.Exec("delete from t where id in (2, 3);") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecution = func() { + _, err := tk1.Exec("delete from t where id = 1;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(b);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows()) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) +} diff --git a/executor/insert.go b/executor/insert.go index 9d958d5d8bd2f..2b6e45fb018d7 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -152,11 +153,11 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t for _, r := range rows { for _, uk := range r.uniqueKeys { if val, found := values[string(uk.newKey)]; found { - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) + if isTemp, _ := tablecodec.CheckTempIndexKey(uk.newKey); isTemp { + // If it is a temp index, the value cannot be decoded by DecodeHandleInUniqueIndexValue. + // Since this function is an optimization, we can skip prefetching the rows referenced by + // temp indexes. + continue } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) if err != nil { @@ -251,26 +252,13 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } for _, uk := range r.uniqueKeys { - val, err := txn.Get(ctx, uk.newKey) + _, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle) if err != nil { - if kv.IsErrNotFound(err) { - continue - } return err } - // Since the temp index stores deleted key with marked 'deleteu' for unique key at the end - // of value, So if return a key we check and skip deleted key. - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) - } - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) - if err != nil { - return err + if handle == nil { + continue } - err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate) if err != nil { if kv.IsErrNotFound(err) { diff --git a/executor/replace.go b/executor/replace.go index bfc70ebc4451c..801ff40ebd252 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -176,22 +177,12 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { // 3. error: the error. func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) { for _, uk := range r.uniqueKeys { - val, err := txn.Get(ctx, uk.newKey) + _, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle) if err != nil { - if kv.IsErrNotFound(err) { - continue - } return false, false, err } - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) - } - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) - if err != nil { - return false, true, err + if handle == nil { + continue } rowUnchanged, err := e.removeRow(ctx, txn, handle, r) if err != nil { diff --git a/table/tables/index.go b/table/tables/index.go index 57c6498e698fd..8b8ce6660ca1c 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -16,7 +16,6 @@ package tables import ( "context" - "errors" "sync" "github.com/pingcap/tidb/kv" @@ -239,18 +238,21 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic if !distinct || skipCheck || opt.Untouched { + val := idxVal if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage. - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} + val = tempVal.Encode(nil) } - err = txn.GetMemBuffer().Set(key, idxVal) + err = txn.GetMemBuffer().Set(key, val) if err != nil { return nil, err } if len(tempKey) > 0 { if !opt.Untouched { // Untouched key-values never occur in the storage. - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} + val = tempVal.Encode(nil) } - err = txn.GetMemBuffer().Set(tempKey, idxVal) + err = txn.GetMemBuffer().Set(tempKey, val) if err != nil { return nil, err } @@ -280,45 +282,49 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if err != nil && !kv.IsErrNotFound(err) { return nil, err } - if err != nil || len(value) == 0 || (keyIsTempIdxKey && tablecodec.CheckTempIndexValueIsDelete(value)) { + var tempIdxVal tablecodec.TempIndexValue + if len(value) > 0 && keyIsTempIdxKey { + tempIdxVal, err = tablecodec.DecodeTempIndexValue(value, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err + } + } + // The index key value is not found or deleted. + if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) { + val := idxVal lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil - var needPresumeKey TempIndexKeyState if keyIsTempIdxKey { - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) - if err != nil { - return nil, err - } - } else { - if len(tempKey) > 0 { - needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) - if err != nil { - return nil, err - } - } + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} + val = tempVal.Encode(value) + } + needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, h, + keyIsTempIdxKey, c.tblInfo.IsCommonHandle, c.tblInfo.ID) + if err != nil { + return nil, err } if lazyCheck { var flags []kv.FlagsOp - if needPresumeKey != KeyInTempIndexIsDeleted { + if needPresumeNotExists { flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} } if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && !vars.InRestrictedSQL && vars.ConnectionID > 0 { flags = append(flags, kv.SetNeedConstraintCheckInPrewrite) } - err = txn.GetMemBuffer().SetWithFlags(key, idxVal, flags...) + err = txn.GetMemBuffer().SetWithFlags(key, val, flags...) } else { - err = txn.GetMemBuffer().Set(key, idxVal) + err = txn.GetMemBuffer().Set(key, val) } if err != nil { return nil, err } if len(tempKey) > 0 { - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted { - err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} + val = tempVal.Encode(value) + if lazyCheck && needPresumeNotExists { + err = txn.GetMemBuffer().SetWithFlags(tempKey, val, kv.SetPresumeKeyNotExists) } else { - err = txn.GetMemBuffer().Set(tempKey, idxVal) + err = txn.GetMemBuffer().Set(tempKey, val) } if err != nil { return nil, err @@ -338,8 +344,8 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue continue } - if keyIsTempIdxKey { - value = tablecodec.DecodeTempIndexOriginValue(value) + if keyIsTempIdxKey && !tempIdxVal.IsEmpty() { + value = tempIdxVal.Current().Value } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) if err != nil { @@ -350,6 +356,26 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue return nil, nil } +func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, tempKey kv.Key, + h kv.Handle, keyIsTempIdxKey bool, isCommon bool, tblID int64) (needFlag bool, err error) { + var uniqueTempKey kv.Key + if keyIsTempIdxKey { + uniqueTempKey = key + } else if len(tempKey) > 0 { + uniqueTempKey = tempKey + } else { + return true, nil + } + foundKey, dupHandle, err := FetchDuplicatedHandle(ctx, uniqueTempKey, true, txn, tblID, isCommon) + if err != nil { + return false, err + } + if foundKey && dupHandle != nil && !dupHandle.Equal(h) { + return false, kv.ErrKeyExists + } + return false, nil +} + // Delete removes the entry for handle h and indexedValues from KV index. func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) error { indexedValues := c.getIndexedValue(indexedValue) @@ -360,6 +386,16 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed } key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key) + var originTempVal []byte + if len(tempKey) > 0 && c.idxInfo.Unique { + // Get the origin value of the unique temporary index key. + // Append the new delete operations to the end of the origin value. + originTempVal, err = getKeyInTxn(context.TODO(), txn, tempKey) + if err != nil { + return err + } + } + tempValElem := tablecodec.TempIndexValueElem{Handle: h, KeyVer: tempKeyVer, Delete: true, Distinct: distinct} if distinct { if len(key) > 0 { @@ -369,7 +405,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed } } if len(tempKey) > 0 { - tempVal := tablecodec.EncodeTempIndexValueDeletedUnique(h, tempKeyVer) + // Append to the end of the origin value for distinct value. + tempVal := tempValElem.Encode(originTempVal) err = txn.GetMemBuffer().Set(tempKey, tempVal) if err != nil { return err @@ -383,7 +420,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed } } if len(tempKey) > 0 { - tempVal := tablecodec.EncodeTempIndexValueDeleted(tempKeyVer) + tempVal := tempValElem.Encode(nil) err = txn.GetMemBuffer().Set(tempKey, tempVal) if err != nil { return err @@ -484,50 +521,116 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV if err != nil { return false, nil, err } - - var ( - tempKey []byte - keyVer byte - ) // If index current is in creating status and using ingest mode, we need first // check key exist status in temp index. - key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) - if keyVer != TempIndexKeyTypeNone { - KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) - if err1 != nil { - return false, nil, err - } - switch KeyExistInfo { - case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted: - return false, nil, nil - case KeyInTempIndexConflict: - return true, h1, kv.ErrKeyExists - case KeyInTempIndexIsItself: - continue - } + key, tempKey, _ := GenTempIdxKeyByState(c.idxInfo, key) + if len(tempKey) > 0 { + key = tempKey } - - value, err := txn.Get(context.TODO(), key) - if kv.IsErrNotFound(err) { - return false, nil, nil + foundKey, dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, distinct, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle) + if err != nil || !foundKey { + return false, nil, err } - if err != nil { + if dupHandle != nil && !dupHandle.Equal(h) { return false, nil, err } + continue + } + return true, h, nil +} + +// FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key. +func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, + txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { + if isTemp, originIdxID := tablecodec.CheckTempIndexKey(key); isTemp { + return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID, originIdxID, isCommon) + } + // The index key is not from temp index. + val, err := getKeyInTxn(ctx, txn, key) + if err != nil || len(val) == 0 { + return false, nil, err + } + if distinct { + h, err := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) + return true, h, err + } + return true, nil, nil +} - // For distinct index, the value of key is handle. +func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, distinct bool, + txn kv.Transaction, tableID, idxID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { + tempRawVal, err := getKeyInTxn(ctx, txn, tempKey) + if err != nil { + return false, nil, err + } + if tempRawVal == nil { + originKey := tempKey.Clone() + tablecodec.TempIndexKey2IndexKey(idxID, originKey) + originVal, err := getKeyInTxn(ctx, txn, originKey) + if err != nil || originVal == nil { + return false, nil, err + } if distinct { - var handle kv.Handle - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) + originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) if err != nil { return false, nil, err } - if !handle.Equal(h) { - return true, handle, kv.ErrKeyExists + return true, originHandle, err + } + return false, nil, nil + } + tempVal, err := tablecodec.DecodeTempIndexValue(tempRawVal, isCommon) + if err != nil { + return false, nil, err + } + curElem := tempVal.Current() + if curElem.Delete { + originKey := tempKey.Clone() + tablecodec.TempIndexKey2IndexKey(idxID, originKey) + originVal, err := getKeyInTxn(ctx, txn, originKey) + if err != nil || originVal == nil { + return false, nil, err + } + if distinct { + originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) + if err != nil { + return false, nil, err + } + if originHandle.Equal(curElem.Handle) { + // The key has been deleted. This is not a duplicated key. + return false, nil, nil } + // The inequality means multiple modifications happened in the same key. + // We use the handle in origin index value to check if the row exists. + recPrefix := tablecodec.GenTableRecordPrefix(tableID) + rowKey := tablecodec.EncodeRecordKey(recPrefix, originHandle) + rowVal, err := getKeyInTxn(ctx, txn, rowKey) + if err != nil || rowVal == nil { + return false, nil, err + } + // The row exists. This is the duplicated key. + return true, originHandle, nil } + return false, nil, nil } - return true, h, nil + // The value in temp index is not the delete marker. + if distinct { + h, err := tablecodec.DecodeHandleInUniqueIndexValue(curElem.Value, isCommon) + return true, h, err + } + return true, nil, nil +} + +// getKeyInTxn gets the value of the key in the transaction, and ignore the ErrNotExist error. +func getKeyInTxn(ctx context.Context, txn kv.Transaction, key kv.Key) ([]byte, error) { + val, err := txn.Get(ctx, key) + if err != nil { + if kv.IsErrNotFound(err) { + return nil, nil + } + return nil, err + } + return val, nil } func (c *index) FetchValues(r []types.Datum, vals []types.Datum) ([]types.Datum, error) { @@ -605,57 +708,3 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * } return colInfo } - -// TempIndexKeyState is the state of the temporary index key. -type TempIndexKeyState byte - -const ( - // KeyInTempIndexUnknown whether the key exists or not in temp index is unknown. - KeyInTempIndexUnknown TempIndexKeyState = iota - // KeyInTempIndexNotExist the key is not exist in temp index. - KeyInTempIndexNotExist - // KeyInTempIndexIsDeleted the key is marked deleted in temp index. - KeyInTempIndexIsDeleted - // KeyInTempIndexIsItself the key is correlated to itself in temp index. - KeyInTempIndexIsItself - // KeyInTempIndexConflict the key is conflict in temp index. - KeyInTempIndexConflict -) - -// KeyExistInTempIndex is used to check the unique key exist status in temp index. -func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (TempIndexKeyState, kv.Handle, error) { - // Only check temp index key. - if !tablecodec.IsTempIndexKey(key) { - return KeyInTempIndexUnknown, nil, nil - } - value, err := txn.Get(ctx, key) - if kv.IsErrNotFound(err) { - return KeyInTempIndexNotExist, nil, nil - } - if err != nil { - return KeyInTempIndexUnknown, nil, err - } - - // Since KeyExistInTempIndex only accept temp index key, so the value length should great than 1 for key version. - if len(value) < 1 { - return KeyInTempIndexUnknown, nil, errors.New("temp index value length should great than 1") - } - - if tablecodec.CheckTempIndexValueIsDelete(value) { - return KeyInTempIndexIsDeleted, nil, nil - } - - // Check if handle equal. - var handle kv.Handle - if distinct { - originVal := tablecodec.DecodeTempIndexOriginValue(value) - handle, err = tablecodec.DecodeHandleInUniqueIndexValue(originVal, IsCommonHandle) - if err != nil { - return KeyInTempIndexUnknown, nil, err - } - if !handle.Equal(h) { - return KeyInTempIndexConflict, handle, kv.ErrKeyExists - } - } - return KeyInTempIndexIsItself, handle, nil -} diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 328989d88ad3f..52b517eb93050 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -106,7 +106,7 @@ func CheckDataConsistency( // } if rowInsertion.key != nil { - if err = checkHandleConsistency(rowInsertion, indexMutations, columnMaps.IndexIDToInfo, t.Meta().Name.O); err != nil { + if err = checkHandleConsistency(rowInsertion, indexMutations, columnMaps.IndexIDToInfo, t.Meta()); err != nil { return errors.Trace(err) } } @@ -123,7 +123,7 @@ func CheckDataConsistency( // in row insertions and index insertions are consistent. // A PUT_index implies a PUT_row with the same handle. // Deletions are not checked since the values of deletions are unknown -func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, tableName string) error { +func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, tblInfo *model.TableInfo) error { var insertionHandle kv.Handle var err error @@ -154,7 +154,18 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in indexHandle kv.Handle ) if idxID != m.indexID { - value = tablecodec.DecodeTempIndexOriginValue(m.value) + if tablecodec.TempIndexValueIsUntouched(m.value) { + // We never commit the untouched key values to the storage. Skip this check. + continue + } + var tempIdxVal tablecodec.TempIndexValue + tempIdxVal, err = tablecodec.DecodeTempIndexValue(m.value, tblInfo.IsCommonHandle) + if err != nil { + return err + } + if !tempIdxVal.IsEmpty() { + value = tempIdxVal.Current().Value + } if len(value) == 0 { // Skip the deleted operation values. continue @@ -170,7 +181,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in } // NOTE: handle type can be different, see issue 29520 if indexHandle.IsInt() == insertionHandle.IsInt() && indexHandle.Compare(insertionHandle) != 0 { - err = ErrInconsistentHandle.GenWithStackByArgs(tableName, indexInfo.Name.O, indexHandle, insertionHandle, m, rowInsertion) + err = ErrInconsistentHandle.GenWithStackByArgs(tblInfo.Name, indexInfo.Name.O, indexHandle, insertionHandle, m, rowInsertion) logutil.BgLogger().Error("inconsistent handle in index and record insertions", zap.Error(err)) return err } @@ -209,9 +220,20 @@ func checkIndexKeys( return errors.New("index not found") } + var isTmpIdxValAndDeleted bool // If this is temp index data, need remove last byte of index data. if idxID != m.indexID { - value = append(value, m.value[:len(m.value)-1]...) + if tablecodec.TempIndexValueIsUntouched(m.value) { + // We never commit the untouched key values to the storage. Skip this check. + continue + } + tmpVal, err := tablecodec.DecodeTempIndexValue(m.value, t.Meta().IsCommonHandle) + if err != nil { + return err + } + curElem := tmpVal.Current() + isTmpIdxValAndDeleted = curElem.Delete + value = append(value, curElem.Value...) } else { value = append(value, m.value...) } @@ -245,7 +267,7 @@ func checkIndexKeys( } // When it is in add index new backfill state. - if len(value) == 0 || (idxID != m.indexID && (tablecodec.CheckTempIndexValueIsDelete(value))) { + if len(value) == 0 || isTmpIdxValAndDeleted { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo, t.Meta()) } else { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo, t.Meta()) diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index 43fb35c21a5b6..4c44e90a7d244 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -310,9 +310,9 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) { require.Nil(t, err) rowMutation := mutation{key: rowKey, value: rowValue} corruptedRowMutation := mutation{key: corruptedRowKey, value: rowValue} - err = checkHandleConsistency(rowMutation, indexMutations, maps.IndexIDToInfo, "t") + err = checkHandleConsistency(rowMutation, indexMutations, maps.IndexIDToInfo, &tableInfo) require.Nil(t, err) - err = checkHandleConsistency(corruptedRowMutation, indexMutations, maps.IndexIDToInfo, "t") + err = checkHandleConsistency(corruptedRowMutation, indexMutations, maps.IndexIDToInfo, &tableInfo) require.NotNil(t, err) } } diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index 249a4f35495a0..394277ba2768c 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1159,8 +1159,8 @@ func TempIndexKey2IndexKey(originIdxID int64, tempIdxKey []byte) { binary.BigEndian.PutUint64(tempIdxKey[prefixLen:], eid) } -// IsTempIndexKey check whether the input key is for a temp index. -func IsTempIndexKey(indexKey []byte) bool { +// CheckTempIndexKey checks whether the input key is for a temp index. +func CheckTempIndexKey(indexKey []byte) (isTemp bool, originIdxID int64) { var ( indexIDKey []byte indexID int64 @@ -1170,101 +1170,206 @@ func IsTempIndexKey(indexKey []byte) bool { indexIDKey = indexKey[prefixLen : prefixLen+8] indexID = codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(indexIDKey)) tempIndexID = int64(TempIndexPrefix) | indexID - return tempIndexID == indexID + return tempIndexID == indexID, indexID & IndexIDMask } // TempIndexValueFlag is the flag of temporary index value. type TempIndexValueFlag byte const ( - // TempIndexValueFlagNormal means the following value is the normal index value. + // TempIndexValueFlagNormal means the following value is a distinct the normal index value. TempIndexValueFlagNormal TempIndexValueFlag = iota - // TempIndexValueFlagDeleted means this is a representation of a "delete" operation. + // TempIndexValueFlagNonDistinctNormal means the following value is the non-distinct normal index value. + TempIndexValueFlagNonDistinctNormal + // TempIndexValueFlagDeleted means the following value is the distinct and deleted index value. TempIndexValueFlagDeleted + // TempIndexValueFlagNonDistinctDeleted means the following value is the non-distinct deleted index value. + TempIndexValueFlagNonDistinctDeleted ) -// EncodeTempIndexValue encodes the value of temporary index. -// Note: this function changes the input value. -func EncodeTempIndexValue(value []byte, keyVer byte) []byte { - value = append(value, 0) - copy(value[1:], value[:len(value)-1]) - value[0] = byte(TempIndexValueFlagNormal) // normal flag + value + tempKeyVer - value = append(value, keyVer) - return value -} +// TempIndexValue is the value of temporary index. +// It contains one or more element, each element represents a history index operations on the original index. +// A temp index value element is encoded as one of: +// - [flag 1 byte][value_length 2 bytes ] [value value_len bytes] [key_version 1 byte] {distinct normal} +// - [flag 1 byte][value value_len bytes] [key_version 1 byte] {non-distinct normal} +// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [key_version 1 byte] {distinct deleted} +// - [flag 1 byte] [key_version 1 byte] {non-distinct deleted} +// +// The temp index value is encoded as: +// - [element 1][element 2]...[element n] {for distinct values} +// - [element 1] {for non-distinct values} +type TempIndexValue []*TempIndexValueElem -// EncodeTempIndexValueDeletedUnique encodes the value of temporary index for unique index. -func EncodeTempIndexValueDeletedUnique(handle kv.Handle, keyVer byte) []byte { - var hEncoded []byte - var hLen int - if handle.IsInt() { - var data [8]byte - binary.BigEndian.PutUint64(data[:], uint64(handle.IntValue())) - hEncoded = data[:] - hLen = 8 - } else { - hEncoded = handle.Encoded() - hLen = len(hEncoded) - } - val := make([]byte, 0, 1+hLen+1) // deleted flag + handle + tempKeyVer - val = append(val, byte(TempIndexValueFlagDeleted)) - val = append(val, hEncoded...) - val = append(val, keyVer) - return val +// IsEmpty checks whether the value is empty. +func (v TempIndexValue) IsEmpty() bool { + return len(v) == 0 } -// EncodeTempIndexValueDeleted encodes the delete operation on origin index to a value for temporary index. -func EncodeTempIndexValueDeleted(keyVer byte) []byte { - // Handle is not needed because it is already in the key. - val := make([]byte, 0, 2) // deleted flag + tempKeyVer - val = append(val, byte(TempIndexValueFlagDeleted)) - val = append(val, keyVer) - return val +// Current returns the current latest temp index value. +func (v TempIndexValue) Current() *TempIndexValueElem { + return v[len(v)-1] } -// DecodeTempIndexValue decodes the value of temporary index. -func DecodeTempIndexValue(value []byte, isCommonHandle bool) (originVal []byte, handle kv.Handle, isDelete bool, isUnique bool, keyVer byte) { - if len(value) == 0 { - return nil, nil, false, false, 0 +// FilterOverwritten is used by the temp index merge process to remove the overwritten index operations. +// For example, the value {temp_idx_key -> [h2, h2d, h3, h1d]} recorded four operations on the original index. +// Since 'h2d' overwrites 'h2', we can remove 'h2' from the value. +func (v TempIndexValue) FilterOverwritten() TempIndexValue { + if len(v) <= 1 || !v[0].Distinct { + return v } - switch TempIndexValueFlag(value[0]) { - case TempIndexValueFlagNormal: - originVal = value[1 : len(value)-1] - keyVer = value[len(value)-1] - case TempIndexValueFlagDeleted: - isDelete = true - if len(value) == 2 { - keyVer = value[1] + occurred := kv.NewHandleMap() + for i := len(v) - 1; i >= 0; i-- { + if _, ok := occurred.Get(v[i].Handle); !ok { + occurred.Set(v[i].Handle, struct{}{}) } else { - isUnique = true - if isCommonHandle { - handle, _ = kv.NewCommonHandle(value[1 : len(value)-1]) + v[i] = nil + } + } + ret := v[:0] + for _, elem := range v { + if elem != nil { + ret = append(ret, elem) + } + } + return ret +} + +// TempIndexValueElem represents a history index operations on the original index. +// A temp index value element is encoded as one of: +// - [flag 1 byte][value_length 2 bytes ] [value value_len bytes] [key_version 1 byte] {distinct normal} +// - [flag 1 byte][value value_len bytes] [key_version 1 byte] {non-distinct normal} +// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [key_version 1 byte] {distinct deleted} +// - [flag 1 byte] [key_version 1 byte] {non-distinct deleted} +type TempIndexValueElem struct { + Value []byte + Handle kv.Handle + KeyVer byte + Delete bool + Distinct bool +} + +// Encode encodes the temp index value. +func (v *TempIndexValueElem) Encode(buf []byte) []byte { + if v.Delete { + if v.Distinct { + handle := v.Handle + var hEncoded []byte + var hLen uint16 + if handle.IsInt() { + hEncoded = codec.EncodeUint(hEncoded, uint64(handle.IntValue())) + hLen = 8 } else { - handle = decodeIntHandleInIndexValue(value[1 : len(value)-1]) + hEncoded = handle.Encoded() + hLen = uint16(len(hEncoded)) + } + // flag + handle length + handle + temp key version + if buf == nil { + buf = make([]byte, 0, hLen+4) } - keyVer = value[len(value)-1] + buf = append(buf, byte(TempIndexValueFlagDeleted)) + buf = append(buf, byte(hLen>>8), byte(hLen)) + buf = append(buf, hEncoded...) + buf = append(buf, v.KeyVer) + return buf } - } - return + // flag + temp key version + if buf == nil { + buf = make([]byte, 0, 2) + } + buf = append(buf, byte(TempIndexValueFlagNonDistinctDeleted)) + buf = append(buf, v.KeyVer) + return buf + } + if v.Distinct { + // flag + value length + value + temp key version + if buf == nil { + buf = make([]byte, 0, len(v.Value)+4) + } + buf = append(buf, byte(TempIndexValueFlagNormal)) + vLen := uint16(len(v.Value)) + buf = append(buf, byte(vLen>>8), byte(vLen)) + buf = append(buf, v.Value...) + buf = append(buf, v.KeyVer) + return buf + } + // flag + value + temp key version + if buf == nil { + buf = make([]byte, 0, len(v.Value)+2) + } + buf = append(buf, byte(TempIndexValueFlagNonDistinctNormal)) + buf = append(buf, v.Value...) + buf = append(buf, v.KeyVer) + return buf } -// CheckTempIndexValueIsDelete checks whether the value is a delete operation. -func CheckTempIndexValueIsDelete(value []byte) bool { - if len(value) == 0 { - return false +// DecodeTempIndexValue decodes the temp index value. +func DecodeTempIndexValue(value []byte, isCommonHandle bool) (TempIndexValue, error) { + var ( + values []*TempIndexValueElem + err error + ) + for len(value) > 0 { + v := &TempIndexValueElem{} + value, err = v.DecodeOne(value, isCommonHandle) + if err != nil { + return nil, err + } + values = append(values, v) } - return TempIndexValueFlag(value[0]) == TempIndexValueFlagDeleted + return values, nil } -// DecodeTempIndexOriginValue decodes the value of origin index from a temp index value. -func DecodeTempIndexOriginValue(value []byte) []byte { - if len(value) == 0 { - return nil +// DecodeOne decodes one temp index value element. +func (v *TempIndexValueElem) DecodeOne(b []byte, isCommonHandle bool) (remain []byte, err error) { + flag := TempIndexValueFlag(b[0]) + b = b[1:] + switch flag { + case TempIndexValueFlagNormal: + vLen := (uint16(b[0]) << 8) + uint16(b[1]) + b = b[2:] + v.Value = b[:vLen] + b = b[vLen:] + v.KeyVer = b[0] + b = b[1:] + v.Distinct = true + v.Handle, err = DecodeHandleInUniqueIndexValue(v.Value, isCommonHandle) + return b, err + case TempIndexValueFlagNonDistinctNormal: + v.Value = b[:len(b)-1] + v.KeyVer = b[len(b)-1] + return nil, nil + case TempIndexValueFlagDeleted: + hLen := (uint16(b[0]) << 8) + uint16(b[1]) + b = b[2:] + if isCommonHandle { + v.Handle, _ = kv.NewCommonHandle(b[:hLen]) + } else { + v.Handle = decodeIntHandleInIndexValue(b[:hLen]) + } + b = b[hLen:] + v.KeyVer = b[0] + b = b[1:] + v.Distinct = true + v.Delete = true + return b, nil + case TempIndexValueFlagNonDistinctDeleted: + v.KeyVer = b[0] + b = b[1:] + v.Delete = true + return b, nil + default: + return nil, errors.New("invalid temp index value") } - if TempIndexValueFlag(value[0]) == TempIndexValueFlagNormal { - return value[1 : len(value)-1] +} + +// TempIndexValueIsUntouched returns true if the value is untouched. +// All the temp index value has the suffix of temp key version. +// All the temp key versions differ from the uncommitted KV flag. +func TempIndexValueIsUntouched(b []byte) bool { + if len(b) > 0 && b[len(b)-1] == kv.UnCommitIndexKVFlag { + return true } - return nil + return false } // GenIndexValuePortal is the portal for generating index value. diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index 231d58cf18bd3..adc4ccc78c13b 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -617,26 +617,84 @@ func TestTempIndexValueCodec(t *testing.T) { require.NoError(t, err) encodedValueCopy := make([]byte, len(encodedValue)) copy(encodedValueCopy, encodedValue) - tempIdxVal := EncodeTempIndexValue(encodedValue, 'b') - originVal, handle, isDelete, unique, keyVer := DecodeTempIndexValue(tempIdxVal, false) - require.Nil(t, handle) - require.False(t, isDelete || unique) - require.Equal(t, keyVer, byte('b')) - require.EqualValues(t, encodedValueCopy, originVal) - - tempIdxVal = EncodeTempIndexValueDeletedUnique(kv.IntHandle(100), 'm') - originVal, handle, isDelete, unique, keyVer = DecodeTempIndexValue(tempIdxVal, false) - require.Equal(t, handle.IntValue(), int64(100)) - require.True(t, isDelete) - require.True(t, unique) - require.Equal(t, keyVer, byte('m')) - require.Empty(t, originVal) - - tempIdxVal = EncodeTempIndexValueDeleted('b') - originVal, handle, isDelete, unique, keyVer = DecodeTempIndexValue(tempIdxVal, false) - require.Nil(t, handle) - require.True(t, isDelete) - require.False(t, unique) - require.Equal(t, keyVer, byte('b')) - require.Empty(t, originVal) + + tempIdxVal := TempIndexValueElem{ + Value: encodedValue, + KeyVer: 'b', + } + val := tempIdxVal.Encode(nil) + var newTempIdxVal TempIndexValueElem + remain, err := newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + idxVal := EncodeHandleInUniqueIndexValue(kv.IntHandle(100), false) + tempIdxVal = TempIndexValueElem{ + Value: idxVal, + KeyVer: 'm', + Distinct: true, + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.Equal(t, newTempIdxVal.Handle.IntValue(), int64(100)) + newTempIdxVal.Handle = nil + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + tempIdxVal = TempIndexValueElem{ + Delete: true, + KeyVer: 'b', + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + tempIdxVal = TempIndexValueElem{ + Delete: true, + KeyVer: 'b', + Distinct: true, + Handle: kv.IntHandle(100), + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + // Test multiple temp index value elements. + idxVal = EncodeHandleInUniqueIndexValue(kv.IntHandle(100), false) + tempIdxVal = TempIndexValueElem{ + Value: idxVal, + KeyVer: 'm', + Distinct: true, + } + tempIdxVal2 := TempIndexValueElem{ + Handle: kv.IntHandle(100), + KeyVer: 'm', + Distinct: true, + Delete: true, + } + idxVal3 := EncodeHandleInUniqueIndexValue(kv.IntHandle(101), false) + tempIdxVal3 := TempIndexValueElem{ + Value: idxVal3, + KeyVer: 'm', + Distinct: true, + } + val = tempIdxVal.Encode(nil) + val = tempIdxVal2.Encode(val) + val = tempIdxVal3.Encode(val) + var result TempIndexValue + result, err = DecodeTempIndexValue(val, false) + require.NoError(t, err) + require.Equal(t, 3, len(result)) + require.Equal(t, result[0].Handle.IntValue(), int64(100)) + require.Equal(t, result[1].Handle.IntValue(), int64(100)) + require.Equal(t, result[2].Handle.IntValue(), int64(101)) } From d901aa1b977f16f27eaf8182aa66fa4e366a322b Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Fri, 10 Feb 2023 15:22:07 +0800 Subject: [PATCH 5/7] types: use `mathutil.Max/mathutil.Min` instead of `myMax/myMin` (#41192) --- types/helper.go | 28 ---------------------------- types/mydecimal.go | 33 +++++++++++++++++---------------- 2 files changed, 17 insertions(+), 44 deletions(-) diff --git a/types/helper.go b/types/helper.go index 91c7733906b24..e3cc4aaf68cd9 100644 --- a/types/helper.go +++ b/types/helper.go @@ -114,34 +114,6 @@ func isPunctuation(c byte) bool { return (c >= 0x21 && c <= 0x2F) || (c >= 0x3A && c <= 0x40) || (c >= 0x5B && c <= 0x60) || (c >= 0x7B && c <= 0x7E) } -func myMax(a, b int) int { - if a > b { - return a - } - return b -} - -func myMaxInt8(a, b int8) int8 { - if a > b { - return a - } - return b -} - -func myMin(a, b int) int { - if a < b { - return a - } - return b -} - -func myMinInt8(a, b int8) int8 { - if a < b { - return a - } - return b -} - const ( maxUint = uint64(math.MaxUint64) uintCutOff = maxUint/uint64(10) + 1 diff --git a/types/mydecimal.go b/types/mydecimal.go index 203211265e0c9..37e724ab61370 100644 --- a/types/mydecimal.go +++ b/types/mydecimal.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/util/mathutil" "go.uber.org/zap" ) @@ -353,7 +354,7 @@ func (d *MyDecimal) ToString() (str []byte) { for ; digitsFrac > 0; digitsFrac -= digitsPerWord { x := d.wordBuf[wordIdx] wordIdx++ - for i := myMin(digitsFrac, digitsPerWord); i > 0; i-- { + for i := mathutil.Min(digitsFrac, digitsPerWord); i > 0; i-- { y := x / digMask str[fracIdx] = byte(y) + '0' fracIdx++ @@ -380,7 +381,7 @@ func (d *MyDecimal) ToString() (str []byte) { for ; digitsInt > 0; digitsInt -= digitsPerWord { wordIdx-- x := d.wordBuf[wordIdx] - for i := myMin(digitsInt, digitsPerWord); i > 0; i-- { + for i := mathutil.Min(digitsInt, digitsPerWord); i > 0; i-- { y := x / 10 strIdx-- str[strIdx] = '0' + byte(x-y*10) @@ -840,7 +841,7 @@ func (d *MyDecimal) Round(to *MyDecimal, frac int, roundMode RoundMode) (err err if to != d { copy(to.wordBuf[:], d.wordBuf[:]) to.negative = d.negative - to.digitsInt = int8(myMin(wordsInt, wordBufLen) * digitsPerWord) + to.digitsInt = int8(mathutil.Min(wordsInt, wordBufLen) * digitsPerWord) } if wordsFracTo > wordsFrac { idx := wordsInt + wordsFrac @@ -941,7 +942,7 @@ func (d *MyDecimal) Round(to *MyDecimal, frac int, roundMode RoundMode) (err err frac = wordsFracTo * digitsPerWord err = ErrTruncated } - for toIdx = wordsInt + myMax(wordsFracTo, 0); toIdx > 0; toIdx-- { + for toIdx = wordsInt + mathutil.Max(wordsFracTo, 0); toIdx > 0; toIdx-- { if toIdx < wordBufLen { to.wordBuf[toIdx] = to.wordBuf[toIdx-1] } else { @@ -965,7 +966,7 @@ func (d *MyDecimal) Round(to *MyDecimal, frac int, roundMode RoundMode) (err err /* making 'zero' with the proper scale */ idx := wordsFracTo + 1 to.digitsInt = 1 - to.digitsFrac = int8(myMax(frac, 0)) + to.digitsFrac = int8(mathutil.Max(frac, 0)) to.negative = false for toIdx < idx { to.wordBuf[toIdx] = 0 @@ -1602,7 +1603,7 @@ func DecimalNeg(from *MyDecimal) *MyDecimal { // of `to` may be changed during evaluating. func DecimalAdd(from1, from2, to *MyDecimal) error { from1, from2, to = validateArgs(from1, from2, to) - to.resultFrac = myMaxInt8(from1.resultFrac, from2.resultFrac) + to.resultFrac = mathutil.Max(from1.resultFrac, from2.resultFrac) if from1.negative == from2.negative { return doAdd(from1, from2, to) } @@ -1613,7 +1614,7 @@ func DecimalAdd(from1, from2, to *MyDecimal) error { // DecimalSub subs one decimal from another, sets the result to 'to'. func DecimalSub(from1, from2, to *MyDecimal) error { from1, from2, to = validateArgs(from1, from2, to) - to.resultFrac = myMaxInt8(from1.resultFrac, from2.resultFrac) + to.resultFrac = mathutil.Max(from1.resultFrac, from2.resultFrac) if from1.negative == from2.negative { _, err := doSub(from1, from2, to) return err @@ -1649,7 +1650,7 @@ func doSub(from1, from2, to *MyDecimal) (cmp int, err error) { wordsFrac1 = digitsToWords(int(from1.digitsFrac)) wordsInt2 = digitsToWords(int(from2.digitsInt)) wordsFrac2 = digitsToWords(int(from2.digitsFrac)) - wordsFracTo = myMax(wordsFrac1, wordsFrac2) + wordsFracTo = mathutil.Max(wordsFrac1, wordsFrac2) start1 = 0 stop1 = wordsInt1 @@ -1814,8 +1815,8 @@ func doAdd(from1, from2, to *MyDecimal) error { wordsFrac1 = digitsToWords(int(from1.digitsFrac)) wordsInt2 = digitsToWords(int(from2.digitsInt)) wordsFrac2 = digitsToWords(int(from2.digitsFrac)) - wordsIntTo = myMax(wordsInt1, wordsInt2) - wordsFracTo = myMax(wordsFrac1, wordsFrac2) + wordsIntTo = mathutil.Max(wordsInt1, wordsInt2) + wordsFracTo = mathutil.Max(wordsFrac1, wordsFrac2) ) var x int32 @@ -1839,7 +1840,7 @@ func doAdd(from1, from2, to *MyDecimal) error { idxTo := wordsIntTo + wordsFracTo to.negative = from1.negative to.digitsInt = int8(wordsIntTo * digitsPerWord) - to.digitsFrac = myMaxInt8(from1.digitsFrac, from2.digitsFrac) + to.digitsFrac = mathutil.Max(from1.digitsFrac, from2.digitsFrac) if err != nil { if to.digitsFrac > int8(wordsFracTo*digitsPerWord) { @@ -1977,7 +1978,7 @@ func DecimalMul(from1, from2, to *MyDecimal) error { tmp1 = wordsIntTo tmp2 = wordsFracTo ) - to.resultFrac = myMinInt8(from1.resultFrac+from2.resultFrac, mysql.MaxDecimalScale) + to.resultFrac = mathutil.Min(from1.resultFrac+from2.resultFrac, mysql.MaxDecimalScale) wordsIntTo, wordsFracTo, err = fixWordCntError(wordsIntTo, wordsFracTo) to.negative = from1.negative != from2.negative to.digitsFrac = from1.digitsFrac + from2.digitsFrac @@ -2092,7 +2093,7 @@ func DecimalMul(from1, from2, to *MyDecimal) error { // fracIncr - increment of fraction func DecimalDiv(from1, from2, to *MyDecimal, fracIncr int) error { from1, from2, to = validateArgs(from1, from2, to) - to.resultFrac = myMinInt8(from1.resultFrac+int8(fracIncr), mysql.MaxDecimalScale) + to.resultFrac = mathutil.Min(from1.resultFrac+int8(fracIncr), mysql.MaxDecimalScale) return doDivMod(from1, from2, to, nil, fracIncr) } @@ -2122,7 +2123,7 @@ DecimalMod does modulus of two decimals. */ func DecimalMod(from1, from2, to *MyDecimal) error { from1, from2, to = validateArgs(from1, from2, to) - to.resultFrac = myMaxInt8(from1.resultFrac, from2.resultFrac) + to.resultFrac = mathutil.Max(from1.resultFrac, from2.resultFrac) return doDivMod(from1, from2, nil, to, 0) } @@ -2190,7 +2191,7 @@ func doDivMod(from1, from2, to, mod *MyDecimal, fracIncr int) error { // digitsFrac=max(frac1, frac2), as for subtraction // digitsInt=from2.digitsInt to.negative = from1.negative - to.digitsFrac = myMaxInt8(from1.digitsFrac, from2.digitsFrac) + to.digitsFrac = mathutil.Max(from1.digitsFrac, from2.digitsFrac) } else { wordsFracTo = digitsToWords(frac1 + frac2 + fracIncr) wordsIntTo, wordsFracTo, err = fixWordCntError(wordsIntTo, wordsFracTo) @@ -2355,7 +2356,7 @@ func doDivMod(from1, from2, to, mod *MyDecimal, fracIncr int) error { return ErrOverflow } stop1 = start1 + wordsIntTo + wordsFracTo - to.digitsInt = int8(myMin(wordsIntTo*digitsPerWord, int(from2.digitsInt))) + to.digitsInt = int8(mathutil.Min(wordsIntTo*digitsPerWord, int(from2.digitsInt))) } if wordsIntTo+wordsFracTo > wordBufLen { stop1 -= wordsIntTo + wordsFracTo - wordBufLen From ea237a79840a8b8d4ff9a884c84b57f11423703d Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Fri, 10 Feb 2023 15:22:15 +0800 Subject: [PATCH 6/7] bazel: set --test_keep_going false (#41247) --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 99df487fcfcef..dff26f1854ca6 100644 --- a/Makefile +++ b/Makefile @@ -410,11 +410,11 @@ bazel_test: failpoint-enable bazel_ci_prepare bazel_coverage_test: failpoint-enable bazel_ci_prepare - bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \ + bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_keep_going=false \ --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \ -- //... -//cmd/... -//tests/graceshutdown/... \ -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/... - bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \ + bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_keep_going=false \ --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,distributereorg \ -- //... -//cmd/... -//tests/graceshutdown/... \ -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/... From 6cb73749c6534c36229c3b42bd7cecf8e03b372a Mon Sep 17 00:00:00 2001 From: Benjamin2037 Date: Fri, 10 Feb 2023 17:08:00 +0800 Subject: [PATCH 7/7] ddl: Add telementry for distributed reorg tasks. (#41201) ref pingcap/tidb#41266 --- ddl/dist_owner.go | 8 ++++++- metrics/telemetry.go | 10 +++++++++ telemetry/data_feature_usage_test.go | 31 ++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/ddl/dist_owner.go b/ddl/dist_owner.go index 94337a02ad809..588ef036ef276 100644 --- a/ddl/dist_owner.go +++ b/ddl/dist_owner.go @@ -39,7 +39,10 @@ import ( ) // CheckBackfillJobFinishInterval is export for test. -var CheckBackfillJobFinishInterval = 300 * time.Millisecond +var ( + CheckBackfillJobFinishInterval = 300 * time.Millisecond + telemetryDistReorgUsage = metrics.TelemetryDistReorgCnt +) const ( distPhysicalTableConcurrency = 16 @@ -48,6 +51,9 @@ const ( func initDistReorg(reorgMeta *model.DDLReorgMeta) { isDistReorg := variable.DDLEnableDistributeReorg.Load() reorgMeta.IsDistReorg = isDistReorg + if isDistReorg { + metrics.TelemetryDistReorgCnt.Inc() + } } // BackfillJobRangeMeta is export for test. diff --git a/metrics/telemetry.go b/metrics/telemetry.go index c133610c56189..69093252079c3 100644 --- a/metrics/telemetry.go +++ b/metrics/telemetry.go @@ -169,6 +169,13 @@ var ( Name: "compact_partition_usage", Help: "Counter of compact table partition", }) + TelemetryDistReorgCnt = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "telemetry", + Name: "distributed_reorg_count", + Help: "Counter of usage of distributed reorg DDL tasks count", + }) TelemetryStoreBatchedQueryCnt = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "tidb", @@ -414,6 +421,7 @@ type DDLUsageCounter struct { AddIndexIngestUsed int64 `json:"add_index_ingest_used"` MetadataLockUsed bool `json:"metadata_lock_used"` FlashbackClusterUsed int64 `json:"flashback_cluster_used"` + DistReorgUsed int64 `json:"dist_reorg_used"` } // Sub returns the difference of two counters. @@ -421,6 +429,7 @@ func (a DDLUsageCounter) Sub(rhs DDLUsageCounter) DDLUsageCounter { return DDLUsageCounter{ AddIndexIngestUsed: a.AddIndexIngestUsed - rhs.AddIndexIngestUsed, FlashbackClusterUsed: a.FlashbackClusterUsed - rhs.FlashbackClusterUsed, + DistReorgUsed: a.DistReorgUsed - rhs.DistReorgUsed, } } @@ -429,6 +438,7 @@ func GetDDLUsageCounter() DDLUsageCounter { return DDLUsageCounter{ AddIndexIngestUsed: readCounter(TelemetryAddIndexIngestCnt), FlashbackClusterUsed: readCounter(TelemetryFlashbackClusterCnt), + DistReorgUsed: readCounter(TelemetryDistReorgCnt), } } diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index 0630c26b31471..67d3afac74d98 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -585,6 +585,37 @@ func TestAddIndexAccelerationAndMDL(t *testing.T) { require.Equal(t, true, usage.DDLUsageCounter.MetadataLockUsed) } +func TestDistReorgUsage(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + usage, err := telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + initCount := usage.DDLUsageCounter.DistReorgUsed + + tk.MustExec("set @@global.tidb_ddl_distribute_reorg = off") + allow := variable.DDLEnableDistributeReorg.Load() + require.Equal(t, false, allow) + tk.MustExec("use test") + tk.MustExec("drop table if exists tele_t") + tk.MustExec("create table tele_t(id int, b int)") + tk.MustExec("insert into tele_t values(1,1),(2,2);") + tk.MustExec("alter table tele_t add index idx_org(b)") + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, initCount, usage.DDLUsageCounter.DistReorgUsed) + + tk.MustExec("set @@global.tidb_ddl_distribute_reorg = on") + allow = variable.DDLEnableDistributeReorg.Load() + require.Equal(t, true, allow) + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, initCount, usage.DDLUsageCounter.DistReorgUsed) + tk.MustExec("alter table tele_t add index idx_new(b)") + usage, err = telemetry.GetFeatureUsage(tk.Session()) + require.NoError(t, err) + require.Equal(t, initCount+1, usage.DDLUsageCounter.DistReorgUsed) +} + func TestGlobalMemoryControl(t *testing.T) { store := testkit.CreateMockStore(t)