Skip to content

Commit

Permalink
disktask: replace failure nodes with alive ones (#45935)
Browse files Browse the repository at this point in the history
ref #46258
  • Loading branch information
xhebox authored Sep 4, 2023
1 parent 7a4c566 commit 1901239
Show file tree
Hide file tree
Showing 18 changed files with 579 additions and 101 deletions.
23 changes: 13 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ tools/bin/vfsgendev:
tools/bin/gotestsum:
GOBIN=$(shell pwd)/tools/bin $(GO) install gotest.tools/gotestsum@v1.8.1

tools/bin/mockgen:
GOBIN=$(shell pwd)/tools/bin $(GO) install go.uber.org/mock/mockgen@v0.2.0

# Usage:
#
# $ make vectorized-bench VB_FILE=Time VB_FUNC=builtinCurrentDateSig
Expand Down Expand Up @@ -370,18 +373,18 @@ br_compatibility_test_prepare:
br_compatibility_test:
@cd br && tests/run_compatible.sh run

mock_s3iface:
@mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go
mock_s3iface: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

# mock interface for lightning and IMPORT INTO
mock_lightning:
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
@mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher > br/pkg/mock/mocklocal/local.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go

gen_mock:
@mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,SubtaskExecutor,Pool,Scheduler,InternalScheduler > disttask/framework/mock/scheduler_mock.go
mock_lightning: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
tools/bin/mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher > br/pkg/mock/mocklocal/local.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go

gen_mock: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,SubtaskExecutor,Pool,Scheduler,InternalScheduler > disttask/framework/mock/scheduler_mock.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
3 changes: 2 additions & 1 deletion disttask/framework/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ go_test(
timeout = "short",
srcs = [
"framework_err_handling_test.go",
"framework_ha_test.go",
"framework_rollback_test.go",
"framework_test.go",
],
flaky = True,
race = "on",
shard_count = 14,
shard_count = 22,
deps = [
"//disttask/framework/dispatcher",
"//disttask/framework/proto",
Expand Down
1 change: 1 addition & 0 deletions disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//sessionctx/variable",
"//util",
"//util/disttask",
"//util/intest",
"//util/logutil",
"//util/syncutil",
"@com_github_pingcap_errors//:errors",
Expand Down
106 changes: 97 additions & 9 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package dispatcher
import (
"context"
"fmt"
"math/rand"
"time"

"github.com/pingcap/errors"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
disttaskutil "github.com/pingcap/tidb/util/disttask"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand All @@ -36,6 +38,8 @@ const (
DefaultSubtaskConcurrency = 16
// MaxSubtaskConcurrency is the maximum concurrency for handling subtask.
MaxSubtaskConcurrency = 256
// DefaultLiveNodesCheckInterval is the tick interval of fetching all server infos from etcd.
DefaultLiveNodesCheckInterval = 2
)

var (
Expand Down Expand Up @@ -65,6 +69,17 @@ type dispatcher struct {
logCtx context.Context
serverID string
impl Dispatcher

// for HA
// liveNodes will fetch and store all live nodes every liveNodeInterval ticks.
liveNodes []*infosync.ServerInfo
liveNodeFetchInterval int
// liveNodeFetchTick is the tick variable.
liveNodeFetchTick int
// taskNodes stores the id of current scheduler nodes.
taskNodes []string
// rand is for generating random selection of nodes.
rand *rand.Rand
}

// MockOwnerChange mock owner change in tests.
Expand All @@ -74,12 +89,17 @@ func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, serverID s
logPrefix := fmt.Sprintf("task_id: %d, task_type: %s, server_id: %s", task.ID, task.Type, serverID)
impl := GetTaskDispatcher(task.Type)
dsp := &dispatcher{
ctx: ctx,
taskMgr: taskMgr,
task: task,
logCtx: logutil.WithKeyValue(context.Background(), "dispatcher", logPrefix),
serverID: serverID,
impl: impl,
ctx: ctx,
taskMgr: taskMgr,
task: task,
logCtx: logutil.WithKeyValue(context.Background(), "dispatcher", logPrefix),
serverID: serverID,
impl: impl,
liveNodes: nil,
liveNodeFetchInterval: DefaultLiveNodesCheckInterval,
liveNodeFetchTick: 0,
taskNodes: nil,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
if dsp.impl == nil {
logutil.BgLogger().Warn("gen dispatcher impl failed, this type impl doesn't register")
Expand Down Expand Up @@ -215,12 +235,70 @@ func (d *dispatcher) onRunning() error {
logutil.Logger(d.logCtx).Info("previous stage finished, generate dist plan", zap.Int64("stage", d.task.Step))
return d.onNextStage()
}
// Check if any node are down.
if err := d.replaceDeadNodesIfAny(); err != nil {
return err
}
// Wait all subtasks in this stage finished.
d.impl.OnTick(d.ctx, d.task)
logutil.Logger(d.logCtx).Debug("on running state, this task keeps current state", zap.String("state", d.task.State))
return nil
}

func (d *dispatcher) replaceDeadNodesIfAny() error {
if len(d.taskNodes) == 0 {
return errors.Errorf("len(d.taskNodes) == 0, onNextStage is not invoked before onRunning")
}
d.liveNodeFetchTick++
if d.liveNodeFetchTick == d.liveNodeFetchInterval {
d.liveNodeFetchTick = 0
serverInfos, err := GenerateSchedulerNodes(d.ctx)
if err != nil {
return err
}
eligibleServerInfos, err := d.impl.GetEligibleInstances(d.ctx, d.task)
if err != nil {
return err
}
newInfos := serverInfos[:0]
for _, m := range serverInfos {
found := false
for _, n := range eligibleServerInfos {
if m.ID == n.ID {
found = true
break
}
}
if found {
newInfos = append(newInfos, m)
}
}
d.liveNodes = newInfos
}
if len(d.liveNodes) > 0 {
replaceNodes := make(map[string]string)
for _, nodeID := range d.taskNodes {
if ok := disttaskutil.MatchServerInfo(d.liveNodes, nodeID); !ok {
n := d.liveNodes[d.rand.Int()%len(d.liveNodes)] //nolint:gosec
replaceNodes[nodeID] = disttaskutil.GenerateExecID(n.IP, n.Port)
}
}
if err := d.taskMgr.UpdateFailedSchedulerIDs(d.task.ID, replaceNodes); err != nil {
return err
}
// replace local cache.
for k, v := range replaceNodes {
for m, n := range d.taskNodes {
if n == k {
d.taskNodes[m] = v
break
}
}
}
}
return nil
}

func (d *dispatcher) updateTask(taskState string, newSubTasks []*proto.Subtask, retryTimes int) (err error) {
prevState := d.task.State
d.task.State = taskState
Expand Down Expand Up @@ -331,6 +409,10 @@ func (d *dispatcher) dispatchSubTask(task *proto.Task, metas [][]byte) error {
if len(serverNodes) == 0 {
return errors.New("no available TiDB node to dispatch subtasks")
}
d.taskNodes = make([]string, len(serverNodes))
for i := range serverNodes {
d.taskNodes[i] = disttaskutil.GenerateExecID(serverNodes[i].IP, serverNodes[i].Port)
}
subTasks := make([]*proto.Subtask, 0, len(metas))
for i, meta := range metas {
// we assign the subtask to the instance in a round-robin way.
Expand All @@ -353,16 +435,22 @@ func (d *dispatcher) handlePlanErr(err error) error {
}

// GenerateSchedulerNodes generate a eligible TiDB nodes.
func GenerateSchedulerNodes(ctx context.Context) ([]*infosync.ServerInfo, error) {
serverInfos, err := infosync.GetAllServerInfo(ctx)
func GenerateSchedulerNodes(ctx context.Context) (serverNodes []*infosync.ServerInfo, err error) {
var serverInfos map[string]*infosync.ServerInfo
_, etcd := ctx.Value("etcd").(bool)
if intest.InTest && !etcd {
serverInfos = infosync.MockGlobalServerInfoManagerEntry.GetAllServerInfo()
} else {
serverInfos, err = infosync.GetAllServerInfo(ctx)
}
if err != nil {
return nil, err
}
if len(serverInfos) == 0 {
return nil, errors.New("not found instance")
}

serverNodes := make([]*infosync.ServerInfo, 0, len(serverInfos))
serverNodes = make([]*infosync.ServerInfo, 0, len(serverInfos))
for _, serverInfo := range serverInfos {
serverNodes = append(serverNodes, serverInfo)
}
Expand Down
28 changes: 8 additions & 20 deletions disttask/framework/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (*numberExampleDispatcher) IsRetryableErr(error) bool {
}

func MockDispatcherManager(t *testing.T, pool *pools.ResourcePool) (*dispatcher.Manager, *storage.TaskManager) {
ctx := context.Background()
ctx := context.WithValue(context.Background(), "etcd", true)
mgr := storage.NewTaskManager(util.WithInternalSourceType(ctx, "taskManager"), pool)
storage.SetTaskManager(mgr)
dsp, err := dispatcher.NewManager(util.WithInternalSourceType(ctx, "dispatcher"), mgr, "host:port")
Expand Down Expand Up @@ -220,31 +220,19 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool, isCancel bool) {
// 3s
cnt := 60
checkGetRunningTaskCnt := func(expected int) {
var retCnt int
for i := 0; i < cnt; i++ {
retCnt = dsp.GetRunningTaskCnt()
if retCnt == expected {
break
}
time.Sleep(time.Millisecond * 50)
}
require.Equal(t, retCnt, expected)
require.Eventually(t, func() bool {
return dsp.GetRunningTaskCnt() == expected
}, time.Second, 50*time.Millisecond)
}

checkTaskRunningCnt := func() []*proto.Task {
var retCnt int
var tasks []*proto.Task
var err error
for i := 0; i < cnt; i++ {
require.Eventually(t, func() bool {
var err error
tasks, err = mgr.GetGlobalTasksInStates(proto.TaskStateRunning)
require.NoError(t, err)
retCnt = len(tasks)
if retCnt == taskCnt {
break
}
time.Sleep(time.Millisecond * 50)
}
require.Equal(t, retCnt, taskCnt)
return len(tasks) == taskCnt
}, time.Second, 50*time.Millisecond)
return tasks
}

Expand Down
Loading

0 comments on commit 1901239

Please sign in to comment.