From d2be754e382ee54202a9c69317b971ed20739c99 Mon Sep 17 00:00:00 2001 From: maxshuang Date: Fri, 15 Oct 2021 00:09:42 +0800 Subject: [PATCH 1/6] fix(kvclient): fix the problem that region merging would block the progress of resolveLock. close #3061 --- cdc/kv/region_worker.go | 32 +++++++++++++++++++++----------- cdc/kv/resolvedts_heap.go | 4 ++-- pkg/errors/errors.go | 2 +- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 0a4623bc396..46b19394f7b 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -369,10 +369,11 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { if rts.ts.penalty < resolveLockPenalty { if lastResolvedTs > rts.ts.resolvedTs { rts.ts.resolvedTs = lastResolvedTs - rts.ts.eventTime = time.Now() rts.ts.penalty = 0 + rts.ts.eventTime = time.Now() + w.rtsManager.Upsert(rts) } - w.rtsManager.Upsert(rts) + continue } log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock", @@ -387,9 +388,10 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err)) continue } + } else { + rts.ts.resolvedTs = lastResolvedTs + w.rtsManager.Upsert(rts) } - rts.ts.resolvedTs = lastResolvedTs - w.rtsManager.Upsert(rts) } } } @@ -743,14 +745,29 @@ func (w *regionWorker) handleResolvedTs( return nil } regionID := state.sri.verID.GetID() + + // Send resolved ts update in non blocking way, since we can re-query real + // resolved ts from region state even if resolved ts update is discarded. + // NOTICE: We send any regionTsInfo to resolveLock thread to give us a chance to trigger resolveLock logic + // (1) if it is a fallback resolvedTs event, it will be discarded and accumulate penalty on the progress; + // (2) if it is normal one, update rtsManager and check sinceLastResolvedTs + select { + case w.rtsUpdateCh <- ®ionTsInfo{regionID: regionID, ts: newResolvedTsItem(resolvedTs)}: + default: + } + if resolvedTs < state.lastResolvedTs { log.Warn("The resolvedTs is fallen back in kvclient", zap.String("Event Type", "RESOLVED"), zap.Uint64("resolvedTs", resolvedTs), zap.Uint64("lastResolvedTs", state.lastResolvedTs), zap.Uint64("regionID", regionID)) + return nil } + + state.lastResolvedTs = resolvedTs + // emit a checkpointTs revent := model.RegionFeedEvent{ RegionID: regionID, @@ -759,13 +776,6 @@ func (w *regionWorker) handleResolvedTs( ResolvedTs: resolvedTs, }, } - state.lastResolvedTs = resolvedTs - // Send resolved ts update in non blocking way, since we can re-query real - // resolved ts from region state even if resolved ts update is discarded. - select { - case w.rtsUpdateCh <- ®ionTsInfo{regionID: regionID, ts: newResolvedTsItem(resolvedTs)}: - default: - } select { case w.outputCh <- revent: diff --git a/cdc/kv/resolvedts_heap.go b/cdc/kv/resolvedts_heap.go index 68dd7c82af1..46865273535 100644 --- a/cdc/kv/resolvedts_heap.go +++ b/cdc/kv/resolvedts_heap.go @@ -91,9 +91,9 @@ func newRegionTsManager() *regionTsManager { // Upsert implements insert and update on duplicated key func (rm *regionTsManager) Upsert(item *regionTsInfo) { if old, ok := rm.m[item.regionID]; ok { - // in a single resolved ts manager, the resolved ts of a region should not be fallen back + // in a single resolved ts manager, we use the fallback resolved ts item to increase penalty if !item.ts.sortByEvTime { - if item.ts.resolvedTs == old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) { + if item.ts.resolvedTs <= old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) { old.ts.penalty++ old.ts.eventTime = item.ts.eventTime heap.Fix(&rm.h, old.index) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 6edb74e863e..5660cbdb150 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -92,7 +92,7 @@ var ( ErrMySQLTxnError = errors.Normalize("MySQL txn error", errors.RFCCodeText("CDC:ErrMySQLTxnError")) ErrMySQLQueryError = errors.Normalize("MySQL query error", errors.RFCCodeText("CDC:ErrMySQLQueryError")) ErrMySQLConnectionError = errors.Normalize("MySQL connection error", errors.RFCCodeText("CDC:ErrMySQLConnectionError")) - ErrMySQLInvalidConfig = errors.Normalize("MySQL config invaldi", errors.RFCCodeText("CDC:ErrMySQLInvalidConfig")) + ErrMySQLInvalidConfig = errors.Normalize("MySQL config invalid", errors.RFCCodeText("CDC:ErrMySQLInvalidConfig")) ErrMySQLWorkerPanic = errors.Normalize("MySQL worker panic", errors.RFCCodeText("CDC:ErrMySQLWorkerPanic")) ErrAvroToEnvelopeError = errors.Normalize("to envelope failed", errors.RFCCodeText("CDC:ErrAvroToEnvelopeError")) ErrAvroUnknownType = errors.Normalize("unknown type for Avro: %v", errors.RFCCodeText("CDC:ErrAvroUnknownType")) From dbc53cf4534deb7f1777271caa8b57bcd6aa4648 Mon Sep 17 00:00:00 2001 From: maxshuang Date: Sun, 17 Oct 2021 16:38:31 +0800 Subject: [PATCH 2/6] bug(kvclient): fix bug --- cdc/kv/region_worker.go | 10 +++++----- cdc/kv/resolvedts_heap.go | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 46b19394f7b..71ee5e3e1f5 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -369,11 +369,11 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { if rts.ts.penalty < resolveLockPenalty { if lastResolvedTs > rts.ts.resolvedTs { rts.ts.resolvedTs = lastResolvedTs - rts.ts.penalty = 0 rts.ts.eventTime = time.Now() - w.rtsManager.Upsert(rts) + rts.ts.penalty = 0 } + w.rtsManager.Upsert(rts) continue } log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock", @@ -388,10 +388,10 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err)) continue } - } else { - rts.ts.resolvedTs = lastResolvedTs - w.rtsManager.Upsert(rts) + rts.ts.penalty = 0 } + rts.ts.resolvedTs = lastResolvedTs + w.rtsManager.Upsert(rts) } } } diff --git a/cdc/kv/resolvedts_heap.go b/cdc/kv/resolvedts_heap.go index 46865273535..3e78a95a8f1 100644 --- a/cdc/kv/resolvedts_heap.go +++ b/cdc/kv/resolvedts_heap.go @@ -91,7 +91,8 @@ func newRegionTsManager() *regionTsManager { // Upsert implements insert and update on duplicated key func (rm *regionTsManager) Upsert(item *regionTsInfo) { if old, ok := rm.m[item.regionID]; ok { - // in a single resolved ts manager, we use the fallback resolved ts item to increase penalty + // in a single resolved ts manager, we should not expect a fallback resolved event + // but it's ok that we use fallback resolved event to increase penalty if !item.ts.sortByEvTime { if item.ts.resolvedTs <= old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) { old.ts.penalty++ From 639f1d17bbf8084dfbd4ed3259e4e8de80646879 Mon Sep 17 00:00:00 2001 From: maxshuang Date: Sun, 17 Oct 2021 16:48:15 +0800 Subject: [PATCH 3/6] format(kvclient):remove blank line --- cdc/kv/region_worker.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 71ee5e3e1f5..5448f7e433f 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -372,7 +372,6 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { rts.ts.eventTime = time.Now() rts.ts.penalty = 0 } - w.rtsManager.Upsert(rts) continue } From c3caa9949f043c9f4241f693072640abed0c7c80 Mon Sep 17 00:00:00 2001 From: maxshuang Date: Mon, 18 Oct 2021 09:22:20 +0800 Subject: [PATCH 4/6] fix(errors): fix error message wrong close #3061 --- errors.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/errors.toml b/errors.toml index 3ea3497c4f1..0a2837ad6c9 100755 --- a/errors.toml +++ b/errors.toml @@ -453,7 +453,7 @@ MySQL connection error ["CDC:ErrMySQLInvalidConfig"] error = ''' -MySQL config invaldi +MySQL config invalid ''' ["CDC:ErrMySQLQueryError"] From 3c49f24d33dd44a4bbd750231b25e5513186727c Mon Sep 17 00:00:00 2001 From: maxshuang Date: Mon, 18 Oct 2021 14:52:13 +0800 Subject: [PATCH 5/6] test(kvclient): add ut for fallback event --- cdc/kv/resolvedts_heap_test.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/cdc/kv/resolvedts_heap_test.go b/cdc/kv/resolvedts_heap_test.go index e47d1b8acfe..96556cba690 100644 --- a/cdc/kv/resolvedts_heap_test.go +++ b/cdc/kv/resolvedts_heap_test.go @@ -96,6 +96,36 @@ func (s *rtsHeapSuite) TestRegionTsManagerPenalty(c *check.C) { c.Assert(rts.ts.resolvedTs, check.DeepEquals, uint64(2000)) } +func (s *rtsHeapSuite) TestRegionTsManagerPenaltyForFallBackEvent(c *check.C) { + defer testleak.AfterTest(c)() + mgr := newRegionTsManager() + initRegions := []*regionTsInfo{ + {regionID: 100, ts: newResolvedTsItem(1000)}, + } + for _, rts := range initRegions { + mgr.Upsert(rts) + } + c.Assert(mgr.Len(), check.Equals, 1) + + // test penalty increases if we meet a fallback event + for i := 0; i < 6; i++ { + rts := ®ionTsInfo{regionID: 100, ts: newResolvedTsItem(uint64(1000 - i))} + mgr.Upsert(rts) + } + rts := mgr.Pop() + // original resolvedTs will remain unchanged + c.Assert(rts.ts.resolvedTs, check.Equals, uint64(1000)) + c.Assert(rts.ts.penalty, check.Equals, 6) + + // test penalty is cleared to zero if resolved ts is advanced + mgr.Upsert(rts) + rtsNew := ®ionTsInfo{regionID: 100, ts: newResolvedTsItem(2000)} + mgr.Upsert(rtsNew) + rts = mgr.Pop() + c.Assert(rts.ts.penalty, check.DeepEquals, 0) + c.Assert(rts.ts.resolvedTs, check.DeepEquals, uint64(2000)) +} + func (s *rtsHeapSuite) TestRegionTsManagerEvTime(c *check.C) { defer testleak.AfterTest(c)() mgr := newRegionTsManager() From 3c88d4868d9121310698028bb537bcb2186fee26 Mon Sep 17 00:00:00 2001 From: maxshuang Date: Tue, 19 Oct 2021 14:26:00 +0800 Subject: [PATCH 6/6] format(kvclient): remove blank line --- cdc/kv/region_worker.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 5448f7e433f..e44e8feca86 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -744,12 +744,11 @@ func (w *regionWorker) handleResolvedTs( return nil } regionID := state.sri.verID.GetID() - // Send resolved ts update in non blocking way, since we can re-query real // resolved ts from region state even if resolved ts update is discarded. // NOTICE: We send any regionTsInfo to resolveLock thread to give us a chance to trigger resolveLock logic // (1) if it is a fallback resolvedTs event, it will be discarded and accumulate penalty on the progress; - // (2) if it is normal one, update rtsManager and check sinceLastResolvedTs + // (2) if it is a normal one, update rtsManager and check sinceLastResolvedTs select { case w.rtsUpdateCh <- ®ionTsInfo{regionID: regionID, ts: newResolvedTsItem(resolvedTs)}: default: @@ -761,12 +760,9 @@ func (w *regionWorker) handleResolvedTs( zap.Uint64("resolvedTs", resolvedTs), zap.Uint64("lastResolvedTs", state.lastResolvedTs), zap.Uint64("regionID", regionID)) - return nil } - state.lastResolvedTs = resolvedTs - // emit a checkpointTs revent := model.RegionFeedEvent{ RegionID: regionID,