cdc: region worker should handle stale events correctly (#9078) (#9081)
Signed-off-by: qupeng <qupeng@pingcap.com>
Co-authored-by: qupeng <qupeng@pingcap.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
3 people authored May 30, 2023
1 parent 0745619 commit a1dce78
Showing 4 changed files with 141 additions and 94 deletions.
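
The heart of the fix is in the cdc/kv/region_worker.go hunk further down: a COMMIT entry whose CommitTs is at or below the session's startTs is now treated as a stale, re-sent event and skipped, instead of tripping the "CommitTs must be greater than the resolvedTs" panic. Below is a minimal, self-contained sketch of that decision, using simplified hypothetical types rather than the actual tiflow code:

package main

import "fmt"

// commitEvent is a simplified stand-in for a COMMIT row entry received from TiKV
// (hypothetical type, for illustration only).
type commitEvent struct {
	CommitTs uint64
}

// shouldEmitCommit mirrors the decision added in region_worker.go: a COMMIT whose
// CommitTs is at or below the session start ts is "stale" (TiKV may resend such
// entries) and is dropped instead of triggering the CommitTs-fallback panic.
func shouldEmitCommit(ev commitEvent, sessionStartTs, lastResolvedTs uint64) (bool, error) {
	isStale := ev.CommitTs <= sessionStartTs
	if ev.CommitTs <= lastResolvedTs && !isStale {
		// A genuine fallback: CommitTs regressed below the region's resolved ts.
		return false, fmt.Errorf("commit ts %d fell back below resolved ts %d",
			ev.CommitTs, lastResolvedTs)
	}
	// Stale entries are still matched against cached prewrites by the caller,
	// but they are never forwarded downstream.
	return !isStale, nil
}

func main() {
	// Stale: CommitTs 95 <= session start ts 100 -> skipped, no panic.
	fmt.Println(shouldEmitCommit(commitEvent{CommitTs: 95}, 100, 100))
	// Normal: CommitTs above both the start ts and the resolved ts -> emitted.
	fmt.Println(shouldEmitCommit(commitEvent{CommitTs: 120}, 100, 110))
}

This is presumably also why TestCommitFallback is removed from cdc/kv/client_test.go: its commit event (StartTs 92, CommitTs 98) exercises exactly the fallback case that is now classified as stale rather than fatal.
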
126 changes: 71 additions & 55 deletions cdc/kv/client.go
@@ -14,7 +14,6 @@
package kv

import (
"container/list"
"context"
"fmt"
"io"
@@ -167,17 +166,23 @@ type CDCClient struct {
tableID model.TableID
tableName string

regionCounts struct {
sync.Mutex
counts *list.List
tableStoreStats struct {
sync.RWMutex
// map[table_id/store_id] -> *tableStoreStat.
v map[string]*tableStoreStat
}
ingressCommitTs model.Ts
ingressResolvedTs model.Ts

// filterLoop is used in BDR mode; when it is true, the TiKV CDC component
// will filter out data that is written by another TiCDC.
filterLoop bool
}

type tableStoreStat struct {
regionCount atomic.Uint64
resolvedTs atomic.Uint64
commitTs atomic.Uint64
}

// NewCDCClient creates a CDCClient instance
func NewCDCClient(
ctx context.Context,
@@ -190,10 +195,10 @@ func NewCDCClient(
tableID model.TableID,
tableName string,
filterLoop bool,
) (c CDCKVClient) {
) CDCKVClient {
clusterID := pd.GetClusterID(ctx)

c = &CDCClient{
c := &CDCClient{
clusterID: clusterID,
config: cfg,
pd: pd,
@@ -204,15 +209,10 @@
changefeed: changefeed,
tableID: tableID,
tableName: tableName,
regionCounts: struct {
sync.Mutex
counts *list.List
}{
counts: list.New(),
},
filterLoop: filterLoop,
}
return
c.tableStoreStats.v = make(map[string]*tableStoreStat)
return c
}

func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) {
@@ -262,35 +262,46 @@ func (c *CDCClient) EventFeed(
lockResolver txnutil.LockResolver,
eventCh chan<- model.RegionFeedEvent,
) error {
c.regionCounts.Lock()
regionCount := int64(0)
c.regionCounts.counts.PushBack(&regionCount)
c.regionCounts.Unlock()
s := newEventFeedSession(
c, span, lockResolver, ts, eventCh, c.changefeed, c.tableID, c.tableName)
return s.eventFeed(ctx, ts, &regionCount)
s := newEventFeedSession(c, span, lockResolver, ts, eventCh)
return s.eventFeed(ctx, ts)
}

// RegionCount returns the number of captured regions.
func (c *CDCClient) RegionCount() uint64 {
c.regionCounts.Lock()
defer c.regionCounts.Unlock()

totalCount := uint64(0)
for e := c.regionCounts.counts.Front(); e != nil; e = e.Next() {
totalCount += uint64(atomic.LoadInt64(e.Value.(*int64)))
func (c *CDCClient) RegionCount() (totalCount uint64) {
c.tableStoreStats.RLock()
defer c.tableStoreStats.RUnlock()
for _, v := range c.tableStoreStats.v {
totalCount += v.regionCount.Load()
}
return totalCount
}

// ResolvedTs returns the current ingress resolved ts.
func (c *CDCClient) ResolvedTs() model.Ts {
return atomic.LoadUint64(&c.ingressResolvedTs)
c.tableStoreStats.RLock()
defer c.tableStoreStats.RUnlock()
ingressResolvedTs := uint64(0)
for _, v := range c.tableStoreStats.v {
curr := v.resolvedTs.Load()
if curr > ingressResolvedTs {
ingressResolvedTs = curr
}
}
return ingressResolvedTs
}

// CommitTs returns the current ingress commit ts.
func (c *CDCClient) CommitTs() model.Ts {
return atomic.LoadUint64(&c.ingressCommitTs)
c.tableStoreStats.RLock()
defer c.tableStoreStats.RUnlock()
ingressCommitTs := uint64(0)
for _, v := range c.tableStoreStats.v {
curr := v.commitTs.Load()
if curr > ingressCommitTs {
ingressCommitTs = curr
}
}
return ingressCommitTs
}

var currentID uint64 = 0
@@ -305,7 +316,11 @@ func currentRequestID() uint64 {
}

type eventFeedSession struct {
client *CDCClient
client *CDCClient
startTs model.Ts
changefeed model.ChangeFeedID
tableID model.TableID
tableName string

lockResolver txnutil.LockResolver

@@ -335,10 +350,6 @@ type eventFeedSession struct {
streamsLock sync.RWMutex
streamsCanceller map[string]context.CancelFunc

changefeed model.ChangeFeedID
tableID model.TableID
tableName string

// use sync.Pool to store resolved ts events only, because resolved ts events
// have the same size and generation cycle.
resolvedTsPool sync.Pool
@@ -355,16 +366,18 @@ func newEventFeedSession(
lockResolver txnutil.LockResolver,
startTs uint64,
eventCh chan<- model.RegionFeedEvent,
changefeed model.ChangeFeedID,
tableID model.TableID,
tableName string,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
rangeLock := regionspan.NewRegionRangeLock(
totalSpan.Start, totalSpan.End, startTs,
changefeed.Namespace+"."+changefeed.ID)
client.changefeed.Namespace+"."+client.changefeed.ID)
return &eventFeedSession{
client: client,
client: client,
startTs: startTs,
changefeed: client.changefeed,
tableID: client.tableID,
tableName: client.tableName,

totalSpan: totalSpan,
eventCh: eventCh,
rangeLock: rangeLock,
Expand All @@ -382,14 +395,10 @@ func newEventFeedSession(
}
},
},

changefeed: changefeed,
tableID: tableID,
tableName: tableName,
}
}

func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64, regionCount *int64) error {
func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
s.requestRangeCh = chann.NewDrainableChann[rangeRequestTask]()
s.regionCh = chann.NewDrainableChann[singleRegionInfo]()
s.regionRouter = chann.NewDrainableChann[singleRegionInfo]()
@@ -411,7 +420,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64, regionCount
})

g.Go(func() error {
return s.requestRegionToStore(ctx, g, regionCount)
return s.requestRegionToStore(ctx, g)
})

g.Go(func() error {
@@ -555,7 +564,6 @@ func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErr
func (s *eventFeedSession) requestRegionToStore(
ctx context.Context,
g *errgroup.Group,
regionCount *int64,
) error {
// Stores pending region info for each stream. After sending a new request, the region info will be put into the map,
// and it will be loaded by the receiver thread when it receives the first response from that region. We need this
@@ -646,8 +654,7 @@ func (s *eventFeedSession) requestRegionToStore(

g.Go(func() error {
defer s.deleteStream(storeAddr)
return s.receiveFromStream(
ctx, g, storeAddr, storeID, stream.client, pendingRegions, regionCount)
return s.receiveFromStream(ctx, g, storeAddr, storeID, stream.client, pendingRegions)
})
}

@@ -957,8 +964,16 @@ func (s *eventFeedSession) receiveFromStream(
storeID uint64,
stream cdcpb.ChangeData_EventFeedClient,
pendingRegions *syncRegionFeedStateMap,
regionCount *int64,
) error {
var tsStat *tableStoreStat
s.client.tableStoreStats.Lock()
key := fmt.Sprintf("%d_%d", s.client.tableID, storeID)
if tsStat = s.client.tableStoreStats.v[key]; tsStat == nil {
tsStat = new(tableStoreStat)
s.client.tableStoreStats.v[key] = tsStat
}
s.client.tableStoreStats.Unlock()

// Cancel the pending regions if the stream failed.
// Otherwise, it will remain unhandled in the pendingRegions list
// however not registered in the new reconnected stream.
@@ -1083,16 +1098,17 @@
if err != nil {
return err
}
// NOTE(qupeng): what if all regions are removed from the store?
// TiKV sends resolved ts events every second by default.
// We check and update region count here to save CPU.
atomic.StoreInt64(regionCount, worker.statesManager.regionCount())
atomic.StoreUint64(&s.client.ingressResolvedTs, cevent.ResolvedTs.Ts)
tsStat.regionCount.Store(uint64(worker.statesManager.regionCount()))
tsStat.resolvedTs.Store(cevent.ResolvedTs.Ts)
if maxCommitTs == 0 {
// In case, there is no write for the table,
// we use resolved ts as maxCommitTs to make the stats meaningful.
atomic.StoreUint64(&s.client.ingressCommitTs, cevent.ResolvedTs.Ts)
tsStat.commitTs.Store(cevent.ResolvedTs.Ts)
} else {
atomic.StoreUint64(&s.client.ingressCommitTs, maxCommitTs)
tsStat.commitTs.Store(maxCommitTs)
}
}
}
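
For context on the cdc/kv/client.go half of the change: the per-EventFeed region counter list is replaced by a map of per-(table, store) stats, and the exported getters aggregate it by summing the region counts and taking the maximum resolved/commit ts. Below is a standalone sketch of that aggregation pattern, with illustrative types rather than the actual CDCClient:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// storeStat is a simplified stand-in for tableStoreStat.
type storeStat struct {
	regionCount atomic.Uint64
	resolvedTs  atomic.Uint64
}

// statsMap mimics the tableStoreStats field: an RWMutex-guarded map keyed by
// "tableID_storeID".
type statsMap struct {
	sync.RWMutex
	v map[string]*storeStat
}

// regionCount sums the captured regions across all (table, store) pairs.
func (m *statsMap) regionCount() (total uint64) {
	m.RLock()
	defer m.RUnlock()
	for _, s := range m.v {
		total += s.regionCount.Load()
	}
	return total
}

// resolvedTs reports the largest resolved ts observed on any store.
func (m *statsMap) resolvedTs() (maxTs uint64) {
	m.RLock()
	defer m.RUnlock()
	for _, s := range m.v {
		if ts := s.resolvedTs.Load(); ts > maxTs {
			maxTs = ts
		}
	}
	return maxTs
}

func main() {
	m := &statsMap{v: map[string]*storeStat{"47_1": {}, "47_2": {}}}
	m.v["47_1"].regionCount.Store(3)
	m.v["47_2"].regionCount.Store(5)
	m.v["47_1"].resolvedTs.Store(440000)
	m.v["47_2"].resolvedTs.Store(439500)
	fmt.Println(m.regionCount(), m.resolvedTs()) // 8 440000
}
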
28 changes: 2 additions & 26 deletions cdc/kv/client_test.go
@@ -2063,6 +2063,7 @@ func TestResolveLock(t *testing.T) {
}

func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
InitWorkerPool()
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

@@ -2156,31 +2157,6 @@ func TestCommittedFallback(t *testing.T) {
testEventCommitTsFallback(t, events)
}

// TestCommitFallback tests kv client should panic when receiving a fallback commit event
func TestCommitFallback(t *testing.T) {
events := []*cdcpb.ChangeDataEvent{
mockInitializedEvent(3, currentRequestID()),
{Events: []*cdcpb.Event{
{
RegionId: 3,
RequestId: currentRequestID(),
Event: &cdcpb.Event_Entries_{
Entries: &cdcpb.Event_Entries{
Entries: []*cdcpb.Event_Row{{
Type: cdcpb.Event_COMMIT,
OpType: cdcpb.Event_Row_PUT,
Key: []byte("a-commit-event-ts-fallback"),
StartTs: 92,
CommitTs: 98,
}},
},
},
},
}},
}
testEventCommitTsFallback(t, events)
}

// TestDuplicateRequest tests kv client should panic when meeting a duplicate error
func TestDuplicateRequest(t *testing.T) {
events := []*cdcpb.ChangeDataEvent{
@@ -3563,5 +3539,5 @@ func createFakeEventFeedSession() *eventFeedSession {
nil, /*lockResolver*/
100, /*startTs*/
nil, /*eventCh*/
model.DefaultChangeFeedID("changefeed-test"), 0, "")
)
}
30 changes: 17 additions & 13 deletions cdc/kv/region_worker.go
@@ -683,17 +683,20 @@ func (w *regionWorker) handleEventEntry(
state.matcher.putPrewriteRow(entry)
case cdcpb.Event_COMMIT:
w.metrics.metricPullEventCommitCounter.Inc()
// NOTE: state.getLastResolvedTs() will never be less than session.startTs.
resolvedTs := state.getLastResolvedTs()
if entry.CommitTs <= resolvedTs {
// TiKV can send events with StartTs/CommitTs less than startTs.
isStaleEvent := entry.CommitTs <= w.session.startTs
if entry.CommitTs <= resolvedTs && !isStaleEvent {
logPanic("The CommitTs must be greater than the resolvedTs",
zap.String("EventType", "COMMIT"),
zap.Uint64("CommitTs", entry.CommitTs),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("regionID", regionID))
return errUnreachable
}
ok := state.matcher.matchRow(entry, state.isInitialized())
if !ok {

if !state.matcher.matchRow(entry, state.isInitialized()) {
if !state.isInitialized() {
state.matcher.cacheCommitRow(entry)
continue
Expand All @@ -704,16 +707,17 @@ func (w *regionWorker) handleEventEntry(
entry.GetType(), entry.GetOpType())
}

revent, err := assembleRowEvent(regionID, entry)
if err != nil {
return errors.Trace(err)
}

select {
case w.outputCh <- revent:
w.metrics.metricSendEventCommitCounter.Inc()
case <-ctx.Done():
return errors.Trace(ctx.Err())
if !isStaleEvent {
revent, err := assembleRowEvent(regionID, entry)
if err != nil {
return errors.Trace(err)
}
select {
case w.outputCh <- revent:
w.metrics.metricSendEventCommitCounter.Inc()
case <-ctx.Done():
return errors.Trace(ctx.Err())
}
}
case cdcpb.Event_ROLLBACK:
w.metrics.metricPullEventRollbackCounter.Inc()