Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: properly handle existence checks for deleted records in temp index #39371

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
bbb7c38
fix tempidx key check exist logic.
Benjamin2037 Nov 24, 2022
ab56202
only check temp idnex key.
Benjamin2037 Nov 24, 2022
9d2b142
refactor code and add comments.
Benjamin2037 Nov 25, 2022
4104176
add test case.
Benjamin2037 Nov 25, 2022
43aae4c
add error information.
Benjamin2037 Nov 25, 2022
8396fda
Merge branch 'master' into fix-default-on-fast-ddl-ut-part2
Benjamin2037 Nov 25, 2022
5c73ab3
Update ddl/index_merge_tmp_test.go
Benjamin2037 Nov 25, 2022
87e1d09
remove reduent err check
Benjamin2037 Nov 25, 2022
456b2bf
refactor code
Benjamin2037 Nov 25, 2022
1f810bf
refactor the code
Benjamin2037 Nov 25, 2022
10ecd3c
unexported KeyInTempIndeix to keyInTempIndex
Benjamin2037 Nov 25, 2022
928a07b
name a enum tempIndexKeyState.
Benjamin2037 Nov 25, 2022
3778505
Edit hook test case code.
Benjamin2037 Nov 28, 2022
e37a755
Add check logic to make sure the uk.newkey is from temp key, then app…
Benjamin2037 Nov 28, 2022
72886e7
Update ddl/index_merge_tmp_test.go
Benjamin2037 Nov 28, 2022
42f0e3e
Add check logic for input key and value.
Benjamin2037 Nov 28, 2022
a16e7eb
Merge branch 'fix-default-on-fast-ddl-ut-part2' of github.com:Benjami…
Benjamin2037 Nov 28, 2022
a4f2d55
Merge branch 'master' into fix-default-on-fast-ddl-ut-part2
ti-chi-bot Nov 28, 2022
d33e363
Merge branch 'master' into fix-default-on-fast-ddl-ut-part2
ti-chi-bot Nov 28, 2022
125e94b
Merge branch 'master' into fix-default-on-fast-ddl-ut-part2
ti-chi-bot Nov 28, 2022
9c370be
Merge branch 'master' into fix-default-on-fast-ddl-ut-part2
ti-chi-bot Nov 28, 2022
ef47ff7
Merge branch 'master' into fix-default-on-fast-ddl-ut-part2
ti-chi-bot Nov 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,8 +1719,6 @@ func TestCreateUniqueExpressionIndex(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table t(a int default 0, b int default 0)")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)")

Expand Down
27 changes: 21 additions & 6 deletions ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ func TestIndexChange(t *testing.T) {
ddl.SetWaitTimeWhenErrorOccurred(1 * time.Microsecond)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// TODO: Will check why tidb_ddl_enable_fast_reorg could not default be on in another PR.
tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;")
tk.MustExec("create table t (c1 int primary key, c2 int)")
tk.MustExec("insert t values (1, 1), (2, 2), (3, 3);")

Expand Down Expand Up @@ -221,6 +219,7 @@ func checkAddWriteOnlyForAddIndex(ctx sessionctx.Context, delOnlyTbl, writeOnlyT
}

func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table.Table) error {
var err1 error
// WriteOnlyTable: insert t values (6, 6)
err := sessiontxn.NewTxn(context.Background(), ctx)
if err != nil {
Expand All @@ -231,7 +230,11 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
return errors.Trace(err)
}
err = checkIndexExists(ctx, publicTbl, 6, 6, true)
if err != nil {
if ddl.IsEnableFastReorg() {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, 6, 6, true)
}
if err != nil && err1 != nil {
return errors.Trace(err)
}
// PublicTable: insert t values (7, 7)
Expand All @@ -250,10 +253,18 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
return errors.Trace(err)
}
err = checkIndexExists(ctx, publicTbl, 5, 7, true)
if err != nil {
if ddl.IsEnableFastReorg() {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, 5, 7, true)
}
if err != nil && err1 != nil {
return errors.Trace(err)
}
err = checkIndexExists(ctx, publicTbl, 7, 7, false)
if ddl.IsEnableFastReorg() {
err = checkIndexExists(ctx, writeTbl, 7, 7, false)
} else {
err = checkIndexExists(ctx, publicTbl, 7, 7, false)
}
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -283,7 +294,11 @@ func checkAddPublicForAddIndex(ctx sessionctx.Context, writeTbl, publicTbl table
idxVal := row[1].GetInt64()
handle := row[0].GetInt64()
err = checkIndexExists(ctx, publicTbl, idxVal, handle, true)
if err != nil {
if ddl.IsEnableFastReorg() {
// Need check temp index also.
err1 = checkIndexExists(ctx, writeTbl, idxVal, handle, true)
}
if err != nil && err1 != nil {
return errors.Trace(err)
}
}
Expand Down
62 changes: 62 additions & 0 deletions ddl/index_merge_tmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,65 @@ func TestPessimisticAmendIncompatibleWithFastReorg(t *testing.T) {
tk.MustGetErrMsg("set @@tidb_enable_amend_pessimistic_txn = 1;",
"amend pessimistic transactions is not compatible with tidb_ddl_enable_fast_reorg")
}

// TestCreateUniqueIndexKeyExist this case will test below things:
// Create one unique index idx((a*b+1));
// insert (0, 6) and delete it;
// insert (0, 9), it should be successful;
// Should check temp key exist and skip deleted mark
// The error returned below:
// Error: Received unexpected error:
//
// [kv:1062]Duplicate entry '1' for key 't.idx'
func TestCreateUniqueIndexKeyExist(t *testing.T) {
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int default 0, b int default 0)")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4)")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

stateDeleteOnlySQLs := []string{"insert into t values (5, 5)", "begin pessimistic;", "insert into t select * from t", "rollback", "insert into t set b = 6", "update t set b = 7 where a = 1", "delete from t where b = 4"}

// If waitReorg timeout, the worker may enter writeReorg more than 2 times.
reorgTime := 0
d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &ddl.TestDDLCallback{}
onJobUpdatedExportedFunc := func(job *model.Job) {
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
err := originalCallback.OnChanged(nil)
require.NoError(t, err)
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
switch job.SchemaState {
case model.StateDeleteOnly:
for _, sql := range stateDeleteOnlySQLs {
tk1.MustExec(sql)
}
// (1, 7), (2, 2), (3, 3), (5, 5), (0, 6)
case model.StateWriteOnly:
tk1.MustExec("insert into t values (8, 8)")
tk1.MustExec("update t set b = 7 where a = 2")
tk1.MustExec("delete from t where b = 3")
// (1, 7), (2, 7), (5, 5), (0, 6), (8, 8)
case model.StateWriteReorganization:
if reorgTime < 1 {
reorgTime++
} else {
return
}
tk1.MustExec("insert into t values (10, 10)")
tk1.MustExec("delete from t where b = 6")
tk1.MustExec("insert into t set b = 9")
tk1.MustExec("update t set b = 7 where a = 5")
// (1, 7), (2, 7), (5, 7), (8, 8), (10, 10), (0, 9)
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)
tk.MustExec("alter table t add unique index idx((a*b+1))")
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "1 7", "2 7", "5 7", "8 8", "10 10"))
}
4 changes: 4 additions & 0 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D
if !distinct {
continue
}
// If index is used ingest ways, then we should check key from temp index.
if v.Meta().BackfillState != model.BackfillStateInapplicable {
_, key, _ = tables.GenTempIdxKeyByState(v.Meta(), key)
}
colValStr, err1 := formatDataForDupError(colVals)
if err1 != nil {
return nil, err1
Expand Down
8 changes: 8 additions & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package executor

import (
"bytes"
"context"
"encoding/hex"
"fmt"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -264,6 +266,12 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}
return err
}
// Since the temp index stores deleted key with marked 'deleteu' for unique key at the end
// of value, So if return a key we check and skip deleted key.
rowVal := val[:len(val)-1]
if bytes.Equal(rowVal, tables.DeleteMarkerUnique) {
continue
}
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle)
if err != nil {
return err
Expand Down
103 changes: 97 additions & 6 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package tables

import (
"bytes"
"context"
"sync"

Expand Down Expand Up @@ -127,7 +128,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
keyIsTempIdxKey bool
)
if !opt.FromBackFill {
key, tempKey, keyVer = genTempIdxKeyByState(c.idxInfo, key)
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer == TempIndexKeyTypeBackfill {
key, tempKey = tempKey, nil
keyIsTempIdxKey = true
Expand Down Expand Up @@ -226,11 +227,26 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
}
if err != nil || len(value) == 0 {
lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil
var needPresumeKey int
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
if keyIsTempIdxKey {
idxVal = append(idxVal, keyVer)
needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle)
if err != nil {
return nil, err
}
} else {
if len(tempKey) > 0 {
needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle)
if err != nil {
return nil, err
}
}
}
if lazyCheck {
flags := []kv.FlagsOp{kv.SetPresumeKeyNotExists}
var flags []kv.FlagsOp
if needPresumeKey != KeyInTempIndexIsDeleted {
flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists}
}
if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() &&
!vars.InRestrictedSQL && vars.ConnectionID > 0 {
flags = append(flags, kv.SetNeedConstraintCheckInPrewrite)
Expand All @@ -244,7 +260,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
}
if len(tempKey) > 0 {
idxVal = append(idxVal, keyVer)
if lazyCheck {
if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted {
err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists)
} else {
err = txn.GetMemBuffer().Set(tempKey, idxVal)
Expand Down Expand Up @@ -285,7 +301,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
return err
}

key, tempKey, tempKeyVer := genTempIdxKeyByState(c.idxInfo, key)
key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key)

if distinct {
if len(key) > 0 {
Expand Down Expand Up @@ -336,9 +352,9 @@ const (
TempIndexKeyTypeMerge byte = 'm'
)

// genTempIdxKeyByState is used to get the key version and the temporary key.
// GenTempIdxKeyByState is used to get the key version and the temporary key.
// The tempKeyVer means the temp index key/value version.
func genTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tempKey kv.Key, tempKeyVer byte) {
func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tempKey kv.Key, tempKeyVer byte) {
if indexInfo.State != model.StatePublic {
switch indexInfo.BackfillState {
case model.BackfillStateInapplicable:
Expand All @@ -364,6 +380,30 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV
return false, nil, err
}

var (
tempKey []byte
keyVer byte
)
// If index current is in creating status and using ingest mode, we need first
// check key exist status in temp index.
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer != TempIndexKeyTypeNone {
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
if len(tempKey) > 0 {
KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle)
if err1 != nil {
return false, nil, err
}
switch KeyExistInfo {
case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted:
return false, nil, nil
case KeyInTempIndexConflict:
return true, h1, kv.ErrKeyExists
case KeyInTempIndexIsItself:
return true, h, nil
}
}
}

value, err := txn.Get(context.TODO(), key)
if kv.IsErrNotFound(err) {
return false, nil, nil
Expand Down Expand Up @@ -463,3 +503,54 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo *
}
return colInfo
}

const (
// KeyInTempIndexUnknown whether the key exists or not in temp index is unknown.
KeyInTempIndexUnknown = 0
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
// KeyInTempIndexNotExist the key is not exist in temp index.
KeyInTempIndexNotExist = 1
// KeyInTempIndexIsDeleted the key is marked deleted in temp index.
KeyInTempIndexIsDeleted = 2
// KeyInTempIndexIsItself the key is correlated to itself in temp index.
KeyInTempIndexIsItself = 3
// KeyInTempIndexConflict the key is conflict in temp index.
KeyInTempIndexConflict = 4
)

// KeyExistInTempIndex is used to check if there is unique key is marked delete in temp index.
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (int, kv.Handle, error) {
value, err := txn.Get(ctx, key)
if kv.IsErrNotFound(err) {
return KeyInTempIndexNotExist, nil, nil
}
if err != nil {
return KeyInTempIndexUnknown, nil, err
}

length := len(value)
// Firstly, we will remove the last byte of key version.
// It should be TempIndexKeyTypeBackfill or TempIndexKeyTypeMerge.
value = value[:length-1]
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
if distinct {
if bytes.Equal(value, DeleteMarkerUnique) {
return KeyInTempIndexIsDeleted, nil, nil
}
} else {
if bytes.Equal(value, DeleteMarker) {
return KeyInTempIndexIsDeleted, nil, nil
}
}

// Check if handle equal.
var handle kv.Handle
if distinct {
handle, err = tablecodec.DecodeHandleInUniqueIndexValue(value, IsCommonHandle)
if err != nil {
return KeyInTempIndexUnknown, nil, err
}
if !handle.Equal(h) {
return KeyInTempIndexConflict, handle, kv.ErrKeyExists
}
}
return KeyInTempIndexIsItself, handle, nil
}