executor: support save partition stats in concurrency (#38239)
ref #35142
Yisaer authored Oct 25, 2022
1 parent 06ed0e6 commit 70f3348
Showing 11 changed files with 292 additions and 65 deletions.
37 changes: 20 additions & 17 deletions config/config.go
@@ -643,14 +643,16 @@ type Performance struct {
ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"`
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
// Deprecated
MemProfileInterval string `toml:"-" json:"-"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
MemProfileInterval string `toml:"-" json:"-"`

IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
// The following items are deprecated. We need to keep them here temporarily
// to support the upgrade process. They can be removed in future.

@@ -905,15 +907,16 @@ var defaultConf = Config{
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
AnalyzePartitionConcurrencyQuota: 16,
EnableStatsCacheMemQuota: false,
RunAutoAnalyze: true,
EnableLoadFMSketch: false,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
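
The new analyze-partition-concurrency-quota option defaults to 16 and caps how many extra sessions the domain reserves for saving partition stats. Below is a minimal sketch of overriding it programmatically before bootstrap, assuming the config.UpdateGlobal helper behaves the same way it does elsewhere in the codebase:

package example

import "github.com/pingcap/tidb/config"

// setAnalyzeQuota is a sketch only: it adjusts the new quota for a test setup.
// The field matches the one added to Performance above; the built-in default is 16.
func setAnalyzeQuota(quota uint) uint {
	config.UpdateGlobal(func(conf *config.Config) {
		conf.Performance.AnalyzePartitionConcurrencyQuota = quota
	})
	return config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota
}
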
45 changes: 45 additions & 0 deletions domain/domain.go
@@ -125,6 +125,11 @@ type Domain struct {
sysProcesses SysProcesses

mdlCheckTableInfo *mdlCheckTableInfo

analyzeMu struct {
sync.Mutex
sctxs map[sessionctx.Context]bool
}
}

type mdlCheckTableInfo struct {
@@ -1575,6 +1580,46 @@ func (do *Domain) SetStatsUpdating(val bool) {
}
}

// ReleaseAnalyzeExec returns the extra session contexts used by Analyze back to the pool
func (do *Domain) ReleaseAnalyzeExec(sctxs []sessionctx.Context) {
do.analyzeMu.Lock()
defer do.analyzeMu.Unlock()
for _, ctx := range sctxs {
do.analyzeMu.sctxs[ctx] = false
}
}

// FetchAnalyzeExec fetches up to `need` idle session contexts for Analyze
func (do *Domain) FetchAnalyzeExec(need int) []sessionctx.Context {
if need < 1 {
return nil
}
count := 0
r := make([]sessionctx.Context, 0)
do.analyzeMu.Lock()
defer do.analyzeMu.Unlock()
for sctx, used := range do.analyzeMu.sctxs {
if used {
continue
}
r = append(r, sctx)
do.analyzeMu.sctxs[sctx] = true
count++
if count >= need {
break
}
}
return r
}

// SetupAnalyzeExec initializes the session contexts used by the Analyze executor
func (do *Domain) SetupAnalyzeExec(ctxs []sessionctx.Context) {
do.analyzeMu.sctxs = make(map[sessionctx.Context]bool)
for _, ctx := range ctxs {
do.analyzeMu.sctxs[ctx] = false
}
}

// LoadAndUpdateStatsLoop loads and updates stats info.
func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context) error {
if err := do.UpdateTableStatsLoop(ctxs[0]); err != nil {
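
Taken together, the three new methods form a small borrow/return pool of session contexts. A sketch of the intended usage (error handling elided; dom is assumed to be a bootstrapped *domain.Domain):

package example

import "github.com/pingcap/tidb/domain"

// saveWithBorrowedSessions fetches up to `need` idle session contexts,
// falls back when none are free, and always returns them to the pool.
func saveWithBorrowedSessions(dom *domain.Domain, need int) bool {
	subSctxs := dom.FetchAnalyzeExec(need)
	if len(subSctxs) == 0 {
		return false // no spare sessions: save results in a single thread instead
	}
	defer dom.ReleaseAnalyzeExec(subSctxs)
	// ... start one save worker per context in subSctxs ...
	return true
}
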
124 changes: 97 additions & 27 deletions executor/analyze.go
@@ -188,8 +188,8 @@ func (e *AnalyzeExec) saveV2AnalyzeOpts() error {
return nil
}

func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
func recordHistoricalStats(sctx sessionctx.Context, tableID int64) error {
statsHandle := domain.GetDomain(sctx).StatsHandle()
historicalStatsEnabled, err := statsHandle.CheckHistoricalStatsEnable()
if err != nil {
return errors.Errorf("check tidb_enable_historical_stats failed: %v", err)
@@ -198,7 +198,7 @@ func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error {
return nil
}

is := domain.GetDomain(e.ctx).InfoSchema()
is := domain.GetDomain(sctx).InfoSchema()
tbl, existed := is.TableByID(tableID)
if !existed {
return errors.Errorf("cannot get table by id %d", tableID)
@@ -217,6 +217,23 @@ func (e *AnalyzeExec) recordHistoricalStats(tableID int64) error {
// handleResultsError handles the errors fetched from resultsCh and records them in the log
func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, needGlobalStats bool,
globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
partitionStatsConcurrency := e.ctx.GetSessionVars().AnalyzePartitionConcurrency
// If 'partitionStatsConcurrency' > 1, we will try to demand extra sessions from the Domain to save Analyze results concurrently.
// If no extra sessions are available, we fall back to saving the analyze results in a single thread.
if partitionStatsConcurrency > 1 {
dom := domain.GetDomain(e.ctx)
subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency)
if len(subSctxs) > 0 {
defer func() {
dom.ReleaseAnalyzeExec(subSctxs)
}()
internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
err := e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
return err
}
}

// Save analyze results in a single thread.
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
panicCnt := 0
var err error
@@ -235,36 +252,16 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
finishJobWithLog(e.ctx, results.Job, err)
continue
}
if results.TableID.IsPartitionTable() && needGlobalStats {
for _, result := range results.Ars {
if result.IsIndex == 0 {
// If it does not belong to the statistics of index, we need to set it to -1 to distinguish.
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: int64(-1)}
histIDs := make([]int64, 0, len(result.Hist))
for _, hg := range result.Hist {
// It's normal virtual column, skip.
if hg == nil {
continue
}
histIDs = append(histIDs, hg.ID)
}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer}
} else {
for _, hg := range result.Hist {
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer}
}
}
}
}
if err1 := statsHandle.SaveTableStatsToStorage(results, results.TableID.IsPartitionTable(), e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil {
handleGlobalStats(needGlobalStats, globalStatsMap, results)

if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot); err1 != nil {
err = err1
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(e.ctx, results.Job, err)
} else {
finishJobWithLog(e.ctx, results.Job, nil)
// Dump stats to historical storage.
if err := e.recordHistoricalStats(results.TableID.TableID); err != nil {
if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
@@ -273,6 +270,54 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
return err
}

func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, statsConcurrency int, needGlobalStats bool,
subSctxs []sessionctx.Context,
globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults) error {
partitionStatsConcurrency := len(subSctxs)

var wg util.WaitGroupWrapper
saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency)
errCh := make(chan error, partitionStatsConcurrency)
for i := 0; i < partitionStatsConcurrency; i++ {
worker := newAnalyzeSaveStatsWorker(saveResultsCh, subSctxs[i], errCh)
ctx1 := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
wg.Run(func() {
worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot)
})
}
panicCnt := 0
var err error
for panicCnt < statsConcurrency {
results, ok := <-resultsCh
if !ok {
break
}
if results.Err != nil {
err = results.Err
if isAnalyzeWorkerPanic(err) {
panicCnt++
} else {
logutil.Logger(ctx).Error("analyze failed", zap.Error(err))
}
finishJobWithLog(e.ctx, results.Job, err)
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
saveResultsCh <- results
}
close(saveResultsCh)
wg.Wait()
close(errCh)
if len(errCh) > 0 {
errMsg := make([]string, 0)
for err1 := range errCh {
errMsg = append(errMsg, err1.Error())
}
err = errors.New(strings.Join(errMsg, ","))
}
return err
}

func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<- *statistics.AnalyzeResults) {
var task *analyzeTask
defer func() {
@@ -434,3 +479,28 @@ func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analy
zap.String("cost", job.EndTime.Sub(job.StartTime).String()))
}
}

func handleGlobalStats(needGlobalStats bool, globalStatsMap globalStatsMap, results *statistics.AnalyzeResults) {
if results.TableID.IsPartitionTable() && needGlobalStats {
for _, result := range results.Ars {
if result.IsIndex == 0 {
// If it does not belong to the statistics of index, we need to set it to -1 to distinguish.
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: int64(-1)}
histIDs := make([]int64, 0, len(result.Hist))
for _, hg := range result.Hist {
// It's normal virtual column, skip.
if hg == nil {
continue
}
histIDs = append(histIDs, hg.ID)
}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: histIDs, statsVersion: results.StatsVer}
} else {
for _, hg := range result.Hist {
globalStatsID := globalStatsKey{tableID: results.TableID.TableID, indexID: hg.ID}
globalStatsMap[globalStatsID] = globalStatsInfo{isIndex: result.IsIndex, histIDs: []int64{hg.ID}, statsVersion: results.StatsVer}
}
}
}
}
}
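
The concurrent path above is a plain fan-out: the main goroutine drains resultsCh, forwards each result to a buffered save channel, and the workers report failures on an error channel that is only drained once the save channel is closed and every worker has returned. A stripped-down, standalone model of that flow using only the standard library, with a hypothetical result type standing in for *statistics.AnalyzeResults:

package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

type result struct{ id int } // stands in for *statistics.AnalyzeResults

func saveResult(r result) error { // stands in for handle.SaveTableStatsToStorage
	if r.id%5 == 0 {
		return fmt.Errorf("save failed for %d", r.id)
	}
	return nil
}

func main() {
	const workers = 3
	resultsCh := make(chan result)
	saveCh := make(chan result, workers)
	errCh := make(chan error, workers)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() { // each worker owns one "session" and saves serially
			defer wg.Done()
			for r := range saveCh {
				if err := saveResult(r); err != nil {
					errCh <- err
					return // mirror the real worker: stop after the first failure
				}
			}
		}()
	}

	go func() { // producer stands in for the analyze workers
		for i := 1; i <= 10; i++ {
			resultsCh <- result{id: i}
		}
		close(resultsCh)
	}()

	for r := range resultsCh { // main loop: forward results to the save workers
		saveCh <- r
	}
	close(saveCh)
	wg.Wait()
	close(errCh)

	var msgs []string
	for err := range errCh {
		msgs = append(msgs, err.Error())
	}
	if len(msgs) > 0 {
		fmt.Println(errors.New(strings.Join(msgs, ",")))
	}
}
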
2 changes: 1 addition & 1 deletion executor/analyze_global_stats.go
@@ -83,7 +83,7 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
}
// Dump stats to historical storage.
if err := e.recordHistoricalStats(globalStatsID.tableID); err != nil {
if err := recordHistoricalStats(e.ctx, globalStatsID.tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
3 changes: 3 additions & 0 deletions executor/analyze_test.go
@@ -371,6 +371,9 @@ func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
fmt.Println("testcase ", concurrency)
tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec(fmt.Sprintf("set @@tidb_analyze_partition_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_analyze_partition_concurrency").Check(testkit.Rows(concurrency))

tk.MustExec("analyze table t")
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'")
}
70 changes: 70 additions & 0 deletions executor/analyze_worker.go
@@ -0,0 +1,70 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"

"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

type analyzeSaveStatsWorker struct {
resultsCh <-chan *statistics.AnalyzeResults
sctx sessionctx.Context
errCh chan<- error
}

func newAnalyzeSaveStatsWorker(
resultsCh <-chan *statistics.AnalyzeResults,
sctx sessionctx.Context,
errCh chan<- error) *analyzeSaveStatsWorker {
worker := &analyzeSaveStatsWorker{
resultsCh: resultsCh,
sctx: sctx,
errCh: errCh,
}
return worker
}

func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot bool) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("analyze save stats worker panicked", zap.Any("recover", r), zap.Stack("stack"))
worker.errCh <- getAnalyzePanicErr(r)
}
}()
for results := range worker.resultsCh {
err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot)
if err != nil {
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(worker.sctx, results.Job, err)
worker.errCh <- err
} else {
finishJobWithLog(worker.sctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
if err != nil {
return
}
}
}
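
Condensed wiring for this worker, as used in handleResultsErrorWithConcurrency above (a sketch; subSctxs, e, and the surrounding imports are assumed to be set up as in that function):

// One worker per borrowed session context; all workers share saveResultsCh and errCh.
saveResultsCh := make(chan *statistics.AnalyzeResults, len(subSctxs))
errCh := make(chan error, len(subSctxs))
var wg util.WaitGroupWrapper
for _, sctx := range subSctxs {
	worker := newAnalyzeSaveStatsWorker(saveResultsCh, sctx, errCh)
	ctx1 := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
	wg.Run(func() { worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) })
}
// ... send each *statistics.AnalyzeResults on saveResultsCh, then:
close(saveResultsCh) // lets each worker's range loop finish
wg.Wait()            // wait for all workers before reading errCh
close(errCh)
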
9 changes: 7 additions & 2 deletions session/session.go
@@ -2849,8 +2849,9 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
runInBootstrapSession(store, upgrade)
}

analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota)
concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency)
ses, err := createSessions(store, 7+concurrency)
ses, err := createSessions(store, 7+concurrency+analyzeConcurrencyQuota)
if err != nil {
return nil, err
}
@@ -2933,7 +2934,11 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
if err = dom.LoadAndUpdateStatsLoop(subCtxs); err != nil {
return nil, err
}

subCtxs2 := make([]sessionctx.Context, analyzeConcurrencyQuota)
for i := 0; i < analyzeConcurrencyQuota; i++ {
subCtxs2[i] = ses[7+concurrency+i]
}
dom.SetupAnalyzeExec(subCtxs2)
dom.DumpFileGcCheckerLoop()
dom.LoadSigningCertLoop(cfg.Security.SessionTokenSigningCert, cfg.Security.SessionTokenSigningKey)

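
The bookkeeping above simply carves a dedicated block out of the session slice created by createSessions(store, 7+concurrency+analyzeConcurrencyQuota): the trailing analyzeConcurrencyQuota sessions become the analyze save pool. A sketch of that slicing, using the same indexing as the diff above (how the earlier sessions are assigned is not shown in this hunk):

// Sketch: the tail of `ses` feeds the analyze save pool registered with the domain.
quota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota)
analyzePool := make([]sessionctx.Context, 0, quota)
for i := 0; i < quota; i++ {
	analyzePool = append(analyzePool, ses[7+concurrency+i])
}
dom.SetupAnalyzeExec(analyzePool)
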