From 50e86f7d3c1cf6b82f133df303f99f6f21f0829d Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Tue, 7 Feb 2023 12:31:34 +0800 Subject: [PATCH] Add metrics and statistics about aggressive locking (#687) Signed-off-by: MyonKeminta --- metrics/metrics.go | 10 ++++++++ metrics/shortcuts.go | 18 +++++++++++++ txnkv/transaction/txn.go | 55 ++++++++++++++++++++++++++++++++-------- util/execdetails.go | 49 ++++++++++++++++++++--------------- 4 files changed, 102 insertions(+), 30 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index 943553b9f..ff948055b 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -97,6 +97,7 @@ var ( TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec TiKVGrpcConnectionState *prometheus.GaugeVec + TiKVAggressiveLockedKeysCounter *prometheus.CounterVec ) // Label constants. @@ -598,6 +599,14 @@ func initMetrics(namespace, subsystem string) { Help: "State of gRPC connection", }, []string{"connection_id", "store_ip", "grpc_state"}) + TiKVAggressiveLockedKeysCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "aggressive_locking_count", + Help: "Counter of keys locked in aggressive locking mode", + }, []string{LblType}) + initShortcuts() } @@ -669,6 +678,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVUnsafeDestroyRangeFailuresCounterVec) prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter) prometheus.MustRegister(TiKVGrpcConnectionState) + prometheus.MustRegister(TiKVAggressiveLockedKeysCounter) } // readCounter reads the value of a prometheus.Counter. diff --git a/metrics/shortcuts.go b/metrics/shortcuts.go index dacbd9719..48e3c8e6b 100644 --- a/metrics/shortcuts.go +++ b/metrics/shortcuts.go @@ -136,6 +136,11 @@ var ( PrewriteAssertionUsageCounterExist prometheus.Counter PrewriteAssertionUsageCounterNotExist prometheus.Counter PrewriteAssertionUsageCounterUnknown prometheus.Counter + + AggressiveLockedKeysNew prometheus.Counter + AggressiveLockedKeysDerived prometheus.Counter + AggressiveLockedKeysLockedWithConflict prometheus.Counter + AggressiveLockedKeysNonForceLock prometheus.Counter ) func initShortcuts() { @@ -237,4 +242,17 @@ func initShortcuts() { PrewriteAssertionUsageCounterExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("exist") PrewriteAssertionUsageCounterNotExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("not-exist") PrewriteAssertionUsageCounterUnknown = TiKVPrewriteAssertionUsageCounter.WithLabelValues("unknown") + + // Counts new locks trying to acquire inside an aggressive locking stage. + AggressiveLockedKeysNew = TiKVAggressiveLockedKeysCounter.WithLabelValues("new") + // Counts locks trying to acquire inside an aggressive locking stage, but it's already locked in the previous + // aggressive locking stage (before the latest invocation to `RetryAggressiveLocking`), in which case the lock + // can be *derived* from the previous stage and no RPC is needed for the key. + AggressiveLockedKeysDerived = TiKVAggressiveLockedKeysCounter.WithLabelValues("derived") + // Counts locks that's forced acquired ignoring the WriteConflict. + AggressiveLockedKeysLockedWithConflict = TiKVAggressiveLockedKeysCounter.WithLabelValues("locked_with_conflict") + // Counts locks that's acquired within an aggressive locking stage, but with force-lock disabled (by passing + // `WakeUpMode = PessimisticLockWakeUpMode_WakeUpModeNormal`, which will disable `allow_lock_with_conflict` in + // TiKV). + AggressiveLockedKeysNonForceLock = TiKVAggressiveLockedKeysCounter.WithLabelValues("non_force_lock") } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 8d5989904..7b6def3e9 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -825,6 +825,28 @@ func (txn *KVTxn) filterAggressiveLockedKeys(lockCtx *tikv.LockCtx, allKeys [][] return keys, nil } +// collectAggressiveLockingStats collects statistics about aggressive locking and updates metrics if needed. +func (txn *KVTxn) collectAggressiveLockingStats(lockCtx *tikv.LockCtx, keys int, skippedLockKeys int, filteredAggressiveLockedKeysCount int, lockWakeUpMode kvrpcpb.PessimisticLockWakeUpMode) { + if keys > 0 { + lockCtx.Stats.AggressiveLockNewCount += keys - skippedLockKeys + + lockedWithConflictCount := 0 + for _, v := range lockCtx.Values { + if v.LockedWithConflictTS != 0 { + lockedWithConflictCount++ + } + } + lockCtx.Stats.LockedWithConflictCount += lockedWithConflictCount + metrics.AggressiveLockedKeysLockedWithConflict.Add(float64(lockedWithConflictCount)) + + if lockWakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal { + metrics.AggressiveLockedKeysNonForceLock.Add(float64(keys)) + } + } + + lockCtx.Stats.AggressiveLockDerivedCount += filteredAggressiveLockedKeysCount +} + // LockKeys tries to lock the entries with the keys in KV store. // lockCtx is the context for lock, lockCtx.lockWaitTime in ms func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput ...[]byte) error { @@ -956,7 +978,9 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() } keys = deduplicateKeys(keys) checkedExistence := false + filteredAggressiveLockedKeysCount := 0 var assignedPrimaryKey bool + lockWakeUpMode := kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal if txn.IsPessimistic() && lockCtx.ForUpdateTS > 0 { if txn.committer == nil { // sessionID is used for log. @@ -979,6 +1003,10 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() txn.committer.forUpdateTS = lockCtx.ForUpdateTS allKeys := keys + lockCtx.Stats = &util.LockKeysDetails{ + LockKeys: int32(len(keys)), + ResolveLock: util.ResolveLockDetail{}, + } // If aggressive locking is enabled and we don't need to update the primary for all locks, we can avoid sending // RPC to those already locked keys. @@ -988,24 +1016,26 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() return err } + filteredAggressiveLockedKeysCount = len(allKeys) - len(keys) + metrics.AggressiveLockedKeysDerived.Add(float64(filteredAggressiveLockedKeysCount)) + metrics.AggressiveLockedKeysNew.Add(float64(len(keys))) + if len(keys) == 0 { + if lockCtx.Stats != nil { + txn.collectAggressiveLockingStats(lockCtx, 0, 0, filteredAggressiveLockedKeysCount, lockWakeUpMode) + } return nil } } - lockWaitMode := kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeNormal if txn.aggressiveLockingContext != nil && len(keys) == 1 && !lockCtx.LockOnlyIfExists { - lockWaitMode = kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock - } - lockCtx.Stats = &util.LockKeysDetails{ - LockKeys: int32(len(keys)), - ResolveLock: util.ResolveLockDetail{}, + lockWakeUpMode = kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock } bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars) // If the number of keys greater than 1, it can be on different region, // concurrently execute on multiple regions may lead to deadlock. txn.committer.isFirstLock = txn.lockedCnt == 0 && len(keys) == 1 - err = txn.committer.pessimisticLockMutations(bo, lockCtx, lockWaitMode, &PlainMutations{keys: keys}) + err = txn.committer.pessimisticLockMutations(bo, lockCtx, lockWakeUpMode, &PlainMutations{keys: keys}) if lockCtx.Stats != nil && bo.GetTotalSleep() > 0 { atomic.AddInt64(&lockCtx.Stats.BackoffTime, int64(bo.GetTotalSleep())*int64(time.Millisecond)) lockCtx.Stats.Mu.Lock() @@ -1095,7 +1125,7 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() } txn.unsetPrimaryKeyIfNeeded(lockCtx) } - skipedLockKeys := 0 + skippedLockKeys := 0 for _, key := range keys { valExists := true // PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exist. @@ -1124,13 +1154,18 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() } // TODO: Fix the calculation when aggressive-locking is active if lockCtx.LockOnlyIfExists && !valExists { - skipedLockKeys++ + skippedLockKeys++ continue } memBuf.UpdateFlags(key, tikv.SetKeyLocked, tikv.DelNeedCheckExists, setValExists) } } - txn.lockedCnt += len(keys) - skipedLockKeys + + // Update statistics information. + txn.lockedCnt += len(keys) - skippedLockKeys + if txn.aggressiveLockingContext != nil && lockCtx.Stats != nil { + txn.collectAggressiveLockingStats(lockCtx, len(keys), skippedLockKeys, filteredAggressiveLockedKeysCount, lockWakeUpMode) + } return nil } diff --git a/util/execdetails.go b/util/execdetails.go index eb9418aca..64d779914 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -239,12 +239,15 @@ func (cd *CommitDetails) Clone() *CommitDetails { // LockKeysDetails contains pessimistic lock keys detail information. type LockKeysDetails struct { - TotalTime time.Duration - RegionNum int32 - LockKeys int32 - ResolveLock ResolveLockDetail - BackoffTime int64 - Mu struct { + TotalTime time.Duration + RegionNum int32 + LockKeys int32 + AggressiveLockNewCount int + AggressiveLockDerivedCount int + LockedWithConflictCount int + ResolveLock ResolveLockDetail + BackoffTime int64 + Mu struct { sync.Mutex BackoffTypes []string SlowestReqTotalTime time.Duration @@ -262,6 +265,9 @@ func (ld *LockKeysDetails) Merge(lockKey *LockKeysDetails) { ld.TotalTime += lockKey.TotalTime ld.RegionNum += lockKey.RegionNum ld.LockKeys += lockKey.LockKeys + ld.AggressiveLockNewCount += lockKey.AggressiveLockNewCount + ld.AggressiveLockDerivedCount += lockKey.AggressiveLockDerivedCount + ld.LockedWithConflictCount += lockKey.LockedWithConflictCount ld.ResolveLock.ResolveLockTime += lockKey.ResolveLock.ResolveLockTime ld.BackoffTime += lockKey.BackoffTime ld.LockRPCTime += lockKey.LockRPCTime @@ -294,14 +300,17 @@ func (ld *LockKeysDetails) MergeReqDetails(reqDuration time.Duration, regionID u // Clone returns a deep copy of itself. func (ld *LockKeysDetails) Clone() *LockKeysDetails { lock := &LockKeysDetails{ - TotalTime: ld.TotalTime, - RegionNum: ld.RegionNum, - LockKeys: ld.LockKeys, - BackoffTime: ld.BackoffTime, - LockRPCTime: ld.LockRPCTime, - LockRPCCount: ld.LockRPCCount, - RetryCount: ld.RetryCount, - ResolveLock: ld.ResolveLock, + TotalTime: ld.TotalTime, + RegionNum: ld.RegionNum, + LockKeys: ld.LockKeys, + AggressiveLockNewCount: ld.AggressiveLockNewCount, + AggressiveLockDerivedCount: ld.AggressiveLockDerivedCount, + LockedWithConflictCount: ld.LockedWithConflictCount, + BackoffTime: ld.BackoffTime, + LockRPCTime: ld.LockRPCTime, + LockRPCCount: ld.LockRPCCount, + RetryCount: ld.RetryCount, + ResolveLock: ld.ResolveLock, } lock.Mu.BackoffTypes = append([]string{}, ld.Mu.BackoffTypes...) lock.Mu.SlowestReqTotalTime = ld.Mu.SlowestReqTotalTime @@ -321,12 +330,12 @@ type ExecDetails struct { // FormatDuration uses to format duration, this function will prune precision before format duration. // Pruning precision is for human readability. The prune rule is: -// 1. if the duration was less than 1us, return the original string. -// 2. readable value >=10, keep 1 decimal, otherwise, keep 2 decimal. such as: -// 9.412345ms -> 9.41ms -// 10.412345ms -> 10.4ms -// 5.999s -> 6s -// 100.45µs -> 100.5µs +// 1. if the duration was less than 1us, return the original string. +// 2. readable value >=10, keep 1 decimal, otherwise, keep 2 decimal. such as: +// 9.412345ms -> 9.41ms +// 10.412345ms -> 10.4ms +// 5.999s -> 6s +// 100.45µs -> 100.5µs func FormatDuration(d time.Duration) string { if d <= time.Microsecond { return d.String()