Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: fix removing meta when met network partition for so long then recover from it #48005

Merged
merged 5 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,13 +525,16 @@ func TestFrameworkSetLabel(t *testing.T) {
RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{})
distContext := testkit.NewDistExecutionContext(t, 3)
tk := testkit.NewTestKit(t, distContext.Store)

// 1. all "" role.
DispatchTaskAndCheckSuccess("😁", t, &m)

// 2. one "background" role.
tk.MustExec("set global tidb_service_scope=background")
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
tk.MustQuery("select @@tidb_service_scope").Check(testkit.Rows("background"))
DispatchTaskAndCheckSuccess("😊", t, &m)

// 3. 2 "background" role.
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
DispatchTaskAndCheckSuccess("😆", t, &m)
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/metrics",
"//pkg/resourcemanager/pool/spool",
"//pkg/resourcemanager/util",
"//pkg/util",
"//pkg/util/backoff",
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
Expand Down
94 changes: 64 additions & 30 deletions pkg/disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,21 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/spool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

var (
schedulerPoolSize int32 = 4
// same as dispatcher
checkTime = 300 * time.Millisecond
retrySQLTimes = 3
retrySQLInterval = 500 * time.Millisecond
checkTime = 300 * time.Millisecond
recoverMetaInterval = 90 * time.Second
retrySQLTimes = 30
retrySQLInterval = 500 * time.Millisecond
)

// ManagerBuilder is used to build a Manager.
Expand Down Expand Up @@ -70,7 +73,7 @@ type Manager struct {
}
// id, it's the same as server id now, i.e. host:port.
id string
wg sync.WaitGroup
wg tidbutil.WaitGroupWrapper
ctx context.Context
cancel context.CancelFunc
logCtx context.Context
Expand All @@ -97,36 +100,33 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
return m, nil
}

// Start starts the Manager.
func (m *Manager) Start() error {
logutil.Logger(m.logCtx).Debug("manager start")
var err error
func (m *Manager) initMeta() (err error) {
for i := 0; i < retrySQLTimes; i++ {
err = m.taskTable.StartManager(m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
if err == nil {
break
}
if i%10 == 0 {
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
logutil.Logger(m.logCtx).Warn("start manager failed", zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
zap.Int("retry times", retrySQLTimes), zap.Error(err))
logutil.Logger(m.logCtx).Warn("start manager failed",
zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
zap.Int("retry times", i),
zap.Error(err))
}
time.Sleep(retrySQLInterval)
}
if err != nil {
return err
}

// Start starts the Manager.
func (m *Manager) Start() error {
logutil.Logger(m.logCtx).Debug("manager start")
if err := m.initMeta(); err != nil {
return err
}

m.wg.Add(1)
go func() {
defer m.wg.Done()
m.fetchAndHandleRunnableTasks(m.ctx)
}()

m.wg.Add(1)
go func() {
defer m.wg.Done()
m.fetchAndFastCancelTasks(m.ctx)
}()
m.wg.Run(m.fetchAndHandleRunnableTasksLoop)
m.wg.Run(m.fetchAndFastCancelTasksLoop)
m.wg.Run(m.recoverMetaLoop)
return nil
}

Expand All @@ -138,40 +138,43 @@ func (m *Manager) Stop() {
}

// fetchAndHandleRunnableTasksLoop fetches the runnable tasks from the global task table and handles them.
func (m *Manager) fetchAndHandleRunnableTasks(ctx context.Context) {
func (m *Manager) fetchAndHandleRunnableTasksLoop() {
defer tidbutil.Recover(metrics.LabelDomain, "fetchAndHandleRunnableTasksLoop", m.fetchAndHandleRunnableTasksLoop, false)
tangenta marked this conversation as resolved.
Show resolved Hide resolved
ticker := time.NewTicker(checkTime)
for {
select {
case <-ctx.Done():
logutil.Logger(m.logCtx).Info("fetchAndHandleRunnableTasks done")
case <-m.ctx.Done():
logutil.Logger(m.logCtx).Info("fetchAndHandleRunnableTasksLoop done")
return
case <-ticker.C:
tasks, err := m.taskTable.GetGlobalTasksInStates(proto.TaskStateRunning, proto.TaskStateReverting)
if err != nil {
m.logErr(err)
continue
}
m.onRunnableTasks(ctx, tasks)
m.onRunnableTasks(m.ctx, tasks)
}
}
}

// fetchAndFastCancelTasksLoop fetches the reverting/pausing tasks from the global task table and fast cancels them.
func (m *Manager) fetchAndFastCancelTasks(ctx context.Context) {
func (m *Manager) fetchAndFastCancelTasksLoop() {
defer tidbutil.Recover(metrics.LabelDomain, "fetchAndFastCancelTasksLoop", m.fetchAndFastCancelTasksLoop, false)

ticker := time.NewTicker(checkTime)
for {
select {
case <-ctx.Done():
case <-m.ctx.Done():
m.cancelAllRunningTasks()
logutil.Logger(m.logCtx).Info("fetchAndFastCancelTasks done")
logutil.Logger(m.logCtx).Info("fetchAndFastCancelTasksLoop done")
return
case <-ticker.C:
tasks, err := m.taskTable.GetGlobalTasksInStates(proto.TaskStateReverting)
if err != nil {
m.logErr(err)
continue
}
m.onCanceledTasks(ctx, tasks)
m.onCanceledTasks(m.ctx, tasks)

// cancel pending/running subtasks, and mark them as paused.
pausingTasks, err := m.taskTable.GetGlobalTasksInStates(proto.TaskStatePausing)
Expand All @@ -189,6 +192,9 @@ func (m *Manager) fetchAndFastCancelTasks(ctx context.Context) {

// onRunnableTasks handles runnable tasks.
func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) {
if len(tasks) == 0 {
return
}
tasks = m.filterAlreadyHandlingTasks(tasks)
for _, task := range tasks {
exist, err := m.taskTable.HasSubtasksInStates(m.id, task.ID, task.Step,
Expand Down Expand Up @@ -221,6 +227,9 @@ func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) {

// onCanceledTasks cancels the running subtasks.
func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) {
if len(tasks) == 0 {
return
}
m.mu.RLock()
defer m.mu.RUnlock()
for _, task := range tasks {
Expand All @@ -234,6 +243,9 @@ func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) {

// onPausingTasks pauses/cancels the pending/running subtasks.
func (m *Manager) onPausingTasks(tasks []*proto.Task) error {
if len(tasks) == 0 {
return nil
}
m.mu.RLock()
defer m.mu.RUnlock()
for _, task := range tasks {
Expand All @@ -250,6 +262,28 @@ func (m *Manager) onPausingTasks(tasks []*proto.Task) error {
return nil
}

// recoverMetaLoop periodically re-registers this TiDB node in dist_framework_meta.
//
// This is necessary when the TiDB node experiences a prolonged network partition
// and the dispatcher deletes its `dist_framework_meta` row. Once the node recovers
// from the partition, the metadata has to be re-inserted, so every
// recoverMetaInterval the loop calls initMeta again.
//
// The loop exits when m.ctx is cancelled. A failed initMeta call is only logged;
// the next tick retries it.
func (m *Manager) recoverMetaLoop() {
	// NOTE(review): tidbutil.Recover is handed this method as its recover callback,
	// presumably so the loop is restarted after a panic — confirm against pkg/util.
	defer tidbutil.Recover(metrics.LabelDomain, "recoverMetaLoop", m.recoverMetaLoop, false)

	ticker := time.NewTicker(recoverMetaInterval)
	// Release the ticker's resources whenever this invocation of the loop ends
	// (context cancellation or panic); otherwise each exit leaves a live ticker behind.
	defer ticker.Stop()

	for {
		select {
		case <-m.ctx.Done():
			logutil.Logger(m.logCtx).Info("recoverMetaLoop done")
			return
		case <-ticker.C:
			if err := m.initMeta(); err != nil {
				m.logErr(err)
			}
		}
	}
}

// cancelAllRunningTasks cancels all running tasks.
func (m *Manager) cancelAllRunningTasks() {
m.mu.RLock()
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ func TestDistFrameworkMeta(t *testing.T) {

require.NoError(t, sm.StartManager(":4000", "background"))
require.NoError(t, sm.StartManager(":4001", ""))
require.NoError(t, sm.StartManager(":4002", ""))
require.NoError(t, sm.StartManager(":4002", "background"))

allNodes, err := sm.GetAllNodes()
Expand Down
3 changes: 1 addition & 2 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,7 @@ func (stm *TaskManager) StartSubtask(subtaskID int64) error {

// StartManager inserts the manager information into dist_framework_meta.
func (stm *TaskManager) StartManager(tidbID string, role string) error {
_, err := stm.executeSQLWithNewSession(stm.ctx, `insert into mysql.dist_framework_meta values(%?, %?, DEFAULT)
on duplicate key update role = %?`, tidbID, role, role)
_, err := stm.executeSQLWithNewSession(stm.ctx, `replace into mysql.dist_framework_meta values(%?, %?, DEFAULT)`, tidbID, role)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ func (do *Domain) InitDistTaskLoop(ctx context.Context) error {
func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storage.TaskManager, schedulerManager *scheduler.Manager, serverID string) {
err := schedulerManager.Start()
if err != nil {
logutil.BgLogger().Error("dist task scheduler manager failed", zap.Error(err))
logutil.BgLogger().Error("dist task scheduler manager start failed", zap.Error(err))
return
}
logutil.BgLogger().Info("dist task scheduler manager started")
Expand Down
4 changes: 1 addition & 3 deletions pkg/executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,7 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres
dom := domain.GetDomain(e.Ctx())
serverID := disttaskutil.GenerateSubtaskExecID(ctx, dom.DDL().GetID())
_, err = e.Ctx().(sqlexec.SQLExecutor).ExecuteInternal(ctx,
`update mysql.dist_framework_meta
set role = %?
where host = %?`, valStr, serverID)
`replace into mysql.dist_framework_meta values(%?, %?, DEFAULT)`, serverID, valStr)
}
return err
}
Expand Down