Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: fix dispatcher cancel #45790

Merged
merged 7 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/resourcemanager/pool/spool"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
disttaskutil "github.com/pingcap/tidb/util/disttask"
Expand Down Expand Up @@ -72,35 +71,27 @@ type TaskHandle interface {
// Manage the lifetime of a task
// including submitting subtasks and updating the status of a task.
type dispatcher struct {
ctx context.Context
gPool *spool.Pool
taskMgr *storage.TaskManager
task *proto.Task
finishedCh chan *proto.Task
ctx context.Context
taskMgr *storage.TaskManager
task *proto.Task
}

// MockOwnerChange mock owner change in tests.
var MockOwnerChange func()

func newDispatcher(ctx context.Context, gPool *spool.Pool, taskMgr *storage.TaskManager, task *proto.Task, finishedCh chan *proto.Task) *dispatcher {
func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, task *proto.Task) *dispatcher {
return &dispatcher{
ctx,
gPool,
taskMgr,
task,
finishedCh,
}
}

// ExecuteTask start to schedule a task
func (d *dispatcher) ExecuteTask() {
// Using the pool with block, so it wouldn't return an error.
_ = d.gPool.Run(func() {
logutil.BgLogger().Info("execute one task", zap.Int64("task ID", d.task.ID),
zap.String("state", d.task.State), zap.Uint64("concurrency", d.task.Concurrency))
d.scheduleTask(d.task.ID)
d.finishedCh <- d.task
})
logutil.BgLogger().Info("execute one task", zap.Int64("task ID", d.task.ID),
zap.String("state", d.task.State), zap.Uint64("concurrency", d.task.Concurrency))
d.scheduleTask(d.task.ID)
}

// monitorTask checks whether the current step of one task is finished,
Expand Down
18 changes: 10 additions & 8 deletions disttask/framework/dispatcher/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func (dm *Manager) setRunningTask(task *proto.Task, dispatcher *dispatcher) {
defer dm.runningTasks.Unlock()
dm.runningTasks.taskIDs[task.ID] = struct{}{}
dm.runningTasks.dispatchers[task.ID] = dispatcher
dispatcher.ExecuteTask()
}

func (dm *Manager) isRunningTask(taskID int64) bool {
Expand Down Expand Up @@ -104,13 +103,13 @@ func NewManager(ctx context.Context, taskTable *storage.TaskManager) (*Manager,
return dispatcherManager, nil
}

// Start start the dispatcherManager, start the dispatchTaskLoop to start multiple dispatchers.
// Start the dispatcherManager, start the dispatchTaskLoop to start multiple dispatchers.
func (dm *Manager) Start() {
dm.wg.Run(dm.dispatchTaskLoop)
dm.inited = true
}

// Stop stop the dispatcherManager.
// Stop the dispatcherManager.
func (dm *Manager) Stop() {
dm.cancel()
dm.gPool.ReleaseAndWait()
Expand All @@ -134,8 +133,6 @@ func (dm *Manager) dispatchTaskLoop() {
case <-dm.ctx.Done():
logutil.BgLogger().Info("dispatch task loop exits", zap.Error(dm.ctx.Err()), zap.Int64("interval", int64(checkTaskRunningInterval)/1000000))
return
case task := <-dm.finishedTaskCh:
dm.delRunningTask(task.ID)
case <-ticker.C:
cnt := dm.getRunningTaskCnt()
if dm.checkConcurrencyOverflow(cnt) {
Expand Down Expand Up @@ -185,11 +182,16 @@ func (*Manager) checkConcurrencyOverflow(cnt int) bool {
}

func (dm *Manager) startDispatcher(task *proto.Task) {
dispatcher := newDispatcher(dm.ctx, dm.gPool, dm.taskMgr, task, dm.finishedTaskCh)
dm.setRunningTask(task, dispatcher)
// Using the pool with block, so it wouldn't return an error.
_ = dm.gPool.Run(func() {
dispatcher := newDispatcher(dm.ctx, dm.taskMgr, task)
dm.setRunningTask(task, dispatcher)
dispatcher.ExecuteTask()
dm.delRunningTask(task.ID)
})
}

// MockDispatcher mock one dispatcher for one task, only used for tests.
func (dm *Manager) MockDispatcher(task *proto.Task) *dispatcher {
return &dispatcher{dm.ctx, dm.gPool, dm.taskMgr, task, nil}
return &dispatcher{dm.ctx, dm.taskMgr, task}
}
8 changes: 4 additions & 4 deletions disttask/framework/dispatcher/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ type DispatcherManagerForTest interface {
}

// GetRunningGTaskCnt implements Dispatcher.GetRunningGTaskCnt interface.
func (d *Manager) GetRunningTaskCnt() int {
return d.getRunningTaskCnt()
func (dm *Manager) GetRunningTaskCnt() int {
return dm.getRunningTaskCnt()
}

// DelRunningGTask implements Dispatcher.DelRunningGTask interface.
func (d *Manager) DelRunningTask(globalTaskID int64) {
d.delRunningTask(globalTaskID)
func (dm *Manager) DelRunningTask(globalTaskID int64) {
dm.delRunningTask(globalTaskID)
}

func TestMain(m *testing.M) {
Expand Down