Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: region worker should handle stale events correctly (#9078) #9083

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,9 @@ func currentRequestID() uint64 {
}

type eventFeedSession struct {
client *CDCClient
client *CDCClient
startTs model.Ts
changefeed model.ChangeFeedID

lockResolver txnutil.LockResolver
isPullerInit PullerInitialization
Expand Down Expand Up @@ -491,7 +493,10 @@ func newEventFeedSession(
totalSpan.Start, totalSpan.End, startTs,
client.changefeed.Namespace+"-"+client.changefeed.ID)
return &eventFeedSession{
client: client,
client: client,
startTs: startTs,
changefeed: client.changefeed,

totalSpan: totalSpan,
eventCh: eventCh,
regionRouter: NewSizedRegionRouter(ctx, client.config.RegionScanLimit),
Expand Down
50 changes: 0 additions & 50 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2173,56 +2173,6 @@ func testEventCommitTsFallback(t *testing.T, events []*cdcpb.ChangeDataEvent) {
cancel()
}

// TestCommittedFallback tests that the kv client panics when it receives a
// fallback committed event. It sends a single COMMITTED PUT row for region 3
// (StartTs 92, CommitTs 98) and hands the stream to testEventCommitTsFallback,
// which performs the actual checks.
func TestCommittedFallback(t *testing.T) {
	// The row whose commit ts is expected to be treated as a fallback.
	committedRow := &cdcpb.Event_Row{
		Type:     cdcpb.Event_COMMITTED,
		OpType:   cdcpb.Event_Row_PUT,
		Key:      []byte("a"),
		Value:    []byte("committed with commit ts before resolved ts"),
		StartTs:  92,
		CommitTs: 98,
	}
	regionEvent := &cdcpb.Event{
		RegionId:  3,
		RequestId: currentRequestID(),
		Event: &cdcpb.Event_Entries_{
			Entries: &cdcpb.Event_Entries{
				Entries: []*cdcpb.Event_Row{committedRow},
			},
		},
	}
	stream := []*cdcpb.ChangeDataEvent{
		{Events: []*cdcpb.Event{regionEvent}},
	}
	testEventCommitTsFallback(t, stream)
}

// TestCommitFallback tests kv client should panic when receiving a fallback commit event
func TestCommitFallback(t *testing.T) {
events := []*cdcpb.ChangeDataEvent{
mockInitializedEvent(3, currentRequestID()),
{Events: []*cdcpb.Event{
{
RegionId: 3,
RequestId: currentRequestID(),
Event: &cdcpb.Event_Entries_{
Entries: &cdcpb.Event_Entries{
Entries: []*cdcpb.Event_Row{{
Type: cdcpb.Event_COMMIT,
OpType: cdcpb.Event_Row_PUT,
Key: []byte("a-commit-event-ts-fallback"),
StartTs: 92,
CommitTs: 98,
}},
},
},
},
}},
}
testEventCommitTsFallback(t, events)
}

// TestDuplicateRequest tests kv client should panic when meeting a duplicate error
func TestDuplicateRequest(t *testing.T) {
events := []*cdcpb.ChangeDataEvent{
Expand Down
31 changes: 18 additions & 13 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,16 +731,20 @@ func (w *regionWorker) handleEventEntry(
state.matcher.putPrewriteRow(entry)
case cdcpb.Event_COMMIT:
w.metrics.metricPullEventCommitCounter.Inc()
if entry.CommitTs <= state.lastResolvedTs {
// NOTE: state.getLastResolvedTs() will never be less than session.startTs.
resolvedTs := state.getLastResolvedTs()
// TiKV can send events with StartTs/CommitTs less than startTs.
isStaleEvent := entry.CommitTs <= w.session.startTs
if entry.CommitTs <= resolvedTs && !isStaleEvent {
logPanic("The CommitTs must be greater than the resolvedTs",
zap.String("EventType", "COMMIT"),
zap.Uint64("CommitTs", entry.CommitTs),
zap.Uint64("resolvedTs", state.lastResolvedTs),
zap.Uint64("regionID", regionID))
return errUnreachable
}
ok := state.matcher.matchRow(entry, state.initialized)
if !ok {

if !state.matcher.matchRow(entry, state.initialized) {
if !state.initialized {
state.matcher.cacheCommitRow(entry)
continue
Expand All @@ -751,16 +755,17 @@ func (w *regionWorker) handleEventEntry(
entry.GetType(), entry.GetOpType())
}

revent, err := assembleRowEvent(regionID, entry)
if err != nil {
return errors.Trace(err)
}

select {
case w.outputCh <- revent:
w.metrics.metricSendEventCommitCounter.Inc()
case <-ctx.Done():
return errors.Trace(ctx.Err())
if !isStaleEvent {
revent, err := assembleRowEvent(regionID, entry)
if err != nil {
return errors.Trace(err)
}
select {
case w.outputCh <- revent:
w.metrics.metricSendEventCommitCounter.Inc()
case <-ctx.Done():
return errors.Trace(ctx.Err())
}
}
case cdcpb.Event_ROLLBACK:
w.metrics.metricPullEventRollbackCounter.Inc()
Expand Down
51 changes: 51 additions & 0 deletions cdc/kv/region_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"runtime"
"sync"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -306,3 +307,53 @@ func TestRegionWorkerHandleResolvedTs(t *testing.T) {
require.Equal(t, 1, len(event.Resolved.Spans))
require.Equal(t, uint64(1), event.Resolved.Spans[0].Region)
}

// TestRegionWorkerHandleEventsBeforeStartTs checks how a region worker treats
// timestamps older than the region's own ts (9 in this test):
//   - a resolved ts of 5 leaves lastResolvedTs at 9 and nothing shows up on
//     rtsUpdateCh within one second;
//   - a PREWRITE with StartTs 7 followed by a COMMIT with CommitTs 8 are both
//     handled without returning an error.
//
// NOTE(review): presumably the fake session's startTs is such that CommitTs 8
// counts as a stale event — confirm against createFakeEventFeedSession.
func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) {
	ctx := context.Background()
	session := createFakeEventFeedSession(ctx)
	session.eventCh = make(chan model.RegionFeedEvent, 2)

	regionInfo := singleRegionInfo{
		verID:  tikv.RegionVerID{},
		ts:     9,
		rpcCtx: &tikv.RPCContext{},
	}
	state := newRegionFeedState(regionInfo, 0)
	state.start()
	worker := newRegionWorker(model.ChangeFeedID{}, session, "")

	// A resolved ts below the region's ts must not move lastResolvedTs.
	staleResolved := &resolvedTsEvent{
		resolvedTs: 5,
		regions:    []*regionFeedState{state},
	}
	err := worker.handleResolvedTs(ctx, staleResolved)
	require.Nil(t, err)
	require.Equal(t, uint64(9), state.lastResolvedTs)

	// Wait up to one second to make sure no resolved ts update is published.
	timer := time.NewTimer(time.Second)
	select {
	case <-timer.C:
	case <-worker.rtsUpdateCh:
		if !timer.Stop() {
			<-timer.C
		}
		require.False(t, true, "should never get a ResolvedTs")
	}

	// Feed the prewrite and then the commit, one entry batch per row.
	rows := []*cdcpb.Event_Row{
		{Type: cdcpb.Event_PREWRITE, StartTs: 7},
		{Type: cdcpb.Event_COMMIT, CommitTs: 8},
	}
	for _, row := range rows {
		batch := &cdcpb.Event_Entries_{
			Entries: &cdcpb.Event_Entries{
				Entries: []*cdcpb.Event_Row{row},
			},
		}
		err = worker.handleEventEntry(ctx, batch, state)
		require.Nil(t, err)
	}
}