From fc17587604f45e82c392f57f565b05e52ba336f9 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 27 Nov 2023 17:15:34 +0800 Subject: [PATCH 1/3] Add GetHistoryHotRegions interface and more tests Signed-off-by: JmPotato --- client/http/client.go | 35 +++++++++- client/http/types.go | 42 ++++++++++++ tests/integrations/client/http_client_test.go | 64 ++++++++++++++++--- 3 files changed, 132 insertions(+), 9 deletions(-) diff --git a/client/http/client.go b/client/http/client.go index cd7be205c12..2f4a060c8b6 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -49,6 +49,7 @@ type Client interface { GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error) GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error) GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error) + GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error) GetRegionStatusByKeyRange(context.Context, *KeyRange) (*RegionStats, error) GetStores(context.Context) (*StoresInfo, error) /* Rule-related interfaces */ @@ -191,12 +192,23 @@ func (c *client) execDuration(name string, duration time.Duration) { c.executionDuration.WithLabelValues(name).Observe(duration.Seconds()) } +// HeaderOption configures the HTTP header. +type HeaderOption func(header http.Header) + +// WithAllowFollowerHandle sets the header field to allow a PD follower to handle this request. +func WithAllowFollowerHandle(allow bool) HeaderOption { + return func(header http.Header) { + header.Set("PD-Allow-follower-handle", fmt.Sprintf("%t", allow)) + } +} + // At present, we will use the retry strategy of polling by default to keep // it consistent with the current implementation of some clients (e.g. TiDB). func (c *client) requestWithRetry( ctx context.Context, name, uri, method string, body io.Reader, res interface{}, + headerOpts ...HeaderOption, ) error { var ( err error @@ -204,7 +216,7 @@ func (c *client) requestWithRetry( ) for idx := 0; idx < len(c.pdAddrs); idx++ { addr = c.pdAddrs[idx] - err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res) + err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res, headerOpts...) if err == nil { break } @@ -218,6 +230,7 @@ func (c *client) request( ctx context.Context, name, url, method string, body io.Reader, res interface{}, + headerOpts ...HeaderOption, ) error { logFields := []zap.Field{ zap.String("name", name), @@ -229,6 +242,9 @@ func (c *client) request( log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...) return errors.Trace(err) } + for _, opt := range headerOpts { + opt(req.Header) + } start := time.Now() resp, err := c.cli.Do(req) if err != nil { @@ -361,6 +377,23 @@ func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, e return &hotWriteRegions, nil } +// GetHistoryHotRegions gets the history hot region statistics info. +func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegionsRequest) (*HistoryHotRegions, error) { + reqJSON, err := json.Marshal(req) + if err != nil { + return nil, errors.Trace(err) + } + var historyHotRegions HistoryHotRegions + err = c.requestWithRetry(ctx, + "GetHistoryHotRegions", HotHistory, + http.MethodGet, bytes.NewBuffer(reqJSON), &historyHotRegions, + WithAllowFollowerHandle(true)) + if err != nil { + return nil, err + } + return &historyHotRegions, nil +} + // GetRegionStatusByKeyRange gets the region status by key range. // The keys in the key range should be encoded in the UTF-8 bytes format. func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange) (*RegionStats, error) { diff --git a/client/http/types.go b/client/http/types.go index df448e7e20d..4e99d911e0b 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -19,6 +19,8 @@ import ( "encoding/json" "net/url" "time" + + "github.com/pingcap/kvproto/pkg/encryptionpb" ) // KeyRange defines a range of keys in bytes. @@ -166,6 +168,46 @@ type HotPeerStatShow struct { LastUpdateTime time.Time `json:"last_update_time,omitempty"` } +// HistoryHotRegionsRequest wrap the request conditions. +type HistoryHotRegionsRequest struct { + StartTime int64 `json:"start_time,omitempty"` + EndTime int64 `json:"end_time,omitempty"` + RegionIDs []uint64 `json:"region_ids,omitempty"` + StoreIDs []uint64 `json:"store_ids,omitempty"` + PeerIDs []uint64 `json:"peer_ids,omitempty"` + IsLearners []bool `json:"is_learners,omitempty"` + IsLeaders []bool `json:"is_leaders,omitempty"` + HotRegionTypes []string `json:"hot_region_type,omitempty"` +} + +// HistoryHotRegions wraps historyHotRegion +type HistoryHotRegions struct { + HistoryHotRegion []*HistoryHotRegion `json:"history_hot_region"` +} + +// HistoryHotRegion wraps hot region info +// it is storage format of hot_region_storage +type HistoryHotRegion struct { + UpdateTime int64 `json:"update_time"` + RegionID uint64 `json:"region_id"` + PeerID uint64 `json:"peer_id"` + StoreID uint64 `json:"store_id"` + IsLeader bool `json:"is_leader"` + IsLearner bool `json:"is_learner"` + HotRegionType string `json:"hot_region_type"` + HotDegree int64 `json:"hot_degree"` + FlowBytes float64 `json:"flow_bytes"` + KeyRate float64 `json:"key_rate"` + QueryRate float64 `json:"query_rate"` + StartKey string `json:"start_key"` + EndKey string `json:"end_key"` + // Encryption metadata for start_key and end_key. encryption_meta.iv is IV for start_key. + // IV for end_key is calculated from (encryption_meta.iv + len(start_key)). + // The field is only used by PD and should be ignored otherwise. + // If encryption_meta is empty (i.e. nil), it means start_key and end_key are unencrypted. + EncryptionMeta *encryptionpb.EncryptionMeta `json:"encryption_meta,omitempty"` +} + // StoresInfo represents the information of all TiKV/TiFlash stores. type StoresInfo struct { Count int `json:"count"` diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 3a52e91e1f8..e3b960df1f3 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -55,8 +55,16 @@ func (suite *httpClientTestSuite) SetupSuite() { re.NoError(err) leader := suite.cluster.WaitLeader() re.NotEmpty(leader) - err = suite.cluster.GetLeaderServer().BootstrapCluster() + leaderServer := suite.cluster.GetLeaderServer() + err = leaderServer.BootstrapCluster() re.NoError(err) + for _, region := range []*core.RegionInfo{ + core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")), + core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")), + } { + err := leaderServer.GetRaftCluster().HandleRegionHeartbeat(region) + re.NoError(err) + } var ( testServers = suite.cluster.GetServers() endpoints = make([]string, 0, len(testServers)) @@ -73,6 +81,53 @@ func (suite *httpClientTestSuite) TearDownSuite() { suite.cluster.Destroy() } +func (suite *httpClientTestSuite) TestMeta() { + re := suite.Require() + region, err := suite.client.GetRegionByID(suite.ctx, 10) + re.NoError(err) + re.Equal(int64(10), region.ID) + re.Equal(core.HexRegionKeyStr([]byte("a1")), region.StartKey) + re.Equal(core.HexRegionKeyStr([]byte("a2")), region.EndKey) + region, err = suite.client.GetRegionByKey(suite.ctx, []byte("a2")) + re.NoError(err) + re.Equal(int64(11), region.ID) + re.Equal(core.HexRegionKeyStr([]byte("a2")), region.StartKey) + re.Equal(core.HexRegionKeyStr([]byte("a3")), region.EndKey) + regions, err := suite.client.GetRegions(suite.ctx) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regions, err = suite.client.GetRegionsByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), -1) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regions, err = suite.client.GetRegionsByStoreID(suite.ctx, 1) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regionStats, err := suite.client.GetRegionStatusByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3"))) + re.NoError(err) + re.Equal(2, regionStats.Count) + hotReadRegions, err := suite.client.GetHotReadRegions(suite.ctx) + re.NoError(err) + re.Len(hotReadRegions.AsPeer, 1) + re.Len(hotReadRegions.AsLeader, 1) + hotWriteRegions, err := suite.client.GetHotWriteRegions(suite.ctx) + re.NoError(err) + re.Len(hotWriteRegions.AsPeer, 1) + re.Len(hotWriteRegions.AsLeader, 1) + historyHorRegions, err := suite.client.GetHistoryHotRegions(suite.ctx, &pd.HistoryHotRegionsRequest{ + StartTime: 0, + EndTime: time.Now().AddDate(0, 0, 1).UnixNano() / int64(time.Millisecond), + }) + re.NoError(err) + re.Len(historyHorRegions.HistoryHotRegion, 0) + store, err := suite.client.GetStores(suite.ctx) + re.NoError(err) + re.Equal(1, store.Count) + re.Len(store.Stores, 1) +} + func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() { re := suite.Require() testMinResolvedTS := tsoutil.TimeToTS(time.Now()) @@ -271,13 +326,6 @@ func (suite *httpClientTestSuite) TestRegionLabel() { func (suite *httpClientTestSuite) TestAccelerateSchedule() { re := suite.Require() raftCluster := suite.cluster.GetLeaderServer().GetRaftCluster() - for _, region := range []*core.RegionInfo{ - core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")), - core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")), - } { - err := raftCluster.HandleRegionHeartbeat(region) - re.NoError(err) - } suspectRegions := raftCluster.GetSuspectRegions() re.Len(suspectRegions, 0) err := suite.client.AccelerateSchedule(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a2"))) From d961677fca28b41d59d9d22541249970c1ac398c Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 28 Nov 2023 15:16:22 +0800 Subject: [PATCH 2/3] Address the comment Signed-off-by: JmPotato --- client/http/client.go | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/client/http/client.go b/client/http/client.go index 2f4a060c8b6..d15693e11d4 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -52,6 +52,9 @@ type Client interface { GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error) GetRegionStatusByKeyRange(context.Context, *KeyRange) (*RegionStats, error) GetStores(context.Context) (*StoresInfo, error) + /* Config-related interfaces */ + GetScheduleConfig(context.Context) (map[string]interface{}, error) + SetScheduleConfig(context.Context, map[string]interface{}) error /* Rule-related interfaces */ GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error) GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error) @@ -196,9 +199,9 @@ func (c *client) execDuration(name string, duration time.Duration) { type HeaderOption func(header http.Header) // WithAllowFollowerHandle sets the header field to allow a PD follower to handle this request. -func WithAllowFollowerHandle(allow bool) HeaderOption { +func WithAllowFollowerHandle() HeaderOption { return func(header http.Header) { - header.Set("PD-Allow-follower-handle", fmt.Sprintf("%t", allow)) + header.Set("PD-Allow-Follower-Handle", "true") } } @@ -387,7 +390,7 @@ func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegion err = c.requestWithRetry(ctx, "GetHistoryHotRegions", HotHistory, http.MethodGet, bytes.NewBuffer(reqJSON), &historyHotRegions, - WithAllowFollowerHandle(true)) + WithAllowFollowerHandle()) if err != nil { return nil, err } @@ -408,6 +411,29 @@ func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRan return ®ionStats, nil } +// GetScheduleConfig gets the schedule configurations. +func (c *client) GetScheduleConfig(ctx context.Context) (map[string]interface{}, error) { + var config map[string]interface{} + err := c.requestWithRetry(ctx, + "GetScheduleConfig", ScheduleConfig, + http.MethodGet, http.NoBody, &config) + if err != nil { + return nil, err + } + return config, nil +} + +// SetScheduleConfig sets the schedule configurations. +func (c *client) SetScheduleConfig(ctx context.Context, config map[string]interface{}) error { + configJSON, err := json.Marshal(config) + if err != nil { + return errors.Trace(err) + } + return c.requestWithRetry(ctx, + "SetScheduleConfig", ScheduleConfig, + http.MethodPost, bytes.NewBuffer(configJSON), nil) +} + // GetStores gets the stores info. func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) { var stores StoresInfo From 0f5985667817dc19df0e068c9255ec701ce63e52 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 29 Nov 2023 10:46:45 +0800 Subject: [PATCH 3/3] Add TestScheduleConfig Signed-off-by: JmPotato --- tests/integrations/client/http_client_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index e3b960df1f3..a007b893187 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -343,3 +343,18 @@ func (suite *httpClientTestSuite) TestAccelerateSchedule() { suspectRegions = raftCluster.GetSuspectRegions() re.Len(suspectRegions, 2) } + +func (suite *httpClientTestSuite) TestScheduleConfig() { + re := suite.Require() + config, err := suite.client.GetScheduleConfig(suite.ctx) + re.NoError(err) + re.Equal(float64(4), config["leader-schedule-limit"]) + re.Equal(float64(2048), config["region-schedule-limit"]) + config["leader-schedule-limit"] = float64(8) + err = suite.client.SetScheduleConfig(suite.ctx, config) + re.NoError(err) + config, err = suite.client.GetScheduleConfig(suite.ctx) + re.NoError(err) + re.Equal(float64(8), config["leader-schedule-limit"]) + re.Equal(float64(2048), config["region-schedule-limit"]) +}