Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: implement the TxnHeartBeat API for the large transaction #11979

Merged
merged 19 commits into from
Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,7 @@ require (
sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67
)

replace github.com/pingcap/kvproto => github.com/pingcap/kvproto v0.0.0-20190829095345-6a581f25586d

go 1.13
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c h1:hvQd3aOLKLF7x
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 h1:BMrtxXqQeZ9y27LN/V3PHA/tSyDWHK+90VLYaymrXQE=
github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190829095345-6a581f25586d h1:/yAB1ikjMxGOyBCYO8O+POBlarPfcMjJkhVy2ZHENCA=
github.com/pingcap/kvproto v0.0.0-20190829095345-6a581f25586d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
23 changes: 23 additions & 0 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limi
}

func (s *testMockTiKVSuite) mustPrewriteOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64) {
s.mustPrewriteOK1(c, mutations, primary, startTS, 0)
}

func (s *testMockTiKVSuite) mustPrewriteOK1(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64, ttl uint64) {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
req := &kvrpcpb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: []byte(primary),
Expand Down Expand Up @@ -650,3 +654,22 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
c.Assert(ErrAlreadyCommitted(0).Error(), Equals, "txn already committed")
c.Assert((&ErrConflict{}).Error(), Equals, "write conflict")
}

func (s *testMVCCLevelDB) TestTxnHeartBeat(c *C) {
s.mustPrewriteOK1(c, putMutations("pk", "val"), "pk", 5, 666)

// Update the ttl
ttl, err := s.store.TxnHeartBeat([]byte("pk"), 5, 888)
c.Assert(err, IsNil)
c.Assert(ttl, Greater, uint64(666))

// Advise ttl is small
ttl, err = s.store.TxnHeartBeat([]byte("pk"), 5, 300)
c.Assert(err, IsNil)
c.Assert(ttl, Greater, uint64(300))

// The lock has already been clean up
c.Assert(s.store.Cleanup([]byte("pk"), 5), IsNil)
_, err = s.store.TxnHeartBeat([]byte("pk"), 5, 1000)
c.Assert(err, NotNil)
}
1 change: 1 addition & 0 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ type MVCCStore interface {
Rollback(keys [][]byte, startTS uint64) error
Cleanup(key []byte, startTS uint64) error
ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error)
TxnHeartBeat(primaryKey []byte, startTS uint64, adviseTTL uint64) (uint64, error)
ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error
BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
GC(startKey, endKey []byte, safePoint uint64) error
Expand Down
40 changes: 40 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,46 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS uint64) error {
return mvcc.db.Write(batch, nil)
}

// TxnHeartBeat implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint64) (uint64, error) {
startKey := mvccEncode(key, lockVer)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
})
defer iter.Release()

if iter.Valid() {
dec := lockDecoder{
expectKey: key,
}
ok, err := dec.Decode(iter)
if err != nil {
return 0, errors.Trace(err)
}
if ok && dec.lock.startTS == startTS {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
lock := dec.lock
batch := &leveldb.Batch{}
// Increase the ttl of this transaction.
if adviseTTL < lock.ttl {
lock.ttl = lock.ttl/2 + adviseTTL/2
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
} else {
lock.ttl = adviseTTL
}
writeKey := mvccEncode(key, lockVer)
writeValue, err := lock.MarshalBinary()
if err != nil {
return 0, errors.Trace(err)
}
batch.Put(writeKey, writeValue)
if err = mvcc.db.Write(batch, nil); err != nil {
return 0, errors.Trace(err)
}
return lock.ttl, nil
}
}
return 0, errors.New("lock doesn't exist")
}

// ScanLock implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error) {
mvcc.mu.RLock()
Expand Down
20 changes: 20 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,19 @@ func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.Clean
return &resp
}

func (h *rpcHandler) handleTxnHeartBeat(req *kvrpcpb.TxnHeartBeatRequest) *kvrpcpb.TxnHeartBeatResponse {
if !h.checkKeyInRegion(req.PrimaryLock) {
panic("KvTxnHeartBeat: key not in region")
}
var resp kvrpcpb.TxnHeartBeatResponse
ttl, err := h.mvccStore.TxnHeartBeat(req.PrimaryLock, req.StartVersion, req.AdviseLockTtl)
if err != nil {
resp.Error = convertToKeyError(err)
}
resp.LockTtl = ttl
return &resp
}

func (h *rpcHandler) handleKvBatchGet(req *kvrpcpb.BatchGetRequest) *kvrpcpb.BatchGetResponse {
for _, k := range req.Keys {
if !h.checkKeyInRegion(k) {
Expand Down Expand Up @@ -766,6 +779,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
resp.Resp = handler.handleKvCleanup(r)
case tikvrpc.CmdTxnHeartBeat:
r := req.TxnHeartBeat()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.Resp = &kvrpcpb.TxnHeartBeatResponse{RegionError: err}
return resp, nil
}
resp.Resp = handler.handleTxnHeartBeat(r)
case tikvrpc.CmdBatchGet:
r := req.BatchGet()
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
Expand Down
37 changes: 37 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,43 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
}, nil
}

func sendTxnHeartBeat(bo *Backoffer, store *tikvStore, primary []byte, startTS, ttl uint64) error {
req := tikvrpc.NewRequest(tikvrpc.CmdTxnHeartBeat, &pb.TxnHeartBeatRequest{
PrimaryLock: primary,
StartVersion: startTS,
AdviseLockTtl: ttl,
})
for {
loc, err := store.GetRegionCache().LocateKey(bo, primary)
if err != nil {
return errors.Trace(err)
}
resp, err := store.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
continue
}
if resp.Resp == nil {
return errors.Trace(ErrBodyMissing)
}
cmdResp := resp.Resp.(*pb.TxnHeartBeatResponse)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
if keyErr := cmdResp.GetError(); keyErr != nil {
return errors.Errorf("txn %d heartbeat fail, primary key = %v, err = %s", startTS, primary, keyErr.Abort)
}
return nil
}
}

func (c *twoPhaseCommitter) initKeysAndMutations() error {
var (
keys [][]byte
Expand Down
18 changes: 18 additions & 0 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,24 @@ func (s *testLockSuite) TestGetTxnStatus(c *C) {
c.Assert(status.IsCommitted(), IsFalse)
}

func (s *testLockSuite) TestTxnHeartBeat(c *C) {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
txn.Set(kv.Key("key"), []byte("value"))
s.prewriteTxn(c, txn.(*tikvTxn))

bo := NewBackoffer(context.Background(), prewriteMaxBackoff)
err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666)
c.Assert(err, IsNil)

tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
// The getTxnStatus API is confusing, it really means rollback!
_, err = newLockResolver(s.store).getTxnStatus(bo, txn.StartTS(), []byte("key"))
c.Assert(err, IsNil)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved

err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666)
c.Assert(err, NotNil)
}

func (s *testLockSuite) prewriteTxn(c *C, txn *tikvTxn) {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
Expand Down
8 changes: 8 additions & 0 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,14 @@ func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexReques
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) KvTxnHeartBeat(ctx context.Context, in *kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) {
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) KvCheckTxnStatus(ctx context.Context, in *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) {
return nil, errors.New("unreachable")
}

func (s *testRegionRequestSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) {
// prepare a mock tikv grpc server
addr := "localhost:56341"
Expand Down
14 changes: 14 additions & 0 deletions store/tikv/tikvrpc/tikvrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
CmdDeleteRange
CmdPessimisticLock
CmdPessimisticRollback
CmdTxnHeartBeat

CmdRawGet CmdType = 256 + iota
CmdRawBatchGet
Expand Down Expand Up @@ -129,6 +130,8 @@ func (t CmdType) String() string {
return "SplitRegion"
case CmdDebugGetRegionProperties:
return "DebugGetRegionProperties"
case CmdTxnHeartBeat:
return "TxnHeartBeat"
}
return "Unknown"
}
Expand Down Expand Up @@ -304,6 +307,11 @@ func (req *Request) Empty() *tikvpb.BatchCommandsEmptyRequest {
return req.req.(*tikvpb.BatchCommandsEmptyRequest)
}

// TxnHeartBeat returns TxnHeartBeatRequest in request.
func (req *Request) TxnHeartBeat() *kvrpcpb.TxnHeartBeatRequest {
return req.req.(*kvrpcpb.TxnHeartBeatRequest)
}

// ToBatchCommandsRequest converts the request to an entry in BatchCommands request.
func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Request {
switch req.Type {
Expand Down Expand Up @@ -353,6 +361,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticRollback{PessimisticRollback: req.PessimisticRollback()}}
case CmdEmpty:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Empty{Empty: req.Empty()}}
case CmdTxnHeartBeat:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_TxnHeartBeat{TxnHeartBeat: req.TxnHeartBeat()}}
}
return nil
}
Expand Down Expand Up @@ -498,6 +508,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
req.SplitRegion().Context = ctx
case CmdEmpty:
req.SplitRegion().Context = ctx
case CmdTxnHeartBeat:
req.TxnHeartBeat().Context = ctx
default:
return fmt.Errorf("invalid request type %v", req.Type)
}
Expand Down Expand Up @@ -714,6 +726,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.Resp, err = client.SplitRegion(ctx, req.SplitRegion())
case CmdEmpty:
resp.Resp, err = &tikvpb.BatchCommandsEmptyResponse{}, nil
case CmdTxnHeartBeat:
resp.Resp, err = client.KvTxnHeartBeat(ctx, req.TxnHeartBeat())
default:
return nil, errors.Errorf("invalid request type: %v", req.Type)
}
Expand Down
2 changes: 2 additions & 0 deletions tools/check/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ require (
gopkg.in/yaml.v2 v2.2.2 // indirect
honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3
)

go 1.13