From fd642d843ec3674d605dea27c36bdef235b77998 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 14 Sep 2023 00:54:35 +0800 Subject: [PATCH 1/7] change --- ddl/backfilling_dispatcher.go | 18 +++-- ddl/backfilling_dispatcher_test.go | 21 ++++- disttask/framework/dispatcher/dispatcher.go | 76 +++++++++++-------- .../framework/dispatcher/dispatcher_test.go | 21 +++-- disttask/framework/dispatcher/interface.go | 13 ++-- .../framework_dynamic_dispatch_test.go | 24 +++--- .../framework/framework_err_handling_test.go | 36 +++------ disttask/framework/framework_ha_test.go | 19 ++--- disttask/framework/framework_rollback_test.go | 13 ++-- disttask/framework/framework_test.go | 31 ++++---- disttask/framework/proto/task.go | 8 +- disttask/framework/proto/task_test.go | 27 +++++++ disttask/framework/storage/task_table.go | 2 +- disttask/importinto/dispatcher.go | 29 +++---- .../importinto/dispatcher_testkit_test.go | 8 +- disttask/importinto/proto.go | 5 +- 16 files changed, 196 insertions(+), 155 deletions(-) create mode 100644 disttask/framework/proto/task_test.go diff --git a/ddl/backfilling_dispatcher.go b/ddl/backfilling_dispatcher.go index a49852718f561..c679cad1e18e3 100644 --- a/ddl/backfilling_dispatcher.go +++ b/ddl/backfilling_dispatcher.go @@ -117,14 +117,16 @@ func (h *backfillingDispatcherExt) OnNextSubtasksBatch(ctx context.Context, } } -// StageFinished check if current stage finished. -func (*backfillingDispatcherExt) StageFinished(_ *proto.Task) bool { - return true -} - -// Finished check if current task finished. -func (*backfillingDispatcherExt) Finished(task *proto.Task) bool { - return task.Step == proto.StepOne +func (*backfillingDispatcherExt) GetNextStep(task *proto.Task) int64 { + switch task.Step { + case proto.StepInit: + return proto.StepOne + case proto.StepOne: + return proto.StepTwo + default: + // current step should be proto.StepOne + return proto.StepDone + } } // OnErrStage generate error handling stage's plan. diff --git a/ddl/backfilling_dispatcher_test.go b/ddl/backfilling_dispatcher_test.go index 832b80b34b496..acfb3300b5bcc 100644 --- a/ddl/backfilling_dispatcher_test.go +++ b/ddl/backfilling_dispatcher_test.go @@ -61,10 +61,19 @@ func TestBackfillingDispatcher(t *testing.T) { // 1.2 test partition table OnNextSubtasksBatch after StepInit finished. gTask.State = proto.TaskStateRunning - gTask.Step++ + gTask.Step = dsp.GetNextStep(gTask) + require.Equal(t, proto.StepOne, gTask.Step) + // empty stepTwo metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask) require.NoError(t, err) require.Equal(t, 0, len(metas)) + gTask.Step = dsp.GetNextStep(gTask) + require.Equal(t, proto.StepTwo, gTask.Step) + metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask) + require.NoError(t, err) + require.Equal(t, 0, len(metas)) + gTask.Step = dsp.GetNextStep(gTask) + require.Equal(t, proto.StepDone, gTask.Step) // 1.3 test partition table OnErrStage. errMeta, err := dsp.OnErrStage(context.Background(), nil, gTask, []error{errors.New("mockErr")}) @@ -93,12 +102,20 @@ func TestBackfillingDispatcher(t *testing.T) { metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask) require.NoError(t, err) require.Equal(t, 1, len(metas)) + gTask.Step = dsp.GetNextStep(gTask) + require.Equal(t, proto.StepOne, gTask.Step) // 2.2.2 stepOne - gTask.Step++ gTask.State = proto.TaskStateRunning metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask) require.NoError(t, err) require.Equal(t, 1, len(metas)) + gTask.Step = dsp.GetNextStep(gTask) + require.Equal(t, proto.StepTwo, gTask.Step) + metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask) + require.NoError(t, err) + require.Equal(t, 0, len(metas)) + gTask.Step = dsp.GetNextStep(gTask) + require.Equal(t, proto.StepDone, gTask.Step) } func createAddIndexGlobalTask(t *testing.T, dom *domain.Domain, dbName, tblName string, taskType string) *proto.Task { diff --git a/disttask/framework/dispatcher/dispatcher.go b/disttask/framework/dispatcher/dispatcher.go index 4c3272c15c88b..e5782c287f595 100644 --- a/disttask/framework/dispatcher/dispatcher.go +++ b/disttask/framework/dispatcher/dispatcher.go @@ -241,18 +241,6 @@ func (d *BaseDispatcher) onRunning() error { } if cnt == 0 { - logutil.Logger(d.logCtx).Info("previous subtasks finished, generate dist plan", zap.Int64("stage", d.Task.Step)) - // When all subtasks dispatched and processed, mark task as succeed. - if d.Finished(d.Task) { - d.Task.StateUpdateTime = time.Now().UTC() - logutil.Logger(d.logCtx).Info("all subtasks dispatched and processed, finish the task") - err := d.UpdateTask(proto.TaskStateSucceed, nil, RetrySQLTimes) - if err != nil { - logutil.Logger(d.logCtx).Warn("update task failed", zap.Error(err)) - return err - } - return nil - } return d.onNextStage() } // Check if any node are down. @@ -404,17 +392,29 @@ func (d *BaseDispatcher) dispatchSubTask4Revert(meta []byte) error { subTasks := make([]*proto.Subtask, 0, len(instanceIDs)) for _, id := range instanceIDs { - subTasks = append(subTasks, proto.NewSubtask(d.Task.ID, d.Task.Type, id, meta)) + // reverting subtasks belong to the same step as current active step. + subTasks = append(subTasks, proto.NewSubtask(d.Task.Step, d.Task.ID, d.Task.Type, id, meta)) } return d.UpdateTask(proto.TaskStateReverting, subTasks, RetrySQLTimes) } -func (d *BaseDispatcher) onNextStage() error { +func (*BaseDispatcher) nextStepSubtaskDispatched(*proto.Task) bool { + // TODO: will implement it when we we support dispatch subtask by batch. + // since subtask meta might be too large to save in one transaction. + return true +} + +func (d *BaseDispatcher) onNextStage() (err error) { /// dynamic dispatch subtasks. failpoint.Inject("mockDynamicDispatchErr", func() { failpoint.Return(errors.New("mockDynamicDispatchErr")) }) + nextStep := d.GetNextStep(d.Task) + logutil.Logger(d.logCtx).Info("onNextStage", + zap.Int64("current-step", d.Task.Step), + zap.Int64("next-step", nextStep)) + // 1. Adjust the global task's concurrency. if d.Task.State == proto.TaskStatePending { if d.Task.Concurrency == 0 { @@ -423,20 +423,36 @@ func (d *BaseDispatcher) onNextStage() error { if d.Task.Concurrency > MaxSubtaskConcurrency { d.Task.Concurrency = MaxSubtaskConcurrency } - d.Task.StateUpdateTime = time.Now().UTC() - if err := d.UpdateTask(proto.TaskStateRunning, nil, RetrySQLTimes); err != nil { - return err - } - } else if d.StageFinished(d.Task) { - // 2. when previous stage finished, update to next stage. - d.Task.Step++ - logutil.Logger(d.logCtx).Info("previous stage finished, run into next stage", zap.Int64("from", d.Task.Step-1), zap.Int64("to", d.Task.Step)) - d.Task.StateUpdateTime = time.Now().UTC() - err := d.UpdateTask(proto.TaskStateRunning, nil, RetrySQLTimes) + } + defer func() { if err != nil { - return err + return } - } + // invariant: task.Step always means the most recent step that all + // corresponding subtasks have been saved to system table. + // + // when all subtasks of task.Step is finished, we call OnNextSubtasksBatch + // to generate subtasks of next step. after all subtasks of next step are + // saved to system table, we will update task.Step to next step, so the + // invariant hold. + // see nextStepSubtaskDispatched for why we don't update task and subtasks + // in a single transaction. + if d.nextStepSubtaskDispatched(d.Task) { + currStep := d.Task.Step + d.Task.Step = nextStep + // When all subtasks dispatched and processed, mark task as succeed. + taskState := proto.TaskStateRunning + if d.Task.Step == proto.StepDone { + taskState = proto.TaskStateSucceed + logutil.Logger(d.logCtx).Info("all subtasks dispatched and processed, finish the task") + } else { + logutil.Logger(d.logCtx).Info("move to next stage", + zap.Int64("from", currStep), zap.Int64("to", d.Task.Step)) + } + d.Task.StateUpdateTime = time.Now().UTC() + err = d.UpdateTask(taskState, nil, RetrySQLTimes) + } + }() for { // 3. generate a batch of subtasks. @@ -451,12 +467,12 @@ func (d *BaseDispatcher) onNextStage() error { }) // 4. dispatch batch of subtasks to EligibleInstances. - err = d.dispatchSubTask(metas) + err = d.dispatchSubTask(nextStep, metas) if err != nil { return err } - if d.StageFinished(d.Task) { + if d.nextStepSubtaskDispatched(d.Task) { break } @@ -467,7 +483,7 @@ func (d *BaseDispatcher) onNextStage() error { return nil } -func (d *BaseDispatcher) dispatchSubTask(metas [][]byte) error { +func (d *BaseDispatcher) dispatchSubTask(subtaskStep int64, metas [][]byte) error { logutil.Logger(d.logCtx).Info("dispatch subtasks", zap.String("state", d.Task.State), zap.Int64("step", d.Task.Step), zap.Uint64("concurrency", d.Task.Concurrency), zap.Int("subtasks", len(metas))) // select all available TiDB nodes for task. @@ -499,7 +515,7 @@ func (d *BaseDispatcher) dispatchSubTask(metas [][]byte) error { pos := i % len(serverNodes) instanceID := disttaskutil.GenerateExecID(serverNodes[pos].IP, serverNodes[pos].Port) logutil.Logger(d.logCtx).Debug("create subtasks", zap.String("instanceID", instanceID)) - subTasks = append(subTasks, proto.NewSubtask(d.Task.ID, d.Task.Type, instanceID, meta)) + subTasks = append(subTasks, proto.NewSubtask(subtaskStep, d.Task.ID, d.Task.Type, instanceID, meta)) } return d.addSubtasks(subTasks) } diff --git a/disttask/framework/dispatcher/dispatcher_test.go b/disttask/framework/dispatcher/dispatcher_test.go index 8a3993cf6ee01..c7b8fac1a7076 100644 --- a/disttask/framework/dispatcher/dispatcher_test.go +++ b/disttask/framework/dispatcher/dispatcher_test.go @@ -66,12 +66,8 @@ func (*testDispatcherExt) IsRetryableErr(error) bool { return true } -func (dsp *testDispatcherExt) StageFinished(task *proto.Task) bool { - return true -} - -func (dsp *testDispatcherExt) Finished(task *proto.Task) bool { - return false +func (*testDispatcherExt) GetNextStep(*proto.Task) int64 { + return proto.StepDone } type numberExampleDispatcherExt struct{} @@ -108,12 +104,13 @@ func (*numberExampleDispatcherExt) IsRetryableErr(error) bool { return true } -func (*numberExampleDispatcherExt) StageFinished(task *proto.Task) bool { - return true -} - -func (*numberExampleDispatcherExt) Finished(task *proto.Task) bool { - return task.Step == proto.StepTwo +func (*numberExampleDispatcherExt) GetNextStep(task *proto.Task) int64 { + switch task.Step { + case proto.StepInit: + return proto.StepOne + default: + return proto.StepDone + } } func MockDispatcherManager(t *testing.T, pool *pools.ResourcePool) (*dispatcher.Manager, *storage.TaskManager) { diff --git a/disttask/framework/dispatcher/interface.go b/disttask/framework/dispatcher/interface.go index 758da0c52da9d..faf290b781d3e 100644 --- a/disttask/framework/dispatcher/interface.go +++ b/disttask/framework/dispatcher/interface.go @@ -38,6 +38,7 @@ type Extension interface { // it's called when: // 1. task is pending and entering it's first step. // 2. subtasks dispatched has all finished with no error. + // when next step is StepDone, it should return nil, nil. OnNextSubtasksBatch(ctx context.Context, h TaskHandle, task *proto.Task) (subtaskMetas [][]byte, err error) // OnErrStage is called when: @@ -52,14 +53,10 @@ type Extension interface { // IsRetryableErr is used to check whether the error occurred in dispatcher is retryable. IsRetryableErr(err error) bool - // StageFinished is used to check if all subtasks in current stage are dispatched and processed. - // StageFinished is called before generating batch of subtasks. - StageFinished(task *proto.Task) bool - - // Finished is used to check if all subtasks for the task are dispatched and processed. - // Finished is called before generating batch of subtasks. - // Once Finished return true, mark the task as succeed. - Finished(task *proto.Task) bool + // GetNextStep is used to get the next step for the task. + // if task runs successfully, it should go from StepInit to business steps, + // then to StepDone, then dispatcher will mark it as finished. + GetNextStep(task *proto.Task) int64 } // FactoryFn is used to create a dispatcher. diff --git a/disttask/framework/framework_dynamic_dispatch_test.go b/disttask/framework/framework_dynamic_dispatch_test.go index 29f5331e3cd06..53b5d0051854b 100644 --- a/disttask/framework/framework_dynamic_dispatch_test.go +++ b/disttask/framework/framework_dynamic_dispatch_test.go @@ -39,7 +39,7 @@ func (*testDynamicDispatcherExt) OnTick(_ context.Context, _ *proto.Task) {} func (dsp *testDynamicDispatcherExt) OnNextSubtasksBatch(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) { // step1 - if gTask.Step == proto.StepInit && dsp.cnt < 3 { + if gTask.Step == proto.StepInit { dsp.cnt++ return [][]byte{ []byte(fmt.Sprintf("task%d", dsp.cnt)), @@ -48,7 +48,7 @@ func (dsp *testDynamicDispatcherExt) OnNextSubtasksBatch(_ context.Context, _ di } // step2 - if gTask.Step == proto.StepOne && dsp.cnt < 4 { + if gTask.Step == proto.StepOne { dsp.cnt++ return [][]byte{ []byte(fmt.Sprintf("task%d", dsp.cnt)), @@ -61,18 +61,15 @@ func (*testDynamicDispatcherExt) OnErrStage(_ context.Context, _ dispatcher.Task return nil, nil } -func (dsp *testDynamicDispatcherExt) StageFinished(task *proto.Task) bool { - if task.Step == proto.StepInit && dsp.cnt >= 3 { - return true +func (dsp *testDynamicDispatcherExt) GetNextStep(task *proto.Task) int64 { + switch task.Step { + case proto.StepInit: + return proto.StepOne + case proto.StepOne: + return proto.StepTwo + default: + return proto.StepDone } - if task.Step == proto.StepOne && dsp.cnt >= 4 { - return true - } - return false -} - -func (dsp *testDynamicDispatcherExt) Finished(task *proto.Task) bool { - return task.Step == proto.StepOne && dsp.cnt >= 4 } func (*testDynamicDispatcherExt) GetEligibleInstances(_ context.Context, _ *proto.Task) ([]*infosync.ServerInfo, error) { @@ -94,6 +91,7 @@ func TestFrameworkDynamicBasic(t *testing.T) { } func TestFrameworkDynamicHA(t *testing.T) { + t.Skip("skip this test because the function is not implemented yet") var m sync.Map ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/disttask/framework/framework_err_handling_test.go b/disttask/framework/framework_err_handling_test.go index 6479c3e7ae98a..3910b204b58cb 100644 --- a/disttask/framework/framework_err_handling_test.go +++ b/disttask/framework/framework_err_handling_test.go @@ -78,21 +78,15 @@ func (*planErrDispatcherExt) IsRetryableErr(error) bool { return true } -func (p *planErrDispatcherExt) StageFinished(task *proto.Task) bool { - if task.Step == proto.StepInit && p.cnt == 3 { - return true +func (p *planErrDispatcherExt) GetNextStep(task *proto.Task) int64 { + switch task.Step { + case proto.StepInit: + return proto.StepOne + case proto.StepOne: + return proto.StepTwo + default: + return proto.StepDone } - if task.Step == proto.StepOne && p.cnt == 4 { - return true - } - return false -} - -func (p *planErrDispatcherExt) Finished(task *proto.Task) bool { - if task.Step == proto.StepOne && p.cnt == 4 { - return true - } - return false } type planNotRetryableErrDispatcherExt struct { @@ -118,18 +112,8 @@ func (*planNotRetryableErrDispatcherExt) IsRetryableErr(error) bool { return false } -func (p *planNotRetryableErrDispatcherExt) StageFinished(task *proto.Task) bool { - if task.Step == proto.StepInit && p.cnt >= 3 { - return true - } - if task.Step == proto.StepOne && p.cnt >= 4 { - return true - } - return false -} - -func (p *planNotRetryableErrDispatcherExt) Finished(task *proto.Task) bool { - return task.Step == proto.StepOne && p.cnt >= 4 +func (p *planNotRetryableErrDispatcherExt) GetNextStep(task *proto.Task) int64 { + return proto.StepDone } func TestPlanErr(t *testing.T) { diff --git a/disttask/framework/framework_ha_test.go b/disttask/framework/framework_ha_test.go index 7cc43d93f2918..c358c96095632 100644 --- a/disttask/framework/framework_ha_test.go +++ b/disttask/framework/framework_ha_test.go @@ -78,18 +78,15 @@ func (*haTestDispatcherExt) IsRetryableErr(error) bool { return true } -func (dsp *haTestDispatcherExt) StageFinished(task *proto.Task) bool { - if task.Step == proto.StepInit && dsp.cnt >= 10 { - return true +func (dsp *haTestDispatcherExt) GetNextStep(task *proto.Task) int64 { + switch task.Step { + case proto.StepInit: + return proto.StepOne + case proto.StepOne: + return proto.StepTwo + default: + return proto.StepDone } - if task.Step == proto.StepOne && dsp.cnt >= 15 { - return true - } - return false -} - -func (dsp *haTestDispatcherExt) Finished(task *proto.Task) bool { - return task.Step == proto.StepOne && dsp.cnt >= 15 } func TestHABasic(t *testing.T) { diff --git a/disttask/framework/framework_rollback_test.go b/disttask/framework/framework_rollback_test.go index 85f9c4500fb6d..9e5c8d0f88c3b 100644 --- a/disttask/framework/framework_rollback_test.go +++ b/disttask/framework/framework_rollback_test.go @@ -65,12 +65,13 @@ func (*rollbackDispatcherExt) IsRetryableErr(error) bool { return true } -func (dsp *rollbackDispatcherExt) StageFinished(task *proto.Task) bool { - return task.Step == proto.StepInit && dsp.cnt >= 3 -} - -func (dsp *rollbackDispatcherExt) Finished(task *proto.Task) bool { - return task.Step == proto.StepInit && dsp.cnt >= 3 +func (dsp *rollbackDispatcherExt) GetNextStep(task *proto.Task) int64 { + switch task.Step { + case proto.StepInit: + return proto.StepOne + default: + return proto.StepDone + } } func registerRollbackTaskMeta(t *testing.T, ctrl *gomock.Controller, m *sync.Map) { diff --git a/disttask/framework/framework_test.go b/disttask/framework/framework_test.go index 8f91ca1217e4d..21008e37666a7 100644 --- a/disttask/framework/framework_test.go +++ b/disttask/framework/framework_test.go @@ -66,18 +66,15 @@ func (*testDispatcherExt) OnErrStage(_ context.Context, _ dispatcher.TaskHandle, return nil, nil } -func (dsp *testDispatcherExt) StageFinished(task *proto.Task) bool { - if task.Step == proto.StepInit && dsp.cnt >= 3 { - return true - } - if task.Step == proto.StepOne && dsp.cnt >= 4 { - return true +func (dsp *testDispatcherExt) GetNextStep(task *proto.Task) int64 { + switch task.Step { + case proto.StepInit: + return proto.StepOne + case proto.StepOne: + return proto.StepTwo + default: + return proto.StepDone } - return false -} - -func (dsp *testDispatcherExt) Finished(task *proto.Task) bool { - return task.Step == proto.StepOne && dsp.cnt >= 4 } func generateSchedulerNodes4Test() ([]*infosync.ServerInfo, error) { @@ -116,9 +113,9 @@ func RegisterTaskMeta(t *testing.T, ctrl *gomock.Controller, m *sync.Map, dispat mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, subtask *proto.Subtask) error { switch subtask.Step { - case proto.StepInit: - m.Store("0", "0") case proto.StepOne: + m.Store("0", "0") + case proto.StepTwo: m.Store("1", "1") default: panic("invalid step") @@ -155,9 +152,9 @@ func RegisterTaskMetaForExample2(t *testing.T, ctrl *gomock.Controller, m *sync. mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, subtask *proto.Subtask) error { switch subtask.Step { - case proto.StepInit: - m.Store("2", "2") case proto.StepOne: + m.Store("2", "2") + case proto.StepTwo: m.Store("3", "3") default: panic("invalid step") @@ -174,9 +171,9 @@ func RegisterTaskMetaForExample3(t *testing.T, ctrl *gomock.Controller, m *sync. mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, subtask *proto.Subtask) error { switch subtask.Step { - case proto.StepInit: - m.Store("4", "4") case proto.StepOne: + m.Store("4", "4") + case proto.StepTwo: m.Store("5", "5") default: panic("invalid step") diff --git a/disttask/framework/proto/task.go b/disttask/framework/proto/task.go index c1ab89f1a4d34..1cb6280637585 100644 --- a/disttask/framework/proto/task.go +++ b/disttask/framework/proto/task.go @@ -47,8 +47,11 @@ const ( ) // TaskStep is the step of task. +// DO NOT change the value of the constants, will break backward compatibility. +// successfully task MUST go from StepInit to business steps, then StepDone. const ( - StepInit int64 = 0 + StepInit int64 = -1 + StepDone int64 = -2 StepOne int64 = 1 StepTwo int64 = 2 ) @@ -101,8 +104,9 @@ type Subtask struct { } // NewSubtask create a new subtask. -func NewSubtask(taskID int64, tp, schedulerID string, meta []byte) *Subtask { +func NewSubtask(step int64, taskID int64, tp, schedulerID string, meta []byte) *Subtask { return &Subtask{ + Step: step, Type: tp, TaskID: taskID, SchedulerID: schedulerID, diff --git a/disttask/framework/proto/task_test.go b/disttask/framework/proto/task_test.go new file mode 100644 index 0000000000000..8824a6bc79853 --- /dev/null +++ b/disttask/framework/proto/task_test.go @@ -0,0 +1,27 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proto + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTaskStep(t *testing.T) { + // make sure we don't change the value of StepInit accidentally + require.Equal(t, int64(-1), StepInit) + require.Equal(t, int64(-2), StepDone) +} diff --git a/disttask/framework/storage/task_table.go b/disttask/framework/storage/task_table.go index 6524f47744e77..d3d11606b36c7 100644 --- a/disttask/framework/storage/task_table.go +++ b/disttask/framework/storage/task_table.go @@ -622,7 +622,7 @@ func (stm *TaskManager) AddSubTasks(task *proto.Task, subtasks []*proto.Subtask) _, err := ExecSQL(stm.ctx, se, `insert into mysql.tidb_background_subtask (step, task_key, exec_id, meta, state, type, checkpoint, summary) values (%?, %?, %?, %?, %?, %?, %?, %?)`, - task.Step, task.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{}, "{}") + subtask.Step, task.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{}, "{}") if err != nil { return err } diff --git a/disttask/importinto/dispatcher.go b/disttask/importinto/dispatcher.go index 0da29143cd187..d3195e94e6704 100644 --- a/disttask/importinto/dispatcher.go +++ b/disttask/importinto/dispatcher.go @@ -224,7 +224,7 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch(ctx context.Context, taskHan }() switch gTask.Step { - case StepImport: + case proto.StepInit: if metrics, ok := metric.GetCommonMetric(ctx); ok { metrics.BytesCounter.WithLabelValues(metric.StateTotalRestore).Add(float64(taskMeta.Plan.TotalFileSize)) } @@ -234,7 +234,7 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch(ctx context.Context, taskHan if err = startJob(ctx, logger, taskHandle, taskMeta); err != nil { return nil, err } - case StepPostProcess: + case StepImport: dsp.switchTiKV2NormalMode(ctx, gTask, logger) failpoint.Inject("clearLastSwitchTime", func() { dsp.lastSwitchTime.Store(time.Time{}) @@ -252,13 +252,13 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch(ctx context.Context, taskHan return nil, err } logger.Info("move to post-process step ", zap.Any("result", taskMeta.Result)) - case StepPostProcess + 1: + case StepPostProcess: return nil, nil default: return nil, errors.Errorf("unknown step %d", gTask.Step) } - previousSubtaskMetas, err := taskHandle.GetPreviousSubtaskMetas(gTask.ID, gTask.Step-1) + previousSubtaskMetas, err := taskHandle.GetPreviousSubtaskMetas(gTask.ID, gTask.Step) if err != nil { return nil, err } @@ -271,7 +271,7 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch(ctx context.Context, taskHan if err != nil { return nil, err } - metaBytes, err := physicalPlan.ToSubtaskMetas(planCtx, gTask.Step) + metaBytes, err := physicalPlan.ToSubtaskMetas(planCtx, dsp.GetNextStep(gTask)) if err != nil { return nil, err } @@ -337,14 +337,17 @@ func (*ImportDispatcherExt) IsRetryableErr(error) bool { return false } -// StageFinished check if current stage finished. -func (*ImportDispatcherExt) StageFinished(_ *proto.Task) bool { - return true -} - -// Finished check if current task finished. -func (*ImportDispatcherExt) Finished(task *proto.Task) bool { - return task.Step == StepPostProcess+1 +// GetNextStep implements dispatcher.Extension interface. +func (*ImportDispatcherExt) GetNextStep(task *proto.Task) int64 { + switch task.Step { + case proto.StepInit: + return StepImport + case StepImport: + return StepPostProcess + default: + // current step must be StepPostProcess + return proto.StepDone + } } func (dsp *ImportDispatcherExt) switchTiKV2NormalMode(ctx context.Context, task *proto.Task, logger *zap.Logger) { diff --git a/disttask/importinto/dispatcher_testkit_test.go b/disttask/importinto/dispatcher_testkit_test.go index e359fbe0853a2..c2ad15c0816b5 100644 --- a/disttask/importinto/dispatcher_testkit_test.go +++ b/disttask/importinto/dispatcher_testkit_test.go @@ -92,6 +92,7 @@ func TestDispatcherExt(t *testing.T) { subtaskMetas, err := ext.OnNextSubtasksBatch(ctx, d, task) require.NoError(t, err) require.Len(t, subtaskMetas, 1) + task.Step = ext.GetNextStep(task) require.Equal(t, importinto.StepImport, task.Step) gotJobInfo, err = importer.GetJob(ctx, conn, jobID, "root", true) require.NoError(t, err) @@ -99,7 +100,7 @@ func TestDispatcherExt(t *testing.T) { // update task/subtask, and finish subtask, so we can go to next stage subtasks := make([]*proto.Subtask, 0, len(subtaskMetas)) for _, m := range subtaskMetas { - subtasks = append(subtasks, proto.NewSubtask(task.ID, task.Type, "", m)) + subtasks = append(subtasks, proto.NewSubtask(task.Step, task.ID, task.Type, "", m)) } _, err = manager.UpdateGlobalTaskAndAddSubTasks(task, subtasks, proto.TaskStatePending) require.NoError(t, err) @@ -109,20 +110,21 @@ func TestDispatcherExt(t *testing.T) { require.NoError(t, manager.FinishSubtask(s.ID, []byte("{}"))) } // to post-process stage, job should be running and in validating step - task.Step++ subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, d, task) require.NoError(t, err) require.Len(t, subtaskMetas, 1) + task.Step = ext.GetNextStep(task) require.Equal(t, importinto.StepPostProcess, task.Step) gotJobInfo, err = importer.GetJob(ctx, conn, jobID, "root", true) require.NoError(t, err) require.Equal(t, "running", gotJobInfo.Status) require.Equal(t, "validating", gotJobInfo.Step) // on next stage, job should be finished - task.Step++ subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, d, task) require.NoError(t, err) require.Len(t, subtaskMetas, 0) + task.Step = ext.GetNextStep(task) + require.Equal(t, proto.StepDone, task.Step) gotJobInfo, err = importer.GetJob(ctx, conn, jobID, "root", true) require.NoError(t, err) require.Equal(t, "finished", gotJobInfo.Status) diff --git a/disttask/importinto/proto.go b/disttask/importinto/proto.go index 337ddbd70eb8f..86fd16da44ca1 100644 --- a/disttask/importinto/proto.go +++ b/disttask/importinto/proto.go @@ -32,10 +32,9 @@ import ( // steps are processed in the following order: StepInit -> StepImport -> StepPostProcess const ( // StepImport we sort source data and ingest it into TiKV in this step. - StepImport int64 = 0 + StepImport int64 = 1 // StepPostProcess we verify checksum and add index in this step. - // TODO: Might split into StepValidate and StepAddIndex later. - StepPostProcess int64 = 1 + StepPostProcess int64 = 2 ) // TaskMeta is the task of IMPORT INTO. From 6b359a84051797a142760eafe17ade329fe8335d Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 14 Sep 2023 00:59:51 +0800 Subject: [PATCH 2/7] bazel --- disttask/framework/proto/BUILD.bazel | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/disttask/framework/proto/BUILD.bazel b/disttask/framework/proto/BUILD.bazel index 21bab39839933..c79a7260ad2c3 100644 --- a/disttask/framework/proto/BUILD.bazel +++ b/disttask/framework/proto/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "proto", @@ -6,3 +6,12 @@ go_library( importpath = "github.com/pingcap/tidb/disttask/framework/proto", visibility = ["//visibility:public"], ) + +go_test( + name = "proto_test", + timeout = "short", + srcs = ["task_test.go"], + embed = [":proto"], + flaky = True, + deps = ["@com_github_stretchr_testify//require"], +) From 237027da9a173012da28fc0d6c8772b153a9e553 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 14 Sep 2023 01:58:58 +0800 Subject: [PATCH 3/7] change --- ddl/backfilling_dispatcher.go | 2 +- ddl/index.go | 2 +- ddl/stage_scheduler.go | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ddl/backfilling_dispatcher.go b/ddl/backfilling_dispatcher.go index c679cad1e18e3..a7bb328e388f6 100644 --- a/ddl/backfilling_dispatcher.go +++ b/ddl/backfilling_dispatcher.go @@ -374,7 +374,7 @@ func getSummaryFromLastStep( taskHandle dispatcher.TaskHandle, gTaskID int64, ) (min, max kv.Key, totalKVSize uint64, dataFiles, statFiles []string, err error) { - subTaskMetas, err := taskHandle.GetPreviousSubtaskMetas(gTaskID, proto.StepInit) + subTaskMetas, err := taskHandle.GetPreviousSubtaskMetas(gTaskID, proto.StepOne) if err != nil { return nil, nil, 0, nil, nil, errors.Trace(err) } diff --git a/ddl/index.go b/ddl/index.go index 1bb5fb6747ad6..8cfbd352ae18d 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1959,7 +1959,7 @@ func (w *worker) updateJobRowCount(taskKey string, jobID int64) { logutil.BgLogger().Warn("cannot get global task", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err)) return } - rowCount, err := taskMgr.GetSubtaskRowCount(gTask.ID, proto.StepInit) + rowCount, err := taskMgr.GetSubtaskRowCount(gTask.ID, proto.StepOne) if err != nil { logutil.BgLogger().Warn("cannot get subtask row count", zap.String("category", "ddl"), zap.String("task_key", taskKey), zap.Error(err)) return diff --git a/ddl/stage_scheduler.go b/ddl/stage_scheduler.go index f720a1a06cdb3..3ca92198c996b 100644 --- a/ddl/stage_scheduler.go +++ b/ddl/stage_scheduler.go @@ -79,13 +79,13 @@ func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl, } switch stage { - case proto.StepInit: + case proto.StepOne: jc := d.jobContext(jobMeta.ID, jobMeta.ReorgMeta) d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query) d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type) return newReadIndexStage( d, &bgm.Job, indexInfo, tbl.(table.PhysicalTable), jc, bc, summary, bgm.CloudStorageURI), nil - case proto.StepOne: + case proto.StepTwo: if len(bgm.CloudStorageURI) > 0 { return newMergeSortStage(jobMeta.ID, indexInfo, tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI) } @@ -114,7 +114,7 @@ func newBackfillDistScheduler(ctx context.Context, id string, taskID int64, task func (s *backfillDistScheduler) GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error) { switch task.Step { - case proto.StepInit, proto.StepOne: + case proto.StepOne, proto.StepTwo: return NewBackfillSchedulerHandle(ctx, task.Meta, s.d, task.Step, summary) default: return nil, errors.Errorf("unknown backfill step %d for task %d", task.Step, task.ID) From c42130b574ac7589eadfa0d4d363acb734448bc0 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 14 Sep 2023 11:08:06 +0800 Subject: [PATCH 4/7] change --- disttask/importinto/dispatcher.go | 2 +- executor/import_into.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/disttask/importinto/dispatcher.go b/disttask/importinto/dispatcher.go index d3195e94e6704..79c4a0147e43c 100644 --- a/disttask/importinto/dispatcher.go +++ b/disttask/importinto/dispatcher.go @@ -503,7 +503,7 @@ func toChunkMap(engineCheckpoints map[int32]*checkpoints.EngineCheckpoint) map[i // we will update taskMeta in place and make gTask.Meta point to the new taskMeta. func updateResult(handle dispatcher.TaskHandle, gTask *proto.Task, taskMeta *TaskMeta) error { - metas, err := handle.GetPreviousSubtaskMetas(gTask.ID, gTask.Step-1) + metas, err := handle.GetPreviousSubtaskMetas(gTask.ID, gTask.Step) if err != nil { return err } diff --git a/executor/import_into.go b/executor/import_into.go index b63d0f14a4137..cd0a444b92dfb 100644 --- a/executor/import_into.go +++ b/executor/import_into.go @@ -295,7 +295,6 @@ func flushStats(ctx context.Context, se sessionctx.Context, tableID int64, resul func cancelImportJob(ctx context.Context, manager *fstorage.TaskManager, jobID int64) error { // todo: cancel is async operation, we don't wait here now, maybe add a wait syntax later. // todo: after CANCEL, user can see the job status is Canceled immediately, but the job might still running. - // and the state of framework task might became finished since framework don't force state change DAG when update task. // todo: add a CANCELLING status? return manager.WithNewTxn(ctx, func(se sessionctx.Context) error { exec := se.(sqlexec.SQLExecutor) From e573f2d2a9328371369593386ba119c764b2e281 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 14 Sep 2023 11:51:31 +0800 Subject: [PATCH 5/7] change --- disttask/framework/framework_dynamic_dispatch_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/disttask/framework/framework_dynamic_dispatch_test.go b/disttask/framework/framework_dynamic_dispatch_test.go index 53b5d0051854b..30057c9558ebf 100644 --- a/disttask/framework/framework_dynamic_dispatch_test.go +++ b/disttask/framework/framework_dynamic_dispatch_test.go @@ -91,7 +91,6 @@ func TestFrameworkDynamicBasic(t *testing.T) { } func TestFrameworkDynamicHA(t *testing.T) { - t.Skip("skip this test because the function is not implemented yet") var m sync.Map ctrl := gomock.NewController(t) defer ctrl.Finish() From ac8fa04d4ce27b9ea943953a8c216bf5d9f464e1 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 14 Sep 2023 19:37:51 +0800 Subject: [PATCH 6/7] fix due to conflict --- disttask/framework/storage/task_table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/disttask/framework/storage/task_table.go b/disttask/framework/storage/task_table.go index 46ff118f5696f..6cdc66a57431d 100644 --- a/disttask/framework/storage/task_table.go +++ b/disttask/framework/storage/task_table.go @@ -677,7 +677,7 @@ func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtas _, err = ExecSQL(stm.ctx, se, `insert into mysql.tidb_background_subtask (step, task_key, exec_id, meta, state, type, checkpoint, summary) values (%?, %?, %?, %?, %?, %?, %?, %?)`, - gTask.Step, gTask.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{}, "{}") + subtask.Step, gTask.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{}, "{}") if err != nil { return err } From bad731a82f36043310d6f8c1166ed060ddd34d64 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 14 Sep 2023 21:06:41 +0800 Subject: [PATCH 7/7] fix test due to conflict --- disttask/framework/storage/table_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/disttask/framework/storage/table_test.go b/disttask/framework/storage/table_test.go index 4553635c8d0e9..33cb7b138f63c 100644 --- a/disttask/framework/storage/table_test.go +++ b/disttask/framework/storage/table_test.go @@ -322,11 +322,13 @@ func TestBothGlobalAndSubTaskTable(t *testing.T) { task.State = proto.TaskStateRunning subTasks := []*proto.Subtask{ { + Step: proto.StepInit, Type: proto.TaskTypeExample, SchedulerID: "instance1", Meta: []byte("m1"), }, { + Step: proto.StepInit, Type: proto.TaskTypeExample, SchedulerID: "instance2", Meta: []byte("m2"), @@ -361,11 +363,13 @@ func TestBothGlobalAndSubTaskTable(t *testing.T) { task.State = proto.TaskStateReverting subTasks = []*proto.Subtask{ { + Step: proto.StepInit, Type: proto.TaskTypeExample, SchedulerID: "instance3", Meta: []byte("m3"), }, { + Step: proto.StepInit, Type: proto.TaskTypeExample, SchedulerID: "instance4", Meta: []byte("m4"),