From 2022afdcb5288162dd8fb70e139cd9f3601c1f29 Mon Sep 17 00:00:00 2001
From: Neil Shen
Date: Tue, 1 Sep 2020 16:42:00 +0800
Subject: [PATCH 1/5] cdc, pkg/txnutil: skip resolve lock before puller initialization

Signed-off-by: Neil Shen

---
 cdc/kv/client.go             | 160 +++++++++++------------------------
 cdc/kv/client_test.go        |   9 +-
 cdc/kv/testing.go            |  18 +++-
 cdc/puller/mock_puller.go    |   4 +
 cdc/puller/puller.go         |  34 +++++++-
 pkg/txnutil/lock_resolver.go | 124 +++++++++++++++++++++++++++
 6 files changed, 228 insertions(+), 121 deletions(-)
 create mode 100644 pkg/txnutil/lock_resolver.go

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 52833d5b112..5b7b50cbd07 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -14,7 +14,6 @@ package kv
 
 import (
-	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -34,11 +33,11 @@ import (
 	"github.com/pingcap/ticdc/pkg/regionspan"
 	"github.com/pingcap/ticdc/pkg/retry"
 	"github.com/pingcap/ticdc/pkg/security"
+	"github.com/pingcap/ticdc/pkg/txnutil"
 	"github.com/pingcap/ticdc/pkg/util"
 	tidbkv "github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/store/tikv/oracle"
-	"github.com/pingcap/tidb/store/tikv/tikvrpc"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
@@ -358,14 +357,23 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 	return
 }
 
+// PullerInitialization is a workaround to solve a cyclic import.
+type PullerInitialization interface {
+	IsInitialized() bool
+}
+
 // EventFeed divides a EventFeed request on range boundaries and establishes
 // a EventFeed to each of the individual region. It streams back result on the
 // provided channel.
 // The `Start` and `End` field in input span must be memcomparable encoded.
 func (c *CDCClient) EventFeed(
-	ctx context.Context, span regionspan.ComparableSpan, ts uint64, enableOldValue bool, eventCh chan<- *model.RegionFeedEvent,
+	ctx context.Context, span regionspan.ComparableSpan, ts uint64,
+	enableOldValue bool,
+	lockResolver txnutil.LockResolver,
+	isPullerInit PullerInitialization,
+	eventCh chan<- *model.RegionFeedEvent,
 ) error {
-	s := newEventFeedSession(c, c.regionCache, c.kvStorage, span, enableOldValue, eventCh)
+	s := newEventFeedSession(c, c.regionCache, c.kvStorage, span, lockResolver, isPullerInit, enableOldValue, eventCh)
 	return s.eventFeed(ctx, ts)
 }
 
@@ -380,6 +388,9 @@ type eventFeedSession struct {
 	regionCache *tikv.RegionCache
 	kvStorage   tikv.Storage
 
+	lockResolver txnutil.LockResolver
+	isPullerInit PullerInitialization
+
 	// The whole range that is being subscribed.
 	totalSpan regionspan.ComparableSpan
 
@@ -413,6 +424,8 @@ func newEventFeedSession(
 	regionCache *tikv.RegionCache,
 	kvStorage tikv.Storage,
 	totalSpan regionspan.ComparableSpan,
+	lockResolver txnutil.LockResolver,
+	isPullerInit PullerInitialization,
 	enableOldValue bool,
 	eventCh chan<- *model.RegionFeedEvent,
 ) *eventFeedSession {
@@ -428,6 +441,8 @@ func newEventFeedSession(
 		requestRangeCh:    make(chan rangeRequestTask, 16),
 		rangeLock:         regionspan.NewRegionRangeLock(),
 		enableOldValue:    enableOldValue,
+		lockResolver:      lockResolver,
+		isPullerInit:      isPullerInit,
 		id:                strconv.FormatUint(allocID(), 10),
 		regionChSizeGauge: clientChannelSize.WithLabelValues(id, "region"),
 		errChSizeGauge:    clientChannelSize.WithLabelValues(id, "err"),
@@ -1104,9 +1119,9 @@ func (s *eventFeedSession) receiveFromStream(
 
 // singleEventFeed handles events of a single EventFeed stream.
 // Results will be send to eventCh
-// EventFeed RPC will not return checkpoint event directly
+// EventFeed RPC will not return resolved event directly
 // Resolved event is generate while there's not non-match pre-write
-// Return the maximum checkpoint
+// Return the maximum resolved ts
 func (s *eventFeedSession) singleEventFeed(
 	ctx context.Context,
 	regionID uint64,
@@ -1133,7 +1148,7 @@ func (s *eventFeedSession) singleEventFeed(
 	defer advanceCheckTicker.Stop()
 	lastReceivedEventTime := time.Now()
 	startFeedTime := time.Now()
-	checkpointTs := startTs
+	lastResolvedTs := startTs
 	select {
 	case s.eventCh <- &model.RegionFeedEvent{
 		Resolved: &model.ResolvedSpan{
@@ -1142,18 +1157,22 @@ func (s *eventFeedSession) singleEventFeed(
 		},
 	}:
 	case <-ctx.Done():
-		return checkpointTs, errors.Trace(ctx.Err())
+		return lastResolvedTs, errors.Trace(ctx.Err())
 	}
 	for {
 		var event *cdcpb.Event
 		var ok bool
 		select {
 		case <-ctx.Done():
-			return checkpointTs, ctx.Err()
+			return lastResolvedTs, ctx.Err()
 		case <-advanceCheckTicker.C:
 			if time.Since(startFeedTime) < 20*time.Second {
 				continue
 			}
+			if !s.isPullerInit.IsInitialized() {
+				// Initializing a puller may take a long time, so skip resolving locks to avoid unnecessary overhead.
+				continue
+			}
 			sinceLastEvent := time.Since(lastReceivedEventTime)
 			if sinceLastEvent > time.Second*20 {
 				log.Warn("region not receiving event from tikv for too long time",
@@ -1165,14 +1184,14 @@ func (s *eventFeedSession) singleEventFeed(
 				continue
 			}
 			currentTimeFromPD := oracle.GetTimeFromTS(version.Ver)
-			sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(checkpointTs))
+			sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(lastResolvedTs))
 			if sinceLastResolvedTs > time.Second*20 && initialized {
 				log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
 					zap.Uint64("regionID", regionID), zap.Stringer("span", span),
 					zap.Duration("duration", sinceLastResolvedTs),
-					zap.Uint64("checkpointTs", checkpointTs))
+					zap.Uint64("resolvedTs", lastResolvedTs))
 				maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
-				err = s.resolveLock(ctx, regionID, maxVersion)
+				err = s.lockResolver.Resolve(ctx, regionID, maxVersion)
 				if err != nil {
 					log.Warn("failed to resolve lock", zap.Uint64("regionID", regionID), zap.Error(err))
 					continue
@@ -1183,12 +1202,12 @@ func (s *eventFeedSession) singleEventFeed(
 		}
 		if !ok {
 			log.Debug("singleEventFeed receiver closed")
-			return checkpointTs, nil
+			return lastResolvedTs, nil
 		}
 
 		if event == nil {
 			log.Debug("singleEventFeed closed by error")
-			return checkpointTs, errors.New("single event feed aborted")
+			return lastResolvedTs, errors.New("single event feed aborted")
 		}
 		lastReceivedEventTime = time.Now()
@@ -1218,13 +1237,13 @@ func (s *eventFeedSession) singleEventFeed(
 					}
 					revent, err := assembleCommitEvent(cacheEntry, value)
 					if err != nil {
-						return checkpointTs, errors.Trace(err)
+						return lastResolvedTs, errors.Trace(err)
 					}
 					select {
 					case s.eventCh <- revent:
 						metricSendEventCommitCounter.Inc()
 					case <-ctx.Done():
-						return checkpointTs, errors.Trace(ctx.Err())
+						return lastResolvedTs, errors.Trace(ctx.Err())
 					}
 				}
 				matcher.clearCacheCommit()
@@ -1237,7 +1256,7 @@ func (s *eventFeedSession) singleEventFeed(
 				case cdcpb.Event_Row_PUT:
 					opType = model.OpTypePut
 				default:
-					return checkpointTs, errors.Errorf("unknown tp: %v", entry.GetOpType())
+					return lastResolvedTs, errors.Errorf("unknown tp: %v", entry.GetOpType())
 				}
 
 				revent := &model.RegionFeedEvent{
@@ -1251,29 +1270,29 @@ func (s *eventFeedSession) singleEventFeed(
 					},
 				}
 
-				if entry.CommitTs <= checkpointTs {
+				if entry.CommitTs <= lastResolvedTs {
 					log.Fatal("The CommitTs must be greater than the resolvedTs",
 						zap.String("Event Type", "COMMITTED"),
 						zap.Uint64("CommitTs", entry.CommitTs),
-						zap.Uint64("resolvedTs", checkpointTs),
+						zap.Uint64("resolvedTs", lastResolvedTs),
 						zap.Uint64("regionID", regionID))
 				}
 				select {
 				case s.eventCh <- revent:
 					metricSendEventCommittedCounter.Inc()
 				case <-ctx.Done():
-					return checkpointTs, errors.Trace(ctx.Err())
+					return lastResolvedTs, errors.Trace(ctx.Err())
 				}
 			case cdcpb.Event_PREWRITE:
 				metricPullEventPrewriteCounter.Inc()
 				matcher.putPrewriteRow(entry)
 			case cdcpb.Event_COMMIT:
 				metricPullEventCommitCounter.Inc()
-				if entry.CommitTs <= checkpointTs {
+				if entry.CommitTs <= lastResolvedTs {
 					log.Fatal("The CommitTs must be greater than the resolvedTs",
 						zap.String("Event Type", "COMMIT"),
 						zap.Uint64("CommitTs", entry.CommitTs),
-						zap.Uint64("resolvedTs", checkpointTs),
+						zap.Uint64("resolvedTs", lastResolvedTs),
 						zap.Uint64("regionID", regionID))
 				}
 				// emit a value
@@ -1283,21 +1302,21 @@ func (s *eventFeedSession) singleEventFeed(
 					matcher.cacheCommitRow(entry)
 					continue
 				}
-				return checkpointTs,
+				return lastResolvedTs,
 					errors.Errorf("prewrite not match, key: %b, start-ts: %d",
 						entry.GetKey(),
 						entry.GetStartTs())
 			}
 
 			revent, err := assembleCommitEvent(entry, value)
 			if err != nil {
-				return checkpointTs, errors.Trace(err)
+				return lastResolvedTs, errors.Trace(err)
 			}
 			select {
 			case s.eventCh <- revent:
 				metricSendEventCommitCounter.Inc()
 			case <-ctx.Done():
-				return checkpointTs, errors.Trace(ctx.Err())
+				return lastResolvedTs, errors.Trace(ctx.Err())
 			}
 		case cdcpb.Event_ROLLBACK:
 			metricPullEventRollbackCounter.Inc()
@@ -1307,16 +1326,16 @@ func (s *eventFeedSession) singleEventFeed(
 		case *cdcpb.Event_Admin_:
 			log.Info("receive admin event", zap.Stringer("event", event))
 		case *cdcpb.Event_Error:
-			return checkpointTs, errors.Trace(&eventError{err: x.Error})
+			return lastResolvedTs, errors.Trace(&eventError{err: x.Error})
 		case *cdcpb.Event_ResolvedTs:
 			if !initialized {
 				continue
 			}
-			if x.ResolvedTs < checkpointTs {
+			if x.ResolvedTs < lastResolvedTs {
 				log.Warn("The resolvedTs is fallen back in kvclient",
 					zap.String("Event Type", "RESOLVED"),
 					zap.Uint64("resolvedTs", x.ResolvedTs),
-					zap.Uint64("lastResolvedTs", checkpointTs),
+					zap.Uint64("lastResolvedTs", lastResolvedTs),
 					zap.Uint64("regionID", regionID))
 				continue
 			}
@@ -1327,98 +1346,17 @@ func (s *eventFeedSession) singleEventFeed(
 				ResolvedTs: x.ResolvedTs,
 			},
 		}
-		checkpointTs = x.ResolvedTs
+		lastResolvedTs = x.ResolvedTs
 		select {
 		case s.eventCh <- revent:
 			metricSendEventResolvedCounter.Inc()
 		case <-ctx.Done():
-			return checkpointTs, errors.Trace(ctx.Err())
-		}
-	}
-
-	}
-}
-
-const scanLockLimit = 1024
-
-func (s *eventFeedSession) resolveLock(ctx context.Context, regionID uint64, maxVersion uint64) error {
-	// TODO test whether this function will kill active transaction
-	req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
-		MaxVersion: maxVersion,
-		Limit:      scanLockLimit,
-	})
-
-	bo := tikv.NewBackoffer(ctx, tikv.GcResolveLockMaxBackoff)
-	var loc *tikv.KeyLocation
-	var key []byte
-	flushRegion := func() error {
-		var err error
-		loc, err = s.kvStorage.GetRegionCache().LocateRegionByID(bo, regionID)
-		if err != nil {
-			return err
-		}
-		key = loc.StartKey
-		return nil
-	}
-	if err := flushRegion(); err != nil {
-		return errors.Trace(err)
-	}
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-		}
-		req.ScanLock().StartKey = key
-		resp, err := s.kvStorage.SendReq(bo, req, loc.Region, tikv.ReadTimeoutMedium)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		regionErr, err := resp.GetRegionError()
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if regionErr != nil {
-			err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String()))
-			if err != nil {
-				return errors.Trace(err)
+			return lastResolvedTs, errors.Trace(ctx.Err())
 			}
-			if err := flushRegion(); err != nil {
-				return errors.Trace(err)
-			}
-			continue
-		}
-		if resp.Resp == nil {
-			return errors.Trace(tikv.ErrBodyMissing)
-		}
-		locksResp := resp.Resp.(*kvrpcpb.ScanLockResponse)
-		if locksResp.GetError() != nil {
-			return errors.Errorf("unexpected scanlock error: %s", locksResp)
-		}
-		locksInfo := locksResp.GetLocks()
-		locks := make([]*tikv.Lock, len(locksInfo))
-		for i := range locksInfo {
-			locks[i] = tikv.NewLock(locksInfo[i])
-		}
-
-		_, _, err1 := s.kvStorage.GetLockResolver().ResolveLocks(bo, 0, locks)
-		if err1 != nil {
-			return errors.Trace(err1)
-		}
-		if len(locks) < scanLockLimit {
-			key = loc.EndKey
-		} else {
-			key = locks[len(locks)-1].Key
 		}
-
-		if len(key) == 0 || (len(loc.EndKey) != 0 && bytes.Compare(key, loc.EndKey) >= 0) {
-			break
-		}
-		bo = tikv.NewBackoffer(ctx, tikv.GcResolveLockMaxBackoff)
 	}
-	log.Info("resolve lock successfully", zap.Uint64("regionID", regionID), zap.Uint64("maxVersion", maxVersion))
-	return nil
 }
 
 func assembleCommitEvent(entry *cdcpb.Event_Row, value *pendingValue) (*model.RegionFeedEvent, error) {
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index fc7d0608f66..b3ee7c99921 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/ticdc/cdc/model"
 	"github.com/pingcap/ticdc/pkg/regionspan"
 	"github.com/pingcap/ticdc/pkg/security"
+	"github.com/pingcap/ticdc/pkg/txnutil"
 	"github.com/pingcap/ticdc/pkg/util"
 	"github.com/pingcap/tidb/store/mockstore/mocktikv"
 	"github.com/pingcap/tidb/store/tikv"
@@ -144,6 +145,8 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
 	cluster.AddStore(2, "localhost:23376")
 	cluster.Bootstrap(3, []uint64{1, 2}, []uint64{4, 5}, 4)
 
+	lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
+	isPullInit := &mockPullerInit{}
 	cdcClient, err := NewCDCClient(context.Background(), pdClient, kvStorage.(tikv.Storage), &security.Credential{})
 	c.Assert(err, check.IsNil)
 	ctx, cancel := context.WithCancel(context.Background())
@@ -151,7 +154,7 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
 	eventCh := make(chan *model.RegionFeedEvent, 10)
 	wg.Add(1)
 	go func() {
-		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, eventCh)
+		err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh)
 		c.Assert(errors.Cause(err), check.Equals, context.Canceled)
 		wg.Done()
 	}()
@@ -209,12 +212,14 @@ func (s *etcdSuite) TodoTestIncompatibleTiKV(c *check.C) {
 	cluster.AddStore(1, "localhost:23375")
 	cluster.Bootstrap(2, []uint64{1}, []uint64{3}, 3)
 
+	lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
+	isPullInit := &mockPullerInit{}
 	cdcClient, err := NewCDCClient(context.Background(), pdClient, kvStorage.(tikv.Storage), &security.Credential{})
 	c.Assert(err, check.IsNil)
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 	defer cancel()
 	eventCh := make(chan *model.RegionFeedEvent, 10)
-	err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, eventCh)
+	err = cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 1, false, lockresolver, isPullInit, eventCh)
 	_ = err
 	// TODO find a way to verify the error
 }
diff --git a/cdc/kv/testing.go b/cdc/kv/testing.go
index 12faf04a546..25fe9f53920 100644
--- a/cdc/kv/testing.go
+++ b/cdc/kv/testing.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/ticdc/cdc/model"
 	"github.com/pingcap/ticdc/pkg/regionspan"
 	"github.com/pingcap/ticdc/pkg/security"
+	"github.com/pingcap/ticdc/pkg/txnutil"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store"
 	"github.com/pingcap/tidb/store/tikv"
@@ -135,6 +136,12 @@ func mustGetValue(t require.TestingT, eventCh <-chan *model.RegionFeedEvent, val
 	}
 }
 
+type mockPullerInit struct{}
+
+func (*mockPullerInit) IsInitialized() bool {
+	return true
+}
+
 // TestSplit try split on every region, and test can get value event from
 // every region after split.
 func TestSplit(t require.TestingT, pdCli pd.Client, storage kv.Storage) {
@@ -148,8 +155,10 @@ func TestSplit(t require.TestingT, pdCli pd.Client, storage kv.Storage) {
 
 	startTS := mustGetTimestamp(t, storage)
 
+	lockresolver := txnutil.NewLockerResolver(storage.(tikv.Storage))
+	isPullInit := &mockPullerInit{}
 	go func() {
-		err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, eventCh)
+		err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, lockresolver, isPullInit, eventCh)
 		require.Equal(t, err, context.Canceled)
 	}()
 
@@ -234,9 +243,10 @@ func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage kv.Storage) {
 	defer cancel()
 
 	startTS := mustGetTimestamp(t, storage)
-
+	lockresolver := txnutil.NewLockerResolver(storage.(tikv.Storage))
+	isPullInit := &mockPullerInit{}
 	go func() {
-		err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, checker.eventCh)
+		err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, lockresolver, isPullInit, checker.eventCh)
 		require.Equal(t, err, context.Canceled)
 	}()
 
@@ -258,7 +268,7 @@ func TestGetKVSimple(t require.TestingT, pdCli pd.Client, storage kv.Storage) {
 		if i == 1 {
 			checker = newEventChecker(t)
 			go func() {
-				err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, checker.eventCh)
+				err := cli.EventFeed(ctx, regionspan.ComparableSpan{Start: nil, End: nil}, startTS, false, lockresolver, isPullInit, checker.eventCh)
 				require.Equal(t, err, context.Canceled)
 			}()
 		}
diff --git a/cdc/puller/mock_puller.go b/cdc/puller/mock_puller.go
index 1938ceb7977..8a9f9ed75b9 100644
--- a/cdc/puller/mock_puller.go
+++ b/cdc/puller/mock_puller.go
@@ -176,6 +176,10 @@ func (p *mockPuller) GetResolvedTs() uint64 {
 	return p.resolvedTs
 }
 
+func (p *mockPuller) IsInitialized() bool {
+	return false
+}
+
 // NewMockPullerManager creates and sets up a mock puller manager
 func NewMockPullerManager(c *check.C, newRowFormat bool) *MockPullerManager {
 	m := &MockPullerManager{
diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go
index 91b1374c58a..bae8dea7c5e 100644
--- a/cdc/puller/puller.go
+++ b/cdc/puller/puller.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/ticdc/cdc/puller/frontier"
 	"github.com/pingcap/ticdc/pkg/regionspan"
 	"github.com/pingcap/ticdc/pkg/security"
+	"github.com/pingcap/ticdc/pkg/txnutil"
 	"github.com/pingcap/ticdc/pkg/util"
 	tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv" @@ -45,6 +46,7 @@ type Puller interface { Run(ctx context.Context) error GetResolvedTs() uint64 Output() <-chan *model.RawKVEntry + IsInitialized() bool } type pullerImpl struct { @@ -57,6 +59,7 @@ type pullerImpl struct { outputCh chan *model.RawKVEntry tsTracker frontier.Frontier resolvedTs uint64 + initialized int64 enableOldValue bool } @@ -79,6 +82,10 @@ func NewPuller( for i := range spans { comparableSpans[i] = regionspan.ToComparableSpan(spans[i]) } + // To make puller level resolved ts initialization distinguishable, we set + // the initial ts for frontier to 0. Once the puller level resolved ts + // initialized, the ts should advance to a non-zero value. + tsTracker := frontier.NewFrontier(0, comparableSpans...) p := &pullerImpl{ pdCli: pdCli, credential: credential, @@ -87,8 +94,9 @@ func NewPuller( spans: comparableSpans, buffer: makeMemBuffer(limitter), outputCh: make(chan *model.RawKVEntry, defaultPullerOutputChanSize), - tsTracker: frontier.NewFrontier(checkpointTs, comparableSpans...), + tsTracker: tsTracker, resolvedTs: checkpointTs, + initialized: 0, enableOldValue: enableOldValue, } return p @@ -112,11 +120,12 @@ func (p *pullerImpl) Run(ctx context.Context) error { checkpointTs := p.checkpointTs eventCh := make(chan *model.RegionFeedEvent, defaultPullerEventChanSize) + lockresolver := txnutil.NewLockerResolver(p.kvStorage) for _, span := range p.spans { span := span g.Go(func() error { - return cli.EventFeed(ctx, span, checkpointTs, p.enableOldValue, eventCh) + return cli.EventFeed(ctx, span, checkpointTs, p.enableOldValue, lockresolver, p, eventCh) }) } @@ -178,6 +187,7 @@ func (p *pullerImpl) Run(ctx context.Context) error { } }) + lastResolvedTs := p.checkpointTs g.Go(func() error { output := func(raw *model.RawKVEntry) error { if raw.CRTs < p.resolvedTs || (raw.CRTs == p.resolvedTs && raw.OpType != model.OpTypeResolved) { @@ -194,7 +204,9 @@ func (p *pullerImpl) Run(ctx context.Context) error { } return nil } - var lastResolvedTs uint64 + + start := time.Now() + initialized := false for { e, err := p.buffer.Get(ctx) if err != nil { @@ -213,7 +225,17 @@ func (p *pullerImpl) Run(ctx context.Context) error { // Forward is called in a single thread p.tsTracker.Forward(e.Resolved.Span, e.Resolved.ResolvedTs) resolvedTs := p.tsTracker.Frontier() - if resolvedTs == lastResolvedTs { + if resolvedTs > 0 && !initialized { + // Advancing to a non-zero value means the puller level + // resolved ts is initialized. + atomic.StoreInt64(&p.initialized, 1) + initialized = true + log.Info("puller is initialized", + zap.Duration("duration", time.Since(start)), + zap.String("changefeedid", changefeedID), + zap.Uint64("resolvedTs", resolvedTs)) + } + if !initialized || resolvedTs == lastResolvedTs { continue } lastResolvedTs = resolvedTs @@ -231,3 +253,7 @@ func (p *pullerImpl) Run(ctx context.Context) error { func (p *pullerImpl) GetResolvedTs() uint64 { return atomic.LoadUint64(&p.resolvedTs) } + +func (p *pullerImpl) IsInitialized() bool { + return atomic.LoadInt64(&p.initialized) > 0 +} diff --git a/pkg/txnutil/lock_resolver.go b/pkg/txnutil/lock_resolver.go new file mode 100644 index 00000000000..de64e5c61d8 --- /dev/null +++ b/pkg/txnutil/lock_resolver.go @@ -0,0 +1,124 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package txnutil
+
+import (
+	"bytes"
+	"context"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/kvrpcpb"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/store/tikv"
+	"github.com/pingcap/tidb/store/tikv/tikvrpc"
+	"go.uber.org/zap"
+)
+
+// LockResolver resolves locks in the given region.
+type LockResolver interface {
+	Resolve(ctx context.Context, regionID uint64, maxVersion uint64) error
+}
+
+type resolver struct {
+	kvStorage tikv.Storage
+}
+
+// NewLockerResolver returns a LockResolver.
+func NewLockerResolver(kvStorage tikv.Storage) LockResolver {
+	return &resolver{
+		kvStorage: kvStorage,
+	}
+}
+
+const scanLockLimit = 1024
+
+func (r *resolver) Resolve(ctx context.Context, regionID uint64, maxVersion uint64) error {
+	// TODO: test whether this function will kill active transactions.
+
+	req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
+		MaxVersion: maxVersion,
+		Limit:      scanLockLimit,
+	})
+
+	bo := tikv.NewBackoffer(ctx, tikv.GcResolveLockMaxBackoff)
+	var loc *tikv.KeyLocation
+	var key []byte
+	flushRegion := func() error {
+		var err error
+		loc, err = r.kvStorage.GetRegionCache().LocateRegionByID(bo, regionID)
+		if err != nil {
+			return err
+		}
+		key = loc.StartKey
+		return nil
+	}
+	if err := flushRegion(); err != nil {
+		return errors.Trace(err)
+	}
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		req.ScanLock().StartKey = key
+		resp, err := r.kvStorage.SendReq(bo, req, loc.Region, tikv.ReadTimeoutMedium)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		regionErr, err := resp.GetRegionError()
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if regionErr != nil {
+			err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String()))
+			if err != nil {
+				return errors.Trace(err)
+			}
+			if err := flushRegion(); err != nil {
+				return errors.Trace(err)
+			}
+			continue
+		}
+		if resp.Resp == nil {
+			return errors.Trace(tikv.ErrBodyMissing)
+		}
+		locksResp := resp.Resp.(*kvrpcpb.ScanLockResponse)
+		if locksResp.GetError() != nil {
+			return errors.Errorf("unexpected scanlock error: %s", locksResp)
+		}
+		locksInfo := locksResp.GetLocks()
+		locks := make([]*tikv.Lock, len(locksInfo))
+		for i := range locksInfo {
+			locks[i] = tikv.NewLock(locksInfo[i])
+		}
+
+		_, _, err1 := r.kvStorage.GetLockResolver().ResolveLocks(bo, 0, locks)
+		if err1 != nil {
+			return errors.Trace(err1)
+		}
+		if len(locks) < scanLockLimit {
+			key = loc.EndKey
+		} else {
+			key = locks[len(locks)-1].Key
+		}
+
+		if len(key) == 0 || (len(loc.EndKey) != 0 && bytes.Compare(key, loc.EndKey) >= 0) {
+			break
+		}
+		bo = tikv.NewBackoffer(ctx, tikv.GcResolveLockMaxBackoff)
+	}
+	log.Info("resolve lock successfully", zap.Uint64("regionID", regionID), zap.Uint64("maxVersion", maxVersion))
+	return nil
+}

From 208e072b8624e48cf5ea35b53baf7775ee56672e Mon Sep 17 00:00:00 2001
From: Neil Shen
Date: Tue, 1 Sep 2020 17:49:20 +0800
Subject: [PATCH 2/5] cdc, pkg/regionspan: suppress verbose log

Signed-off-by: Neil Shen

---
 cdc/kv/client.go                    | 7 ++++++-
 pkg/regionspan/region_range_lock.go | 8 ++++----
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 5b7b50cbd07..89548976275 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -726,7 +726,12 @@ MainLoop:
 			})
 		}
 
-		log.Info("start new request", zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr))
+		logReq := log.Debug
+		if s.isPullerInit.IsInitialized() {
+			logReq = log.Info
+		}
+		logReq("start new request", zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr))
+
 		err = stream.Send(req)
 
 		// If Send error, the receiver should have received error too or will receive error soon. So we doesn't need
diff --git a/pkg/regionspan/region_range_lock.go b/pkg/regionspan/region_range_lock.go
index 869d37403f5..e62f5f46dbc 100644
--- a/pkg/regionspan/region_range_lock.go
+++ b/pkg/regionspan/region_range_lock.go
@@ -182,7 +182,7 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio
 		version:  version,
 	})
 
-	log.Info("range locked", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
+	log.Debug("range locked", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
 		zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)))
 
 	return LockRangeResult{
@@ -209,7 +209,7 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio
 	retryRanges := make([]ComparableSpan, 0)
 	currentRangeStartKey := startKey
 
-	log.Info("tryLockRange stale", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
+	log.Debug("tryLockRange stale", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
 		zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)),
 		zap.Strings("allOverlapping", overlapStr)) //DEBUG
 
 	for _, r := range overlappingRanges {
@@ -237,7 +237,7 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio
 		}
 	}
 
-	log.Info("lock range blocked", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
+	log.Debug("lock range blocked", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
 		zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)),
 		zap.Strings("blockedBy", overlapStr)) //DEBUG
 
 	return LockRangeResult{
@@ -298,7 +298,7 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, version uint64, c
 		panic("impossible (entry just get from BTree disappeared)")
 	}
 	l.rangeCheckpointTs.Set(startKey, endKey, checkpointTs)
-	log.Info("unlocked range", zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID),
+	log.Debug("unlocked range", zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID),
 		zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)))
 }

From 6955117a9bc4bc64a4f5ad76159b3af9ceff1d73 Mon Sep 17 00:00:00 2001
From: Neil Shen
Date: Wed, 2 Sep 2020 20:28:19 +0800
Subject: [PATCH 3/5] pkg/regionspan: promote debug to info as it is helpful

Signed-off-by: Neil Shen

---
 pkg/regionspan/region_range_lock.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/pkg/regionspan/region_range_lock.go b/pkg/regionspan/region_range_lock.go
index e62f5f46dbc..869d37403f5 100644
--- a/pkg/regionspan/region_range_lock.go
+++ b/pkg/regionspan/region_range_lock.go
@@ -182,7 +182,7 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio
 		version:  version,
 	})
 
-	log.Debug("range locked", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
+	log.Info("range locked", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
 		zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)))
 
 	return LockRangeResult{
@@ -209,7 +209,7 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio
 	retryRanges := make([]ComparableSpan, 0)
 	currentRangeStartKey := startKey
 
-	log.Debug("tryLockRange stale", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
+	log.Info("tryLockRange stale", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
 		zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)),
 		zap.Strings("allOverlapping", overlapStr)) //DEBUG
 
 	for _, r := range overlappingRanges {
@@ -237,7 +237,7 @@ func (l *RegionRangeLock) tryLockRange(startKey, endKey []byte, regionID, versio
 		}
 	}
 
-	log.Debug("lock range blocked", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
+	log.Info("lock range blocked", zap.Uint64("lockID", l.id), zap.Uint64("regionID", regionID),
 		zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)),
 		zap.Strings("blockedBy", overlapStr)) //DEBUG
 
 	return LockRangeResult{
@@ -298,7 +298,7 @@ func (l *RegionRangeLock) UnlockRange(startKey, endKey []byte, version uint64, c
 		panic("impossible (entry just get from BTree disappeared)")
 	}
 	l.rangeCheckpointTs.Set(startKey, endKey, checkpointTs)
-	log.Debug("unlocked range", zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID),
+	log.Info("unlocked range", zap.Uint64("lockID", l.id), zap.Uint64("regionID", entry.regionID),
 		zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey)))
 }

From be5f577282d8bfa4f54c2e525a3f1e9646523923 Mon Sep 17 00:00:00 2001
From: Neil Shen
Date: Fri, 4 Sep 2020 13:22:53 +0800
Subject: [PATCH 4/5] address comments

Signed-off-by: Neil Shen

---
 cdc/kv/client.go     | 2 +-
 cdc/puller/puller.go | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index a5fd1c6927a..dd6f6d58376 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -1378,7 +1378,7 @@ func assembleCommitEvent(regionID uint64, entry *cdcpb.Event_Row, value *pending
 	case cdcpb.Event_Row_PUT:
 		opType = model.OpTypePut
 	default:
-		return nil, errors.Errorf("unknow tp: %v", entry.GetOpType())
+		return nil, errors.Errorf("unknow tp: %v, %v", entry.GetOpType(), entry)
 	}
 
 	revent := &model.RegionFeedEvent{
diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go
index 459fb98e11f..aabfb800760 100644
--- a/cdc/puller/puller.go
+++ b/cdc/puller/puller.go
@@ -230,9 +230,16 @@ func (p *pullerImpl) Run(ctx context.Context) error {
 					// resolved ts is initialized.
 					atomic.StoreInt64(&p.initialized, 1)
 					initialized = true
+
+					spans := make([]string, 0, len(p.spans))
+					for i := range p.spans {
+						spans = append(spans, p.spans[i].String())
+					}
 					log.Info("puller is initialized",
 						zap.Duration("duration", time.Since(start)),
 						zap.String("changefeedid", changefeedID),
+						zap.Int64("tableID", tableID),
+						zap.Strings("spans", spans),
 						zap.Uint64("resolvedTs", resolvedTs))
 				}
 				if !initialized || resolvedTs == lastResolvedTs {

From c343af5d8f5dc259065e29e3b3785f52fa8bd08e Mon Sep 17 00:00:00 2001
From: Neil Shen
Date: Fri, 4 Sep 2020 19:30:08 +0800
Subject: [PATCH 5/5] Update cdc/kv/client.go

---
 cdc/kv/client.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 600147d816c..7a1dfd93200 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -1378,7 +1378,7 @@ func assembleCommitEvent(regionID uint64, entry *cdcpb.Event_Row, value *pending
 	case cdcpb.Event_Row_PUT:
 		opType = model.OpTypePut
 	default:
-		return nil, errors.Errorf("unknow tp: %v, %v", entry.GetOpType(), entry)
+		return nil, errors.Errorf("unknown tp: %v, entry: %v", entry.GetOpType(), entry)
 	}
 
 	revent := &model.RegionFeedEvent{
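
Usage note for reviewers: after this series, every EventFeed caller must supply a txnutil.LockResolver plus something implementing kv.PullerInitialization, as the test helpers above now do. Below is a minimal sketch of that wiring; it assumes the import paths of this tree (including the pd/v4 client path), and the names feed, pdCli, storage, and alwaysInit are illustrative placeholders rather than part of the patch:

	package main

	import (
		"context"

		pd "github.com/pingcap/pd/v4/client"
		"github.com/pingcap/ticdc/cdc/kv"
		"github.com/pingcap/ticdc/cdc/model"
		"github.com/pingcap/ticdc/pkg/regionspan"
		"github.com/pingcap/ticdc/pkg/security"
		"github.com/pingcap/ticdc/pkg/txnutil"
		"github.com/pingcap/tidb/store/tikv"
	)

	// alwaysInit satisfies kv.PullerInitialization the same way the test-only
	// mockPullerInit does; a real puller instead reports whether its resolved
	// ts has advanced past the zero frontier (see pullerImpl.IsInitialized).
	type alwaysInit struct{}

	func (*alwaysInit) IsInitialized() bool { return true }

	func feed(ctx context.Context, pdCli pd.Client, storage tikv.Storage, startTs uint64) error {
		cli, err := kv.NewCDCClient(ctx, pdCli, storage, &security.Credential{})
		if err != nil {
			return err
		}
		// The lock resolver lets the kv client clear stale locks itself, but the
		// client only does so once IsInitialized() reports true, so lock
		// resolution is skipped while the puller is still initializing.
		lockResolver := txnutil.NewLockerResolver(storage)
		eventCh := make(chan *model.RegionFeedEvent, 10)
		span := regionspan.ComparableSpan{Start: nil, End: nil}
		return cli.EventFeed(ctx, span, startTs, false /* enableOldValue */, lockResolver, &alwaysInit{}, eventCh)
	}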