Skip to content

Commit

Permalink
owner(ticdc): update changefeed configuration changefeed-error-stuck-…
Browse files Browse the repository at this point in the history
…duration correctly (#11042)

close #10998
  • Loading branch information
wk989898 authored May 13, 2024
1 parent 687b21d commit 985f8af
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 1 deletion.
10 changes: 10 additions & 0 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -168,6 +169,15 @@ func (m *feedStateManager) Tick(resolvedTs model.Ts,
// `handleAdminJob` returning true means that some admin jobs are pending;
// skip to the next tick until all the admin jobs are handled
adminJobPending = true
changefeedErrorStuckDuration := util.GetOrZero(m.state.GetChangefeedInfo().Config.ChangefeedErrorStuckDuration)
if m.changefeedErrorStuckDuration != changefeedErrorStuckDuration {
log.Info("changefeedErrorStuckDuration update",
zap.Duration("oldChangefeedErrorStuckDuration", m.changefeedErrorStuckDuration),
zap.Duration("newChangefeedErrorStuckDuration", changefeedErrorStuckDuration),
)
m.errBackoff.MaxElapsedTime = changefeedErrorStuckDuration
m.changefeedErrorStuckDuration = changefeedErrorStuckDuration
}
return
}

Expand Down
104 changes: 103 additions & 1 deletion cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
)
Expand Down Expand Up @@ -999,7 +1000,7 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) {
require.True(t, manager.ShouldRunning())

// 2. test when the changefeed is in warning state, and the resolvedTs and checkpointTs is not progressing,
// the changefeed state will remain warning whena new warning is encountered.
// the changefeed state will remain warning when a new warning is encountered.
time.Sleep(manager.changefeedErrorStuckDuration + 10)
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.NotNil(t, status)
Expand Down Expand Up @@ -1064,3 +1065,104 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) {
require.Equal(t, model.StateFailed, state.Info.State)
require.False(t, manager.ShouldRunning())
}

// TestUpdateChangefeedWithChangefeedErrorStuckDuration checks that a new
// ChangefeedErrorStuckDuration written to the changefeed config is picked up
// by the feed state manager when the changefeed is resumed through an admin
// job, and that the new value then governs how long the changefeed may stay
// in the warning state before it is marked failed.
//
// NOTE(review): the test sleeps for several seconds on the wall clock; it
// presumably relies on newFeedStateManager4Test configuring a short initial
// stuck duration — confirm against that helper.
func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) {
	globalVars, changefeedInfo := vars.NewGlobalVarsAndChangefeedInfo4Test()
	manager := newFeedStateManager4Test(200, 1600, 0, 2.0)
	state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
		model.DefaultChangeFeedID(changefeedInfo.ID))
	tester := orchestrator.NewReactorStateTester(t, state, nil)

	// reportWarning records a fake sink-manager warning on this capture's task
	// position and applies the pending patches, so that the next Tick sees it.
	reportWarning := func() {
		state.PatchTaskPosition(globalVars.CaptureInfo.ID,
			func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
				return &model.TaskPosition{Warning: &model.RunningError{
					Addr:    globalVars.CaptureInfo.AdvertiseAddr,
					Code:    "[CDC:ErrSinkManagerRunError]", // it is fake error
					Message: "fake error for test",
				}}, true, nil
			})
		tester.MustApplyPatches()
	}

	// Bootstrap an empty changefeed info and status, then tick once so the
	// manager starts out in the running state.
	state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
		require.Nil(t, info)
		return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
	})
	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
		require.Nil(t, status)
		return &model.ChangeFeedStatus{}, true, nil
	})
	tester.MustApplyPatches()
	manager.state = state
	manager.Tick(0, state.Status, state.Info)
	tester.MustApplyPatches()
	require.True(t, manager.ShouldRunning())

	// 1. With the manager's initial stuck duration, a warning that has been
	// outstanding for (initial + 2s) must push the changefeed to failed.
	// stuckDuration is the new, longer value that is configured further down.
	stuckDuration := manager.changefeedErrorStuckDuration + time.Second*3
	reportWarning()
	time.Sleep(stuckDuration - time.Second)
	manager.Tick(100, state.Status, state.Info)
	tester.MustApplyPatches()
	require.False(t, manager.ShouldRunning())
	require.Less(t, manager.changefeedErrorStuckDuration, stuckDuration)
	require.Equal(t, model.StateFailed, state.Info.State)

	// 2. Update ChangefeedErrorStuckDuration in the config and move the
	// checkpoint forward.
	state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
		require.NotNil(t, info)
		info.Config.ChangefeedErrorStuckDuration = util.AddressOf(stuckDuration)
		return info, true, nil
	})
	state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
		require.NotNil(t, status)
		return &model.ChangeFeedStatus{
			CheckpointTs: 100,
		}, true, nil
	})
	tester.MustApplyPatches()

	// Resume the changefeed from the failed state; handling this admin job is
	// what should make the manager adopt the new stuck duration.
	manager.PushAdminJob(&model.AdminJob{
		CfID:                  model.DefaultChangeFeedID(changefeedInfo.ID),
		Type:                  model.AdminResume,
		OverwriteCheckpointTs: 100,
	})

	manager.Tick(101, state.Status, state.Info)
	tester.MustApplyPatches()
	require.True(t, manager.ShouldRunning())
	require.False(t, manager.ShouldRemoved())
	require.Equal(t, stuckDuration, manager.changefeedErrorStuckDuration)
	require.Equal(t, model.StateNormal, state.Info.State)
	require.Equal(t, model.AdminNone, state.Info.AdminJobType)
	require.Equal(t, model.AdminNone, state.Status.AdminJobType)

	// 3. Under the new duration, the same wait that caused a failure in step 1
	// (stuckDuration - 1s) must only leave the changefeed in warning.
	reportWarning()
	time.Sleep(stuckDuration - time.Second)
	manager.Tick(200, state.Status, state.Info)
	tester.MustApplyPatches()
	require.True(t, manager.ShouldRunning())
	require.Equal(t, model.StateWarning, state.Info.State)

	// 4. One more second brings the total wait to stuckDuration, at which
	// point a further warning must fail the changefeed.
	reportWarning()
	time.Sleep(time.Second)
	manager.Tick(201, state.Status, state.Info)
	tester.MustApplyPatches()
	require.False(t, manager.ShouldRunning())
	require.Equal(t, model.StateFailed, state.Info.State)
}

0 comments on commit 985f8af

Please sign in to comment.