Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: implement a ttlManager to update the TTL of a transaction (#12177) #12579

Merged
merged 4 commits into from
Oct 12, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,5 +151,6 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVRangeTaskStats)
prometheus.MustRegister(TiKVRangeTaskPushDuration)
prometheus.MustRegister(HandleSchemaValidate)
prometheus.MustRegister(TiKVTxnHeartBeatHistogram)
prometheus.MustRegister(GRPCConnTransientFailureCounter)
}
9 changes: 9 additions & 0 deletions metrics/tikvclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,13 @@ var (
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
Help: "duration to push sub tasks to range task workers",
}, []string{LblType})

// TiKVTxnHeartBeatHistogram is a histogram of txn heartbeat request
// durations, partitioned by the LblType label.
TiKVTxnHeartBeatHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "txn_heart_beat",
Help: "Bucketed histogram of the txn_heartbeat request duration.",
// 18 buckets starting at 0.001 and doubling each step, so the largest
// finite upper bound is 0.001 * 2^17 = 131.072.
Buckets: prometheus.ExponentialBuckets(0.001, 2, 18), // 1ms ~ 131s
}, []string{LblType})
)
7 changes: 3 additions & 4 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,13 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
syncCh <- struct{}{}
tk.MustQuery("select c from conflict where id = 1").Check(testkit.Rows("3"))

// Check outdated pessimistic lock is resolved.
// Check pessimistic lock is not resolved.
tk.MustExec("begin pessimistic")
tk.MustExec("update conflict set c = 4 where id = 1")
time.Sleep(300 * time.Millisecond)
tk2.MustExec("begin optimistic")
tk2.MustExec("update conflict set c = 5 where id = 1")
tk2.MustExec("commit")
_, err := tk.Exec("commit")
// TODO: ResolveLock block until timeout, takes about 40s, makes CI slow!
_, err := tk2.Exec("commit")
c.Check(err, NotNil)

// Update snapshotTS after a conflict, invalidate snapshot cache.
Expand Down
1 change: 0 additions & 1 deletion store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,6 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error {
}
// If current transaction's lock exists.
if ok && dec.lock.startTS == startTS {

// If the lock is already outdated, clean it up.
if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
if err = rollbackLock(batch, dec.lock, key, startTS); err != nil {
Expand Down
84 changes: 83 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/execdetails"
Expand All @@ -52,11 +53,13 @@ const (
// Metric handles bound to fixed label values at package initialization.
var (
// Secondary-lock cleanup failure counters, one per label value ("commit" / "rollback").
tikvSecondaryLockCleanupFailureCounterCommit = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit")
tikvSecondaryLockCleanupFailureCounterRollback = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback")
// Txn heartbeat duration histograms, one per label value ("ok" / "err").
tiKVTxnHeartBeatHistogramOK = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("ok")
tiKVTxnHeartBeatHistogramError = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("err")
)

// Global variable set by config file.
var (
PessimisticLockTTL uint64
PessimisticLockTTL uint64 = 15000 // 15s ~ 40s
)

func (ca twoPhaseCommitAction) String() string {
Expand Down Expand Up @@ -112,6 +115,8 @@ type twoPhaseCommitter struct {
isFirstLock bool
// regionTxnSize stores the number of keys involved in each region
regionTxnSize map[uint64]int
// Used by pessimistic transaction and large transaction.
ttlManager
}

type mutationEx struct {
Expand All @@ -128,6 +133,9 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
startTS: txn.StartTS(),
connID: connID,
regionTxnSize: map[uint64]int{},
ttlManager: ttlManager{
ch: make(chan struct{}),
},
}, nil
}

Expand Down Expand Up @@ -628,6 +636,80 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
}
}

// ttlManagerState describes where a ttlManager is in its lifecycle.
// Its underlying type is uint32 so that a pointer to it can be converted to
// *uint32 and updated with the sync/atomic compare-and-swap helpers.
type ttlManagerState uint32

// Lifecycle states of a ttlManager, numbered from zero in declaration order.
const (
	// stateUninitialized is the zero value: run has not taken effect yet.
	stateUninitialized ttlManagerState = iota
	// stateRunning is entered when run wins the transition out of
	// stateUninitialized and launches the keep-alive goroutine.
	stateRunning
	// stateClosed is entered when close is called on a running manager.
	stateClosed
)

// ttlManager controls the lifecycle of the keepAlive goroutine, which
// periodically sends txn heartbeats for a twoPhaseCommitter.
type ttlManager struct {
// state is the current lifecycle state; run and close change it with an
// atomic compare-and-swap so each transition happens at most once.
state ttlManagerState
// ch is closed by close to tell the keepAlive goroutine to return.
ch chan struct{}
}

// run launches the keepAlive goroutine for committer c.
//
// Only the call that atomically moves the state from stateUninitialized to
// stateRunning starts the goroutine; every other call (a repeated run, or a
// run after close) returns without doing anything.
func (tm *ttlManager) run(c *twoPhaseCommitter) {
	statePtr := (*uint32)(&tm.state)
	started := atomic.CompareAndSwapUint32(statePtr, uint32(stateUninitialized), uint32(stateRunning))
	if started {
		go tm.keepAlive(c)
	}
}

// close tells the keepAlive goroutine to stop by closing tm.ch.
//
// It acts only when it atomically moves the state from stateRunning to
// stateClosed, so calling it on a manager that was never started, or calling
// it more than once, is a no-op and the channel is closed at most once.
func (tm *ttlManager) close() {
	statePtr := (*uint32)(&tm.state)
	if atomic.CompareAndSwapUint32(statePtr, uint32(stateRunning), uint32(stateClosed)) {
		close(tm.ch)
	}
}

// keepAlive is the body of the heartbeat goroutine started by run.
//
// On every tick it asks the store's oracle for a timestamp, derives how long
// the transaction has been alive from it and c.startTS, and sends a txn
// heartbeat for the primary key with a TTL of that uptime plus
// PessimisticLockTTL. The loop ends when:
//   - tm.ch is closed,
//   - fetching the timestamp fails and the backoff also reports an error,
//   - the transaction has been alive for more than 10 minutes, or
//   - sending the heartbeat fails.
//
// Heartbeat latency is recorded in the "ok" or "err" histogram accordingly.
func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
	// Maximum lifetime of the heartbeat loop (10 minutes, in milliseconds), so
	// that when something goes wrong the key will not be locked forever.
	const maxLifetimeMs = 10 * 60 * 1000

	// The tick interval is 1/3 of PessimisticLockTTL (milliseconds).
	// NOTE(review): time.NewTicker panics on a non-positive interval, so this
	// assumes PessimisticLockTTL > 0 — confirm the config layer guarantees it.
	interval := time.Duration(PessimisticLockTTL) * time.Millisecond / 3
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		// Wait for either the stop signal or the next tick.
		select {
		case <-tm.ch:
			return
		case <-ticker.C:
		}

		// A fresh Backoffer is created for every tick.
		bo := NewBackoffer(context.Background(), pessimisticLockMaxBackoff)
		now, err := c.store.GetOracle().GetTimestamp(bo.ctx)
		if err != nil {
			if boErr := bo.Backoff(BoPDRPC, err); boErr != nil {
				logutil.Logger(context.Background()).Warn("keepAlive get tso fail",
					zap.Error(err))
				return
			}
			// Backoff succeeded: skip this round and retry on the next tick.
			continue
		}

		uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS))
		if uptime > maxLifetimeMs {
			logutil.Logger(context.Background()).Info("ttlManager live up to its lifetime",
				zap.Uint64("txnStartTS", c.startTS))
			return
		}

		// The TTL sent is the elapsed lifetime plus one more PessimisticLockTTL.
		newTTL := uptime + PessimisticLockTTL
		startTime := time.Now()
		_, err = sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL)
		elapsed := time.Since(startTime).Seconds()
		if err != nil {
			tiKVTxnHeartBeatHistogramError.Observe(elapsed)
			logutil.Logger(context.Background()).Warn("send TxnHeartBeat failed",
				zap.Error(err),
				zap.Uint64("txnStartTS", c.startTS))
			return
		}
		tiKVTxnHeartBeatHistogramOK.Observe(elapsed)
	}
}

func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
Expand Down
27 changes: 24 additions & 3 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

Expand Down Expand Up @@ -539,9 +540,29 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
lockInfo := s.getLockInfo(c, key)
elapsedTTL := lockInfo.LockTtl - PessimisticLockTTL
c.Assert(elapsedTTL, GreaterEqual, uint64(100))
c.Assert(elapsedTTL, Less, uint64(200))
lockInfo2 := s.getLockInfo(c, key2)
c.Assert(lockInfo2.LockTtl, Equals, lockInfo.LockTtl)

lr := newLockResolver(s.store)
bo := NewBackoffer(context.Background(), getMaxBackoff)
status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS)
c.Assert(err, IsNil)
c.Assert(status.ttl, Equals, lockInfo.LockTtl)

// Check primary lock TTL is auto increasing while the pessimistic txn is ongoing.
for i := 0; i < 50; i++ {
lockInfoNew := s.getLockInfo(c, key)
if lockInfoNew.LockTtl > lockInfo.LockTtl {
currentTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx)
c.Assert(err, IsNil)
// Check that the TTL is updated to a reasonable range.
expire := oracle.ExtractPhysical(txn.startTS) + int64(lockInfoNew.LockTtl)
now := oracle.ExtractPhysical(currentTS)
c.Assert(expire > now, IsTrue)
c.Assert(uint64(expire-now) <= PessimisticLockTTL, IsTrue)
return
}
time.Sleep(100 * time.Millisecond)
}
c.Assert(false, IsTrue, Commentf("update pessimistic ttl fail"))
}

func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
Expand Down
5 changes: 5 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
return errors.Trace(err)
}
}
defer committer.ttlManager.close()
if err := committer.initKeysAndMutations(); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -331,6 +332,7 @@ func (txn *tikvTxn) Rollback() error {
// Clean up pessimistic lock.
if txn.IsPessimistic() && txn.committer != nil {
err := txn.rollbackPessimisticLocks()
txn.committer.ttlManager.close()
if err != nil {
logutil.Logger(context.Background()).Error(err.Error())
}
Expand Down Expand Up @@ -412,6 +414,9 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keysInput
}
return err
}
if assignedPrimaryKey {
txn.committer.ttlManager.run(txn.committer)
}
}
txn.mu.Lock()
txn.lockKeys = append(txn.lockKeys, keys...)
Expand Down