Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/tikv/client-go into refin…
Browse files Browse the repository at this point in the history
…e-log
  • Loading branch information
crazycs520 committed Jul 31, 2023
2 parents bfe250b + 9f94221 commit ef8c596
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 10 deletions.
38 changes: 38 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2473,6 +2473,24 @@ const (
tombstone
)

// String implements fmt.Stringer interface.
func (s resolveState) String() string {
switch s {
case unresolved:
return "unresolved"
case resolved:
return "resolved"
case needCheck:
return "needCheck"
case deleted:
return "deleted"
case tombstone:
return "tombstone"
default:
return fmt.Sprintf("unknown-%v", uint64(s))
}
}

// IsTiFlash returns true if the storeType is TiFlash
func (s *Store) IsTiFlash() bool {
return s.storeType == tikvrpc.TiFlash
Expand Down Expand Up @@ -2612,6 +2630,12 @@ func (s *Store) changeResolveStateTo(from, to resolveState) bool {
return false
}
if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
logutil.BgLogger().Info("change store resolve state",
zap.Uint64("store", s.storeID),
zap.String("addr", s.addr),
zap.String("from", from.String()),
zap.String("to", to.String()),
zap.String("liveness-state", s.getLivenessState().String()))
return true
}
}
Expand Down Expand Up @@ -2712,6 +2736,20 @@ const (
unknown
)

// String implements fmt.Stringer interface.
func (s livenessState) String() string {
switch s {
case unreachable:
return "unreachable"
case reachable:
return "reachable"
case unknown:
return "unknown"
default:
return fmt.Sprintf("unknown-%v", uint32(s))
}
}

func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
// This mechanism doesn't support non-TiKV stores currently.
if s.storeType != tikvrpc.TiKV {
Expand Down
96 changes: 86 additions & 10 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
package locate

import (
"bytes"
"context"
"fmt"
"math"
Expand Down Expand Up @@ -608,14 +609,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
}
// If there is no candidate, fallback to the leader.
if selector.targetIdx < 0 {
leader := selector.replicas[state.leaderIdx]
leaderInvalid := leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1))
if len(state.option.labels) > 0 {
logutil.Logger(bo.GetCtx()).Warn(
"unable to find stores with given labels",
zap.Any("labels", state.option.labels),
)
logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels",
zap.Uint64("region", selector.region.GetID()),
zap.Bool("leader-invalid", leaderInvalid),
zap.Any("labels", state.option.labels))
}
leader := selector.replicas[state.leaderIdx]
if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) {
if leaderInvalid {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
Expand Down Expand Up @@ -1189,6 +1191,7 @@ func (s *RegionRequestSender) SendReqCtx(
}()
}

totalErrors := make(map[string]int)
for {
if retryTimes > 0 {
req.IsRetryRequest = true
Expand Down Expand Up @@ -1221,10 +1224,7 @@ func (s *RegionRequestSender) SendReqCtx(

// TODO: Change the returned error to something like "region missing in cache",
// and handle this error like EpochNotMatch, which means to re-split the request and retry.
logutil.Logger(bo.GetCtx()).Debug(
"throwing pseudo region error due to region not found in cache",
zap.Stringer("region", &regionID),
)
s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, totalErrors)
resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
return resp, nil, retryTimes, err
}
Expand All @@ -1250,6 +1250,8 @@ func (s *RegionRequestSender) SendReqCtx(
var retry bool
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
if err != nil {
msg := fmt.Sprintf("send request failed, err: %v", err.Error())
s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors)
return nil, nil, retryTimes, err
}

Expand Down Expand Up @@ -1281,14 +1283,19 @@ func (s *RegionRequestSender) SendReqCtx(
return nil, nil, retryTimes, err
}
if regionErr != nil {
regionErrLabel := regionErrorToLabel(regionErr)
totalErrors[regionErrLabel]++
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr)
if err != nil {
msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error())
s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors)
return nil, nil, retryTimes, err
}
if retry {
retryTimes++
continue
}
s.logSendReqError(bo, "send request meet region error without retry", regionID, retryTimes, req, totalErrors)
} else {
if s.replicaSelector != nil {
s.replicaSelector.onSendSuccess()
Expand All @@ -1301,6 +1308,75 @@ func (s *RegionRequestSender) SendReqCtx(
}
}

func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) {
var replicaStatus []string
replicaSelectorState := "nil"
cacheRegionIsValid := "unknown"
if s.replicaSelector != nil {
switch s.replicaSelector.state.(type) {
case *accessKnownLeader:
replicaSelectorState = "accessKnownLeader"
case *accessFollower:
replicaSelectorState = "accessFollower"
case *accessByKnownProxy:
replicaSelectorState = "accessByKnownProxy"
case *tryFollower:
replicaSelectorState = "tryFollower"
case *tryNewProxy:
replicaSelectorState = "tryNewProxy"
case *invalidLeader:
replicaSelectorState = "invalidLeader"
case *invalidStore:
replicaSelectorState = "invalidStore"
case *stateBase:
replicaSelectorState = "stateBase"
case nil:
replicaSelectorState = "nil"
}
if s.replicaSelector.region != nil {
if s.replicaSelector.region.isValid() {
cacheRegionIsValid = "true"
} else {
cacheRegionIsValid = "false"
}
}
for _, replica := range s.replicaSelector.replicas {
replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v",
replica.peer.GetId(),
replica.store.storeID,
replica.isEpochStale(),
replica.attempts,
replica.epoch,
atomic.LoadUint32(&replica.store.epoch),
replica.store.getResolveState(),
replica.store.getLivenessState(),
))
}
}
var totalErrorStr bytes.Buffer
for err, cnt := range totalErrors {
if totalErrorStr.Len() > 0 {
totalErrorStr.WriteString(", ")
}
totalErrorStr.WriteString(err)
totalErrorStr.WriteString(":")
totalErrorStr.WriteString(strconv.Itoa(cnt))
}
logutil.Logger(bo.GetCtx()).Info(msg,
zap.Uint64("req-ts", req.GetStartTS()),
zap.String("req-type", req.Type.String()),
zap.String("region", regionID.String()),
zap.String("region-is-valid", cacheRegionIsValid),
zap.Int("retry-times", retryTimes),
zap.String("replica-read-type", req.ReplicaReadType.String()),
zap.String("replica-selector-state", replicaSelectorState),
zap.Bool("stale-read", req.StaleRead),
zap.String("replica-status", strings.Join(replicaStatus, "; ")),
zap.Int("total-backoff-ms", bo.GetTotalSleep()),
zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()),
zap.String("total-region-errors", totalErrorStr.String()))
}

// RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx.
type RPCCancellerCtxKey struct{}

Expand Down
14 changes: 14 additions & 0 deletions internal/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
package retry

import (
"bytes"
"context"
"fmt"
"math"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -150,6 +152,18 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
errMsg += "\n" + err.Error()
}
}
var backoffDetail bytes.Buffer
totalTimes := 0
for name, times := range b.backoffTimes {
totalTimes += times
if backoffDetail.Len() > 0 {
backoffDetail.WriteString(", ")
}
backoffDetail.WriteString(name)
backoffDetail.WriteString(":")
backoffDetail.WriteString(strconv.Itoa(times))
}
errMsg += fmt.Sprintf("\ntotal-backoff-times: %v, backoff-detail: %v", totalTimes, backoffDetail.String())
returnedErr := err
if longestSleepCfg != nil {
errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime)
Expand Down
20 changes: 20 additions & 0 deletions kv/store_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
package kv

import (
"fmt"

"go.uber.org/atomic"
)

Expand Down Expand Up @@ -72,3 +74,21 @@ const (
func (r ReplicaReadType) IsFollowerRead() bool {
return r != ReplicaReadLeader
}

// String implements fmt.Stringer interface.
func (r ReplicaReadType) String() string {
switch r {
case ReplicaReadLeader:
return "leader"
case ReplicaReadFollower:
return "follower"
case ReplicaReadMixed:
return "mixed"
case ReplicaReadLearner:
return "learner"
case ReplicaReadPreferLeader:
return "prefer-leader"
default:
return fmt.Sprintf("unknown-%v", byte(r))
}
}

0 comments on commit ef8c596

Please sign in to comment.