Skip to content

Commit

Permalink
Add GetHistoryHotRegions interface and more tests
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Nov 27, 2023
1 parent 4bcac20 commit 27a4a36
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 9 deletions.
35 changes: 34 additions & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Client interface {
GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error)
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error)
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error)
GetRegionStatusByKeyRange(context.Context, *KeyRange) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
/* Rule-related interfaces */
Expand Down Expand Up @@ -191,20 +192,31 @@ func (c *client) execDuration(name string, duration time.Duration) {
c.executionDuration.WithLabelValues(name).Observe(duration.Seconds())
}

// HeaderOption configures the HTTP header.
type HeaderOption func(header http.Header)

// WithAllowFollowerHandle sets the header field to allow a PD follower to handle this request.
func WithAllowFollowerHandle(allow bool) HeaderOption {
return func(header http.Header) {
header.Set("PD-Allow-follower-handle", fmt.Sprintf("%t", allow))
}
}

// At present, we will use the retry strategy of polling by default to keep
// it consistent with the current implementation of some clients (e.g. TiDB).
func (c *client) requestWithRetry(
ctx context.Context,
name, uri, method string,
body io.Reader, res interface{},
headerOpts ...HeaderOption,
) error {
var (
err error
addr string
)
for idx := 0; idx < len(c.pdAddrs); idx++ {
addr = c.pdAddrs[idx]
err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res)
err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res, headerOpts...)
if err == nil {
break
}
Expand All @@ -218,6 +230,7 @@ func (c *client) request(
ctx context.Context,
name, url, method string,
body io.Reader, res interface{},
headerOpts ...HeaderOption,
) error {
logFields := []zap.Field{
zap.String("name", name),
Expand All @@ -229,6 +242,9 @@ func (c *client) request(
log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...)
return errors.Trace(err)
}
for _, opt := range headerOpts {
opt(req.Header)
}
start := time.Now()
resp, err := c.cli.Do(req)
if err != nil {
Expand Down Expand Up @@ -361,6 +377,23 @@ func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, e
return &hotWriteRegions, nil
}

// GetHistoryHotRegions gets the history hot region statistics info.
func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegionsRequest) (*HistoryHotRegions, error) {
reqJSON, err := json.Marshal(req)
if err != nil {
return nil, errors.Trace(err)

Check warning on line 384 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L384

Added line #L384 was not covered by tests
}
var historyHotRegions HistoryHotRegions
err = c.requestWithRetry(ctx,
"GetHistoryHotRegions", HotHistory,
http.MethodGet, bytes.NewBuffer(reqJSON), &historyHotRegions,
WithAllowFollowerHandle(true))
if err != nil {
return nil, err

Check warning on line 392 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L392

Added line #L392 was not covered by tests
}
return &historyHotRegions, nil
}

// GetRegionStatusByKeyRange gets the region status by key range.
// The keys in the key range should be encoded in the UTF-8 bytes format.
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange) (*RegionStats, error) {
Expand Down
42 changes: 42 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"encoding/json"
"net/url"
"time"

"github.com/pingcap/kvproto/pkg/encryptionpb"
)

// KeyRange defines a range of keys in bytes.
Expand Down Expand Up @@ -166,6 +168,46 @@ type HotPeerStatShow struct {
LastUpdateTime time.Time `json:"last_update_time,omitempty"`
}

// HistoryHotRegionsRequest wrap the request conditions.
type HistoryHotRegionsRequest struct {
StartTime int64 `json:"start_time,omitempty"`
EndTime int64 `json:"end_time,omitempty"`
RegionIDs []uint64 `json:"region_ids,omitempty"`
StoreIDs []uint64 `json:"store_ids,omitempty"`
PeerIDs []uint64 `json:"peer_ids,omitempty"`
IsLearners []bool `json:"is_learners,omitempty"`
IsLeaders []bool `json:"is_leaders,omitempty"`
HotRegionTypes []string `json:"hot_region_type,omitempty"`
}

// HistoryHotRegions wraps historyHotRegion
type HistoryHotRegions struct {
HistoryHotRegion []*HistoryHotRegion `json:"history_hot_region"`
}

// HistoryHotRegion wraps hot region info
// it is storage format of hot_region_storage
type HistoryHotRegion struct {
UpdateTime int64 `json:"update_time"`
RegionID uint64 `json:"region_id"`
PeerID uint64 `json:"peer_id"`
StoreID uint64 `json:"store_id"`
IsLeader bool `json:"is_leader"`
IsLearner bool `json:"is_learner"`
HotRegionType string `json:"hot_region_type"`
HotDegree int64 `json:"hot_degree"`
FlowBytes float64 `json:"flow_bytes"`
KeyRate float64 `json:"key_rate"`
QueryRate float64 `json:"query_rate"`
StartKey string `json:"start_key"`
EndKey string `json:"end_key"`
// Encryption metadata for start_key and end_key. encryption_meta.iv is IV for start_key.
// IV for end_key is calculated from (encryption_meta.iv + len(start_key)).
// The field is only used by PD and should be ignored otherwise.
// If encryption_meta is empty (i.e. nil), it means start_key and end_key are unencrypted.
EncryptionMeta *encryptionpb.EncryptionMeta `json:"encryption_meta,omitempty"`
}

// StoresInfo represents the information of all TiKV/TiFlash stores.
type StoresInfo struct {
Count int `json:"count"`
Expand Down
64 changes: 56 additions & 8 deletions tests/integrations/client/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,16 @@ func (suite *httpClientTestSuite) SetupSuite() {
re.NoError(err)
leader := suite.cluster.WaitLeader()
re.NotEmpty(leader)
err = suite.cluster.GetLeaderServer().BootstrapCluster()
leaderServer := suite.cluster.GetLeaderServer()
err = leaderServer.BootstrapCluster()
re.NoError(err)
for _, region := range []*core.RegionInfo{
core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")),
core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")),
} {
err := leaderServer.GetRaftCluster().HandleRegionHeartbeat(region)
re.NoError(err)
}
var (
testServers = suite.cluster.GetServers()
endpoints = make([]string, 0, len(testServers))
Expand All @@ -73,6 +81,53 @@ func (suite *httpClientTestSuite) TearDownSuite() {
suite.cluster.Destroy()
}

func (suite *httpClientTestSuite) TestMeta() {
re := suite.Require()
region, err := suite.client.GetRegionByID(suite.ctx, 10)
re.NoError(err)
re.Equal(int64(10), region.ID)
re.Equal(core.HexRegionKeyStr([]byte("a1")), region.StartKey)
re.Equal(core.HexRegionKeyStr([]byte("a2")), region.EndKey)
region, err = suite.client.GetRegionByKey(suite.ctx, []byte("a2"))
re.NoError(err)
re.Equal(int64(11), region.ID)
re.Equal(core.HexRegionKeyStr([]byte("a2")), region.StartKey)
re.Equal(core.HexRegionKeyStr([]byte("a3")), region.EndKey)
regions, err := suite.client.GetRegions(suite.ctx)
re.NoError(err)
re.Equal(int64(2), regions.Count)
re.Len(regions.Regions, 2)
regions, err = suite.client.GetRegionsByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), -1)
re.NoError(err)
re.Equal(int64(2), regions.Count)
re.Len(regions.Regions, 2)
regions, err = suite.client.GetRegionsByStoreID(suite.ctx, 1)
re.NoError(err)
re.Equal(int64(2), regions.Count)
re.Len(regions.Regions, 2)
regionStats, err := suite.client.GetRegionStatusByKeyRange(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")))
re.NoError(err)
re.Equal(2, regionStats.Count)
hotReadRegions, err := suite.client.GetHotReadRegions(suite.ctx)
re.NoError(err)
re.Len(hotReadRegions.AsPeer, 1)
re.Len(hotReadRegions.AsLeader, 1)
hotWriteRegions, err := suite.client.GetHotWriteRegions(suite.ctx)
re.NoError(err)
re.Len(hotWriteRegions.AsPeer, 1)
re.Len(hotWriteRegions.AsLeader, 1)
historyHorRegions, err := suite.client.GetHistoryHotRegions(suite.ctx, &pd.HistoryHotRegionsRequest{
StartTime: 0,
EndTime: time.Now().AddDate(0, 0, 1).UnixNano() / int64(time.Millisecond),
})
re.NoError(err)
re.Len(historyHorRegions.HistoryHotRegion, 0)
store, err := suite.client.GetStores(suite.ctx)
re.NoError(err)
re.Equal(1, store.Count)
re.Len(store.Stores, 1)
}

func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() {
re := suite.Require()
testMinResolvedTS := tsoutil.TimeToTS(time.Now())
Expand Down Expand Up @@ -271,13 +326,6 @@ func (suite *httpClientTestSuite) TestRegionLabel() {
func (suite *httpClientTestSuite) TestAccelerateSchedule() {
re := suite.Require()
raftCluster := suite.cluster.GetLeaderServer().GetRaftCluster()
for _, region := range []*core.RegionInfo{
core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")),
core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")),
} {
err := raftCluster.HandleRegionHeartbeat(region)
re.NoError(err)
}
suspectRegions := raftCluster.GetSuspectRegions()
re.Len(suspectRegions, 0)
err := suite.client.AccelerateSchedule(suite.ctx, pd.NewKeyRange([]byte("a1"), []byte("a2")))
Expand Down

0 comments on commit 27a4a36

Please sign in to comment.