Skip to content

Commit ff4dcb5

Browse files
authored
Merge branch 'master' into client-go-fe5b35c
2 parents ea71bb6 + 7ba1f0a commit ff4dcb5

File tree

17 files changed

+410
-35
lines changed

17 files changed

+410
-35
lines changed

br/pkg/gluetidb/glue.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ type tidbSession struct {
6161

6262
// GetDomain implements glue.Glue.
6363
func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
64+
initStatsSe, err := session.CreateSession(store)
65+
if err != nil {
66+
return nil, errors.Trace(err)
67+
}
6468
se, err := session.CreateSession(store)
6569
if err != nil {
6670
return nil, errors.Trace(err)
@@ -74,7 +78,7 @@ func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
7478
return nil, err
7579
}
7680
// create stats handler for backup and restore.
77-
err = dom.UpdateTableStatsLoop(se)
81+
err = dom.UpdateTableStatsLoop(se, initStatsSe)
7882
if err != nil {
7983
return nil, errors.Trace(err)
8084
}

domain/domain.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -1836,8 +1836,8 @@ func (do *Domain) StatsHandle() *handle.Handle {
18361836
}
18371837

18381838
// CreateStatsHandle is used only for test.
1839-
func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) error {
1840-
h, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
1839+
func (do *Domain) CreateStatsHandle(ctx, initStatsCtx sessionctx.Context) error {
1840+
h, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
18411841
if err != nil {
18421842
return err
18431843
}
@@ -1900,8 +1900,8 @@ func (do *Domain) SetupAnalyzeExec(ctxs []sessionctx.Context) {
19001900
}
19011901

19021902
// LoadAndUpdateStatsLoop loads and updates stats info.
1903-
func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context) error {
1904-
if err := do.UpdateTableStatsLoop(ctxs[0]); err != nil {
1903+
func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context, initStatsCtx sessionctx.Context) error {
1904+
if err := do.UpdateTableStatsLoop(ctxs[0], initStatsCtx); err != nil {
19051905
return err
19061906
}
19071907
do.StartLoadStatsSubWorkers(ctxs[1:])
@@ -1911,9 +1911,9 @@ func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context) error {
19111911
// UpdateTableStatsLoop creates a goroutine loads stats info and updates stats info in a loop.
19121912
// It will also start a goroutine to analyze tables automatically.
19131913
// It should be called only once in BootstrapSession.
1914-
func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
1914+
func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) error {
19151915
ctx.GetSessionVars().InRestrictedSQL = true
1916-
statsHandle, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
1916+
statsHandle, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
19171917
if err != nil {
19181918
return err
19191919
}

planner/core/mock.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -401,9 +401,13 @@ func MockContext() sessionctx.Context {
401401
ctx.Store = &mock.Store{
402402
Client: &mock.Client{},
403403
}
404+
initStatsCtx := mock.NewContext()
405+
initStatsCtx.Store = &mock.Store{
406+
Client: &mock.Client{},
407+
}
404408
ctx.GetSessionVars().CurrentDB = "test"
405409
do := domain.NewMockDomain()
406-
if err := do.CreateStatsHandle(ctx); err != nil {
410+
if err := do.CreateStatsHandle(ctx, initStatsCtx); err != nil {
407411
panic(fmt.Sprintf("create mock context panic: %+v", err))
408412
}
409413
domain.BindDomain(ctx, do)

resourcemanager/pooltask/BUILD.bazel

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,19 @@
1-
load("@io_bazel_rules_go//go:def.bzl", "go_library")
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "pooltask",
5-
srcs = ["task.go"],
5+
srcs = [
6+
"task.go",
7+
"task_manager.go",
8+
],
69
importpath = "github.com/pingcap/tidb/resourcemanager/pooltask",
710
visibility = ["//visibility:public"],
11+
deps = ["@org_uber_go_atomic//:atomic"],
12+
)
13+
14+
go_test(
15+
name = "pooltask_test",
16+
srcs = ["task_test.go"],
17+
embed = [":pooltask"],
18+
deps = ["@com_github_stretchr_testify//require"],
819
)

resourcemanager/pooltask/task.go

+39-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package pooltask
1616

1717
import (
1818
"sync"
19+
"sync/atomic"
1920
)
2021

2122
// Context is a interface that can be used to create a context.
@@ -31,18 +32,41 @@ func (NilContext) GetContext() any {
3132
return nil
3233
}
3334

34-
// TaskBox is a box which contains all info about pooltask.
35+
const (
36+
// PendingTask is a task waiting to start.
37+
PendingTask int32 = iota
38+
// RunningTask is a task running.
39+
RunningTask
40+
// StopTask is a stop task.
41+
StopTask
42+
)
43+
44+
// TaskBox is a box which contains all info about pool task.
3545
type TaskBox[T any, U any, C any, CT any, TF Context[CT]] struct {
3646
constArgs C
3747
contextFunc TF
3848
wg *sync.WaitGroup
3949
task chan Task[T]
4050
resultCh chan U
4151
taskID uint64
52+
status atomic.Int32 // task manager is able to make this task stop, wait or running
53+
}
54+
55+
// GetStatus is to get the status of task.
56+
func (t *TaskBox[T, U, C, CT, TF]) GetStatus() int32 {
57+
return t.status.Load()
58+
}
59+
60+
// SetStatus is to set the status of task.
61+
func (t *TaskBox[T, U, C, CT, TF]) SetStatus(s int32) {
62+
t.status.Store(s)
4263
}
4364

4465
// NewTaskBox is to create a task box for pool.
4566
func NewTaskBox[T any, U any, C any, CT any, TF Context[CT]](constArgs C, contextFunc TF, wg *sync.WaitGroup, taskCh chan Task[T], resultCh chan U, taskID uint64) TaskBox[T, U, C, CT, TF] {
67+
// We still need to do some work after a TaskBox finishes.
68+
// So we need to add 1 to waitgroup. After we finish the work, we need to call TaskBox.Finish()
69+
wg.Add(1)
4670
return TaskBox[T, U, C, CT, TF]{
4771
constArgs: constArgs,
4872
contextFunc: contextFunc,
@@ -54,7 +78,7 @@ func NewTaskBox[T any, U any, C any, CT any, TF Context[CT]](constArgs C, contex
5478
}
5579

5680
// TaskID is to get the task id.
57-
func (t TaskBox[T, U, C, CT, TF]) TaskID() uint64 {
81+
func (t *TaskBox[T, U, C, CT, TF]) TaskID() uint64 {
5882
return t.taskID
5983
}
6084

@@ -83,6 +107,11 @@ func (t *TaskBox[T, U, C, CT, TF]) Done() {
83107
t.wg.Done()
84108
}
85109

110+
// Finish is to set the TaskBox finish status.
111+
func (t *TaskBox[T, U, C, CT, TF]) Finish() {
112+
t.wg.Done()
113+
}
114+
86115
// Clone is to copy the box
87116
func (t *TaskBox[T, U, C, CT, TF]) Clone() *TaskBox[T, U, C, CT, TF] {
88117
newBox := NewTaskBox[T, U, C, CT, TF](t.constArgs, t.contextFunc, t.wg, t.task, t.resultCh, t.taskID)
@@ -92,6 +121,8 @@ func (t *TaskBox[T, U, C, CT, TF]) Clone() *TaskBox[T, U, C, CT, TF] {
92121
// GPool is a goroutine pool.
93122
type GPool[T any, U any, C any, CT any, TF Context[CT]] interface {
94123
Tune(size int)
124+
DeleteTask(id uint64)
125+
StopTask(id uint64)
95126
}
96127

97128
// TaskController is a controller that can control or watch the pool.
@@ -119,6 +150,12 @@ func (t *TaskController[T, U, C, CT, TF]) Wait() {
119150
<-t.close
120151
t.wg.Wait()
121152
close(t.resultCh)
153+
t.pool.DeleteTask(t.taskID)
154+
}
155+
156+
// Stop is to send stop command to the task. But you still need to wait the task to stop.
157+
func (t *TaskController[T, U, C, CT, TF]) Stop() {
158+
t.pool.StopTask(t.TaskID())
122159
}
123160

124161
// TaskID is to get the task id.
+146
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
// Copyright 2022 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package pooltask
16+
17+
import (
18+
"container/list"
19+
"sync"
20+
"time"
21+
22+
"go.uber.org/atomic"
23+
)
24+
25+
const shard int = 8
26+
27+
func getShardID(id uint64) uint64 {
28+
return id % uint64(shard)
29+
}
30+
31+
type tContainer[T any, U any, C any, CT any, TF Context[CT]] struct {
32+
task *TaskBox[T, U, C, CT, TF]
33+
}
34+
35+
type meta struct {
36+
stats *list.List
37+
createTS time.Time
38+
origin int32
39+
running int32
40+
}
41+
42+
func newStats(concurrency int32) *meta {
43+
s := &meta{
44+
createTS: time.Now(),
45+
stats: list.New(),
46+
origin: concurrency,
47+
}
48+
return s
49+
}
50+
51+
func (m *meta) getOriginConcurrency() int32 {
52+
return m.origin
53+
}
54+
55+
// TaskStatusContainer is a container that can control or watch the pool.
56+
type TaskStatusContainer[T any, U any, C any, CT any, TF Context[CT]] struct {
57+
stats map[uint64]*meta
58+
rw sync.RWMutex
59+
}
60+
61+
// TaskManager is a manager that can control or watch the pool.
62+
type TaskManager[T any, U any, C any, CT any, TF Context[CT]] struct {
63+
task []TaskStatusContainer[T, U, C, CT, TF]
64+
running atomic.Int32
65+
concurrency int32
66+
}
67+
68+
// NewTaskManager create a new pooltask manager.
69+
func NewTaskManager[T any, U any, C any, CT any, TF Context[CT]](c int32) TaskManager[T, U, C, CT, TF] {
70+
task := make([]TaskStatusContainer[T, U, C, CT, TF], shard)
71+
for i := 0; i < shard; i++ {
72+
task[i] = TaskStatusContainer[T, U, C, CT, TF]{
73+
stats: make(map[uint64]*meta),
74+
}
75+
}
76+
return TaskManager[T, U, C, CT, TF]{
77+
task: task,
78+
concurrency: c,
79+
}
80+
}
81+
82+
// RegisterTask register a task to the manager.
83+
func (t *TaskManager[T, U, C, CT, TF]) RegisterTask(taskID uint64, concurrency int32) {
84+
id := getShardID(taskID)
85+
t.task[id].rw.Lock()
86+
t.task[id].stats[taskID] = newStats(concurrency)
87+
t.task[id].rw.Unlock()
88+
}
89+
90+
// DeleteTask delete a task from the manager.
91+
func (t *TaskManager[T, U, C, CT, TF]) DeleteTask(taskID uint64) {
92+
shardID := getShardID(taskID)
93+
t.task[shardID].rw.Lock()
94+
delete(t.task[shardID].stats, taskID)
95+
t.task[shardID].rw.Unlock()
96+
}
97+
98+
// hasTask check if the task is in the manager.
99+
func (t *TaskManager[T, U, C, CT, TF]) hasTask(taskID uint64) bool {
100+
shardID := getShardID(taskID)
101+
t.task[shardID].rw.Lock()
102+
defer t.task[shardID].rw.Unlock()
103+
_, ok := t.task[shardID].stats[taskID]
104+
return ok
105+
}
106+
107+
// AddSubTask AddTask add a task to the manager.
108+
func (t *TaskManager[T, U, C, CT, TF]) AddSubTask(taskID uint64, task *TaskBox[T, U, C, CT, TF]) {
109+
shardID := getShardID(taskID)
110+
tc := tContainer[T, U, C, CT, TF]{
111+
task: task,
112+
}
113+
t.running.Inc()
114+
t.task[shardID].rw.Lock()
115+
t.task[shardID].stats[taskID].stats.PushBack(tc)
116+
t.task[shardID].stats[taskID].running++ // running job in this task
117+
t.task[shardID].rw.Unlock()
118+
}
119+
120+
// ExitSubTask is to exit a task, and it will decrease the count of running pooltask.
121+
func (t *TaskManager[T, U, C, CT, TF]) ExitSubTask(taskID uint64) {
122+
shardID := getShardID(taskID)
123+
t.running.Dec() // total running tasks
124+
t.task[shardID].rw.Lock()
125+
t.task[shardID].stats[taskID].running-- // running job in this task
126+
t.task[shardID].rw.Unlock()
127+
}
128+
129+
// Running return the count of running job in this task.
130+
func (t *TaskManager[T, U, C, CT, TF]) Running(taskID uint64) int32 {
131+
shardID := getShardID(taskID)
132+
t.task[shardID].rw.Lock()
133+
defer t.task[shardID].rw.Unlock()
134+
return t.task[shardID].stats[taskID].running
135+
}
136+
137+
// StopTask is to stop a task by TaskID.
138+
func (t *TaskManager[T, U, C, CT, TF]) StopTask(taskID uint64) {
139+
shardID := getShardID(taskID)
140+
t.task[shardID].rw.Lock()
141+
defer t.task[shardID].rw.Unlock()
142+
l := t.task[shardID].stats[taskID].stats
143+
for e := l.Front(); e != nil; e = e.Next() {
144+
e.Value.(tContainer[T, U, C, CT, TF]).task.SetStatus(StopTask)
145+
}
146+
}

resourcemanager/pooltask/task_test.go

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2022 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package pooltask
16+
17+
import (
18+
"sync"
19+
"testing"
20+
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
func TestTaskManager(t *testing.T) {
25+
size := 32
26+
taskConcurrency := 8
27+
tm := NewTaskManager[int, int, int, any, NilContext](int32(size))
28+
tm.RegisterTask(1, int32(taskConcurrency))
29+
for i := 0; i < taskConcurrency; i++ {
30+
tid := NewTaskBox[int, int, int, any, NilContext](1, NilContext{}, &sync.WaitGroup{}, make(chan Task[int]), make(chan int), 1)
31+
tm.AddSubTask(1, &tid)
32+
}
33+
for i := 0; i < taskConcurrency; i++ {
34+
tm.ExitSubTask(1)
35+
}
36+
require.Equal(t, int32(0), tm.Running(1))
37+
require.True(t, tm.hasTask(1))
38+
tm.DeleteTask(1)
39+
require.False(t, tm.hasTask(1))
40+
}

resourcemanager/scheduler/cpu_scheduler.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ func (*CPUScheduler) Tune(_ util.Component, pool util.GorotinuePool) Command {
3535
return Hold
3636
}
3737
if cpu.GetCPUUsage() < 0.5 {
38-
return Downclock
38+
return Overclock
3939
}
4040
if cpu.GetCPUUsage() > 0.7 {
41-
return Overclock
41+
return Downclock
4242
}
4343
return Hold
4444
}

session/session.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -3416,7 +3416,11 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
34163416
for i := 0; i < cnt; i++ {
34173417
subCtxs[i] = sessionctx.Context(syncStatsCtxs[i])
34183418
}
3419-
if err = dom.LoadAndUpdateStatsLoop(subCtxs); err != nil {
3419+
initStatsCtx, err := createSession(store)
3420+
if err != nil {
3421+
return nil, err
3422+
}
3423+
if err = dom.LoadAndUpdateStatsLoop(subCtxs, initStatsCtx); err != nil {
34203424
return nil, err
34213425
}
34223426

0 commit comments

Comments
 (0)