disttask, ddl: rename flowhandle #46497

Merged
merged 13 commits on Aug 31, 2023
4 changes: 2 additions & 2 deletions ddl/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
name = "ddl",
srcs = [
"backfilling.go",
"backfilling_dispatcher.go",
"backfilling_operator.go",
"backfilling_scheduler.go",
"callback.go",
@@ -28,7 +29,6 @@ go_library(
"delete_range.go",
"delete_range_util.go",
"dist_owner.go",
"disttask_flow.go",
"foreign_key.go",
"generated_column.go",
"index.go",
@@ -167,6 +167,7 @@ go_test(
timeout = "moderate",
srcs = [
"attributes_sql_test.go",
"backfilling_dispatcher_test.go",
"backfilling_test.go",
"cancel_test.go",
"cluster_test.go",
@@ -189,7 +190,6 @@ go_test(
"ddl_test.go",
"ddl_worker_test.go",
"ddl_workerpool_test.go",
"disttask_flow_test.go",
"export_test.go",
"fail_test.go",
"foreign_key_test.go",
23 changes: 12 additions & 11 deletions ddl/disttask_flow.go → ddl/backfilling_dispatcher.go
@@ -32,28 +32,28 @@
"github.com/tikv/client-go/v2/tikv"
)

type litBackfillFlowHandle struct {
type backfillingDispatcher struct {
d *ddl
}

var _ dispatcher.TaskFlowHandle = (*litBackfillFlowHandle)(nil)
var _ dispatcher.Dispatcher = (*backfillingDispatcher)(nil)

// NewLitBackfillFlowHandle creates a new litBackfillFlowHandle.
func NewLitBackfillFlowHandle(d DDL) (dispatcher.TaskFlowHandle, error) {
// NewBackfillingDispatcher creates a new backfillingDispatcher.
func NewBackfillingDispatcher(d DDL) (dispatcher.Dispatcher, error) {
ddl, ok := d.(*ddl)
if !ok {
return nil, errors.New("The getDDL result should be the type of *ddl")
}
return &litBackfillFlowHandle{
return &backfillingDispatcher{
d: ddl,
}, nil
}

func (*litBackfillFlowHandle) OnTicker(_ context.Context, _ *proto.Task) {
func (*backfillingDispatcher) OnTick(_ context.Context, _ *proto.Task) {

}

// ProcessNormalFlow processes the normal flow.
func (h *litBackfillFlowHandle) ProcessNormalFlow(ctx context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) ([][]byte, error) {
// OnNextStage generates the next stage's plan.
func (h *backfillingDispatcher) OnNextStage(ctx context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) ([][]byte, error) {
var globalTaskMeta BackfillGlobalMeta
if err := json.Unmarshal(gTask.Meta, &globalTaskMeta); err != nil {
return nil, err
@@ -111,20 +111,21 @@
}
}

func (*litBackfillFlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, task *proto.Task, receiveErr []error) (meta []byte, err error) {
// OnErrStage generates the error-handling stage's plan.
func (*backfillingDispatcher) OnErrStage(_ context.Context, _ dispatcher.TaskHandle, task *proto.Task, receiveErr []error) (meta []byte, err error) {
// We do not need extra meta info when rolling back
firstErr := receiveErr[0]
task.Error = firstErr

return nil, nil
}

func (*litBackfillFlowHandle) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, error) {
func (*backfillingDispatcher) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, error) {

return dispatcher.GenerateSchedulerNodes(ctx)
}

// IsRetryableErr implements TaskFlowHandle.IsRetryableErr interface.
func (*litBackfillFlowHandle) IsRetryableErr(error) bool {
func (*backfillingDispatcher) IsRetryableErr(error) bool {

return true
}
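
For orientation, the five renamed methods above (OnTick, OnNextStage, OnErrStage, GetEligibleInstances, IsRetryableErr) are the whole surface of the interface that replaces TaskFlowHandle. Its definition presumably lives in the new disttask/framework/dispatcher/interface.go, which is not part of this hunk, so the following is only a sketch reconstructed from the signatures visible in this diff; the import paths and doc comments are assumptions.

// Sketch only: assumed shape of the renamed framework interface, reconstructed
// from the method signatures used in this PR. It is meant to sit in the
// dispatcher package next to the TaskHandle interface shown further below.
package dispatcher

import (
	"context"

	"github.com/pingcap/tidb/disttask/framework/proto"
	"github.com/pingcap/tidb/domain/infosync"
)

// Dispatcher is the renamed TaskFlowHandle.
type Dispatcher interface {
	// OnTick is called periodically while a task keeps its current state (formerly OnTicker).
	OnTick(ctx context.Context, task *proto.Task)
	// OnNextStage generates the subtask metas for the task's next stage (formerly ProcessNormalFlow).
	OnNextStage(ctx context.Context, h TaskHandle, task *proto.Task) ([][]byte, error)
	// OnErrStage generates the meta for the revert stage (formerly ProcessErrFlow).
	OnErrStage(ctx context.Context, h TaskHandle, task *proto.Task, receiveErr []error) ([]byte, error)
	// GetEligibleInstances returns the TiDB nodes that may run the task's subtasks.
	GetEligibleInstances(ctx context.Context, task *proto.Task) ([]*infosync.ServerInfo, error)
	// IsRetryableErr reports whether a planning error should be retried.
	IsRetryableErr(err error) bool
}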

20 changes: 10 additions & 10 deletions ddl/disttask_flow_test.go → ddl/backfilling_dispatcher_test.go
@@ -31,14 +31,14 @@ import (
"github.com/stretchr/testify/require"
)

func TestBackfillFlowHandle(t *testing.T) {
func TestBackfillingDispatcher(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
handler, err := ddl.NewLitBackfillFlowHandle(dom.DDL())
dsp, err := ddl.NewBackfillingDispatcher(dom.DDL())
require.NoError(t, err)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

// test partition table ProcessNormalFlow
// test partition table OnNextStage.
tk.MustExec("create table tp1(id int primary key, v int) PARTITION BY RANGE (id) (\n " +
"PARTITION p0 VALUES LESS THAN (10),\n" +
"PARTITION p1 VALUES LESS THAN (100),\n" +
@@ -48,7 +48,7 @@ func TestBackfillFlowHandle(t *testing.T) {
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tp1"))
require.NoError(t, err)
tblInfo := tbl.Meta()
metas, err := handler.ProcessNormalFlow(context.Background(), nil, gTask)
metas, err := dsp.OnNextStage(context.Background(), nil, gTask)
require.NoError(t, err)
require.Equal(t, proto.StepOne, gTask.Step)
require.Equal(t, len(tblInfo.Partition.Definitions), len(metas))
@@ -58,24 +58,24 @@ func TestBackfillFlowHandle(t *testing.T) {
require.Equal(t, par.ID, subTask.PhysicalTableID)
}

// test partition table ProcessNormalFlow after step1 finished
// test partition table OnNextStage after step1 finished.
gTask.State = proto.TaskStateRunning
metas, err = handler.ProcessNormalFlow(context.Background(), nil, gTask)
metas, err = dsp.OnNextStage(context.Background(), nil, gTask)
require.NoError(t, err)
require.Equal(t, 0, len(metas))

// test partition table ProcessErrFlow
errMeta, err := handler.ProcessErrFlow(context.Background(), nil, gTask, []error{errors.New("mockErr")})
// test partition table OnErrStage.
errMeta, err := dsp.OnErrStage(context.Background(), nil, gTask, []error{errors.New("mockErr")})
require.NoError(t, err)
require.Nil(t, errMeta)

errMeta, err = handler.ProcessErrFlow(context.Background(), nil, gTask, []error{errors.New("mockErr")})
errMeta, err = dsp.OnErrStage(context.Background(), nil, gTask, []error{errors.New("mockErr")})
require.NoError(t, err)
require.Nil(t, errMeta)

tk.MustExec("create table t1(id int primary key, v int)")
gTask = createAddIndexGlobalTask(t, dom, "test", "t1", ddl.BackfillTaskType)
_, err = handler.ProcessNormalFlow(context.Background(), nil, gTask)
_, err = dsp.OnNextStage(context.Background(), nil, gTask)
require.NoError(t, err)
}

6 changes: 3 additions & 3 deletions ddl/ddl.go
@@ -690,11 +690,11 @@
return NewBackfillSchedulerHandle(ctx, taskMeta, d, step == proto.StepTwo)
})

backfillHandle, err := NewLitBackfillFlowHandle(d)
backFillDsp, err := NewBackfillingDispatcher(d)
if err != nil {
logutil.BgLogger().Warn("NewLitBackfillFlowHandle failed", zap.String("category", "ddl"), zap.Error(err))
logutil.BgLogger().Warn("NewBackfillingDispatcher failed", zap.String("category", "ddl"), zap.Error(err))

} else {
dispatcher.RegisterTaskFlowHandle(BackfillTaskType, backfillHandle)
dispatcher.RegisterTaskDispatcher(BackfillTaskType, backFillDsp)
scheduler.RegisterSubtaskExectorConstructor(BackfillTaskType, proto.StepOne,
func(proto.MinimalTask, int64) (scheduler.SubtaskExecutor, error) {
return &scheduler.EmptyExecutor{}, nil
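
The hunk above is the registration side of the rename: ddl.go now hands its dispatcher to the framework via RegisterTaskDispatcher instead of RegisterTaskFlowHandle. As a rough illustration of the same pattern for another task type, here is a minimal sketch; the task type name, the exampleDispatcher implementation, and the init-time registration are hypothetical and not part of this PR.

// Sketch only: registering a dispatcher for a hypothetical task type,
// mirroring the BackfillTaskType registration above. Everything named
// "example" here is illustrative.
package exampletask

import (
	"context"

	"github.com/pingcap/tidb/disttask/framework/dispatcher"
	"github.com/pingcap/tidb/disttask/framework/proto"
	"github.com/pingcap/tidb/domain/infosync"
)

// ExampleTaskType is a hypothetical task type key.
const ExampleTaskType = "exampleTask"

type exampleDispatcher struct{}

var _ dispatcher.Dispatcher = (*exampleDispatcher)(nil)

// OnTick does nothing in this sketch.
func (*exampleDispatcher) OnTick(_ context.Context, _ *proto.Task) {}

// OnNextStage emits one stage with two illustrative subtask metas, then stops.
func (*exampleDispatcher) OnNextStage(_ context.Context, _ dispatcher.TaskHandle, task *proto.Task) ([][]byte, error) {
	if task.State == proto.TaskStateRunning {
		// The single stage already ran; no further stages to plan.
		return nil, nil
	}
	task.Step = proto.StepOne
	return [][]byte{[]byte("subtask-1"), []byte("subtask-2")}, nil
}

// OnErrStage records the first error; no extra meta is needed to revert.
func (*exampleDispatcher) OnErrStage(_ context.Context, _ dispatcher.TaskHandle, task *proto.Task, receiveErr []error) ([]byte, error) {
	task.Error = receiveErr[0]
	return nil, nil
}

// GetEligibleInstances reuses the framework helper, as backfillingDispatcher does.
func (*exampleDispatcher) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, error) {
	return dispatcher.GenerateSchedulerNodes(ctx)
}

// IsRetryableErr treats every planning error as retryable.
func (*exampleDispatcher) IsRetryableErr(error) bool { return true }

func init() {
	// Formerly dispatcher.RegisterTaskFlowHandle. A real task type would also
	// register its subtask executors with the scheduler package, as ddl.go does.
	dispatcher.RegisterTaskDispatcher(ExampleTaskType, &exampleDispatcher{})
}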
8 changes: 0 additions & 8 deletions disttask/example/BUILD.bazel

This file was deleted.

38 changes: 0 additions & 38 deletions disttask/example/proto.go

This file was deleted.

4 changes: 1 addition & 3 deletions disttask/framework/dispatcher/BUILD.bazel
@@ -5,8 +5,7 @@ go_library(
srcs = [
"dispatcher.go",
"dispatcher_manager.go",
"dispatcher_mock.go",
"register.go",
"interface.go",
],
importpath = "github.com/pingcap/tidb/disttask/framework/dispatcher",
visibility = ["//visibility:public"],
@@ -24,7 +23,6 @@ go_library(
"//util/syncutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//mock",
"@org_golang_x_exp//maps",
"@org_uber_go_zap//:zap",
],
67 changes: 31 additions & 36 deletions disttask/framework/dispatcher/dispatcher.go
@@ -48,37 +48,46 @@
retrySQLInterval = 500 * time.Millisecond
)

// TaskHandle provides the interface for operations needed by task flow handles.
// TaskHandle provides the interface for operations needed by Dispatcher.
// It lets Dispatcher implementations make use of the dispatcher's own functionality.
type TaskHandle interface {
// GetAllSchedulerIDs gets the IDs of all the task's scheduler instances.
GetAllSchedulerIDs(ctx context.Context, handle TaskFlowHandle, task *proto.Task) ([]string, error)
// GetPreviousSubtaskMetas gets previous subtask metas.
GetPreviousSubtaskMetas(taskID int64, step int64) ([][]byte, error)
storage.SessionExecutor
}

// Manage the lifetime of a task
// Manage the lifetime of a task.
// including submitting subtasks and updating the status of a task.
type dispatcher struct {
ctx context.Context
taskMgr *storage.TaskManager
task *proto.Task
logCtx context.Context
serverID string
impl Dispatcher
}

// MockOwnerChange mock owner change in tests.
var MockOwnerChange func()

func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) *dispatcher {
func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) (*dispatcher, error) {
logPrefix := fmt.Sprintf("task_id: %d, task_type: %s, server_id: %s", task.ID, task.Type, serverID)
return &dispatcher{
impl := GetTaskDispatcher(task.Type)
dsp := &dispatcher{
ctx: ctx,
taskMgr: taskMgr,
task: task,
logCtx: logutil.WithKeyValue(context.Background(), "dispatcher", logPrefix),
serverID: serverID,
impl: impl,
}
if dsp.impl == nil {
logutil.BgLogger().Warn("gen dispatcher impl failed, this type impl doesn't register")
dsp.task.Error = errors.New("unsupported task type")
// state transform: pending -> failed.
return nil, dsp.updateTask(proto.TaskStateFailed, nil, retrySQLTimes)
}
return dsp, nil
}

// ExecuteTask starts to schedule a task.
@@ -170,7 +179,7 @@
return d.updateTask(proto.TaskStateReverted, nil, retrySQLTimes)
}
// Wait all subtasks in this stage finished.
GetTaskFlowHandle(d.task.Type).OnTicker(d.ctx, d.task)
d.impl.OnTick(d.ctx, d.task)
logutil.Logger(d.logCtx).Debug("on reverting state, this task keeps current state", zap.String("state", d.task.State))
return nil
}
@@ -207,7 +216,7 @@
return d.onNextStage()
}
// Wait all subtasks in this stage finished.
GetTaskFlowHandle(d.task.Type).OnTicker(d.ctx, d.task)
d.impl.OnTick(d.ctx, d.task)
logutil.Logger(d.logCtx).Debug("on running state, this task keeps current state", zap.String("state", d.task.State))
return nil
}
@@ -245,27 +254,20 @@
}

func (d *dispatcher) onErrHandlingStage(receiveErr []error) error {
// TODO: Maybe it gets GetTaskFlowHandle fails when rolling upgrades.
// 1. generate the needed task meta and subTask meta (dist-plan).
handle := GetTaskFlowHandle(d.task.Type)
if handle == nil {
logutil.Logger(d.logCtx).Warn("gen task flow handle failed, this type handle doesn't register")
// state transform: pending --> running --> canceling --> failed.
return d.updateTask(proto.TaskStateFailed, nil, retrySQLTimes)
}
meta, err := handle.ProcessErrFlow(d.ctx, d, d.task, receiveErr)
meta, err := d.impl.OnErrStage(d.ctx, d, d.task, receiveErr)
if err != nil {
// processErrFlow must be retryable, if not, there will have resource leak for tasks.
// OnErrStage must be retryable; if it is not, tasks will leak resources.

logutil.Logger(d.logCtx).Warn("handle error failed", zap.Error(err))
return err
}

// 2. dispatch revert dist-plan to EligibleInstances.
return d.dispatchSubTask4Revert(d.task, handle, meta)
return d.dispatchSubTask4Revert(d.task, meta)
}

func (d *dispatcher) dispatchSubTask4Revert(task *proto.Task, handle TaskFlowHandle, meta []byte) error {
instanceIDs, err := d.GetAllSchedulerIDs(d.ctx, handle, task)
func (d *dispatcher) dispatchSubTask4Revert(task *proto.Task, meta []byte) error {
instanceIDs, err := d.GetAllSchedulerIDs(d.ctx, task)
if err != nil {
logutil.Logger(d.logCtx).Warn("get task's all instances failed", zap.Error(err))
return err
Expand All @@ -280,22 +282,15 @@

func (d *dispatcher) onNextStage() error {
// 1. generate the needed global task meta and subTask meta (dist-plan).
handle := GetTaskFlowHandle(d.task.Type)
if handle == nil {
logutil.Logger(d.logCtx).Warn("gen task flow handle failed, this type handle doesn't register", zap.String("type", d.task.Type))
d.task.Error = errors.New("unsupported task type")
// state transform: pending -> failed.
return d.updateTask(proto.TaskStateFailed, nil, retrySQLTimes)
}
metas, err := handle.ProcessNormalFlow(d.ctx, d, d.task)
metas, err := d.impl.OnNextStage(d.ctx, d, d.task)
if err != nil {
return d.handlePlanErr(handle, err)
return d.handlePlanErr(err)
}
// 2. dispatch dist-plan to EligibleInstances.
return d.dispatchSubTask(d.task, handle, metas)
return d.dispatchSubTask(d.task, metas)
}

func (d *dispatcher) dispatchSubTask(task *proto.Task, handle TaskFlowHandle, metas [][]byte) error {
func (d *dispatcher) dispatchSubTask(task *proto.Task, metas [][]byte) error {
logutil.Logger(d.logCtx).Info("dispatch subtasks", zap.String("state", d.task.State), zap.Uint64("concurrency", d.task.Concurrency), zap.Int("subtasks", len(metas)))
// 1. Adjust the global task's concurrency.
if task.Concurrency == 0 {
@@ -327,7 +322,7 @@
}

// 3. select all available TiDB nodes for task.
serverNodes, err := handle.GetEligibleInstances(d.ctx, task)
serverNodes, err := d.impl.GetEligibleInstances(d.ctx, task)
logutil.Logger(d.logCtx).Debug("eligible instances", zap.Int("num", len(serverNodes)))

if err != nil {
@@ -347,9 +342,9 @@
return d.updateTask(proto.TaskStateRunning, subTasks, retrySQLTimes)
}

func (d *dispatcher) handlePlanErr(handle TaskFlowHandle, err error) error {
func (d *dispatcher) handlePlanErr(err error) error {
logutil.Logger(d.logCtx).Warn("generate plan failed", zap.Error(err), zap.String("state", d.task.State))
if handle.IsRetryableErr(err) {
if d.impl.IsRetryableErr(err) {
return err
}
d.task.Error = err
@@ -375,8 +370,8 @@
}

// GetAllSchedulerIDs gets all the scheduler IDs.
func (d *dispatcher) GetAllSchedulerIDs(ctx context.Context, handle TaskFlowHandle, task *proto.Task) ([]string, error) {
serverInfos, err := handle.GetEligibleInstances(ctx, task)
func (d *dispatcher) GetAllSchedulerIDs(ctx context.Context, task *proto.Task) ([]string, error) {
serverInfos, err := d.impl.GetEligibleInstances(ctx, task)
if err != nil {
return nil, err
}
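
One more piece of the rename is only visible indirectly: disttask/framework/dispatcher/BUILD.bazel drops register.go and dispatcher_mock.go in favor of interface.go, and the dispatcher now resolves its implementation once, in newDispatcher, via GetTaskDispatcher (formerly GetTaskFlowHandle). The registry itself is not shown in this diff; the sketch below, in the same dispatcher package as the interface sketched earlier, is an assumed reconstruction from the RegisterTaskDispatcher / GetTaskDispatcher calls, and the real interface.go may differ (for instance, it likely also lets tests clear registrations).

// Sketch only: assumed shape of the renamed dispatcher registry.
package dispatcher

import "sync"

var (
	taskDispatcherMu  sync.RWMutex
	taskDispatcherMap = make(map[string]Dispatcher)
)

// RegisterTaskDispatcher registers the Dispatcher for a task type
// (formerly RegisterTaskFlowHandle).
func RegisterTaskDispatcher(taskType string, d Dispatcher) {
	taskDispatcherMu.Lock()
	defer taskDispatcherMu.Unlock()
	taskDispatcherMap[taskType] = d
}

// GetTaskDispatcher returns the registered Dispatcher for a task type,
// or nil if the type was never registered (formerly GetTaskFlowHandle).
func GetTaskDispatcher(taskType string) Dispatcher {
	taskDispatcherMu.RLock()
	defer taskDispatcherMu.RUnlock()
	return taskDispatcherMap[taskType]
}

With the lookup moved into newDispatcher, an unregistered task type is now marked failed once, at construction time, rather than being re-checked separately in onNextStage and onErrHandlingStage.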