diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index 6de1376e2b75b..cf11036a9935e 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1719,8 +1719,6 @@ func TestCreateUniqueExpressionIndex(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - // TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("create table t(a int default 0, b int default 0)") tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)") @@ -1740,8 +1738,6 @@ func TestCreateUniqueExpressionIndex(t *testing.T) { if checkErr != nil { return } - err := originalCallback.OnChanged(nil) - require.NoError(t, err) switch job.SchemaState { case model.StateDeleteOnly: for _, sql := range stateDeleteOnlySQLs { diff --git a/ddl/index_change_test.go b/ddl/index_change_test.go index 85352cb6d08d1..f9dcc99154dc5 100644 --- a/ddl/index_change_test.go +++ b/ddl/index_change_test.go @@ -37,8 +37,6 @@ func TestIndexChange(t *testing.T) { ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - // TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR. 
- tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") tk.MustExec("create table t (c1 int primary key, c2 int)") tk.MustExec("insert t values (1, 1), (2, 2), (3, 3);") @@ -221,6 +219,7 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT } func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table.Table) error { + var err1 error // WriteOnlyTable: insert t values (6, 6) err := sessiontxn.NewTxn(context.Background(), ctx) if err != nil { @@ -231,7 +230,11 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table return errors.Trace(err) } err = checkIndexExists(ctx, publicTbl, 6, 6, true) - if err != nil { + if ddl.IsEnableFastReorg() { + // Also check the temp index; the error is reported only when both lookups fail. + err1 = checkIndexExists(ctx, writeTbl, 6, 6, true) + } + if err != nil && (!ddl.IsEnableFastReorg() || err1 != nil) { return errors.Trace(err) } // PublicTable: insert t values (7, 7) @@ -250,10 +253,18 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table return errors.Trace(err) } err = checkIndexExists(ctx, publicTbl, 5, 7, true) - if err != nil { + if ddl.IsEnableFastReorg() { + // Also check the temp index; the error is reported only when both lookups fail. + err1 = checkIndexExists(ctx, writeTbl, 5, 7, true) + } + if err != nil && (!ddl.IsEnableFastReorg() || err1 != nil) { return errors.Trace(err) } - err = checkIndexExists(ctx, publicTbl, 7, 7, false) + if ddl.IsEnableFastReorg() { + err = checkIndexExists(ctx, writeTbl, 7, 7, false) + } else { + err = checkIndexExists(ctx, publicTbl, 7, 7, false) + } if err != nil { return errors.Trace(err) } @@ -283,7 +294,11 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table idxVal := row[1].GetInt64() handle := row[0].GetInt64() err = checkIndexExists(ctx, publicTbl, idxVal, handle, true) - if err != nil { + if ddl.IsEnableFastReorg() { + // Also check the temp index; the error is reported only when both lookups fail.
+ err1 = checkIndexExists(ctx, writeTbl, idxVal, handle, true) + } + if err != nil && (!ddl.IsEnableFastReorg() || err1 != nil) { return errors.Trace(err) } } diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 8dae95e590438..389339ac15ad4 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -259,3 +259,75 @@ func TestPessimisticAmendIncompatibleWithFastReorg(t *testing.T) { tk.MustGetErrMsg("set @@tidb_enable_amend_pessimistic_txn = 1;", "amend pessimistic transactions is not compatible with tidb_ddl_enable_fast_reorg") } + +// TestCreateUniqueIndexKeyExist covers the following scenario: +// create a unique index idx((a*b+1)); +// insert (0, 6) and delete it; +// insert (0, 9), which must succeed; +// The duplicate check must look at the temp key and skip keys marked as deleted. +// Otherwise the insert fails with: +// Error: Received unexpected error: +// +// [kv:1062]Duplicate entry '1' for key 't.idx' +func TestCreateUniqueIndexKeyExist(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0)") + tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + stateDeleteOnlySQLs := []string{"insert into t values (5, 5)", "begin pessimistic;", "insert into t select * from t", "rollback", "insert into t set b = 6", "update t set b = 7 where a = 1", "delete from t where b = 4"} + + // If waitReorg times out, the worker may enter writeReorg more than once.
+ reorgTime := 0 + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &ddl.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + var err error + switch job.SchemaState { + case model.StateDeleteOnly: + for _, sql := range stateDeleteOnlySQLs { + _, err = tk1.Exec(sql) + assert.NoError(t, err) + } + // Rows after the delete-only statements: (1, 7), (2, 2), (3, 3), (5, 5), (0, 6) + case model.StateWriteOnly: + _, err = tk1.Exec("insert into t values (8, 8)") + assert.NoError(t, err) + _, err = tk1.Exec("update t set b = 7 where a = 2") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where b = 3") + assert.NoError(t, err) + // Rows after the write-only statements: (1, 7), (2, 7), (5, 5), (0, 6), (8, 8) + case model.StateWriteReorganization: + if reorgTime < 1 { + reorgTime++ + } else { + return + } + _, err = tk1.Exec("insert into t values (10, 10)") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where b = 6") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t set b = 9") + assert.NoError(t, err) + _, err = tk1.Exec("update t set b = 7 where a = 5") + assert.NoError(t, err) + // Rows after the write-reorg statements: (1, 7), (2, 7), (5, 7), (8, 8), (10, 10), (0, 9) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + tk.MustExec("alter table t add unique index idx((a*b+1))") + tk.MustExec("admin check table t") + tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "1 7", "2 7", "5 7", "8 8", "10 10")) +} diff --git a/executor/batch_checker.go b/executor/batch_checker.go index d3820ecb0d08c..79a6748b2d5c3 100644 --- a/executor/batch_checker.go +++ b/executor/batch_checker.go @@ -180,6 +180,10 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D if !distinct { continue } + // If the index has an applicable backfill state, check the temp index key instead.
+ if v.Meta().BackfillState != model.BackfillStateInapplicable { + _, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key) + } colValStr, err1 := formatDataForDupError(colVals) if err1 != nil { return nil, err1 diff --git a/executor/insert.go b/executor/insert.go index 2450f0a117f70..7e17fabcfe957 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -15,6 +15,7 @@ package executor import ( + "bytes" "context" "encoding/hex" "fmt" @@ -31,6 +32,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -262,6 +264,14 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } return err } + // A temp index value ends with one extra byte; a deleted unique key stores the delete + // marker in front of it, so drop that last byte and skip such entries (guarding empty values). + if tablecodec.IsTempIndexKey(uk.newKey) && len(val) > 0 { + rowVal := val[:len(val)-1] + if bytes.Equal(rowVal, tables.DeleteMarkerUnique) { + continue + } + } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) if err != nil { return err diff --git a/table/tables/index.go b/table/tables/index.go index 9fc1042a110fd..b3a481efba29f 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -15,7 +15,9 @@ package tables import ( + "bytes" "context" + "errors" "sync" "github.com/opentracing/opentracing-go" @@ -127,7 +129,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue keyIsTempIdxKey bool ) if !opt.FromBackFill { - key, tempKey, keyVer = genTempIdxKeyByState(c.idxInfo, key) + key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) if keyVer == TempIndexKeyTypeBackfill { key, tempKey = tempKey, nil keyIsTempIdxKey = true @@ -226,11 +228,26 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } if
err != nil || len(value) == 0 { lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil + var needPresumeKey tempIndexKeyState if keyIsTempIdxKey { idxVal = append(idxVal, keyVer) + needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err + } + } else { + if len(tempKey) > 0 { + needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err + } + } } if lazyCheck { - flags := []kv.FlagsOp{kv.SetPresumeKeyNotExists} + var flags []kv.FlagsOp + if needPresumeKey != KeyInTempIndexIsDeleted { + flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} + } if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && !vars.InRestrictedSQL && vars.ConnectionID > 0 { flags = append(flags, kv.SetNeedConstraintCheckInPrewrite) @@ -244,7 +261,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } if len(tempKey) > 0 { idxVal = append(idxVal, keyVer) - if lazyCheck { + if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted { err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists) } else { err = txn.GetMemBuffer().Set(tempKey, idxVal) @@ -285,7 +302,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed return err } - key, tempKey, tempKeyVer := genTempIdxKeyByState(c.idxInfo, key) + key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key) if distinct { if len(key) > 0 { @@ -336,9 +353,9 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed TempIndexKeyTypeMerge byte = 'm' ) -// genTempIdxKeyByState is used to get the key version and the temporary key. +// GenTempIdxKeyByState is used to get the key version and the temporary key. // The tempKeyVer means the temp index key/value version.
-func genTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tempKey kv.Key, tempKeyVer byte) { +func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tempKey kv.Key, tempKeyVer byte) { if indexInfo.State != model.StatePublic { switch indexInfo.BackfillState { case model.BackfillStateInapplicable: @@ -364,6 +381,28 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV return false, nil, err } + var ( + tempKey []byte + keyVer byte + ) + // While the index is being created in ingest mode, the temp index is consulted + // first; a handle conflict is reported through the switch below, other errors are returned as is. + key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) + if keyVer != TempIndexKeyTypeNone { + keyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) + if err1 != nil && keyExistInfo != KeyInTempIndexConflict { + return false, nil, err1 + } + switch keyExistInfo { + case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted: + return false, nil, nil + case KeyInTempIndexConflict: + return true, h1, kv.ErrKeyExists + case KeyInTempIndexIsItself: + return true, h, nil + } + } + value, err := txn.Get(context.TODO(), key) if kv.IsErrNotFound(err) { return false, nil, nil @@ -463,3 +502,64 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * } return colInfo } + +type tempIndexKeyState byte + +const ( + // KeyInTempIndexUnknown means it is unknown whether the key exists in the temp index. + KeyInTempIndexUnknown tempIndexKeyState = iota + // KeyInTempIndexNotExist means the key does not exist in the temp index. + KeyInTempIndexNotExist + // KeyInTempIndexIsDeleted means the key is marked as deleted in the temp index. + KeyInTempIndexIsDeleted + // KeyInTempIndexIsItself means the key in the temp index belongs to the given handle itself. + KeyInTempIndexIsItself + // KeyInTempIndexConflict means the key in the temp index belongs to a different handle. + KeyInTempIndexConflict +) + +// KeyExistInTempIndex reports the state of a key in the temp index; on a handle conflict it also returns the stored handle and kv.ErrKeyExists.
+func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (tempIndexKeyState, kv.Handle, error) { + // Keys that do not belong to a temp index are not inspected. + if !tablecodec.IsTempIndexKey(key) { + return KeyInTempIndexUnknown, nil, nil + } + value, err := txn.Get(ctx, key) + if kv.IsErrNotFound(err) { + return KeyInTempIndexNotExist, nil, nil + } + if err != nil { + return KeyInTempIndexUnknown, nil, err + } + + // A temp index value always ends with a one-byte key version, so it must not be empty. + if len(value) < 1 { + return KeyInTempIndexUnknown, nil, errors.New("temp index value length should great than 1") + } + length := len(value) + // Strip the trailing key version byte first. + // It should be TempIndexKeyTypeBackfill or TempIndexKeyTypeMerge. + value = value[:length-1] + if distinct { + if bytes.Equal(value, DeleteMarkerUnique) { + return KeyInTempIndexIsDeleted, nil, nil + } + } else { + if bytes.Equal(value, DeleteMarker) { + return KeyInTempIndexIsDeleted, nil, nil + } + } + + // For a unique index, compare the stored handle with h. + var handle kv.Handle + if distinct { + handle, err = tablecodec.DecodeHandleInUniqueIndexValue(value, IsCommonHandle) + if err != nil { + return KeyInTempIndexUnknown, nil, err + } + if !handle.Equal(h) { + return KeyInTempIndexConflict, handle, kv.ErrKeyExists + } + } + return KeyInTempIndexIsItself, handle, nil +} diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index b9400b0271d41..e45576b9d0674 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1143,6 +1143,18 @@ func TempIndexKey2IndexKey(originIdxID int64, tempIdxKey []byte) { binary.BigEndian.PutUint64(tempIdxKey[prefixLen:], eid) } +// IsTempIndexKey checks whether the input key is for a temp index. +// Keys too short to hold an encoded index ID are reported as not being temp index keys. +func IsTempIndexKey(indexKey []byte) bool { + // The encoded index ID is the 8-byte big-endian uint64 that follows the first prefixLen bytes.
+ if len(indexKey) < prefixLen+8 { + return false + } + indexID := codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(indexKey[prefixLen : prefixLen+8])) + // The key is a temp index key when OR-ing TempIndexPrefix into the ID leaves it unchanged. + return int64(TempIndexPrefix)|indexID == indexID +} + // GenIndexValuePortal is the portal for generating index value. // Value layout: //