From cb543a7d06828d1b13fcf54d7b60d385d65ba7f5 Mon Sep 17 00:00:00 2001
From: CharlesCheung
Date: Thu, 1 Jun 2023 18:14:26 +0800
Subject: [PATCH 1/3] add err

---
 cdc/processor/sinkmanager/manager.go          |  8 +++
 cdc/scheduler/internal/v3/agent/agent.go      | 36 ++++++++-----
 cdc/scheduler/internal/v3/agent/agent_test.go | 20 +++----
 .../v3/replication/replication_set.go         | 52 +++++++++++--------
 pkg/errors/cdc_errors.go                      |  4 ++
 5 files changed, 77 insertions(+), 43 deletions(-)

diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 46867d0e22f..b0c090e9fb9 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -937,6 +937,14 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
 		resolvedTs = m.sourceManager.GetTableResolvedTs(span)
 	}
 
+	if resolvedTs < checkpointTs.ResolvedMark() {
+		log.Error("sinkManager: resolved ts should not be less than checkpoint ts",
+			zap.String("namespace", m.changefeedID.Namespace),
+			zap.String("changefeed", m.changefeedID.ID),
+			zap.Stringer("span", &span),
+			zap.Uint64("resolvedTs", resolvedTs),
+			zap.Any("checkpointTs", checkpointTs))
+	}
 	return TableStats{
 		CheckpointTs: checkpointTs.ResolvedMark(),
 		ResolvedTs:   resolvedTs,
diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go
index 57472765ae1..bc7f60a4cd3 100644
--- a/cdc/scheduler/internal/v3/agent/agent.go
+++ b/cdc/scheduler/internal/v3/agent/agent.go
@@ -18,7 +18,6 @@ import (
 	"time"
 
 	"github.com/google/uuid"
-	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -27,7 +26,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/transport"
 	"github.com/pingcap/tiflow/cdc/scheduler/schedulepb"
 	"github.com/pingcap/tiflow/pkg/config"
-	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/etcd"
 	"github.com/pingcap/tiflow/pkg/p2p"
 	"github.com/pingcap/tiflow/pkg/version"
@@ -148,7 +147,7 @@ func newAgent(
 
 	revision, err := etcdClient.GetOwnerRevision(etcdCliCtx, ownerCaptureID)
 	if err != nil {
-		if cerror.ErrOwnerNotFound.Equal(err) || cerror.ErrNotOwner.Equal(err) {
+		if errors.ErrOwnerNotFound.Equal(err) || errors.ErrNotOwner.Equal(err) {
 			// These are expected errors when no owner has been elected
 			log.Info("schedulerv3: no owner found when querying for the owner revision",
 				zap.String("ownerCaptureID", ownerCaptureID),
@@ -207,7 +206,10 @@ func (a *agent) Tick(ctx context.Context) (*schedulepb.Barrier, error) {
 		return nil, errors.Trace(err)
 	}
 
-	outboundMessages, barrier := a.handleMessage(inboundMessages)
+	outboundMessages, barrier, err := a.handleMessage(inboundMessages)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
 
 	responses, err := a.tableM.poll(ctx)
 	if err != nil {
@@ -236,10 +238,8 @@ func (a *agent) handleLivenessUpdate(liveness model.Liveness) {
 }
 
 func (a *agent) handleMessage(msg []*schedulepb.Message) (
-	[]*schedulepb.Message, *schedulepb.Barrier,
+	result []*schedulepb.Message, barrier *schedulepb.Barrier, err error,
 ) {
-	result := make([]*schedulepb.Message, 0)
-	var barrier *schedulepb.Barrier
 	for _, message := range msg {
 		ownerCaptureID := message.GetFrom()
 		header := message.GetHeader()
@@ -254,7 +254,10 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) (
 		switch message.GetMsgType() {
 		case schedulepb.MsgHeartbeat:
 			var reMsg *schedulepb.Message
-			reMsg, barrier =
a.handleMessageHeartbeat(message.GetHeartbeat()) + reMsg, barrier, err = a.handleMessageHeartbeat(message.GetHeartbeat()) + if err != nil { + return + } result = append(result, reMsg) case schedulepb.MsgDispatchTableRequest: a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch) @@ -266,22 +269,31 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) ( zap.Any("message", message)) } } - return result, barrier + return } func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) ( - *schedulepb.Message, *schedulepb.Barrier, + *schedulepb.Message, *schedulepb.Barrier, error, ) { allTables := a.tableM.getAllTableSpans() result := make([]tablepb.TableStatus, 0, allTables.Len()) + + isValidCheckpointTs := true allTables.Ascend(func(span tablepb.Span, table *tableSpan) bool { status := table.getTableSpanStatus(request.CollectStats) + isValidCheckpointTs = status.Checkpoint.CheckpointTs <= status.Checkpoint.ResolvedTs if table.task != nil && table.task.IsRemove { status.State = tablepb.TableStateStopping } result = append(result, status) - return true + return isValidCheckpointTs }) + if !isValidCheckpointTs { + status := result[len(result)-1] + checkpointTs := status.Checkpoint.CheckpointTs + resolvedTs := status.Checkpoint.ResolvedTs + return nil, nil, errors.ErrInvalidCheckpointTs.GenWithStackByArgs(checkpointTs, resolvedTs) + } for _, span := range request.GetSpans() { if _, ok := allTables.Get(span); !ok { status := a.tableM.getTableSpanStatus(span, request.CollectStats) @@ -308,7 +320,7 @@ func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) ( zap.String("changefeed", a.ChangeFeedID.ID), zap.Any("message", message)) - return message, request.GetBarrier() + return message, request.GetBarrier(), nil } type dispatchTableTaskStatus int32 diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go index 27a5ce911ce..0e87b962c5e 100644 --- a/cdc/scheduler/internal/v3/agent/agent_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_test.go @@ -356,7 +356,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { }, } - response, _ := a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat}) require.Len(t, response, 1) require.Equal(t, model.LivenessCaptureAlive, response[0].GetHeartbeatResponse().Liveness) @@ -376,7 +376,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { } a.tableM.tables.GetV(spanz.TableIDToComparableSpan(1)).task = &dispatchTableTask{IsRemove: true} - response, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) result = response[0].GetHeartbeatResponse().Tables sort.Slice(result, func(i, j int) bool { return result[i].Span.TableID < result[j].Span.TableID @@ -384,13 +384,13 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { require.Equal(t, tablepb.TableStateStopping, result[1].State) a.handleLivenessUpdate(model.LivenessCaptureStopping) - response, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) require.Len(t, response, 1) require.Equal(t, model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness) a.handleLivenessUpdate(model.LivenessCaptureAlive) heartbeat.Heartbeat.IsStopping = true - response, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) require.Equal(t, 
model.LivenessCaptureStopping, response[0].GetHeartbeatResponse().Liveness) require.Equal(t, model.LivenessCaptureStopping, a.liveness.Load()) } @@ -577,7 +577,7 @@ func TestAgentHandleMessage(t *testing.T) { } // handle the first heartbeat, from the known owner. - response, _ := a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _, _ := a.handleMessage([]*schedulepb.Message{heartbeat}) require.Len(t, response, 1) addTableRequest := &schedulepb.Message{ @@ -600,17 +600,17 @@ func TestAgentHandleMessage(t *testing.T) { }, } // wrong epoch, ignored - responses, _ := a.handleMessage([]*schedulepb.Message{addTableRequest}) + responses, _, _ := a.handleMessage([]*schedulepb.Message{addTableRequest}) require.False(t, tableM.tables.Has(spanz.TableIDToComparableSpan(1))) require.Len(t, responses, 0) // correct epoch, processing. addTableRequest.Header.ProcessorEpoch = a.Epoch - _, _ = a.handleMessage([]*schedulepb.Message{addTableRequest}) + _, _, _ = a.handleMessage([]*schedulepb.Message{addTableRequest}) require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) heartbeat.Header.OwnerRevision.Revision = 2 - response, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) require.Len(t, response, 1) // this should never happen in real world @@ -624,12 +624,12 @@ func TestAgentHandleMessage(t *testing.T) { From: a.ownerInfo.ID, } - response, _ = a.handleMessage([]*schedulepb.Message{unknownMessage}) + response, _, _ = a.handleMessage([]*schedulepb.Message{unknownMessage}) require.Len(t, response, 0) // staled message heartbeat.Header.OwnerRevision.Revision = 1 - response, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) + response, _, _ = a.handleMessage([]*schedulepb.Message{heartbeat}) require.Len(t, response, 0) } diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go index 3bfea5c49d2..f71d00e097d 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set.go +++ b/cdc/scheduler/internal/v3/replication/replication_set.go @@ -17,12 +17,11 @@ import ( "encoding/json" "fmt" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/scheduler/schedulepb" - cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) @@ -163,7 +162,9 @@ func NewReplicationSet( return nil, r.inconsistentError(table, captureID, "schedulerv3: table id inconsistent") } - r.updateCheckpointAndStats(table.Checkpoint, table.Stats) + if err := r.updateCheckpointAndStats(table.Checkpoint, table.Stats); err != nil { + return nil, errors.Trace(err) + } switch table.State { case tablepb.TableStateReplicating: @@ -270,7 +271,7 @@ func (r *ReplicationSet) setCapture(captureID model.CaptureID, role Role) error cr, ok := r.Captures[captureID] if ok && cr != role { jsonR, _ := json.Marshal(r) - return cerror.ErrReplicationSetInconsistent.GenWithStackByArgs(fmt.Sprintf( + return errors.ErrReplicationSetInconsistent.GenWithStackByArgs(fmt.Sprintf( "can not set %s as %s, it's %s, %v", captureID, role, cr, string(jsonR))) } r.Captures[captureID] = role @@ -281,7 +282,7 @@ func (r *ReplicationSet) clearCapture(captureID model.CaptureID, role Role) erro cr, ok := r.Captures[captureID] if ok && cr != role { jsonR, _ := json.Marshal(r) - return 
cerror.ErrReplicationSetInconsistent.GenWithStackByArgs(fmt.Sprintf( + return errors.ErrReplicationSetInconsistent.GenWithStackByArgs(fmt.Sprintf( "can not clear %s as %s, it's %s, %v", captureID, role, cr, string(jsonR))) } delete(r.Captures, captureID) @@ -297,7 +298,7 @@ func (r *ReplicationSet) promoteSecondary(captureID model.CaptureID) error { role, ok := r.Captures[captureID] if ok && role != RoleSecondary { jsonR, _ := json.Marshal(r) - return cerror.ErrReplicationSetInconsistent.GenWithStackByArgs(fmt.Sprintf( + return errors.ErrReplicationSetInconsistent.GenWithStackByArgs(fmt.Sprintf( "can not promote %s to primary, it's %s, %v", captureID, role, string(jsonR))) } if r.Primary != "" { @@ -323,7 +324,7 @@ func (r *ReplicationSet) inconsistentError( zap.Any("replicationSet", r), }...) log.L().WithOptions(zap.AddCallerSkip(1)).Error(msg, fields...) - return cerror.ErrReplicationSetInconsistent.GenWithStackByArgs( + return errors.ErrReplicationSetInconsistent.GenWithStackByArgs( fmt.Sprintf("tableID %d, %s", r.Span.TableID, msg)) } @@ -336,7 +337,7 @@ func (r *ReplicationSet) multiplePrimaryError( zap.Any("replicationSet", r), }...) log.L().WithOptions(zap.AddCallerSkip(1)).Error(msg, fields...) - return cerror.ErrReplicationSetMultiplePrimaryError.GenWithStackByArgs( + return errors.ErrReplicationSetMultiplePrimaryError.GenWithStackByArgs( fmt.Sprintf("tableID %d, %s", r.Span.TableID, msg)) } @@ -484,8 +485,8 @@ func (r *ReplicationSet) pollOnPrepare( } case tablepb.TableStateReplicating: if r.Primary == captureID { - r.updateCheckpointAndStats(input.Checkpoint, input.Stats) - return nil, false, nil + err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats) + return nil, false, err } case tablepb.TableStateStopping, tablepb.TableStateStopped: if r.Primary == captureID { @@ -588,7 +589,9 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStateStopped, tablepb.TableStateAbsent: if r.Primary == captureID { - r.updateCheckpointAndStats(input.Checkpoint, input.Stats) + if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil { + return nil, false, errors.Trace(err) + } original := r.Primary r.clearPrimary() if !r.hasRole(RoleSecondary) { @@ -652,7 +655,9 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStateReplicating: if r.Primary == captureID { - r.updateCheckpointAndStats(input.Checkpoint, input.Stats) + if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil { + return nil, false, errors.Trace(err) + } if r.hasRole(RoleSecondary) { // Original primary is not stopped, ask for stopping. 
 			return &schedulepb.Message{
@@ -687,8 +692,8 @@ func (r *ReplicationSet) pollOnCommit(
 
 	case tablepb.TableStateStopping:
 		if r.Primary == captureID && r.hasRole(RoleSecondary) {
-			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
-			return nil, false, nil
+			err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
+			return nil, false, err
 		} else if r.isInRole(captureID, RoleUndetermined) {
 			log.Info("schedulerv3: capture is stopping during Commit",
 				zap.Stringer("tableState", input),
@@ -713,8 +718,8 @@ func (r *ReplicationSet) pollOnReplicating(
 	switch input.State {
 	case tablepb.TableStateReplicating:
 		if r.Primary == captureID {
-			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
-			return nil, false, nil
+			err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
+			return nil, false, err
 		}
 		return nil, false, r.multiplePrimaryError(
 			input, captureID, "schedulerv3: multiple primary")
@@ -725,7 +730,9 @@ func (r *ReplicationSet) pollOnReplicating(
 	case tablepb.TableStateStopping:
 	case tablepb.TableStateStopped:
 		if r.Primary == captureID {
-			r.updateCheckpointAndStats(input.Checkpoint, input.Stats)
+			if err := r.updateCheckpointAndStats(input.Checkpoint, input.Stats); err != nil {
+				return nil, false, errors.Trace(err)
+			}
 
 			// Primary is stopped, but we still has secondary.
 			// Clear primary and promote secondary when it's prepared.
@@ -914,7 +921,7 @@ func (r *ReplicationSet) handleCaptureShutdown(
 
 func (r *ReplicationSet) updateCheckpointAndStats(
 	checkpoint tablepb.Checkpoint, stats tablepb.Stats,
-) {
+) error {
 	if checkpoint.ResolvedTs < checkpoint.CheckpointTs {
 		log.Warn("schedulerv3: resolved ts should not less than checkpoint ts",
 			zap.Any("replicationSet", r),
@@ -932,12 +939,15 @@ func (r *ReplicationSet) updateCheckpointAndStats(
 		r.Checkpoint.ResolvedTs = checkpoint.ResolvedTs
 	}
 	if r.Checkpoint.ResolvedTs < r.Checkpoint.CheckpointTs {
-		log.Panic("schedulerv3: resolved ts should not less than checkpoint ts",
+		log.Warn("schedulerv3: resolved ts should not be less than checkpoint ts",
 			zap.Any("replicationSet", r),
-			zap.Any("checkpoint", r.Checkpoint.ResolvedTs),
-			zap.Any("resolved", r.Checkpoint.CheckpointTs))
+			zap.Any("checkpointTs", r.Checkpoint.CheckpointTs),
+			zap.Any("resolvedTs", r.Checkpoint.ResolvedTs))
+		return errors.ErrInvalidCheckpointTs.GenWithStackByArgs(r.Checkpoint.CheckpointTs,
+			r.Checkpoint.ResolvedTs)
 	}
 	r.Stats = stats
+	return nil
}
 
 // SetHeap is a max-heap, it implements heap.Interface.
diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go
index fc4fa67920e..1a5a7150597 100644
--- a/pkg/errors/cdc_errors.go
+++ b/pkg/errors/cdc_errors.go
@@ -599,6 +599,10 @@ var (
 		"if you want to ignore these tables, please set ignore_ineligible_table to true",
 		errors.RFCCodeText("CDC:ErrTableIneligible"),
 	)
+	ErrInvalidCheckpointTs = errors.Normalize(
+		"checkpointTs(%v) should not be larger than resolvedTs(%v)",
+		errors.RFCCodeText("CDC:ErrInvalidCheckpointTs"),
+	)
 
 	// EtcdWorker related errors. Internal use only.
 	// ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort.
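
Note on PATCH 1/3: the substantive change is that ReplicationSet.updateCheckpointAndStats
now returns errors.ErrInvalidCheckpointTs instead of calling log.Panic when the watermark
invariant (checkpointTs <= resolvedTs) is violated, and the error propagates up through the
pollOn* state handlers, agent.handleMessage, and agent.Tick, so a bad heartbeat surfaces as
an error from agent.Tick rather than a panic in the capture process. The following is a
minimal, self-contained Go sketch of that return-an-error-instead-of-panicking pattern, not
tiflow code: Checkpoint mirrors the two fields of tablepb.Checkpoint, while
errInvalidCheckpointTs and updateCheckpoint are simplified stand-ins invented for
illustration.

package main

import "fmt"

// Checkpoint mirrors the two watermarks carried in tablepb.Checkpoint.
type Checkpoint struct {
	CheckpointTs uint64 // everything at or below this ts is flushed downstream
	ResolvedTs   uint64 // everything at or below this ts has been received
}

// errInvalidCheckpointTs stands in for errors.ErrInvalidCheckpointTs.
func errInvalidCheckpointTs(checkpointTs, resolvedTs uint64) error {
	return fmt.Errorf("checkpointTs(%d) should not be larger than resolvedTs(%d)",
		checkpointTs, resolvedTs)
}

// updateCheckpoint advances the stored watermarks monotonically. Where the
// pre-patch code called log.Panic when resolvedTs fell below checkpointTs,
// the patched code returns an error so callers (pollOnCommit, handleMessage,
// agent.Tick) can surface it without crashing the process.
func updateCheckpoint(cur *Checkpoint, in Checkpoint) error {
	if cur.CheckpointTs < in.CheckpointTs {
		cur.CheckpointTs = in.CheckpointTs
	}
	if cur.ResolvedTs < in.ResolvedTs {
		cur.ResolvedTs = in.ResolvedTs
	}
	if cur.ResolvedTs < cur.CheckpointTs {
		return errInvalidCheckpointTs(cur.CheckpointTs, cur.ResolvedTs)
	}
	return nil
}

func main() {
	cur := &Checkpoint{CheckpointTs: 10, ResolvedTs: 12}
	// A heartbeat whose checkpoint outruns its resolved ts now yields an
	// error instead of a panic.
	if err := updateCheckpoint(cur, Checkpoint{CheckpointTs: 15, ResolvedTs: 11}); err != nil {
		fmt.Println("rejected:", err)
	}
}
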
From bd44b3c1593086187398536e5e348750c9ea43df Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 2 Jun 2023 15:02:21 +0800 Subject: [PATCH 2/3] fix sorter --- cdc/processor/sinkmanager/manager_test.go | 2 +- cdc/processor/sinkmanager/redo_log_worker_test.go | 2 +- cdc/processor/sinkmanager/table_sink_worker_test.go | 2 +- cdc/processor/sourcemanager/engine/engine.go | 2 +- .../sourcemanager/engine/memory/event_sorter.go | 4 ++-- .../engine/memory/event_sorter_test.go | 2 +- .../sourcemanager/engine/pebble/event_sorter.go | 8 +++++--- .../engine/pebble/event_sorter_test.go | 13 +++++++------ cdc/processor/sourcemanager/manager.go | 2 +- 9 files changed, 20 insertions(+), 17 deletions(-) diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 077e54806b9..85d0cb28919 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -44,7 +44,7 @@ func addTableAndAddEventsToSortEngine( engine engine.SortEngine, span tablepb.Span, ) { - engine.AddTable(span) + engine.AddTable(span, 0) events := []*model.PolymorphicEvent{ { StartTs: 1, diff --git a/cdc/processor/sinkmanager/redo_log_worker_test.go b/cdc/processor/sinkmanager/redo_log_worker_test.go index 5545d56fc22..8ed9af15f60 100644 --- a/cdc/processor/sinkmanager/redo_log_worker_test.go +++ b/cdc/processor/sinkmanager/redo_log_worker_test.go @@ -81,7 +81,7 @@ func (suite *redoLogWorkerSuite) addEventsToSortEngine( events []*model.PolymorphicEvent, sortEngine engine.SortEngine, ) { - sortEngine.AddTable(suite.testSpan) + sortEngine.AddTable(suite.testSpan, 0) for _, event := range events { sortEngine.Add(suite.testSpan, event) } diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index 98bad91299b..33b73726599 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -144,7 +144,7 @@ func (suite *tableSinkWorkerSuite) addEventsToSortEngine( events []*model.PolymorphicEvent, sortEngine engine.SortEngine, ) { - sortEngine.AddTable(suite.testSpan) + sortEngine.AddTable(suite.testSpan, 0) for _, event := range events { sortEngine.Add(suite.testSpan, event) } diff --git a/cdc/processor/sourcemanager/engine/engine.go b/cdc/processor/sourcemanager/engine/engine.go index bedcc6fe915..49cccadd3aa 100644 --- a/cdc/processor/sourcemanager/engine/engine.go +++ b/cdc/processor/sourcemanager/engine/engine.go @@ -27,7 +27,7 @@ type SortEngine interface { IsTableBased() bool // AddTable adds the table into the engine. - AddTable(span tablepb.Span) + AddTable(span tablepb.Span, startTs model.Ts) // RemoveTable removes the table from the engine. RemoveTable(span tablepb.Span) diff --git a/cdc/processor/sourcemanager/engine/memory/event_sorter.go b/cdc/processor/sourcemanager/engine/memory/event_sorter.go index b270f178c83..01bf0bfc7fc 100644 --- a/cdc/processor/sourcemanager/engine/memory/event_sorter.go +++ b/cdc/processor/sourcemanager/engine/memory/event_sorter.go @@ -58,8 +58,8 @@ func (s *EventSorter) IsTableBased() bool { } // AddTable implements engine.SortEngine. 
-func (s *EventSorter) AddTable(span tablepb.Span) { - resolvedTs := model.Ts(0) +func (s *EventSorter) AddTable(span tablepb.Span, startTs model.Ts) { + resolvedTs := startTs if _, exists := s.tables.LoadOrStore(span, &tableSorter{resolvedTs: &resolvedTs}); exists { log.Panic("add an exist table", zap.Stringer("span", &span)) } diff --git a/cdc/processor/sourcemanager/engine/memory/event_sorter_test.go b/cdc/processor/sourcemanager/engine/memory/event_sorter_test.go index b5f4caf1640..17296aa42ea 100644 --- a/cdc/processor/sourcemanager/engine/memory/event_sorter_test.go +++ b/cdc/processor/sourcemanager/engine/memory/event_sorter_test.go @@ -93,7 +93,7 @@ func TestEventSorter(t *testing.T) { span := spanz.TableIDToComparableSpan(1) es := New(context.Background()) - es.AddTable(span) + es.AddTable(span, 0) var nextToFetch engine.Position for _, tc := range testCases { for _, entry := range tc.input { diff --git a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go index 3145ec31877..84aafc66177 100644 --- a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go @@ -113,7 +113,7 @@ func (s *EventSorter) IsTableBased() bool { } // AddTable implements engine.SortEngine. -func (s *EventSorter) AddTable(span tablepb.Span) { +func (s *EventSorter) AddTable(span tablepb.Span, startTs model.Ts) { s.mu.Lock() if _, exists := s.tables.Get(span); exists { s.mu.Unlock() @@ -123,10 +123,12 @@ func (s *EventSorter) AddTable(span tablepb.Span) { zap.Stringer("span", &span)) return } - s.tables.ReplaceOrInsert(span, &tableState{ + state := &tableState{ uniqueID: genUniqueID(), ch: s.channs[getDB(span, len(s.dbs))], - }) + } + state.maxReceivedResolvedTs.Store(startTs) + s.tables.ReplaceOrInsert(span, state) s.mu.Unlock() } diff --git a/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go index dd316f1f4d0..b78beea4774 100644 --- a/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter_test.go @@ -41,10 +41,10 @@ func TestTableOperations(t *testing.T) { require.True(t, s.IsTableBased()) span := spanz.TableIDToComparableSpan(1) - s.AddTable(span) - s.AddTable(span) + s.AddTable(span, 2) + s.AddTable(span, 1) - require.Equal(t, model.Ts(0), s.GetResolvedTs(span)) + require.Equal(t, model.Ts(2), s.GetResolvedTs(span)) s.RemoveTable(span) s.RemoveTable(span) @@ -64,7 +64,7 @@ func TestNoResolvedTs(t *testing.T) { require.True(t, s.IsTableBased()) span := spanz.TableIDToComparableSpan(1) - s.AddTable(span) + s.AddTable(span, 0) resolvedTs := make(chan model.Ts) s.OnResolve(func(_ tablepb.Span, ts model.Ts) { resolvedTs <- ts }) @@ -95,7 +95,7 @@ func TestEventFetch(t *testing.T) { require.True(t, s.IsTableBased()) span := spanz.TableIDToComparableSpan(1) - s.AddTable(span) + s.AddTable(span, 1) resolvedTs := make(chan model.Ts) s.OnResolve(func(_ tablepb.Span, ts model.Ts) { resolvedTs <- ts }) @@ -128,6 +128,7 @@ func TestEventFetch(t *testing.T) { s.Add(span, inputEvents...) 
 	s.Add(span, model.NewResolvedPolymorphicEvent(0, 4))
+	require.Equal(t, model.Ts(4), s.GetResolvedTs(span))
 
 	sortedEvents := make([]*model.PolymorphicEvent, 0, len(inputEvents))
 	sortedPositions := make([]engine.Position, 0, len(inputEvents))
@@ -177,7 +178,7 @@ func TestCleanData(t *testing.T) {
 	require.True(t, s.IsTableBased())
 
 	span := spanz.TableIDToComparableSpan(1)
-	s.AddTable(span)
+	s.AddTable(span, 0)
 
 	require.Panics(t, func() { s.CleanByTable(spanz.TableIDToComparableSpan(2), engine.Position{}) })
diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go
index e996921d413..4a91812f4f5 100644
--- a/cdc/processor/sourcemanager/manager.go
+++ b/cdc/processor/sourcemanager/manager.go
@@ -106,7 +106,7 @@ func NewForTest(
 // AddTable adds a table to the source manager. Start puller and register table to the engine.
 func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs model.Ts) {
 	// Add table to the engine first, so that the engine can receive the events from the puller.
-	m.engine.AddTable(span)
+	m.engine.AddTable(span, startTs)
 	p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode)
 	p.Start(m.ctx, m.up, m.engine, m.errChan)
 	m.pullers.Store(span, p)

From 26314821e8566b1ea1f24371c2c5e2b75855e7e8 Mon Sep 17 00:00:00 2001
From: CharlesCheung
Date: Fri, 2 Jun 2023 15:55:17 +0800
Subject: [PATCH 3/3] fix error

---
 cdc/processor/sourcemanager/engine/mock/engine_mock.go | 8 ++++----
 errors.toml                                            | 5 +++++
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/cdc/processor/sourcemanager/engine/mock/engine_mock.go b/cdc/processor/sourcemanager/engine/mock/engine_mock.go
index 15d8e794f2e..1d411ea3872 100644
--- a/cdc/processor/sourcemanager/engine/mock/engine_mock.go
+++ b/cdc/processor/sourcemanager/engine/mock/engine_mock.go
@@ -54,15 +54,15 @@ func (mr *MockSortEngineMockRecorder) Add(span interface{}, events ...interface{
 }
 
 // AddTable mocks base method.
-func (m *MockSortEngine) AddTable(span tablepb.Span) {
+func (m *MockSortEngine) AddTable(span tablepb.Span, startTs model.Ts) {
 	m.ctrl.T.Helper()
-	m.ctrl.Call(m, "AddTable", span)
+	m.ctrl.Call(m, "AddTable", span, startTs)
 }
 
 // AddTable indicates an expected call of AddTable.
-func (mr *MockSortEngineMockRecorder) AddTable(span interface{}) *gomock.Call {
+func (mr *MockSortEngineMockRecorder) AddTable(span, startTs interface{}) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTable", reflect.TypeOf((*MockSortEngine)(nil).AddTable), span)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTable", reflect.TypeOf((*MockSortEngine)(nil).AddTable), span, startTs)
 }
 
 // CleanAllTables mocks base method.
diff --git a/errors.toml b/errors.toml
index 24bc2f311c8..bd4af3d88ec 100755
--- a/errors.toml
+++ b/errors.toml
@@ -346,6 +346,11 @@ error = '''
 bad changefeed id, please match the pattern "^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$", the length should no more than %d, eg, "simple-changefeed-task",
 '''
 
+["CDC:ErrInvalidCheckpointTs"]
+error = '''
+checkpointTs(%v) should not be larger than resolvedTs(%v)
+'''
+
 ["CDC:ErrInvalidDDLJob"]
 error = '''
 invalid ddl job(%d)
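
Note on PATCH 2/3 and PATCH 3/3: SortEngine.AddTable gains a startTs parameter so that a
newly added table's resolved ts is seeded with the table's start ts instead of 0, which
keeps a freshly added table from ever reporting resolvedTs < checkpointTs (the condition
PATCH 1/3 turned into ErrInvalidCheckpointTs). TestTableOperations pins the behavior: after
AddTable(span, 2), GetResolvedTs(span) returns 2, and the duplicate AddTable(span, 1) is
ignored. Below is a self-contained Go sketch of just that seeding logic; miniSorter and its
methods are simplified stand-ins invented for illustration, not the engine.SortEngine
interface or the pebble EventSorter.

package main

import (
	"fmt"
	"sync"
)

type span = string // illustrative stand-in for tablepb.Span

// miniSorter keeps one resolved-ts watermark per table, playing the role of
// the pebble sorter's tableState.maxReceivedResolvedTs.
type miniSorter struct {
	mu       sync.Mutex
	resolved map[span]uint64
}

// AddTable registers a table, seeding its watermark with startTs. Before the
// patch the seed was always 0, so GetResolvedTs on a just-added table could
// report 0 even when the table's checkpoint was far ahead. A duplicate
// AddTable is ignored, matching the warn-and-return path in the pebble sorter.
func (s *miniSorter) AddTable(sp span, startTs uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.resolved[sp]; ok {
		return // table already exists; keep the first registration
	}
	s.resolved[sp] = startTs
}

// AdvanceResolved moves the watermark forward monotonically, as a resolved
// event from the puller would.
func (s *miniSorter) AdvanceResolved(sp span, ts uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if ts > s.resolved[sp] {
		s.resolved[sp] = ts
	}
}

func (s *miniSorter) GetResolvedTs(sp span) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.resolved[sp]
}

func main() {
	s := &miniSorter{resolved: make(map[span]uint64)}
	s.AddTable("t1", 2)
	s.AddTable("t1", 1) // duplicate add is ignored, as in TestTableOperations
	fmt.Println(s.GetResolvedTs("t1")) // 2, not 0
	s.AdvanceResolved("t1", 4)
	fmt.Println(s.GetResolvedTs("t1")) // 4
}
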