Skip to content

Commit

Permalink
statistics: add source column for stats_meta_hisotry table (#39835)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Dec 14, 2022
1 parent 1e7c552 commit 388e008
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 83 deletions.
2 changes: 1 addition & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)

if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil {
if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot, handle.StatsMetaHistorySourceAnalyze); err1 != nil {
tableID := results.TableID.TableID
err = err1
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err), zap.Int64("tableID", tableID))
Expand Down
2 changes: 2 additions & 0 deletions executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -94,6 +95,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
info.statsVersion,
1,
true,
handle.StatsMetaHistorySourceAnalyze,
)
if err != nil {
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.String("info", job.JobInfo),
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b
worker.errCh <- errors.Trace(ErrQueryInterrupted)
return
}
err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot)
err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot, handle.StatsMetaHistorySourceAnalyze)
if err != nil {
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(worker.sctx, results.Job, err)
Expand Down
1 change: 1 addition & 0 deletions executor/historical_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func TestRecordHistoryStatsMetaAfterAnalyze(t *testing.T) {
}
tk.MustQuery(fmt.Sprintf("select modify_count, count from mysql.stats_meta_history where table_id = '%d' order by create_time", tableInfo.Meta().ID)).Sort().Check(
testkit.Rows("18 18", "21 21", "24 24", "27 27", "30 30"))
tk.MustQuery(fmt.Sprintf("select distinct source from mysql.stats_meta_history where table_id = '%d'", tableInfo.Meta().ID)).Sort().Check(testkit.Rows("flush stats"))

// assert delete
tk.MustExec("delete from test.t where test.t.a = 1")
Expand Down
13 changes: 12 additions & 1 deletion session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ const (
modify_count bigint(64) NOT NULL,
count bigint(64) NOT NULL,
version bigint(64) NOT NULL comment 'stats version which corresponding to stats:version in EXPLAIN',
source varchar(40) NOT NULL,
create_time datetime(6) NOT NULL,
UNIQUE KEY table_version (table_id, version),
KEY table_create_time (table_id, create_time)
Expand Down Expand Up @@ -732,11 +733,13 @@ const (
version107 = 107
// version108 adds the table tidb_ttl_table_status
version108 = 108
// version109 add column source to mysql.stats_meta_history
version109 = 109
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version108
var currentBootstrapVersion int64 = version109

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -849,6 +852,7 @@ var (
upgradeToVer106,
upgradeToVer107,
upgradeToVer108,
upgradeToVer109,
}
)

Expand Down Expand Up @@ -2190,6 +2194,13 @@ func upgradeToVer108(s Session, ver int64) {
doReentrantDDL(s, CreateTTLTableStatus)
}

func upgradeToVer109(s Session, ver int64) {
if ver >= version109 {
return
}
doReentrantDDL(s, "ALTER TABLE mysql.stats_meta_history ADD COLUMN IF NOT EXISTS `source` varchar(40) NOT NULL after `version`;")
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down
5 changes: 3 additions & 2 deletions session/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ func TestUpgradeVersion84(t *testing.T) {
{"modify_count", "bigint(64)"},
{"count", "bigint(64)"},
{"version", "bigint(64)"},
{"source", "varchar(40)"},
{"create_time", "datetime(6)"},
}
rStatsHistoryTbl, err := tk.Exec(`desc mysql.stats_meta_history`)
require.NoError(t, err)
req := rStatsHistoryTbl.NewChunk(nil)
require.NoError(t, rStatsHistoryTbl.Next(ctx, req))
require.Equal(t, 5, req.NumRows())
for i := 0; i < 5; i++ {
require.Equal(t, 6, req.NumRows())
for i := 0; i < 6; i++ {
row := req.GetRow(i)
require.Equal(t, statsHistoryTblFields[i].field, strings.ToLower(row.GetString(0)))
require.Equal(t, statsHistoryTblFields[i].tp, strings.ToLower(row.GetString(1)))
Expand Down
1 change: 1 addition & 0 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"gc.go",
"handle.go",
"handle_hist.go",
"historical_stats_handler.go",
"lru_cache.go",
"statscache.go",
"update.go",
Expand Down
9 changes: 5 additions & 4 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
for i := 0; i < newColGlobalStats.Num; i++ {
hg, cms, topN := newColGlobalStats.Hg[i], newColGlobalStats.Cms[i], newColGlobalStats.TopN[i]
// fms for global stats doesn't need to dump to kv.
err = h.SaveStatsToStorage(tableID, newColGlobalStats.Count, newColGlobalStats.ModifyCount, 0, hg, cms, topN, 2, 1, false)
err = h.SaveStatsToStorage(tableID, newColGlobalStats.Count, newColGlobalStats.ModifyCount,
0, hg, cms, topN, 2, 1, false, StatsMetaHistorySourceSchemaChange)
if err != nil {
return err
}
Expand Down Expand Up @@ -186,7 +187,7 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
for i := 0; i < newIndexGlobalStats.Num; i++ {
hg, cms, topN := newIndexGlobalStats.Hg[i], newIndexGlobalStats.Cms[i], newIndexGlobalStats.TopN[i]
// fms for global stats doesn't need to dump to kv.
err = h.SaveStatsToStorage(tableID, newIndexGlobalStats.Count, newIndexGlobalStats.ModifyCount, 1, hg, cms, topN, 2, 1, false)
err = h.SaveStatsToStorage(tableID, newIndexGlobalStats.Count, newIndexGlobalStats.ModifyCount, 1, hg, cms, topN, 2, 1, false, StatsMetaHistorySourceSchemaChange)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,7 +222,7 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.recordHistoricalStatsMeta(physicalID, statsVer)
h.recordHistoricalStatsMeta(physicalID, statsVer, StatsMetaHistorySourceSchemaChange)
}
}()
h.mu.Lock()
Expand Down Expand Up @@ -263,7 +264,7 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.recordHistoricalStatsMeta(physicalID, statsVer)
h.recordHistoricalStatsMeta(physicalID, statsVer, StatsMetaHistorySourceSchemaChange)
}
}()
h.mu.Lock()
Expand Down
6 changes: 3 additions & 3 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (h *Handle) loadStatsFromJSON(tableInfo *model.TableInfo, physicalID int64,
// loadStatsFromJSON doesn't support partition table now.
// The table level Count and Modify_count would be overridden by the SaveMetaToStorage below, so we don't need
// to care about them here.
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, 0, &col.Histogram, col.CMSketch, col.TopN, int(col.StatsVer), 1, false)
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, 0, &col.Histogram, col.CMSketch, col.TopN, int(col.StatsVer), 1, false, StatsMetaHistorySourceLoadStats)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -356,7 +356,7 @@ func (h *Handle) loadStatsFromJSON(tableInfo *model.TableInfo, physicalID int64,
// loadStatsFromJSON doesn't support partition table now.
// The table level Count and Modify_count would be overridden by the SaveMetaToStorage below, so we don't need
// to care about them here.
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, 1, &idx.Histogram, idx.CMSketch, idx.TopN, int(idx.StatsVer), 1, false)
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, 1, &idx.Histogram, idx.CMSketch, idx.TopN, int(idx.StatsVer), 1, false, StatsMetaHistorySourceLoadStats)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -365,7 +365,7 @@ func (h *Handle) loadStatsFromJSON(tableInfo *model.TableInfo, physicalID int64,
if err != nil {
return errors.Trace(err)
}
return h.SaveMetaToStorage(tbl.PhysicalID, tbl.Count, tbl.ModifyCount)
return h.SaveMetaToStorage(tbl.PhysicalID, tbl.Count, tbl.ModifyCount, StatsMetaHistorySourceLoadStats)
}

// TableStatsFromJSON loads statistic from JSONTable and return the Table of statistic.
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestDumpCMSketchWithTopN(t *testing.T) {
cms, _, _, _ := statistics.NewCMSketchAndTopN(5, 2048, fakeData, 20, 100)

stat := h.GetTableStats(tableInfo)
err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, statistics.Version2, 1, false)
err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, statistics.Version2, 1, false, handle.StatsMetaHistorySourceLoadStats)
require.NoError(t, err)
require.Nil(t, h.Update(is))

Expand Down
80 changes: 12 additions & 68 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1614,27 +1614,20 @@ func saveBucketsToStorage(ctx context.Context, exec sqlexec.SQLExecutor, sc *stm
}

// SaveTableStatsToStorage saves the stats of a table to storage.
func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool) (err error) {
tableID := results.TableID.GetStatisticsID()
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) {
h.mu.Lock()
defer h.mu.Unlock()
return SaveTableStatsToStorage(h.mu.ctx, results, analyzeSnapshot)
return SaveTableStatsToStorage(h.mu.ctx, results, analyzeSnapshot, source)
}

// SaveTableStatsToStorage saves the stats of a table to storage.
func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.AnalyzeResults, analyzeSnapshot bool) (err error) {
func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) {
needDumpFMS := results.TableID.IsPartitionTable()
tableID := results.TableID.GetStatisticsID()
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
if err1 := recordHistoricalStatsMeta(sctx, tableID, statsVer); err1 != nil {
if err1 := recordHistoricalStatsMeta(sctx, tableID, statsVer, source); err1 != nil {
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", statsVer),
Expand Down Expand Up @@ -1809,11 +1802,12 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.Analyz
// If count is negative, both count and modify count would not be used and not be written to the table. Unless, corresponding
// fields in the stats_meta table will be updated.
// TODO: refactor to reduce the number of parameters
func (h *Handle) SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool) (err error) {
func (h *Handle) SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram,
cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer, source)
}
}()
h.mu.Lock()
Expand Down Expand Up @@ -1888,11 +1882,11 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count, modifyCount int64, isI
}

// SaveMetaToStorage will save stats_meta to storage.
func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error) {
func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer, source)
}
}()
h.mu.Lock()
Expand Down Expand Up @@ -2098,7 +2092,7 @@ func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, t
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer, StatsMetaHistorySourceExtendedStats)
}
}()
slices.Sort(colIDs)
Expand Down Expand Up @@ -2169,7 +2163,7 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer, StatsMetaHistorySourceExtendedStats)
}
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
Expand Down Expand Up @@ -2382,7 +2376,7 @@ func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.recordHistoricalStatsMeta(tableID, statsVer)
h.recordHistoricalStatsMeta(tableID, statsVer, StatsMetaHistorySourceExtendedStats)
}
}()
if extStats == nil || len(extStats.Stats) == 0 {
Expand Down Expand Up @@ -2643,56 +2637,6 @@ func (h *Handle) CheckHistoricalStatsEnable() (enable bool, err error) {
return checkHistoricalStatsEnable(h.mu.ctx)
}

func recordHistoricalStatsMeta(sctx sessionctx.Context, tableID int64, version uint64) error {
if tableID == 0 || version == 0 {
return errors.Errorf("tableID %d, version %d are invalid", tableID, version)
}
historicalStatsEnabled, err := checkHistoricalStatsEnable(sctx)
if err != nil {
return errors.Errorf("check tidb_enable_historical_stats failed: %v", err)
}
if !historicalStatsEnabled {
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
exec := sctx.(sqlexec.SQLExecutor)
rexec := sctx.(sqlexec.RestrictedSQLExecutor)
rows, _, err := rexec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version)
if err != nil {
return errors.Trace(err)
}
if len(rows) == 0 {
return errors.New("no historical meta stats can be recorded")
}
modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)

_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
}()

const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, create_time) VALUES (%?, %?, %?, %?, NOW())"
if _, err := exec.ExecuteInternal(ctx, sql, tableID, modifyCount, count, version); err != nil {
return errors.Trace(err)
}
return nil
}

func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) {
h.mu.Lock()
defer h.mu.Unlock()
err := recordHistoricalStatsMeta(h.mu.ctx, tableID, version)
if err != nil {
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", version),
zap.Error(err))
}
}

// InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job.
func (h *Handle) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error {
h.mu.Lock()
Expand Down
Loading

0 comments on commit 388e008

Please sign in to comment.