diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index db4a0a32988..9786b6e4b1e 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -25,6 +25,7 @@ import ( cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -144,6 +145,15 @@ func (m *feedStateManager) Tick( // `handleAdminJob` returns true means that some admin jobs are pending // skip to the next tick until all the admin jobs is handled adminJobPending = true + changefeedErrorStuckDuration := util.GetOrZero(m.state.Info.Config.ChangefeedErrorStuckDuration) + if m.changefeedErrorStuckDuration != changefeedErrorStuckDuration { + log.Info("changefeedErrorStuckDuration update", + zap.Duration("oldChangefeedErrorStuckDuration", m.changefeedErrorStuckDuration), + zap.Duration("newChangefeedErrorStuckDuration", changefeedErrorStuckDuration), + ) + m.errBackoff.MaxElapsedTime = changefeedErrorStuckDuration + m.changefeedErrorStuckDuration = changefeedErrorStuckDuration + } return } diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index e81ae7584ac..d4002d68d41 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" ) @@ -985,7 +986,7 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { require.True(t, manager.ShouldRunning()) // 2. test when the changefeed is in warning state, and the resolvedTs and checkpointTs is not progressing, - // the changefeed state will remain warning whena new warning is encountered. 
+	// the changefeed state will remain warning when a new warning is encountered.
 	time.Sleep(manager.changefeedErrorStuckDuration + 10)
 	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
 		require.NotNil(t, status)
@@ -1050,3 +1051,104 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) {
 	require.Equal(t, model.StateFailed, state.Info.State)
 	require.False(t, manager.ShouldRunning())
 }
+
+// TestUpdateChangefeedWithChangefeedErrorStuckDuration asserts that a
+// ChangefeedErrorStuckDuration written into the changefeed config takes effect
+// once the failed changefeed is resumed: before the update the changefeed fails
+// after sleeping past the manager's original duration; after the update it is
+// still in the warning state just short of the new duration and fails only
+// after the remaining second has elapsed. The test sleeps in real time.
+func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) {
+	ctx := cdcContext.NewBackendContext4Test(true)
+	manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
+	state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
+		ctx.ChangefeedVars().ID)
+	tester := orchestrator.NewReactorStateTester(t, state, nil)
+
+	// reportWarning patches this capture's task position with a fake warning
+	// and applies the pending patches.
+	reportWarning := func() {
+		state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
+			func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
+				return &model.TaskPosition{Warning: &model.RunningError{
+					Addr:    ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
+					Code:    "[CDC:ErrSinkManagerRunError]", // a fake error
+					Message: "fake error for test",
+				}}, true, nil
+			})
+		tester.MustApplyPatches()
+	}
+
+	// Create the changefeed info (empty replica config) and an empty status.
+	state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
+		require.Nil(t, info)
+		return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
+	})
+	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+		require.Nil(t, status)
+		return &model.ChangeFeedStatus{}, true, nil
+	})
+	tester.MustApplyPatches()
+	manager.state = state
+	manager.Tick(state, 0)
+	tester.MustApplyPatches()
+	require.True(t, manager.ShouldRunning())
+
+	// Sleep past the manager's current stuck duration after a warning; the
+	// next tick is expected to move the changefeed to the failed state.
+	stuckDuration := manager.changefeedErrorStuckDuration + time.Second*3
+	reportWarning()
+	time.Sleep(stuckDuration - time.Second)
+	manager.Tick(state, 100)
+	tester.MustApplyPatches()
+	require.False(t, manager.ShouldRunning())
+	require.Less(t, manager.changefeedErrorStuckDuration, stuckDuration)
+	require.Equal(t, model.StateFailed, state.Info.State)
+
+	// update ChangefeedErrorStuckDuration
+	state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
+		require.NotNil(t, info)
+		info.Config.ChangefeedErrorStuckDuration = util.AddressOf(stuckDuration)
+		return info, true, nil
+	})
+	// update status
+	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
+		require.NotNil(t, status)
+		return &model.ChangeFeedStatus{
+			CheckpointTs: 100,
+		}, true, nil
+	})
+	tester.MustApplyPatches()
+
+	// resume the changefeed in failed state
+	manager.PushAdminJob(&model.AdminJob{
+		CfID:                  ctx.ChangefeedVars().ID,
+		Type:                  model.AdminResume,
+		OverwriteCheckpointTs: 100,
+	})
+	manager.Tick(state, 101)
+	tester.MustApplyPatches()
+	require.True(t, manager.ShouldRunning())
+	require.False(t, manager.ShouldRemoved())
+	require.Equal(t, stuckDuration, manager.changefeedErrorStuckDuration)
+	require.Equal(t, model.StateNormal, state.Info.State)
+	require.Equal(t, model.AdminNone, state.Info.AdminJobType)
+	require.Equal(t, model.AdminNone, state.Status.AdminJobType)
+
+	// Sleeping one second short of the updated duration must leave the
+	// changefeed running in the warning state.
+	reportWarning()
+	time.Sleep(stuckDuration - time.Second)
+	manager.Tick(state, 200)
+	tester.MustApplyPatches()
+	require.True(t, manager.ShouldRunning())
+	require.Equal(t, model.StateWarning, state.Info.State)
+
+	// After the remaining second the changefeed is expected to fail.
+	reportWarning()
+	time.Sleep(time.Second)
+	manager.Tick(state, 201)
+	tester.MustApplyPatches()
+	require.False(t, manager.ShouldRunning())
+	require.Equal(t, model.StateFailed, state.Info.State)
+}