diff --git a/pkg/planner/core/main_test.go b/pkg/planner/core/main_test.go
index 63e00341f17d2..e8d8f2c56380d 100644
--- a/pkg/planner/core/main_test.go
+++ b/pkg/planner/core/main_test.go
@@ -50,6 +50,7 @@ func TestMain(m *testing.M) {
 		goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
 		goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
 		goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
+		goleak.IgnoreTopFunction("github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher.(*worker).run"),
 	}
 
 	callback := func(i int) int {
diff --git a/pkg/statistics/handle/autoanalyze/BUILD.bazel b/pkg/statistics/handle/autoanalyze/BUILD.bazel
index ebc779a9dd220..51788ab2fdd62 100644
--- a/pkg/statistics/handle/autoanalyze/BUILD.bazel
+++ b/pkg/statistics/handle/autoanalyze/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
         "//pkg/statistics/handle/util",
         "//pkg/types",
         "//pkg/util",
+        "//pkg/util/intest",
         "//pkg/util/logutil",
         "//pkg/util/sqlescape",
         "//pkg/util/timeutil",
diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go
index 4c88277bc74f6..3cc3b084bdf9e 100644
--- a/pkg/statistics/handle/autoanalyze/autoanalyze.go
+++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go
@@ -42,6 +42,7 @@ import (
 	statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
 	"github.com/pingcap/tidb/pkg/types"
 	"github.com/pingcap/tidb/pkg/util"
+	"github.com/pingcap/tidb/pkg/util/intest"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/pingcap/tidb/pkg/util/sqlescape"
 	"github.com/pingcap/tidb/pkg/util/timeutil"
@@ -54,6 +55,9 @@ type statsAnalyze struct {
 	statsHandle statstypes.StatsHandle
 	// sysProcTracker is used to track sys process like analyze
 	sysProcTracker sysproctrack.Tracker
+	// refresher is used to refresh the analyze job queue and analyze the highest-priority tables.
+	// It is only used when the auto-analyze priority queue is enabled.
+	refresher *refresher.Refresher
}
 
 // NewStatsAnalyze creates a new StatsAnalyze.
@@ -61,7 +65,14 @@ func NewStatsAnalyze(
 	statsHandle statstypes.StatsHandle,
 	sysProcTracker sysproctrack.Tracker,
 ) statstypes.StatsAnalyze {
-	return &statsAnalyze{statsHandle: statsHandle, sysProcTracker: sysProcTracker}
+	// Usually we would create the refresher only when the auto-analyze priority queue is enabled.
+	// But to let users enable the priority queue on the fly, we create the refresher here unconditionally.
+	r := refresher.NewRefresher(statsHandle, sysProcTracker)
+	return &statsAnalyze{
+		statsHandle:    statsHandle,
+		sysProcTracker: sysProcTracker,
+		refresher:      r,
+	}
 }
 
 // InsertAnalyzeJob inserts the analyze job to the storage.
@@ -269,10 +280,16 @@ func CleanupCorruptedAnalyzeJobsOnDeadInstances(
 )
 
 // HandleAutoAnalyze analyzes the outdated tables. (The change percent of the table exceeds the threshold)
 // It also analyzes newly created tables and newly added indexes.
 func (sa *statsAnalyze) HandleAutoAnalyze() (analyzed bool) {
-	_ = statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error {
-		analyzed = HandleAutoAnalyze(sctx, sa.statsHandle, sa.sysProcTracker)
+	if err := statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error {
+		analyzed = sa.handleAutoAnalyze(sctx)
+		// In tests, wait for the auto-analyze jobs to finish before returning.
+		if intest.InTest {
+			sa.refresher.WaitAutoAnalyzeFinishedForTest()
+		}
 		return nil
-	})
+	}); err != nil {
+		statslogutil.StatsLogger().Error("Failed to handle auto analyze", zap.Error(err))
+	}
 	return
 }
@@ -292,12 +309,7 @@ func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalID
 	return statistics.CheckAnalyzeVerOnTable(tbl, version)
 }
 
-// HandleAutoAnalyze analyzes the newly created table or index.
-func HandleAutoAnalyze(
-	sctx sessionctx.Context,
-	statsHandle statstypes.StatsHandle,
-	sysProcTracker sysproctrack.Tracker,
-) (analyzed bool) {
+func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool {
 	defer func() {
 		if r := recover(); r != nil {
 			statslogutil.StatsLogger().Error(
@@ -308,13 +320,12 @@ func HandleAutoAnalyze(
 		}
 	}()
 	if variable.EnableAutoAnalyzePriorityQueue.Load() {
-		r := refresher.NewRefresher(statsHandle, sysProcTracker)
-		err := r.RebuildTableAnalysisJobQueue()
+		err := sa.refresher.RebuildTableAnalysisJobQueue()
 		if err != nil {
 			statslogutil.StatsLogger().Error("rebuild table analysis job queue failed", zap.Error(err))
 			return false
 		}
-		return r.AnalyzeHighestPriorityTables()
+		return sa.refresher.AnalyzeHighestPriorityTables()
 	}
 
 	parameters := exec.GetAutoAnalyzeParameters(sctx)
@@ -328,8 +339,8 @@
 	return RandomPickOneTableAndTryAutoAnalyze(
 		sctx,
-		statsHandle,
-		sysProcTracker,
+		sa.statsHandle,
+		sa.sysProcTracker,
 		autoAnalyzeRatio,
 		pruneMode,
 		start,
 		end,
@@ -337,6 +348,11 @@
 	)
 }
 
+// Close closes the auto-analyze worker.
+func (sa *statsAnalyze) Close() {
+	sa.refresher.Close()
+}
+
 // CheckAutoAnalyzeWindow determine the time window for auto-analysis and verify if the current time falls within this range.
 // parameters is a map of auto analyze parameters. it is from GetAutoAnalyzeParameters.
 func CheckAutoAnalyzeWindow(sctx sessionctx.Context) bool {
diff --git a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel
index 5bca864e644fb..1274b67db2a4c 100644
--- a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel
+++ b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
         "//pkg/statistics/handle/types",
         "//pkg/statistics/handle/util",
         "//pkg/util",
+        "//pkg/util/intest",
         "//pkg/util/timeutil",
         "@com_github_tikv_client_go_v2//oracle",
         "@org_uber_go_zap//:zap",
diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go
index eeba01d57bfc0..4372ab0254bcd 100644
--- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go
+++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go
@@ -31,6 +31,7 @@ import (
 	statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
 	statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
 	"github.com/pingcap/tidb/pkg/util"
+	"github.com/pingcap/tidb/pkg/util/intest"
 	"github.com/pingcap/tidb/pkg/util/timeutil"
 	"github.com/tikv/client-go/v2/oracle"
 	"go.uber.org/zap"
@@ -54,6 +55,9 @@ type Refresher struct {
 	// Jobs is the priority queue of analysis jobs.
 	// Exported for testing purposes.
 	Jobs *priorityqueue.AnalysisPriorityQueue
+
+	// worker is the worker that runs the analysis jobs.
+	worker *worker
 }
 
 // NewRefresher creates a new Refresher and starts the goroutine.
@@ -61,15 +65,23 @@ func NewRefresher(
 	statsHandle statstypes.StatsHandle,
 	sysProcTracker sysproctrack.Tracker,
 ) *Refresher {
+	maxConcurrency := int(variable.AutoAnalyzeConcurrency.Load())
 	r := &Refresher{
 		statsHandle:    statsHandle,
 		sysProcTracker: sysProcTracker,
 		Jobs:           priorityqueue.NewAnalysisPriorityQueue(),
+		worker:         NewWorker(statsHandle, sysProcTracker, maxConcurrency),
 	}
 	return r
 }
 
+// UpdateConcurrency updates the maximum concurrency for auto-analyze jobs.
+func (r *Refresher) UpdateConcurrency() {
+	newConcurrency := int(variable.AutoAnalyzeConcurrency.Load())
+	r.worker.UpdateConcurrency(newConcurrency)
+}
+
 // AnalyzeHighestPriorityTables picks tables with the highest priority and analyzes them.
 func (r *Refresher) AnalyzeHighestPriorityTables() bool {
 	if !r.autoAnalysisTimeWindow.isWithinTimeWindow(time.Now()) {
@@ -84,14 +96,24 @@ func (r *Refresher) AnalyzeHighestPriorityTables() bool {
 	defer r.statsHandle.SPool().Put(se)
 	sctx := se.(sessionctx.Context)
 
-	var wg util.WaitGroupWrapper
-	defer wg.Wait()
+	// Update the concurrency to the latest value.
+	r.UpdateConcurrency()
+	// Check the remaining concurrency.
+	maxConcurrency := r.worker.GetMaxConcurrency()
+	currentRunningJobs := r.worker.GetRunningJobs()
+	remainConcurrency := maxConcurrency - len(currentRunningJobs)
+	if remainConcurrency <= 0 {
+		statslogutil.SingletonStatsSamplerLogger().Info("No concurrency available")
+		return false
+	}
 
-	maxConcurrency := int(variable.AutoAnalyzeConcurrency.Load())
 	analyzedCount := 0
-
-	for r.Jobs.Len() > 0 && analyzedCount < maxConcurrency {
+	for r.Jobs.Len() > 0 && analyzedCount < remainConcurrency {
 		job := r.Jobs.Pop()
+		if _, isRunning := currentRunningJobs[job.GetTableID()]; isRunning {
+			statslogutil.StatsLogger().Debug("Job already running, skipping", zap.Int64("tableID", job.GetTableID()))
+			continue
+		}
 		if valid, failReason := job.IsValidToAnalyze(sctx); !valid {
 			statslogutil.SingletonStatsSamplerLogger().Info(
 				"Table not ready for analysis",
@@ -103,20 +125,32 @@ func (r *Refresher) AnalyzeHighestPriorityTables() bool {
 		statslogutil.StatsLogger().Info("Auto analyze triggered", zap.Stringer("job", job))
-		wg.Run(func() {
-			if err := job.Analyze(r.statsHandle, r.sysProcTracker); err != nil {
-				statslogutil.StatsLogger().Error(
-					"Auto analyze job execution failed",
-					zap.Stringer("job", job),
-					zap.Error(err),
-				)
-			}
-		})
-
-		analyzedCount++
+		submitted := r.worker.SubmitJob(job)
+		intest.Assert(submitted, "Failed to submit job unexpectedly. "+
+			"This should not occur as the concurrency limit was checked prior to job submission. "+
"+ + "Please investigate potential race conditions or inconsistencies in the concurrency management logic.") + if submitted { + statslogutil.StatsLogger().Debug("Job submitted successfully", + zap.Stringer("job", job), + zap.Int("remainConcurrency", remainConcurrency), + zap.Int("currentRunningJobs", len(currentRunningJobs)), + zap.Int("maxConcurrency", maxConcurrency), + zap.Int("analyzedCount", analyzedCount), + ) + analyzedCount++ + } else { + statslogutil.StatsLogger().Warn("Failed to submit job", + zap.Stringer("job", job), + zap.Int("remainConcurrency", remainConcurrency), + zap.Int("currentRunningJobs", len(currentRunningJobs)), + zap.Int("maxConcurrency", maxConcurrency), + zap.Int("analyzedCount", analyzedCount), + ) + } } if analyzedCount > 0 { + statslogutil.StatsLogger().Debug("Auto analyze jobs submitted successfully", zap.Int("submittedCount", analyzedCount)) return true } @@ -279,6 +313,23 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error { return nil } +// WaitAutoAnalyzeFinishedForTest waits for the auto analyze job to be finished. +// Only used in the test. +func (r *Refresher) WaitAutoAnalyzeFinishedForTest() { + r.worker.WaitAutoAnalyzeFinishedForTest() +} + +// GetRunningJobs returns the currently running jobs. +// Only used in the test. +func (r *Refresher) GetRunningJobs() map[int64]struct{} { + return r.worker.GetRunningJobs() +} + +// Close stops all running jobs and releases resources. +func (r *Refresher) Close() { + r.worker.Stop() +} + // CreateTableAnalysisJob creates a TableAnalysisJob for the physical table. func CreateTableAnalysisJob( sctx sessionctx.Context, diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go index aee766fd47450..4ac0971081b3c 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go @@ -201,6 +201,7 @@ func TestAnalyzeHighestPriorityTables(t *testing.T) { require.Equal(t, 2, r.Jobs.Len()) // Analyze t1 first. require.True(t, r.AnalyzeHighestPriorityTables()) + r.WaitAutoAnalyzeFinishedForTest() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) // The table is analyzed. @@ -218,6 +219,7 @@ func TestAnalyzeHighestPriorityTables(t *testing.T) { require.Equal(t, int64(6), tblStats2.ModifyCount) // Do one more round. require.True(t, r.AnalyzeHighestPriorityTables()) + r.WaitAutoAnalyzeFinishedForTest() // t2 is analyzed. pid2 = tbl2.Meta().GetPartitionInfo().Definitions[1].ID tblStats2 = handle.GetPartitionStats(tbl2.Meta(), pid2) @@ -263,6 +265,7 @@ func TestAnalyzeHighestPriorityTablesConcurrently(t *testing.T) { require.Equal(t, 3, r.Jobs.Len()) // Analyze tables concurrently. require.True(t, r.AnalyzeHighestPriorityTables()) + r.WaitAutoAnalyzeFinishedForTest() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) // Check if t1 and t2 are analyzed (they should be, as they have more new data). @@ -289,6 +292,7 @@ func TestAnalyzeHighestPriorityTablesConcurrently(t *testing.T) { // Do one more round to analyze t3. 
 	require.True(t, r.AnalyzeHighestPriorityTables())
+	r.WaitAutoAnalyzeFinishedForTest()
 	require.NoError(t, handle.DumpStatsDeltaToKV(true))
 	require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
 
@@ -315,6 +319,7 @@ func TestAnalyzeHighestPriorityTablesWithFailedAnalysis(t *testing.T) {
 	r.RebuildTableAnalysisJobQueue()
 	// No jobs in the queue.
 	r.AnalyzeHighestPriorityTables()
+	r.WaitAutoAnalyzeFinishedForTest()
 	// The table is not analyzed.
 	is := dom.InfoSchema()
 	tbl1, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t1"))
@@ -351,6 +356,7 @@ func TestAnalyzeHighestPriorityTablesWithFailedAnalysis(t *testing.T) {
 	insertFailedJobForPartitionWithStartTime(tk, "test", "t1", "p0", startTime)
 
 	r.AnalyzeHighestPriorityTables()
+	r.WaitAutoAnalyzeFinishedForTest()
 	// t2 is analyzed.
 	pid2 := tbl2.Meta().GetPartitionInfo().Definitions[0].ID
 	tblStats2 := handle.GetPartitionStats(tbl2.Meta(), pid2)
diff --git a/pkg/statistics/handle/autoanalyze/refresher/worker.go b/pkg/statistics/handle/autoanalyze/refresher/worker.go
index cf735dda6574f..bed47729c7c3e 100644
--- a/pkg/statistics/handle/autoanalyze/refresher/worker.go
+++ b/pkg/statistics/handle/autoanalyze/refresher/worker.go
@@ -56,6 +56,9 @@ func NewWorker(statsHandle statstypes.StatsHandle, sysProcTracker sysproctrack.T
 func (w *worker) UpdateConcurrency(newConcurrency int) {
 	w.mu.Lock()
 	defer w.mu.Unlock()
+	if newConcurrency == w.maxConcurrency {
+		return
+	}
 	statslogutil.StatsLogger().Info(
 		"Update concurrency",
 		zap.Int("newConcurrency", newConcurrency),
diff --git a/pkg/statistics/handle/handle.go b/pkg/statistics/handle/handle.go
index 3350d6b983dec..c72802acfab93 100644
--- a/pkg/statistics/handle/handle.go
+++ b/pkg/statistics/handle/handle.go
@@ -213,4 +213,5 @@ func (h *Handle) Close() {
 	h.Pool.Close()
 	h.StatsCache.Close()
 	h.StatsUsage.Close()
+	h.StatsAnalyze.Close()
 }
diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go
index 9b8aea83c6ba7..a98f8b7aba78e 100644
--- a/pkg/statistics/handle/types/interfaces.go
+++ b/pkg/statistics/handle/types/interfaces.go
@@ -156,6 +156,9 @@ type StatsAnalyze interface {
 
 	// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same.
 	CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool
+
+	// Close closes the analyze worker.
+	Close()
 }
 
 // StatsCache is used to manage all table statistics in memory.
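The patch drives the worker through a small surface — `NewWorker`, `SubmitJob`, `GetRunningJobs`, `GetMaxConcurrency`, `UpdateConcurrency`, `WaitAutoAnalyzeFinishedForTest`, and `Stop` — but only a fragment of worker.go appears above. For readers following along, here is a minimal sketch of a worker with that shape. It is an illustration, not TiDB's implementation: the `job` interface, the argument-free `Analyze`, the lowercase `newWorker`, and the package name are all simplifications of the real types.

```go
package workersketch

import "sync"

// job stands in for priorityqueue.TableAnalysisJob; only the table ID and
// the ability to run the analysis matter for this sketch.
type job interface {
	GetTableID() int64
	Analyze() error
}

// worker runs analysis jobs in goroutines, capped at maxConcurrency.
type worker struct {
	mu             sync.Mutex
	wg             sync.WaitGroup
	maxConcurrency int
	runningJobs    map[int64]struct{} // keyed by table ID
}

func newWorker(maxConcurrency int) *worker {
	return &worker{
		maxConcurrency: maxConcurrency,
		runningJobs:    make(map[int64]struct{}),
	}
}

// UpdateConcurrency adjusts the cap. Jobs already running are unaffected;
// the new cap only gates future submissions.
func (w *worker) UpdateConcurrency(newConcurrency int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.maxConcurrency = newConcurrency
}

func (w *worker) GetMaxConcurrency() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.maxConcurrency
}

// GetRunningJobs returns a snapshot, so the caller can count running jobs
// and skip tables already being analyzed without holding the lock.
func (w *worker) GetRunningJobs() map[int64]struct{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	snapshot := make(map[int64]struct{}, len(w.runningJobs))
	for id := range w.runningJobs {
		snapshot[id] = struct{}{}
	}
	return snapshot
}

// SubmitJob reports whether the job was accepted; it is rejected when the
// cap is already reached.
func (w *worker) SubmitJob(j job) bool {
	w.mu.Lock()
	if len(w.runningJobs) >= w.maxConcurrency {
		w.mu.Unlock()
		return false
	}
	w.runningJobs[j.GetTableID()] = struct{}{}
	w.wg.Add(1)
	w.mu.Unlock()

	go func() {
		defer func() {
			w.mu.Lock()
			delete(w.runningJobs, j.GetTableID())
			w.mu.Unlock()
			w.wg.Done()
		}()
		_ = j.Analyze() // the real worker logs the error
	}()
	return true
}

// WaitAutoAnalyzeFinishedForTest blocks until every submitted job is done.
func (w *worker) WaitAutoAnalyzeFinishedForTest() {
	w.wg.Wait()
}
```

Because capacity is checked against a snapshot in `AnalyzeHighestPriorityTables` and enforced again inside `SubmitJob`, the check-then-submit sequence can only under-fill the worker, never over-fill it — which is why the patch treats a failed submission as a bug (`intest.Assert`) rather than as ordinary backpressure.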
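The `intest.InTest` gate in `HandleAutoAnalyze` is what keeps the new asynchrony from breaking the tests above: submissions are now fire-and-forget, so a test asserting on stats right after `HandleAutoAnalyze` would race with the worker goroutines. As I read TiDB's `pkg/util/intest`, `InTest` is a constant flipped by a build tag, so the wait compiles away outside test builds. A sketch of the pattern, where `waitForJobs` is a hypothetical stand-in for `sa.refresher.WaitAutoAnalyzeFinishedForTest`:

```go
package workersketch

import "github.com/pingcap/tidb/pkg/util/intest"

// drainForTest mirrors the gating in HandleAutoAnalyze: production code never
// blocks on background analyze jobs, but tests must, or their assertions race
// with the worker goroutines.
func drainForTest(waitForJobs func()) {
	if intest.InTest {
		// Assumed behavior: intest.InTest is compile-time false in production
		// builds, so this branch disappears entirely there.
		waitForJobs()
	}
}
```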
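The main_test.go change is the flip side of keeping a long-lived worker goroutine: the planner package's tests check goroutine hygiene with uber-go/goleak, and the refresher's run loop now legitimately outlives individual tests, so it joins the ignore list. A minimal, self-contained version of that pattern (the module path here is hypothetical; TiDB's actual option list is in the first hunk above):

```go
package example

import (
	"testing"

	"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
	// Fail the package's tests if unexpected goroutines survive, but exempt
	// goroutines whose top stack frame is a known, intentionally long-lived
	// loop — here, a background worker like the refresher's run method.
	goleak.VerifyTestMain(m,
		goleak.IgnoreTopFunction("example.com/app/worker.(*worker).run"), // hypothetical path
	)
}
```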