planner: add extractor for tidb_hot_regions_history
Signed-off-by: IcePigZDB <icepigzdb@gmail.com>
IcePigZDB committed Aug 15, 2021
1 parent dd87813 commit fe67ab4
Showing 3 changed files with 881 additions and 5 deletions.
2 changes: 2 additions & 0 deletions planner/core/logical_plan_builder.go
@@ -4209,6 +4209,8 @@ func (b *PlanBuilder) buildMemTable(_ context.Context, dbName model.CIStr, table
p.Extractor = &ClusterTableExtractor{}
case infoschema.TableClusterLog:
p.Extractor = &ClusterLogTableExtractor{}
case infoschema.TableTiDBHotRegionsHistory:
p.Extractor = &HotRegionsHistoryTableExtractor{}
case infoschema.TableInspectionResult:
p.Extractor = &InspectionResultTableExtractor{}
p.QueryTimeRange = b.timeRangeForSummaryTable()
358 changes: 358 additions & 0 deletions planner/core/memtable_predicate_extractor.go
@@ -457,6 +457,121 @@ func (helper extractHelper) extractTimeRange(
return
}

// getQuantilesFunctionName is used to get the quantiles (Int, Real, Decimal) range function
// name: GT, GE, LT, LE. For these expressions, it does a simple function-name conversion.
func (helper extractHelper) getQuantilesFunctionName(fn *expression.ScalarFunction) string {
switch fn.Function.PbCode() {
case tipb.ScalarFuncSig_GTInt, tipb.ScalarFuncSig_GTReal, tipb.ScalarFuncSig_GTDecimal:
return ast.GT
case tipb.ScalarFuncSig_GEInt, tipb.ScalarFuncSig_GEReal, tipb.ScalarFuncSig_GEDecimal:
return ast.GE
case tipb.ScalarFuncSig_LTInt, tipb.ScalarFuncSig_LTReal, tipb.ScalarFuncSig_LTDecimal:
return ast.LT
case tipb.ScalarFuncSig_LEInt, tipb.ScalarFuncSig_LEReal, tipb.ScalarFuncSig_LEDecimal:
return ast.LE
default:
return fn.FuncName.L
}
}

// extractQuantilesRange extracts a numeric range for the given column from predicates on
// `TiDB_Hot_Regions_History`. It uses the function PB code to distinguish the Int signatures
// from the (Real, Decimal) ones in the GT/GE/LT/LE functions. Unlike getTimeFunctionName,
// it leaves EQ functions to the upper SELECTION executor.
// Currently, this is used by `TiDB_Hot_Regions_History` only.
// e.g: SELECT * FROM t WHERE a>90 AND a<100
func (helper extractHelper) extractQuantilesRange(
ctx sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
extractColName string,
) (
remained []expression.Expression,
// lowQuantile and highQuantile are the bounds of the extracted range; 0 means unbounded
lowQuantile float64,
highQuantile float64,
) {
var smallFloat64 = 1e-12
var smallInt64 = float64(1)
remained = make([]expression.Expression, 0, len(predicates))
extractCols := helper.findColumn(schema, names, extractColName)
if len(extractCols) == 0 {
return predicates, lowQuantile, highQuantile
}

for _, expr := range predicates {
fn, ok := expr.(*expression.ScalarFunction)
if !ok {
remained = append(remained, expr)
continue
}

var colName string
var datums []types.Datum
fnName := helper.getQuantilesFunctionName(fn)
switch fnName {
case ast.GT, ast.GE, ast.LT, ast.LE:
colName, datums = helper.extractColBinaryOpConsExpr(extractCols, fn)
}

if colName == extractColName {
doubleType := types.NewFieldType(mysql.TypeDouble)
doubleType.Decimal = 12
doubleDatum, err := datums[0].ConvertTo(ctx.GetSessionVars().StmtCtx, doubleType)
if err != nil || doubleDatum.Kind() == types.KindNull {
remained = append(remained, expr)
continue
}
mysqlDouble := doubleDatum.GetFloat64()

switch fnName {
// Note: ast.EQ is deliberately not handled; equal conditions are left to the
// upper SELECTION executor and never reach this switch with a matched column.
case ast.GT:
// FIXME: adding a small number is not strictly correct here:
// add 1e-12 for float64 columns, add 1 for integer columns.
if fn.Function.PbCode() == tipb.ScalarFuncSig_GTInt {
lowQuantile = math.Max(lowQuantile, mysqlDouble+smallInt64)
} else {
lowQuantile = math.Max(lowQuantile, mysqlDouble+smallFloat64)
}
case ast.GE:
lowQuantile = math.Max(lowQuantile, mysqlDouble)
case ast.LT:
if highQuantile == 0 {
if fn.Function.PbCode() == tipb.ScalarFuncSig_LTInt {
highQuantile = mysqlDouble - smallInt64
} else {
highQuantile = mysqlDouble - smallFloat64
}
} else {
if fn.Function.PbCode() == tipb.ScalarFuncSig_LTInt {
highQuantile = math.Min(highQuantile, mysqlDouble-smallInt64)
} else {
highQuantile = math.Min(highQuantile, mysqlDouble-smallFloat64)
}
}
case ast.LE:
if highQuantile == 0 {
highQuantile = mysqlDouble
} else {
highQuantile = math.Min(highQuantile, mysqlDouble)
}
default:
remained = append(remained, expr)
}
} else {
remained = append(remained, expr)
}
}
return
}
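
To see the net effect, here is a minimal standalone sketch of the bound-tightening rule used above; the helper names are illustrative and not part of this patch. For an integer column, `SELECT * FROM t WHERE a>90 AND a<100` collapses to the closed range [91, 99].

package main

import "fmt"

const (
	smallFloat64 = 1e-12 // epsilon for float/decimal columns
	smallInt64   = 1.0   // step for integer columns
)

// toClosedLow converts an exclusive lower bound (col > v) into an
// inclusive one, the way extractQuantilesRange does.
func toClosedLow(v float64, isInt bool) float64 {
	if isInt {
		return v + smallInt64
	}
	return v + smallFloat64
}

// toClosedHigh does the same for an exclusive upper bound (col < v).
func toClosedHigh(v float64, isInt bool) float64 {
	if isInt {
		return v - smallInt64
	}
	return v - smallFloat64
}

func main() {
	// WHERE hot_degree > 90 AND hot_degree < 100 (integer column)
	fmt.Println(toClosedLow(90, true), toClosedHigh(100, true)) // 91 99
}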

func (helper extractHelper) parseQuantiles(quantileSet set.StringSet) []float64 {
quantiles := make([]float64, 0, len(quantileSet))
for k := range quantileSet {
@@ -471,6 +586,20 @@ func (helper extractHelper) parseQuantiles(quantileSet set.StringSet) []float64
return quantiles
}

func (helper extractHelper) parseQuantilesUint64(quantileSet set.StringSet) []uint64 {
quantiles := make([]uint64, 0, len(quantileSet))
for k := range quantileSet {
v, err := strconv.ParseUint(k, 10, 64)
if err != nil {
// ignore the parse error; it won't affect the result
continue
}
quantiles = append(quantiles, v)
}
sort.Slice(quantiles, func(i, j int) bool { return quantiles[i] < quantiles[j] })
return quantiles
}
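
For illustration, a self-contained sketch of the same parse-and-sort pattern, using a plain map in place of TiDB's set.StringSet; the sample values are made up:

package main

import (
	"fmt"
	"sort"
	"strconv"
)

func main() {
	// Hypothetical extracted values; the malformed entry is silently
	// dropped, exactly as in parseQuantilesUint64.
	quantileSet := map[string]struct{}{"22": {}, "11": {}, "oops": {}}
	ids := make([]uint64, 0, len(quantileSet))
	for k := range quantileSet {
		v, err := strconv.ParseUint(k, 10, 64)
		if err != nil {
			continue // ignore the parse error; it won't affect the result
		}
		ids = append(ids, v)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	fmt.Println(ids) // [11 22]
}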

func (helper extractHelper) extractCols(
schema *expression.Schema,
names []*types.FieldName,
@@ -661,6 +790,235 @@ func (e *ClusterLogTableExtractor) explainInfo(p *PhysicalMemTable) string {
return s
}

// HotRegionsHistoryTableExtractor is used to extract some predicates of `tidb_hot_regions_history`.
type HotRegionsHistoryTableExtractor struct {
extractHelper

// SkipRequest means the WHERE clause is always false; we don't need to request any PD server.
SkipRequest bool

// StartTime is the lower bound of `update_time`.
// e.g: SELECT * FROM tidb_hot_regions_history WHERE update_time>'2019-10-10 10:10:10.999'
StartTime int64
// EndTime is the upper bound of `update_time`.
// e.g: SELECT * FROM tidb_hot_regions_history WHERE update_time<'2019-10-11 10:10:10.999'
EndTime int64

// RegionIDs, StoreIDs and PeerIDs represent all region/store/peer ids we should filter in PD
// to reduce network IO.
// e.g:
// 1. SELECT * FROM tidb_hot_regions_history WHERE region_id=1
// 2. SELECT * FROM tidb_hot_regions_history WHERE region_id in (11, 22)
// Range operations are left to the upper selection node.
RegionIDs []uint64 // use uint64 to match PD server
StoreIDs []uint64
PeerIDs []uint64

// HotRegionTypes represents all hot region types we should filter in PD to reduce network IO.
// e.g:
// 1. SELECT * FROM tidb_hot_regions_history WHERE type='read'
// 2. SELECT * FROM tidb_hot_regions_history WHERE type in ('read', 'write')
// 3. SELECT * FROM tidb_hot_regions_history WHERE type='read' and type='write' -> SkipRequest = true
HotRegionTypes set.StringSet

// LowHotDegree, HighHotDegree, LowFlowBytes, HighFlowBytes,
// LowKeyRate, HighKeyRate, LowQueryRate and HighQueryRate are used to push down range
// filters on the hot_degree, flow_bytes, key_rate and query_rate fields to PD.
// e.g: SELECT * FROM tidb_hot_regions_history WHERE hot_degree>90
LowHotDegree int64
// e.g: SELECT * FROM tidb_hot_regions_history WHERE hot_degree<100
HighHotDegree int64
LowFlowBytes float64
HighFlowBytes float64
LowKeyRate float64
HighKeyRate float64
LowQueryRate float64
HighQueryRate float64

// DBNames, TableNames, IndexNames, TableIDs, IndexIDs will be filtered in TiDB,
// because PD can't parse schema info.
// DBNames represents the database names to filter on; if no database is specified, hot
// regions of all databases are kept.
// e.g: SELECT * FROM tidb_hot_regions_history WHERE db_name in ('mysql', 'test')
DBNames set.StringSet
// TableNames represents the table names to filter on; if no table is specified, hot regions
// of all tables are kept.
// e.g: SELECT * FROM tidb_hot_regions_history WHERE table_name in ('tables_priv', 'stats_meta')
TableNames set.StringSet
// IndexNames represents the index names to filter on; if no index is specified, hot regions
// of all indexes are kept.
// e.g: SELECT * FROM tidb_hot_regions_history WHERE index_name in ('idx_ver', 'tbl')
IndexNames set.StringSet
// TableIDs and IndexIDs represent all table/index ids we should filter in TiDB.
// e.g:
// 1. SELECT * FROM tidb_hot_regions_history WHERE table_id=11
// 2. SELECT * FROM tidb_hot_regions_history WHERE table_id in (11, 21)
TableIDs []uint64
IndexIDs []uint64
}
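
As a concrete, hypothetical illustration of how these fields end up populated (the query and all values are examples only, assuming the update_time literal is interpreted as UTC): for `SELECT * FROM tidb_hot_regions_history WHERE update_time >= '2021-08-10 00:00:00' AND region_id IN (1, 2) AND type = 'read' AND hot_degree > 3`, the ID, type, time and degree filters are pushed to PD and nothing is left for TiDB-side schema filtering:

// Hypothetical post-Extract state (values illustrative):
var _ = &HotRegionsHistoryTableExtractor{
	StartTime:      1628553600000,            // '2021-08-10 00:00:00' UTC in milliseconds
	RegionIDs:      []uint64{1, 2},           // region_id IN (1, 2)
	HotRegionTypes: set.NewStringSet("read"), // type = 'read'
	LowHotDegree:   4,                        // hot_degree > 3; integer GT adds 1
	HighHotDegree:  math.MaxInt64,            // no upper bound given
}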

// Extract implements the MemTablePredicateExtractor Extract interface
func (e *HotRegionsHistoryTableExtractor) Extract(
ctx sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) []expression.Expression {
// Extract the `region_id/store_id/peer_id/type/db_name/table_name/index_name/table_id/index_id` columns
remained, regionIDSkipRequest, regionIDs := e.extractCol(schema, names, predicates, "region_id", false)
remained, storeIDSkipRequest, storeIDs := e.extractCol(schema, names, remained, "store_id", false)
remained, peerIDSkipRequest, peerIDs := e.extractCol(schema, names, remained, "peer_id", false)
remained, typeSkipRequest, hotRegionTypes := e.extractCol(schema, names, remained, "type", false)
remained, dbNameSkipRequest, dbNames := e.extractCol(schema, names, remained, "db_name", true)
remained, tableNameSkipRequest, tableNames := e.extractCol(schema, names, remained, "table_name", true)
remained, indexNameSkipRequest, indexNames := e.extractCol(schema, names, remained, "index_name", true)
remained, tableIDSkipRequest, tableIDs := e.extractCol(schema, names, remained, "table_id", false)
remained, indexIDSkipRequest, indexIDs := e.extractCol(schema, names, remained, "index_id", false)

e.SkipRequest = regionIDSkipRequest || storeIDSkipRequest || peerIDSkipRequest ||
typeSkipRequest || dbNameSkipRequest || tableNameSkipRequest ||
indexNameSkipRequest || tableIDSkipRequest || indexIDSkipRequest
if e.SkipRequest {
return nil
}
e.RegionIDs, e.StoreIDs, e.PeerIDs = e.parseQuantilesUint64(regionIDs), e.parseQuantilesUint64(storeIDs), e.parseQuantilesUint64(peerIDs)
e.HotRegionTypes, e.DBNames, e.TableNames, e.IndexNames = hotRegionTypes, dbNames, tableNames, indexNames
e.TableIDs, e.IndexIDs = e.parseQuantilesUint64(tableIDs), e.parseQuantilesUint64(indexIDs)

remained, startTime, endTime := e.extractTimeRange(ctx, schema, names, remained, "update_time", time.Local)
// The time unit for searching hot regions is milliseconds.
startTime = startTime / int64(time.Millisecond)
endTime = endTime / int64(time.Millisecond)
e.StartTime = startTime
e.EndTime = endTime
if startTime != 0 && endTime != 0 {
e.SkipRequest = startTime > endTime
}

// Extract the `hot_degree/flow_bytes/key_rate/query_rate` columns.
// Unlike the time range above, equal conditions are not extracted here; they are left to
// the upper selection node.
remained, lowHotDegree, highHotDegree := e.extractQuantilesRange(ctx, schema, names, remained, "hot_degree")
remained, lowFlowBytes, highFlowBytes := e.extractQuantilesRange(ctx, schema, names, remained, "flow_bytes")
remained, lowKeyRate, highKeyRate := e.extractQuantilesRange(ctx, schema, names, remained, "key_rate")
remained, lowQueryRate, highQueryRate := e.extractQuantilesRange(ctx, schema, names, remained, "query_rate")

e.LowHotDegree, e.HighHotDegree = int64(lowHotDegree), int64(highHotDegree)
e.LowFlowBytes, e.HighFlowBytes = lowFlowBytes, highFlowBytes
e.LowKeyRate, e.HighKeyRate = lowKeyRate, highKeyRate
e.LowQueryRate, e.HighQueryRate = lowQueryRate, highQueryRate

// normal case
// low high
// 0 100 x<100
// 10 100 10<x<100
// skip case
// 100 10 SkipRequest
if lowHotDegree != 0 && highHotDegree != 0 {
e.SkipRequest = lowHotDegree > highHotDegree
}
// 10 0 10<x<math.MaxInt64
// 0 0 0<x<math.MaxInt64
if highHotDegree == 0 {
e.HighHotDegree = math.MaxInt64
}

if lowFlowBytes != 0 && highFlowBytes != 0 {
e.SkipRequest = lowFlowBytes > highFlowBytes
}
if highFlowBytes == 0 {
e.HighFlowBytes = math.MaxFloat64
}

if lowKeyRate != 0 && highKeyRate != 0 {
e.SkipRequest = lowKeyRate > highKeyRate
}
if highKeyRate == 0 {
e.HighKeyRate = math.MaxFloat64
}

if lowQueryRate != 0 && highQueryRate != 0 {
e.SkipRequest = lowQueryRate > highQueryRate
}
if highQueryRate == 0 {
e.HighQueryRate = math.MaxFloat64
}

if e.SkipRequest {
return nil
}
return remained
}
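
A standalone sketch of the zero-as-unbounded convention used by the checks above; normalizeRange is an illustrative name, not part of this patch:

package main

import (
	"fmt"
	"math"
)

// normalizeRange applies the same convention as Extract: a zero high
// bound means "unbounded", and a low bound above the high bound means
// the predicate is unsatisfiable, so the PD request can be skipped.
func normalizeRange(low, high float64) (lo, hi float64, skip bool) {
	if low != 0 && high != 0 && low > high {
		return low, high, true
	}
	if high == 0 {
		high = math.MaxFloat64
	}
	return low, high, false
}

func main() {
	// WHERE hot_degree > 100 AND hot_degree < 10 -> skip the PD request.
	_, _, skip := normalizeRange(101, 9)
	fmt.Println(skip) // true

	// WHERE flow_bytes >= 1024 -> [1024, math.MaxFloat64]
	lo, hi, _ := normalizeRange(1024, 0)
	fmt.Println(lo, hi) // 1024 1.7976931348623157e+308
}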

func (e *HotRegionsHistoryTableExtractor) explainInfo(p *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request: true"
}
r := new(bytes.Buffer)
st, et := e.StartTime, e.EndTime
if st > 0 {
st := time.Unix(0, st*1e6)
r.WriteString(fmt.Sprintf("start_time:%v, ", st.In(p.ctx.GetSessionVars().StmtCtx.TimeZone).Format(MetricTableTimeFormat)))
}
if et > 0 {
et := time.Unix(0, et*1e6)
r.WriteString(fmt.Sprintf("end_time:%v, ", et.In(p.ctx.GetSessionVars().StmtCtx.TimeZone).Format(MetricTableTimeFormat)))
}
if len(e.RegionIDs) > 0 {
r.WriteString(fmt.Sprintf("region_ids:%v, ", e.RegionIDs))
}
if len(e.StoreIDs) > 0 {
r.WriteString(fmt.Sprintf("store_ids:%v, ", e.StoreIDs))
}
if len(e.PeerIDs) > 0 {
r.WriteString(fmt.Sprintf("peer_ids:%v, ", e.PeerIDs))
}
if len(e.HotRegionTypes) > 0 {
r.WriteString(fmt.Sprintf("hot_region_types:[%s], ", extractStringFromStringSet(e.HotRegionTypes)))
}
if e.LowHotDegree > 0 {
r.WriteString(fmt.Sprintf("LowHotDegree:%d, ", e.LowHotDegree))
}
if 0 < e.HighHotDegree && e.HighHotDegree < math.MaxInt64 {
r.WriteString(fmt.Sprintf("HighHotDegree:%d, ", e.HighHotDegree))
}
if e.LowFlowBytes > 0 {
r.WriteString(fmt.Sprintf("LowFlowBytes:%f, ", e.LowFlowBytes))
}
if 0 < e.HighFlowBytes && e.HighFlowBytes < math.MaxFloat64 {
r.WriteString(fmt.Sprintf("HighFlowBytes:%f, ", e.HighFlowBytes))
}
if e.LowKeyRate > 0 {
r.WriteString(fmt.Sprintf("LowKeyRate:%f, ", e.LowKeyRate))
}
if 0 < e.HighKeyRate && e.HighKeyRate < math.MaxFloat64 {
r.WriteString(fmt.Sprintf("HighKeyRate:%f, ", e.HighKeyRate))
}
if e.LowQueryRate > 0 {
r.WriteString(fmt.Sprintf("LowQueryRate:%f, ", e.LowQueryRate))
}
if 0 < e.HighQueryRate && e.HighQueryRate < math.MaxFloat64 {
r.WriteString(fmt.Sprintf("HighQueryRate:%f, ", e.HighQueryRate))
}
if len(e.DBNames) > 0 {
r.WriteString(fmt.Sprintf("DB_names:[%s], ", extractStringFromStringSet(e.DBNames)))
}
if len(e.TableNames) > 0 {
r.WriteString(fmt.Sprintf("table_names:[%s], ", extractStringFromStringSet(e.TableNames)))
}
if len(e.IndexNames) > 0 {
r.WriteString(fmt.Sprintf("index_names:[%s], ", extractStringFromStringSet(e.IndexNames)))
}
if len(e.TableIDs) > 0 {
r.WriteString(fmt.Sprintf("table_ids:%v, ", e.TableIDs))
}
if len(e.IndexIDs) > 0 {
r.WriteString(fmt.Sprintf("index_ids:%v, ", e.IndexIDs))
}

// remove the last ", " in the message info
s := r.String()
if len(s) > 2 {
return s[:len(s)-2]
}
return s
}
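
For reference, a hypothetical operator info string this method would produce (query and values are illustrative only):

// For: SELECT * FROM tidb_hot_regions_history
//      WHERE region_id IN (1, 2) AND type = 'read' AND hot_degree > 3;
// explainInfo would yield, after the trailing ", " is trimmed:
//
//   region_ids:[1 2], hot_region_types:[read], LowHotDegree:4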

// MetricTableExtractor is used to extract some predicates of metrics_schema tables.
type MetricTableExtractor struct {
extractHelper
