Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support receving health feedback #1153

Merged
merged 23 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e714b6a
Register health feedback handler in batch client
MyonKeminta Feb 6, 2024
12166fa
update protocol
MyonKeminta Feb 7, 2024
419649a
Collect the health status and record to the stores
MyonKeminta Feb 7, 2024
9b9d0d0
Remove the filter which looks not necessary
MyonKeminta Feb 7, 2024
c381ab5
Add metrics
MyonKeminta Feb 8, 2024
cce241a
Fix checkAndUpdateStoreHealthStatus panicking
MyonKeminta Feb 18, 2024
f769b67
Add logs
MyonKeminta Feb 18, 2024
3d92daf
Merge SetHealthFeedbackHandler to Client
MyonKeminta Feb 19, 2024
a29a129
Try another pattern: separated callback registry
MyonKeminta Feb 19, 2024
1ddc60d
Try region cache eventListener pattern
MyonKeminta Feb 19, 2024
1af7e4a
Add tests
MyonKeminta Feb 21, 2024
3d685e7
Test receiving froim callback
MyonKeminta Feb 21, 2024
91c7bbe
Add tests to region cache
MyonKeminta Feb 26, 2024
903ff59
Remove unnecessary debug log
MyonKeminta Feb 26, 2024
daa3dd0
Add comments
MyonKeminta Feb 26, 2024
f800ae1
remove replace of tidb repo
MyonKeminta Feb 26, 2024
c2509b4
Merge branch 'master' of https://github.com/tikv/client-go into m/rec…
MyonKeminta Feb 27, 2024
f99235a
fix build
MyonKeminta Feb 27, 2024
0be8fd6
Update comments; fix lint
MyonKeminta Feb 27, 2024
cf5330d
Add the isSlow method of Store back
MyonKeminta Feb 27, 2024
c6535a8
remove unused method for now to make lint happy
MyonKeminta Feb 28, 2024
eb1421f
Address comments
MyonKeminta Feb 28, 2024
64f11b1
Merge branch 'master' into m/receive-health-feedback
MyonKeminta Feb 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 h1:pdA3DvkChrIp91JQO89ICT1x/SemOAm7vC848acr5Ik=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f h1:2xvTjl4OrQY+XF38p8H7qVCXpaUYc5rLiYQhSd07aTI=
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
4 changes: 1 addition & 3 deletions integration_tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ type unistoreClientWrapper struct {
*unistore.RPCClient
}

func (c *unistoreClientWrapper) CloseAddr(addr string) error {
return nil
}
func (c *unistoreClientWrapper) SetEventListener(listener tikv.ClientEventListener) {}

func (s *testAsyncCommitCommon) setUpTest() {
if *withTiKV {
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20231212100244-799fae176cfb
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f
github.com/pingcap/tidb v1.1.0-beta.0.20240126041650-de177d85b19e
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
Expand Down
22 changes: 6 additions & 16 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.112.0 h1:tpFCD7hpHFlQ8yPwT3x+QeXqc2T6+n6T+hmABHfDUSM=
cloud.google.com/go v0.112.0/go.mod h1:3jEEVwZ/MHU4djK5t5RHuKOA/GbLddgTdVubX1qnPD4=
cloud.google.com/go/compute v1.23.4 h1:EBT9Nw4q3zyE7G45Wvv3MzolIrCJEuHys5muLY0wvAw=
cloud.google.com/go/compute v1.23.4/go.mod h1:/EJMj55asU6kAFnuZET8zqgwgJ9FvXWXOkkfQZa4ioI=
cloud.google.com/go/compute v1.24.0 h1:phWcR2eWzRJaL/kOiJwfFsPs4BaKq1j6vnpZrc1YlVg=
cloud.google.com/go/compute v1.24.0/go.mod h1:kw1/T+h/+tK2LJK0wiPPx1intgdAM3j/g3hFDlscY40=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc=
Expand Down Expand Up @@ -194,8 +194,8 @@ github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w
github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE=
github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo=
github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ=
github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68=
github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down Expand Up @@ -237,8 +237,6 @@ github.com/google/pprof v0.0.0-20240117000934-35fc243c5815/go.mod h1:czg5+yv1E0Z
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
Expand Down Expand Up @@ -415,8 +413,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 h1:pdA3DvkChrIp91JQO89ICT1x/SemOAm7vC848acr5Ik=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f h1:2xvTjl4OrQY+XF38p8H7qVCXpaUYc5rLiYQhSd07aTI=
github.com/pingcap/kvproto v0.0.0-20240208102409-a554af8ee11f/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY=
Expand Down Expand Up @@ -800,16 +798,10 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20240205150955-31a09d347014 h1:g/4bk7P6TPMkAUbUhquq98xey1slwvuVJPosdBqYJlU=
google.golang.org/genproto v0.0.0-20240205150955-31a09d347014/go.mod h1:xEgQu1e4stdSSsxPDK8Azkrk/ECl5HvdPf6nbZrTS5M=
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y=
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s=
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe h1:0poefMBYvYbs7g5UkjS6HcxBPaTRAmznle9jnxYoAI8=
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA=
google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014 h1:x9PwdEgd11LgK+orcck69WVRo7DezSO4VUMPI4xpc8A=
google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014/go.mod h1:rbHMSEDyoYX62nRVLOCc4Qt1HbsdytAYoVwgjiOhF3I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9 h1:hZB7eLIaYlW9qXRfCq/qDaPdbeY3757uARz5Vvfv+cY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:YUWgXUFRPfoYK1IHMuxH5K6nPEXSCzIMljnQ59lLRCk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c h1:NUsgEN92SQQqzfA+YtqYNqYmB3DMMYLlIwUZAQFVFbo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY=
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
Expand All @@ -820,8 +812,6 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.61.1 h1:kLAiWrZs7YeDM6MumDe7m3y4aM6wacLzM1Y/wiLP9XY=
google.golang.org/grpc v1.61.1/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk=
google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9 h1:ATnmU8nL2NfIyTSiBvJVDIDIr3qBmeW+c7z7XU21eWs=
Expand Down
30 changes: 26 additions & 4 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ type Client interface {
CloseAddr(addr string) error
// SendRequest sends Request.
SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error)
// SetEventListener registers an event listener for the Client instance. If it's called more than once, the
// previously set one will be replaced.
SetEventListener(listener ClientEventListener)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just declare that the method is not thread safe and should be called before SendRequest so that we can get rid of atomic.Pointer and make related code simpler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I'm afraid that the assumption might be broken easily by mistake in the future...

}

// ClientEventListener is a listener to handle events produced by `Client`.
type ClientEventListener interface {
// OnHealthFeedback is called when `Client` receives a response that carries the HealthFeedback information.
OnHealthFeedback(feedback *tikvpb.HealthFeedback)
}

type connArray struct {
Expand All @@ -127,7 +136,7 @@ type connArray struct {
}

func newConnArray(maxSize uint, addr string, security config.Security,
idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, m *connMonitor, opts []grpc.DialOption) (*connArray, error) {
idleNotify *uint32, enableBatch bool, dialTimeout time.Duration, m *connMonitor, eventListener *atomic.Pointer[ClientEventListener], opts []grpc.DialOption) (*connArray, error) {
a := &connArray{
index: 0,
v: make([]*monitoredConn, maxSize),
Expand All @@ -136,7 +145,7 @@ func newConnArray(maxSize uint, addr string, security config.Security,
dialTimeout: dialTimeout,
monitor: m,
}
if err := a.Init(addr, security, idleNotify, enableBatch, opts...); err != nil {
if err := a.Init(addr, security, idleNotify, enableBatch, eventListener, opts...); err != nil {
return nil, err
}
return a, nil
Expand Down Expand Up @@ -228,7 +237,7 @@ func (c *monitoredConn) Close() error {
return nil
}

func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, enableBatch bool, opts ...grpc.DialOption) error {
func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, enableBatch bool, eventListener *atomic.Pointer[ClientEventListener], opts ...grpc.DialOption) error {
a.target = addr

opt := grpc.WithTransportCredentials(insecure.NewCredentials())
Expand Down Expand Up @@ -317,6 +326,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
tikvLoad: &a.tikvTransportLayerLoad,
dialTimeout: a.dialTimeout,
tryLock: tryLock{sync.NewCond(new(sync.Mutex)), false},
eventListener: eventListener,
}
batchClient.maxConcurrencyRequestLimit.Store(cfg.TiKVClient.MaxConcurrencyRequestLimit)
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
Expand Down Expand Up @@ -401,16 +411,21 @@ type RPCClient struct {
isClosed bool

connMonitor *connMonitor

eventListener *atomic.Pointer[ClientEventListener]
}

var _ Client = &RPCClient{}

// NewRPCClient creates a client that manages connections and rpc calls with tikv-servers.
func NewRPCClient(opts ...Opt) *RPCClient {
cli := &RPCClient{
conns: make(map[string]*connArray),
option: &option{
dialTimeout: dialTimeout,
},
connMonitor: &connMonitor{},
connMonitor: &connMonitor{},
eventListener: new(atomic.Pointer[ClientEventListener]),
}
for _, opt := range opts {
opt(cli.option)
Expand Down Expand Up @@ -462,6 +477,7 @@ func (c *RPCClient) createConnArray(addr string, enableBatch bool, opts ...func(
enableBatch,
c.option.dialTimeout,
c.connMonitor,
c.eventListener,
c.option.gRPCDialOptions)

if err != nil {
Expand Down Expand Up @@ -810,6 +826,12 @@ func (c *RPCClient) CloseAddr(addr string) error {
return nil
}

// SetEventListener registers an event listener for the Client instance. If it's called more than once, the
// previously set one will be replaced.
func (c *RPCClient) SetEventListener(listener ClientEventListener) {
c.eventListener.Store(&listener)
}

type spanInfo struct {
name string
dur uint64
Expand Down
22 changes: 22 additions & 0 deletions internal/client/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,15 @@ type batchCommandsClient struct {
closed int32
// tryLock protects client when re-create the streaming.
tryLock

// sent is the number of the requests are processed by tikv server.
sent atomic.Int64
// maxConcurrencyRequestLimit is the max allowed number of requests to be sent the tikv
maxConcurrencyRequestLimit atomic.Int64

// eventListener is the listener set by external code to observe some events in the client. It's stored in a atomic
// pointer to make setting thread-safe.
eventListener *atomic.Pointer[ClientEventListener]
}

func (c *batchCommandsClient) isStopped() bool {
Expand Down Expand Up @@ -694,6 +699,17 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
continue
}

if resp.GetHealthFeedback() != nil {
if val, err := util.EvalFailpoint("injectHealthFeedbackSlowScore"); err == nil {
v, ok := val.(int)
if !ok || v < 0 || v > 100 {
panic(fmt.Sprintf("invalid injection in failpoint injectHealthFeedbackSlowScore: %+q", v))
}
resp.GetHealthFeedback().SlowScore = int32(v)
}
c.onHealthFeedback(resp.GetHealthFeedback())
}

responses := resp.GetResponses()
for i, requestID := range resp.GetRequestIds() {
value, ok := c.batched.Load(requestID)
Expand Down Expand Up @@ -725,6 +741,12 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransport
}
}

func (c *batchCommandsClient) onHealthFeedback(feedback *tikvpb.HealthFeedback) {
if h := c.eventListener.Load(); h != nil {
(*h).OnHealthFeedback(feedback)
}
}

func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *batchCommandsStream, epoch *uint64) (stopped bool) {
// Forbids the batchSendLoop using the old client and
// blocks other streams trying to recreate.
Expand Down
2 changes: 2 additions & 0 deletions internal/client/client_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func (c emptyClient) CloseAddr(addr string) error {
return nil
}

func (c emptyClient) SetEventListener(listener ClientEventListener) {}

func TestInterceptedClient(t *testing.T) {
executed := false
client := NewInterceptedClient(emptyClient{})
Expand Down
72 changes: 72 additions & 0 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func (c *chanClient) CloseAddr(addr string) error {
return nil
}

func (c *chanClient) SetEventListener(listener ClientEventListener) {}

func (c *chanClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
c.wg.Wait()
c.ch <- req
Expand Down Expand Up @@ -807,3 +809,73 @@ func TestPrioritySentLimit(t *testing.T) {
re.Less(highDur.Load()/highQps.Load()*2, mediumDur.Load()/mediumQps.Load())
server.Stop()
}

type testClientEventListener struct {
healthFeedbackCh chan *tikvpb.HealthFeedback
}

func newTestClientEventListener() *testClientEventListener {
return &testClientEventListener{
healthFeedbackCh: make(chan *tikvpb.HealthFeedback, 100),
}
}

func (l *testClientEventListener) OnHealthFeedback(feedback *tikvpb.HealthFeedback) {
l.healthFeedbackCh <- feedback
}

func TestBatchClientReceiveHealthFeedback(t *testing.T) {
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
require.True(t, server.IsRunning())
defer server.Stop()
addr := server.Addr()

client := NewRPCClient()
defer client.Close()

conn, err := client.getConnArray(addr, true)
assert.NoError(t, err)
tikvClient := tikvpb.NewTikvClient(conn.Get())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := tikvClient.BatchCommands(ctx)
assert.NoError(t, err)

for reqID := uint64(1); reqID <= 3; reqID++ {
assert.NoError(t, stream.Send(&tikvpb.BatchCommandsRequest{
Requests: []*tikvpb.BatchCommandsRequest_Request{{
Cmd: &tikvpb.BatchCommandsRequest_Request_Get{Get: &kvrpcpb.GetRequest{
Context: &kvrpcpb.Context{},
Key: []byte("k"),
Version: 1,
}},
}},
RequestIds: []uint64{reqID},
}))
resp, err := stream.Recv()
assert.NoError(t, err)
assert.Equal(t, []uint64{reqID}, resp.GetRequestIds())
assert.Len(t, resp.GetResponses(), 1)
assert.Equal(t, uint64(1), resp.GetHealthFeedback().GetStoreId())
assert.Equal(t, reqID, resp.GetHealthFeedback().GetFeedbackSeqNo())
assert.Equal(t, int32(1), resp.GetHealthFeedback().GetSlowScore())
}
cancel()

eventListener := newTestClientEventListener()
client.SetEventListener(eventListener)
ctx = context.Background()
resp, err := client.SendRequest(ctx, addr, tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}), time.Second)
assert.NoError(t, err)
assert.NotNil(t, resp.Resp)

select {
case feedback := <-eventListener.healthFeedbackCh:
assert.Equal(t, uint64(1), feedback.GetStoreId())
assert.Equal(t, int32(1), feedback.GetSlowScore())
default:
assert.Fail(t, "health feedback not received")
}
}
8 changes: 8 additions & 0 deletions internal/client/mockserver/mock_tikv_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
if err := s.checkMetadata(ss.Context()); err != nil {
return err
}
var feedbackSeq uint64 = 1
for {
req, err := ss.Recv()
if err != nil {
Expand All @@ -98,7 +99,14 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error {
err = ss.Send(&tikvpb.BatchCommandsResponse{
Responses: responses,
RequestIds: req.GetRequestIds(),
HealthFeedback: &tikvpb.HealthFeedback{
StoreId: 1,
FeedbackSeqNo: feedbackSeq,
SlowScore: 1,
},
})
feedbackSeq++

if err != nil {
logutil.BgLogger().Error("batch commands send fail", zap.Error(err))
return err
Expand Down
Loading
Loading