Skip to content

Commit e3ed28c

Browse files
zyguanti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#42210
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
1 parent 51eab73 commit e3ed28c

File tree

4 files changed

+135
-17
lines changed

4 files changed

+135
-17
lines changed

executor/insert_common.go

+48
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,10 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
11281128
}
11291129
} else {
11301130
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
1131+
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
1132+
// lock duplicated row key on insert-ignore
1133+
txnCtx.AddUnchangedRowKey(r.handleKey.newKey)
1134+
}
11311135
continue
11321136
}
11331137
} else if !kv.IsErrNotFound(err) {
@@ -1137,12 +1141,45 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
11371141
for _, uk := range r.uniqueKeys {
11381142
_, err := txn.Get(ctx, uk.newKey)
11391143
if err == nil {
1144+
<<<<<<< HEAD
11401145
// If duplicate keys were found in BatchGet, mark row = nil.
11411146
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
11421147
skip = true
11431148
break
11441149
}
11451150
if !kv.IsErrNotFound(err) {
1151+
=======
1152+
if replace {
1153+
_, handle, err := tables.FetchDuplicatedHandle(
1154+
ctx,
1155+
uk.newKey,
1156+
true,
1157+
txn,
1158+
e.Table.Meta().ID,
1159+
uk.commonHandle,
1160+
)
1161+
if err != nil {
1162+
return err
1163+
}
1164+
if handle == nil {
1165+
continue
1166+
}
1167+
_, err = e.removeRow(ctx, txn, handle, r, true)
1168+
if err != nil {
1169+
return err
1170+
}
1171+
} else {
1172+
// If duplicate keys were found in BatchGet, mark row = nil.
1173+
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
1174+
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
1175+
// lock duplicated unique key on insert-ignore
1176+
txnCtx.AddUnchangedRowKey(uk.newKey)
1177+
}
1178+
skip = true
1179+
break
1180+
}
1181+
} else if !kv.IsErrNotFound(err) {
1182+
>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
11461183
return err
11471184
}
11481185
}
@@ -1187,7 +1224,18 @@ func (e *InsertValues) removeRow(ctx context.Context, txn kv.Transaction, r toBe
11871224
return err
11881225
}
11891226
if identical {
1227+
<<<<<<< HEAD
11901228
return nil
1229+
=======
1230+
if inReplace {
1231+
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
1232+
}
1233+
_, err := appendUnchangedRowForLock(e.ctx, r.t, handle, oldRow)
1234+
if err != nil {
1235+
return false, err
1236+
}
1237+
return true, nil
1238+
>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
11911239
}
11921240

11931241
err = r.t.RemoveRecord(e.ctx, handle, oldRow)

executor/insert_test.go

+66
Original file line numberDiff line numberDiff line change
@@ -2032,3 +2032,69 @@ func TestIssue32213(t *testing.T) {
20322032
tk.MustQuery("select cast(test.t1.c1 as decimal(5, 3)) from test.t1").Check(testkit.Rows("99.999"))
20332033
tk.MustQuery("select cast(test.t1.c1 as decimal(6, 3)) from test.t1").Check(testkit.Rows("100.000"))
20342034
}
2035+
2036+
func TestInsertLock(t *testing.T) {
2037+
store := testkit.CreateMockStore(t)
2038+
tk1 := testkit.NewTestKit(t, store)
2039+
tk2 := testkit.NewTestKit(t, store)
2040+
tk1.MustExec("use test")
2041+
tk2.MustExec("use test")
2042+
2043+
for _, tt := range []struct {
2044+
name string
2045+
ddl string
2046+
dml string
2047+
}{
2048+
{
2049+
"replace-pk",
2050+
"create table t (c int primary key clustered)",
2051+
"replace into t values (1)",
2052+
},
2053+
{
2054+
"replace-uk",
2055+
"create table t (c int unique key)",
2056+
"replace into t values (1)",
2057+
},
2058+
{
2059+
"insert-ingore-pk",
2060+
"create table t (c int primary key clustered)",
2061+
"insert ignore into t values (1)",
2062+
},
2063+
{
2064+
"insert-ingore-uk",
2065+
"create table t (c int unique key)",
2066+
"insert ignore into t values (1)",
2067+
},
2068+
{
2069+
"insert-update-pk",
2070+
"create table t (c int primary key clustered)",
2071+
"insert into t values (1) on duplicate key update c = values(c)",
2072+
},
2073+
{
2074+
"insert-update-uk",
2075+
"create table t (c int unique key)",
2076+
"insert into t values (1) on duplicate key update c = values(c)",
2077+
},
2078+
} {
2079+
t.Run(tt.name, func(t *testing.T) {
2080+
tk1.MustExec("drop table if exists t")
2081+
tk1.MustExec(tt.ddl)
2082+
tk1.MustExec("insert into t values (1)")
2083+
tk1.MustExec("begin")
2084+
tk1.MustExec(tt.dml)
2085+
done := make(chan struct{})
2086+
go func() {
2087+
tk2.MustExec("delete from t")
2088+
done <- struct{}{}
2089+
}()
2090+
select {
2091+
case <-done:
2092+
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
2093+
case <-time.After(100 * time.Millisecond):
2094+
}
2095+
tk1.MustExec("commit")
2096+
<-done
2097+
tk1.MustQuery("select * from t").Check([][]interface{}{})
2098+
})
2099+
}
2100+
}

executor/write.go

+20-16
Original file line numberDiff line numberDiff line change
@@ -126,22 +126,8 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
126126
if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
127127
sc.AddAffectedRows(1)
128128
}
129-
130-
physicalID := t.Meta().ID
131-
if pt, ok := t.(table.PartitionedTable); ok {
132-
p, err := pt.GetPartitionByRow(sctx, oldData)
133-
if err != nil {
134-
return false, err
135-
}
136-
physicalID = p.GetPhysicalID()
137-
}
138-
139-
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
140-
txnCtx := sctx.GetSessionVars().TxnCtx
141-
if txnCtx.IsPessimistic {
142-
txnCtx.AddUnchangedRowKey(unchangedRowKey)
143-
}
144-
return false, nil
129+
_, err := appendUnchangedRowForLock(sctx, t, h, oldData)
130+
return false, err
145131
}
146132

147133
// Fill values into on-update-now fields, only if they are really changed.
@@ -207,6 +193,24 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
207193
return true, nil
208194
}
209195

196+
func appendUnchangedRowForLock(sctx sessionctx.Context, t table.Table, h kv.Handle, row []types.Datum) (bool, error) {
197+
txnCtx := sctx.GetSessionVars().TxnCtx
198+
if !txnCtx.IsPessimistic {
199+
return false, nil
200+
}
201+
physicalID := t.Meta().ID
202+
if pt, ok := t.(table.PartitionedTable); ok {
203+
p, err := pt.GetPartitionByRow(sctx, row)
204+
if err != nil {
205+
return false, err
206+
}
207+
physicalID = p.GetPhysicalID()
208+
}
209+
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
210+
txnCtx.AddUnchangedRowKey(unchangedRowKey)
211+
return true, nil
212+
}
213+
210214
func rebaseAutoRandomValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newData *types.Datum, col *table.Column) error {
211215
tableInfo := t.Meta()
212216
if !tableInfo.ContainsAutoRandomBits() {

tests/realtikvtest/pessimistictest/pessimistic_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,7 @@ func TestOptimisticConflicts(t *testing.T) {
543543
tk.MustExec("begin pessimistic")
544544
// This SQL use BatchGet and cache data in the txn snapshot.
545545
// It can be changed to other SQLs that use BatchGet.
546-
tk.MustExec("insert ignore into conflict values (1, 2)")
546+
tk.MustExec("select * from conflict where id in (1, 2, 3)")
547547

548548
tk2.MustExec("update conflict set c = c - 1")
549549

0 commit comments

Comments
 (0)