disttask: refine dispatcher #45460
Merged
Commits (24)
All commits are by ywqzzy:

fbc05ed init
5e0dc23 Merge branch 'master' of https://github.com/pingcap/tidb into dispatc…
23a06c9 refine
628a608 owner change test
46b5a3d fix build
87b62be rename
aa872f8 fix
5e33ec9 rename submitTask to executeTask
b626a9e fix
d660da5 fix
4c69b20 rename d to p
cabcea5 rename
f6490e1 u
b9909f9 u
e0723dc u
83d2de1 u
1bc2b34 remove useless line
e7dd265 refine
fe2f755 Merge branch 'master' of https://github.com/pingcap/tidb into dispatc…
3ab5a6f more refine
1e0a147 refine
b22d2a8 fix
52def17 fix
61a4fe6 use gpool
File diff:
@@ -51,14 +51,16 @@
 	retrySQLInterval = 500 * time.Millisecond
 )

-// Dispatch defines the interface for operations inside a dispatcher.
-type Dispatch interface {
+// Dispatcher defines the interface for operations inside a dispatcher.
+type Dispatcher interface {
 	// Start enables dispatching and monitoring mechanisms.
 	Start()
 	// GetAllSchedulerIDs gets the task's all available instance IDs.
 	GetAllSchedulerIDs(ctx context.Context, handle TaskFlowHandle, gTask *proto.Task) ([]string, error)
 	// Stop stops the dispatcher.
 	Stop()
+	// Inited checks if the dispatcher is started.
+	Inited() bool
 }

 // TaskHandle provides the interface for operations needed by task flow handles.
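The renamed Dispatcher interface gains an Inited method so callers can observe whether Start has run. A minimal sketch of the intended lifecycle, assuming the disttask framework package paths implied by this PR (the helper function is hypothetical):

package example

import (
	"context"
	"errors"

	"github.com/pingcap/tidb/disttask/framework/dispatcher"
	"github.com/pingcap/tidb/disttask/framework/storage"
)

// runDispatcher sketches the lifecycle: construct, Start, use Inited as a
// readiness check, and Stop on the way out.
func runDispatcher(ctx context.Context, taskMgr *storage.TaskManager) error {
	d, err := dispatcher.NewDispatcher(ctx, taskMgr)
	if err != nil {
		return err
	}
	d.Start()
	defer d.Stop()

	if !d.Inited() {
		return errors.New("dispatcher did not start")
	}
	// The dispatch loop now runs in a background goroutine.
	return nil
}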

@@ -80,7 +82,6 @@
 	d.runningGTasks.Lock()
 	d.runningGTasks.taskIDs[gTask.ID] = struct{}{}
 	d.runningGTasks.Unlock()
-	d.detectPendingGTaskCh <- gTask
 }

 func (d *dispatcher) isRunningGTask(globalTaskID int64) bool {

@@ -96,12 +97,15 @@
 	delete(d.runningGTasks.taskIDs, globalTaskID)
 }

 // dispatcher dispatches and monitors tasks.
+// The monitoring task number is limited by the size of gPool.
 type dispatcher struct {
 	ctx     context.Context
 	cancel  context.CancelFunc
 	taskMgr *storage.TaskManager
 	wg      tidbutil.WaitGroupWrapper
+	gPool   *spool.Pool
+	inited  bool

 	runningGTasks struct {
 		syncutil.RWMutex
@@ -111,35 +115,43 @@
 }

 // NewDispatcher creates a dispatcher struct.
-func NewDispatcher(ctx context.Context, taskTable *storage.TaskManager) (Dispatch, error) {
+func NewDispatcher(ctx context.Context, taskTable *storage.TaskManager) (Dispatcher, error) {
 	dispatcher := &dispatcher{
 		taskMgr: taskTable,
-		detectPendingGTaskCh: make(chan *proto.Task, DefaultDispatchConcurrency),
 	}
-	pool, err := spool.NewPool("dispatch_pool", int32(DefaultDispatchConcurrency), util.DistTask, spool.WithBlocking(true))
+	gPool, err := spool.NewPool("dispatch_pool", int32(DefaultDispatchConcurrency), util.DistTask, spool.WithBlocking(true))
 	if err != nil {
 		return nil, err
 	}
-	dispatcher.gPool = pool
+	dispatcher.gPool = gPool
 	dispatcher.ctx, dispatcher.cancel = context.WithCancel(ctx)
 	dispatcher.runningGTasks.taskIDs = make(map[int64]struct{})

 	return dispatcher, nil
 }
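NewDispatcher now owns a blocking goroutine pool capped at DefaultDispatchConcurrency; with spool.WithBlocking(true), Run blocks when all workers are busy instead of failing, which is why executeTask below can discard its error. A sketch of that construction, using only the calls visible in this diff (the import paths are assumptions based on TiDB's resourcemanager package layout):

package example

import (
	"github.com/pingcap/tidb/resourcemanager/pool/spool"
	"github.com/pingcap/tidb/resourcemanager/util"
)

// newBlockingPool mirrors the construction in NewDispatcher: a named pool,
// capped at n workers, that blocks submitters once every worker is busy.
func newBlockingPool(n int32) (*spool.Pool, error) {
	return spool.NewPool("dispatch_pool", n, util.DistTask, spool.WithBlocking(true))
}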

-// Start implements Dispatch.Start interface.
+// Start implements dispatcher.Start interface.
 func (d *dispatcher) Start() {
 	d.wg.Run(d.DispatchTaskLoop)
-	d.wg.Run(d.DetectTaskLoop)
+	d.inited = true
 }

-// Stop implements Dispatch.Stop interface.
+// Stop implements dispatcher.Stop interface.
 func (d *dispatcher) Stop() {
 	d.cancel()
+	d.gPool.ReleaseAndWait()
 	d.wg.Wait()
+	d.inited = false
 }

+func (d *dispatcher) Inited() bool {
+	return d.inited
+}
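The ordering inside Stop matters: cancelling the context first unblocks every goroutine selecting on ctx.Done(), ReleaseAndWait then drains the pooled per-task goroutines, and wg.Wait finally joins the dispatch loop. A distilled, generic sketch of that shutdown sequence (the stopper type is invented for illustration):

package example

import (
	"context"
	"sync"
)

// stopper distills the Stop sequence: signal cancellation, wait for the
// pooled workers, then wait for the long-lived loops.
type stopper struct {
	cancel  context.CancelFunc
	workers *sync.WaitGroup // stands in for gPool.ReleaseAndWait
	loops   *sync.WaitGroup // stands in for wg.Wait
}

func (s *stopper) Stop() {
	s.cancel()       // 1. unblock every select on <-ctx.Done()
	s.workers.Wait() // 2. drain in-flight per-task goroutines
	s.loops.Wait()   // 3. join the dispatch loop itself
}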

+// MockOwnerChange mocks owner change in tests.
+var MockOwnerChange func()

 // DispatchTaskLoop dispatches the global tasks.
 func (d *dispatcher) DispatchTaskLoop() {
 	logutil.BgLogger().Info("dispatch task loop start")
@@ -175,6 +187,7 @@
 			// the task is not in runningGTasks set when:
 			// owner changed or task is cancelled when status is pending.
 			if gTask.State == proto.TaskStateRunning || gTask.State == proto.TaskStateReverting || gTask.State == proto.TaskStateCancelling {
+				d.executeTask(gTask)
 				d.setRunningGTask(gTask)
 				cnt++
 				continue
@@ -184,20 +197,26 @@
 				break
 			}

 			err = d.processNormalFlow(gTask)
 			logutil.BgLogger().Info("dispatch task loop", zap.Int64("task ID", gTask.ID),
 				zap.String("state", gTask.State), zap.Uint64("concurrency", gTask.Concurrency), zap.Error(err))
 			if err != nil || gTask.IsFinished() {
 				continue
 			}
+			d.executeTask(gTask)
 			d.setRunningGTask(gTask)
 			cnt++
 			}
 		}
 	}
 }

-func (d *dispatcher) probeTask(taskID int64) (gTask *proto.Task, finished bool, subTaskErrs []error) {
+func (d *dispatcher) executeTask(gTask *proto.Task) {
+	// Using the pool with block, so it wouldn't return an error.
+	_ = d.gPool.Run(func() {
+		logutil.BgLogger().Info("execute one task", zap.Int64("task ID", gTask.ID),
+			zap.String("state", gTask.State), zap.Uint64("concurrency", gTask.Concurrency))
+		d.scheduleTask(gTask.ID)
+	})
+}

Review comment on executeTask: how about moving this to a dispatcher manager? Then we would not need the finished channel.
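This is the core of the refactor: the detectPendingGTaskCh channel and DetectTaskLoop pair are replaced by a direct, blocking pool submission from DispatchTaskLoop. A simplified sketch of the pattern (the pool interface and callback are stand-ins for spool.Pool and scheduleTask):

package example

import "fmt"

// pool abstracts the one method of the goroutine pool used here.
type pool interface{ Run(func()) error }

// executeTask shows the submission pattern: because the pool blocks when
// full, Run does not fail on saturation, so the error is deliberately
// discarded, matching the comment in the diff above.
func executeTask(p pool, taskID int64, schedule func(int64)) {
	_ = p.Run(func() {
		fmt.Println("execute one task", taskID)
		schedule(taskID) // runs the ticker loop until the task finishes
	})
}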

+// monitorTask checks whether the current step of one task is finished,
+// and gathers subTaskErrs to handle subtask failures.
+func (d *dispatcher) monitorTask(taskID int64) (gTask *proto.Task, finished bool, subTaskErrs []error) {
 	// TODO: Consider putting the following operations into a transaction.
 	gTask, err := d.taskMgr.GetGlobalTaskByID(taskID)
 	if err != nil {
@@ -233,40 +252,25 @@
 	}
 }

-// DetectTaskLoop monitors the status of the subtasks and processes them.
-func (d *dispatcher) DetectTaskLoop() {
-	logutil.BgLogger().Info("detect task loop start")
-	for {
-		select {
-		case <-d.ctx.Done():
-			logutil.BgLogger().Info("detect task loop exits", zap.Error(d.ctx.Err()))
-			return
-		case task := <-d.detectPendingGTaskCh:
-			// Using the pool with block, so it wouldn't return an error.
-			_ = d.gPool.Run(func() { d.detectTask(task.ID) })
-		}
-	}
-}
-
-func (d *dispatcher) detectTask(taskID int64) {
+// scheduleTask schedules the task execution step by step.
+func (d *dispatcher) scheduleTask(taskID int64) {
 	ticker := time.NewTicker(checkTaskFinishedInterval)
 	defer ticker.Stop()

 	for {
 		select {
 		case <-d.ctx.Done():
-			logutil.BgLogger().Info("detect task exits", zap.Int64("task ID", taskID), zap.Error(d.ctx.Err()))
+			logutil.BgLogger().Info("schedule task exits", zap.Int64("task ID", taskID), zap.Error(d.ctx.Err()))
 			return
 		case <-ticker.C:
-			failpoint.Inject("cancelTaskBeforeProbe", func(val failpoint.Value) {
-				if val.(bool) {
+			gTask, stepIsFinished, errs := d.monitorTask(taskID)
+			failpoint.Inject("cancelTaskAfterMonitorTask", func(val failpoint.Value) {
+				if val.(bool) && gTask.State == proto.TaskStateRunning {
 					err := d.taskMgr.CancelGlobalTask(taskID)
 					if err != nil {
-						logutil.BgLogger().Error("cancel global task failed", zap.Error(err))
+						logutil.BgLogger().Error("cancel task failed", zap.Error(err))
 					}
 				}
 			})
-			gTask, stepIsFinished, errs := d.probeTask(taskID)
 			// The global task isn't finished and not failed.
 			if !stepIsFinished && len(errs) == 0 {
 				GetTaskFlowHandle(gTask.Type).OnTicker(d.ctx, gTask)
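scheduleTask turns each running task into a ticker-driven polling goroutine that exits on context cancellation or task completion. A distilled sketch of the pattern with the TiDB-specific pieces abstracted away (checkStep is a hypothetical stand-in for monitorTask):

package example

import (
	"context"
	"fmt"
	"time"
)

// pollUntilDone polls on a ticker, exits when the context is cancelled,
// and stops when the work reports itself finished or failed.
func pollUntilDone(ctx context.Context, interval time.Duration, checkStep func() (finished bool, err error)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("schedule exits:", ctx.Err())
			return
		case <-ticker.C:
			finished, err := checkStep()
			if err != nil || finished {
				return
			}
		}
	}
}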

@@ -287,6 +291,14 @@
 				zap.Int64("task-id", gTask.ID), zap.String("state", gTask.State))
 			}
 		}

+		failpoint.Inject("mockOwnerChange", func(val failpoint.Value) {
+			if val.(bool) {
+				logutil.BgLogger().Info("mockOwnerChange called")
+				MockOwnerChange()
+				time.Sleep(time.Second)
+			}
+		})
 	}
 }
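The mockOwnerChange failpoint pairs with the new MockOwnerChange variable to simulate an owner switch mid-schedule (see the "owner change test" commit). A sketch of how a test might drive it; the failpoint path and the testify usage are assumptions, so verify the real import path before relying on this:

package dispatcher_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"

	"github.com/pingcap/tidb/disttask/framework/dispatcher"
)

// TestOwnerChange enables the failpoint so scheduleTask invokes the
// injected MockOwnerChange callback on its next tick.
func TestOwnerChange(t *testing.T) {
	dispatcher.MockOwnerChange = func() {
		// A real test would swap the owner in the test cluster here.
	}
	fp := "github.com/pingcap/tidb/disttask/framework/dispatcher/mockOwnerChange" // assumed path
	require.NoError(t, failpoint.Enable(fp, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()
	// ... start a dispatcher and assert tasks survive the owner change ...
}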

@@ -335,7 +347,7 @@
 	// 1. generate the needed global task meta and subTask meta (dist-plan).
 	handle := GetTaskFlowHandle(gTask.Type)
 	if handle == nil {
-		logutil.BgLogger().Warn("gen gTask flow handle failed, this type handle doesn't register", zap.Int64("ID", gTask.ID), zap.String("type", gTask.Type))
+		logutil.BgLogger().Warn("gen task flow handle failed, this type handle doesn't register", zap.Int64("ID", gTask.ID), zap.String("type", gTask.Type))
 		return d.updateTask(gTask, proto.TaskStateReverted, nil, retrySQLTimes)
 	}
 	meta, err := handle.ProcessErrFlow(d.ctx, d, gTask, receiveErr)
@@ -351,7 +363,7 @@
 func (d *dispatcher) dispatchSubTask4Revert(gTask *proto.Task, handle TaskFlowHandle, meta []byte) error {
 	instanceIDs, err := d.GetAllSchedulerIDs(d.ctx, handle, gTask)
 	if err != nil {
-		logutil.BgLogger().Warn("get global task's all instances failed", zap.Error(err))
+		logutil.BgLogger().Warn("get task's all instances failed", zap.Error(err))
 		return err
 	}
@@ -370,7 +382,7 @@
 	// 1. generate the needed global task meta and subTask meta (dist-plan).
 	handle := GetTaskFlowHandle(gTask.Type)
 	if handle == nil {
-		logutil.BgLogger().Warn("gen gTask flow handle failed, this type handle doesn't register", zap.Int64("ID", gTask.ID), zap.String("type", gTask.Type))
+		logutil.BgLogger().Warn("gen task flow handle failed, this type handle doesn't register", zap.Int64("ID", gTask.ID), zap.String("type", gTask.Type))
 		gTask.Error = errors.New("unsupported task type")
 		return d.updateTask(gTask, proto.TaskStateReverted, nil, retrySQLTimes)
 	}
@@ -436,7 +448,7 @@
 		pos := i % len(serverNodes)
 		instanceID := disttaskutil.GenerateExecID(serverNodes[pos].IP, serverNodes[pos].Port)
 		logutil.BgLogger().Debug("create subtasks",
-			zap.Int("gTask.ID", int(gTask.ID)), zap.String("type", gTask.Type), zap.String("instanceID", instanceID))
+			zap.Int("task.ID", int(gTask.ID)), zap.String("type", gTask.Type), zap.String("instanceID", instanceID))
 		subTasks = append(subTasks, proto.NewSubtask(gTask.ID, gTask.Type, instanceID, meta))
 	}
@@ -483,6 +495,7 @@
 	return ids, nil
 }

+// GetPreviousSubtaskMetas gets subtask metas from the specified step.
 func (d *dispatcher) GetPreviousSubtaskMetas(gTaskID int64, step int64) ([][]byte, error) {
 	previousSubtasks, err := d.taskMgr.GetSucceedSubtasksByStep(gTaskID, step)
 	if err != nil {
@@ -496,17 +509,19 @@
 	return previousSubtaskMetas, nil
 }

+// WithNewSession executes the function with a new session.
 func (d *dispatcher) WithNewSession(fn func(se sessionctx.Context) error) error {
 	return d.taskMgr.WithNewSession(fn)
 }

+// WithNewTxn executes the fn in a new transaction.
 func (d *dispatcher) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error {
 	return d.taskMgr.WithNewTxn(ctx, fn)
 }

 func (*dispatcher) checkConcurrencyOverflow(cnt int) bool {
 	if cnt >= DefaultDispatchConcurrency {
-		logutil.BgLogger().Info("dispatch task loop, running GTask cnt is more than concurrency",
+		logutil.BgLogger().Info("dispatch task loop, running task cnt is more than concurrency limitation",
 			zap.Int("running cnt", cnt), zap.Int("concurrency", DefaultDispatchConcurrency))
 		return true
 	}
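These newly documented helpers exist for TaskFlowHandle implementations, which receive the dispatcher through the TaskHandle interface noted earlier in the diff. A hypothetical flow-handle helper using GetPreviousSubtaskMetas to plan the next step (the interface below reproduces only the one method used, and planNextStep is invented for illustration):

package example

// TaskHandle reproduces the subset of the dispatcher's TaskHandle
// interface used below; the real interface lives in the dispatcher package.
type TaskHandle interface {
	GetPreviousSubtaskMetas(gTaskID int64, step int64) ([][]byte, error)
}

// planNextStep feeds the metas produced by the previous step into the
// planning of the next one.
func planNextStep(h TaskHandle, taskID int64, prevStep int64) ([][]byte, error) {
	metas, err := h.GetPreviousSubtaskMetas(taskID, prevStep)
	if err != nil {
		return nil, err
	}
	// A real handle would transform each meta into the next step's
	// subtask meta; here they are passed through unchanged.
	return metas, nil
}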
Review comment: We don't use this function. Could we remove it?

Review comment: I think Inited is better for checking whether the dispatcher is started.