Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: statistics: add the analysis worker #55621

1 change: 1 addition & 0 deletions pkg/planner/core/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher.(*worker).run"),
}

callback := func(i int) int {
Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/handle/autoanalyze/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/statistics/handle/util",
"//pkg/types",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/sqlescape",
"//pkg/util/timeutil",
Expand Down
47 changes: 25 additions & 22 deletions pkg/statistics/handle/autoanalyze/autoanalyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/timeutil"
Expand All @@ -53,14 +54,21 @@ type statsAnalyze struct {
statsHandle statstypes.StatsHandle
// sysProcTracker is used to track sys process like analyze
sysProcTracker sysproctrack.Tracker
refresher *refresher.Refresher
}

// NewStatsAnalyze creates a new StatsAnalyze.
// The returned statsAnalyze owns a refresher, which must be released via Close.
func NewStatsAnalyze(
	statsHandle statstypes.StatsHandle,
	sysProcTracker sysproctrack.Tracker,
) statstypes.StatsAnalyze {
	// TODO: only create the refresher when auto-analyze and priority queue are enabled.
	r := refresher.NewRefresher(statsHandle, sysProcTracker)
	return &statsAnalyze{
		statsHandle:    statsHandle,
		sysProcTracker: sysProcTracker,
		refresher:      r,
	}
}

// InsertAnalyzeJob inserts the analyze job to the storage.
Expand Down Expand Up @@ -269,7 +277,7 @@ func CleanupCorruptedAnalyzeJobsOnDeadInstances(
// It also analyzes newly created tables and newly added indexes.
func (sa *statsAnalyze) HandleAutoAnalyze() (analyzed bool) {
_ = statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error {
analyzed = HandleAutoAnalyze(sctx, sa.statsHandle, sa.sysProcTracker)
analyzed = sa.handleAutoAnalyze(sctx)
return nil
})
return
Expand All @@ -291,29 +299,19 @@ func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalID
return statistics.CheckAnalyzeVerOnTable(tbl, version)
}

// HandleAutoAnalyze analyzes the newly created table or index.
func HandleAutoAnalyze(
sctx sessionctx.Context,
statsHandle statstypes.StatsHandle,
sysProcTracker sysproctrack.Tracker,
) (analyzed bool) {
defer func() {
if r := recover(); r != nil {
statslogutil.StatsLogger().Error(
"HandleAutoAnalyze panicked",
zap.Any("recover", r),
zap.Stack("stack"),
)
}
}()
func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool {
if variable.EnableAutoAnalyzePriorityQueue.Load() {
r := refresher.NewRefresher(statsHandle, sysProcTracker)
err := r.RebuildTableAnalysisJobQueue()
err := sa.refresher.RebuildTableAnalysisJobQueue()
if err != nil {
statslogutil.StatsLogger().Error("rebuild table analysis job queue failed", zap.Error(err))
return false
}
return r.AnalyzeHighestPriorityTables()
analyzed := sa.refresher.AnalyzeHighestPriorityTables()
// During the test, we need to wait for the auto analyze job to be finished.
if intest.InTest {
sa.refresher.WaitAutoAnalyzeFinishedForTest()
}
return analyzed
}

parameters := exec.GetAutoAnalyzeParameters(sctx)
Expand All @@ -327,15 +325,20 @@ func HandleAutoAnalyze(

return RandomPickOneTableAndTryAutoAnalyze(
sctx,
statsHandle,
sysProcTracker,
sa.statsHandle,
sa.sysProcTracker,
autoAnalyzeRatio,
pruneMode,
start,
end,
)
}

// Close closes the auto-analyze worker.
// It delegates to the refresher, which shuts down its background worker;
// safe to call once the statsAnalyze is no longer in use.
func (sa *statsAnalyze) Close() {
	sa.refresher.Close()
}

// CheckAutoAnalyzeWindow determine the time window for auto-analysis and verify if the current time falls within this range.
// parameters is a map of auto analyze parameters. it is from GetAutoAnalyzeParameters.
func CheckAutoAnalyzeWindow(sctx sessionctx.Context) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ func (j *DynamicPartitionedTableAnalysisJob) SetWeight(weight float64) {
j.Weight = weight
}

// GetTableID gets the table ID of the job.
// Because this job targets a partitioned table, the global (parent) table ID
// is returned rather than the ID of any individual partition.
func (j *DynamicPartitionedTableAnalysisJob) GetTableID() int64 {
	return j.GlobalTableID
}

// GetWeight gets the weight of the job.
func (j *DynamicPartitionedTableAnalysisJob) GetWeight() float64 {
return j.Weight
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type AnalysisJob interface {
// GetIndicators gets the indicators of the job.
GetIndicators() Indicators

// GetTableID gets the table ID of the job.
GetTableID() int64

fmt.Stringer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (j *NonPartitionedTableAnalysisJob) GetIndicators() Indicators {
return j.Indicators
}

// GetTableID gets the table ID of the job.
// For a non-partitioned table this is simply the table's own ID.
func (j *NonPartitionedTableAnalysisJob) GetTableID() int64 {
	return j.TableID
}

// String implements fmt.Stringer interface.
func (j *NonPartitionedTableAnalysisJob) String() string {
return fmt.Sprintf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (j *StaticPartitionedTableAnalysisJob) GetIndicators() Indicators {
return j.Indicators
}

// GetTableID gets the table ID of the job.
// NOTE(review): this returns the static partition ID as the job's identifier,
// not the parent table's ID — callers that deduplicate jobs by "table ID"
// (e.g. a running-job set) should confirm this is the intended key.
func (j *StaticPartitionedTableAnalysisJob) GetTableID() int64 {
	return j.StaticPartitionID
}

// HasNewlyAddedIndex implements AnalysisJob.
func (j *StaticPartitionedTableAnalysisJob) HasNewlyAddedIndex() bool {
return len(j.Indexes) > 0
Expand Down
5 changes: 4 additions & 1 deletion pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "refresher",
srcs = ["refresher.go"],
srcs = [
"refresher.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher",
visibility = ["//visibility:public"],
deps = [
Expand Down
72 changes: 54 additions & 18 deletions pkg/statistics/handle/autoanalyze/refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ const (
)

// Refresher provides methods to refresh stats info.
// NOTE: Refresher is not thread-safe.
type Refresher struct {
statsHandle statstypes.StatsHandle
sysProcTracker sysproctrack.Tracker
Expand All @@ -54,22 +53,39 @@ type Refresher struct {
// Jobs is the priority queue of analysis jobs.
// Exported for testing purposes.
Jobs *priorityqueue.AnalysisPriorityQueue

// worker is the worker that runs the analysis jobs.
worker *worker

maxConcurrency int
}

// NewRefresher creates a new Refresher and starts the goroutine.
func NewRefresher(
	statsHandle statstypes.StatsHandle,
	sysProcTracker sysproctrack.Tracker,
) *Refresher {
	// Read the configured concurrency once so the refresher and its worker
	// start out with the same limit.
	concurrency := int(variable.AutoAnalyzeConcurrency.Load())
	return &Refresher{
		statsHandle:    statsHandle,
		sysProcTracker: sysProcTracker,
		Jobs:           priorityqueue.NewAnalysisPriorityQueue(),
		worker:         newWorker(statsHandle, sysProcTracker, concurrency),
		maxConcurrency: concurrency,
	}
}

// UpdateConcurrency updates the maximum concurrency for auto-analyze jobs
// from the current value of the AutoAnalyzeConcurrency system variable.
func (r *Refresher) UpdateConcurrency() {
	current := int(variable.AutoAnalyzeConcurrency.Load())
	if current == r.maxConcurrency {
		// No change; avoid touching the worker.
		return
	}
	r.maxConcurrency = current
	r.worker.UpdateConcurrency(current)
}

// AnalyzeHighestPriorityTables picks tables with the highest priority and analyzes them.
func (r *Refresher) AnalyzeHighestPriorityTables() bool {
if !r.autoAnalysisTimeWindow.isWithinTimeWindow(time.Now()) {
Expand All @@ -84,14 +100,23 @@ func (r *Refresher) AnalyzeHighestPriorityTables() bool {
defer r.statsHandle.SPool().Put(se)

sctx := se.(sessionctx.Context)
var wg util.WaitGroupWrapper
defer wg.Wait()

maxConcurrency := int(variable.AutoAnalyzeConcurrency.Load())
r.UpdateConcurrency()
maxConcurrency := r.maxConcurrency
analyzedCount := 0

for r.Jobs.Len() > 0 && analyzedCount < maxConcurrency {
for r.Jobs.Len() > 0 {
runningJobs := r.worker.GetRunningJobs()
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
if len(runningJobs) >= maxConcurrency {
statslogutil.SingletonStatsSamplerLogger().Info("Maximum concurrency reached, stopping analysis", zap.Int("runningJobs", len(runningJobs)))
break
}

job := r.Jobs.Pop()
if _, isRunning := runningJobs[job.GetTableID()]; isRunning {
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
statslogutil.StatsLogger().Debug("Job already running, skipping", zap.Int64("tableID", job.GetTableID()))
continue
}

if valid, failReason := job.IsValidToAnalyze(sctx); !valid {
statslogutil.SingletonStatsSamplerLogger().Info(
"Table not ready for analysis",
Expand All @@ -102,28 +127,39 @@ func (r *Refresher) AnalyzeHighestPriorityTables() bool {
}

statslogutil.StatsLogger().Info("Auto analyze triggered", zap.Stringer("job", job))

wg.Run(func() {
if err := job.Analyze(r.statsHandle, r.sysProcTracker); err != nil {
statslogutil.StatsLogger().Error(
"Auto analyze job execution failed",
zap.Stringer("job", job),
zap.Error(err),
)
}
})

analyzedCount++
if r.worker.SubmitJob(job) {
analyzedCount++
statslogutil.StatsLogger().Debug("Job submitted successfully", zap.Int("analyzedCount", analyzedCount))
} else {
statslogutil.SingletonStatsSamplerLogger().Warn("Failed to submit job", zap.Stringer("job", job))
}
}

if analyzedCount > 0 {
statslogutil.SingletonStatsSamplerLogger().Info("Auto analyze jobs submitted successfully", zap.Int("submittedCount", analyzedCount))
return true
}

statslogutil.SingletonStatsSamplerLogger().Info("No tables to analyze")
return false
}

// WaitAutoAnalyzeFinishedForTest waits for the auto analyze job to be finished.
// Only used in the test.
// Delegates to the underlying worker; blocks until submitted jobs complete.
func (r *Refresher) WaitAutoAnalyzeFinishedForTest() {
	r.worker.WaitAutoAnalyzeFinishedForTest()
}

// GetRunningJobs returns the currently running jobs.
// The map is keyed by the job's table ID (see AnalysisJob.GetTableID).
// NOTE(review): whether the worker returns a snapshot copy or a live map is
// not visible here — callers iterating concurrently should confirm.
func (r *Refresher) GetRunningJobs() map[int64]struct{} {
	return r.worker.GetRunningJobs()
}

// Close stops all running jobs and releases resources.
// It stops the background worker; the Refresher must not be used afterwards.
func (r *Refresher) Close() {
	r.worker.Stop()
}

// RebuildTableAnalysisJobQueue rebuilds the priority queue of analysis jobs.
func (r *Refresher) RebuildTableAnalysisJobQueue() error {
// Reset the priority queue.
Expand Down
18 changes: 18 additions & 0 deletions pkg/statistics/handle/autoanalyze/refresher/refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ func TestAnalyzeHighestPriorityTables(t *testing.T) {
require.Equal(t, 2, r.Jobs.Len())
// Analyze t1 first.
require.True(t, r.AnalyzeHighestPriorityTables())
require.Eventually(t, func() bool {
return len(r.GetRunningJobs()) == 0
}, 10*time.Second, 100*time.Millisecond)
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
// The table is analyzed.
Expand All @@ -217,6 +220,9 @@ func TestAnalyzeHighestPriorityTables(t *testing.T) {
require.Equal(t, int64(6), tblStats2.ModifyCount)
// Do one more round.
require.True(t, r.AnalyzeHighestPriorityTables())
require.Eventually(t, func() bool {
return len(r.GetRunningJobs()) == 0
}, 10*time.Second, 100*time.Millisecond)
// t2 is analyzed.
pid2 = tbl2.Meta().GetPartitionInfo().Definitions[1].ID
tblStats2 = handle.GetPartitionStats(tbl2.Meta(), pid2)
Expand Down Expand Up @@ -262,6 +268,9 @@ func TestAnalyzeHighestPriorityTablesConcurrently(t *testing.T) {
require.Equal(t, 3, r.Jobs.Len())
// Analyze tables concurrently.
require.True(t, r.AnalyzeHighestPriorityTables())
require.Eventually(t, func() bool {
return len(r.GetRunningJobs()) == 0
}, 10*time.Second, 100*time.Millisecond)
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
// Check if t1 and t2 are analyzed (they should be, as they have more new data).
Expand All @@ -288,6 +297,9 @@ func TestAnalyzeHighestPriorityTablesConcurrently(t *testing.T) {

// Do one more round to analyze t3.
require.True(t, r.AnalyzeHighestPriorityTables())
require.Eventually(t, func() bool {
return len(r.GetRunningJobs()) == 0
}, 10*time.Second, 100*time.Millisecond)
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))

Expand All @@ -314,6 +326,9 @@ func TestAnalyzeHighestPriorityTablesWithFailedAnalysis(t *testing.T) {
r.RebuildTableAnalysisJobQueue()
// No jobs in the queue.
r.AnalyzeHighestPriorityTables()
require.Eventually(t, func() bool {
return len(r.GetRunningJobs()) == 0
}, 10*time.Second, 100*time.Millisecond)
// The table is not analyzed.
is := dom.InfoSchema()
tbl1, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t1"))
Expand Down Expand Up @@ -350,6 +365,9 @@ func TestAnalyzeHighestPriorityTablesWithFailedAnalysis(t *testing.T) {
insertFailedJobForPartitionWithStartTime(tk, "test", "t1", "p0", startTime)

r.AnalyzeHighestPriorityTables()
require.Eventually(t, func() bool {
return len(r.GetRunningJobs()) == 0
}, 10*time.Second, 100*time.Millisecond)
// t2 is analyzed.
pid2 := tbl2.Meta().GetPartitionInfo().Definitions[0].ID
tblStats2 := handle.GetPartitionStats(tbl2.Meta(), pid2)
Expand Down
Loading