From 8258169abb0096dd5fc0e90eb63030204c700ece Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 26 May 2023 19:08:39 +0800 Subject: [PATCH] cdc: region worker should handle stale events correctly (#9078) close pingcap/tiflow#9072 Signed-off-by: qupeng --- cdc/kv/client.go | 6 +++-- cdc/kv/client_test.go | 26 +----------------- cdc/kv/region_worker.go | 30 ++++++++++++--------- cdc/kv/region_worker_test.go | 51 ++++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 40 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index e56fc25a7f3..0ba2f18c28a 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -317,6 +317,7 @@ func currentRequestID() uint64 { type eventFeedSession struct { client *CDCClient + startTs model.Ts changefeed model.ChangeFeedID tableID model.TableID tableName string @@ -369,9 +370,10 @@ func newEventFeedSession( id := strconv.FormatUint(allocID(), 10) rangeLock := regionspan.NewRegionRangeLock( totalSpan.Start, totalSpan.End, startTs, - changefeed.Namespace+"."+changefeed.ID) + client.changefeed.Namespace+"."+client.changefeed.ID) return &eventFeedSession{ client: client, + startTs: startTs, changefeed: client.changefeed, tableID: client.tableID, tableName: client.tableName, @@ -965,7 +967,7 @@ func (s *eventFeedSession) receiveFromStream( ) error { var tsStat *tableStoreStat s.client.tableStoreStats.Lock() - key := fmt.Sprintf("%d_%d", s.totalSpan.TableID, storeID) + key := fmt.Sprintf("%d_%d", s.client.tableID, storeID) if tsStat = s.client.tableStoreStats.v[key]; tsStat == nil { tsStat = new(tableStoreStat) s.client.tableStoreStats.v[key] = tsStat diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index f0ace0214cc..8a0bb559097 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -2063,6 +2063,7 @@ func TestResolveLock(t *testing.T) { } func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { + InitWorkerPool() ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} @@ -2156,31 +2157,6 @@ func TestCommittedFallback(t *testing.T) { testEventCommitTsFallback(t, events) } -// TestCommitFallback tests kv client should panic when receiving a fallback commit event -func TestCommitFallback(t *testing.T) { - events := []*cdcpb.ChangeDataEvent{ - mockInitializedEvent(3, currentRequestID()), - {Events: []*cdcpb.Event{ - { - RegionId: 3, - RequestId: currentRequestID(), - Event: &cdcpb.Event_Entries_{ - Entries: &cdcpb.Event_Entries{ - Entries: []*cdcpb.Event_Row{{ - Type: cdcpb.Event_COMMIT, - OpType: cdcpb.Event_Row_PUT, - Key: []byte("a-commit-event-ts-fallback"), - StartTs: 92, - CommitTs: 98, - }}, - }, - }, - }, - }}, - } - testEventCommitTsFallback(t, events) -} - // TestDeuplicateRequest tests kv client should panic when meeting a duplicate error func TestDuplicateRequest(t *testing.T) { events := []*cdcpb.ChangeDataEvent{ diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index d0dce75e950..0f1e5eb63b9 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -683,8 +683,11 @@ func (w *regionWorker) handleEventEntry( state.matcher.putPrewriteRow(entry) case cdcpb.Event_COMMIT: w.metrics.metricPullEventCommitCounter.Inc() + // NOTE: state.getLastResolvedTs() will never less than session.startTs. resolvedTs := state.getLastResolvedTs() - if entry.CommitTs <= resolvedTs { + // TiKV can send events with StartTs/CommitTs less than startTs. + isStaleEvent := entry.CommitTs <= w.session.startTs + if entry.CommitTs <= resolvedTs && !isStaleEvent { logPanic("The CommitTs must be greater than the resolvedTs", zap.String("EventType", "COMMIT"), zap.Uint64("CommitTs", entry.CommitTs), @@ -692,8 +695,8 @@ func (w *regionWorker) handleEventEntry( zap.Uint64("regionID", regionID)) return errUnreachable } - ok := state.matcher.matchRow(entry, state.isInitialized()) - if !ok { + + if !state.matcher.matchRow(entry, state.isInitialized()) { if !state.isInitialized() { state.matcher.cacheCommitRow(entry) continue @@ -704,16 +707,17 @@ func (w *regionWorker) handleEventEntry( entry.GetType(), entry.GetOpType()) } - revent, err := assembleRowEvent(regionID, entry) - if err != nil { - return errors.Trace(err) - } - - select { - case w.outputCh <- revent: - w.metrics.metricSendEventCommitCounter.Inc() - case <-ctx.Done(): - return errors.Trace(ctx.Err()) + if !isStaleEvent { + revent, err := assembleRowEvent(regionID, entry) + if err != nil { + return errors.Trace(err) + } + select { + case w.outputCh <- revent: + w.metrics.metricSendEventCommitCounter.Inc() + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + } } case cdcpb.Event_ROLLBACK: w.metrics.metricPullEventRollbackCounter.Inc() diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index d383b22d40e..fc4039aa9cd 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -19,6 +19,7 @@ import ( "runtime" "sync" "testing" + "time" "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/tiflow/cdc/model" @@ -301,3 +302,53 @@ func TestRegionWorkerHandleResolvedTs(t *testing.T) { require.Equal(t, 1, len(event.Resolved.Spans)) require.Equal(t, uint64(1), event.Resolved.Spans[0].Region) } + +func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { + ctx := context.Background() + s := createFakeEventFeedSession() + s.eventCh = make(chan model.RegionFeedEvent, 2) + s1 := newRegionFeedState(newSingleRegionInfo( + tikv.RegionVerID{}, + spanz.ToSpan([]byte{}, spanz.UpperBoundKey), + 9, &tikv.RPCContext{}), + 0) + s1.start() + w := newRegionWorker(model.ChangeFeedID{}, s, "") + + err := w.handleResolvedTs(ctx, &resolvedTsEvent{ + resolvedTs: 5, + regions: []*regionFeedState{s1}, + }) + require.Nil(t, err) + require.Equal(t, uint64(9), s1.lastResolvedTs) + + timer := time.NewTimer(time.Second) + select { + case <-w.rtsUpdateCh: + if !timer.Stop() { + <-timer.C + } + require.False(t, true, "should never get a ResolvedTs") + case <-timer.C: + } + + events := &cdcpb.Event_Entries_{ + Entries: &cdcpb.Event_Entries{ + Entries: []*cdcpb.Event_Row{ + {Type: cdcpb.Event_PREWRITE, StartTs: 7}, + }, + }, + } + err = w.handleEventEntry(ctx, events, s1) + require.Nil(t, err) + + events = &cdcpb.Event_Entries_{ + Entries: &cdcpb.Event_Entries{ + Entries: []*cdcpb.Event_Row{ + {Type: cdcpb.Event_COMMIT, CommitTs: 8}, + }, + }, + } + err = w.handleEventEntry(ctx, events, s1) + require.Nil(t, err) +}