Skip to content

Commit

Permalink
change point/batch point get lock record to put
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk committed Jul 14, 2021
1 parent b9eb721 commit 6ff9b8d
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 0 deletions.
35 changes: 35 additions & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,23 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if len(lockKeys) > 0 {
hasKeysToLock = true
}
// Change the unique index LOCK into PUT record.
if len(indexKeys) > 0 {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
membuf := e.txn.GetMemBuffer()
for _, idxKey := range indexKeys {
handleVal := handleVals[string(idxKey)]
if len(handleVal) == 0 {
continue
}
err = membuf.Set(idxKey, handleVal)
if err != nil {
return err
}
}
}
}
// Fetch all values.
values, err = batchGetter.BatchGet(ctx, keys)
Expand All @@ -309,6 +326,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if e.lock && rc {
existKeys = make([]kv.Key, 0, 2*len(values))
}
changeLockToPutIdxKeys := make([]kv.Key, 0, len(indexKeys))
e.values = make([][]byte, 0, len(values))
for i, key := range keys {
val := values[string(key)]
Expand All @@ -325,6 +343,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
existKeys = append(existKeys, key)
if len(indexKeys) > 0 {
existKeys = append(existKeys, indexKeys[i])
changeLockToPutIdxKeys = append(changeLockToPutIdxKeys, indexKeys[i])
}
}
}
Expand All @@ -338,6 +357,22 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if len(existKeys) > 0 {
hasKeysToLock = true
}
if len(changeLockToPutIdxKeys) > 0 {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
for _, idxKey := range changeLockToPutIdxKeys {
membuf := e.txn.GetMemBuffer()
handleVal := handleVals[string(idxKey)]
if len(handleVal) == 0 {
return kv.ErrNotExist
}
err = membuf.Set(idxKey, handleVal)
if err != nil {
return err
}
}
}
}
if hasKeysToLock {
// Update partition table IDs
Expand Down
11 changes: 11 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,17 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
// Change the unique index LOCK into PUT record.
if e.lock && len(e.handleVal) > 0 {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
memBuffer := e.txn.GetMemBuffer()
err = memBuffer.Set(e.idxKey, e.handleVal)
if err != nil {
return err
}
}
}
if len(e.handleVal) == 0 {
return nil
Expand Down
54 changes: 54 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2389,3 +2389,57 @@ func (s *testPessimisticSuite) TestIssue20975SelectForUpdateBatchPointGetWithPar
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testPessimisticSuite) TestChangeLockToPut(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("use test")
tk2.MustExec("use test")
tk.MustExec("drop table if exists tk")
tk.MustExec("create table t1(c1 varchar(20) key, c2 int, c3 int, unique key k1(c2), key k2(c3))")
tk.MustExec(`insert into t1 values ("1", 1, 1), ("2", 2, 2), ("3", 3, 3)`)

// Test point get change lock to put.
for _, mode := range []string{"REPEATABLE-READ", "READ-COMMITTED"} {
tk.MustExec(fmt.Sprintf(`set tx_isolation = "%s"`, mode))
tk.MustExec("begin pessimistic")
tk.MustQuery(`select * from t1 where c1 = "1" for update`).Check(testkit.Rows("1 1 1"))
tk.MustExec("commit")
tk.MustExec("begin pessimistic")
tk.MustQuery(`select * from t1 where c1 = "1" for update`).Check(testkit.Rows("1 1 1"))
tk.MustExec("commit")
tk.MustExec("admin check table t1")
tk2.MustExec("begin")
tk2.MustQuery(`select * from t1 use index(k1) where c2 = "1" for update`).Check(testkit.Rows("1 1 1"))
tk2.MustQuery(`select * from t1 use index(k1) where c2 = "3" for update`).Check(testkit.Rows("3 3 3"))
tk2.MustExec("commit")
tk2.MustExec("begin")
tk2.MustQuery(`select * from t1 use index(k2) where c3 = 1`).Check(testkit.Rows("1 1 1"))
tk2.MustQuery("select * from t1 use index(k2) where c3 > 1").Check(testkit.Rows("2 2 2", "3 3 3"))
tk2.MustExec("commit")
}

// Test batch point get change lock to put.
for _, mode := range []string{"REPEATABLE-READ", "READ-COMMITTED"} {
tk.MustExec(fmt.Sprintf(`set tx_isolation = "%s"`, mode))
tk.MustExec("begin pessimistic")
tk.MustQuery(`select * from t1 where c1 in ("1", "5", "3") for update`).Check(testkit.Rows("1 1 1", "3 3 3"))
tk.MustExec("commit")
tk.MustExec("begin pessimistic")
tk.MustQuery(`select * from t1 where c1 in ("1", "2", "8") for update`).Check(testkit.Rows("1 1 1", "2 2 2"))
tk.MustExec("commit")
tk.MustExec("admin check table t1")
tk2.MustExec("begin")
tk2.MustQuery(`select * from t1 use index(k1) where c2 in ("1", "2", "3") for update`).Check(testkit.Rows("1 1 1", "2 2 2", "3 3 3"))
tk2.MustQuery(`select * from t1 use index(k2) where c2 in ("2") for update`).Check(testkit.Rows("2 2 2"))
tk2.MustExec("commit")
tk2.MustExec("begin")
tk2.MustQuery(`select * from t1 use index(k2) where c3 in (5, 8)`).Check(testkit.Rows())
tk2.MustQuery(`select * from t1 use index(k2) where c3 in (1, 8) for update`).Check(testkit.Rows("1 1 1"))
tk2.MustQuery(`select * from t1 use index(k2) where c3 > 1`).Check(testkit.Rows("2 2 2", "3 3 3"))
tk2.MustExec("commit")
}

tk.MustExec("admin check table t1")
}

0 comments on commit 6ff9b8d

Please sign in to comment.