From 8a214402da13d5bdc1ffb1d2726bdbb47f9beb74 Mon Sep 17 00:00:00 2001 From: crazycs Date: Fri, 11 Aug 2023 11:37:10 +0800 Subject: [PATCH] use tidb_kv_read_timeout as first kv request timeout (#919) * support tidb_kv_read_timeout as first round kv request timeout Signed-off-by: crazycs520 * fix ci Signed-off-by: crazycs520 * fix ci Signed-off-by: crazycs520 * fix ci Signed-off-by: crazycs520 * fix ci Signed-off-by: crazycs520 * fix ci Signed-off-by: crazycs520 * update comment Signed-off-by: crazycs520 * refine test Signed-off-by: crazycs520 --------- Signed-off-by: crazycs520 --- examples/gcworker/gcworker.go | 1 - internal/client/client_batch.go | 6 ++ internal/locate/region_request.go | 77 +++++++++++++++++--- internal/locate/region_request3_test.go | 95 +++++++++++++++++++++++-- txnkv/txnsnapshot/snapshot.go | 29 +++++++- 5 files changed, 191 insertions(+), 17 deletions(-) diff --git a/examples/gcworker/gcworker.go b/examples/gcworker/gcworker.go index 1191adcd2..1deafdd7e 100644 --- a/examples/gcworker/gcworker.go +++ b/examples/gcworker/gcworker.go @@ -37,7 +37,6 @@ func main() { panic(err) } - sysSafepoint, err := client.GC(context.Background(), *safepoint, tikv.WithConcurrency(10)) if err != nil { panic(err) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index c8842e9c2..6a270bc6d 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -347,6 +347,12 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { } func (a *batchConn) getClientAndSend() { + if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil { + if timeout, ok := val.(int); ok && timeout > 0 { + time.Sleep(time.Duration(timeout * int(time.Millisecond))) + } + } + // Choose a connection by round-robbin. var ( cli *batchCommandsClient diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 9f4840588..e071d739a 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -245,6 +245,8 @@ type replica struct { peer *metapb.Peer epoch uint32 attempts int + // deadlineErrUsingConfTimeout indicates the replica is already tried, but the received deadline exceeded error. + deadlineErrUsingConfTimeout bool } func (r *replica) isEpochStale() bool { @@ -377,7 +379,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep } func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) { - selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} + selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromOnNotLeader: true} } // tryFollower is the state where we cannot access the known leader @@ -391,19 +393,23 @@ type tryFollower struct { stateBase leaderIdx AccessIndex lastIdx AccessIndex + // fromOnNotLeader indicates whether the state is changed from onNotLeader. 
+ fromOnNotLeader bool } func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { var targetReplica *replica + hasDeadlineExceededErr := false // Search replica that is not attempted from the last accessed replica for i := 1; i < len(selector.replicas); i++ { idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) + targetReplica = selector.replicas[idx] + hasDeadlineExceededErr = hasDeadlineExceededErr || targetReplica.deadlineErrUsingConfTimeout if idx == state.leaderIdx { continue } - targetReplica = selector.replicas[idx] // Each follower is only tried once - if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable { + if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable && !targetReplica.deadlineErrUsingConfTimeout { state.lastIdx = idx selector.targetIdx = idx break @@ -411,16 +417,33 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) ( } // If all followers are tried and fail, backoff and retry. if selector.targetIdx < 0 { + if hasDeadlineExceededErr { + // when meet deadline exceeded error, do fast retry without invalidate region cache. + return nil, nil + } metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() selector.invalidateRegion() return nil, nil } - return selector.buildRPCContext(bo) + rpcCtx, err := selector.buildRPCContext(bo) + if err != nil || rpcCtx == nil { + return nil, err + } + // If the state is changed from onNotLeader, the `replicaRead` flag should not be set as leader read would still be used. + if !state.fromOnNotLeader { + replicaRead := selector.targetIdx != state.leaderIdx + rpcCtx.contextPatcher.replicaRead = &replicaRead + } + disableStaleRead := false + rpcCtx.contextPatcher.staleRead = &disableStaleRead + return rpcCtx, nil } func (state *tryFollower) onSendSuccess(selector *replicaSelector) { - if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) { - panic("the store must exist") + if state.fromOnNotLeader { + if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) { + panic("the store must exist") + } } } @@ -617,6 +640,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector zap.Bool("leader-invalid", leaderInvalid), zap.Any("labels", state.option.labels)) } + // If leader tried and received deadline exceeded error, return nil to upper layer to retry with default timeout. + if leader.deadlineErrUsingConfTimeout { + return nil, nil + } if leaderInvalid { metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() selector.invalidateRegion() @@ -665,7 +692,7 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool { // the epoch is staled or retry exhausted, or the store is unreachable. - if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable { + if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout { return false } // The request can only be sent to the leader. 
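Taken together, the selector changes above do two things: a replica that already timed out under the shorter configurable timeout (deadlineErrUsingConfTimeout) is skipped on later attempts, and when the remaining failures are all of that kind the selector gives up without invalidating the region cache, so the caller can fast-retry with the default timeout. A minimal sketch of that candidate filter, using illustrative names (replicaInfo, pickCandidate) rather than the real client-go types:

package main

import "fmt"

type replicaInfo struct {
	addr                        string
	attempts                    int
	unreachable                 bool
	deadlineErrUsingConfTimeout bool // set after a timeout under the short, configurable timeout
}

// pickCandidate returns the index of the next replica to try, or -1 plus a
// flag telling the caller whether a fast retry (keeping the region cache) is safe.
func pickCandidate(replicas []replicaInfo, lastIdx int) (idx int, fastRetry bool) {
	sawDeadlineErr := false
	for i := 1; i <= len(replicas); i++ {
		j := (lastIdx + i) % len(replicas)
		r := replicas[j]
		sawDeadlineErr = sawDeadlineErr || r.deadlineErrUsingConfTimeout
		// Each replica is tried at most once, must be reachable, and must not
		// have already failed with a deadline error under the short timeout.
		if r.attempts == 0 && !r.unreachable && !r.deadlineErrUsingConfTimeout {
			return j, false
		}
	}
	// Exhausted: fast-retry without invalidating the region cache only when
	// the observed failures were deadline errors from the short timeout.
	return -1, sawDeadlineErr
}

func main() {
	replicas := []replicaInfo{
		{addr: "store1", attempts: 1, deadlineErrUsingConfTimeout: true},
		{addr: "store2", attempts: 1, deadlineErrUsingConfTimeout: true},
		{addr: "store3", attempts: 1, deadlineErrUsingConfTimeout: true},
	}
	idx, fastRetry := pickCandidate(replicas, 0)
	fmt.Println(idx, fastRetry) // -1 true: retry the region with the default timeout
}
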
@@ -947,6 +974,16 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) { s.state.onSendFailure(bo, s, err) } +func (s *replicaSelector) onDeadlineExceeded() { + if target := s.targetReplica(); target != nil { + target.deadlineErrUsingConfTimeout = true + } + if accessLeader, ok := s.state.(*accessKnownLeader); ok { + // If leader return deadline exceeded error, we should try to access follower next time. + s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx} + } +} + func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState { store := accessReplica.store liveness := store.requestLiveness(bo, s.regionCache) @@ -1608,7 +1645,7 @@ func (s *RegionRequestSender) sendReqToRegion( return nil, false, err } } - if e := s.onSendFail(bo, rpcCtx, err); e != nil { + if e := s.onSendFail(bo, rpcCtx, req, err); e != nil { return nil, false, err } return nil, true, nil @@ -1638,7 +1675,7 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) { logutil.BgLogger().Warn("release store token failed, count equals to 0") } -func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, err error) error { +func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, err error) error { if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("regionRequest.onSendFail", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -1649,6 +1686,11 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e return errors.WithStack(err) } else if LoadShuttingDown() > 0 { return errors.WithStack(tikverr.ErrTiDBShuttingDown) + } else if errors.Cause(err) == context.DeadlineExceeded && req.MaxExecutionDurationMs < uint64(client.ReadTimeoutShort.Milliseconds()) { + if s.replicaSelector != nil { + s.replicaSelector.onDeadlineExceeded() + return nil + } } if status.Code(errors.Cause(err)) == codes.Canceled { select { @@ -1740,6 +1782,9 @@ func regionErrorToLabel(e *errorpb.Error) string { } else if e.GetEpochNotMatch() != nil { return "epoch_not_match" } else if e.GetServerIsBusy() != nil { + if strings.Contains(e.GetServerIsBusy().GetReason(), "deadline is exceeded") { + return "deadline_exceeded" + } return "server_is_busy" } else if e.GetStaleCommand() != nil { return "stale_command" @@ -1767,10 +1812,16 @@ func regionErrorToLabel(e *errorpb.Error) string { return "flashback_not_prepared" } else if e.GetIsWitness() != nil { return "peer_is_witness" + } else if isDeadlineExceeded(e) { + return "deadline_exceeded" } return "unknown" } +func isDeadlineExceeded(e *errorpb.Error) bool { + return strings.Contains(e.GetMessage(), "Deadline is exceeded") +} + func (s *RegionRequestSender) onRegionError( bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error, ) (shouldRetry bool, err error) { @@ -1918,6 +1969,10 @@ func (s *RegionRequestSender) onRegionError( } if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil { + if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") { + s.replicaSelector.onDeadlineExceeded() + return true, nil + } if s.replicaSelector != nil { return s.replicaSelector.onServerIsBusy(bo, ctx, req, serverIsBusy) } @@ -2046,6 +2101,10 @@ func (s *RegionRequestSender) onRegionError( return true, nil } + if isDeadlineExceeded(regionErr) && s.replicaSelector != nil { + 
s.replicaSelector.onDeadlineExceeded() + } + logutil.Logger(bo.GetCtx()).Debug( "tikv reports region failed", zap.Stringer("regionErr", regionErr), diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 91f7f22f1..f56b4022f 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -46,15 +46,18 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/pkg/errors" "github.com/stretchr/testify/suite" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/internal/apicodec" + "github.com/tikv/client-go/v2/internal/client" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" + "go.uber.org/zap" ) func TestRegionRequestToThreeStores(t *testing.T) { @@ -711,7 +714,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { // Normal bo := retry.NewBackoffer(context.Background(), -1) sender := s.regionRequestSender - resp, _, err := sender.SendReq(bo, req, region.Region, time.Second) + resp, _, err := sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) s.Nil(err) s.NotNil(resp) s.True(bo.GetTotalBackoffTimes() == 0) @@ -720,7 +723,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { bo = retry.NewBackoffer(context.Background(), -1) s.cluster.ChangeLeader(s.regionID, s.peerIDs[1]) s.cluster.StopStore(s.storeIDs[0]) - resp, _, err = sender.SendReq(bo, req, region.Region, time.Second) + resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) s.Nil(err) s.NotNil(resp) s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1)) @@ -729,8 +732,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { atomic.StoreUint32(®ionStore.stores[0].livenessState, uint32(reachable)) // Leader is updated because of send success, so no backoff. + reloadRegion() bo = retry.NewBackoffer(context.Background(), -1) - resp, _, err = sender.SendReq(bo, req, region.Region, time.Second) + resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) s.Nil(err) s.NotNil(resp) s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1)) @@ -1092,7 +1096,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() s.NotEqual(leaderAddr, "") for i := 0; i < 10; i++ { bo := retry.NewBackofferWithVars(context.Background(), 100, nil) - resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV) s.Nil(err) s.NotNil(resp) @@ -1135,7 +1139,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown() for i := 0; i < 100; i++ { bo := retry.NewBackofferWithVars(context.Background(), 1, nil) - resp, _, retryTimes, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + resp, _, retryTimes, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV) s.Nil(err) s.NotNil(resp) // since all follower'store is unreachable, the request will be sent to leader, the backoff times should be 0. 
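The onSendFail and region-error hunks above classify a timeout as retryable with the full default timeout when the failed request carried a MaxExecutionDurationMs below ReadTimeoutShort, whether the deadline fired on the client (context.DeadlineExceeded) or inside TiKV (a ServerIsBusy error whose reason contains "deadline is exceeded"). A reduced sketch of that check, assuming the usual 30s value for ReadTimeoutShort and using the standard library's errors.Is in place of errors.Cause:

package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

const readTimeoutShortMs = 30_000 // default short read timeout assumed to be 30s

// isShortTimeoutDeadlineErr reports whether a failed attempt should be retried
// immediately with the default timeout: either the transport returned
// context.DeadlineExceeded while the request carried a timeout smaller than the
// default, or TiKV answered ServerIsBusy with a "deadline is exceeded" reason.
func isShortTimeoutDeadlineErr(sendErr error, maxExecDurationMs uint64, serverIsBusyReason string) bool {
	if sendErr != nil && errors.Is(sendErr, context.DeadlineExceeded) &&
		maxExecDurationMs < readTimeoutShortMs {
		return true
	}
	return strings.Contains(serverIsBusyReason, "deadline is exceeded")
}

func main() {
	// First attempt used a 500ms tidb_kv_read_timeout and hit the client-side deadline.
	fmt.Println(isShortTimeoutDeadlineErr(context.DeadlineExceeded, 500, "")) // true
	// TiKV reported the deadline on its side.
	fmt.Println(isShortTimeoutDeadlineErr(nil, 500, "deadline is exceeded")) // true
	// A deadline error under the default timeout is not handled by this path.
	fmt.Println(isShortTimeoutDeadlineErr(context.DeadlineExceeded, readTimeoutShortMs, "")) // false
}
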
@@ -1199,3 +1203,84 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() { s.True(ok) s.Equal(getResp.Value, value) } + +func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() { + leaderAddr := "" + reqTargetAddrs := make(map[string]struct{}) + s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats() + bo := retry.NewBackoffer(context.Background(), 10000) + mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + reqTargetAddrs[addr] = struct{}{} + if req.Context.MaxExecutionDurationMs < 10 { + return nil, context.DeadlineExceeded + } + if addr != leaderAddr && !req.Context.ReplicaRead && !req.Context.StaleRead { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil + }} + getLocFn := func() *KeyLocation { + loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a")) + s.Nil(err) + region := s.regionRequestSender.regionCache.GetCachedRegionWithRLock(loc.Region) + leaderStore, _, _, _ := region.WorkStorePeer(region.getStore()) + leaderAddr, err = s.regionRequestSender.regionCache.getStoreAddr(s.bo, region, leaderStore) + s.Nil(err) + return loc + } + resetStats := func() { + reqTargetAddrs = make(map[string]struct{}) + s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient) + s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats() + } + + //Test different read type. + staleReadTypes := []bool{false, true} + replicaReadTypes := []kv.ReplicaReadType{kv.ReplicaReadLeader, kv.ReplicaReadFollower, kv.ReplicaReadMixed} + for _, staleRead := range staleReadTypes { + for _, tp := range replicaReadTypes { + log.Info("TestSendReqFirstTimeout", zap.Bool("stale-read", staleRead), zap.String("replica-read-type", tp.String())) + resetStats() + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{}) + if staleRead { + req.EnableStaleRead() + } else { + req.ReplicaRead = tp.IsFollowerRead() + req.ReplicaReadType = tp + } + loc := getLocFn() + resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV) + s.Nil(err) + regionErr, err := resp.GetRegionError() + s.Nil(err) + s.True(IsFakeRegionError(regionErr)) + s.Equal(1, len(s.regionRequestSender.Stats)) + if staleRead { + rpcNum := s.regionRequestSender.Stats[tikvrpc.CmdGet].Count + s.True(rpcNum == 1 || rpcNum == 2) // 1 rpc or 2 rpc + } else { + s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc + s.Equal(3, len(reqTargetAddrs)) // each rpc to a different store. + } + s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. + // warn: must rest MaxExecutionDurationMs before retry. 
+ resetStats() + if staleRead { + req.EnableStaleRead() + } else { + req.ReplicaRead = tp.IsFollowerRead() + req.ReplicaReadType = tp + } + req.Context.MaxExecutionDurationMs = 0 + resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV) + s.Nil(err) + regionErr, err = resp.GetRegionError() + s.Nil(err) + s.Nil(regionErr) + s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value) + s.Equal(1, len(s.regionRequestSender.Stats)) + s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc + s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry. + } + } +} diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index a4253d62e..ab2965535 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -118,6 +118,7 @@ type KVSnapshot struct { resolvedLocks util.TSSet committedLocks util.TSSet scanBatchSize int + readTimeout time.Duration // Cache the result of BatchGet. // The invariance is that calling BatchGet multiple times using the same start ts, @@ -387,6 +388,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, pending := batch.keys var resolvingRecordToken *int + useConfigurableKVTimeout := true for { s.mu.RLock() req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &kvrpcpb.BatchGetRequest{ @@ -416,6 +418,12 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, if isStaleness { req.EnableStaleRead() } + timeout := client.ReadTimeoutMedium + if useConfigurableKVTimeout && s.readTimeout > 0 { + useConfigurableKVTimeout = false + timeout = s.readTimeout + } + req.MaxExecutionDurationMs = uint64(timeout.Milliseconds()) ops := make([]locate.StoreSelectorOption, 0, 2) if len(matchStoreLabels) > 0 { ops = append(ops, locate.WithMatchLabels(matchStoreLabels)) @@ -427,7 +435,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, } req.ReplicaReadType = readType } - resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, client.ReadTimeoutMedium, tikvrpc.TiKV, "", ops...) + resp, _, _, err := cli.SendReqCtx(bo, req, batch.region, timeout, tikvrpc.TiKV, "", ops...) if err != nil { return err } @@ -651,13 +659,20 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] var firstLock *txnlock.Lock var resolvingRecordToken *int + useConfigurableKVTimeout := true for { util.EvalFailpoint("beforeSendPointGet") loc, err := s.store.GetRegionCache().LocateKey(bo, k) if err != nil { return nil, err } - resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV, "", ops...) + timeout := client.ReadTimeoutShort + if useConfigurableKVTimeout && s.readTimeout > 0 { + useConfigurableKVTimeout = false + timeout = s.readTimeout + } + req.MaxExecutionDurationMs = uint64(timeout.Milliseconds()) + resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, timeout, tikvrpc.TiKV, "", ops...) 
if err != nil { return nil, err } @@ -984,6 +999,16 @@ func (s *KVSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.R } } +// SetKVReadTimeout sets timeout for individual KV read operations under this snapshot +func (s *KVSnapshot) SetKVReadTimeout(readTimeout time.Duration) { + s.readTimeout = readTimeout +} + +// GetKVReadTimeout returns timeout for individual KV read operations under this snapshot or 0 if timeout is not set +func (s *KVSnapshot) GetKVReadTimeout() time.Duration { + return s.readTimeout +} + func (s *KVSnapshot) getResolveLockDetail() *util.ResolveLockDetail { s.mu.RLock() defer s.mu.RUnlock()
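
The snapshot changes give each KVSnapshot an optional read timeout: the first attempt of a point get or batch get uses the value set through SetKVReadTimeout (intended to carry tidb_kv_read_timeout), and any retry falls back to the default ReadTimeoutShort or ReadTimeoutMedium. A usage sketch, assuming the *txnsnapshot.KVSnapshot has been obtained elsewhere; readWithShortTimeout is an illustrative helper, not part of the client-go API:

package snapshotexample

import (
	"context"
	"time"

	"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
)

// readWithShortTimeout sends the first KV attempt with kvReadTimeout
// (e.g. the session's tidb_kv_read_timeout). If that attempt times out, the
// request layer retries other replicas and eventually falls back to the
// default timeout, so the Get call can still succeed when a replica answers.
func readWithShortTimeout(snapshot *txnsnapshot.KVSnapshot, key []byte, kvReadTimeout time.Duration) ([]byte, error) {
	if kvReadTimeout > 0 {
		snapshot.SetKVReadTimeout(kvReadTimeout)
	}
	_ = snapshot.GetKVReadTimeout() // returns the configured value, or 0 if unset
	return snapshot.Get(context.Background(), key)
}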