From 03bbadb2777ec8ab003123f24bc828dc48c03c31 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Wed, 28 Feb 2024 19:10:32 +0800 Subject: [PATCH] Support receving health feedback (#1153) * Register health feedback handler in batch client Signed-off-by: MyonKeminta * update protocol Signed-off-by: MyonKeminta * Collect the health status and record to the stores Signed-off-by: MyonKeminta * Remove the filter which looks not necessary Signed-off-by: MyonKeminta * Add metrics Signed-off-by: MyonKeminta * Fix checkAndUpdateStoreHealthStatus panicking Signed-off-by: MyonKeminta * Add logs Signed-off-by: MyonKeminta * Merge SetHealthFeedbackHandler to Client Signed-off-by: MyonKeminta * Try another pattern: separated callback registry Signed-off-by: MyonKeminta * Try region cache eventListener pattern Signed-off-by: MyonKeminta * Add tests Signed-off-by: MyonKeminta * Test receiving froim callback Signed-off-by: MyonKeminta * Add tests to region cache Signed-off-by: MyonKeminta * Remove unnecessary debug log Signed-off-by: MyonKeminta * Add comments Signed-off-by: MyonKeminta * remove replace of tidb repo Signed-off-by: MyonKeminta * fix build Signed-off-by: MyonKeminta * Update comments; fix lint Signed-off-by: MyonKeminta * Add the isSlow method of Store back Signed-off-by: MyonKeminta * remove unused method for now to make lint happy Signed-off-by: MyonKeminta * Address comments Signed-off-by: MyonKeminta --------- Signed-off-by: MyonKeminta Co-authored-by: MyonKeminta --- go.mod | 2 +- go.sum | 4 +- integration_tests/async_commit_test.go | 4 +- integration_tests/go.mod | 2 +- integration_tests/go.sum | 22 +- internal/client/client.go | 30 +- internal/client/client_batch.go | 22 ++ internal/client/client_interceptor_test.go | 2 + internal/client/client_test.go | 72 ++++ .../client/mockserver/mock_tikv_service.go | 8 + internal/locate/region_cache.go | 321 ++++++++++++++---- internal/locate/region_cache_test.go | 125 ++++++- internal/locate/region_request.go | 8 +- internal/locate/region_request3_test.go | 10 +- internal/locate/region_request_test.go | 11 +- internal/locate/replica_selector_test.go | 2 +- internal/mockstore/mocktikv/rpc.go | 4 + metrics/metrics.go | 11 + tikv/client.go | 3 + tikv/kv.go | 5 +- 20 files changed, 573 insertions(+), 95 deletions(-) diff --git a/go.mod b/go.mod index 4da01d254..07058bd3b 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 + github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index a985ebf64..5febcb3e9 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 
h1:pdA3DvkChrIp91JQO89ICT1x/SemOAm7vC848acr5Ik= -github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f h1:2xvTjl4OrQY+XF38p8H7qVCXpaUYc5rLiYQhSd07aTI= +github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index 388d7df4f..8aa36fe98 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -76,9 +76,7 @@ type unistoreClientWrapper struct { *unistore.RPCClient } -func (c *unistoreClientWrapper) CloseAddr(addr string) error { - return nil -} +func (c *unistoreClientWrapper) SetEventListener(listener tikv.ClientEventListener) {} func (s *testAsyncCommitCommon) setUpTest() { if *withTiKV { diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 31ffc2a7b..54b567048 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -6,7 +6,7 @@ require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 + github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f github.com/pingcap/tidb v1.1.0-beta.0.20240126041650-de177d85b19e github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.4 diff --git a/integration_tests/go.sum b/integration_tests/go.sum index fe65d61b7..923587ae9 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -1,8 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.112.0 h1:tpFCD7hpHFlQ8yPwT3x+QeXqc2T6+n6T+hmABHfDUSM= cloud.google.com/go v0.112.0/go.mod h1:3jEEVwZ/MHU4djK5t5RHuKOA/GbLddgTdVubX1qnPD4= -cloud.google.com/go/compute v1.23.4 h1:EBT9Nw4q3zyE7G45Wvv3MzolIrCJEuHys5muLY0wvAw= -cloud.google.com/go/compute v1.23.4/go.mod h1:/EJMj55asU6kAFnuZET8zqgwgJ9FvXWXOkkfQZa4ioI= +cloud.google.com/go/compute v1.24.0 h1:phWcR2eWzRJaL/kOiJwfFsPs4BaKq1j6vnpZrc1YlVg= +cloud.google.com/go/compute v1.24.0/go.mod h1:kw1/T+h/+tK2LJK0wiPPx1intgdAM3j/g3hFDlscY40= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc= @@ -194,8 +194,8 @@ github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo= -github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ= +github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= +github.com/golang/glog v1.2.0/go.mod 
h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -237,8 +237,6 @@ github.com/google/pprof v0.0.0-20240117000934-35fc243c5815/go.mod h1:czg5+yv1E0Z github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= -github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= -github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= @@ -415,8 +413,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 h1:pdA3DvkChrIp91JQO89ICT1x/SemOAm7vC848acr5Ik= -github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f h1:2xvTjl4OrQY+XF38p8H7qVCXpaUYc5rLiYQhSd07aTI= +github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY= @@ -800,16 +798,10 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto v0.0.0-20240205150955-31a09d347014 h1:g/4bk7P6TPMkAUbUhquq98xey1slwvuVJPosdBqYJlU= -google.golang.org/genproto v0.0.0-20240205150955-31a09d347014/go.mod h1:xEgQu1e4stdSSsxPDK8Azkrk/ECl5HvdPf6nbZrTS5M= google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y= google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s= -google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe h1:0poefMBYvYbs7g5UkjS6HcxBPaTRAmznle9jnxYoAI8= -google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA= 
google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014 h1:x9PwdEgd11LgK+orcck69WVRo7DezSO4VUMPI4xpc8A= google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014/go.mod h1:rbHMSEDyoYX62nRVLOCc4Qt1HbsdytAYoVwgjiOhF3I= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 h1:hZB7eLIaYlW9qXRfCq/qDaPdbeY3757uARz5Vvfv+cY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:YUWgXUFRPfoYK1IHMuxH5K6nPEXSCzIMljnQ59lLRCk= google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c h1:NUsgEN92SQQqzfA+YtqYNqYmB3DMMYLlIwUZAQFVFbo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= @@ -820,8 +812,6 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= -google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY= -google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk= google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9 h1:ATnmU8nL2NfIyTSiBvJVDIDIr3qBmeW+c7z7XU21eWs= diff --git a/internal/client/client.go b/internal/client/client.go index b6052a4bc..0212081cb 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -108,6 +108,15 @@ type Client interface { CloseAddr(addr string) error // SendRequest sends Request. SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) + // SetEventListener registers an event listener for the Client instance. If it's called more than once, the + // previously set one will be replaced. + SetEventListener(listener ClientEventListener) +} + +// ClientEventListener is a listener to handle events produced by `Client`. +type ClientEventListener interface { + // OnHealthFeedback is called when `Client` receives a response that carries the HealthFeedback information. 
+ OnHealthFeedback(feedback *tikvpb.HealthFeedback) } type connArray struct { @@ -127,7 +136,7 @@ type connArray struct { } func newConnArray(maxSize uint, addr string, security config.Security, - idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, m *connMonitor, opts []grpc.DialOption) (*connArray, error) { + idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, m *connMonitor, eventListener *atomic.Pointer[ClientEventListener], opts []grpc.DialOption) (*connArray, error) { a := &connArray{ index: 0, v: make([]*monitoredConn, maxSize), @@ -136,7 +145,7 @@ func newConnArray(maxSize uint, addr string, security config.Security, dialTimeout: dialTimeout, monitor: m, } - if err := a.Init(addr, security, idleNotify, enableBatch, opts...); err != nil { + if err := a.Init(addr, security, idleNotify, enableBatch, eventListener, opts...); err != nil { return nil, err } return a, nil @@ -228,7 +237,7 @@ func (c *monitoredConn) Close() error { return nil } -func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, enableBatch bool, opts ...grpc.DialOption) error { +func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, enableBatch bool, eventListener *atomic.Pointer[ClientEventListener], opts ...grpc.DialOption) error { a.target = addr opt := grpc.WithTransportCredentials(insecure.NewCredentials()) @@ -317,6 +326,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint tikvLoad: &a.tikvTransportLayerLoad, dialTimeout: a.dialTimeout, tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false}, + eventListener: eventListener, } batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit) a.batchCommandsClients = append(a.batchCommandsClients, batchClient) @@ -401,8 +411,12 @@ type RPCClient struct { isClosed bool connMonitor *connMonitor + + eventListener *atomic.Pointer[ClientEventListener] } +var _ Client = &RPCClient{} + // NewRPCClient creates a client that manages connections and rpc calls with tikv-servers. func NewRPCClient(opts ...Opt) *RPCClient { cli := &RPCClient{ @@ -410,7 +424,8 @@ func NewRPCClient(opts ...Opt) *RPCClient { option: &option{ dialTimeout: dialTimeout, }, - connMonitor: &connMonitor{}, + connMonitor: &connMonitor{}, + eventListener: new(atomic.Pointer[ClientEventListener]), } for _, opt := range opts { opt(cli.option) @@ -462,6 +477,7 @@ func (c *RPCClient) createConnArray(addr string, enableBatch bool, opts ...func( enableBatch, c.option.dialTimeout, c.connMonitor, + c.eventListener, c.option.gRPCDialOptions) if err != nil { @@ -810,6 +826,12 @@ func (c *RPCClient) CloseAddr(addr string) error { return nil } +// SetEventListener registers an event listener for the Client instance. If it's called more than once, the +// previously set one will be replaced. +func (c *RPCClient) SetEventListener(listener ClientEventListener) { + c.eventListener.Store(&listener) +} + type spanInfo struct { name string dur uint64 diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index d241494b4..3964efc1f 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -551,10 +551,15 @@ type batchCommandsClient struct { closed int32 // tryLock protects client when re-create the streaming. tryLock + // sent is the number of the requests are processed by tikv server. 
sent atomic.Int64 // maxConcurrencyRequestLimit is the max allowed number of requests to be sent the tikv maxConcurrencyRequestLimit atomic.Int64 + + // eventListener is the listener set by external code to observe some events in the client. It's stored in a atomic + // pointer to make setting thread-safe. + eventListener *atomic.Pointer[ClientEventListener] } func (c *batchCommandsClient) isStopped() bool { @@ -694,6 +699,17 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport continue } + if resp.GetHealthFeedback() != nil { + if val, err := util.EvalFailpoint("injectHealthFeedbackSlowScore"); err == nil { + v, ok := val.(int) + if !ok || v < 0 || v > 100 { + panic(fmt.Sprintf("invalid injection in failpoint injectHealthFeedbackSlowScore: %+q", v)) + } + resp.GetHealthFeedback().SlowScore = int32(v) + } + c.onHealthFeedback(resp.GetHealthFeedback()) + } + responses := resp.GetResponses() for i, requestID := range resp.GetRequestIds() { value, ok := c.batched.Load(requestID) @@ -725,6 +741,12 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport } } +func (c *batchCommandsClient) onHealthFeedback(feedback *tikvpb.HealthFeedback) { + if h := c.eventListener.Load(); h != nil { + (*h).OnHealthFeedback(feedback) + } +} + func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *batchCommandsStream, epoch *uint64) (stopped bool) { // Forbids the batchSendLoop using the old client and // blocks other streams trying to recreate. diff --git a/internal/client/client_interceptor_test.go b/internal/client/client_interceptor_test.go index 88fc0af7e..de6c6924c 100644 --- a/internal/client/client_interceptor_test.go +++ b/internal/client/client_interceptor_test.go @@ -39,6 +39,8 @@ func (c emptyClient) CloseAddr(addr string) error { return nil } +func (c emptyClient) SetEventListener(listener ClientEventListener) {} + func TestInterceptedClient(t *testing.T) { executed := false client := NewInterceptedClient(emptyClient{}) diff --git a/internal/client/client_test.go b/internal/client/client_test.go index d4d22bfe6..8794a5935 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -154,6 +154,8 @@ func (c *chanClient) CloseAddr(addr string) error { return nil } +func (c *chanClient) SetEventListener(listener ClientEventListener) {} + func (c *chanClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { c.wg.Wait() c.ch <- req @@ -807,3 +809,73 @@ func TestPrioritySentLimit(t *testing.T) { re.Less(highDur.Load()/highQps.Load()*2, mediumDur.Load()/mediumQps.Load()) server.Stop() } + +type testClientEventListener struct { + healthFeedbackCh chan *tikvpb.HealthFeedback +} + +func newTestClientEventListener() *testClientEventListener { + return &testClientEventListener{ + healthFeedbackCh: make(chan *tikvpb.HealthFeedback, 100), + } +} + +func (l *testClientEventListener) OnHealthFeedback(feedback *tikvpb.HealthFeedback) { + l.healthFeedbackCh <- feedback +} + +func TestBatchClientReceiveHealthFeedback(t *testing.T) { + server, port := mockserver.StartMockTikvService() + require.True(t, port > 0) + require.True(t, server.IsRunning()) + defer server.Stop() + addr := server.Addr() + + client := NewRPCClient() + defer client.Close() + + conn, err := client.getConnArray(addr, true) + assert.NoError(t, err) + tikvClient := tikvpb.NewTikvClient(conn.Get()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + 
stream, err := tikvClient.BatchCommands(ctx) + assert.NoError(t, err) + + for reqID := uint64(1); reqID <= 3; reqID++ { + assert.NoError(t, stream.Send(&tikvpb.BatchCommandsRequest{ + Requests: []*tikvpb.BatchCommandsRequest_Request{{ + Cmd: &tikvpb.BatchCommandsRequest_Request_Get{Get: &kvrpcpb.GetRequest{ + Context: &kvrpcpb.Context{}, + Key: []byte("k"), + Version: 1, + }}, + }}, + RequestIds: []uint64{reqID}, + })) + resp, err := stream.Recv() + assert.NoError(t, err) + assert.Equal(t, []uint64{reqID}, resp.GetRequestIds()) + assert.Len(t, resp.GetResponses(), 1) + assert.Equal(t, uint64(1), resp.GetHealthFeedback().GetStoreId()) + assert.Equal(t, reqID, resp.GetHealthFeedback().GetFeedbackSeqNo()) + assert.Equal(t, int32(1), resp.GetHealthFeedback().GetSlowScore()) + } + cancel() + + eventListener := newTestClientEventListener() + client.SetEventListener(eventListener) + ctx = context.Background() + resp, err := client.SendRequest(ctx, addr, tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}), time.Second) + assert.NoError(t, err) + assert.NotNil(t, resp.Resp) + + select { + case feedback := <-eventListener.healthFeedbackCh: + assert.Equal(t, uint64(1), feedback.GetStoreId()) + assert.Equal(t, int32(1), feedback.GetSlowScore()) + default: + assert.Fail(t, "health feedback not received") + } +} diff --git a/internal/client/mockserver/mock_tikv_service.go b/internal/client/mockserver/mock_tikv_service.go index 90c1b535d..e0964f735 100644 --- a/internal/client/mockserver/mock_tikv_service.go +++ b/internal/client/mockserver/mock_tikv_service.go @@ -79,6 +79,7 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { if err := s.checkMetadata(ss.Context()); err != nil { return err } + var feedbackSeq uint64 = 1 for { req, err := ss.Recv() if err != nil { @@ -98,7 +99,14 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { err = ss.Send(&tikvpb.BatchCommandsResponse{ Responses: responses, RequestIds: req.GetRequestIds(), + HealthFeedback: &tikvpb.HealthFeedback{ + StoreId: 1, + FeedbackSeqNo: feedbackSeq, + SlowScore: 1, + }, }) + feedbackSeq++ + if err != nil { logutil.BgLogger().Error("batch commands send fail", zap.Error(err)) return err diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 03a935af0..4b8db934d 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -55,6 +55,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/config" @@ -312,7 +313,7 @@ func (r *regionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex { func (r *regionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp) bool { _, s := r.accessStore(tiKVOnly, aidx) // filter label unmatched store and slow stores when ReplicaReadMode == PreferLeader - return s.IsLabelsMatch(op.labels) && (!op.preferLeader || (aidx == r.workTiKVIdx && !s.isSlow())) + return s.IsLabelsMatch(op.labels) && (!op.preferLeader || (aidx == r.workTiKVIdx && !s.healthStatus.IsSlow())) } func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Region, error) { @@ -707,7 +708,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) }) return false }, 
time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents()) - c.bg.schedule(repeat(c.checkAndUpdateStoreSlowScores), time.Duration(refreshStoreInterval/4)*time.Second) + c.bg.schedule(repeat(c.checkAndUpdateStoreHealthStatus), time.Duration(refreshStoreInterval/4)*time.Second) c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second) if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 { c.bg.schedule(func(ctx context.Context, _ time.Time) bool { @@ -773,14 +774,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(* // SetRegionCacheStore is used to set a store in region cache, for testing only func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) { - c.putStore(&Store{ - storeID: id, - storeType: storeType, - state: state, - labels: labels, - addr: addr, - peerAddr: peerAddr, - }) + c.putStore(newStore(id, addr, peerAddr, "", storeType, resolveState(state), labels)) } // SetPDClient replaces pd client,for testing only @@ -2177,15 +2171,15 @@ func reloadTiFlashComputeStores(ctx context.Context, registry storeRegistry) (re } for _, s := range stores { if s.GetState() == metapb.StoreState_Up && isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashCompute) { - res = append(res, &Store{ - storeID: s.GetId(), - addr: s.GetAddress(), - peerAddr: s.GetPeerAddress(), - saddr: s.GetStatusAddress(), - storeType: tikvrpc.GetStoreTypeByMeta(s), - labels: s.GetLabels(), - state: uint64(resolved), - }) + res = append(res, newStore( + s.GetId(), + s.GetAddress(), + s.GetPeerAddress(), + s.GetStatusAddress(), + tikvrpc.GetStoreTypeByMeta(s), + resolved, + s.GetLabels(), + )) } } return res, nil @@ -2584,6 +2578,169 @@ func (r *Region) ContainsByEnd(key []byte) bool { (bytes.Compare(key, r.meta.GetEndKey()) <= 0 || len(r.meta.GetEndKey()) == 0) } +const ( + tikvSlowScoreDecayRate float64 = 20.0 / 60.0 // s^(-1), linear decaying + tikvSlowScoreSlowThreshold int64 = 80 + + tikvSlowScoreUpdateInterval = time.Millisecond * 100 + tikvSlowScoreUpdateFromPDInterval = time.Minute +) + +type StoreHealthStatus struct { + isSlow atomic.Bool + + // A statistic for counting the request latency to this store + clientSideSlowScore SlowScoreStat + + tikvSideSlowScore struct { + sync.Mutex + + // The following atomic fields is designed to be able to be read by atomic options directly, but only written + // while holding the mutex. + + hasTiKVFeedback atomic.Bool + score atomic.Int64 + lastUpdateTime atomic.Pointer[time.Time] + } +} + +type HealthStatusDetail struct { + ClientSideSlowScore int64 + TiKVSideSlowScore int64 +} + +func newStoreHealthStatus() *StoreHealthStatus { + return &StoreHealthStatus{} +} + +// IsSlow returns whether current Store is slow. +func (s *StoreHealthStatus) IsSlow() bool { + return s.isSlow.Load() +} + +// GetHealthStatusDetail gets the current detailed information about the store's health status. +func (s *StoreHealthStatus) GetHealthStatusDetail() HealthStatusDetail { + return HealthStatusDetail{ + ClientSideSlowScore: int64(s.clientSideSlowScore.getSlowScore()), + TiKVSideSlowScore: s.tikvSideSlowScore.score.Load(), + } +} + +// tick updates the health status that changes over time, such as slow score's decaying, etc. This function is expected +// to be called periodically. 
+func (s *StoreHealthStatus) tick(now time.Time) { + s.clientSideSlowScore.updateSlowScore() + s.updateTiKVServerSideSlowScoreOnTick(now) + s.updateSlowFlag() +} + +// recordClientSideSlowScoreStat records timecost of each request to update the client side slow score. +func (s *StoreHealthStatus) recordClientSideSlowScoreStat(timecost time.Duration) { + s.clientSideSlowScore.recordSlowScoreStat(timecost) + s.updateSlowFlag() +} + +// markAlreadySlow marks the related store already slow. +func (s *StoreHealthStatus) markAlreadySlow() { + s.clientSideSlowScore.markAlreadySlow() + s.updateSlowFlag() +} + +// updateTiKVServerSideSlowScoreOnTick updates the slow score actively, which is expected to be a periodic job. +// It skips updating if not enough time has elapsed since the last update, or if it's being updated concurrently. +func (s *StoreHealthStatus) updateTiKVServerSideSlowScoreOnTick(now time.Time) { + if !s.tikvSideSlowScore.hasTiKVFeedback.Load() { + // Do nothing if no feedback has been received from this store yet. + return + } + lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load() + if lastUpdateTime == nil || now.Sub(*lastUpdateTime) < tikvSlowScoreUpdateFromPDInterval { + // Skip if the first feedback is still being recorded concurrently (lastUpdateTime not set yet), or the last update was too recent. + return + } + + if !s.tikvSideSlowScore.TryLock() { + // It must be being updated concurrently. + return + } + defer s.tikvSideSlowScore.Unlock() + + // Reload update time as it might be updated concurrently before acquiring mutex + lastUpdateTime = s.tikvSideSlowScore.lastUpdateTime.Load() + elapsed := now.Sub(*lastUpdateTime) + if elapsed < tikvSlowScoreUpdateFromPDInterval { + return + } + + // TODO: Try to get store status from PD here. But it's not mandatory. + // Don't forget to update tests if getting slow score from PD is implemented here. + + // If updating from PD is not successful: decay the slow score. + score := s.tikvSideSlowScore.score.Load() + if score < 1 { + return + } + // Linear decay by time + score = max(int64(math.Round(float64(score)-tikvSlowScoreDecayRate*elapsed.Seconds())), 1) + s.tikvSideSlowScore.score.Store(score) + + newUpdateTime := new(time.Time) + *newUpdateTime = now + s.tikvSideSlowScore.lastUpdateTime.Store(newUpdateTime) +} + +// updateTiKVServerSideSlowScore updates the tikv side slow score with the given value. +// The update is ignored if not enough time has elapsed since the last update, or if it's being updated concurrently. +func (s *StoreHealthStatus) updateTiKVServerSideSlowScore(score int64, currTime time.Time) { + defer s.updateSlowFlag() + + lastScore := s.tikvSideSlowScore.score.Load() + + if lastScore == score { + return + } + + lastUpdateTime := s.tikvSideSlowScore.lastUpdateTime.Load() + + if lastUpdateTime != nil && currTime.Sub(*lastUpdateTime) < tikvSlowScoreUpdateInterval { + return + } + + if !s.tikvSideSlowScore.TryLock() { + // It must be being updated concurrently. Skip.
+ return + } + defer s.tikvSideSlowScore.Unlock() + + s.tikvSideSlowScore.hasTiKVFeedback.Store(true) + // Reload update time as it might be updated concurrently before acquiring mutex + lastUpdateTime = s.tikvSideSlowScore.lastUpdateTime.Load() + if lastUpdateTime != nil && currTime.Sub(*lastUpdateTime) < tikvSlowScoreUpdateInterval { + return + } + + newScore := score + + newUpdateTime := new(time.Time) + *newUpdateTime = currTime + + s.tikvSideSlowScore.score.Store(newScore) + s.tikvSideSlowScore.lastUpdateTime.Store(newUpdateTime) +} + +func (s *StoreHealthStatus) updateSlowFlag() { + isSlow := s.clientSideSlowScore.isSlow() || s.tikvSideSlowScore.score.Load() >= tikvSlowScoreSlowThreshold + s.isSlow.Store(isSlow) +} + +// setTiKVSlowScoreLastUpdateTimeForTest force sets last update time of TiKV server side slow score to specified value. +// For test purpose only. +func (s *StoreHealthStatus) setTiKVSlowScoreLastUpdateTimeForTest(lastUpdateTime time.Time) { + s.tikvSideSlowScore.Lock() + defer s.tikvSideSlowScore.Unlock() + s.tikvSideSlowScore.lastUpdateTime.Store(&lastUpdateTime) +} + // Store contains a kv process's address. type Store struct { addr string // loaded store address @@ -2605,8 +2762,7 @@ type Store struct { livenessState uint32 unreachableSince time.Time - // A statistic for counting the request latency to this store - slowScore SlowScoreStat + healthStatus *StoreHealthStatus // A statistic for counting the flows of different replicas on this store replicaFlowsStats [numReplicaFlowsType]uint64 } @@ -2652,6 +2808,37 @@ func (s resolveState) String() string { } } +func newStore( + id uint64, + addr string, + peerAddr string, + statusAddr string, + storeType tikvrpc.EndpointType, + state resolveState, + labels []*metapb.StoreLabel, +) *Store { + return &Store{ + storeID: id, + storeType: storeType, + state: uint64(state), + labels: labels, + addr: addr, + peerAddr: peerAddr, + saddr: statusAddr, + // Make sure healthStatus field is never null. + healthStatus: newStoreHealthStatus(), + } +} + +// newUninitializedStore creates a `Store` instance with only storeID initialized. +func newUninitializedStore(id uint64) *Store { + return &Store{ + storeID: id, + // Make sure healthStatus field is never null. + healthStatus: newStoreHealthStatus(), + } +} + // IsTiFlash returns true if the storeType is TiFlash func (s *Store) IsTiFlash() bool { return s.storeType == tikvrpc.TiFlash @@ -2752,11 +2939,19 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) { storeType := tikvrpc.GetStoreTypeByMeta(store) addr = store.GetAddress() if s.addr != addr || !s.IsSameLabels(store.GetLabels()) { - newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)} + newStore := newStore( + s.storeID, + addr, + store.GetPeerAddress(), + store.GetStatusAddress(), + storeType, + resolved, + store.GetLabels(), + ) newStore.livenessState = atomic.LoadUint32(&s.livenessState) newStore.unreachableSince = s.unreachableSince if s.addr == addr { - newStore.slowScore = s.slowScore + newStore.healthStatus = s.healthStatus } c.putStore(newStore) s.setResolveState(deleted) @@ -3104,48 +3299,28 @@ func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) { return } -// getSlowScore returns the slow score of store. -func (s *Store) getSlowScore() uint64 { - return s.slowScore.getSlowScore() -} - -// isSlow returns whether current Store is slow or not. 
-func (s *Store) isSlow() bool { - return s.slowScore.isSlow() -} - -// updateSlowScore updates the slow score of this store according to the timecost of current request. -func (s *Store) updateSlowScoreStat() { - s.slowScore.updateSlowScore() -} - -// recordSlowScoreStat records timecost of each request. -func (s *Store) recordSlowScoreStat(timecost time.Duration) { - s.slowScore.recordSlowScoreStat(timecost) -} - -// markAlreadySlow marks the related store already slow. -func (s *Store) markAlreadySlow() { - s.slowScore.markAlreadySlow() -} - -// checkAndUpdateStoreSlowScores checks and updates slowScore on each store. -func (c *RegionCache) checkAndUpdateStoreSlowScores() { +// checkAndUpdateStoreHealthStatus checks and updates health stats on each store. +func (c *RegionCache) checkAndUpdateStoreHealthStatus() { defer func() { r := recover() if r != nil { - logutil.BgLogger().Error("panic in the checkAndUpdateStoreSlowScores goroutine", + logutil.BgLogger().Error("panic in the checkAndUpdateStoreHealthStatus goroutine", zap.Any("r", r), zap.Stack("stack trace")) + if _, err := util.EvalFailpoint("doNotRecoverStoreHealthCheckPanic"); err == nil { + panic(r) + } } }() - slowScoreMetrics := make(map[uint64]float64) + healthDetails := make(map[uint64]HealthStatusDetail) + now := time.Now() c.forEachStore(func(store *Store) { - store.updateSlowScoreStat() - slowScoreMetrics[store.storeID] = float64(store.getSlowScore()) + store.healthStatus.tick(now) + healthDetails[store.storeID] = store.healthStatus.GetHealthStatusDetail() }) - for store, score := range slowScoreMetrics { - metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(score) + for store, details := range healthDetails { + metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.ClientSideSlowScore)) + metrics.TiKVFeedbackSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.TiKVSideSlowScore)) } } @@ -3164,6 +3339,13 @@ func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) { atomic.AddUint64(&s.replicaFlowsStats[destType], 1) } +func (s *Store) recordHealthFeedback(feedback *tikvpb.HealthFeedback) { + // Note that the `FeedbackSeqNo` field of `HealthFeedback` is not used yet. It's a monotonic value that can help + // to drop out-of-order feedback messages. But it's not checked for now since it's not very necessary to receive + // only a slow score. It's prepared for possible use in the future. + s.healthStatus.updateTiKVServerSideSlowScore(int64(feedback.GetSlowScore()), time.Now()) +} + // reportStoreReplicaFlows reports the statistics on the related replicaFlowsType. 
func (c *RegionCache) reportStoreReplicaFlows() { c.forEachStore(func(store *Store) { @@ -3270,7 +3452,7 @@ func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store { c.storeMu.Lock() store, exists := c.storeMu.stores[id] if !exists { - store = &Store{storeID: id} + store = newUninitializedStore(id) c.storeMu.stores[id] = store } c.storeMu.Unlock() @@ -3341,3 +3523,28 @@ func (c *RegionCache) markStoreNeedCheck(store *Store) { func (c *RegionCache) getCheckStoreEvents() <-chan struct{} { return c.notifyCheckCh } + +func (c *RegionCache) onHealthFeedback(feedback *tikvpb.HealthFeedback) { + store, ok := c.getStore(feedback.GetStoreId()) + if !ok { + logutil.BgLogger().Info("dropped health feedback info due to unknown store id", zap.Uint64("storeID", feedback.GetStoreId())) + return + } + store.recordHealthFeedback(feedback) +} + +// GetClientEventListener returns the listener to observe the RPC client's events and let the region cache respond to +// them. When creating the `KVStore` using `tikv.NewKVStore` function, the listener will be setup immediately. +func (c *RegionCache) GetClientEventListener() client.ClientEventListener { + return ®ionCacheClientEventListener{c: c} +} + +// regionCacheClientEventListener is the listener to let RegionCache respond to events in the RPC client. +type regionCacheClientEventListener struct { + c *RegionCache +} + +// OnHealthFeedback implements the `client.ClientEventListener` interface. +func (l *regionCacheClientEventListener) OnHealthFeedback(feedback *tikvpb.HealthFeedback) { + l.c.onHealthFeedback(feedback) +} diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 397b17a65..441df39c7 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -47,8 +47,10 @@ import ( "unsafe" "github.com/gogo/protobuf/proto" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/config/retry" @@ -237,6 +239,8 @@ func (s *testRegionCacheSuite) SetupTest() { pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)} s.cache = NewRegionCache(pdCli) s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil) + + s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return")) } func (s *testRegionCacheSuite) TearDownTest() { @@ -245,6 +249,8 @@ func (s *testRegionCacheSuite) TearDownTest() { if s.onClosed != nil { s.onClosed() } + + s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic")) } func (s *testRegionCacheSuite) storeAddr(id uint64) string { @@ -2081,6 +2087,123 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() { }, 3*time.Second, time.Second) } +func (s *testRegionCacheSuite) TestTiKVSideSlowScore() { + stats := newStoreHealthStatus() + s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1)) + now := time.Now() + stats.tick(now) + s.LessOrEqual(stats.GetHealthStatusDetail().TiKVSideSlowScore, int64(1)) + s.False(stats.tikvSideSlowScore.hasTiKVFeedback.Load()) + s.False(stats.IsSlow()) + + now = now.Add(tikvSlowScoreUpdateInterval * 2) + stats.updateTiKVServerSideSlowScore(50, now) + s.Equal(int64(50), stats.GetHealthStatusDetail().TiKVSideSlowScore) + s.True(stats.tikvSideSlowScore.hasTiKVFeedback.Load()) + s.False(stats.IsSlow()) + + now = 
now.Add(tikvSlowScoreUpdateInterval * 2) + stats.updateTiKVServerSideSlowScore(100, now) + s.Equal(int64(100), stats.GetHealthStatusDetail().TiKVSideSlowScore) + s.True(stats.IsSlow()) + + now = now.Add(time.Minute * 2) + stats.tick(now) + s.Equal(int64(60), stats.GetHealthStatusDetail().TiKVSideSlowScore) + s.False(stats.IsSlow()) + + now = now.Add(time.Minute * 3) + stats.tick(now) + s.Equal(int64(1), stats.GetHealthStatusDetail().TiKVSideSlowScore) + s.False(stats.IsSlow()) + + now = now.Add(time.Minute) + stats.tick(now) + s.Equal(int64(1), stats.GetHealthStatusDetail().TiKVSideSlowScore) + s.False(stats.IsSlow()) +} + +func (s *testRegionCacheSuite) TestStoreHealthStatus() { + stats := newStoreHealthStatus() + now := time.Now() + s.False(stats.IsSlow()) + + for !stats.clientSideSlowScore.isSlow() { + stats.clientSideSlowScore.recordSlowScoreStat(time.Minute) + } + stats.tick(now) + s.True(stats.IsSlow()) + s.Equal(int64(stats.clientSideSlowScore.getSlowScore()), stats.GetHealthStatusDetail().ClientSideSlowScore) + + now = now.Add(time.Second) + stats.updateTiKVServerSideSlowScore(100, now) + s.True(stats.IsSlow()) + s.Equal(int64(100), stats.GetHealthStatusDetail().TiKVSideSlowScore) + + for stats.clientSideSlowScore.isSlow() { + stats.clientSideSlowScore.recordSlowScoreStat(time.Millisecond) + stats.tick(now) + } + s.True(stats.IsSlow()) + s.Equal(int64(stats.clientSideSlowScore.getSlowScore()), stats.GetHealthStatusDetail().ClientSideSlowScore) + + now = now.Add(time.Second) + stats.updateTiKVServerSideSlowScore(1, now) + s.False(stats.IsSlow()) +} + +func (s *testRegionCacheSuite) TestRegionCacheHandleHealthStatus() { + _, err := s.cache.LocateKey(s.bo, []byte("k")) + s.Nil(err) + + store1, exists := s.cache.getStore(s.store1) + s.True(exists) + s.False(store1.healthStatus.IsSlow()) + + feedbackMsg := &tikvpb.HealthFeedback{ + StoreId: s.store1, + FeedbackSeqNo: 1, + SlowScore: 100, + } + s.cache.onHealthFeedback(feedbackMsg) + s.True(store1.healthStatus.IsSlow()) + s.Equal(int64(100), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore) + + feedbackMsg = &tikvpb.HealthFeedback{ + StoreId: s.store1, + FeedbackSeqNo: 2, + SlowScore: 90, + } + // Ignore too frequent update + s.cache.onHealthFeedback(feedbackMsg) + s.Equal(int64(100), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore) + + feedbackMsg = &tikvpb.HealthFeedback{ + StoreId: s.store1, + FeedbackSeqNo: 3, + SlowScore: 90, + } + store1.healthStatus.setTiKVSlowScoreLastUpdateTimeForTest(time.Now().Add(-time.Second)) + s.cache.onHealthFeedback(feedbackMsg) + s.Equal(int64(90), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore) + + feedbackMsg = &tikvpb.HealthFeedback{ + StoreId: s.store1, + FeedbackSeqNo: 4, + SlowScore: 50, + } + store1.healthStatus.setTiKVSlowScoreLastUpdateTimeForTest(time.Now().Add(-time.Second)) + s.cache.onHealthFeedback(feedbackMsg) + s.False(store1.healthStatus.IsSlow()) + s.Equal(int64(50), store1.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore) + + store2, exists := s.cache.getStore(s.store2) + s.True(exists) + // Store 2 is never affected by updating store 1 + s.LessOrEqual(store2.healthStatus.GetHealthStatusDetail().TiKVSideSlowScore, int64(1)) + s.False(store2.healthStatus.IsSlow()) +} + func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() { _ = s.cache.refreshRegionIndex(s.bo) r, _ := s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10) @@ -2089,7 +2212,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() { region, _ := 
s.cache.LocateRegionByID(s.bo, s.region) v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} - st := &Store{storeID: s.store} + st := newUninitializedStore(s.store) s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true) r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 5d7ad8935..58327f238 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -842,7 +842,7 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool } // And If the leader store is abnormal to be accessed under `ReplicaReadPreferLeader` mode, we should choose other valid followers // as candidates to serve the Read request. - if state.option.preferLeader && replica.store.isSlow() { + if state.option.preferLeader && replica.store.healthStatus.IsSlow() { return false } // Choose a replica with matched labels. @@ -1175,7 +1175,7 @@ func (s *replicaSelector) invalidateReplicaStore(replica *replica, cause error) metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc() // schedule a store addr resolve. s.regionCache.markStoreNeedCheck(store) - store.markAlreadySlow() + store.healthStatus.markAlreadySlow() } } @@ -1276,7 +1276,7 @@ func (s *replicaSelector) onServerIsBusy( } else if ctx != nil && ctx.Store != nil { // Mark the server is busy (the next incoming READs could be redirect // to expected followers. ) - ctx.Store.markAlreadySlow() + ctx.Store.healthStatus.markAlreadySlow() if s.canFallback2Follower() { return true, nil } @@ -1734,7 +1734,7 @@ func (s *RegionRequestSender) sendReqToRegion( } // Record timecost of external requests on related Store when `ReplicaReadMode == "PreferLeader"`. 
if rpcCtx.Store != nil && req.ReplicaReadType == kv.ReplicaReadPreferLeader && !util.IsInternalRequest(req.RequestSource) { - rpcCtx.Store.recordSlowScoreStat(rpcDuration) + rpcCtx.Store.healthStatus.recordClientSideSlowScoreStat(rpcDuration) } if s.Stats != nil { RecordRegionRequestRuntimeStats(s.Stats, req.Type, rpcDuration) diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 73008a522..979ef9baf 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -89,6 +89,8 @@ func (s *testRegionRequestToThreeStoresSuite) SetupTest() { s.bo = retry.NewNoopBackoff(context.Background()) client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil) s.regionRequestSender = NewRegionRequestSender(s.cache, client) + + s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return")) } func (s *testRegionRequestToThreeStoresSuite) TearDownTest() { @@ -97,6 +99,8 @@ func (s *testRegionRequestToThreeStoresSuite) TearDownTest() { if s.onClosed != nil { s.onClosed() } + + s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic")) } func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() { @@ -404,7 +408,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() { tikvLearner := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: storeID, Role: metapb.PeerRole_Learner} tikvLearnerAccessIdx := len(regionStore.stores) regionStore.accessIndex[tiKVOnly] = append(regionStore.accessIndex[tiKVOnly], tikvLearnerAccessIdx) - regionStore.stores = append(regionStore.stores, &Store{storeID: tikvLearner.StoreId}) + regionStore.stores = append(regionStore.stores, newUninitializedStore(tikvLearner.StoreId)) regionStore.storeEpochs = append(regionStore.storeEpochs, 0) region = &Region{ @@ -455,7 +459,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // Add a TiFlash peer to the region. 
tiflash := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()} regionStore.accessIndex[tiFlashOnly] = append(regionStore.accessIndex[tiFlashOnly], len(regionStore.stores)) - regionStore.stores = append(regionStore.stores, &Store{storeID: tiflash.StoreId, storeType: tikvrpc.TiFlash}) + tiflashStore := newUninitializedStore(tiflash.StoreId) + tiflashStore.storeType = tikvrpc.TiFlash + regionStore.stores = append(regionStore.stores, tiflashStore) regionStore.storeEpochs = append(regionStore.storeEpochs, 0) region = &Region{ diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 1d12d1f96..837d6c5ab 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -45,6 +45,7 @@ import ( "time" "unsafe" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/disaggregated" "github.com/pingcap/kvproto/pkg/errorpb" @@ -89,11 +90,15 @@ func (s *testRegionRequestToSingleStoreSuite) SetupTest() { s.bo = retry.NewNoopBackoff(context.Background()) client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil) s.regionRequestSender = NewRegionRequestSender(s.cache, client) + + s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return")) } func (s *testRegionRequestToSingleStoreSuite) TearDownTest() { s.cache.Close() s.mvccStore.Close() + + s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic")) } type fnClient struct { @@ -110,6 +115,8 @@ func (f *fnClient) CloseAddr(addr string) error { return nil } +func (f *fnClient) SetEventListener(listener client.ClientEventListener) {} + func (f *fnClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { return f.fn(ctx, addr, req, timeout) } @@ -628,7 +635,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { // test kv load new region with new start-key and new epoch v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} - st := &Store{storeID: s.store} + st := newUninitializedStore(s.store) s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) @@ -638,7 +645,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { v3 := region.Region.confVer + 1 r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}} - st = &Store{storeID: s.store} + st = newUninitializedStore(s.store) s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) diff --git a/internal/locate/replica_selector_test.go b/internal/locate/replica_selector_test.go index 6b9d15fc8..6556d148a 100644 --- a/internal/locate/replica_selector_test.go +++ b/internal/locate/replica_selector_test.go @@ -487,7 +487,7 @@ func (ca *replicaSelectorAccessPathCase) run(s *testReplicaSelectorSuite) { rc := s.cache.GetCachedRegionWithRLock(loc.Region) s.NotNil(rc) for _, store := range rc.getStore().stores { - store.slowScore.resetSlowScore() + store.healthStatus.clientSideSlowScore.resetSlowScore() 
atomic.StoreUint32(&store.livenessState, uint32(reachable)) store.setResolveState(resolved) } diff --git a/internal/mockstore/mocktikv/rpc.go b/internal/mockstore/mocktikv/rpc.go index bd50d91b8..1e1ae5341 100644 --- a/internal/mockstore/mocktikv/rpc.go +++ b/internal/mockstore/mocktikv/rpc.go @@ -49,6 +49,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pkg/errors" tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/internal/client" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" ) @@ -1095,3 +1096,6 @@ func (c *RPCClient) Close() error { func (c *RPCClient) CloseAddr(addr string) error { return nil } + +// SetEventListener does nothing. +func (c *RPCClient) SetEventListener(listener client.ClientEventListener) {} diff --git a/metrics/metrics.go b/metrics/metrics.go index e634b91b3..f5e8827b0 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -101,6 +101,7 @@ var ( TiKVGrpcConnectionState *prometheus.GaugeVec TiKVAggressiveLockedKeysCounter *prometheus.CounterVec TiKVStoreSlowScoreGauge *prometheus.GaugeVec + TiKVFeedbackSlowScoreGauge *prometheus.GaugeVec TiKVPreferLeaderFlowsGauge *prometheus.GaugeVec TiKVStaleReadCounter *prometheus.CounterVec TiKVStaleReadReqCounter *prometheus.CounterVec @@ -706,6 +707,15 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{LblStore}) + TiKVFeedbackSlowScoreGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "feedback_slow_score", + Help: "Slow scores of each tikv node that is calculated by TiKV and sent to the client by health feedback", + ConstLabels: constLabels, + }, []string{LblStore}) + TiKVPreferLeaderFlowsGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, @@ -846,6 +856,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVGrpcConnectionState) prometheus.MustRegister(TiKVAggressiveLockedKeysCounter) prometheus.MustRegister(TiKVStoreSlowScoreGauge) + prometheus.MustRegister(TiKVFeedbackSlowScoreGauge) prometheus.MustRegister(TiKVPreferLeaderFlowsGauge) prometheus.MustRegister(TiKVStaleReadCounter) prometheus.MustRegister(TiKVStaleReadReqCounter) diff --git a/tikv/client.go b/tikv/client.go index 90e10bc1a..9e92ed69d 100644 --- a/tikv/client.go +++ b/tikv/client.go @@ -44,6 +44,9 @@ import ( // It should not be used after calling Close(). type Client = client.Client +// ClientEventListener is a listener to handle events produced by `Client`. +type ClientEventListener = client.ClientEventListener + // ClientOpt defines the option to create RPC client. 
type ClientOpt = client.Opt diff --git a/tikv/kv.go b/tikv/kv.go index c13aa21d2..172f6f614 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -242,12 +242,13 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl return nil, err } ctx, cancel := context.WithCancel(context.Background()) + regionCache := locate.NewRegionCache(pdClient) store := &KVStore{ clusterID: pdClient.GetClusterID(context.TODO()), uuid: uuid, oracle: o, pdClient: pdClient, - regionCache: locate.NewRegionCache(pdClient), + regionCache: regionCache, kv: spkv, safePoint: 0, spTime: time.Now(), @@ -257,6 +258,8 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl gP: NewSpool(128, 10*time.Second), } store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient)) + store.clientMu.client.SetEventListener(regionCache.GetClientEventListener()) + store.lockResolver = txnlock.NewLockResolver(store) loadOption(store, opt...)
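// Example (not part of this patch): how the new health feedback can be observed through the
// Client interface. This is a minimal sketch modeled on TestBatchClientReceiveHealthFeedback
// above; it assumes it lives in the internal/client package, and the healthLogger type, the
// channel size, and newObservedRPCClient are illustrative names rather than code from the PR.

package client

import "github.com/pingcap/kvproto/pkg/tikvpb"

// healthLogger buffers the latest HealthFeedback messages, similar to the
// testClientEventListener added in client_test.go.
type healthLogger struct {
	ch chan *tikvpb.HealthFeedback
}

// OnHealthFeedback is invoked from batchRecvLoop whenever a BatchCommandsResponse carries a
// HealthFeedback message, so it should be cheap and must not block.
func (l *healthLogger) OnHealthFeedback(feedback *tikvpb.HealthFeedback) {
	select {
	case l.ch <- feedback:
	default: // drop the message if the consumer lags; the feedback is advisory only
	}
}

// newObservedRPCClient creates an RPCClient with a custom listener attached. Note that
// SetEventListener replaces any previously registered listener, and NewKVStore already
// registers the RegionCache's listener, so a custom listener is mainly useful in tests.
func newObservedRPCClient() (*RPCClient, *healthLogger) {
	cli := NewRPCClient()
	l := &healthLogger{ch: make(chan *tikvpb.HealthFeedback, 16)}
	cli.SetEventListener(l)
	return cli, l
}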
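// Example (not part of this patch): the arithmetic behind the TiKV-side slow score decay in
// updateTiKVServerSideSlowScoreOnTick, shown as a standalone runnable sketch. The function
// name decayedSlowScore is illustrative; the constants mirror tikvSlowScoreDecayRate
// (20 points per 60 seconds) and the floor of 1, and the printed values match
// TestTiKVSideSlowScore above.

package main

import (
	"fmt"
	"math"
	"time"
)

// decayedSlowScore linearly decays a feedback-based slow score by 20 points per minute of
// elapsed time, never dropping below 1. A score of 80 or above marks the store as slow.
func decayedSlowScore(score int64, elapsed time.Duration) int64 {
	const decayRatePerSecond = 20.0 / 60.0
	decayed := int64(math.Round(float64(score) - decayRatePerSecond*elapsed.Seconds()))
	if decayed < 1 {
		return 1
	}
	return decayed
}

func main() {
	fmt.Println(decayedSlowScore(100, 2*time.Minute)) // 60: below the slow threshold of 80
	fmt.Println(decayedSlowScore(60, 3*time.Minute))  // 1: fully decayed
}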