From 74d8629d47b5c9970573ce1552b3c8c2dc566b49 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 7 May 2024 15:32:23 +0800 Subject: [PATCH 1/7] fix: update changefeedErrorStuckDuration --- cdc/owner/feed_state_manager.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index ba2ec2f9943..ea16e91d8bf 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -168,6 +168,15 @@ func (m *feedStateManager) Tick(resolvedTs model.Ts, // `handleAdminJob` returns true means that some admin jobs are pending // skip to the next tick until all the admin jobs is handled adminJobPending = true + changefeedErrorStuckDuration := *m.state.GetChangefeedInfo().Config.ChangefeedErrorStuckDuration + if m.changefeedErrorStuckDuration != changefeedErrorStuckDuration { + log.Info("changefeed-error-stuck-duration update", + zap.Duration("old-changefeed-error-stuck-duration", m.changefeedErrorStuckDuration), + zap.Duration("new-changefeed-error-stuck-duration", changefeedErrorStuckDuration), + ) + } + m.errBackoff.MaxElapsedTime = changefeedErrorStuckDuration + m.changefeedErrorStuckDuration = changefeedErrorStuckDuration return } From d76f1d795fa74fddf52f32eac33a0f91c19cc12d Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 7 May 2024 16:08:29 +0800 Subject: [PATCH 2/7] fmt --- cdc/owner/feed_state_manager.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index ea16e91d8bf..fe739be9b64 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -170,9 +170,9 @@ func (m *feedStateManager) Tick(resolvedTs model.Ts, adminJobPending = true changefeedErrorStuckDuration := *m.state.GetChangefeedInfo().Config.ChangefeedErrorStuckDuration if m.changefeedErrorStuckDuration != changefeedErrorStuckDuration { - log.Info("changefeed-error-stuck-duration update", - zap.Duration("old-changefeed-error-stuck-duration", m.changefeedErrorStuckDuration), - zap.Duration("new-changefeed-error-stuck-duration", changefeedErrorStuckDuration), + log.Info("changefeedErrorStuckDuration update", + zap.Duration("oldChangefeedErrorStuckDuration", m.changefeedErrorStuckDuration), + zap.Duration("newChangefeedErrorStuckDuration", changefeedErrorStuckDuration), ) } m.errBackoff.MaxElapsedTime = changefeedErrorStuckDuration From ee34071f13cd350a68f90dda165781a6951e7c72 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Thu, 9 May 2024 10:16:35 +0800 Subject: [PATCH 3/7] update --- cdc/owner/feed_state_manager.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index fe739be9b64..dc4572d71d5 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -168,7 +169,7 @@ func (m *feedStateManager) Tick(resolvedTs model.Ts, // `handleAdminJob` returns true means that some admin jobs are pending // skip to the next tick until all the admin jobs is handled adminJobPending = true - changefeedErrorStuckDuration := *m.state.GetChangefeedInfo().Config.ChangefeedErrorStuckDuration + changefeedErrorStuckDuration := util.GetOrZero(m.state.GetChangefeedInfo().Config.ChangefeedErrorStuckDuration) if m.changefeedErrorStuckDuration != changefeedErrorStuckDuration { log.Info("changefeedErrorStuckDuration update", zap.Duration("oldChangefeedErrorStuckDuration", m.changefeedErrorStuckDuration), From 9bfe767e52370ca7d2b3365dadfae8b45f728725 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Thu, 9 May 2024 17:44:18 +0800 Subject: [PATCH 4/7] update and add an unit test case --- cdc/owner/feed_state_manager.go | 4 +- cdc/owner/feed_state_manager_test.go | 82 +++++++++++++++++++++++++++- 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index dc4572d71d5..dfa6748df59 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -175,9 +175,9 @@ func (m *feedStateManager) Tick(resolvedTs model.Ts, zap.Duration("oldChangefeedErrorStuckDuration", m.changefeedErrorStuckDuration), zap.Duration("newChangefeedErrorStuckDuration", changefeedErrorStuckDuration), ) + m.errBackoff.MaxElapsedTime = changefeedErrorStuckDuration + m.changefeedErrorStuckDuration = changefeedErrorStuckDuration } - m.errBackoff.MaxElapsedTime = changefeedErrorStuckDuration - m.changefeedErrorStuckDuration = changefeedErrorStuckDuration return } diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 18747a4d8cb..c558acbf564 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" ) @@ -999,7 +1000,7 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { require.True(t, manager.ShouldRunning()) // 2. test when the changefeed is in warning state, and the resolvedTs and checkpointTs is not progressing, - // the changefeed state will remain warning whena new warning is encountered. + // the changefeed state will remain warning when a new warning is encountered. time.Sleep(manager.changefeedErrorStuckDuration + 10) state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { require.NotNil(t, status) @@ -1064,3 +1065,82 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { require.Equal(t, model.StateFailed, state.Info.State) require.False(t, manager.ShouldRunning()) } + +func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { + globalVars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test() + manager := newFeedStateManager4Test(200, 1600, 0, 2.0) + state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID, + model.DefaultChangeFeedID(changefeedInfo.ID)) + tester := orchestrator.NewReactorStateTester(t, state, nil) + state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + require.Nil(t, info) + return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil + }) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.Nil(t, status) + return &model.ChangeFeedStatus{}, true, nil + }) + tester.MustApplyPatches() + manager.state = state + manager.Tick(0, state.Status, state.Info) + tester.MustApplyPatches() + require.True(t, manager.ShouldRunning()) + // stop a changefeed + manager.PushAdminJob(&model.AdminJob{ + CfID: model.DefaultChangeFeedID(changefeedInfo.ID), + Type: model.AdminStop, + }) + manager.Tick(0, state.Status, state.Info) + tester.MustApplyPatches() + require.False(t, manager.ShouldRunning()) + require.False(t, manager.ShouldRemoved()) + require.Equal(t, state.Info.State, model.StateStopped) + require.Equal(t, state.Info.AdminJobType, model.AdminStop) + require.Equal(t, state.Status.AdminJobType, model.AdminStop) + + // update ChangefeedErrorStuckDuration + stuckDuration := time.Second * 10 + state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + require.NotNil(t, info) + info.Config.ChangefeedErrorStuckDuration = util.AddressOf(stuckDuration) + return info, true, nil + }) + tester.MustApplyPatches() + + // resume the changefeed in stopped state + manager.PushAdminJob(&model.AdminJob{ + CfID: model.DefaultChangeFeedID(changefeedInfo.ID), + Type: model.AdminResume, + OverwriteCheckpointTs: 100, + }) + + manager.Tick(0, state.Status, state.Info) + tester.MustApplyPatches() + require.True(t, manager.ShouldRunning()) + require.False(t, manager.ShouldRemoved()) + require.Equal(t, manager.changefeedErrorStuckDuration, stuckDuration) + require.Equal(t, state.Info.State, model.StateNormal) + require.Equal(t, state.Info.AdminJobType, model.AdminNone) + require.Equal(t, state.Status.AdminJobType, model.AdminNone) + + state.PatchTaskPosition(globalVars.CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: globalVars.CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + + time.Sleep(stuckDuration - 10) + manager.Tick(200, state.Status, state.Info) + tester.MustApplyPatches() + require.True(t, manager.ShouldRunning()) + require.Equal(t, state.Info.State, model.StateNormal) + + time.Sleep(stuckDuration) + manager.Tick(201, state.Status, state.Info) + tester.MustApplyPatches() + require.False(t, manager.ShouldRunning()) + require.Equal(t, state.Info.State, model.StateFailed) +} From 729e3951e3e73edacb6f71bfbcc09292f8331e7b Mon Sep 17 00:00:00 2001 From: wk989898 Date: Fri, 10 May 2024 16:29:54 +0800 Subject: [PATCH 5/7] update unit test case --- cdc/owner/feed_state_manager_test.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index c558acbf564..213ce1009ce 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -1085,21 +1085,25 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { manager.Tick(0, state.Status, state.Info) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) - // stop a changefeed - manager.PushAdminJob(&model.AdminJob{ - CfID: model.DefaultChangeFeedID(changefeedInfo.ID), - Type: model.AdminStop, - }) - manager.Tick(0, state.Status, state.Info) + + stuckDuration := time.Second * 10 + state.PatchTaskPosition(globalVars.CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: globalVars.CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() + time.Sleep(stuckDuration - 10) + manager.Tick(100, state.Status, state.Info) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) - require.False(t, manager.ShouldRemoved()) - require.Equal(t, state.Info.State, model.StateStopped) - require.Equal(t, state.Info.AdminJobType, model.AdminStop) - require.Equal(t, state.Status.AdminJobType, model.AdminStop) + require.Less(t, manager.changefeedErrorStuckDuration, stuckDuration) + require.Equal(t, state.Info.State, model.StateFailed) // update ChangefeedErrorStuckDuration - stuckDuration := time.Second * 10 state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { require.NotNil(t, info) info.Config.ChangefeedErrorStuckDuration = util.AddressOf(stuckDuration) @@ -1114,7 +1118,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { OverwriteCheckpointTs: 100, }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(101, state.Status, state.Info) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) From 4f2667f473b44875d78f64a1d9ad6fa0002f9fe4 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Fri, 10 May 2024 17:38:43 +0800 Subject: [PATCH 6/7] fix --- cdc/owner/feed_state_manager_test.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 213ce1009ce..9c81ae11c60 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -1096,7 +1096,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - time.Sleep(stuckDuration - 10) + time.Sleep(stuckDuration - time.Second) manager.Tick(100, state.Status, state.Info) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) @@ -1109,9 +1109,16 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { info.Config.ChangefeedErrorStuckDuration = util.AddressOf(stuckDuration) return info, true, nil }) + // update status + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + require.NotNil(t, status) + return &model.ChangeFeedStatus{ + CheckpointTs: 100, + }, true, nil + }) tester.MustApplyPatches() - // resume the changefeed in stopped state + // resume the changefeed in failed state manager.PushAdminJob(&model.AdminJob{ CfID: model.DefaultChangeFeedID(changefeedInfo.ID), Type: model.AdminResume, @@ -1135,14 +1142,25 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { Message: "fake error for test", }}, true, nil }) + tester.MustApplyPatches() - time.Sleep(stuckDuration - 10) + time.Sleep(stuckDuration - time.Second) manager.Tick(200, state.Status, state.Info) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) - require.Equal(t, state.Info.State, model.StateNormal) + require.Equal(t, state.Info.State, model.StateWarning) + + state.PatchTaskPosition(globalVars.CaptureInfo.ID, + func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return &model.TaskPosition{Warning: &model.RunningError{ + Addr: globalVars.CaptureInfo.AdvertiseAddr, + Code: "[CDC:ErrSinkManagerRunError]", // it is fake error + Message: "fake error for test", + }}, true, nil + }) + tester.MustApplyPatches() - time.Sleep(stuckDuration) + time.Sleep(time.Second) manager.Tick(201, state.Status, state.Info) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) From ce895c09900dbc7d92451a2ed13cf85bd88fc51e Mon Sep 17 00:00:00 2001 From: wk989898 Date: Fri, 10 May 2024 17:47:01 +0800 Subject: [PATCH 7/7] chore --- cdc/owner/feed_state_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 9c81ae11c60..72250a499a6 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -1086,7 +1086,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) - stuckDuration := time.Second * 10 + stuckDuration := manager.changefeedErrorStuckDuration + time.Second*3 state.PatchTaskPosition(globalVars.CaptureInfo.ID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { return &model.TaskPosition{Warning: &model.RunningError{