Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kvclient): fix region merging would block the progress of resolveLock (#3062) #3101

Merged
Merged
19 changes: 12 additions & 7 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err))
continue
}
rts.ts.penalty = 0
}
rts.ts.resolvedTs = lastResolvedTs
w.rtsManager.Upsert(rts)
Expand Down Expand Up @@ -743,6 +744,16 @@ func (w *regionWorker) handleResolvedTs(
return nil
}
regionID := state.sri.verID.GetID()
// Send resolved ts update in non blocking way, since we can re-query real
// resolved ts from region state even if resolved ts update is discarded.
// NOTICE: We send any regionTsInfo to resolveLock thread to give us a chance to trigger resolveLock logic
// (1) if it is a fallback resolvedTs event, it will be discarded and accumulate penalty on the progress;
// (2) if it is a normal one, update rtsManager and check sinceLastResolvedTs
select {
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(resolvedTs)}:
default:
}

if resolvedTs < state.lastResolvedTs {
log.Warn("The resolvedTs is fallen back in kvclient",
zap.String("Event Type", "RESOLVED"),
Expand All @@ -751,6 +762,7 @@ func (w *regionWorker) handleResolvedTs(
zap.Uint64("regionID", regionID))
return nil
}
state.lastResolvedTs = resolvedTs
// emit a checkpointTs
revent := model.RegionFeedEvent{
RegionID: regionID,
Expand All @@ -759,13 +771,6 @@ func (w *regionWorker) handleResolvedTs(
ResolvedTs: resolvedTs,
},
}
state.lastResolvedTs = resolvedTs
// Send resolved ts update in non blocking way, since we can re-query real
// resolved ts from region state even if resolved ts update is discarded.
select {
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(resolvedTs)}:
default:
}

select {
case w.outputCh <- revent:
Expand Down
5 changes: 3 additions & 2 deletions cdc/kv/resolvedts_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ func newRegionTsManager() *regionTsManager {
// Upsert implements insert and update on duplicated key
func (rm *regionTsManager) Upsert(item *regionTsInfo) {
if old, ok := rm.m[item.regionID]; ok {
// in a single resolved ts manager, the resolved ts of a region should not be fallen back
// in a single resolved ts manager, we should not expect a fallback resolved event
// but it's ok that we use fallback resolved event to increase penalty
if !item.ts.sortByEvTime {
if item.ts.resolvedTs == old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) {
if item.ts.resolvedTs <= old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) {
old.ts.penalty++
old.ts.eventTime = item.ts.eventTime
heap.Fix(&rm.h, old.index)
Expand Down
30 changes: 30 additions & 0 deletions cdc/kv/resolvedts_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,36 @@ func (s *rtsHeapSuite) TestRegionTsManagerPenalty(c *check.C) {
c.Assert(rts.ts.resolvedTs, check.DeepEquals, uint64(2000))
}

func (s *rtsHeapSuite) TestRegionTsManagerPenaltyForFallBackEvent(c *check.C) {
defer testleak.AfterTest(c)()
mgr := newRegionTsManager()
initRegions := []*regionTsInfo{
{regionID: 100, ts: newResolvedTsItem(1000)},
}
for _, rts := range initRegions {
mgr.Upsert(rts)
}
c.Assert(mgr.Len(), check.Equals, 1)

// test penalty increases if we meet a fallback event
for i := 0; i < 6; i++ {
rts := &regionTsInfo{regionID: 100, ts: newResolvedTsItem(uint64(1000 - i))}
mgr.Upsert(rts)
}
rts := mgr.Pop()
// original resolvedTs will remain unchanged
c.Assert(rts.ts.resolvedTs, check.Equals, uint64(1000))
c.Assert(rts.ts.penalty, check.Equals, 6)

// test penalty is cleared to zero if resolved ts is advanced
mgr.Upsert(rts)
rtsNew := &regionTsInfo{regionID: 100, ts: newResolvedTsItem(2000)}
mgr.Upsert(rtsNew)
rts = mgr.Pop()
c.Assert(rts.ts.penalty, check.DeepEquals, 0)
c.Assert(rts.ts.resolvedTs, check.DeepEquals, uint64(2000))
}

func (s *rtsHeapSuite) TestRegionTsManagerEvTime(c *check.C) {
defer testleak.AfterTest(c)()
mgr := newRegionTsManager()
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ MySQL connection error

["CDC:ErrMySQLInvalidConfig"]
error = '''
MySQL config invaldi
MySQL config invalid
'''

["CDC:ErrMySQLQueryError"]
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var (
ErrMySQLTxnError = errors.Normalize("MySQL txn error", errors.RFCCodeText("CDC:ErrMySQLTxnError"))
ErrMySQLQueryError = errors.Normalize("MySQL query error", errors.RFCCodeText("CDC:ErrMySQLQueryError"))
ErrMySQLConnectionError = errors.Normalize("MySQL connection error", errors.RFCCodeText("CDC:ErrMySQLConnectionError"))
ErrMySQLInvalidConfig = errors.Normalize("MySQL config invaldi", errors.RFCCodeText("CDC:ErrMySQLInvalidConfig"))
ErrMySQLInvalidConfig = errors.Normalize("MySQL config invalid", errors.RFCCodeText("CDC:ErrMySQLInvalidConfig"))
ErrMySQLWorkerPanic = errors.Normalize("MySQL worker panic", errors.RFCCodeText("CDC:ErrMySQLWorkerPanic"))
ErrAvroToEnvelopeError = errors.Normalize("to envelope failed", errors.RFCCodeText("CDC:ErrAvroToEnvelopeError"))
ErrAvroUnknownType = errors.Normalize("unknown type for Avro: %v", errors.RFCCodeText("CDC:ErrAvroUnknownType"))
Expand Down