Skip to content

Commit

Permalink
ddl: skip deleting the unmatched index key on merge (#40465) (#40478)
Browse files Browse the repository at this point in the history
close #40464
  • Loading branch information
ti-chi-bot authored Jan 29, 2023
1 parent 14c7dce commit 0170ac3
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
16 changes: 13 additions & 3 deletions ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(txn kv.Transaction, idxR
}
if !idxRecords[i].delete {
idxRecords[i].skip = true
} else {
// Prevent deleting an unexpected index KV.
hdInVal, err := tablecodec.DecodeHandleInUniqueIndexValue(val, w.table.Meta().IsCommonHandle)
if err != nil {
return errors.Trace(err)
}
if !idxRecords[i].handle.Equal(hdInVal) {
idxRecords[i].skip = true
}
}
} else if idxRecords[i].distinct {
// The keys in w.batchCheckKeys also maybe duplicate,
Expand All @@ -76,6 +85,7 @@ type temporaryIndexRecord struct {
delete bool
unique bool
distinct bool
handle kv.Handle
rowKey kv.Key
}

Expand Down Expand Up @@ -137,7 +147,8 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC

// Lock the corresponding row keys so that it doesn't modify the index KVs
// that are changing by a pessimistic transaction.
err := txn.LockKeys(context.Background(), new(kv.LockCtx), idxRecord.rowKey)
rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -209,14 +220,13 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
return false, err
}
}
rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), handle)

originIdxKey := make([]byte, len(indexKey))
copy(originIdxKey, indexKey)
tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey)

idxRecord := &temporaryIndexRecord{
rowKey: rowKey,
handle: handle,
delete: isDelete,
unique: unique,
skip: false,
Expand Down
37 changes: 37 additions & 0 deletions ddl/index_merge_tmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,43 @@ func TestAddIndexMergeIndexUpdateOnDeleteOnly(t *testing.T) {
tk.MustExec("admin check table t;")
}

func TestAddIndexMergeDeleteUniqueOnWriteOnly(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int default 0, b int default 0);")
tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4);")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

d := dom.DDL()
originalCallback := d.GetHook()
defer d.SetHook(originalCallback)
callback := &ddl.TestDDLCallback{}
onJobUpdatedExportedFunc := func(job *model.Job) {
if t.Failed() {
return
}
var err error
switch job.SchemaState {
case model.StateDeleteOnly:
_, err = tk1.Exec("insert into t values (5, 5);")
assert.NoError(t, err)
case model.StateWriteOnly:
_, err = tk1.Exec("insert into t values (5, 7);")
assert.NoError(t, err)
_, err = tk1.Exec("delete from t where b = 7;")
assert.NoError(t, err)
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(callback)
tk.MustExec("alter table t add unique index idx(a);")
tk.MustExec("admin check table t;")
}

func TestAddIndexMergeConflictWithPessimistic(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down

0 comments on commit 0170ac3

Please sign in to comment.