diff --git a/metrics/metrics.go b/metrics/metrics.go index a09390ab9e33c..60efda0a1df28 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -151,5 +151,6 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVRangeTaskStats) prometheus.MustRegister(TiKVRangeTaskPushDuration) prometheus.MustRegister(HandleSchemaValidate) + prometheus.MustRegister(TiKVTxnHeartBeatHistogram) prometheus.MustRegister(GRPCConnTransientFailureCounter) } diff --git a/metrics/tikvclient.go b/metrics/tikvclient.go index ce6be01ce596d..f1e52ddde43ab 100644 --- a/metrics/tikvclient.go +++ b/metrics/tikvclient.go @@ -223,4 +223,13 @@ var ( Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), Help: "duration to push sub tasks to range task workers", }, []string{LblType}) + + TiKVTxnHeartBeatHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "tidb", + Subsystem: "tikvclient", + Name: "txn_heart_beat", + Help: "Bucketed histogram of the txn_heartbeat request duration.", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 18), // 1ms ~ 131s + }, []string{LblType}) ) diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 3dbe8aa7566f7..98abee8433e2d 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -365,14 +365,13 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) { syncCh <- struct{}{} tk.MustQuery("select c from conflict where id = 1").Check(testkit.Rows("3")) - // Check outdated pessimistic lock is resolved. + // Check pessimistic lock is not resolved. tk.MustExec("begin pessimistic") tk.MustExec("update conflict set c = 4 where id = 1") - time.Sleep(300 * time.Millisecond) tk2.MustExec("begin optimistic") tk2.MustExec("update conflict set c = 5 where id = 1") - tk2.MustExec("commit") - _, err := tk.Exec("commit") + // TODO: ResolveLock blocks until timeout, which takes about 40s and makes CI slow! 
+ _, err := tk2.Exec("commit") c.Check(err, NotNil) // Update snapshotTS after a conflict, invalidate snapshot cache. diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index 8753ad3033dda..3a30ba637d627 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -986,7 +986,6 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { } // If current transaction's lock exists. if ok && dec.lock.startTS == startTS { - // If the lock has already outdated, clean up it. if currentTS == 0 || uint64(oracle.ExtractPhysical(dec.lock.startTS))+dec.lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { if err = rollbackLock(batch, dec.lock, key, startTS); err != nil { diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 1af2cbd45865a..88cd7a8722272 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/execdetails" @@ -52,11 +53,13 @@ const ( var ( tikvSecondaryLockCleanupFailureCounterCommit = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit") tikvSecondaryLockCleanupFailureCounterRollback = metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback") + tiKVTxnHeartBeatHistogramOK = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("ok") + tiKVTxnHeartBeatHistogramError = metrics.TiKVTxnHeartBeatHistogram.WithLabelValues("err") ) // Global variable set by config file. 
var ( - PessimisticLockTTL uint64 + PessimisticLockTTL uint64 = 15000 // 15s ~ 40s ) func (ca twoPhaseCommitAction) String() string { @@ -112,6 +115,8 @@ type twoPhaseCommitter struct { isFirstLock bool // regionTxnSize stores the number of keys involved in each region regionTxnSize map[uint64]int + // Used by pessimistic transaction and large transaction. + ttlManager } type mutationEx struct { @@ -128,6 +133,9 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro startTS: txn.StartTS(), connID: connID, regionTxnSize: map[uint64]int{}, + ttlManager: ttlManager{ + ch: make(chan struct{}), + }, }, nil } @@ -628,6 +636,80 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) } } +type ttlManagerState uint32 + +const ( + stateUninitialized ttlManagerState = iota + stateRunning + stateClosed +) + +type ttlManager struct { + state ttlManagerState + ch chan struct{} +} + +func (tm *ttlManager) run(c *twoPhaseCommitter) { + // Run only once. + if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateUninitialized), uint32(stateRunning)) { + return + } + go tm.keepAlive(c) +} + +func (tm *ttlManager) close() { + if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateRunning), uint32(stateClosed)) { + return + } + close(tm.ch) +} + +func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { + // Ticker is set to 1/3 of the PessimisticLockTTL. 
+ ticker := time.NewTicker(time.Duration(PessimisticLockTTL) * time.Millisecond / 3) + defer ticker.Stop() + for { + select { + case <-tm.ch: + return + case <-ticker.C: + bo := NewBackoffer(context.Background(), pessimisticLockMaxBackoff) + now, err := c.store.GetOracle().GetTimestamp(bo.ctx) + if err != nil { + err1 := bo.Backoff(BoPDRPC, err) + if err1 != nil { + logutil.Logger(context.Background()).Warn("keepAlive get tso fail", + zap.Error(err)) + return + } + continue + } + + uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS)) + const c10min = 10 * 60 * 1000 + if uptime > c10min { + // Set a 10min maximum lifetime for the ttlManager, so when something goes wrong + // the key will not be locked forever. + logutil.Logger(context.Background()).Info("ttlManager live up to its lifetime", + zap.Uint64("txnStartTS", c.startTS)) + return + } + + newTTL := uptime + PessimisticLockTTL + startTime := time.Now() + _, err = sendTxnHeartBeat(bo, c.store, c.primary(), c.startTS, newTTL) + if err != nil { + tiKVTxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds()) + logutil.Logger(context.Background()).Warn("send TxnHeartBeat failed", + zap.Error(err), + zap.Uint64("txnStartTS", c.startTS)) + return + } + tiKVTxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds()) + } + } +} + func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error { mutations := make([]*pb.Mutation, len(batch.keys)) for i, k := range batch.keys { diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index a92f33fdff9fd..05b932ff4562c 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/mocktikv" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" ) @@ -539,9 +540,29 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { 
lockInfo := s.getLockInfo(c, key) elapsedTTL := lockInfo.LockTtl - PessimisticLockTTL c.Assert(elapsedTTL, GreaterEqual, uint64(100)) - c.Assert(elapsedTTL, Less, uint64(200)) - lockInfo2 := s.getLockInfo(c, key2) - c.Assert(lockInfo2.LockTtl, Equals, lockInfo.LockTtl) + + lr := newLockResolver(s.store) + bo := NewBackoffer(context.Background(), getMaxBackoff) + status, err := lr.getTxnStatus(bo, txn.startTS, key2, txn.startTS) + c.Assert(err, IsNil) + c.Assert(status.ttl, Equals, lockInfo.LockTtl) + + // Check primary lock TTL is auto increasing while the pessimistic txn is ongoing. + for i := 0; i < 50; i++ { + lockInfoNew := s.getLockInfo(c, key) + if lockInfoNew.LockTtl > lockInfo.LockTtl { + currentTS, err := lr.store.GetOracle().GetTimestamp(bo.ctx) + c.Assert(err, IsNil) + // Check that the TTL is updated to a reasonable range. + expire := oracle.ExtractPhysical(txn.startTS) + int64(lockInfoNew.LockTtl) + now := oracle.ExtractPhysical(currentTS) + c.Assert(expire > now, IsTrue) + c.Assert(uint64(expire-now) <= PessimisticLockTTL, IsTrue) + return + } + time.Sleep(100 * time.Millisecond) + } + c.Assert(false, IsTrue, Commentf("update pessimistic ttl fail")) } func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo { diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 7d671c0be6495..01a32795437d0 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -273,6 +273,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { return errors.Trace(err) } } + defer committer.ttlManager.close() if err := committer.initKeysAndMutations(); err != nil { return errors.Trace(err) } @@ -331,6 +332,7 @@ func (txn *tikvTxn) Rollback() error { // Clean up pessimistic lock. 
if txn.IsPessimistic() && txn.committer != nil { err := txn.rollbackPessimisticLocks() + txn.committer.ttlManager.close() if err != nil { logutil.Logger(context.Background()).Error(err.Error()) } @@ -412,6 +414,9 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keysInput } return err } + if assignedPrimaryKey { + txn.committer.ttlManager.run(txn.committer) + } } txn.mu.Lock() txn.lockKeys = append(txn.lockKeys, keys...)