Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add retriver for tidb_hot_regions_history #27375

Merged
merged 36 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d8f5844
executor: add retriver for tidb_hot_regions_history
IcePigZDB Aug 15, 2021
f87a112
executor: unified use of ms in update_time, and move IntSet init to e…
IcePigZDB Aug 16, 2021
644b406
executor: fix time formate bug
IcePigZDB Aug 16, 2021
9067e13
executor: fix history hot regions memtable reader test time bug
IcePigZDB Aug 16, 2021
4724738
executor: call DecodeBytes for region range key, and update test rang…
IcePigZDB Aug 18, 2021
78a9484
executor: change UPDATE_TIME type to TypeTimestamp and add timezone c…
IcePigZDB Aug 19, 2021
ff0c630
executor: devide read and write hot types into two http request to fi…
IcePigZDB Aug 22, 2021
faf8f52
executor: add is_leader in request and update retriver test
IcePigZDB Aug 22, 2021
0330dc6
executor: remove debug test case
IcePigZDB Aug 22, 2021
42a5c61
executor: close httpServers after test down
IcePigZDB Aug 22, 2021
c8e440d
executor: roles from intset to slice
IcePigZDB Aug 23, 2021
034533d
execuotr: use bool for IsLeader
IcePigZDB Aug 23, 2021
6ba28be
executor: use http.MethodGet
IcePigZDB Aug 24, 2021
8008798
executor: remove extraction unnecessary columns
IcePigZDB Aug 25, 2021
95093e9
executor: review from @rleungx
IcePigZDB Aug 25, 2021
cb2cbed
exectuor:change PD-Allow-follower-handle to false
IcePigZDB Aug 25, 2021
0490b49
executor: add is_learner field
IcePigZDB Aug 28, 2021
faf03b0
executor: formate retriver
IcePigZDB Sep 22, 2021
df7af2d
executor: add init of cancel
IcePigZDB Sep 22, 2021
27f8e4b
executor: formate
IcePigZDB Sep 23, 2021
f8b7416
Update executor/memtable_reader.go
IcePigZDB Oct 8, 2021
bbc6ef8
executor: simplfy code and move channel close to retrive func
IcePigZDB Oct 8, 2021
d78ae5c
executor: move close channel to startRetriving
IcePigZDB Oct 10, 2021
06f65df
Merge branch 'master' into hishotregionsexecutor
IcePigZDB Oct 11, 2021
c3f408f
Merge branch 'master' into hishotregionsexecutor
IcePigZDB Oct 15, 2021
6d6ed52
Update executor/memtable_reader.go
IcePigZDB Oct 15, 2021
1077f70
move supplement of HotTypes, IsLearners and IsLeaders to extractor
IcePigZDB Oct 17, 2021
c8d8c11
planner: keep role bool slice unique
IcePigZDB Oct 20, 2021
4ddc37f
planner: update comment
IcePigZDB Oct 20, 2021
485b76f
Merge branch 'master' into hishotregionsexecutor
IcePigZDB Oct 20, 2021
61dcc1b
Merge branch 'master' into hishotregionsexecutor
IcePigZDB Nov 9, 2021
57b2144
Merge branch 'master' into hishotregionsexecutor
nolouch Nov 16, 2021
957fe33
Merge branch 'master' into hishotregionsexecutor
IcePigZDB Nov 17, 2021
e05a248
fix conflict due to the remove of pdapi.clusterVersion
IcePigZDB Nov 17, 2021
e4d113f
address comment
IcePigZDB Nov 17, 2021
fa68994
Merge branch 'master' into hishotregionsexecutor
ti-chi-bot Nov 19, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,14 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
extractor: v.Extractor.(*plannercore.ClusterLogTableExtractor),
},
}
case strings.ToLower(infoschema.TableTiDBHotRegionsHistory):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &hotRegionsHistoryRetriver{
extractor: v.Extractor.(*plannercore.HotRegionsHistoryTableExtractor),
},
}
case strings.ToLower(infoschema.TableInspectionResult):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
Expand Down
301 changes: 301 additions & 0 deletions executor/memtable_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package executor

import (
"bytes"
"container/heap"
"context"
"encoding/json"
Expand All @@ -39,18 +40,22 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/set"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const clusterLogBatchSize = 256
// hotRegionsHistoryBatchSize is the maximum number of rows a single
// hotRegionsHistoryRetriver.retrieve call returns.
const hotRegionsHistoryBatchSize = 256

type dummyCloser struct{}

Expand Down Expand Up @@ -696,3 +701,299 @@ func (e *clusterLogRetriever) close() error {
// getRuntimeStats always returns nil for clusterLogRetriever.
func (e *clusterLogRetriever) getRuntimeStats() execdetails.RuntimeStats {
	return nil
}

// hotRegionsResult is the value a PD request goroutine delivers on its result
// channel: either a decoded response or the error that prevented one.
type hotRegionsResult struct {
	// addr is the status address of the PD server that produced messages.
	addr string
	// messages is the decoded response body; it is left nil on failure.
	messages *HistoryHotRegions
	// err is set when the request could not be built, sent or decoded, or
	// when PD answered with a non-200 status.
	err error
}

// hotRegionsResponseHeap is a min-heap of PD responses, ordered by the first
// record still held by each response. It implements heap.Interface, so every
// element must contain at least one record while it is in the heap.
type hotRegionsResponseHeap []hotRegionsResult

// Len reports how many responses the heap holds.
func (h hotRegionsResponseHeap) Len() int {
	return len(h)
}

// Less orders two responses by the UpdateTime of their head records and
// breaks ties with HotDegree.
func (h hotRegionsResponseHeap) Less(i, j int) bool {
	a := h[i].messages.HistoryHotRegion[0]
	b := h[j].messages.HistoryHotRegion[0]
	if a.UpdateTime == b.UpdateTime {
		return a.HotDegree < b.HotDegree
	}
	return a.UpdateTime < b.UpdateTime
}

// Swap exchanges the responses at positions i and j.
func (h hotRegionsResponseHeap) Swap(i, j int) {
	h[j], h[i] = h[i], h[j]
}

// Push appends x, which must be a hotRegionsResult. Use heap.Push rather
// than calling it directly.
func (h *hotRegionsResponseHeap) Push(x interface{}) {
	item := x.(hotRegionsResult)
	*h = append(*h, item)
}

// Pop removes and returns the last element. Use heap.Pop rather than calling
// it directly.
func (h *hotRegionsResponseHeap) Pop() interface{} {
	items := *h
	last := len(items) - 1
	tail := items[last]
	*h = items[:last]
	return tail
}

// hotRegionsHistoryRetriver fetches history hot regions from PD and merges
// the per-request responses into rows, one batch per retrieve call.
type hotRegionsHistoryRetriver struct {
	dummyCloser
	// isDrained is set once there is nothing more to return: either
	// initialization failed or every response has been fully consumed.
	isDrained bool
	// retrieving is set on the first retrieve call, when the PD requests
	// are issued, so that they are issued only once.
	retrieving bool
	// heap holds the responses that still have unread records.
	heap *hotRegionsResponseHeap
	// extractor carries the conditions taken from the query (time range,
	// IDs, roles, hot region types) that are forwarded to PD.
	extractor *plannercore.HotRegionsHistoryTableExtractor
}

// HistoryHotRegionsRequest wraps the conditions pushed down to PD. It is
// serialized as the JSON body of the history hot regions request; empty
// fields are omitted.
type HistoryHotRegionsRequest struct {
	StartTime  int64    `json:"start_time,omitempty"`
	EndTime    int64    `json:"end_time,omitempty"`
	RegionIDs  []uint64 `json:"region_ids,omitempty"`
	StoreIDs   []uint64 `json:"store_ids,omitempty"`
	PeerIDs    []uint64 `json:"peer_ids,omitempty"`
	IsLearners []bool   `json:"is_learners,omitempty"`
	IsLeaders  []bool   `json:"is_leaders,omitempty"`
	// HotRegionTypes is a slice because pd-ctl currently uses a slice for
	// this field; a single type would be enough and it may be narrowed once
	// pd-ctl and PD change.
	HotRegionTypes []string `json:"hot_region_type,omitempty"`
}

// HistoryHotRegions records the filtered hot regions stored in each PD.
// It is the response body PD returns for a history hot regions request.
type HistoryHotRegions struct {
	HistoryHotRegion []*HistoryHotRegion `json:"history_hot_region"`
}

// HistoryHotRegion records each hot region's statistics.
// It is one element of the response PD returns.
type HistoryHotRegion struct {
	// UpdateTime is a Unix timestamp in milliseconds.
	UpdateTime    int64   `json:"update_time,omitempty"`
	RegionID      uint64  `json:"region_id,omitempty"`
	StoreID       uint64  `json:"store_id,omitempty"`
	PeerID        uint64  `json:"peer_id,omitempty"`
	IsLearner     bool    `json:"is_learner,omitempty"`
	IsLeader      bool    `json:"is_leader,omitempty"`
	HotRegionType string  `json:"hot_region_type,omitempty"`
	HotDegree     int64   `json:"hot_degree,omitempty"`
	FlowBytes     float64 `json:"flow_bytes,omitempty"`
	KeyRate       float64 `json:"key_rate,omitempty"`
	QueryRate     float64 `json:"query_rate,omitempty"`
	// StartKey and EndKey are the region boundaries; they are passed through
	// codec.DecodeBytes before being used to locate the owning table.
	StartKey []byte `json:"start_key,omitempty"`
	EndKey   []byte `json:"end_key,omitempty"`
}

// initialize validates the session and the query conditions, then issues the
// history hot regions requests to the PD servers.
//
// It fails when the session lacks the PROCESS privilege, when the PD server
// list cannot be obtained, or when the query does not bound update_time on
// both sides. On success it returns one result channel per issued request.
func (e *hotRegionsHistoryRetriver) initialize(ctx context.Context, sctx sessionctx.Context) ([]chan hotRegionsResult, error) {
	if !hasPriv(sctx, mysql.ProcessPriv) {
		return nil, plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
	}

	servers, err := infoschema.GetPDServerInfo(sctx)
	if err != nil {
		return nil, err
	}

	// An unbounded scan could overload PD's hot regions search interface, so
	// both ends of the time range are mandatory.
	cond := e.extractor
	switch {
	case cond.StartTime == 0:
		return nil, errors.New("denied to scan hot regions, please specified the start time, such as `update_time > '2020-01-01 00:00:00'`")
	case cond.EndTime == 0:
		return nil, errors.New("denied to scan hot regions, please specified the end time, such as `update_time < '2020-01-01 00:00:00'`")
	}

	// HotRegionTypes is filled in per request by startRetrieving.
	return e.startRetrieving(ctx, sctx, servers, &HistoryHotRegionsRequest{
		StartTime:  cond.StartTime,
		EndTime:    cond.EndTime,
		RegionIDs:  cond.RegionIDs,
		StoreIDs:   cond.StoreIDs,
		PeerIDs:    cond.PeerIDs,
		IsLearners: cond.IsLearners,
		IsLeaders:  cond.IsLeaders,
	})
}

func (e *hotRegionsHistoryRetriver) startRetrieving(
ctx context.Context,
sctx sessionctx.Context,
pdServers []infoschema.ServerInfo,
req *HistoryHotRegionsRequest,
) ([]chan hotRegionsResult, error) {

var results []chan hotRegionsResult
for _, srv := range pdServers {
for typ := range e.extractor.HotRegionTypes {
req.HotRegionTypes = []string{typ}
jsonBody, err := json.Marshal(req)
if err != nil {
return nil, err
}
body := bytes.NewBuffer(jsonBody)
ch := make(chan hotRegionsResult)
results = append(results, ch)
go func(ch chan hotRegionsResult, address string, body *bytes.Buffer) {
util.WithRecovery(func() {
defer close(ch)
IcePigZDB marked this conversation as resolved.
Show resolved Hide resolved
url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), address, pdapi.HotHistory)
req, err := http.NewRequest(http.MethodGet, url, body)
if err != nil {
ch <- hotRegionsResult{err: errors.Trace(err)}
return
}
req.Header.Add("PD-Allow-follower-handle", "true")
resp, err := util.InternalHTTPClient().Do(req)
if err != nil {
ch <- hotRegionsResult{err: errors.Trace(err)}
return
}
defer func() {
terror.Log(resp.Body.Close())
}()
if resp.StatusCode != http.StatusOK {
ch <- hotRegionsResult{err: errors.Errorf("request %s failed: %s", url, resp.Status)}
return
}
var historyHotRegions HistoryHotRegions
if err = json.NewDecoder(resp.Body).Decode(&historyHotRegions); err != nil {
ch <- hotRegionsResult{err: errors.Trace(err)}
return
}
ch <- hotRegionsResult{addr: address, messages: &historyHotRegions}
}, nil)
}(ch, srv.StatusAddr, body)
}
}
return results, nil
}

func (e *hotRegionsHistoryRetriver) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.extractor.SkipRequest || e.isDrained {
return nil, nil
}

if !e.retrieving {
e.retrieving = true
results, err := e.initialize(ctx, sctx)
if err != nil {
e.isDrained = true
return nil, err
}
// Initialize the heap
e.heap = &hotRegionsResponseHeap{}
for _, ch := range results {
result := <-ch
if result.err != nil || len(result.messages.HistoryHotRegion) == 0 {
if result.err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
}
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move continue to line 886 and check len(result.messages.HistoryHotRegion) > 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}
IcePigZDB marked this conversation as resolved.
Show resolved Hide resolved
*e.heap = append(*e.heap, result)
}
heap.Init(e.heap)
}
// Merge the results
var finalRows [][]types.Datum
allSchemas := sctx.GetInfoSchema().(infoschema.InfoSchema).AllSchemas()
tz := sctx.GetSessionVars().Location()
tikvStore, ok := sctx.GetStore().(helper.Storage)
if !ok {
return nil, errors.New("Information about hot region can be gotten only when the storage is TiKV")
}
tikvHelper := &helper.Helper{
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
}
for e.heap.Len() > 0 && len(finalRows) < hotRegionsHistoryBatchSize {
minTimeItem := heap.Pop(e.heap).(hotRegionsResult)
row, err := e.getHotRegionRowWithSchemaInfo(minTimeItem.messages.HistoryHotRegion[0], tikvHelper, allSchemas, tz)
if err != nil {
return nil, err
}
if row != nil {
finalRows = append(finalRows, row)
}
minTimeItem.messages.HistoryHotRegion = minTimeItem.messages.HistoryHotRegion[1:]
// Fetch next message item
if len(minTimeItem.messages.HistoryHotRegion) != 0 {
heap.Push(e.heap, minTimeItem)
}
}
// All streams are drained
e.isDrained = e.heap.Len() == 0
return finalRows, nil
}

func (e *hotRegionsHistoryRetriver) getHotRegionRowWithSchemaInfo(
hisHotRegion *HistoryHotRegion,
tikvHelper *helper.Helper,
allSchemas []*model.DBInfo,
tz *time.Location,
) ([]types.Datum, error) {
_, startKey, _ := codec.DecodeBytes(hisHotRegion.StartKey, []byte{})
_, endKey, _ := codec.DecodeBytes(hisHotRegion.EndKey, []byte{})
region := &tikv.KeyLocation{StartKey: startKey, EndKey: endKey}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there possible the old table is dropped?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we have discussed this case, dropped tables' hot region will be ignore in current implement.

	_, startKey, _ := codec.DecodeBytes(hisHotRegion.StartKey, []byte{})
	_, endKey, _ := codec.DecodeBytes(hisHotRegion.EndKey, []byte{})
	region := &tikv.KeyLocation{StartKey: startKey, EndKey: endKey}
	hotRange, err := helper.NewRegionFrameRange(region)
	if err != nil {
		return nil, err
	}

	f := tikvHelper.FindTableIndexOfRegion(allSchemas, hotRange)
	// Ignore row without corresponding schema f.
	if f == nil {
		return nil, nil
	}

This case has been tested in memtable_reader_test.go/TestTiDBHotRegionsHistory line 1097-1099, code run as expected.

		//      table_id = 131, index_id = 1, index_value = 1, deleted schema
		{"2019-10-10 10:10:23", "UNKONW", "UNKONW", "131", "UNKONW", "1", "5", "5", "55555", "0", "1", "READ", "99", "99", "99", "99"},
		{"2019-10-10 10:10:24", "UNKONW", "UNKONW", "131", "UNKONW", "1", "6", "6", "66666", "0", "0", "WRITE", "99", "99", "99", "99"},

hotRange, err := helper.NewRegionFrameRange(region)
if err != nil {
return nil, err
}

f := tikvHelper.FindTableIndexOfRegion(allSchemas, hotRange)
// Ignore row without corresponding schema f.
if f == nil {
return nil, nil
}
row := make([]types.Datum, len(infoschema.TableTiDBHotRegionsHistoryCols))
updateTimestamp := time.Unix(hisHotRegion.UpdateTime/1000, (hisHotRegion.UpdateTime%1000)*int64(time.Millisecond))

if updateTimestamp.Location() != tz {
updateTimestamp.In(tz)
}
updateTime := types.NewTime(types.FromGoTime(updateTimestamp), mysql.TypeTimestamp, types.MinFsp)
row[0].SetMysqlTime(updateTime)
row[1].SetString(strings.ToUpper(f.DBName), mysql.DefaultCollationName)
row[2].SetString(strings.ToUpper(f.TableName), mysql.DefaultCollationName)
row[3].SetInt64(f.TableID)
if f.IndexName != "" {
row[4].SetString(strings.ToUpper(f.IndexName), mysql.DefaultCollationName)
row[5].SetInt64(f.IndexID)
} else {
row[4].SetNull()
row[5].SetNull()
}
row[6].SetInt64(int64(hisHotRegion.RegionID))
row[7].SetInt64(int64(hisHotRegion.StoreID))
row[8].SetInt64(int64(hisHotRegion.PeerID))
if hisHotRegion.IsLearner {
row[9].SetInt64(1)
} else {
row[9].SetInt64(0)
}
if hisHotRegion.IsLeader {
row[10].SetInt64(1)
} else {
row[10].SetInt64(0)
}

row[11].SetString(strings.ToUpper(hisHotRegion.HotRegionType), mysql.DefaultCollationName)
if hisHotRegion.HotDegree != 0 {
row[12].SetInt64(hisHotRegion.HotDegree)
} else {
row[12].SetNull()
}
if hisHotRegion.FlowBytes != 0 {
row[13].SetFloat64(hisHotRegion.FlowBytes)
} else {
row[13].SetNull()
}
if hisHotRegion.KeyRate != 0 {
row[14].SetFloat64(hisHotRegion.KeyRate)
} else {
row[14].SetNull()
}
if hisHotRegion.QueryRate != 0 {
row[15].SetFloat64(hisHotRegion.QueryRate)
} else {
row[15].SetNull()
}
return row, nil
}
Loading