From 6043399abbe79e6693d92ba49779dfb13d1491be Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 7 May 2024 15:32:23 +0800 Subject: [PATCH 1/8] fix: update changefeedErrorStuckDuration --- cdc/owner/feed_state_manager.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index db4a0a32988..18306050328 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -144,6 +144,15 @@ func (m *feedStateManager) Tick( // `handleAdminJob` returns true means that some admin jobs are pending // skip to the next tick until all the admin jobs is handled adminJobPending = true + changefeedErrorStuckDuration := *m.state.GetChangefeedInfo().Config.ChangefeedErrorStuckDuration + if m.changefeedErrorStuckDuration != changefeedErrorStuckDuration { + log.Info("changefeed-error-stuck-duration update", + zap.Duration("old-changefeed-error-stuck-duration", m.changefeedErrorStuckDuration), + zap.Duration("new-changefeed-error-stuck-duration", changefeedErrorStuckDuration), + ) + } + m.errBackoff.MaxElapsedTime = changefeedErrorStuckDuration + m.changefeedErrorStuckDuration = changefeedErrorStuckDuration return } From 8f08be63128c3c9781239de2280adff947f32fcf Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 7 May 2024 16:08:29 +0800 Subject: [PATCH 2/8] fmt --- cdc/owner/feed_state_manager.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 18306050328..873d36c0c12 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -146,9 +146,9 @@ func (m *feedStateManager) Tick( adminJobPending = true changefeedErrorStuckDuration := *m.state.GetChangefeedInfo().Config.ChangefeedErrorStuckDuration if m.changefeedErrorStuckDuration != changefeedErrorStuckDuration { - log.Info("changefeed-error-stuck-duration update", - zap.Duration("old-changefeed-error-stuck-duration", m.changefeedErrorStuckDuration), - zap.Duration("new-changefeed-error-stuck-duration", changefeedErrorStuckDuration), + log.Info("changefeedErrorStuckDuration update", + zap.Duration("oldChangefeedErrorStuckDuration", m.changefeedErrorStuckDuration), + zap.Duration("newChangefeedErrorStuckDuration", changefeedErrorStuckDuration), ) } m.errBackoff.MaxElapsedTime = changefeedErrorStuckDuration From 7f499c119916837473563de9176881d8476addb7 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Thu, 9 May 2024 10:16:35 +0800 Subject: [PATCH 3/8] update --- cdc/owner/feed_state_manager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 873d36c0c12..9d124cc3758 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -25,6 +25,7 @@ import ( cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -144,7 +145,7 @@ func (m *feedStateManager) Tick( // `handleAdminJob` returns true means that some admin jobs are pending // skip to the next tick until all the admin jobs is handled adminJobPending = true - changefeedErrorStuckDuration := *m.state.GetChangefeedInfo().Config.ChangefeedErrorStuckDuration + changefeedErrorStuckDuration := util.GetOrZero(m.state.GetChangefeedInfo().Config.ChangefeedErrorStuckDuration) if m.changefeedErrorStuckDuration != changefeedErrorStuckDuration { log.Info("changefeedErrorStuckDuration update", zap.Duration("oldChangefeedErrorStuckDuration", m.changefeedErrorStuckDuration), From 0977f4d527ee0b7ec39c17a614279837cf0cf103 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Thu, 9 May 2024 17:44:18 +0800 Subject: [PATCH 4/8] update and add an unit test case --- cdc/owner/feed_state_manager.go | 4 +- cdc/owner/feed_state_manager_test.go | 82 +++++++++++++++++++++++++++- 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 9d124cc3758..48b8adaf1b8 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -151,9 +151,9 @@ func (m *feedStateManager) Tick( zap.Duration("oldChangefeedErrorStuckDuration", m.changefeedErrorStuckDuration), zap.Duration("newChangefeedErrorStuckDuration", changefeedErrorStuckDuration), ) + m.errBackoff.MaxElapsedTime = changefeedErrorStuckDuration + m.changefeedErrorStuckDuration = changefeedErrorStuckDuration } - m.errBackoff.MaxElapsedTime = changefeedErrorStuckDuration - m.changefeedErrorStuckDuration = changefeedErrorStuckDuration return } diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index e81ae7584ac..dada353a55e 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" ) @@ -985,7 +986,7 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { require.True(t, manager.ShouldRunning()) // 2. test when the changefeed is in warning state, and the resolvedTs and checkpointTs is not progressing, - // the changefeed state will remain warning whena new warning is encountered. + // the changefeed state will remain warning when a new warning is encountered. time.Sleep(manager.changefeedErrorStuckDuration + 10) state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { require.NotNil(t, status) @@ -1050,3 +1051,82 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { require.Equal(t, model.StateFailed, state.Info.State) require.False(t, manager.ShouldRunning()) } + +func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { + globalVars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test() + manager := newFeedStateManager4Test(200, 1600, 0, 2.0) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + model.DefaultChangeFeedID(changefeedInfo.ID)) + tester := orchestrator.NewReactorStateTester(t, state, nil) + state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + require.Nil(t, info) + return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil + }) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.Nil(t, status) + return &model.ChangeFeedStatus{}, true, nil + }) + tester.MustApplyPatches() + manager.state = state + manager.Tick(0, state.Status, state.Info) + tester.MustApplyPatches() + require.True(t, manager.ShouldRunning()) + // stop a changefeed + manager.PushAdminJob(&model.AdminJob{ + CfID: model.DefaultChangeFeedID(changefeedInfo.ID), + Type: model.AdminStop, + }) + manager.Tick(0, state.Status, state.Info) + tester.MustApplyPatches() + require.False(t, manager.ShouldRunning()) + require.False(t, manager.ShouldRemoved()) + require.Equal(t, state.Info.State, model.StateStopped) + require.Equal(t, state.Info.AdminJobType, model.AdminStop) + require.Equal(t, state.Status.AdminJobType, model.AdminStop) + + // update ChangefeedErrorStuckDuration + stuckDuration := time.Second * 10 + state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + require.NotNil(t, info) + info.Config.ChangefeedErrorStuckDuration = util.AddressOf(stuckDuration) + return info, true, nil + }) + tester.MustApplyPatches() + + // resume the changefeed in stopped state + manager.PushAdminJob(&model.AdminJob{ + CfID: model.DefaultChangeFeedID(changefeedInfo.ID), + Type: model.AdminResume, + OverwriteCheckpointTs: 100, + }) + + manager.Tick(0, state.Status, state.Info) + tester.MustApplyPatches() + require.True(t, manager.ShouldRunning()) + require.False(t, manager.ShouldRemoved()) + require.Equal(t, manager.changefeedErrorStuckDuration, stuckDuration) + require.Equal(t, state.Info.State, model.StateNormal) + require.Equal(t, state.Info.AdminJobType, model.AdminNone) + require.Equal(t, state.Status.AdminJobType, model.AdminNone) + + state.PatchTaskPosition(globalVars.CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: globalVars.CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + + time.Sleep(stuckDuration - 10) + manager.Tick(200, state.Status, state.Info) + tester.MustApplyPatches() + require.True(t, manager.ShouldRunning()) + require.Equal(t, state.Info.State, model.StateNormal) + + time.Sleep(stuckDuration) + manager.Tick(201, state.Status, state.Info) + tester.MustApplyPatches() + require.False(t, manager.ShouldRunning()) + require.Equal(t, state.Info.State, model.StateFailed) +} From 0dede047bad91ac5264c7e37a68f0d144506cb47 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Fri, 10 May 2024 16:29:54 +0800 Subject: [PATCH 5/8] update unit test case --- cdc/owner/feed_state_manager_test.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index dada353a55e..733da7bbf40 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -1071,21 +1071,25 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { manager.Tick(0, state.Status, state.Info) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) - // stop a changefeed - manager.PushAdminJob(&model.AdminJob{ - CfID: model.DefaultChangeFeedID(changefeedInfo.ID), - Type: model.AdminStop, - }) - manager.Tick(0, state.Status, state.Info) + + stuckDuration := time.Second * 10 + state.PatchTaskPosition(globalVars.CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: globalVars.CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + time.Sleep(stuckDuration - 10) + manager.Tick(100, state.Status, state.Info) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) - require.False(t, manager.ShouldRemoved()) - require.Equal(t, state.Info.State, model.StateStopped) - require.Equal(t, state.Info.AdminJobType, model.AdminStop) - require.Equal(t, state.Status.AdminJobType, model.AdminStop) + require.Less(t, manager.changefeedErrorStuckDuration, stuckDuration) + require.Equal(t, state.Info.State, model.StateFailed) // update ChangefeedErrorStuckDuration - stuckDuration := time.Second * 10 state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { require.NotNil(t, info) info.Config.ChangefeedErrorStuckDuration = util.AddressOf(stuckDuration) @@ -1100,7 +1104,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { OverwriteCheckpointTs: 100, }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(101, state.Status, state.Info) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) From 4292cdf5ee0a1b3165d59fbbbb26a9c039b2c261 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Fri, 10 May 2024 17:38:43 +0800 Subject: [PATCH 6/8] fix --- cdc/owner/feed_state_manager_test.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 733da7bbf40..ce28aeeae92 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -1082,7 +1082,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - time.Sleep(stuckDuration - 10) + time.Sleep(stuckDuration - time.Second) manager.Tick(100, state.Status, state.Info) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) @@ -1095,9 +1095,16 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { info.Config.ChangefeedErrorStuckDuration = util.AddressOf(stuckDuration) return info, true, nil }) + // update status + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.NotNil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 100, + }, true, nil + }) tester.MustApplyPatches() - // resume the changefeed in stopped state + // resume the changefeed in failed state manager.PushAdminJob(&model.AdminJob{ CfID: model.DefaultChangeFeedID(changefeedInfo.ID), Type: model.AdminResume, @@ -1121,14 +1128,25 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { Message: "fake error for test", }}, true, nil }) + tester.MustApplyPatches() - time.Sleep(stuckDuration - 10) + time.Sleep(stuckDuration - time.Second) manager.Tick(200, state.Status, state.Info) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) - require.Equal(t, state.Info.State, model.StateNormal) + require.Equal(t, state.Info.State, model.StateWarning) + + state.PatchTaskPosition(globalVars.CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: globalVars.CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() - time.Sleep(stuckDuration) + time.Sleep(time.Second) manager.Tick(201, state.Status, state.Info) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) From 40feff0908b0dcefb915a483dd722ef370fa42e0 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Fri, 10 May 2024 17:47:01 +0800 Subject: [PATCH 7/8] chore --- cdc/owner/feed_state_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index ce28aeeae92..80fe8593fe6 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -1072,7 +1072,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) - stuckDuration := time.Second * 10 + stuckDuration := manager.changefeedErrorStuckDuration + time.Second*3 state.PatchTaskPosition(globalVars.CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return &model.TaskPosition{Warning: &model.RunningError{ From 5c965edcbe5ce6d66f699b3705cb21fb58e30fc4 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 14 May 2024 13:10:04 +0800 Subject: [PATCH 8/8] fix --- cdc/owner/feed_state_manager.go | 2 +- cdc/owner/feed_state_manager_test.go | 30 ++++++++++++++-------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 48b8adaf1b8..9786b6e4b1e 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -145,7 +145,7 @@ func (m *feedStateManager) Tick( // `handleAdminJob` returns true means that some admin jobs are pending // skip to the next tick until all the admin jobs is handled adminJobPending = true - changefeedErrorStuckDuration := util.GetOrZero(m.state.GetChangefeedInfo().Config.ChangefeedErrorStuckDuration) + changefeedErrorStuckDuration := util.GetOrZero(m.state.Info.Config.ChangefeedErrorStuckDuration) if m.changefeedErrorStuckDuration != changefeedErrorStuckDuration { log.Info("changefeedErrorStuckDuration update", zap.Duration("oldChangefeedErrorStuckDuration", m.changefeedErrorStuckDuration), diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 80fe8593fe6..d4002d68d41 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -1053,10 +1053,11 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { } func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { - globalVars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test() + ctx := cdcContext.NewBackendContext4Test(true) manager := newFeedStateManager4Test(200, 1600, 0, 2.0) state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, - model.DefaultChangeFeedID(changefeedInfo.ID)) + ctx.ChangefeedVars().ID) + tester := orchestrator.NewReactorStateTester(t, state, nil) state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { require.Nil(t, info) @@ -1068,22 +1069,22 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { }) tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(state, 0) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) stuckDuration := manager.changefeedErrorStuckDuration + time.Second*3 - state.PatchTaskPosition(globalVars.CaptureInfo.ID, + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return &model.TaskPosition{Warning: &model.RunningError{ - Addr: globalVars.CaptureInfo.AdvertiseAddr, + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, Code: "[CDC:ErrSinkManagerRunError]", // it is fake error Message: "fake error for test", }}, true, nil }) tester.MustApplyPatches() time.Sleep(stuckDuration - time.Second) - manager.Tick(100, state.Status, state.Info) + manager.Tick(state, 100) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) require.Less(t, manager.changefeedErrorStuckDuration, stuckDuration) @@ -1106,12 +1107,11 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { // resume the changefeed in failed state manager.PushAdminJob(&model.AdminJob{ - CfID: model.DefaultChangeFeedID(changefeedInfo.ID), + CfID: ctx.ChangefeedVars().ID, Type: model.AdminResume, OverwriteCheckpointTs: 100, }) - - manager.Tick(101, state.Status, state.Info) + manager.Tick(state, 101) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) @@ -1120,10 +1120,10 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { require.Equal(t, state.Info.AdminJobType, model.AdminNone) require.Equal(t, state.Status.AdminJobType, model.AdminNone) - state.PatchTaskPosition(globalVars.CaptureInfo.ID, + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return &model.TaskPosition{Warning: &model.RunningError{ - Addr: globalVars.CaptureInfo.AdvertiseAddr, + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, Code: "[CDC:ErrSinkManagerRunError]", // it is fake error Message: "fake error for test", }}, true, nil @@ -1131,15 +1131,15 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { tester.MustApplyPatches() time.Sleep(stuckDuration - time.Second) - manager.Tick(200, state.Status, state.Info) + manager.Tick(state, 200) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.Equal(t, state.Info.State, model.StateWarning) - state.PatchTaskPosition(globalVars.CaptureInfo.ID, + state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return &model.TaskPosition{Warning: &model.RunningError{ - Addr: globalVars.CaptureInfo.AdvertiseAddr, + Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr, Code: "[CDC:ErrSinkManagerRunError]", // it is fake error Message: "fake error for test", }}, true, nil @@ -1147,7 +1147,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { tester.MustApplyPatches() time.Sleep(time.Second) - manager.Tick(201, state.Status, state.Info) + manager.Tick(state, 201) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) require.Equal(t, state.Info.State, model.StateFailed)