Skip to content

Commit

Permalink
disttask: remove mini executor abstraction (pingcap#46922)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Sep 13, 2023
1 parent d399c47 commit 3735e11
Show file tree
Hide file tree
Showing 28 changed files with 235 additions and 648 deletions.
2 changes: 1 addition & 1 deletion .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ header:
- "build/image/.ci_bazel"
- "**/OWNERS"
- "OWNERS_ALIASES"
- "disttask/framework/mock/**/*_mock.go"
- "disttask/**/mock/**/*_mock.go"
- "util/sqlexec/mock/*_mock.go"
comment: on-failure
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ mock_lightning: tools/bin/mockgen

gen_mock: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,Pool,Scheduler,Extension > disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/disttask/framework/scheduler/execute SubtaskExecutor,MiniTaskExecutor > disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package execute github.com/pingcap/tidb/disttask/framework/scheduler/execute SubtaskExecutor > disttask/framework/mock/execute/execute_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/importinto MiniTaskExecutor > disttask/importinto/mock/import_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/util/sqlexec RestrictedSQLExecutor > util/sqlexec/mock/restricted_sql_executor_mock.go

Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,8 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
}

scheduler.RegisterTaskType(BackfillTaskType,
func(ctx context.Context, id string, taskID int64, taskTable scheduler.TaskTable, pool scheduler.Pool) scheduler.Scheduler {
return newBackfillDistScheduler(ctx, id, taskID, taskTable, pool, d)
func(ctx context.Context, id string, taskID int64, taskTable scheduler.TaskTable) scheduler.Scheduler {
return newBackfillDistScheduler(ctx, id, taskID, taskTable, d)
}, scheduler.WithSummary,
)

Expand Down
4 changes: 2 additions & 2 deletions ddl/stage_ingest_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func (i *ingestIndexStage) Init(ctx context.Context) error {
return err
}

func (*ingestIndexStage) SplitSubtask(ctx context.Context, _ *proto.Subtask) ([]proto.MinimalTask, error) {
func (*ingestIndexStage) RunSubtask(ctx context.Context, _ *proto.Subtask) error {
logutil.Logger(ctx).Info("ingest index stage split subtask")
return nil, nil
return nil
}

func (i *ingestIndexStage) Cleanup(ctx context.Context) error {
Expand Down
11 changes: 5 additions & 6 deletions ddl/stage_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (*mergeSortStage) Init(ctx context.Context) error {
return nil
}

func (m *mergeSortStage) SplitSubtask(ctx context.Context, subtask *proto.Subtask) ([]proto.MinimalTask, error) {
func (m *mergeSortStage) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
logutil.Logger(ctx).Info("merge sort stage split subtask")

sm := &BackfillSubTaskMeta{}
Expand All @@ -67,13 +67,12 @@ func (m *mergeSortStage) SplitSubtask(ctx context.Context, subtask *proto.Subtas
logutil.BgLogger().Error("unmarshal error",
zap.String("category", "ddl"),
zap.Error(err))
return nil, err
return err
}

local := m.bc.GetLocalBackend()
if local == nil {
return nil,
errors.Errorf("local backend not found")
return errors.Errorf("local backend not found")
}
_, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, int32(m.index.ID))
err = local.CloseEngine(ctx, &backend.EngineConfig{
Expand All @@ -89,10 +88,10 @@ func (m *mergeSortStage) SplitSubtask(ctx context.Context, subtask *proto.Subtas
},
}, engineUUID)
if err != nil {
return nil, err
return err
}
err = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys))
return nil, err
return err
}

func (m *mergeSortStage) Cleanup(ctx context.Context) error {
Expand Down
18 changes: 9 additions & 9 deletions ddl/stage_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (*readIndexStage) Init(_ context.Context) error {
return nil
}

func (r *readIndexStage) SplitSubtask(ctx context.Context, subtask *proto.Subtask) ([]proto.MinimalTask, error) {
func (r *readIndexStage) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
logutil.BgLogger().Info("read index stage run subtask",
zap.String("category", "ddl"))

Expand All @@ -102,18 +102,18 @@ func (r *readIndexStage) SplitSubtask(ctx context.Context, subtask *proto.Subtas
logutil.BgLogger().Error("unmarshal error",
zap.String("category", "ddl"),
zap.Error(err))
return nil, err
return err
}

startKey, endKey, tbl, err := r.getTableStartEndKey(sm)
if err != nil {
return nil, err
return err
}

sessCtx, err := newSessCtx(
d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location, r.job.ReorgMeta.ResourceGroupName)
if err != nil {
return nil, err
return err
}

opCtx := NewOperatorCtx(ctx)
Expand All @@ -127,23 +127,23 @@ func (r *readIndexStage) SplitSubtask(ctx context.Context, subtask *proto.Subtas
pipe, err = r.buildLocalStorePipeline(opCtx, d, sessCtx, tbl, startKey, endKey, totalRowCount)
}
if err != nil {
return nil, err
return err
}

err = pipe.Execute()
if err != nil {
return nil, err
return err
}
err = pipe.Close()
if opCtx.OperatorErr() != nil {
return nil, opCtx.OperatorErr()
return opCtx.OperatorErr()
}
if err != nil {
return nil, err
return err
}

r.summary.UpdateRowCount(subtask.ID, totalRowCount.Load())
return nil, nil
return nil
}

func (r *readIndexStage) Cleanup(ctx context.Context) error {
Expand Down
8 changes: 2 additions & 6 deletions ddl/stage_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ type backfillDistScheduler struct {
d *ddl
}

func newBackfillDistScheduler(ctx context.Context, id string, taskID int64, taskTable scheduler.TaskTable, pool scheduler.Pool, d *ddl) scheduler.Scheduler {
func newBackfillDistScheduler(ctx context.Context, id string, taskID int64, taskTable scheduler.TaskTable, d *ddl) scheduler.Scheduler {
s := &backfillDistScheduler{
BaseScheduler: scheduler.NewBaseScheduler(ctx, id, taskID, taskTable, pool),
BaseScheduler: scheduler.NewBaseScheduler(ctx, id, taskID, taskTable),
d: d,
}
s.BaseScheduler.Extension = s
Expand All @@ -120,7 +120,3 @@ func (s *backfillDistScheduler) GetSubtaskExecutor(ctx context.Context, task *pr
return nil, errors.Errorf("unknown backfill step %d for task %d", task.Step, task.ID)
}
}

func (*backfillDistScheduler) GetMiniTaskExecutor(_ proto.MinimalTask, _ string, _ int64) (execute.MiniTaskExecutor, error) {
return &scheduler.EmptyMiniTaskExecutor{}, nil
}
2 changes: 1 addition & 1 deletion disttask/framework/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ go_test(
deps = [
"//disttask/framework/dispatcher",
"//disttask/framework/mock",
"//disttask/framework/mock/execute",
"//disttask/framework/proto",
"//disttask/framework/scheduler",
"//disttask/framework/scheduler/execute",
"//disttask/framework/storage",
"//domain/infosync",
"//testkit",
Expand Down
64 changes: 18 additions & 46 deletions disttask/framework/framework_rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/disttask/framework/dispatcher"
"github.com/pingcap/tidb/disttask/framework/mock"
mockexecute "github.com/pingcap/tidb/disttask/framework/mock/execute"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -72,54 +73,25 @@ func (dsp *rollbackDispatcherExt) Finished(task *proto.Task) bool {
return task.Step == proto.StepInit && dsp.cnt >= 3
}

type testRollbackMiniTask struct{}

func (testRollbackMiniTask) IsMinimalTask() {}

func (testRollbackMiniTask) String() string {
return ""
}

type rollbackScheduler struct {
m *sync.Map
}

func (*rollbackScheduler) Init(_ context.Context) error { return nil }

func (t *rollbackScheduler) Cleanup(_ context.Context) error { return nil }

func (t *rollbackScheduler) Rollback(_ context.Context) error {
t.m = &sync.Map{}
rollbackCnt.Add(1)
return nil
}

func (t *rollbackScheduler) SplitSubtask(_ context.Context, _ *proto.Subtask) ([]proto.MinimalTask, error) {
return []proto.MinimalTask{
testRollbackMiniTask{},
testRollbackMiniTask{},
testRollbackMiniTask{},
}, nil
}

func (t *rollbackScheduler) OnFinished(_ context.Context, _ *proto.Subtask) error {
return nil
}

type rollbackSubtaskExecutor struct {
m *sync.Map
}

func (e *rollbackSubtaskExecutor) Run(_ context.Context) error {
e.m.Store("1", "1")
return nil
}

func registerRollbackTaskMeta(t *testing.T, ctrl *gomock.Controller, m *sync.Map) {
mockExtension := mock.NewMockExtension(ctrl)
mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(&rollbackScheduler{m: m}, nil).AnyTimes()
mockExtension.EXPECT().GetMiniTaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(&rollbackSubtaskExecutor{m: m}, nil).AnyTimes()
registerTaskMetaInner(t, mockExtension, &rollbackDispatcherExt{})
mockExecutor := mockexecute.NewMockSubtaskExecutor(ctrl)
mockExecutor.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes()
mockExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil).AnyTimes()
mockExecutor.EXPECT().Rollback(gomock.Any()).DoAndReturn(
func(_ context.Context) error {
rollbackCnt.Add(1)
return nil
},
).AnyTimes()
mockExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ *proto.Subtask) error {
m.Store("1", "1")
return nil
}).AnyTimes()
mockExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockExecutor, nil).AnyTimes()
registerTaskMetaInner(t, proto.TaskTypeExample, mockExtension, &rollbackDispatcherExt{})
rollbackCnt.Store(0)
}

Expand Down
Loading

0 comments on commit 3735e11

Please sign in to comment.