diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index 5f699b3507e6f..a7b9816fc85de 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -59,6 +59,15 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(txn kv.Transaction, idxR } if !idxRecords[i].delete { idxRecords[i].skip = true + } else { + // Prevent deleting an unexpected index KV. + hdInVal, err := tablecodec.DecodeHandleInUniqueIndexValue(val, w.table.Meta().IsCommonHandle) + if err != nil { + return errors.Trace(err) + } + if !idxRecords[i].handle.Equal(hdInVal) { + idxRecords[i].skip = true + } } } else if idxRecords[i].distinct { // The keys in w.batchCheckKeys also maybe duplicate, @@ -76,6 +85,7 @@ type temporaryIndexRecord struct { delete bool unique bool distinct bool + handle kv.Handle rowKey kv.Key } @@ -137,7 +147,8 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC // Lock the corresponding row keys so that it doesn't modify the index KVs // that are changing by a pessimistic transaction. - err := txn.LockKeys(context.Background(), new(kv.LockCtx), idxRecord.rowKey) + rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle) + err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey) if err != nil { return errors.Trace(err) } @@ -209,14 +220,13 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor return false, err } } - rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), handle) originIdxKey := make([]byte, len(indexKey)) copy(originIdxKey, indexKey) tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey) idxRecord := &temporaryIndexRecord{ - rowKey: rowKey, + handle: handle, delete: isDelete, unique: unique, skip: false, diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 6027c5cee215f..1c4d107a0aebf 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -378,6 +378,43 @@ func TestAddIndexMergeIndexUpdateOnDeleteOnly(t *testing.T) { tk.MustExec("admin check table t;") } +func TestAddIndexMergeDeleteUniqueOnWriteOnly(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0);") + tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3), (4, 4);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &ddl.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + var err error + switch job.SchemaState { + case model.StateDeleteOnly: + _, err = tk1.Exec("insert into t values (5, 5);") + assert.NoError(t, err) + case model.StateWriteOnly: + _, err = tk1.Exec("insert into t values (5, 7);") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where b = 7;") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") +} + func TestAddIndexMergeConflictWithPessimistic(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store)