From a1dce78b569a5662128c5aa5d522656b33a81b90 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 30 May 2023 13:00:35 +0800 Subject: [PATCH] cdc: region worker should handle stale events correctly (#9078) (#9081) Signed-off-by: qupeng Co-authored-by: qupeng Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- cdc/kv/client.go | 126 ++++++++++++++++++++--------------- cdc/kv/client_test.go | 28 +------- cdc/kv/region_worker.go | 30 +++++---- cdc/kv/region_worker_test.go | 51 ++++++++++++++ 4 files changed, 141 insertions(+), 94 deletions(-) diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 739fc05fb54..0ba2f18c28a 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -14,7 +14,6 @@ package kv import ( - "container/list" "context" "fmt" "io" @@ -167,17 +166,23 @@ type CDCClient struct { tableID model.TableID tableName string - regionCounts struct { - sync.Mutex - counts *list.List + tableStoreStats struct { + sync.RWMutex + // map[table_id/store_id] -> *tableStoreStat. + v map[string]*tableStoreStat } - ingressCommitTs model.Ts - ingressResolvedTs model.Ts + // filterLoop is used in BDR mode, when it is true, tikv cdc component // will filter data that are written by another TiCDC. filterLoop bool } +type tableStoreStat struct { + regionCount atomic.Uint64 + resolvedTs atomic.Uint64 + commitTs atomic.Uint64 +} + // NewCDCClient creates a CDCClient instance func NewCDCClient( ctx context.Context, @@ -190,10 +195,10 @@ func NewCDCClient( tableID model.TableID, tableName string, filterLoop bool, -) (c CDCKVClient) { +) CDCKVClient { clusterID := pd.GetClusterID(ctx) - c = &CDCClient{ + c := &CDCClient{ clusterID: clusterID, config: cfg, pd: pd, @@ -204,15 +209,10 @@ func NewCDCClient( changefeed: changefeed, tableID: tableID, tableName: tableName, - regionCounts: struct { - sync.Mutex - counts *list.List - }{ - counts: list.New(), - }, filterLoop: filterLoop, } - return + c.tableStoreStats.v = make(map[string]*tableStoreStat) + return c } func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) { @@ -262,35 +262,46 @@ func (c *CDCClient) EventFeed( lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, ) error { - c.regionCounts.Lock() - regionCount := int64(0) - c.regionCounts.counts.PushBack(®ionCount) - c.regionCounts.Unlock() - s := newEventFeedSession( - c, span, lockResolver, ts, eventCh, c.changefeed, c.tableID, c.tableName) - return s.eventFeed(ctx, ts, ®ionCount) + s := newEventFeedSession(c, span, lockResolver, ts, eventCh) + return s.eventFeed(ctx, ts) } // RegionCount returns the number of captured regions. -func (c *CDCClient) RegionCount() uint64 { - c.regionCounts.Lock() - defer c.regionCounts.Unlock() - - totalCount := uint64(0) - for e := c.regionCounts.counts.Front(); e != nil; e = e.Next() { - totalCount += uint64(atomic.LoadInt64(e.Value.(*int64))) +func (c *CDCClient) RegionCount() (totalCount uint64) { + c.tableStoreStats.RLock() + defer c.tableStoreStats.RUnlock() + for _, v := range c.tableStoreStats.v { + totalCount += v.regionCount.Load() } return totalCount } // ResolvedTs returns the current ingress resolved ts. 
func (c *CDCClient) ResolvedTs() model.Ts { - return atomic.LoadUint64(&c.ingressResolvedTs) + c.tableStoreStats.RLock() + defer c.tableStoreStats.RUnlock() + ingressResolvedTs := uint64(0) + for _, v := range c.tableStoreStats.v { + curr := v.resolvedTs.Load() + if curr > ingressResolvedTs { + ingressResolvedTs = curr + } + } + return ingressResolvedTs } // CommitTs returns the current ingress commit ts. func (c *CDCClient) CommitTs() model.Ts { - return atomic.LoadUint64(&c.ingressCommitTs) + c.tableStoreStats.RLock() + defer c.tableStoreStats.RUnlock() + ingressCommitTs := uint64(0) + for _, v := range c.tableStoreStats.v { + curr := v.commitTs.Load() + if curr > ingressCommitTs { + ingressCommitTs = curr + } + } + return ingressCommitTs } var currentID uint64 = 0 @@ -305,7 +316,11 @@ func currentRequestID() uint64 { } type eventFeedSession struct { - client *CDCClient + client *CDCClient + startTs model.Ts + changefeed model.ChangeFeedID + tableID model.TableID + tableName string lockResolver txnutil.LockResolver @@ -335,10 +350,6 @@ type eventFeedSession struct { streamsLock sync.RWMutex streamsCanceller map[string]context.CancelFunc - changefeed model.ChangeFeedID - tableID model.TableID - tableName string - // use sync.Pool to store resolved ts event only, because resolved ts event // has the same size and generate cycle. resolvedTsPool sync.Pool @@ -355,16 +366,18 @@ func newEventFeedSession( lockResolver txnutil.LockResolver, startTs uint64, eventCh chan<- model.RegionFeedEvent, - changefeed model.ChangeFeedID, - tableID model.TableID, - tableName string, ) *eventFeedSession { id := strconv.FormatUint(allocID(), 10) rangeLock := regionspan.NewRegionRangeLock( totalSpan.Start, totalSpan.End, startTs, - changefeed.Namespace+"."+changefeed.ID) + client.changefeed.Namespace+"."+client.changefeed.ID) return &eventFeedSession{ - client: client, + client: client, + startTs: startTs, + changefeed: client.changefeed, + tableID: client.tableID, + tableName: client.tableName, + totalSpan: totalSpan, eventCh: eventCh, rangeLock: rangeLock, @@ -382,14 +395,10 @@ func newEventFeedSession( } }, }, - - changefeed: changefeed, - tableID: tableID, - tableName: tableName, } } -func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64, regionCount *int64) error { +func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { s.requestRangeCh = chann.NewDrainableChann[rangeRequestTask]() s.regionCh = chann.NewDrainableChann[singleRegionInfo]() s.regionRouter = chann.NewDrainableChann[singleRegionInfo]() @@ -411,7 +420,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64, regionCount }) g.Go(func() error { - return s.requestRegionToStore(ctx, g, regionCount) + return s.requestRegionToStore(ctx, g) }) g.Go(func() error { @@ -555,7 +564,6 @@ func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErr func (s *eventFeedSession) requestRegionToStore( ctx context.Context, g *errgroup.Group, - regionCount *int64, ) error { // Stores pending regions info for each stream. After sending a new request, the region info wil be put to the map, // and it will be loaded by the receiver thread when it receives the first response from that region. 
We need this @@ -646,8 +654,7 @@ func (s *eventFeedSession) requestRegionToStore( g.Go(func() error { defer s.deleteStream(storeAddr) - return s.receiveFromStream( - ctx, g, storeAddr, storeID, stream.client, pendingRegions, regionCount) + return s.receiveFromStream(ctx, g, storeAddr, storeID, stream.client, pendingRegions) }) } @@ -957,8 +964,16 @@ func (s *eventFeedSession) receiveFromStream( storeID uint64, stream cdcpb.ChangeData_EventFeedClient, pendingRegions *syncRegionFeedStateMap, - regionCount *int64, ) error { + var tsStat *tableStoreStat + s.client.tableStoreStats.Lock() + key := fmt.Sprintf("%d_%d", s.client.tableID, storeID) + if tsStat = s.client.tableStoreStats.v[key]; tsStat == nil { + tsStat = new(tableStoreStat) + s.client.tableStoreStats.v[key] = tsStat + } + s.client.tableStoreStats.Unlock() + // Cancel the pending regions if the stream failed. // Otherwise, it will remain unhandled in the pendingRegions list // however not registered in the new reconnected stream. @@ -1083,16 +1098,17 @@ func (s *eventFeedSession) receiveFromStream( if err != nil { return err } + // NOTE(qupeng): what if all regions are removed from the store? // TiKV send resolved ts events every second by default. // We check and update region count here to save CPU. - atomic.StoreInt64(regionCount, worker.statesManager.regionCount()) - atomic.StoreUint64(&s.client.ingressResolvedTs, cevent.ResolvedTs.Ts) + tsStat.regionCount.Store(uint64(worker.statesManager.regionCount())) + tsStat.resolvedTs.Store(cevent.ResolvedTs.Ts) if maxCommitTs == 0 { // In case, there is no write for the table, // we use resolved ts as maxCommitTs to make the stats meaningful. - atomic.StoreUint64(&s.client.ingressCommitTs, cevent.ResolvedTs.Ts) + tsStat.commitTs.Store(cevent.ResolvedTs.Ts) } else { - atomic.StoreUint64(&s.client.ingressCommitTs, maxCommitTs) + tsStat.commitTs.Store(maxCommitTs) } } } diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 2d5bfdb9382..8a0bb559097 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -2063,6 +2063,7 @@ func TestResolveLock(t *testing.T) { } func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) { + InitWorkerPool() ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} @@ -2156,31 +2157,6 @@ func TestCommittedFallback(t *testing.T) { testEventCommitTsFallback(t, events) } -// TestCommitFallback tests kv client should panic when receiving a fallback commit event -func TestCommitFallback(t *testing.T) { - events := []*cdcpb.ChangeDataEvent{ - mockInitializedEvent(3, currentRequestID()), - {Events: []*cdcpb.Event{ - { - RegionId: 3, - RequestId: currentRequestID(), - Event: &cdcpb.Event_Entries_{ - Entries: &cdcpb.Event_Entries{ - Entries: []*cdcpb.Event_Row{{ - Type: cdcpb.Event_COMMIT, - OpType: cdcpb.Event_Row_PUT, - Key: []byte("a-commit-event-ts-fallback"), - StartTs: 92, - CommitTs: 98, - }}, - }, - }, - }, - }}, - } - testEventCommitTsFallback(t, events) -} - // TestDeuplicateRequest tests kv client should panic when meeting a duplicate error func TestDuplicateRequest(t *testing.T) { events := []*cdcpb.ChangeDataEvent{ @@ -3563,5 +3539,5 @@ func createFakeEventFeedSession() *eventFeedSession { nil, /*lockResolver*/ 100, /*startTs*/ nil, /*eventCh*/ - model.DefaultChangeFeedID("changefeed-test"), 0, "") + ) } diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index d0dce75e950..0f1e5eb63b9 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -683,8 +683,11 @@ func (w 
*regionWorker) handleEventEntry( state.matcher.putPrewriteRow(entry) case cdcpb.Event_COMMIT: w.metrics.metricPullEventCommitCounter.Inc() + // NOTE: state.getLastResolvedTs() will never less than session.startTs. resolvedTs := state.getLastResolvedTs() - if entry.CommitTs <= resolvedTs { + // TiKV can send events with StartTs/CommitTs less than startTs. + isStaleEvent := entry.CommitTs <= w.session.startTs + if entry.CommitTs <= resolvedTs && !isStaleEvent { logPanic("The CommitTs must be greater than the resolvedTs", zap.String("EventType", "COMMIT"), zap.Uint64("CommitTs", entry.CommitTs), @@ -692,8 +695,8 @@ func (w *regionWorker) handleEventEntry( zap.Uint64("regionID", regionID)) return errUnreachable } - ok := state.matcher.matchRow(entry, state.isInitialized()) - if !ok { + + if !state.matcher.matchRow(entry, state.isInitialized()) { if !state.isInitialized() { state.matcher.cacheCommitRow(entry) continue @@ -704,16 +707,17 @@ func (w *regionWorker) handleEventEntry( entry.GetType(), entry.GetOpType()) } - revent, err := assembleRowEvent(regionID, entry) - if err != nil { - return errors.Trace(err) - } - - select { - case w.outputCh <- revent: - w.metrics.metricSendEventCommitCounter.Inc() - case <-ctx.Done(): - return errors.Trace(ctx.Err()) + if !isStaleEvent { + revent, err := assembleRowEvent(regionID, entry) + if err != nil { + return errors.Trace(err) + } + select { + case w.outputCh <- revent: + w.metrics.metricSendEventCommitCounter.Inc() + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + } } case cdcpb.Event_ROLLBACK: w.metrics.metricPullEventRollbackCounter.Inc() diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index d383b22d40e..fef317d4c0d 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -19,6 +19,7 @@ import ( "runtime" "sync" "testing" + "time" "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/tiflow/cdc/model" @@ -301,3 +302,53 @@ func TestRegionWorkerHandleResolvedTs(t *testing.T) { require.Equal(t, 1, len(event.Resolved.Spans)) require.Equal(t, uint64(1), event.Resolved.Spans[0].Region) } + +func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) { + ctx := context.Background() + s := createFakeEventFeedSession() + s.eventCh = make(chan model.RegionFeedEvent, 2) + s1 := newRegionFeedState(singleRegionInfo{ + verID: tikv.RegionVerID{}, + resolvedTs: 9, + rpcCtx: &tikv.RPCContext{}, + }, 0) + s1.start() + w := newRegionWorker(model.ChangeFeedID{}, s, "") + + err := w.handleResolvedTs(ctx, &resolvedTsEvent{ + resolvedTs: 5, + regions: []*regionFeedState{s1}, + }) + require.Nil(t, err) + require.Equal(t, uint64(9), s1.lastResolvedTs) + + timer := time.NewTimer(time.Second) + select { + case <-w.rtsUpdateCh: + if !timer.Stop() { + <-timer.C + } + require.False(t, true, "should never get a ResolvedTs") + case <-timer.C: + } + + events := &cdcpb.Event_Entries_{ + Entries: &cdcpb.Event_Entries{ + Entries: []*cdcpb.Event_Row{ + {Type: cdcpb.Event_PREWRITE, StartTs: 7}, + }, + }, + } + err = w.handleEventEntry(ctx, events, s1) + require.Nil(t, err) + + events = &cdcpb.Event_Entries_{ + Entries: &cdcpb.Event_Entries{ + Entries: []*cdcpb.Event_Row{ + {Type: cdcpb.Event_COMMIT, CommitTs: 8}, + }, + }, + } + err = w.handleEventEntry(ctx, events, s1) + require.Nil(t, err) +}
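
The behavioural core of the fix in cdc/kv/region_worker.go reduces to one decision: a COMMIT entry whose CommitTs is at or below the session's startTs is a stale replay from TiKV, so it is still matched against the cached prewrite but must neither trip the "CommitTs must be greater than the resolvedTs" panic nor be forwarded downstream. The standalone Go sketch below illustrates that decision only; handleCommitTs and its parameters are hypothetical names standing in for w.session.startTs and state.getLastResolvedTs(), not code taken from the patch.

package main

import (
	"errors"
	"fmt"
)

// errCommitTsFallback stands in for the logPanic/errUnreachable path in
// handleEventEntry; the name and value are illustrative only.
var errCommitTsFallback = errors.New("CommitTs must be greater than the resolvedTs")

// handleCommitTs is a hypothetical reduction of the COMMIT branch added by the
// patch. Stale entries (CommitTs <= session startTs) are dropped instead of
// being treated as a ts fallback; everything else behaves as before.
func handleCommitTs(commitTs, sessionStartTs, lastResolvedTs uint64) (emit bool, err error) {
	isStaleEvent := commitTs <= sessionStartTs
	if commitTs <= lastResolvedTs && !isStaleEvent {
		// A genuine fallback: the region already resolved past this commit.
		return false, errCommitTsFallback
	}
	// Stale events are matched but not emitted; fresh events are emitted.
	return !isStaleEvent, nil
}

func main() {
	cases := []struct{ commitTs, startTs, resolvedTs uint64 }{
		{8, 100, 9},     // stale replay, the values used in TestRegionWorkerHandleEventsBeforeStartTs
		{105, 100, 110}, // genuine fallback: triggers logPanic in the real worker
		{120, 100, 110}, // normal commit: emitted downstream
	}
	for _, c := range cases {
		emit, err := handleCommitTs(c.commitTs, c.startTs, c.resolvedTs)
		fmt.Printf("commitTs=%d emit=%v err=%v\n", c.commitTs, emit, err)
	}
}

Before the patch, the stale commitTs=8 entry would have hit the logPanic branch because 8 <= lastResolvedTs; with the isStaleEvent guard it is still passed to matchRow (clearing any cached prewrite) and then skipped rather than panicking or producing a duplicate row.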
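
On the cdc/kv/client.go side, the patch replaces the per-EventFeed linked list of region counters with a map of per-(table, store) statistics keyed by "tableID_storeID", and RegionCount/ResolvedTs/CommitTs now aggregate over that map (sum for region counts, maximum for timestamps). The sketch below shows that bookkeeping in isolation and under simplified types; statsRegistry is a hypothetical stand-in for CDCClient.tableStoreStats, not part of the patch.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tableStoreStat mirrors the struct introduced by the patch: ingress stats
// for one (table, store) pair, updated by the stream receiver.
type tableStoreStat struct {
	regionCount atomic.Uint64
	resolvedTs  atomic.Uint64
	commitTs    atomic.Uint64
}

// statsRegistry is a hypothetical stand-in for CDCClient.tableStoreStats.
type statsRegistry struct {
	sync.RWMutex
	v map[string]*tableStoreStat
}

// get returns the slot for a table/store pair, creating it on first use,
// the way receiveFromStream builds its "tableID_storeID" key.
func (r *statsRegistry) get(tableID int64, storeID uint64) *tableStoreStat {
	r.Lock()
	defer r.Unlock()
	key := fmt.Sprintf("%d_%d", tableID, storeID)
	if r.v[key] == nil {
		r.v[key] = new(tableStoreStat)
	}
	return r.v[key]
}

// regionCount sums captured regions over all stores, like RegionCount().
func (r *statsRegistry) regionCount() (total uint64) {
	r.RLock()
	defer r.RUnlock()
	for _, s := range r.v {
		total += s.regionCount.Load()
	}
	return total
}

// resolvedTs returns the largest per-store resolved ts, like ResolvedTs().
func (r *statsRegistry) resolvedTs() (maxTs uint64) {
	r.RLock()
	defer r.RUnlock()
	for _, s := range r.v {
		if ts := s.resolvedTs.Load(); ts > maxTs {
			maxTs = ts
		}
	}
	return maxTs
}

func main() {
	reg := &statsRegistry{v: make(map[string]*tableStoreStat)}
	reg.get(47, 1).regionCount.Store(3)
	reg.get(47, 1).resolvedTs.Store(100)
	reg.get(47, 2).regionCount.Store(2)
	reg.get(47, 2).resolvedTs.Store(120)
	fmt.Println(reg.regionCount(), reg.resolvedTs()) // prints: 5 120
}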