Skip to content

Commit

Permalink
statistics: add the analysis worker (#55775)
Browse files Browse the repository at this point in the history
ref #55618
  • Loading branch information
Rustin170506 authored Sep 4, 2024
1 parent 5ffe4b1 commit 638c05f
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func NewDynamicPartitionedTableAnalysisJob(
}
}

// GetTableID gets the table ID of the job.
// For a dynamic partitioned table analysis job this is the GlobalTableID field.
func (j *DynamicPartitionedTableAnalysisJob) GetTableID() int64 {
return j.GlobalTableID
}

// Analyze analyzes the partitions or partition indexes.
func (j *DynamicPartitionedTableAnalysisJob) Analyze(
statsHandle statstypes.StatsHandle,
Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type AnalysisJob interface {
// GetIndicators gets the indicators of the job.
GetIndicators() Indicators

// GetTableID gets the table ID of the job.
GetTableID() int64

fmt.Stringer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func NewNonPartitionedTableAnalysisJob(
}
}

// GetTableID gets the table ID of the job.
// For a non-partitioned table analysis job this is the TableID field.
func (j *NonPartitionedTableAnalysisJob) GetTableID() int64 {
return j.TableID
}

// Analyze analyzes the table or indexes.
func (j *NonPartitionedTableAnalysisJob) Analyze(
statsHandle statstypes.StatsHandle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ func NewStaticPartitionTableAnalysisJob(
}
}

// GetTableID gets the table ID of the job.
// For a static partition analysis job this is the StaticPartitionID field.
func (j *StaticPartitionedTableAnalysisJob) GetTableID() int64 {
// Because we only analyze the specified static partition, the table ID is the static partition ID.
return j.StaticPartitionID
}

// Analyze analyzes the specified static partition or indexes.
func (j *StaticPartitionedTableAnalysisJob) Analyze(
statsHandle statstypes.StatsHandle,
Expand Down
18 changes: 15 additions & 3 deletions pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "refresher",
srcs = ["refresher.go"],
srcs = [
"refresher.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher",
visibility = ["//visibility:public"],
deps = [
Expand All @@ -28,17 +31,26 @@ go_library(
go_test(
name = "refresher_test",
timeout = "short",
srcs = ["refresher_test.go"],
srcs = [
"main_test.go",
"refresher_test.go",
"worker_test.go",
],
flaky = True,
shard_count = 14,
shard_count = 15,
deps = [
":refresher",
"//pkg/meta/model",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze/priorityqueue",
"//pkg/statistics/handle/types",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_goleak//:goleak",
],
)
34 changes: 34 additions & 0 deletions pkg/statistics/handle/autoanalyze/refresher/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package refresher_test

import (
"testing"

"github.com/pingcap/tidb/pkg/testkit/testsetup"
"go.uber.org/goleak"
)

// TestMain is the entry point for the tests of this package. It prepares the
// common test environment and hands control to goleak, which runs the tests
// and verifies goroutine leaks, ignoring the goroutines whose top function is
// listed below.
func TestMain(m *testing.M) {
	// Top functions of goroutines that the leak check must ignore.
	ignoredTopFunctions := []string{
		"github.com/golang/glog.(*fileSink).flushDaemon",
		"github.com/bazelbuild/rules_go/go/tools/bzltestutil.RegisterTimeoutHandler.func1",
		"github.com/lestrrat-go/httprc.runFetchWorker",
		"go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop",
		"go.opencensus.io/stats/view.(*worker).start",
	}
	leakOpts := make([]goleak.Option, 0, len(ignoredTopFunctions))
	for _, topFn := range ignoredTopFunctions {
		leakOpts = append(leakOpts, goleak.IgnoreTopFunction(topFn))
	}

	testsetup.SetupForCommonTest()
	goleak.VerifyTestMain(m, leakOpts...)
}
149 changes: 149 additions & 0 deletions pkg/statistics/handle/autoanalyze/refresher/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package refresher

import (
"sync"
"time"

"github.com/pingcap/tidb/pkg/sessionctx/sysproctrack"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/util"
"go.uber.org/zap"
)

// worker manages the execution of analysis jobs.
// Fields are ordered to represent the mutex protection clearly.
//
//nolint:fieldalignment
type worker struct {
// statsHandle is passed to every job's Analyze call.
statsHandle statstypes.StatsHandle
// sysProcTracker is passed to every job's Analyze call.
sysProcTracker sysproctrack.Tracker
// wg launches each submitted job (via RunWithRecover) and is waited on by Stop.
wg util.WaitGroupWrapper

mu sync.Mutex
// mu is used to protect the following fields.
// runningJobs is the set of table IDs (as returned by the job's GetTableID)
// of jobs that have been submitted and have not yet finished processing.
runningJobs map[int64]struct{}
// maxConcurrency is the limit that SubmitJob compares len(runningJobs) against.
maxConcurrency int
}

// NewWorker creates a new worker.
//
// The returned worker passes statsHandle and sysProcTracker to the jobs it
// runs, starts with an empty set of running jobs, and uses maxConcurrency as
// its initial concurrency limit.
func NewWorker(statsHandle statstypes.StatsHandle, sysProcTracker sysproctrack.Tracker, maxConcurrency int) *worker {
	return &worker{
		statsHandle:    statsHandle,
		sysProcTracker: sysProcTracker,
		maxConcurrency: maxConcurrency,
		runningJobs:    map[int64]struct{}{},
	}
}

// UpdateConcurrency updates the maximum concurrency for the worker.
//
// The old and new limits are logged, and the change is applied while holding
// w.mu. Only the limit checked by later SubmitJob calls changes; jobs already
// in the running set are not touched here.
func (w *worker) UpdateConcurrency(newConcurrency int) {
	w.mu.Lock()
	defer w.mu.Unlock()

	oldConcurrency := w.maxConcurrency
	logger := statslogutil.StatsLogger()
	logger.Info(
		"Update concurrency",
		zap.Int("newConcurrency", newConcurrency),
		zap.Int("oldConcurrency", oldConcurrency),
	)
	w.maxConcurrency = newConcurrency
}

// SubmitJob submits a job to the worker.
// It returns false if the job is not submitted due to concurrency limit.
//
// On acceptance the job's table ID is recorded in runningJobs and the job is
// handed to w.wg to be processed; processJob removes the ID again when the
// job is done. Note that a table ID that is already present in runningJobs is
// simply overwritten: this method does not reject duplicate submissions.
func (w *worker) SubmitJob(job priorityqueue.AnalysisJob) bool {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Reject the job when the running set has already reached the limit.
	if atCapacity := len(w.runningJobs) >= w.maxConcurrency; atCapacity {
		statslogutil.StatsLogger().Warn("Worker at maximum capacity, job discarded", zap.Stringer("job", job))
		return false
	}

	tableID := job.GetTableID()
	w.runningJobs[tableID] = struct{}{}

	execute := func() {
		w.processJob(job)
	}
	// onRecover is invoked with the recovered value; nil means nothing to report.
	onRecover := func(r any) {
		if r == nil {
			return
		}
		statslogutil.StatsLogger().Error("Auto analyze job execution failed", zap.Any("recover", r), zap.Stack("stack"))
	}
	w.wg.RunWithRecover(execute, onRecover)

	statslogutil.StatsLogger().Info("Job submitted", zap.Stringer("job", job))
	return true
}

// processJob runs a single analysis job and, once Analyze has returned,
// removes the job's table ID from runningJobs under w.mu. An error returned
// by Analyze is logged and not propagated.
func (w *worker) processJob(job priorityqueue.AnalysisJob) {
	// Always release the job's slot in the running set, whatever Analyze does.
	defer func() {
		w.mu.Lock()
		defer w.mu.Unlock()
		delete(w.runningJobs, job.GetTableID())
	}()

	err := job.Analyze(w.statsHandle, w.sysProcTracker)
	if err == nil {
		return
	}
	statslogutil.StatsLogger().Error(
		"Auto analyze job execution failed",
		zap.Stringer("job", job),
		zap.Error(err),
	)
}

// GetRunningJobs returns the running jobs.
//
// The result is a fresh map holding the table IDs present in runningJobs at
// the time of the call (taken under w.mu), so the caller may read or modify
// it without affecting the worker.
func (w *worker) GetRunningJobs() map[int64]struct{} {
	w.mu.Lock()
	defer w.mu.Unlock()

	snapshot := make(map[int64]struct{}, len(w.runningJobs))
	for tableID := range w.runningJobs {
		snapshot[tableID] = struct{}{}
	}
	return snapshot
}

// GetMaxConcurrency returns the maximum concurrency for the worker.
// The value is read while holding w.mu.
func (w *worker) GetMaxConcurrency() int {
	w.mu.Lock()
	limit := w.maxConcurrency
	w.mu.Unlock()
	return limit
}

// Stop stops the worker.
// It only calls w.wg.Wait(), so it does not cancel anything itself;
// presumably it returns once every job launched through w.wg has finished
// (NOTE(review): confirm against util.WaitGroupWrapper).
func (w *worker) Stop() {
w.wg.Wait()
}

// WaitAutoAnalyzeFinishedForTest waits for all running auto-analyze jobs to finish.
// Only used for test.
//
// It checks runningJobs under w.mu and, while the set is non-empty, sleeps
// for 100ms before checking again. The polling runs directly in the calling
// goroutine: no helper goroutine or channel is needed because the caller
// blocks until the set is empty either way.
func (w *worker) WaitAutoAnalyzeFinishedForTest() {
	const pollInterval = 100 * time.Millisecond
	for {
		w.mu.Lock()
		remaining := len(w.runningJobs)
		w.mu.Unlock()

		if remaining == 0 {
			return
		}
		time.Sleep(pollInterval)
	}
}
Loading

0 comments on commit 638c05f

Please sign in to comment.