Skip to content

Commit

Permalink
cdc: region worker should handle stale events correctly (#9078)
Browse files Browse the repository at this point in the history
close #9072

Signed-off-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
hicqu committed May 30, 2023
1 parent 7fd541d commit 8258169
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 40 deletions.
6 changes: 4 additions & 2 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ func currentRequestID() uint64 {

type eventFeedSession struct {
client *CDCClient
startTs model.Ts
changefeed model.ChangeFeedID
tableID model.TableID
tableName string
Expand Down Expand Up @@ -369,9 +370,10 @@ func newEventFeedSession(
id := strconv.FormatUint(allocID(), 10)
rangeLock := regionspan.NewRegionRangeLock(
totalSpan.Start, totalSpan.End, startTs,
changefeed.Namespace+"."+changefeed.ID)
client.changefeed.Namespace+"."+client.changefeed.ID)
return &eventFeedSession{
client: client,
startTs: startTs,
changefeed: client.changefeed,
tableID: client.tableID,
tableName: client.tableName,
Expand Down Expand Up @@ -965,7 +967,7 @@ func (s *eventFeedSession) receiveFromStream(
) error {
var tsStat *tableStoreStat
s.client.tableStoreStats.Lock()
key := fmt.Sprintf("%d_%d", s.totalSpan.TableID, storeID)
key := fmt.Sprintf("%d_%d", s.client.tableID, storeID)
if tsStat = s.client.tableStoreStats.v[key]; tsStat == nil {
tsStat = new(tableStoreStat)
s.client.tableStoreStats.v[key] = tsStat
Expand Down
26 changes: 1 addition & 25 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2063,6 +2063,7 @@ func TestResolveLock(t *testing.T) {
}

func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
InitWorkerPool()
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

Expand Down Expand Up @@ -2156,31 +2157,6 @@ func TestCommittedFallback(t *testing.T) {
testEventCommitTsFallback(t, events)
}

// TestCommitFallback tests kv client should panic when receiving a fallback commit event
func TestCommitFallback(t *testing.T) {
events := []*cdcpb.ChangeDataEvent{
mockInitializedEvent(3, currentRequestID()),
{Events: []*cdcpb.Event{
{
RegionId: 3,
RequestId: currentRequestID(),
Event: &cdcpb.Event_Entries_{
Entries: &cdcpb.Event_Entries{
Entries: []*cdcpb.Event_Row{{
Type: cdcpb.Event_COMMIT,
OpType: cdcpb.Event_Row_PUT,
Key: []byte("a-commit-event-ts-fallback"),
StartTs: 92,
CommitTs: 98,
}},
},
},
},
}},
}
testEventCommitTsFallback(t, events)
}

// TestDuplicateRequest tests that the kv client panics when meeting a duplicate error
func TestDuplicateRequest(t *testing.T) {
events := []*cdcpb.ChangeDataEvent{
Expand Down
30 changes: 17 additions & 13 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,17 +683,20 @@ func (w *regionWorker) handleEventEntry(
state.matcher.putPrewriteRow(entry)
case cdcpb.Event_COMMIT:
w.metrics.metricPullEventCommitCounter.Inc()
// NOTE: state.getLastResolvedTs() will never be less than session.startTs.
resolvedTs := state.getLastResolvedTs()
if entry.CommitTs <= resolvedTs {
// TiKV can send events with StartTs/CommitTs less than startTs.
isStaleEvent := entry.CommitTs <= w.session.startTs
if entry.CommitTs <= resolvedTs && !isStaleEvent {
logPanic("The CommitTs must be greater than the resolvedTs",
zap.String("EventType", "COMMIT"),
zap.Uint64("CommitTs", entry.CommitTs),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("regionID", regionID))
return errUnreachable
}
ok := state.matcher.matchRow(entry, state.isInitialized())
if !ok {

if !state.matcher.matchRow(entry, state.isInitialized()) {
if !state.isInitialized() {
state.matcher.cacheCommitRow(entry)
continue
Expand All @@ -704,16 +707,17 @@ func (w *regionWorker) handleEventEntry(
entry.GetType(), entry.GetOpType())
}

revent, err := assembleRowEvent(regionID, entry)
if err != nil {
return errors.Trace(err)
}

select {
case w.outputCh <- revent:
w.metrics.metricSendEventCommitCounter.Inc()
case <-ctx.Done():
return errors.Trace(ctx.Err())
if !isStaleEvent {
revent, err := assembleRowEvent(regionID, entry)
if err != nil {
return errors.Trace(err)
}
select {
case w.outputCh <- revent:
w.metrics.metricSendEventCommitCounter.Inc()
case <-ctx.Done():
return errors.Trace(ctx.Err())
}
}
case cdcpb.Event_ROLLBACK:
w.metrics.metricPullEventRollbackCounter.Inc()
Expand Down
51 changes: 51 additions & 0 deletions cdc/kv/region_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"runtime"
"sync"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -301,3 +302,53 @@ func TestRegionWorkerHandleResolvedTs(t *testing.T) {
require.Equal(t, 1, len(event.Resolved.Spans))
require.Equal(t, uint64(1), event.Resolved.Spans[0].Region)
}

// TestRegionWorkerHandleEventsBeforeStartTs checks that a region worker
// tolerates events whose timestamps are below the value the region state was
// created with (9 here; presumably the region's start ts — confirm against
// newSingleRegionInfo):
//   - a resolved ts of 5 must leave lastResolvedTs at 9 and must not publish
//     anything on rtsUpdateCh;
//   - a PREWRITE (StartTs 7) and a COMMIT (CommitTs 8) must both be handled
//     without returning an error.
func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) {
	ctx := context.Background()

	session := createFakeEventFeedSession()
	session.eventCh = make(chan model.RegionFeedEvent, 2)

	regionInfo := newSingleRegionInfo(
		tikv.RegionVerID{},
		spanz.ToSpan([]byte{}, spanz.UpperBoundKey),
		9, &tikv.RPCContext{})
	state := newRegionFeedState(regionInfo, 0)
	state.start()

	worker := newRegionWorker(model.ChangeFeedID{}, session, "")

	// A resolved ts lower than 9 must not move lastResolvedTs.
	resolved := &resolvedTsEvent{
		resolvedTs: 5,
		regions:    []*regionFeedState{state},
	}
	require.Nil(t, worker.handleResolvedTs(ctx, resolved))
	require.Equal(t, uint64(9), state.lastResolvedTs)

	// Give the worker one second to (incorrectly) emit a resolved ts update;
	// the timer firing first is the expected outcome.
	timer := time.NewTimer(time.Second)
	select {
	case <-timer.C:
	case <-worker.rtsUpdateCh:
		// Stop the timer and drain its channel if it already fired.
		if !timer.Stop() {
			<-timer.C
		}
		require.False(t, true, "should never get a ResolvedTs")
	}

	// Feed the PREWRITE and then the COMMIT, each as its own entries batch.
	rows := []*cdcpb.Event_Row{
		{Type: cdcpb.Event_PREWRITE, StartTs: 7},
		{Type: cdcpb.Event_COMMIT, CommitTs: 8},
	}
	for _, row := range rows {
		batch := &cdcpb.Event_Entries_{
			Entries: &cdcpb.Event_Entries{
				Entries: []*cdcpb.Event_Row{row},
			},
		}
		require.Nil(t, worker.handleEventEntry(ctx, batch, state))
	}
}

0 comments on commit 8258169

Please sign in to comment.