Skip to content

Commit

Permalink
executor: add retriver for tidb_hot_regions_history
Browse files Browse the repository at this point in the history
Signed-off-by: IcePigZDB <icepigzdb@gmail.com>
  • Loading branch information
IcePigZDB committed Aug 15, 2021
1 parent fe67ab4 commit 07b0cdd
Show file tree
Hide file tree
Showing 4 changed files with 714 additions and 0 deletions.
8 changes: 8 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,14 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
extractor: v.Extractor.(*plannercore.ClusterLogTableExtractor),
},
}
case strings.ToLower(infoschema.TableTiDBHotRegionsHistory):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &hotRegionsHistoryRetriver{
extractor: v.Extractor.(*plannercore.HotRegionsHistoryTableExtractor),
},
}
case strings.ToLower(infoschema.TableInspectionResult):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
Expand Down
337 changes: 337 additions & 0 deletions executor/memtable_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package executor

import (
"bytes"
"container/heap"
"context"
"encoding/json"
Expand All @@ -38,18 +39,21 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/set"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const clusterLogBatchSize = 256
const hotRegionsHistoryBatchSize = 256

type dummyCloser struct{}

Expand Down Expand Up @@ -698,3 +702,336 @@ func (e *clusterLogRetriever) close() error {
func (e *clusterLogRetriever) getRuntimeStats() execdetails.RuntimeStats {
return nil
}

type hotRegionsStreamResult struct {
addr string
messages *HistoryHotRegions
err error
}

type hotRegionsResponseHeap []hotRegionsStreamResult

func (h hotRegionsResponseHeap) Len() int {
return len(h)
}

func (h hotRegionsResponseHeap) Less(i, j int) bool {
lhs, rhs := h[i].messages.HistoryHotRegion[0], h[j].messages.HistoryHotRegion[0]
if lhs.UpdateTime != rhs.UpdateTime {
return lhs.UpdateTime < rhs.UpdateTime
}
return lhs.HotDegree < rhs.HotDegree
}

func (h hotRegionsResponseHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h *hotRegionsResponseHeap) Push(x interface{}) {
*h = append(*h, x.(hotRegionsStreamResult))
}

func (h *hotRegionsResponseHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

type hotRegionsHistoryRetriver struct {
isDrained bool
retrieving bool
heap *hotRegionsResponseHeap
extractor *plannercore.HotRegionsHistoryTableExtractor
cancel context.CancelFunc
}

func (e *hotRegionsHistoryRetriver) close() error {
if e.cancel != nil {
e.cancel()
}
return nil
}

func (e *hotRegionsHistoryRetriver) getRuntimeStats() execdetails.RuntimeStats {
return nil
}

// HistoryHotRegionsRequest wrap conditions push down to PD.
type HistoryHotRegionsRequest struct {
StartTime int64 `json:"start_time,omitempty"`
EndTime int64 `json:"end_time,omitempty"`
RegionIDs []uint64 `json:"region_ids,omitempty"`
StoreIDs []uint64 `json:"store_ids,omitempty"`
PeerIDs []uint64 `json:"peer_ids,omitempty"`
HotRegionTypes []string `json:"hot_region_types,omitempty"`
LowHotDegree int64 `json:"low_hot_degree,omitempty"`
HighHotDegree int64 `json:"high_hot_degree,omitempty"`
LowFlowBytes float64 `json:"low_flow_bytes,omitempty"`
HighFlowBytes float64 `json:"high_flow_bytes,omitempty"`
LowKeyRate float64 `json:"low_key_rate,omitempty"`
HighKeyRate float64 `json:"high_key_rate,omitempty"`
LowQueryRate float64 `json:"low_query_rate,omitempty"`
HighQueryRate float64 `json:"high_query_rate,omitempty"`
}

// HistoryHotRegions records filtered hot regions stored in each PD
// it's the response of PD.
type HistoryHotRegions struct {
HistoryHotRegion []*HistoryHotRegion `json:"history_hot_region"`
}

// HistoryHotRegion records each hot region's statistics
// it's the response of PD.
type HistoryHotRegion struct {
UpdateTime int64 `json:"update_time,omitempty"`
RegionID uint64 `json:"region_id,omitempty"`
PeerID uint64 `json:"peer_id,omitempty"`
StoreID uint64 `json:"store_id,omitempty"`
HotRegionType string `json:"hot_region_type,omitempty"`
HotDegree int64 `json:"hot_degree,omitempty"`
FlowBytes float64 `json:"flow_bytes,omitempty"`
KeyRate float64 `json:"key_rate,omitempty"`
QueryRate float64 `json:"query_rate,omitempty"`
StartKey []byte `json:"start_key,omitempty"`
EndKey []byte `json:"end_key,omitempty"`
}

func (e *hotRegionsHistoryRetriver) initialize(ctx context.Context, sctx sessionctx.Context) ([]chan hotRegionsStreamResult, error) {
// TODO check whether need it
if !hasPriv(sctx, mysql.ProcessPriv) {
return nil, plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
}
pdServers, err := infoschema.GetPDServerInfo(sctx)
if err != nil {
return nil, err
}

// To avoid search hot regions interface overload, the user should specify the time range in normally SQL.
if e.extractor.StartTime == 0 {
return nil, errors.New("denied to scan hot regions, please specified the start time, such as `update_time > '2020-01-01 00:00:00'`")
}
if e.extractor.EndTime == 0 {
return nil, errors.New("denied to scan hot regions, please specified the end time, such as `update_time < '2020-01-01 00:00:00'`")
}

hotRegionTypes := make([]string, 0, e.extractor.HotRegionTypes.Count())
for typ := range e.extractor.HotRegionTypes {
hotRegionTypes = append(hotRegionTypes, typ)
}

historyHotRegionsRequest := &HistoryHotRegionsRequest{
StartTime: e.extractor.StartTime / 1000, // second in PD
EndTime: e.extractor.EndTime / 1000,
RegionIDs: e.extractor.RegionIDs,
StoreIDs: e.extractor.StoreIDs,
PeerIDs: e.extractor.PeerIDs,
HotRegionTypes: hotRegionTypes,
LowHotDegree: e.extractor.LowHotDegree,
HighHotDegree: e.extractor.HighHotDegree,
LowFlowBytes: e.extractor.LowFlowBytes,
HighFlowBytes: e.extractor.HighFlowBytes,
LowKeyRate: e.extractor.LowKeyRate,
HighKeyRate: e.extractor.HighKeyRate,
LowQueryRate: e.extractor.LowQueryRate,
HighQueryRate: e.extractor.HighQueryRate,
}
jsonBody, err := json.Marshal(historyHotRegionsRequest)
if err != nil {
return nil, err
}
body := bytes.NewBuffer(jsonBody)
return e.startRetrieving(ctx, sctx, pdServers, body)
}

func (e *hotRegionsHistoryRetriver) startRetrieving(
ctx context.Context,
sctx sessionctx.Context,
serversInfo []infoschema.ServerInfo,
body *bytes.Buffer,
) ([]chan hotRegionsStreamResult, error) {
wg := sync.WaitGroup{}
var results []chan hotRegionsStreamResult
for _, srv := range serversInfo {
ch := make(chan hotRegionsStreamResult)
results = append(results, ch)
go func(address string, body *bytes.Buffer) {
util.WithRecovery(func() {
defer wg.Done()
url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), address, pdapi.HotHistory)
req, err := http.NewRequest(http.MethodPost, url, body)
if err != nil {
ch <- hotRegionsStreamResult{err: errors.Trace(err)}
return
}
req.Header.Add("PD-Allow-follower-handle", "true")
resp, err := util.InternalHTTPClient().Do(req)
if err != nil {
ch <- hotRegionsStreamResult{err: errors.Trace(err)}
return
}
defer func() {
terror.Log(resp.Body.Close())
}()
if resp.StatusCode != http.StatusOK {
ch <- hotRegionsStreamResult{err: errors.Errorf("request %s failed: %s", url, resp.Status)}
return
}
// var nested map[string]interface{}
var historyHotRegions HistoryHotRegions
if err = json.NewDecoder(resp.Body).Decode(&historyHotRegions); err != nil {
ch <- hotRegionsStreamResult{err: errors.Trace(err)}
return
}
ch <- hotRegionsStreamResult{addr: address, messages: &historyHotRegions}
}, nil)
}(srv.StatusAddr, body)
}
return results, nil
}

func (e *hotRegionsHistoryRetriver) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.extractor.SkipRequest || e.isDrained {
return nil, nil
}

if !e.retrieving {
e.retrieving = true
results, err := e.initialize(ctx, sctx)
if err != nil {
e.isDrained = true
return nil, err
}
// initialize the heap
e.heap = &hotRegionsResponseHeap{}
for _, ch := range results {
result := <-ch
if result.err != nil || len(result.messages.HistoryHotRegion) == 0 {
if result.err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
}
continue
}
*e.heap = append(*e.heap, result)
}
heap.Init(e.heap)
}
// filter results by db_name, table_name, index_name, table_id, index_id and mearge the results
var finalRows [][]types.Datum
for e.heap.Len() > 0 && len(finalRows) < hotRegionsHistoryBatchSize {
minTimeItem := heap.Pop(e.heap).(hotRegionsStreamResult)
row, err := e.parseAndFilterBySchemaInfo(sctx, minTimeItem.messages.HistoryHotRegion[0])
if err != nil {
return nil, err
}
if row != nil {
finalRows = append(finalRows, row)
}
minTimeItem.messages.HistoryHotRegion = minTimeItem.messages.HistoryHotRegion[1:]
// Current streaming result is drained, read the next to supply.
if len(minTimeItem.messages.HistoryHotRegion) != 0 {
heap.Push(e.heap, minTimeItem)
}
}
// All streams are drained
e.isDrained = e.heap.Len() == 0
return finalRows, nil
}

func (e *hotRegionsHistoryRetriver) filterBySchemaInfo(f *helper.FrameItem) bool {
// TODO Ignore this row can't find responding schema f.
if f == nil {
return false
}
if len(e.extractor.DBNames) != 0 && !e.extractor.DBNames.Exist(f.DBName) {
return false
}
if len(e.extractor.TableNames) != 0 && !e.extractor.TableNames.Exist(f.TableName) {
return false
}
if len(e.extractor.IndexNames) != 0 && !e.extractor.IndexNames.Exist(f.IndexName) {
return false
}
if len(e.extractor.TableIDs) != 0 {
tableIDset := set.NewInt64Set()
for _, tbl := range e.extractor.TableIDs {
tableIDset.Insert(int64(tbl))
}
if !tableIDset.Exist(f.TableID) {
return false
}
}
if len(e.extractor.IndexIDs) != 0 {
indexIDset := set.NewInt64Set()
for _, idx := range e.extractor.IndexIDs {
indexIDset.Insert(int64(idx))
}
if !indexIDset.Exist(f.IndexID) {
return false
}
}
return true
}

func (e *hotRegionsHistoryRetriver) parseAndFilterBySchemaInfo(sctx sessionctx.Context, headMessage *HistoryHotRegion) ([]types.Datum, error) {
region := &tikv.KeyLocation{StartKey: headMessage.StartKey, EndKey: headMessage.EndKey}
hotRange, err := helper.NewRegionFrameRange(region)
if err != nil {
return nil, err
}
allSchemas := sctx.GetInfoSchema().(infoschema.InfoSchema).AllSchemas()
tikvStore, ok := sctx.GetStore().(helper.Storage)
if !ok {
return nil, errors.New("Information about hot region can be gotten only when the storage is TiKV")
}
tikvHelper := &helper.Helper{
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
}
f := tikvHelper.FindTableIndexOfRegion(allSchemas, hotRange)
// keep this row or not
keep := e.filterBySchemaInfo(f)
if !keep {
return nil, nil
}
updateTime := time.Unix(headMessage.UpdateTime, 0)
row := make([]types.Datum, len(infoschema.TableTiDBHotRegionsHistoryCols))

row[0].SetString(updateTime.Format("2006/01/02 15:04:05"), mysql.DefaultCollationName)
row[1].SetString(strings.ToUpper(f.DBName), mysql.DefaultCollationName)
row[2].SetString(strings.ToUpper(f.TableName), mysql.DefaultCollationName)
row[3].SetInt64(f.TableID)
if f.IndexName != "" {
row[4].SetString(strings.ToUpper(f.IndexName), mysql.DefaultCollationName)
row[5].SetInt64(f.IndexID)
} else {
row[4].SetNull()
row[5].SetNull()
}
row[6].SetInt64(int64(headMessage.RegionID))
row[7].SetInt64(int64(headMessage.StoreID))
row[8].SetInt64(int64(headMessage.PeerID))
row[9].SetString(strings.ToUpper(headMessage.HotRegionType), mysql.DefaultCollationName)
if headMessage.HotDegree != 0 {
row[10].SetInt64(headMessage.HotDegree)
} else {
row[10].SetNull()
}
if headMessage.FlowBytes != 0 {
row[11].SetFloat64(float64(headMessage.FlowBytes))
} else {
row[11].SetNull()
}
if headMessage.KeyRate != 0 {
row[12].SetFloat64(float64(headMessage.KeyRate))
} else {
row[12].SetNull()
}
if headMessage.QueryRate != 0 {
row[13].SetFloat64(float64(headMessage.QueryRate))
} else {
row[13].SetNull()
}
return row, nil
}
Loading

0 comments on commit 07b0cdd

Please sign in to comment.