-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: add retriver for tidb_hot_regions_history #27375
Changes from 34 commits
d8f5844
f87a112
644b406
9067e13
4724738
78a9484
ff0c630
faf8f52
0330dc6
42a5c61
c8e440d
034533d
6ba28be
8008798
95093e9
cb2cbed
0490b49
faf03b0
df7af2d
27f8e4b
f8b7416
bbc6ef8
d78ae5c
06f65df
c3f408f
6d6ed52
1077f70
c8d8c11
4ddc37f
485b76f
61dcc1b
57b2144
957fe33
e05a248
e4d113f
fa68994
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
package executor | ||
|
||
import ( | ||
"bytes" | ||
"container/heap" | ||
"context" | ||
"encoding/json" | ||
|
@@ -39,18 +40,22 @@ import ( | |
plannercore "github.com/pingcap/tidb/planner/core" | ||
"github.com/pingcap/tidb/sessionctx" | ||
"github.com/pingcap/tidb/sessionctx/variable" | ||
"github.com/pingcap/tidb/store/helper" | ||
"github.com/pingcap/tidb/types" | ||
"github.com/pingcap/tidb/util" | ||
"github.com/pingcap/tidb/util/chunk" | ||
"github.com/pingcap/tidb/util/codec" | ||
"github.com/pingcap/tidb/util/execdetails" | ||
"github.com/pingcap/tidb/util/pdapi" | ||
"github.com/pingcap/tidb/util/set" | ||
"github.com/tikv/client-go/v2/tikv" | ||
"go.uber.org/zap" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/credentials" | ||
) | ||
|
||
// clusterLogBatchSize is a batch-size constant whose use is outside this view
// (presumably the per-call row limit of the cluster log retriever — TODO confirm).
const clusterLogBatchSize = 256 | ||
// hotRegionsHistoryBatchSize is the maximum number of rows that
// hotRegionsHistoryRetriver.retrieve returns from a single call.
const hotRegionsHistoryBatchSize = 256 | ||
|
||
// dummyCloser is an empty struct that retrievers such as
// hotRegionsHistoryRetriver embed. Its methods are defined outside this view
// (presumably a no-op close — TODO confirm).
type dummyCloser struct{} | ||
|
||
|
@@ -696,3 +701,299 @@ func (e *clusterLogRetriever) close() error { | |
// getRuntimeStats always returns nil; this retriever reports no runtime
// statistics.
func (e *clusterLogRetriever) getRuntimeStats() execdetails.RuntimeStats { | ||
return nil | ||
} | ||
|
||
// hotRegionsResult is the outcome of one hot-regions-history HTTP request
// sent to a PD server. Exactly one of messages or err is set by the sender.
type hotRegionsResult struct { | ||
// addr is the PD status address the response came from; it is only filled in
// on success.
addr string | ||
// messages is the decoded response body.
messages *HistoryHotRegions | ||
// err is the request, status-code or decoding error, if any.
err error | ||
} | ||
|
||
// hotRegionsResponseHeap implements container/heap's heap.Interface over
// per-request results. It is ordered by the first pending record of each
// result, so every element must hold at least one record.
type hotRegionsResponseHeap []hotRegionsResult | ||
|
||
func (h hotRegionsResponseHeap) Len() int { | ||
return len(h) | ||
} | ||
|
||
func (h hotRegionsResponseHeap) Less(i, j int) bool { | ||
lhs, rhs := h[i].messages.HistoryHotRegion[0], h[j].messages.HistoryHotRegion[0] | ||
if lhs.UpdateTime != rhs.UpdateTime { | ||
return lhs.UpdateTime < rhs.UpdateTime | ||
} | ||
return lhs.HotDegree < rhs.HotDegree | ||
} | ||
|
||
func (h hotRegionsResponseHeap) Swap(i, j int) { | ||
h[i], h[j] = h[j], h[i] | ||
} | ||
|
||
func (h *hotRegionsResponseHeap) Push(x interface{}) { | ||
*h = append(*h, x.(hotRegionsResult)) | ||
} | ||
|
||
func (h *hotRegionsResponseHeap) Pop() interface{} { | ||
old := *h | ||
n := len(old) | ||
x := old[n-1] | ||
*h = old[0 : n-1] | ||
return x | ||
} | ||
|
||
// hotRegionsHistoryRetriver fetches historical hot-region records from the PD
// servers and returns them in batches, merged across responses through a heap.
type hotRegionsHistoryRetriver struct { | ||
dummyCloser | ||
// isDrained is set once initialization fails or the merge heap is empty;
// retrieve returns nothing afterwards.
isDrained bool | ||
// retrieving is set on the first retrieve call so the PD requests are issued
// only once.
retrieving bool | ||
// heap holds the responses that still have unreturned records.
heap *hotRegionsResponseHeap | ||
// extractor supplies the conditions pushed down to PD and the SkipRequest flag.
extractor *plannercore.HotRegionsHistoryTableExtractor | ||
} | ||
|
||
// HistoryHotRegionsRequest wraps the filter conditions pushed down to PD. It is
// JSON-encoded and sent as the body of the hot-regions-history request.
// StartTime and EndTime are copied from the extractor (presumably Unix
// milliseconds, like HistoryHotRegion.UpdateTime — TODO confirm).
// NOTE(review): the JSON key for HotRegionTypes is the singular
// "hot_region_type", and startRetrieving always sends exactly one type per request.
type HistoryHotRegionsRequest struct { | ||
StartTime int64 `json:"start_time,omitempty"` | ||
EndTime int64 `json:"end_time,omitempty"` | ||
RegionIDs []uint64 `json:"region_ids,omitempty"` | ||
StoreIDs []uint64 `json:"store_ids,omitempty"` | ||
PeerIDs []uint64 `json:"peer_ids,omitempty"` | ||
IsLearners []bool `json:"is_learners,omitempty"` | ||
IsLeaders []bool `json:"is_leaders,omitempty"` | ||
HotRegionTypes []string `json:"hot_region_type,omitempty"` | ||
} | ||
|
||
// HistoryHotRegions is PD's response to a hot-regions-history request: the
// filtered hot-region records stored in that PD.
// NOTE(review): retrieve merges responses by their first record, which only
// yields a globally ordered result if each response is already sorted by
// update time — confirm against PD.
type HistoryHotRegions struct { | ||
HistoryHotRegion []*HistoryHotRegion `json:"history_hot_region"` | ||
} | ||
|
||
// HistoryHotRegion holds the statistics of one hot region at one point in
// time, as returned by PD.
// UpdateTime is a Unix timestamp in milliseconds. StartKey and EndKey are
// passed through codec.DecodeBytes before being mapped to a table or index.
// When building result rows, zero HotDegree, FlowBytes, KeyRate and QueryRate
// values are reported as NULL.
type HistoryHotRegion struct { | ||
UpdateTime int64 `json:"update_time,omitempty"` | ||
RegionID uint64 `json:"region_id,omitempty"` | ||
StoreID uint64 `json:"store_id,omitempty"` | ||
PeerID uint64 `json:"peer_id,omitempty"` | ||
IsLearner bool `json:"is_learner,omitempty"` | ||
IsLeader bool `json:"is_leader,omitempty"` | ||
HotRegionType string `json:"hot_region_type,omitempty"` | ||
HotDegree int64 `json:"hot_degree,omitempty"` | ||
FlowBytes float64 `json:"flow_bytes,omitempty"` | ||
KeyRate float64 `json:"key_rate,omitempty"` | ||
QueryRate float64 `json:"query_rate,omitempty"` | ||
StartKey []byte `json:"start_key,omitempty"` | ||
EndKey []byte `json:"end_key,omitempty"` | ||
} | ||
|
||
func (e *hotRegionsHistoryRetriver) initialize(ctx context.Context, sctx sessionctx.Context) ([]chan hotRegionsResult, error) { | ||
if !hasPriv(sctx, mysql.ProcessPriv) { | ||
return nil, plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS") | ||
} | ||
pdServers, err := infoschema.GetPDServerInfo(sctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// To avoid search hot regions interface overload, the user should specify the time range in normally SQL. | ||
if e.extractor.StartTime == 0 { | ||
return nil, errors.New("denied to scan hot regions, please specified the start time, such as `update_time > '2020-01-01 00:00:00'`") | ||
} | ||
if e.extractor.EndTime == 0 { | ||
return nil, errors.New("denied to scan hot regions, please specified the end time, such as `update_time < '2020-01-01 00:00:00'`") | ||
} | ||
|
||
historyHotRegionsRequest := &HistoryHotRegionsRequest{ | ||
StartTime: e.extractor.StartTime, | ||
EndTime: e.extractor.EndTime, | ||
RegionIDs: e.extractor.RegionIDs, | ||
StoreIDs: e.extractor.StoreIDs, | ||
PeerIDs: e.extractor.PeerIDs, | ||
IsLearners: e.extractor.IsLearners, | ||
IsLeaders: e.extractor.IsLeaders, | ||
} | ||
|
||
return e.startRetrieving(ctx, sctx, pdServers, historyHotRegionsRequest) | ||
} | ||
|
||
func (e *hotRegionsHistoryRetriver) startRetrieving( | ||
ctx context.Context, | ||
sctx sessionctx.Context, | ||
pdServers []infoschema.ServerInfo, | ||
req *HistoryHotRegionsRequest, | ||
) ([]chan hotRegionsResult, error) { | ||
|
||
var results []chan hotRegionsResult | ||
for _, srv := range pdServers { | ||
for typ := range e.extractor.HotRegionTypes { | ||
req.HotRegionTypes = []string{typ} | ||
jsonBody, err := json.Marshal(req) | ||
if err != nil { | ||
return nil, err | ||
} | ||
body := bytes.NewBuffer(jsonBody) | ||
ch := make(chan hotRegionsResult) | ||
results = append(results, ch) | ||
go func(ch chan hotRegionsResult, address string, body *bytes.Buffer) { | ||
util.WithRecovery(func() { | ||
defer close(ch) | ||
IcePigZDB marked this conversation as resolved.
Show resolved
Hide resolved
|
||
url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), address, pdapi.HotHistory) | ||
req, err := http.NewRequest(http.MethodGet, url, body) | ||
if err != nil { | ||
ch <- hotRegionsResult{err: errors.Trace(err)} | ||
return | ||
} | ||
req.Header.Add("PD-Allow-follower-handle", "true") | ||
resp, err := util.InternalHTTPClient().Do(req) | ||
if err != nil { | ||
ch <- hotRegionsResult{err: errors.Trace(err)} | ||
return | ||
} | ||
defer func() { | ||
terror.Log(resp.Body.Close()) | ||
}() | ||
if resp.StatusCode != http.StatusOK { | ||
ch <- hotRegionsResult{err: errors.Errorf("request %s failed: %s", url, resp.Status)} | ||
return | ||
} | ||
var historyHotRegions HistoryHotRegions | ||
if err = json.NewDecoder(resp.Body).Decode(&historyHotRegions); err != nil { | ||
ch <- hotRegionsResult{err: errors.Trace(err)} | ||
return | ||
} | ||
ch <- hotRegionsResult{addr: address, messages: &historyHotRegions} | ||
}, nil) | ||
}(ch, srv.StatusAddr, body) | ||
} | ||
} | ||
return results, nil | ||
} | ||
|
||
func (e *hotRegionsHistoryRetriver) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { | ||
if e.extractor.SkipRequest || e.isDrained { | ||
return nil, nil | ||
} | ||
|
||
if !e.retrieving { | ||
e.retrieving = true | ||
results, err := e.initialize(ctx, sctx) | ||
if err != nil { | ||
e.isDrained = true | ||
return nil, err | ||
} | ||
// Initialize the heap | ||
e.heap = &hotRegionsResponseHeap{} | ||
for _, ch := range results { | ||
result := <-ch | ||
if result.err != nil { | ||
sctx.GetSessionVars().StmtCtx.AppendWarning(result.err) | ||
} | ||
if result.err != nil || len(result.messages.HistoryHotRegion) == 0 { | ||
continue | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
} | ||
*e.heap = append(*e.heap, result) | ||
} | ||
heap.Init(e.heap) | ||
} | ||
// Merge the results | ||
var finalRows [][]types.Datum | ||
allSchemas := sctx.GetInfoSchema().(infoschema.InfoSchema).AllSchemas() | ||
tz := sctx.GetSessionVars().Location() | ||
tikvStore, ok := sctx.GetStore().(helper.Storage) | ||
if !ok { | ||
return nil, errors.New("Information about hot region can be gotten only when the storage is TiKV") | ||
} | ||
tikvHelper := &helper.Helper{ | ||
Store: tikvStore, | ||
RegionCache: tikvStore.GetRegionCache(), | ||
} | ||
for e.heap.Len() > 0 && len(finalRows) < hotRegionsHistoryBatchSize { | ||
minTimeItem := heap.Pop(e.heap).(hotRegionsResult) | ||
row, err := e.getHotRegionRowWithSchemaInfo(minTimeItem.messages.HistoryHotRegion[0], tikvHelper, allSchemas, tz) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if row != nil { | ||
finalRows = append(finalRows, row) | ||
} | ||
minTimeItem.messages.HistoryHotRegion = minTimeItem.messages.HistoryHotRegion[1:] | ||
// Fetch next message item | ||
if len(minTimeItem.messages.HistoryHotRegion) != 0 { | ||
heap.Push(e.heap, minTimeItem) | ||
} | ||
} | ||
// All streams are drained | ||
e.isDrained = e.heap.Len() == 0 | ||
return finalRows, nil | ||
} | ||
|
||
func (e *hotRegionsHistoryRetriver) getHotRegionRowWithSchemaInfo( | ||
hisHotRegion *HistoryHotRegion, | ||
tikvHelper *helper.Helper, | ||
allSchemas []*model.DBInfo, | ||
tz *time.Location, | ||
) ([]types.Datum, error) { | ||
_, startKey, _ := codec.DecodeBytes(hisHotRegion.StartKey, []byte{}) | ||
_, endKey, _ := codec.DecodeBytes(hisHotRegion.EndKey, []byte{}) | ||
region := &tikv.KeyLocation{StartKey: startKey, EndKey: endKey} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there possible the old table is dropped? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, we have discussed this case, dropped tables' hot region will be ignore in current implement. _, startKey, _ := codec.DecodeBytes(hisHotRegion.StartKey, []byte{})
_, endKey, _ := codec.DecodeBytes(hisHotRegion.EndKey, []byte{})
region := &tikv.KeyLocation{StartKey: startKey, EndKey: endKey}
hotRange, err := helper.NewRegionFrameRange(region)
if err != nil {
return nil, err
}
f := tikvHelper.FindTableIndexOfRegion(allSchemas, hotRange)
// Ignore row without corresponding schema f.
if f == nil {
return nil, nil
} This case has been tested in // table_id = 131, index_id = 1, index_value = 1, deleted schema
{"2019-10-10 10:10:23", "UNKONW", "UNKONW", "131", "UNKONW", "1", "5", "5", "55555", "0", "1", "READ", "99", "99", "99", "99"},
{"2019-10-10 10:10:24", "UNKONW", "UNKONW", "131", "UNKONW", "1", "6", "6", "66666", "0", "0", "WRITE", "99", "99", "99", "99"}, |
||
hotRange, err := helper.NewRegionFrameRange(region) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
f := tikvHelper.FindTableIndexOfRegion(allSchemas, hotRange) | ||
// Ignore row without corresponding schema f. | ||
if f == nil { | ||
return nil, nil | ||
} | ||
row := make([]types.Datum, len(infoschema.TableTiDBHotRegionsHistoryCols)) | ||
updateTimestamp := time.Unix(hisHotRegion.UpdateTime/1000, (hisHotRegion.UpdateTime%1000)*int64(time.Millisecond)) | ||
|
||
if updateTimestamp.Location() != tz { | ||
updateTimestamp.In(tz) | ||
} | ||
updateTime := types.NewTime(types.FromGoTime(updateTimestamp), mysql.TypeTimestamp, types.MinFsp) | ||
row[0].SetMysqlTime(updateTime) | ||
row[1].SetString(strings.ToUpper(f.DBName), mysql.DefaultCollationName) | ||
row[2].SetString(strings.ToUpper(f.TableName), mysql.DefaultCollationName) | ||
row[3].SetInt64(f.TableID) | ||
if f.IndexName != "" { | ||
row[4].SetString(strings.ToUpper(f.IndexName), mysql.DefaultCollationName) | ||
row[5].SetInt64(f.IndexID) | ||
} else { | ||
row[4].SetNull() | ||
row[5].SetNull() | ||
} | ||
row[6].SetInt64(int64(hisHotRegion.RegionID)) | ||
row[7].SetInt64(int64(hisHotRegion.StoreID)) | ||
row[8].SetInt64(int64(hisHotRegion.PeerID)) | ||
if hisHotRegion.IsLearner { | ||
row[9].SetInt64(1) | ||
} else { | ||
row[9].SetInt64(0) | ||
} | ||
if hisHotRegion.IsLeader { | ||
row[10].SetInt64(1) | ||
} else { | ||
row[10].SetInt64(0) | ||
} | ||
|
||
row[11].SetString(strings.ToUpper(hisHotRegion.HotRegionType), mysql.DefaultCollationName) | ||
if hisHotRegion.HotDegree != 0 { | ||
row[12].SetInt64(hisHotRegion.HotDegree) | ||
} else { | ||
row[12].SetNull() | ||
} | ||
if hisHotRegion.FlowBytes != 0 { | ||
row[13].SetFloat64(hisHotRegion.FlowBytes) | ||
} else { | ||
row[13].SetNull() | ||
} | ||
if hisHotRegion.KeyRate != 0 { | ||
row[14].SetFloat64(hisHotRegion.KeyRate) | ||
} else { | ||
row[14].SetNull() | ||
} | ||
if hisHotRegion.QueryRate != 0 { | ||
row[15].SetFloat64(hisHotRegion.QueryRate) | ||
} else { | ||
row[15].SetNull() | ||
} | ||
return row, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typ is enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but pd-ctl currently uses a slice; we will fix it after pd-ctl and PD change to a single type.