Skip to content

Commit

Permalink
Support receving health feedback (#1153)
Browse files Browse the repository at this point in the history
* Register health feedback handler in batch client

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* update protocol

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Collect the health status and record to the stores

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Remove the filter which looks not necessary

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add metrics

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Fix checkAndUpdateStoreHealthStatus panicking

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add logs

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Merge SetHealthFeedbackHandler to Client

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Try another pattern: separated callback registry

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Try region cache eventListener pattern

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add tests

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Test receiving froim callback

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add tests to region cache

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Remove unnecessary debug log

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add comments

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* remove replace of tidb repo

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* fix build

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Update comments; fix lint

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add the isSlow method of Store back

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* remove unused method for now to make lint happy

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Address comments

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

---------

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Co-authored-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta and MyonKeminta authored Feb 28, 2024
1 parent a305dde commit 03bbadb
Show file tree
Hide file tree
Showing 20 changed files with 573 additions and 95 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 h1:pdA3DvkChrIp91JQO89ICT1x/SemOAm7vC848acr5Ik=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f h1:2xvTjl4OrQY+XF38p8H7qVCXpaUYc5rLiYQhSd07aTI=
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
4 changes: 1 addition & 3 deletions integration_tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ type unistoreClientWrapper struct {
*unistore.RPCClient
}

func (c *unistoreClientWrapper) CloseAddr(addr string) error {
return nil
}
func (c *unistoreClientWrapper) SetEventListener(listener tikv.ClientEventListener) {}

func (s *testAsyncCommitCommon) setUpTest() {
if *withTiKV {
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f
github.com/pingcap/tidb v1.1.0-beta.0.20240126041650-de177d85b19e
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
Expand Down
22 changes: 6 additions & 16 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.112.0 h1:tpFCD7hpHFlQ8yPwT3x+QeXqc2T6+n6T+hmABHfDUSM=
cloud.google.com/go v0.112.0/go.mod h1:3jEEVwZ/MHU4djK5t5RHuKOA/GbLddgTdVubX1qnPD4=
cloud.google.com/go/compute v1.23.4 h1:EBT9Nw4q3zyE7G45Wvv3MzolIrCJEuHys5muLY0wvAw=
cloud.google.com/go/compute v1.23.4/go.mod h1:/EJMj55asU6kAFnuZET8zqgwgJ9FvXWXOkkfQZa4ioI=
cloud.google.com/go/compute v1.24.0 h1:phWcR2eWzRJaL/kOiJwfFsPs4BaKq1j6vnpZrc1YlVg=
cloud.google.com/go/compute v1.24.0/go.mod h1:kw1/T+h/+tK2LJK0wiPPx1intgdAM3j/g3hFDlscY40=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc=
Expand Down Expand Up @@ -194,8 +194,8 @@ github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w
github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE=
github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo=
github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ=
github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68=
github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down Expand Up @@ -237,8 +237,6 @@ github.com/google/pprof v0.0.0-20240117000934-35fc243c5815/go.mod h1:czg5+yv1E0Z
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
Expand Down Expand Up @@ -415,8 +413,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 h1:pdA3DvkChrIp91JQO89ICT1x/SemOAm7vC848acr5Ik=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f h1:2xvTjl4OrQY+XF38p8H7qVCXpaUYc5rLiYQhSd07aTI=
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY=
Expand Down Expand Up @@ -800,16 +798,10 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20240205150955-31a09d347014 h1:g/4bk7P6TPMkAUbUhquq98xey1slwvuVJPosdBqYJlU=
google.golang.org/genproto v0.0.0-20240205150955-31a09d347014/go.mod h1:xEgQu1e4stdSSsxPDK8Azkrk/ECl5HvdPf6nbZrTS5M=
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y=
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s=
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe h1:0poefMBYvYbs7g5UkjS6HcxBPaTRAmznle9jnxYoAI8=
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA=
google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014 h1:x9PwdEgd11LgK+orcck69WVRo7DezSO4VUMPI4xpc8A=
google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014/go.mod h1:rbHMSEDyoYX62nRVLOCc4Qt1HbsdytAYoVwgjiOhF3I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 h1:hZB7eLIaYlW9qXRfCq/qDaPdbeY3757uARz5Vvfv+cY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:YUWgXUFRPfoYK1IHMuxH5K6nPEXSCzIMljnQ59lLRCk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c h1:NUsgEN92SQQqzfA+YtqYNqYmB3DMMYLlIwUZAQFVFbo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY=
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
Expand All @@ -820,8 +812,6 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY=
google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk=
google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9 h1:ATnmU8nL2NfIyTSiBvJVDIDIr3qBmeW+c7z7XU21eWs=
Expand Down
30 changes: 26 additions & 4 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ type Client interface {
CloseAddr(addr string) error
// SendRequest sends Request.
SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error)
// SetEventListener registers an event listener for the Client instance. If it's called more than once, the
// previously set one will be replaced.
SetEventListener(listener ClientEventListener)
}

// ClientEventListener is a listener to handle events produced by `Client`.
type ClientEventListener interface {
// OnHealthFeedback is called when `Client` receives a response that carries the HealthFeedback information.
OnHealthFeedback(feedback *tikvpb.HealthFeedback)
}

type connArray struct {
Expand All @@ -127,7 +136,7 @@ type connArray struct {
}

func newConnArray(maxSize uint, addr string, security config.Security,
idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, m *connMonitor, opts []grpc.DialOption) (*connArray, error) {
idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, m *connMonitor, eventListener *atomic.Pointer[ClientEventListener], opts []grpc.DialOption) (*connArray, error) {
a := &connArray{
index: 0,
v: make([]*monitoredConn, maxSize),
Expand All @@ -136,7 +145,7 @@ func newConnArray(maxSize uint, addr string, security config.Security,
dialTimeout: dialTimeout,
monitor: m,
}
if err := a.Init(addr, security, idleNotify, enableBatch, opts...); err != nil {
if err := a.Init(addr, security, idleNotify, enableBatch, eventListener, opts...); err != nil {
return nil, err
}
return a, nil
Expand Down Expand Up @@ -228,7 +237,7 @@ func (c *monitoredConn) Close() error {
return nil
}

func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, enableBatch bool, opts ...grpc.DialOption) error {
func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, enableBatch bool, eventListener *atomic.Pointer[ClientEventListener], opts ...grpc.DialOption) error {
a.target = addr

opt := grpc.WithTransportCredentials(insecure.NewCredentials())
Expand Down Expand Up @@ -317,6 +326,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
tikvLoad: &a.tikvTransportLayerLoad,
dialTimeout: a.dialTimeout,
tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false},
eventListener: eventListener,
}
batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit)
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
Expand Down Expand Up @@ -401,16 +411,21 @@ type RPCClient struct {
isClosed bool

connMonitor *connMonitor

eventListener *atomic.Pointer[ClientEventListener]
}

var _ Client = &RPCClient{}

// NewRPCClient creates a client that manages connections and rpc calls with tikv-servers.
func NewRPCClient(opts ...Opt) *RPCClient {
cli := &RPCClient{
conns: make(map[string]*connArray),
option: &option{
dialTimeout: dialTimeout,
},
connMonitor: &connMonitor{},
connMonitor: &connMonitor{},
eventListener: new(atomic.Pointer[ClientEventListener]),
}
for _, opt := range opts {
opt(cli.option)
Expand Down Expand Up @@ -462,6 +477,7 @@ func (c *RPCClient) createConnArray(addr string, enableBatch bool, opts ...func(
enableBatch,
c.option.dialTimeout,
c.connMonitor,
c.eventListener,
c.option.gRPCDialOptions)

if err != nil {
Expand Down Expand Up @@ -810,6 +826,12 @@ func (c *RPCClient) CloseAddr(addr string) error {
return nil
}

// SetEventListener registers an event listener for the Client instance. If it's called more than once, the
// previously set one will be replaced.
func (c *RPCClient) SetEventListener(listener ClientEventListener) {
c.eventListener.Store(&listener)
}

type spanInfo struct {
name string
dur uint64
Expand Down
22 changes: 22 additions & 0 deletions internal/client/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,15 @@ type batchCommandsClient struct {
closed int32
// tryLock protects client when re-create the streaming.
tryLock

// sent is the number of the requests are processed by tikv server.
sent atomic.Int64
// maxConcurrencyRequestLimit is the max allowed number of requests to be sent the tikv
maxConcurrencyRequestLimit atomic.Int64

// eventListener is the listener set by external code to observe some events in the client. It's stored in a atomic
// pointer to make setting thread-safe.
eventListener *atomic.Pointer[ClientEventListener]
}

func (c *batchCommandsClient) isStopped() bool {
Expand Down Expand Up @@ -694,6 +699,17 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
continue
}

if resp.GetHealthFeedback() != nil {
if val, err := util.EvalFailpoint("injectHealthFeedbackSlowScore"); err == nil {
v, ok := val.(int)
if !ok || v < 0 || v > 100 {
panic(fmt.Sprintf("invalid injection in failpoint injectHealthFeedbackSlowScore: %+q", v))
}
resp.GetHealthFeedback().SlowScore = int32(v)
}
c.onHealthFeedback(resp.GetHealthFeedback())
}

responses := resp.GetResponses()
for i, requestID := range resp.GetRequestIds() {
value, ok := c.batched.Load(requestID)
Expand Down Expand Up @@ -725,6 +741,12 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
}
}

func (c *batchCommandsClient) onHealthFeedback(feedback *tikvpb.HealthFeedback) {
if h := c.eventListener.Load(); h != nil {
(*h).OnHealthFeedback(feedback)
}
}

func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *batchCommandsStream, epoch *uint64) (stopped bool) {
// Forbids the batchSendLoop using the old client and
// blocks other streams trying to recreate.
Expand Down
2 changes: 2 additions & 0 deletions internal/client/client_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func (c emptyClient) CloseAddr(addr string) error {
return nil
}

func (c emptyClient) SetEventListener(listener ClientEventListener) {}

func TestInterceptedClient(t *testing.T) {
executed := false
client := NewInterceptedClient(emptyClient{})
Expand Down
72 changes: 72 additions & 0 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func (c *chanClient) CloseAddr(addr string) error {
return nil
}

func (c *chanClient) SetEventListener(listener ClientEventListener) {}

func (c *chanClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
c.wg.Wait()
c.ch <- req
Expand Down Expand Up @@ -807,3 +809,73 @@ func TestPrioritySentLimit(t *testing.T) {
re.Less(highDur.Load()/highQps.Load()*2, mediumDur.Load()/mediumQps.Load())
server.Stop()
}

type testClientEventListener struct {
healthFeedbackCh chan *tikvpb.HealthFeedback
}

func newTestClientEventListener() *testClientEventListener {
return &testClientEventListener{
healthFeedbackCh: make(chan *tikvpb.HealthFeedback, 100),
}
}

func (l *testClientEventListener) OnHealthFeedback(feedback *tikvpb.HealthFeedback) {
l.healthFeedbackCh <- feedback
}

func TestBatchClientReceiveHealthFeedback(t *testing.T) {
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
require.True(t, server.IsRunning())
defer server.Stop()
addr := server.Addr()

client := NewRPCClient()
defer client.Close()

conn, err := client.getConnArray(addr, true)
assert.NoError(t, err)
tikvClient := tikvpb.NewTikvClient(conn.Get())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := tikvClient.BatchCommands(ctx)
assert.NoError(t, err)

for reqID := uint64(1); reqID <= 3; reqID++ {
assert.NoError(t, stream.Send(&tikvpb.BatchCommandsRequest{
Requests: []*tikvpb.BatchCommandsRequest_Request{{
Cmd: &tikvpb.BatchCommandsRequest_Request_Get{Get: &kvrpcpb.GetRequest{
Context: &kvrpcpb.Context{},
Key: []byte("k"),
Version: 1,
}},
}},
RequestIds: []uint64{reqID},
}))
resp, err := stream.Recv()
assert.NoError(t, err)
assert.Equal(t, []uint64{reqID}, resp.GetRequestIds())
assert.Len(t, resp.GetResponses(), 1)
assert.Equal(t, uint64(1), resp.GetHealthFeedback().GetStoreId())
assert.Equal(t, reqID, resp.GetHealthFeedback().GetFeedbackSeqNo())
assert.Equal(t, int32(1), resp.GetHealthFeedback().GetSlowScore())
}
cancel()

eventListener := newTestClientEventListener()
client.SetEventListener(eventListener)
ctx = context.Background()
resp, err := client.SendRequest(ctx, addr, tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}), time.Second)
assert.NoError(t, err)
assert.NotNil(t, resp.Resp)

select {
case feedback := <-eventListener.healthFeedbackCh:
assert.Equal(t, uint64(1), feedback.GetStoreId())
assert.Equal(t, int32(1), feedback.GetSlowScore())
default:
assert.Fail(t, "health feedback not received")
}
}
8 changes: 8 additions & 0 deletions internal/client/mockserver/mock_tikv_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
if err := s.checkMetadata(ss.Context()); err != nil {
return err
}
var feedbackSeq uint64 = 1
for {
req, err := ss.Recv()
if err != nil {
Expand All @@ -98,7 +99,14 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
err = ss.Send(&tikvpb.BatchCommandsResponse{
Responses: responses,
RequestIds: req.GetRequestIds(),
HealthFeedback: &tikvpb.HealthFeedback{
StoreId: 1,
FeedbackSeqNo: feedbackSeq,
SlowScore: 1,
},
})
feedbackSeq++

if err != nil {
logutil.BgLogger().Error("batch commands send fail", zap.Error(err))
return err
Expand Down
Loading

0 comments on commit 03bbadb

Please sign in to comment.