diff --git a/Makefile b/Makefile index db522712581..5edcd3d75d5 100644 --- a/Makefile +++ b/Makefile @@ -315,6 +315,11 @@ check: check-copyright fmt check-static tidy terror_check errdoc \ swagger-spec check-makefiles check_engine_integration_test @git --no-pager diff --exit-code || (echo "Please add changed files!" && false) +fast_check: check-copyright fmt check-static tidy terror_check errdoc \ + check-merge-conflicts check-ticdc-dashboard check-diff-line-width swagger-spec check-makefiles \ + check_cdc_integration_test check_dm_integration_test check_engine_integration_test + @git --no-pager diff --exit-code || (echo "Please add changed files!" && false) + integration_test_coverage: tools/bin/gocovmerge tools/bin/goveralls tools/bin/gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/entry/schema_test_helper.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out" ifeq ("$(JenkinsCI)", "1") diff --git a/cdc/kv/client.go b/cdc/kv/client.go index fcb8d1995f3..739fc05fb54 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/chann" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/pdutil" @@ -44,7 +45,6 @@ import ( pd "github.com/tikv/pd/client" "go.uber.org/zap" "golang.org/x/sync/errgroup" - "golang.org/x/time/rate" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -70,20 +70,6 @@ const ( // failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we // don't need to force reload region anymore. regionScheduleReload = false - - // defaultRegionChanSize is the default channel size for region channel, including - // range request, region request and region error. - // Note the producer of region error channel, and the consumer of range request - // channel work in an asynchronous way, the larger channel can decrease the - // frequency of creating new goroutine. - defaultRegionChanSize = 128 - - // initial size for region rate limit queue. - defaultRegionRateLimitQueueSize = 128 - // Interval of check region retry rate limit queue. - defaultCheckRegionRateLimitInterval = 50 * time.Millisecond - // Duration of warning region retry rate limited too long. - defaultLogRegionRateLimitDuration = 10 * time.Second ) // time interval to force kv client to terminate gRPC stream and reconnect @@ -129,59 +115,13 @@ var ( type regionErrorInfo struct { singleRegionInfo err error - - retryLimitTime *time.Time - logRateLimitDuration time.Duration } func newRegionErrorInfo(info singleRegionInfo, err error) regionErrorInfo { return regionErrorInfo{ singleRegionInfo: info, err: err, - - logRateLimitDuration: defaultLogRegionRateLimitDuration, - } -} - -func (r *regionErrorInfo) logRateLimitedHint() bool { - now := time.Now() - if r.retryLimitTime == nil { - // Caller should log on the first rate limited. - r.retryLimitTime = &now - return true } - if now.Sub(*r.retryLimitTime) > r.logRateLimitDuration { - // Caller should log if it lasts too long. - r.retryLimitTime = &now - return true - } - return false -} - -type regionEventFeedLimiters struct { - sync.Mutex - // TODO replace with a LRU cache. 
- limiters map[uint64]*rate.Limiter -} - -var defaultRegionEventFeedLimiters = ®ionEventFeedLimiters{ - limiters: make(map[uint64]*rate.Limiter), -} - -func (rl *regionEventFeedLimiters) getLimiter(regionID uint64) *rate.Limiter { - var limiter *rate.Limiter - var ok bool - - rl.Lock() - limiter, ok = rl.limiters[regionID] - if !ok { - // In most cases, region replica count is 3. - replicaCount := 3 - limiter = rate.NewLimiter(rate.Every(100*time.Millisecond), replicaCount) - rl.limiters[regionID] = limiter - } - rl.Unlock() - return limiter } // eventFeedStream stores an EventFeed stream and pointer to the underlying gRPC connection @@ -220,9 +160,8 @@ type CDCClient struct { grpcPool GrpcPool - regionCache *tikv.RegionCache - pdClock pdutil.Clock - regionLimiters *regionEventFeedLimiters + regionCache *tikv.RegionCache + pdClock pdutil.Clock changefeed model.ChangeFeedID tableID model.TableID @@ -255,13 +194,12 @@ func NewCDCClient( clusterID := pd.GetClusterID(ctx) c = &CDCClient{ - clusterID: clusterID, - config: cfg, - pd: pd, - grpcPool: grpcPool, - regionCache: regionCache, - pdClock: pdClock, - regionLimiters: defaultRegionEventFeedLimiters, + clusterID: clusterID, + config: cfg, + pd: pd, + grpcPool: grpcPool, + regionCache: regionCache, + pdClock: pdClock, changefeed: changefeed, tableID: tableID, @@ -277,10 +215,6 @@ func NewCDCClient( return } -func (c *CDCClient) getRegionLimiter(regionID uint64) *rate.Limiter { - return c.regionLimiters.getLimiter(regionID) -} - func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64) (stream *eventFeedStream, newStreamErr error) { newStreamErr = retry.Do(ctx, func() (err error) { var conn *sharedConn @@ -333,7 +267,7 @@ func (c *CDCClient) EventFeed( c.regionCounts.counts.PushBack(®ionCount) c.regionCounts.Unlock() s := newEventFeedSession( - ctx, c, span, lockResolver, ts, eventCh, c.changefeed, c.tableID, c.tableName) + c, span, lockResolver, ts, eventCh, c.changefeed, c.tableID, c.tableName) return s.eventFeed(ctx, ts, ®ionCount) } @@ -379,19 +313,15 @@ type eventFeedSession struct { totalSpan regionspan.ComparableSpan // The channel to send the processed events. - eventCh chan<- model.RegionFeedEvent - // The token based region router, it controls the uninitialized regions with - // a given size limit. - regionRouter LimitRegionRouter + eventCh chan<- model.RegionFeedEvent + regionRouter *chann.DrainableChann[singleRegionInfo] // The channel to put the region that will be sent requests. - regionCh chan singleRegionInfo + regionCh *chann.DrainableChann[singleRegionInfo] // The channel to notify that an error is happening, so that the error will be handled and the affected region // will be re-requested. - errCh chan regionErrorInfo + errCh *chann.DrainableChann[regionErrorInfo] // The channel to schedule scanning and requesting regions in a specified range. 
- requestRangeCh chan rangeRequestTask - // The queue is used to store region that reaches limit - rateLimitQueue []regionErrorInfo + requestRangeCh *chann.DrainableChann[rangeRequestTask] rangeLock *regionspan.RegionRangeLock @@ -420,7 +350,6 @@ type rangeRequestTask struct { } func newEventFeedSession( - ctx context.Context, client *CDCClient, totalSpan regionspan.ComparableSpan, lockResolver txnutil.LockResolver, @@ -438,11 +367,6 @@ func newEventFeedSession( client: client, totalSpan: totalSpan, eventCh: eventCh, - regionRouter: NewSizedRegionRouter(ctx, client.config.RegionScanLimit), - regionCh: make(chan singleRegionInfo, defaultRegionChanSize), - errCh: make(chan regionErrorInfo, defaultRegionChanSize), - requestRangeCh: make(chan rangeRequestTask, defaultRegionChanSize), - rateLimitQueue: make([]regionErrorInfo, 0, defaultRegionRateLimitQueueSize), rangeLock: rangeLock, lockResolver: lockResolver, id: id, @@ -466,8 +390,19 @@ func newEventFeedSession( } func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64, regionCount *int64) error { + s.requestRangeCh = chann.NewDrainableChann[rangeRequestTask]() + s.regionCh = chann.NewDrainableChann[singleRegionInfo]() + s.regionRouter = chann.NewDrainableChann[singleRegionInfo]() + s.errCh = chann.NewDrainableChann[regionErrorInfo]() + eventFeedGauge.Inc() - defer eventFeedGauge.Dec() + defer func() { + eventFeedGauge.Dec() + s.regionRouter.CloseAndDrain() + s.regionCh.CloseAndDrain() + s.errCh.CloseAndDrain() + s.requestRangeCh.CloseAndDrain() + }() g, ctx := errgroup.WithContext(ctx) @@ -484,7 +419,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64, regionCount select { case <-ctx.Done(): return ctx.Err() - case task := <-s.requestRangeCh: + case task := <-s.requestRangeCh.Out(): s.rangeChSizeGauge.Dec() // divideAndSendEventFeedToRegions could be blocked for some time, // since it must wait for the region lock available. In order to @@ -502,53 +437,21 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64, regionCount }) g.Go(func() error { - timer := time.NewTimer(defaultCheckRegionRateLimitInterval) - defer timer.Stop() for { select { case <-ctx.Done(): return ctx.Err() - case <-timer.C: - s.handleRateLimit(ctx) - timer.Reset(defaultCheckRegionRateLimitInterval) - case errInfo := <-s.errCh: + case errInfo := <-s.errCh.Out(): s.errChSizeGauge.Dec() - allowed := s.checkRateLimit(errInfo.singleRegionInfo.verID.GetID()) - if allowed { - if err := s.handleError(ctx, errInfo); err != nil { - return err - } - continue - } - if errInfo.logRateLimitedHint() { - zapFieldAddr := zap.Skip() - if errInfo.singleRegionInfo.rpcCtx != nil { - // rpcCtx may be nil if we failed to get region info - // from pd. It could cause by pd down or the region - // has been merged. - zapFieldAddr = zap.String("addr", errInfo.singleRegionInfo.rpcCtx.Addr) - } - log.Info("EventFeed retry rate limited", - zap.String("namespace", s.changefeed.Namespace), - zap.String("changefeed", s.changefeed.ID), - zap.Int64("tableID", s.tableID), - zap.String("tableName", s.tableName), - zap.Uint64("regionID", errInfo.singleRegionInfo.verID.GetID()), - zap.Uint64("resolvedTs", errInfo.singleRegionInfo.resolvedTs), - zap.Error(errInfo.err), - zapFieldAddr) + if err := s.handleError(ctx, errInfo); err != nil { + return err } - // rate limit triggers, add the error info to the rate limit queue. 
- s.rateLimitQueue = append(s.rateLimitQueue, errInfo) + continue } } }) - g.Go(func() error { - return s.regionRouter.Run(ctx) - }) - - s.requestRangeCh <- rangeRequestTask{span: s.totalSpan, ts: ts} + s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan, ts: ts} s.rangeChSizeGauge.Inc() log.Info("event feed started", @@ -567,7 +470,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64, regionCount func (s *eventFeedSession) scheduleDivideRegionAndRequest(ctx context.Context, span regionspan.ComparableSpan, ts uint64) { task := rangeRequestTask{span: span, ts: ts} select { - case s.requestRangeCh <- task: + case s.requestRangeCh.In() <- task: s.rangeChSizeGauge.Inc() case <-ctx.Done(): } @@ -581,7 +484,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single case regionspan.LockRangeStatusSuccess: sri.resolvedTs = res.CheckpointTs select { - case s.regionCh <- sri: + case s.regionCh.In() <- sri: s.regionChSizeGauge.Inc() case <-ctx.Done(): } @@ -637,12 +540,9 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single // onRegionFail handles a region's failure, which means, unlock the region's range and send the error to the errCh for // error handling. This function is non-blocking even if error channel is full. // CAUTION: Note that this should only be called in a context that the region has locked its range. -func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) { +func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo) { s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs) - if revokeToken { - s.regionRouter.Release(errorInfo.rpcCtx.Addr) - } s.enqueueError(ctx, errorInfo) } @@ -674,7 +574,7 @@ func (s *eventFeedSession) requestRegionToStore( select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case sri = <-s.regionRouter.Chan(): + case sri = <-s.regionRouter.Out(): } requestID := allocID() @@ -732,7 +632,7 @@ func (s *eventFeedSession) requestRegionToStore( bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err) errInfo := newRegionErrorInfo(sri, &connectToStoreErr{}) - s.onRegionFail(ctx, errInfo, false /* revokeToken */) + s.onRegionFail(ctx, errInfo) continue } s.addStream(storeAddr, stream, streamCancel) @@ -813,16 +713,11 @@ func (s *eventFeedSession) requestRegionToStore( // `receiveFromStream`, so no need to retry here. _, ok := pendingRegions.takeByRequestID(requestID) if !ok { - // since this pending region has been removed, the token has been - // released in advance, re-add one token here. 
- s.regionRouter.Acquire(storeAddr) continue } errInfo := newRegionErrorInfo(sri, &sendRequestToStoreErr{}) - s.onRegionFail(ctx, errInfo, false /* revokeToken */) - } else { - s.regionRouter.Acquire(storeAddr) + s.onRegionFail(ctx, errInfo) } } } @@ -841,7 +736,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { select { case <-ctx.Done(): return errors.Trace(ctx.Err()) - case sri = <-s.regionCh: + case sri = <-s.regionCh.Out(): s.regionChSizeGauge.Dec() } @@ -887,11 +782,11 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error { zap.Stringer("span", sri.span), zap.Uint64("resolvedTs", sri.resolvedTs)) errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID}) - s.onRegionFail(ctx, errInfo, false /* revokeToken */) + s.onRegionFail(ctx, errInfo) continue } sri.rpcCtx = rpcCtx - s.regionRouter.AddRegion(sri) + s.regionRouter.In() <- sri } } @@ -963,50 +858,12 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions( // TODO: refactor enqueueError to avoid too many goroutines spawned when a lot of regions meet error. func (s *eventFeedSession) enqueueError(ctx context.Context, errorInfo regionErrorInfo) { select { - case s.errCh <- errorInfo: + case s.errCh.In() <- errorInfo: s.errChSizeGauge.Inc() - default: - go func() { - select { - case s.errCh <- errorInfo: - s.errChSizeGauge.Inc() - case <-ctx.Done(): - } - }() - } -} - -func (s *eventFeedSession) handleRateLimit(ctx context.Context) { - var ( - i int - errInfo regionErrorInfo - ) - if len(s.rateLimitQueue) == 0 { - return - } - for i, errInfo = range s.rateLimitQueue { - s.enqueueError(ctx, errInfo) - // to avoid too many goroutines spawn, since if the error region count - // exceeds the size of errCh, new goroutine will be spawned - if i == defaultRegionChanSize-1 { - break - } - } - if i == len(s.rateLimitQueue)-1 { - s.rateLimitQueue = make([]regionErrorInfo, 0, defaultRegionRateLimitQueueSize) - } else { - s.rateLimitQueue = append(make([]regionErrorInfo, 0, len(s.rateLimitQueue)-i-1), s.rateLimitQueue[i+1:]...) + case <-ctx.Done(): } } -// checkRateLimit checks whether a region can be reconnected based on its rate limiter -func (s *eventFeedSession) checkRateLimit(regionID uint64) bool { - limiter := s.client.getRegionLimiter(regionID) - // use Limiter.Allow here since if exceed the rate limit, we skip this region - // and try it later. - return limiter.Allow() -} - // handleError handles error returned by a region. If some new EventFeed connection should be established, the region // info will be sent to `regionCh`. Note if region channel is full, this function will be blocked. 
// CAUTION: Note that this should only be invoked in a context that the region is not locked, otherwise use onRegionFail @@ -1116,7 +973,7 @@ func (s *eventFeedSession) receiveFromStream( remainingRegions := pendingRegions.takeAll() for _, state := range remainingRegions { errInfo := newRegionErrorInfo(state.sri, cerror.ErrPendingRegionCancel.FastGenByArgs()) - s.onRegionFail(ctx, errInfo, true /* revokeToken */) + s.onRegionFail(ctx, errInfo) } }() diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 38e91cb2529..2d5bfdb9382 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -3556,77 +3556,12 @@ func TestPrewriteNotMatchError(t *testing.T) { cancel() } -func createFakeEventFeedSession(ctx context.Context) *eventFeedSession { - return newEventFeedSession(ctx, - &CDCClient{ - regionLimiters: defaultRegionEventFeedLimiters, - config: config.GetDefaultServerConfig().KVClient, - }, +func createFakeEventFeedSession() *eventFeedSession { + return newEventFeedSession( + &CDCClient{config: config.GetDefaultServerConfig().KVClient}, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, nil, /*lockResolver*/ 100, /*startTs*/ nil, /*eventCh*/ model.DefaultChangeFeedID("changefeed-test"), 0, "") } - -func TestCheckRateLimit(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - session := createFakeEventFeedSession(ctx) - // to avoid execution too slow and enter dead loop - maxTrigger := 1000 - trigger := 0 - burst := 3 - for trigger = 0; trigger < maxTrigger; trigger++ { - allowed := session.checkRateLimit(1) - if !allowed { - break - } - } - if trigger == maxTrigger { - require.Fail(t, "get rate limiter too slow") - } - require.GreaterOrEqual(t, trigger, burst) - time.Sleep(100 * time.Millisecond) - allowed := session.checkRateLimit(1) - require.True(t, allowed) -} - -func TestHandleRateLimit(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - session := createFakeEventFeedSession(ctx) - - // empty rate limit item, do nothing - session.handleRateLimit(ctx) - require.Len(t, session.rateLimitQueue, 0) - require.Equal(t, defaultRegionRateLimitQueueSize, cap(session.rateLimitQueue)) - - for i := 0; i < defaultRegionRateLimitQueueSize+1; i++ { - session.rateLimitQueue = append(session.rateLimitQueue, regionErrorInfo{}) - } - session.handleRateLimit(ctx) - require.Len(t, session.rateLimitQueue, 1) - require.Equal(t, 1, cap(session.rateLimitQueue)) - session.handleRateLimit(ctx) - require.Len(t, session.rateLimitQueue, 0) - require.Equal(t, 128, cap(session.rateLimitQueue)) -} - -func TestRegionErrorInfoLogRateLimitedHint(t *testing.T) { - t.Parallel() - - errInfo := newRegionErrorInfo(singleRegionInfo{}, nil) - errInfo.logRateLimitDuration = time.Second - - // True on the first rate limited. - require.True(t, errInfo.logRateLimitedHint()) - require.False(t, errInfo.logRateLimitedHint()) - - // True if it lasts too long. 
- time.Sleep(2 * errInfo.logRateLimitDuration) - require.True(t, errInfo.logRateLimitedHint()) - require.False(t, errInfo.logRateLimitedHint()) -} diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index bfbf7ed1ce9..d0dce75e950 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -228,11 +228,10 @@ func (w *regionWorker) handleSingleRegionError(err error, state *regionFeedState w.cancelStream(time.Second) } - revokeToken := !state.isInitialized() // since the context used in region worker will be cancelled after region // worker exits, we must use the parent context to prevent regionErrorInfo loss. errInfo := newRegionErrorInfo(state.getRegionInfo(), err) - w.session.onRegionFail(w.parentCtx, errInfo, revokeToken) + w.session.onRegionFail(w.parentCtx, errInfo) return retErr } @@ -618,7 +617,7 @@ func (w *regionWorker) handleEventEntry( x *cdcpb.Event_Entries_, state *regionFeedState, ) error { - regionID, regionSpan, startTime, storeAddr := state.getRegionMeta() + regionID, regionSpan, startTime, _ := state.getRegionMeta() for _, entry := range x.Entries.GetEntries() { // if a region with kv range [a, z), and we only want the get [b, c) from this region, // tikv will return all key events in the region, although specified [b, c) int the request. @@ -642,7 +641,6 @@ func (w *regionWorker) handleEventEntry( w.metrics.metricPullEventInitializedCounter.Inc() state.setInitialized() - w.session.regionRouter.Release(storeAddr) // state is just initialized, so we know this must be true cachedEvents := state.matcher.matchCachedRow(true) for _, cachedEvent := range cachedEvents { @@ -814,13 +812,12 @@ func (w *regionWorker) evictAllRegions() { for _, del := range deletes { w.delRegionState(del.regionID) del.regionState.setRegionInfoResolvedTs() - revokeToken := !del.regionState.isInitialized() // since the context used in region worker will be cancelled after // region worker exits, we must use the parent context to prevent // regionErrorInfo loss. errInfo := newRegionErrorInfo( del.regionState.sri, cerror.ErrEventFeedAborted.FastGenByArgs()) - w.session.onRegionFail(w.parentCtx, errInfo, revokeToken) + w.session.onRegionFail(w.parentCtx, errInfo) } } } diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index edea693091e..d383b22d40e 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -145,7 +145,7 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() eventCh := make(chan model.RegionFeedEvent, 2) - s := createFakeEventFeedSession(ctx) + s := createFakeEventFeedSession() s.eventCh = eventCh span := regionspan.Span{Start: []byte{}, End: regionspan.UpperBoundKey} state := newRegionFeedState(newSingleRegionInfo( diff --git a/cdc/kv/token_region.go b/cdc/kv/token_region.go deleted file mode 100644 index 8c58b8f6519..00000000000 --- a/cdc/kv/token_region.go +++ /dev/null @@ -1,186 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package kv - -import ( - "context" - "sync" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/tiflow/cdc/contextutil" - "github.com/pingcap/tiflow/cdc/model" - "github.com/prometheus/client_golang/prometheus" -) - -const ( - // buffer size for ranged region consumer - regionRouterChanSize = 16 - // sizedRegionRouter checks region buffer every 100ms - sizedRegionCheckInterval = 100 * time.Millisecond -) - -// LimitRegionRouter defines an interface that can buffer singleRegionInfo -// and provide token based consumption -type LimitRegionRouter interface { - // Chan returns a singleRegionInfo channel that can be consumed from - Chan() <-chan singleRegionInfo - // AddRegion adds an singleRegionInfo to buffer, this function is thread-safe - AddRegion(task singleRegionInfo) - // Acquire acquires one token - Acquire(id string) - // Release gives back one token, this function is thread-safe - Release(id string) - // Run runs in background and does some logic work - Run(ctx context.Context) error -} - -// srrMetrics keeps metrics of a Sized Region Router -type srrMetrics struct { - changefeed model.ChangeFeedID - // mapping from id(TiKV store address) to token used - tokens map[string]prometheus.Gauge - // mapping from id(TiKV store address) to cached regions - cachedRegions map[string]prometheus.Gauge -} - -func newSrrMetrics(ctx context.Context) *srrMetrics { - changefeed := contextutil.ChangefeedIDFromCtx(ctx) - return &srrMetrics{ - changefeed: changefeed, - tokens: make(map[string]prometheus.Gauge), - cachedRegions: make(map[string]prometheus.Gauge), - } -} - -// each changefeed on a capture maintains a sizedRegionRouter -type sizedRegionRouter struct { - buffer map[string][]singleRegionInfo - output chan singleRegionInfo - lock sync.Mutex - metrics *srrMetrics - tokens map[string]int - sizeLimit int -} - -// NewSizedRegionRouter creates a new sizedRegionRouter -func NewSizedRegionRouter(ctx context.Context, sizeLimit int) *sizedRegionRouter { - return &sizedRegionRouter{ - buffer: make(map[string][]singleRegionInfo), - output: make(chan singleRegionInfo, regionRouterChanSize), - sizeLimit: sizeLimit, - tokens: make(map[string]int), - metrics: newSrrMetrics(ctx), - } -} - -func (r *sizedRegionRouter) Chan() <-chan singleRegionInfo { - return r.output -} - -func (r *sizedRegionRouter) AddRegion(sri singleRegionInfo) { - r.lock.Lock() - var id string - // if rpcCtx is not provided, use the default "" bucket - if sri.rpcCtx != nil { - id = sri.rpcCtx.Addr - } - if r.sizeLimit > r.tokens[id] && len(r.output) < regionRouterChanSize { - r.output <- sri - } else { - r.buffer[id] = append(r.buffer[id], sri) - if _, ok := r.metrics.cachedRegions[id]; !ok { - r.metrics.cachedRegions[id] = cachedRegionSize. - WithLabelValues(id, r.metrics.changefeed.Namespace, r.metrics.changefeed.ID) - } - r.metrics.cachedRegions[id].Inc() - } - r.lock.Unlock() -} - -// Acquire implements LimitRegionRouter.Acquire -// param: id is TiKV store address -func (r *sizedRegionRouter) Acquire(id string) { - r.lock.Lock() - defer r.lock.Unlock() - r.tokens[id]++ - if _, ok := r.metrics.tokens[id]; !ok { - r.metrics.tokens[id] = clientRegionTokenSize. 
- WithLabelValues(id, r.metrics.changefeed.Namespace, r.metrics.changefeed.ID) - } - r.metrics.tokens[id].Inc() -} - -// Release implements LimitRegionRouter.Release -// param: id is TiKV store address -func (r *sizedRegionRouter) Release(id string) { - r.lock.Lock() - defer r.lock.Unlock() - r.tokens[id]-- - if _, ok := r.metrics.tokens[id]; !ok { - r.metrics.tokens[id] = clientRegionTokenSize. - WithLabelValues(id, r.metrics.changefeed.Namespace, r.metrics.changefeed.ID) - } - r.metrics.tokens[id].Dec() -} - -func (r *sizedRegionRouter) Run(ctx context.Context) error { - ticker := time.NewTicker(sizedRegionCheckInterval) - defer func() { - ticker.Stop() - r.lock.Lock() - defer r.lock.Unlock() - for id, buf := range r.buffer { - r.metrics.cachedRegions[id].Sub(float64(len(buf))) - } - }() - for { - select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case <-ticker.C: - r.lock.Lock() - for id, buf := range r.buffer { - available := r.sizeLimit - r.tokens[id] - // the tokens used could be more than size limit, since we have - // a sized channel as level1 cache - if available <= 0 { - continue - } - if available > len(buf) { - available = len(buf) - } - // to avoid deadlock because when consuming from the output channel. - // onRegionFail could decrease tokens, which requires lock protection. - if available > regionRouterChanSize-len(r.output) { - available = regionRouterChanSize - len(r.output) - } - if available == 0 { - continue - } - for i := 0; i < available; i++ { - select { - case <-ctx.Done(): - r.lock.Unlock() - return errors.Trace(ctx.Err()) - case r.output <- buf[i]: - } - } - r.buffer[id] = r.buffer[id][available:] - r.metrics.cachedRegions[id].Sub(float64(available)) - } - r.lock.Unlock() - } - } -} diff --git a/cdc/kv/token_region_test.go b/cdc/kv/token_region_test.go deleted file mode 100644 index 422080a2d8f..00000000000 --- a/cdc/kv/token_region_test.go +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package kv - -import ( - "context" - "fmt" - "sync/atomic" - "testing" - "time" - - "github.com/pingcap/errors" - "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/tikv" - "golang.org/x/sync/errgroup" -) - -func TestRouter(t *testing.T) { - t.Parallel() - store := "store-1" - limit := 10 - r := NewSizedRegionRouter(context.Background(), limit) - for i := 0; i < limit; i++ { - r.AddRegion(singleRegionInfo{resolvedTs: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}}) - } - regions := make([]singleRegionInfo, 0, limit) - // limit is less than regionScanLimitPerTable - for i := 0; i < limit; i++ { - select { - case sri := <-r.Chan(): - require.Equal(t, uint64(i), sri.resolvedTs) - r.Acquire(store) - regions = append(regions, sri) - default: - t.Error("expect region info from router") - } - } - require.Equal(t, limit, r.tokens[store]) - for range regions { - r.Release(store) - } - require.Equal(t, 0, r.tokens[store]) -} - -func TestRouterWithFastConsumer(t *testing.T) { - t.Parallel() - testRouterWithConsumer(t, func() {}) -} - -func TestRouterWithSlowConsumer(t *testing.T) { - t.Parallel() - testRouterWithConsumer(t, func() { time.Sleep(time.Millisecond * 15) }) -} - -func testRouterWithConsumer(t *testing.T, funcDoSth func()) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - store := "store-1" - limit := 20 - r := NewSizedRegionRouter(context.Background(), limit) - for i := 0; i < limit*2; i++ { - r.AddRegion(singleRegionInfo{resolvedTs: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}}) - } - received := uint64(0) - for i := 0; i < regionRouterChanSize; i++ { - <-r.Chan() - atomic.AddUint64(&received, 1) - r.Acquire(store) - } - - wg, ctx := errgroup.WithContext(ctx) - wg.Go(func() error { - return r.Run(ctx) - }) - - wg.Go(func() error { - for i := 0; i < regionRouterChanSize; i++ { - r.Release(store) - } - return nil - }) - - wg.Go(func() error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-r.Chan(): - r.Acquire(store) - atomic.AddUint64(&received, 1) - r.Release(store) - funcDoSth() - if atomic.LoadUint64(&received) == uint64(limit*4) { - cancel() - } - } - } - }) - - for i := 0; i < limit*2; i++ { - r.AddRegion(singleRegionInfo{resolvedTs: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}}) - } - - err := wg.Wait() - require.Equal(t, context.Canceled, errors.Cause(err)) - require.Equal(t, 0, r.tokens[store]) -} - -func TestRouterWithMultiStores(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - storeN := 5 - stores := make([]string, 0, storeN) - for i := 0; i < storeN; i++ { - stores = append(stores, fmt.Sprintf("store-%d", i)) - } - limit := 20 - r := NewSizedRegionRouter(context.Background(), limit) - - for _, store := range stores { - for j := 0; j < limit*2; j++ { - r.AddRegion(singleRegionInfo{resolvedTs: uint64(j), rpcCtx: &tikv.RPCContext{Addr: store}}) - } - } - received := uint64(0) - wg, ctx := errgroup.WithContext(ctx) - wg.Go(func() error { - return r.Run(ctx) - }) - - for _, store := range stores { - store := store - wg.Go(func() error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-r.Chan(): - r.Acquire(store) - atomic.AddUint64(&received, 1) - r.Release(store) - if atomic.LoadUint64(&received) == uint64(limit*4*storeN) { - cancel() - } - } - } - }) - } - - for _, store := range stores { - for i := 0; i < limit*2; i++ { - r.AddRegion(singleRegionInfo{resolvedTs: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}}) - } - 
} - - err := wg.Wait() - require.Equal(t, context.Canceled, errors.Cause(err)) - for _, store := range stores { - require.Equal(t, 0, r.tokens[store]) - } -}
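
Note on the new channel plumbing: the hunks above replace the fixed-size `regionCh`/`errCh`/`requestRangeCh` buffers, the token-based `LimitRegionRouter`, and the per-region `rate.Limiter` with unbounded, drainable channels from `pkg/chann`. Because sends on a `DrainableChann` never block on a fixed capacity, the goroutine-spawning fallback in `enqueueError` and the rate-limit retry queue become unnecessary, which is why they are deleted. Below is a minimal, hypothetical sketch (not part of the PR) of the producer/consumer pattern `eventFeed` now follows; it assumes only the `pkg/chann` calls visible in the diff, and the `regionRequest` type and the timeout are illustrative placeholders.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/pingcap/tiflow/pkg/chann"
	"golang.org/x/sync/errgroup"
)

// regionRequest is a hypothetical stand-in for singleRegionInfo.
type regionRequest struct {
	regionID uint64
}

func main() {
	// Unbounded, drainable channel, mirroring how eventFeed now wires
	// regionCh / errCh / requestRangeCh / regionRouter.
	regionCh := chann.NewDrainableChann[regionRequest]()
	// CloseAndDrain closes the sending side and discards any buffered
	// values, mirroring the deferred cleanup in eventFeed.
	defer regionCh.CloseAndDrain()

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	g, ctx := errgroup.WithContext(ctx)

	// Producer: sends never block on a fixed capacity, so the old
	// "spawn a goroutine when the bounded errCh is full" fallback in
	// enqueueError is no longer needed.
	g.Go(func() error {
		for i := uint64(0); i < 4; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case regionCh.In() <- regionRequest{regionID: i}:
			}
		}
		return nil
	})

	// Consumer: receives from Out(), analogous to dispatchRequest and
	// requestRegionToStore reading from regionCh.Out() / regionRouter.Out().
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case req := <-regionCh.Out():
				fmt.Println("handling region", req.regionID)
			}
		}
	})

	// The consumer exits via the context timeout in this sketch.
	_ = g.Wait()
}
```

The trade-off is that backpressure and per-region retry throttling are no longer provided by the kv client itself; the unbounded channels simply absorb bursts, and region retry pacing is left to the error-handling path in `handleError`.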