Skip to content

Commit

Permalink
executor: support record analzye merge job (#38672)
Browse files Browse the repository at this point in the history
ref #35142
  • Loading branch information
Yisaer authored Oct 27, 2022
1 parent 756c7b3 commit c3ecb04
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 25 deletions.
34 changes: 34 additions & 0 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,40 @@ func UpdateAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, rowCo
}
}

// FinishAnalyzeMergeJob finishes analyze merge job
func FinishAnalyzeMergeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
if job == nil || job.ID == nil {
return
}
job.EndTime = time.Now()
var sql string
var args []interface{}
if analyzeErr != nil {
failReason := analyzeErr.Error()
const textMaxLength = 65535
if len(failReason) > textMaxLength {
failReason = failReason[:textMaxLength]
}
sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, fail_reason = %?, process_id = NULL WHERE id = %?"
args = []interface{}{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFailed, failReason, *job.ID}
} else {
sql = "UPDATE mysql.analyze_jobs SET end_time = CONVERT_TZ(%?, '+00:00', @@TIME_ZONE), state = %?, process_id = NULL WHERE id = %?"
args = []interface{}{job.EndTime.UTC().Format(types.TimeFormat), statistics.AnalyzeFinished, *job.ID}
}
exec := sctx.(sqlexec.RestrictedSQLExecutor)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
_, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, args...)
if err != nil {
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
} else {
state = statistics.AnalyzeFinished
}
logutil.BgLogger().Warn("failed to update analyze job", zap.String("update", fmt.Sprintf("%s->%s", statistics.AnalyzeRunning, state)), zap.Error(err))
}
}

// FinishAnalyzeJob updates the state of the analyze job to finished/failed according to `meetError` and sets the end time.
func FinishAnalyzeJob(sctx sessionctx.Context, job *statistics.AnalyzeJob, analyzeErr error) {
if job == nil || job.ID == nil {
Expand Down
77 changes: 52 additions & 25 deletions executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor

import (
"context"
"fmt"

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -58,36 +59,62 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
if globalStatsID.tableID != tableID {
continue
}
globalOpts := e.opts
if e.OptionsMap != nil {
if v2Options, ok := e.OptionsMap[globalStatsID.tableID]; ok {
globalOpts = v2Options.FilledOpts
job := e.newAnalyzeHandleGlobalStatsJob(globalStatsID)
AddNewAnalyzeJob(e.ctx, job)
StartAnalyzeJob(e.ctx, job)
mergeStatsErr := func() error {
globalOpts := e.opts
if e.OptionsMap != nil {
if v2Options, ok := e.OptionsMap[globalStatsID.tableID]; ok {
globalOpts = v2Options.FilledOpts
}
}
}
globalStats, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(e.ctx, globalOpts, e.ctx.GetInfoSchema().(infoschema.InfoSchema),
globalStatsID.tableID, info.isIndex, info.histIDs,
tableAllPartitionStats)
if err != nil {
if types.ErrPartitionStatsMissing.Equal(err) || types.ErrPartitionColumnStatsMissing.Equal(err) {
// When we find some partition-level stats are missing, we need to report warning.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
continue
}
return err
}
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
// fms for global stats doesn't need to dump to kv.
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, info.statsVersion, 1, true)
globalStats, err := statsHandle.MergePartitionStats2GlobalStatsByTableID(e.ctx, globalOpts, e.ctx.GetInfoSchema().(infoschema.InfoSchema),
globalStatsID.tableID, info.isIndex, info.histIDs,
tableAllPartitionStats)
if err != nil {
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
if types.ErrPartitionStatsMissing.Equal(err) || types.ErrPartitionColumnStatsMissing.Equal(err) {
// When we find some partition-level stats are missing, we need to report warning.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
return err
}
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, globalStatsID.tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
// fms for global stats doesn't need to dump to kv.
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, info.statsVersion, 1, true)
if err != nil {
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
}
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, globalStatsID.tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
}
return nil
}()
FinishAnalyzeMergeJob(e.ctx, job, mergeStatsErr)
}
}
return nil
}

func (e *AnalyzeExec) newAnalyzeHandleGlobalStatsJob(key globalStatsKey) *statistics.AnalyzeJob {
dom := domain.GetDomain(e.ctx)
is := dom.InfoSchema()
table, _ := is.TableByID(key.tableID)
db, _ := is.SchemaByTable(table.Meta())
dbName := db.Name.String()
tableName := table.Meta().Name.String()
jobInfo := fmt.Sprintf("merge global stats for %v.%v columns", dbName, tableName)
if key.indexID != -1 {
idxName := table.Meta().FindIndexNameByID(key.indexID)
jobInfo = fmt.Sprintf("merge global stats for %v.%v's index %v", dbName, tableName, idxName)
}
return &statistics.AnalyzeJob{
DBName: db.Name.String(),
TableName: table.Meta().Name.String(),
JobInfo: jobInfo,
}
}
15 changes: 15 additions & 0 deletions executor/show_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,19 @@ func TestShowAnalyzeStatus(t *testing.T) {
require.Equal(t, "<nil>", rows[1][8])
require.Equal(t, addr, rows[1][9])
require.Equal(t, "<nil>", rows[1][10])

tk.MustExec("delete from mysql.analyze_jobs")
tk.MustExec("create table t2 (a int, b int, primary key(a)) PARTITION BY RANGE ( a )(PARTITION p0 VALUES LESS THAN (6))")
tk.MustExec(`insert into t2 values (1, 1), (2, 2)`)
tk.MustExec("analyze table t2")
rows = tk.MustQuery("show analyze status").Rows()
require.Len(t, rows, 2)
require.Equal(t, "merge global stats for test.t2 columns", rows[0][3])

tk.MustExec("delete from mysql.analyze_jobs")
tk.MustExec("alter table t2 add index idx(b)")
tk.MustExec("analyze table t2 index idx")
rows = tk.MustQuery("show analyze status").Rows()
require.Len(t, rows, 2)
require.Equal(t, "merge global stats for test.t2's index idx", rows[0][3])
}

0 comments on commit c3ecb04

Please sign in to comment.