From c3ecb0487597ed92df1edf9703119a943176331b Mon Sep 17 00:00:00 2001 From: Song Gao Date: Thu, 27 Oct 2022 17:05:57 +0800 Subject: [PATCH] executor: support record analzye merge job (#38672) ref pingcap/tidb#35142 --- executor/analyze.go | 34 ++++++++++++++ executor/analyze_global_stats.go | 77 +++++++++++++++++++++----------- executor/show_stats_test.go | 15 +++++++ 3 files changed, 101 insertions(+), 25 deletions(-) diff --git a/executor/analyze.go b/executor/analyze.go index 66334ad05d647..a66a92d8ff74d 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -426,6 +426,40 @@ func UpdateAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, rowCo } } +// FinishAnalyzeMergeJob finishes analyze merge job +func FinishAnalyzeMergeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) { + if job == nil || job.ID == nil { + return + } + job.EndTime = time.Now() + var sql string + var args []interface{} + if analyzeErr != nil { + failReason := analyzeErr.Error() + const textMaxLength = 65535 + if len(failReason) > textMaxLength { + failReason = failReason[:textMaxLength] + } + sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?" + args = []interface{}{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID} + } else { + sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?" + args = []interface{}{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID} + } + exec := sctx.(sqlexec.RestrictedSQLExecutor) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + _, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, args...) + if err != nil { + var state string + if analyzeErr != nil { + state = statistics.AnalyzeFailed + } else { + state = statistics.AnalyzeFinished + } + logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err)) + } +} + // FinishAnalyzeJob updates the state of the analyze job to finished/failed according to `meetError` and sets the end time. func FinishAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) { if job == nil || job.ID == nil { diff --git a/executor/analyze_global_stats.go b/executor/analyze_global_stats.go index 440f0a104e207..d1e96a5824abd 100644 --- a/executor/analyze_global_stats.go +++ b/executor/analyze_global_stats.go @@ -16,6 +16,7 @@ package executor import ( "context" + "fmt" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" @@ -58,36 +59,62 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo if globalStatsID.tableID != tableID { continue } - globalOpts := e.opts - if e.OptionsMap != nil { - if v2Options, ok := e.OptionsMap[globalStatsID.tableID]; ok { - globalOpts = v2Options.FilledOpts + job := e.newAnalyzeHandleGlobalStatsJob(globalStatsID) + AddNewAnalyzeJob(e.ctx, job) + StartAnalyzeJob(e.ctx, job) + mergeStatsErr := func() error { + globalOpts := e.opts + if e.OptionsMap != nil { + if v2Options, ok := e.OptionsMap[globalStatsID.tableID]; ok { + globalOpts = v2Options.FilledOpts + } } - } - globalStats, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(e.ctx, globalOpts, e.ctx.GetInfoSchema().(infoschema.InfoSchema), - globalStatsID.tableID, info.isIndex, info.histIDs, - tableAllPartitionStats) - if err != nil { - if types.ErrPartitionStatsMissing.Equal(err) || types.ErrPartitionColumnStatsMissing.Equal(err) { - // When we find some partition-level stats are missing, we need to report warning. - e.ctx.GetSessionVars().StmtCtx.AppendWarning(err) - continue - } - return err - } - for i := 0; i < globalStats.Num; i++ { - hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i] - // fms for global stats doesn't need to dump to kv. - err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, info.statsVersion, 1, true) + globalStats, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(e.ctx, globalOpts, e.ctx.GetInfoSchema().(infoschema.InfoSchema), + globalStatsID.tableID, info.isIndex, info.histIDs, + tableAllPartitionStats) if err != nil { - logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err)) + if types.ErrPartitionStatsMissing.Equal(err) || types.ErrPartitionColumnStatsMissing.Equal(err) { + // When we find some partition-level stats are missing, we need to report warning. + e.ctx.GetSessionVars().StmtCtx.AppendWarning(err) + return nil + } + return err } - // Dump stats to historical storage. - if err := recordHistoricalStats(e.ctx, globalStatsID.tableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + for i := 0; i < globalStats.Num; i++ { + hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i] + // fms for global stats doesn't need to dump to kv. + err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, info.statsVersion, 1, true) + if err != nil { + logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err)) + } + // Dump stats to historical storage. + if err := recordHistoricalStats(e.ctx, globalStatsID.tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } } - } + return nil + }() + FinishAnalyzeMergeJob(e.ctx, job, mergeStatsErr) } } return nil } + +func (e *AnalyzeExec) newAnalyzeHandleGlobalStatsJob(key globalStatsKey) *statistics.AnalyzeJob { + dom := domain.GetDomain(e.ctx) + is := dom.InfoSchema() + table, _ := is.TableByID(key.tableID) + db, _ := is.SchemaByTable(table.Meta()) + dbName := db.Name.String() + tableName := table.Meta().Name.String() + jobInfo := fmt.Sprintf("merge global stats for %v.%v columns", dbName, tableName) + if key.indexID != -1 { + idxName := table.Meta().FindIndexNameByID(key.indexID) + jobInfo = fmt.Sprintf("merge global stats for %v.%v's index %v", dbName, tableName, idxName) + } + return &statistics.AnalyzeJob{ + DBName: db.Name.String(), + TableName: table.Meta().Name.String(), + JobInfo: jobInfo, + } +} diff --git a/executor/show_stats_test.go b/executor/show_stats_test.go index cb8bdebcad8ac..9fea890e7251d 100644 --- a/executor/show_stats_test.go +++ b/executor/show_stats_test.go @@ -397,4 +397,19 @@ func TestShowAnalyzeStatus(t *testing.T) { require.Equal(t, "", rows[1][8]) require.Equal(t, addr, rows[1][9]) require.Equal(t, "", rows[1][10]) + + tk.MustExec("delete from mysql.analyze_jobs") + tk.MustExec("create table t2 (a int, b int, primary key(a)) PARTITION BY RANGE ( a )(PARTITION p0 VALUES LESS THAN (6))") + tk.MustExec(`insert into t2 values (1, 1), (2, 2)`) + tk.MustExec("analyze table t2") + rows = tk.MustQuery("show analyze status").Rows() + require.Len(t, rows, 2) + require.Equal(t, "merge global stats for test.t2 columns", rows[0][3]) + + tk.MustExec("delete from mysql.analyze_jobs") + tk.MustExec("alter table t2 add index idx(b)") + tk.MustExec("analyze table t2 index idx") + rows = tk.MustQuery("show analyze status").Rows() + require.Len(t, rows, 2) + require.Equal(t, "merge global stats for test.t2's index idx", rows[0][3]) }