Skip to content

Commit

Permalink
Merge branch 'release-5.0' into release-5.0-acf2e82b5227
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored May 26, 2021
2 parents 54043c7 + 1c949fb commit 6d075f4
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 22 deletions.
19 changes: 19 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3931,6 +3931,25 @@ func (s *testSerialSuite) TestIssue20840(c *C) {
tk.MustExec("drop table t1")
}

func (s *testSerialSuite) TestIssueInsertPrefixIndexForNonUTF8Collation(c *C) {
collate.SetNewCollationEnabledForTest(true)
defer collate.SetNewCollationEnabledForTest(false)

tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2, t3")
tk.MustExec("create table t1 ( c_int int, c_str varchar(40) character set ascii collate ascii_bin, primary key(c_int, c_str(8)) clustered , unique key(c_str))")
tk.MustExec("create table t2 ( c_int int, c_str varchar(40) character set latin1 collate latin1_bin, primary key(c_int, c_str(8)) clustered , unique key(c_str))")
tk.MustExec("insert into t1 values (3, 'fervent brattain')")
tk.MustExec("insert into t2 values (3, 'fervent brattain')")
tk.MustExec("admin check table t1")
tk.MustExec("admin check table t2")

tk.MustExec("create table t3 (x varchar(40) CHARACTER SET ascii COLLATE ascii_bin, UNIQUE KEY uk(x(4)))")
tk.MustExec("insert into t3 select 'abc '")
tk.MustGetErrCode("insert into t3 select 'abc d'", 1062)
}

func (s *testSerialSuite) TestIssue22496(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
19 changes: 12 additions & 7 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2057,9 +2057,9 @@ func (s *testPessimisticSuite) TestAsyncCommitWithSchemaChange(c *C) {
conf.TiKVClient.AsyncCommit.SafeWindow = time.Second
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeSchemaCheck", "return"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", "return"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeSchemaCheck"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing"), IsNil)
}()

tk := s.newAsyncCommitTestKitWithInit(c)
Expand Down Expand Up @@ -2107,18 +2107,20 @@ func (s *testPessimisticSuite) TestAsyncCommitWithSchemaChange(c *C) {
tk.MustExec("create table tk (c1 int primary key, c2 int)")
tk.MustExec("begin pessimistic")
tk.MustExec("insert into tk values(1, 1)")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*pause"), IsNil)
go func() {
time.Sleep(200 * time.Millisecond)
tk2.MustExec("alter table tk add index k2(c2)")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil)
ch <- struct{}{}
}()
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1200)"), IsNil)
tk.MustExec("commit")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil)
<-ch
tk.MustQuery("select * from tk where c2 = 1").Check(testkit.Rows("1 1"))
tk3.MustExec("admin check table tk")
}

func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) {
c.Skip("unstable")
// TODO: implement commit_ts calculation in unistore
if !*withTiKV {
return
Expand Down Expand Up @@ -2164,13 +2166,16 @@ func (s *testPessimisticSuite) Test1PCWithSchemaChange(c *C) {
tk.MustExec("create table tk (c1 int primary key, c2 int)")
tk.MustExec("begin pessimistic")
tk.MustExec("insert into tk values(1, 1)")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*pause"), IsNil)
go func() {
time.Sleep(200 * time.Millisecond)
tk2.MustExec("alter table tk add index k2(c2)")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil)
ch <- struct{}{}
}()
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "1*sleep(1200)"), IsNil)
tk.MustExec("commit")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil)
<-ch
tk.MustQuery("select * from tk where c2 = 1").Check(testkit.Rows("1 1"))
tk3.MustExec("admin check table tk")
}

Expand Down
9 changes: 1 addition & 8 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,13 +1106,6 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
logutil.SetTag(ctx, "commitTs", commitTS)
}

if c.sessionID > 0 {
failpoint.Inject("beforeSchemaCheck", func() {
c.ttlManager.close()
failpoint.Return()
})
}

if !c.isAsyncCommit() {
tryAmend := c.isPessimistic && c.sessionID > 0 && c.txn.schemaAmender != nil
if !tryAmend {
Expand All @@ -1136,7 +1129,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
if err != nil {
logutil.Logger(ctx).Info("schema check after amend failed, it means the schema version changed again",
zap.Uint64("startTS", c.startTS),
zap.Uint64("amendTS", c.commitTS),
zap.Uint64("amendTS", commitTS),
zap.Int64("amendedSchemaVersion", relatedSchemaChange.LatestInfoSchema.SchemaMetaVersion()),
zap.Uint64("newCommitTS", newCommitTS))
return errors.Trace(err)
Expand Down
92 changes: 92 additions & 0 deletions store/tikv/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,98 @@ func (s *testAsyncCommitSuite) TestAsyncCommitWithMultiDC(c *C) {
c.Assert(globalTxn.committer.isAsyncCommit(), IsTrue)
}

func (s *testAsyncCommitSuite) TestResolveTxnFallbackFromAsyncCommit(c *C) {
keys := [][]byte{[]byte("k0"), []byte("k1")}
values := [][]byte{[]byte("v00"), []byte("v10")}
initTest := func() *twoPhaseCommitter {
t0 := s.begin(c)
err := t0.Set(keys[0], values[0])
c.Assert(err, IsNil)
err = t0.Set(keys[1], values[1])
c.Assert(err, IsNil)
err = t0.Commit(context.Background())
c.Assert(err, IsNil)

t1 := s.beginAsyncCommit(c)
err = t1.Set(keys[0], []byte("v01"))
c.Assert(err, IsNil)
err = t1.Set(keys[1], []byte("v11"))
c.Assert(err, IsNil)

committer, err := newTwoPhaseCommitterWithInit(t1, 1)
c.Assert(err, IsNil)
atomic.StoreUint64(&committer.lockTTL, 1)
committer.setAsyncCommit(true)
return committer
}
prewriteKey := func(committer *twoPhaseCommitter, idx int, fallback bool) {
bo := NewBackofferWithVars(context.Background(), 5000, nil)
loc, err := s.store.GetRegionCache().LocateKey(bo, keys[idx])
c.Assert(err, IsNil)
var batch batchMutations
batch.mutations = committer.mutations.Slice(idx, idx+1)
batch.region = RegionVerID{loc.Region.GetID(), loc.Region.GetConfVer(), loc.Region.GetVer()}
batch.isPrimary = bytes.Equal(keys[idx], committer.primary())
req := committer.buildPrewriteRequest(batch, 1)
if fallback {
req.Req.(*kvrpcpb.PrewriteRequest).MaxCommitTs = 1
}
resp, err := s.store.SendReq(bo, req, loc.Region, 5000)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
}
readKey := func(idx int) {
t2 := s.begin(c)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
val, err := t2.Get(ctx, keys[idx])
c.Assert(err, IsNil)
c.Assert(val, DeepEquals, values[idx])
}

// Case 1: Fallback primary, read primary
committer := initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, false)
readKey(0)
readKey(1)

// Case 2: Fallback primary, read secondary
committer = initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, false)
readKey(1)
readKey(0)

// Case 3: Fallback secondary, read primary
committer = initTest()
prewriteKey(committer, 0, false)
prewriteKey(committer, 1, true)
readKey(0)
readKey(1)

// Case 4: Fallback secondary, read secondary
committer = initTest()
prewriteKey(committer, 0, false)
prewriteKey(committer, 1, true)
readKey(1)
readKey(0)

// Case 5: Fallback both, read primary
committer = initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, true)
readKey(0)
readKey(1)

// Case 6: Fallback both, read secondary
committer = initTest()
prewriteKey(committer, 0, true)
prewriteKey(committer, 1, true)
readKey(1)
readKey(0)
}

type mockResolveClient struct {
inner Client
onResolveLock func(*kvrpcpb.ResolveLockRequest) (*tikvrpc.Response, error)
Expand Down
7 changes: 1 addition & 6 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,12 +484,7 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
var currentTS uint64
var err error
var status TxnStatus
if l.UseAsyncCommit && !forceSyncCommit {
// Async commit doesn't need the current ts since it uses the minCommitTS.
currentTS = 0
// Set to 0 so as not to push forward min commit ts.
callerStartTS = 0
} else if l.TTL == 0 {
if l.TTL == 0 {
// NOTE: l.TTL = 0 is a special protocol!!!
// When the pessimistic txn prewrite meets locks of a txn, it should resolve the lock **unconditionally**.
// In this case, TiKV use lock TTL = 0 to notify TiDB, and TiDB should resolve the lock!
Expand Down
3 changes: 2 additions & 1 deletion tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,7 @@ func TruncateIndexValue(v *types.Datum, idxCol *model.IndexColumn, tblCol *model
if notStringType {
return
}
originalKind := v.Kind()
isUTF8Charset := tblCol.Charset == charset.CharsetUTF8 || tblCol.Charset == charset.CharsetUTF8MB4
if isUTF8Charset && utf8.RuneCount(v.GetBytes()) > idxCol.Length {
rs := bytes.Runes(v.GetBytes())
Expand All @@ -1303,7 +1304,7 @@ func TruncateIndexValue(v *types.Datum, idxCol *model.IndexColumn, tblCol *model
}
} else if !isUTF8Charset && len(v.GetBytes()) > idxCol.Length {
v.SetBytes(v.GetBytes()[:idxCol.Length])
if v.Kind() == types.KindString {
if originalKind == types.KindString {
v.SetString(v.GetString(), tblCol.Collate)
}
}
Expand Down

0 comments on commit 6d075f4

Please sign in to comment.