statistics: use analysis worker to run analysis jobs (#55848)
close #55618
Rustin170506 authored Sep 9, 2024
1 parent 47bbca3 commit 95527cd
Showing 9 changed files with 114 additions and 31 deletions.
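
At its core, this commit moves auto-analyze execution off goroutines spawned per round inside the refresher (via util.WaitGroupWrapper) and onto a long-lived analysis worker that caps concurrent jobs and tracks which tables are currently being analyzed. The diff below drives that worker through SubmitJob, GetRunningJobs, GetMaxConcurrency, UpdateConcurrency, and Stop. The following Go sketch is a simplified, self-contained illustration of the submit-with-capacity-check pattern; it is not the actual refresher worker (whose implementation is largely outside this diff), and its job type is reduced to a table ID plus a closure.

package main

import (
	"fmt"
	"sync"
	"time"
)

// analysisWorker is a simplified stand-in for the analysis worker this commit
// introduces: it caps concurrently running jobs and remembers which table IDs
// are being analyzed so the refresher can skip them and budget new submissions.
type analysisWorker struct {
	mu             sync.Mutex
	maxConcurrency int
	running        map[int64]struct{}
	wg             sync.WaitGroup
}

func newAnalysisWorker(maxConcurrency int) *analysisWorker {
	return &analysisWorker{
		maxConcurrency: maxConcurrency,
		running:        make(map[int64]struct{}),
	}
}

// SubmitJob runs the job asynchronously; it returns false when the worker is at
// its concurrency limit or the table is already being analyzed.
func (w *analysisWorker) SubmitJob(tableID int64, job func()) bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	if len(w.running) >= w.maxConcurrency {
		return false
	}
	if _, ok := w.running[tableID]; ok {
		return false
	}
	w.running[tableID] = struct{}{}
	w.wg.Add(1)
	go func() {
		defer func() {
			w.mu.Lock()
			delete(w.running, tableID)
			w.mu.Unlock()
			w.wg.Done()
		}()
		job()
	}()
	return true
}

// GetRunningJobs returns a snapshot of the table IDs currently being analyzed.
func (w *analysisWorker) GetRunningJobs() map[int64]struct{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	snapshot := make(map[int64]struct{}, len(w.running))
	for id := range w.running {
		snapshot[id] = struct{}{}
	}
	return snapshot
}

// Stop waits for all in-flight jobs, roughly what Refresher.Close relies on.
func (w *analysisWorker) Stop() { w.wg.Wait() }

func main() {
	w := newAnalysisWorker(2)
	slowJob := func() { time.Sleep(100 * time.Millisecond) }
	for _, id := range []int64{1, 2, 3} {
		// With a limit of 2, the third submission is expected to be rejected
		// while the first two jobs are still running.
		fmt.Println("table", id, "submitted:", w.SubmitJob(id, slowJob))
	}
	w.Stop()
}

This is the same situation AnalyzeHighestPriorityTables guards against in the diff: it checks the remaining concurrency before popping jobs from the priority queue, so a rejected submission should only happen on a race.
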
1 change: 1 addition & 0 deletions pkg/planner/core/main_test.go
@@ -50,6 +50,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher.(*worker).run"),
}

callback := func(i int) int {
1 change: 1 addition & 0 deletions pkg/statistics/handle/autoanalyze/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
"//pkg/statistics/handle/util",
"//pkg/types",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/sqlescape",
"//pkg/util/timeutil",
46 changes: 31 additions & 15 deletions pkg/statistics/handle/autoanalyze/autoanalyze.go
@@ -42,6 +42,7 @@ import (
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/timeutil"
@@ -54,14 +55,24 @@ type statsAnalyze struct {
statsHandle statstypes.StatsHandle
// sysProcTracker is used to track sys process like analyze
sysProcTracker sysproctrack.Tracker
// refresher is used to refresh the analyze job queue and analyze the highest priority tables.
// It is only used when auto-analyze priority queue is enabled.
refresher *refresher.Refresher
}

// NewStatsAnalyze creates a new StatsAnalyze.
func NewStatsAnalyze(
statsHandle statstypes.StatsHandle,
sysProcTracker sysproctrack.Tracker,
) statstypes.StatsAnalyze {
return &statsAnalyze{statsHandle: statsHandle, sysProcTracker: sysProcTracker}
// Usually, we should only create the refresher when auto-analyze priority queue is enabled.
// But to allow users to enable auto-analyze priority queue on the fly, we need to create the refresher here.
r := refresher.NewRefresher(statsHandle, sysProcTracker)
return &statsAnalyze{
statsHandle: statsHandle,
sysProcTracker: sysProcTracker,
refresher: r,
}
}

// InsertAnalyzeJob inserts the analyze job to the storage.
@@ -269,10 +280,16 @@ func CleanupCorruptedAnalyzeJobsOnDeadInstances(
// HandleAutoAnalyze analyzes the outdated tables. (The change percent of the table exceeds the threshold)
// It also analyzes newly created tables and newly added indexes.
func (sa *statsAnalyze) HandleAutoAnalyze() (analyzed bool) {
_ = statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error {
analyzed = HandleAutoAnalyze(sctx, sa.statsHandle, sa.sysProcTracker)
if err := statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error {
analyzed = sa.handleAutoAnalyze(sctx)
// During the test, we need to wait for the auto analyze job to be finished.
if intest.InTest {
sa.refresher.WaitAutoAnalyzeFinishedForTest()
}
return nil
})
}); err != nil {
statslogutil.StatsLogger().Error("Failed to handle auto analyze", zap.Error(err))
}
return
}

@@ -292,12 +309,7 @@ func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalID
return statistics.CheckAnalyzeVerOnTable(tbl, version)
}

// HandleAutoAnalyze analyzes the newly created table or index.
func HandleAutoAnalyze(
sctx sessionctx.Context,
statsHandle statstypes.StatsHandle,
sysProcTracker sysproctrack.Tracker,
) (analyzed bool) {
func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool {
defer func() {
if r := recover(); r != nil {
statslogutil.StatsLogger().Error(
@@ -308,13 +320,12 @@
}
}()
if variable.EnableAutoAnalyzePriorityQueue.Load() {
r := refresher.NewRefresher(statsHandle, sysProcTracker)
err := r.RebuildTableAnalysisJobQueue()
err := sa.refresher.RebuildTableAnalysisJobQueue()
if err != nil {
statslogutil.StatsLogger().Error("rebuild table analysis job queue failed", zap.Error(err))
return false
}
return r.AnalyzeHighestPriorityTables()
return sa.refresher.AnalyzeHighestPriorityTables()
}

parameters := exec.GetAutoAnalyzeParameters(sctx)
@@ -328,15 +339,20 @@

return RandomPickOneTableAndTryAutoAnalyze(
sctx,
statsHandle,
sysProcTracker,
sa.statsHandle,
sa.sysProcTracker,
autoAnalyzeRatio,
pruneMode,
start,
end,
)
}

// Close closes the auto-analyze worker.
func (sa *statsAnalyze) Close() {
sa.refresher.Close()
}

// CheckAutoAnalyzeWindow determine the time window for auto-analysis and verify if the current time falls within this range.
// parameters is a map of auto analyze parameters. it is from GetAutoAnalyzeParameters.
func CheckAutoAnalyzeWindow(sctx sessionctx.Context) bool {
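
Because the worker runs jobs asynchronously, HandleAutoAnalyze can now return before the analysis itself has finished, which is why the changes above add an intest.InTest-guarded call to WaitAutoAnalyzeFinishedForTest and why the tests later in this diff call the same wait before asserting on statistics. The sketch below shows that test-only synchronization idea in isolation, using a plain package-level flag and a sync.WaitGroup; it stands in for, and is not, TiDB's intest package or the real refresher.

package analyzer

import "sync"

// inTest stands in for a test-only flag such as intest.InTest; in this sketch
// it is flipped by test code rather than by build machinery.
var inTest bool

type asyncAnalyzer struct {
	wg sync.WaitGroup
}

// trigger kicks off an analysis asynchronously and, in tests only, waits for it
// so callers can assert on the result right away.
func (a *asyncAnalyzer) trigger(analyze func()) {
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		analyze()
	}()
	if inTest {
		// Mirrors sa.refresher.WaitAutoAnalyzeFinishedForTest() in the diff:
		// production callers never block here.
		a.waitFinishedForTest()
	}
}

// waitFinishedForTest blocks until all in-flight analyses are done.
func (a *asyncAnalyzer) waitFinishedForTest() {
	a.wg.Wait()
}
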
1 change: 1 addition & 0 deletions pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/timeutil",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_zap//:zap",
83 changes: 67 additions & 16 deletions pkg/statistics/handle/autoanalyze/refresher/refresher.go
@@ -31,6 +31,7 @@ import (
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
@@ -54,22 +55,33 @@ type Refresher struct {
// Jobs is the priority queue of analysis jobs.
// Exported for testing purposes.
Jobs *priorityqueue.AnalysisPriorityQueue

// worker is the worker that runs the analysis jobs.
worker *worker
}

// NewRefresher creates a new Refresher and starts the goroutine.
func NewRefresher(
statsHandle statstypes.StatsHandle,
sysProcTracker sysproctrack.Tracker,
) *Refresher {
maxConcurrency := int(variable.AutoAnalyzeConcurrency.Load())
r := &Refresher{
statsHandle: statsHandle,
sysProcTracker: sysProcTracker,
Jobs: priorityqueue.NewAnalysisPriorityQueue(),
worker: NewWorker(statsHandle, sysProcTracker, maxConcurrency),
}

return r
}

// UpdateConcurrency updates the maximum concurrency for auto-analyze jobs
func (r *Refresher) UpdateConcurrency() {
newConcurrency := int(variable.AutoAnalyzeConcurrency.Load())
r.worker.UpdateConcurrency(newConcurrency)
}

// AnalyzeHighestPriorityTables picks tables with the highest priority and analyzes them.
func (r *Refresher) AnalyzeHighestPriorityTables() bool {
if !r.autoAnalysisTimeWindow.isWithinTimeWindow(time.Now()) {
@@ -84,14 +96,24 @@ func (r *Refresher) AnalyzeHighestPriorityTables() bool {
defer r.statsHandle.SPool().Put(se)

sctx := se.(sessionctx.Context)
var wg util.WaitGroupWrapper
defer wg.Wait()
// Update the concurrency to the latest value.
r.UpdateConcurrency()
// Check remaining concurrency.
maxConcurrency := r.worker.GetMaxConcurrency()
currentRunningJobs := r.worker.GetRunningJobs()
remainConcurrency := maxConcurrency - len(currentRunningJobs)
if remainConcurrency <= 0 {
statslogutil.SingletonStatsSamplerLogger().Info("No concurrency available")
return false
}

maxConcurrency := int(variable.AutoAnalyzeConcurrency.Load())
analyzedCount := 0

for r.Jobs.Len() > 0 && analyzedCount < maxConcurrency {
for r.Jobs.Len() > 0 && analyzedCount < remainConcurrency {
job := r.Jobs.Pop()
if _, isRunning := currentRunningJobs[job.GetTableID()]; isRunning {
statslogutil.StatsLogger().Debug("Job already running, skipping", zap.Int64("tableID", job.GetTableID()))
continue
}
if valid, failReason := job.IsValidToAnalyze(sctx); !valid {
statslogutil.SingletonStatsSamplerLogger().Info(
"Table not ready for analysis",
@@ -103,20 +125,32 @@

statslogutil.StatsLogger().Info("Auto analyze triggered", zap.Stringer("job", job))

wg.Run(func() {
if err := job.Analyze(r.statsHandle, r.sysProcTracker); err != nil {
statslogutil.StatsLogger().Error(
"Auto analyze job execution failed",
zap.Stringer("job", job),
zap.Error(err),
)
}
})

analyzedCount++
submitted := r.worker.SubmitJob(job)
intest.Assert(submitted, "Failed to submit job unexpectedly. "+
"This should not occur as the concurrency limit was checked prior to job submission. "+
"Please investigate potential race conditions or inconsistencies in the concurrency management logic.")
if submitted {
statslogutil.StatsLogger().Debug("Job submitted successfully",
zap.Stringer("job", job),
zap.Int("remainConcurrency", remainConcurrency),
zap.Int("currentRunningJobs", len(currentRunningJobs)),
zap.Int("maxConcurrency", maxConcurrency),
zap.Int("analyzedCount", analyzedCount),
)
analyzedCount++
} else {
statslogutil.StatsLogger().Warn("Failed to submit job",
zap.Stringer("job", job),
zap.Int("remainConcurrency", remainConcurrency),
zap.Int("currentRunningJobs", len(currentRunningJobs)),
zap.Int("maxConcurrency", maxConcurrency),
zap.Int("analyzedCount", analyzedCount),
)
}
}

if analyzedCount > 0 {
statslogutil.StatsLogger().Debug("Auto analyze jobs submitted successfully", zap.Int("submittedCount", analyzedCount))
return true
}

@@ -279,6 +313,23 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
return nil
}

// WaitAutoAnalyzeFinishedForTest waits for the auto analyze job to be finished.
// Only used in the test.
func (r *Refresher) WaitAutoAnalyzeFinishedForTest() {
r.worker.WaitAutoAnalyzeFinishedForTest()
}

// GetRunningJobs returns the currently running jobs.
// Only used in the test.
func (r *Refresher) GetRunningJobs() map[int64]struct{} {
return r.worker.GetRunningJobs()
}

// Close stops all running jobs and releases resources.
func (r *Refresher) Close() {
r.worker.Stop()
}

// CreateTableAnalysisJob creates a TableAnalysisJob for the physical table.
func CreateTableAnalysisJob(
sctx sessionctx.Context,
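
AnalyzeHighestPriorityTables now re-reads tidb_auto_analyze_concurrency at the start of every round (r.UpdateConcurrency()) and budgets new submissions against the jobs already running in the worker, so changing the variable takes effect without restarting anything. The sketch below isolates that per-round budgeting; popJob, runningJobs, and submit are illustrative stand-ins rather than TiDB APIs.

package sketch

import "sync/atomic"

// autoAnalyzeConcurrency stands in for the tidb_auto_analyze_concurrency
// system variable; another goroutine may change it at any time.
var autoAnalyzeConcurrency atomic.Int64

// runOneRound drains up to the remaining concurrency budget from the queue.
// popJob returns the next table ID and false when the queue is empty;
// runningJobs is a snapshot of table IDs the worker is already analyzing;
// submit hands one job to the worker and reports whether it was accepted.
func runOneRound(
	popJob func() (tableID int64, ok bool),
	runningJobs map[int64]struct{},
	submit func(tableID int64) bool,
) (submitted int) {
	// Re-read the variable every round so on-the-fly changes are honored.
	maxConcurrency := int(autoAnalyzeConcurrency.Load())
	remain := maxConcurrency - len(runningJobs)
	if remain <= 0 {
		return 0
	}
	for submitted < remain {
		tableID, ok := popJob()
		if !ok {
			return submitted
		}
		if _, isRunning := runningJobs[tableID]; isRunning {
			// The same table is already being analyzed; skip it this round.
			continue
		}
		if submit(tableID) {
			submitted++
		}
	}
	return submitted
}
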
6 changes: 6 additions & 0 deletions pkg/statistics/handle/autoanalyze/refresher/refresher_test.go
@@ -201,6 +201,7 @@ func TestAnalyzeHighestPriorityTables(t *testing.T) {
require.Equal(t, 2, r.Jobs.Len())
// Analyze t1 first.
require.True(t, r.AnalyzeHighestPriorityTables())
r.WaitAutoAnalyzeFinishedForTest()
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
// The table is analyzed.
@@ -218,6 +219,7 @@
require.Equal(t, int64(6), tblStats2.ModifyCount)
// Do one more round.
require.True(t, r.AnalyzeHighestPriorityTables())
r.WaitAutoAnalyzeFinishedForTest()
// t2 is analyzed.
pid2 = tbl2.Meta().GetPartitionInfo().Definitions[1].ID
tblStats2 = handle.GetPartitionStats(tbl2.Meta(), pid2)
@@ -263,6 +265,7 @@ func TestAnalyzeHighestPriorityTablesConcurrently(t *testing.T) {
require.Equal(t, 3, r.Jobs.Len())
// Analyze tables concurrently.
require.True(t, r.AnalyzeHighestPriorityTables())
r.WaitAutoAnalyzeFinishedForTest()
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
// Check if t1 and t2 are analyzed (they should be, as they have more new data).
@@ -289,6 +292,7 @@

// Do one more round to analyze t3.
require.True(t, r.AnalyzeHighestPriorityTables())
r.WaitAutoAnalyzeFinishedForTest()
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))

Expand All @@ -315,6 +319,7 @@ func TestAnalyzeHighestPriorityTablesWithFailedAnalysis(t *testing.T) {
r.RebuildTableAnalysisJobQueue()
// No jobs in the queue.
r.AnalyzeHighestPriorityTables()
r.WaitAutoAnalyzeFinishedForTest()
// The table is not analyzed.
is := dom.InfoSchema()
tbl1, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t1"))
@@ -351,6 +356,7 @@ insertFailedJobForPartitionWithStartTime(tk, "test", "t1", "p0", startTime)
insertFailedJobForPartitionWithStartTime(tk, "test", "t1", "p0", startTime)

r.AnalyzeHighestPriorityTables()
r.WaitAutoAnalyzeFinishedForTest()
// t2 is analyzed.
pid2 := tbl2.Meta().GetPartitionInfo().Definitions[0].ID
tblStats2 := handle.GetPartitionStats(tbl2.Meta(), pid2)
3 changes: 3 additions & 0 deletions pkg/statistics/handle/autoanalyze/refresher/worker.go
@@ -56,6 +56,9 @@ func NewWorker(statsHandle statstypes.StatsHandle, sysProcTracker sysproctrack.T
func (w *worker) UpdateConcurrency(newConcurrency int) {
w.mu.Lock()
defer w.mu.Unlock()
if newConcurrency == w.maxConcurrency {
return
}
statslogutil.StatsLogger().Info(
"Update concurrency",
zap.Int("newConcurrency", newConcurrency),
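
Only the early-return guard of worker.UpdateConcurrency is part of this hunk. Since the refresher now calls UpdateConcurrency on every auto-analyze round, the guard presumably keeps the "Update concurrency" log and the state update from running when the value has not changed. A minimal self-contained sketch of that idempotent update follows; it is an assumed shape, not the rest of TiDB's worker.go.

package sketch

import (
	"log"
	"sync"
)

// concurrencyLimiter is a minimal stand-in for the worker's concurrency state.
type concurrencyLimiter struct {
	mu             sync.Mutex
	maxConcurrency int
}

// UpdateConcurrency mirrors the guarded update added in this commit: it is
// invoked on every auto-analyze round, so it returns early when the value is
// unchanged and only logs on real changes.
func (c *concurrencyLimiter) UpdateConcurrency(newConcurrency int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if newConcurrency == c.maxConcurrency {
		return
	}
	log.Printf("update concurrency: %d -> %d", c.maxConcurrency, newConcurrency)
	c.maxConcurrency = newConcurrency
}
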
1 change: 1 addition & 0 deletions pkg/statistics/handle/handle.go
@@ -213,4 +213,5 @@ func (h *Handle) Close() {
h.Pool.Close()
h.StatsCache.Close()
h.StatsUsage.Close()
h.StatsAnalyze.Close()
}
3 changes: 3 additions & 0 deletions pkg/statistics/handle/types/interfaces.go
@@ -156,6 +156,9 @@ type StatsAnalyze interface {

// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same.
CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool

// Close closes the analyze worker.
Close()
}

// StatsCache is used to manage all table statistics in memory.
