diff --git a/executor/builder.go b/executor/builder.go index 0aa22d3d1431d..a30f10c5ec3b8 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1327,6 +1327,14 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo extractor: v.Extractor.(*plannercore.MetricTableExtractor), }, } + case strings.ToLower(infoschema.TableMetricSummaryByLabel): + return &ClusterReaderExec{ + baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), + retriever: &MetricSummaryByLabelRetriever{ + table: v.Table, + extractor: v.Extractor.(*plannercore.MetricTableExtractor), + }, + } } } tb, _ := b.is.TableByID(v.Table.ID) diff --git a/executor/metric_reader.go b/executor/metric_reader.go index 7545bade99250..1ae10a2002911 100644 --- a/executor/metric_reader.go +++ b/executor/metric_reader.go @@ -243,3 +243,67 @@ func (e *MetricSummaryRetriever) genMetricQuerySQLS(name, startTime, endTime str } return sqls } + +// MetricSummaryByLabelRetriever uses to read metric detail data. +type MetricSummaryByLabelRetriever struct { + dummyCloser + table *model.TableInfo + extractor *plannercore.MetricTableExtractor + retrieved bool +} + +func (e *MetricSummaryByLabelRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { + if e.retrieved || e.extractor.SkipRequest { + return nil, nil + } + e.retrieved = true + totalRows := make([][]types.Datum, 0, len(infoschema.MetricTableMap)) + quantiles := []float64{1, 0.999, 0.99, 0.90, 0.80} + tps := make([]*types.FieldType, 0, len(e.table.Columns)) + for _, col := range e.table.Columns { + tps = append(tps, &col.FieldType) + } + startTime := e.extractor.StartTime.Format(plannercore.MetricTableTimeFormat) + endTime := e.extractor.EndTime.Format(plannercore.MetricTableTimeFormat) + for name, def := range infoschema.MetricTableMap { + sqls := e.genMetricQuerySQLS(name, startTime, endTime, quantiles, def) + for _, sql := range sqls { + rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql) + if err != nil { + return nil, errors.Trace(err) + } + for _, row := range rows { + totalRows = append(totalRows, row.GetDatumRow(tps)) + } + } + } + return totalRows, nil +} + +func (e *MetricSummaryByLabelRetriever) genMetricQuerySQLS(name, startTime, endTime string, quantiles []float64, def infoschema.MetricTableDef) []string { + labels := "" + labelsColumn := `""` + if len(def.Labels) > 0 { + labels = "`" + strings.Join(def.Labels, "`, `") + "`" + labelsColumn = fmt.Sprintf("concat_ws(' = ', '%s', concat_ws(', ', %s))", strings.Join(def.Labels, ", "), labels) + } + if def.Quantile == 0 { + quantiles = []float64{0} + } + sqls := []string{} + for _, quantile := range quantiles { + var sql string + if quantile == 0 { + sql = fmt.Sprintf(`select "%[1]s", %[2]s as label,min(time),sum(value),avg(value),min(value),max(value) from metric_schema.%[1]s where time > '%[3]s' and time < '%[4]s'`, + name, labelsColumn, startTime, endTime) + } else { + sql = fmt.Sprintf(`select "%[1]s_%[5]v", %[2]s as label,min(time),sum(value),avg(value),min(value),max(value) from metric_schema.%[1]s where time > '%[3]s' and time < '%[4]s' and quantile=%[5]v`, + name, labelsColumn, startTime, endTime, quantile) + } + if len(def.Labels) > 0 { + sql += " group by " + labels + } + sqls = append(sqls, sql) + } + return sqls +} diff --git a/infoschema/tables.go b/infoschema/tables.go index e5492d63c8346..cc693dc145f99 100755 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -107,7 +107,9 @@ const ( // TableInspectionResult is the string constant of inspection result table TableInspectionResult = "INSPECTION_RESULT" // TableMetricSummary is a summary table that contains all metrics. - TableMetricSummary = "METRIC_SUMMARY" + TableMetricSummary = "METRICS_SUMMARY" + // TableMetricSummaryByLabel is a metric table that contains all metrics that group by label info. + TableMetricSummaryByLabel = "METRICS_SUMMARY_BY_LABEL" ) var tableIDMap = map[string]int64{ @@ -163,6 +165,7 @@ var tableIDMap = map[string]int64{ TableClusterSystemInfo: autoid.InformationSchemaDBID + 50, TableInspectionResult: autoid.InformationSchemaDBID + 51, TableMetricSummary: autoid.InformationSchemaDBID + 52, + TableMetricSummaryByLabel: autoid.InformationSchemaDBID + 53, } type columnInfo struct { @@ -1136,6 +1139,15 @@ var tableMetricSummaryCols = []columnInfo{ {"MIN_VALUE", mysql.TypeDouble, 22, 0, nil, nil}, {"MAX_VALUE", mysql.TypeDouble, 22, 0, nil, nil}, } +var tableMetricSummaryByLabelCols = []columnInfo{ + {"METRIC_NAME", mysql.TypeVarchar, 64, 0, nil, nil}, + {"LABEL", mysql.TypeVarchar, 64, 0, nil, nil}, + {"TIME", mysql.TypeDatetime, -1, 0, nil, nil}, + {"SUM_VALUE", mysql.TypeDouble, 22, 0, nil, nil}, + {"AVG_VALUE", mysql.TypeDouble, 22, 0, nil, nil}, + {"MIN_VALUE", mysql.TypeDouble, 22, 0, nil, nil}, + {"MAX_VALUE", mysql.TypeDouble, 22, 0, nil, nil}, +} func dataForSchemata(ctx sessionctx.Context, schemas []*model.DBInfo) [][]types.Datum { checker := privilege.GetPrivilegeManager(ctx) @@ -2347,6 +2359,7 @@ var tableNameToColumns = map[string][]columnInfo{ TableClusterSystemInfo: tableClusterSystemInfoCols, TableInspectionResult: tableInspectionResultCols, TableMetricSummary: tableMetricSummaryCols, + TableMetricSummaryByLabel: tableMetricSummaryByLabelCols, } func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) { diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 5d03df3c85f7c..54eb19f50a589 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2793,7 +2793,7 @@ func (b *PlanBuilder) buildMemTable(ctx context.Context, dbName model.CIStr, tab p.Extractor = &ClusterLogTableExtractor{} case infoschema.TableInspectionResult: p.Extractor = &InspectionResultTableExtractor{} - case infoschema.TableMetricSummary: + case infoschema.TableMetricSummary, infoschema.TableMetricSummaryByLabel: p.Extractor = newMetricTableExtractor() } } diff --git a/session/session.go b/session/session.go index 57d42a2010e3a..b5b0da8ea71c6 100644 --- a/session/session.go +++ b/session/session.go @@ -853,6 +853,10 @@ func createSessionFunc(store kv.Storage) pools.Factory { if err != nil { return nil, errors.Trace(err) } + err = variable.SetSessionSystemVar(se.sessionVars, variable.MaxAllowedPacket, types.NewStringDatum("67108864")) + if err != nil { + return nil, errors.Trace(err) + } se.sessionVars.CommonGlobalLoaded = true se.sessionVars.InRestrictedSQL = true return se, nil