diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index b93cc8ea8ce..07785ed4485 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -30,6 +30,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/cdc/model"
+	"github.com/pingcap/ticdc/pkg/config"
 	cerror "github.com/pingcap/ticdc/pkg/errors"
 	"github.com/pingcap/ticdc/pkg/regionspan"
 	"github.com/pingcap/ticdc/pkg/retry"
@@ -92,6 +93,8 @@ var (
 	metricFeedDuplicateRequestCounter = eventFeedErrorCounter.WithLabelValues("DuplicateRequest")
 	metricFeedUnknownErrorCounter     = eventFeedErrorCounter.WithLabelValues("Unknown")
 	metricFeedRPCCtxUnavailable       = eventFeedErrorCounter.WithLabelValues("RPCCtxUnavailable")
+	metricStoreSendRequestErr         = eventFeedErrorCounter.WithLabelValues("SendRequestToStore")
+	metricConnectToStoreErr           = eventFeedErrorCounter.WithLabelValues("ConnectToStore")
 )
 
 var (
@@ -115,9 +118,13 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan,
 // happens, kv client needs to recover region request from singleRegionInfo
 func (s *singleRegionInfo) partialClone() singleRegionInfo {
 	sri := singleRegionInfo{
-		verID: s.verID,
-		span:  s.span.Clone(),
-		ts:    s.ts,
+		verID:  s.verID,
+		span:   s.span.Clone(),
+		ts:     s.ts,
+		rpcCtx: &tikv.RPCContext{},
+	}
+	if s.rpcCtx != nil {
+		sri.rpcCtx.Addr = s.rpcCtx.Addr
 	}
 	return sri
 }
@@ -473,7 +480,7 @@ func (c *CDCClient) EventFeed(
 	isPullerInit PullerInitialization,
 	eventCh chan<- *model.RegionFeedEvent,
 ) error {
-	s := newEventFeedSession(c, c.regionCache, c.kvStorage, span,
+	s := newEventFeedSession(ctx, c, c.regionCache, c.kvStorage, span,
 		lockResolver, isPullerInit, enableOldValue, ts, eventCh)
 	return s.eventFeed(ctx, ts)
@@ -503,6 +510,9 @@ type eventFeedSession struct {
 	// The channel to send the processed events.
 	eventCh chan<- *model.RegionFeedEvent
+	// The token based region router. It controls the uninitialized regions with
+	// a given size limit.
+	regionRouter LimitRegionRouter
 	// The channel to put the region that will be sent requests.
 	regionCh chan singleRegionInfo
 	// The channel to notify that an error is happening, so that the error will be handled and the affected region
@@ -535,6 +545,7 @@
 }
 
 func newEventFeedSession(
+	ctx context.Context,
 	client *CDCClient,
 	regionCache *tikv.RegionCache,
 	kvStorage tikv.Storage,
@@ -546,12 +557,14 @@
 	eventCh chan<- *model.RegionFeedEvent,
 ) *eventFeedSession {
 	id := strconv.FormatUint(allocID(), 10)
+	kvClientCfg := config.GetGlobalServerConfig().KVClient
 	return &eventFeedSession{
 		client:      client,
 		regionCache: regionCache,
 		kvStorage:   kvStorage,
 		totalSpan:   totalSpan,
 		eventCh:     eventCh,
+		regionRouter:   NewSizedRegionRouter(ctx, kvClientCfg.RegionScanLimit),
 		regionCh:       make(chan singleRegionInfo, 16),
 		errCh:          make(chan regionErrorInfo, 16),
 		requestRangeCh: make(chan rangeRequestTask, 16),
@@ -582,6 +595,10 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
 		return s.dispatchRequest(ctx, g)
 	})
 
+	g.Go(func() error {
+		return s.requestRegionToStore(ctx, g)
+	})
+
 	g.Go(func() error {
 		for {
 			select {
@@ -619,6 +636,10 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
 		}
 	})
 
+	g.Go(func() error {
+		return s.regionRouter.Run(ctx)
+	})
+
 	s.requestRangeCh <- rangeRequestTask{span: s.totalSpan, ts: ts}
 	s.rangeChSizeGauge.Inc()
@@ -648,7 +669,6 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
 			s.regionChSizeGauge.Inc()
 		case <-ctx.Done():
 		}
-
 	case regionspan.LockRangeStatusStale:
 		log.Info("request expired",
 			zap.Uint64("regionID", sri.verID.GetID()),
@@ -697,9 +717,12 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
 // onRegionFail handles a region's failure, which means, unlock the region's range and send the error to the errCh for
 // error handling. This function is non blocking even if error channel is full.
 // CAUTION: Note that this should only be called in a context that the region has locked it's range.
-func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo) error {
+func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo, revokeToken bool) error {
 	log.Debug("region failed", zap.Uint64("regionID", errorInfo.verID.GetID()), zap.Error(errorInfo.err))
 	s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End, errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.ts)
+	if revokeToken {
+		s.regionRouter.Release(errorInfo.rpcCtx.Addr)
+	}
 	select {
 	case s.errCh <- errorInfo:
 		s.errChSizeGauge.Inc()
@@ -715,14 +738,13 @@ func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErr
 	return nil
 }
 
-// dispatchRequest manages a set of streams and dispatch event feed requests
-// to these streams. Streams to each store will be created on need. After
-// establishing new stream, a goroutine will be spawned to handle events from
-// the stream.
-// Regions from `regionCh` will be connected. If any error happens to a
-// region, the error will be send to `errCh` and the receiver of `errCh` is
-// responsible for handling the error.
-func (s *eventFeedSession) dispatchRequest(
+// requestRegionToStore gets singleRegionInfo from regionRouter, which is a token
+// based limiter, and sends the request to TiKV.
+// If sending the request to TiKV fails, the region is failed with sendRequestToStoreErr
+// and the kv client will redispatch it.
+// If initializing the gRPC stream fails, the region is failed with connectToStoreErr
+// and the kv client will also redispatch it.
+func (s *eventFeedSession) requestRegionToStore(
 	ctx context.Context,
 	g *errgroup.Group,
 ) error {
@@ -731,179 +753,202 @@ func (s *eventFeedSession) dispatchRequest(
 	// to pass the region info to the receiver since the region info cannot be inferred from the response from TiKV.
 	storePendingRegions := make(map[string]*syncRegionFeedStateMap)
 
-MainLoop:
+	var sri singleRegionInfo
 	for {
-		// Note that when a region is received from the channel, it's range has been already locked.
-		var sri singleRegionInfo
 		select {
 		case <-ctx.Done():
-			return ctx.Err()
-		case sri = <-s.regionCh:
-			s.regionChSizeGauge.Dec()
+			return errors.Trace(ctx.Err())
+		case sri = <-s.regionRouter.Chan():
 		}
+		requestID := allocID()
 
-		log.Debug("dispatching region", zap.Uint64("regionID", sri.verID.GetID()))
+		extraOp := kvrpcpb.ExtraOp_Noop
+		if s.enableOldValue {
+			extraOp = kvrpcpb.ExtraOp_ReadOldValue
+		}
 
-		// Loop for retrying in case the stream has disconnected.
-		// TODO: Should we break if retries and fails too many times?
-		for {
-			rpcCtx, err := s.getRPCContextForRegion(ctx, sri.verID)
-			if err != nil {
-				return errors.Trace(err)
+		rpcCtx := sri.rpcCtx
+		regionID := rpcCtx.Meta.GetId()
+		req := &cdcpb.ChangeDataRequest{
+			Header: &cdcpb.Header{
+				ClusterId:    s.client.clusterID,
+				TicdcVersion: version.ReleaseSemver(),
+			},
+			RegionId:     regionID,
+			RequestId:    requestID,
+			RegionEpoch:  rpcCtx.Meta.RegionEpoch,
+			CheckpointTs: sri.ts,
+			StartKey:     sri.span.Start,
+			EndKey:       sri.span.End,
+			ExtraOp:      extraOp,
+		}
+
+		failpoint.Inject("kvClientPendingRegionDelay", nil)
+
+		// each TiKV store has an independent pendingRegions.
+		var pendingRegions *syncRegionFeedStateMap
+
+		var err error
+		stream, ok := s.getStream(rpcCtx.Addr)
+		if ok {
+			var ok bool
+			pendingRegions, ok = storePendingRegions[rpcCtx.Addr]
+			if !ok {
+				// Should never happen
+				log.Panic("pending regions is not found for store", zap.String("store", rpcCtx.Addr))
 			}
-			if rpcCtx == nil {
-				// The region info is invalid. Retry the span.
-				log.Info("cannot get rpcCtx, retry span",
+		} else {
+			// when a new stream is established, always create a new pending
+			// regions map, the old map will be used in old `receiveFromStream`
+			// and won't be deleted until that goroutine exits.
+			pendingRegions = newSyncRegionFeedStateMap()
+			storePendingRegions[rpcCtx.Addr] = pendingRegions
+			storeID := rpcCtx.Peer.GetStoreId()
+			log.Info("creating new stream to store to send request",
+				zap.Uint64("regionID", sri.verID.GetID()),
+				zap.Uint64("requestID", requestID),
+				zap.Uint64("storeID", storeID),
+				zap.String("addr", rpcCtx.Addr))
+			streamCtx, streamCancel := context.WithCancel(ctx)
+			_ = streamCancel // to avoid possible context leak warning from govet
+			stream, err = s.client.newStream(streamCtx, rpcCtx.Addr, storeID)
+			if err != nil {
+				// if get stream failed, maybe the store is down permanently, we should try to relocate the active store
+				log.Warn("get grpc stream client failed",
 					zap.Uint64("regionID", sri.verID.GetID()),
-					zap.Stringer("span", sri.span))
+					zap.Uint64("requestID", requestID),
+					zap.Uint64("storeID", storeID),
+					zap.String("error", err.Error()))
+				if cerror.ErrVersionIncompatible.Equal(err) {
+					// It often occurs on rolling update. Sleep 20s to reduce logs.
+ delay := 20 * time.Second + failpoint.Inject("kvClientDelayWhenIncompatible", func() { + delay = 100 * time.Millisecond + }) + time.Sleep(delay) + } + bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) + s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err) err = s.onRegionFail(ctx, regionErrorInfo{ singleRegionInfo: sri, - err: &rpcCtxUnavailableErr{ - verID: sri.verID, - }, - }) + err: &connectToStoreErr{}, + }, false /* revokeToken */) if err != nil { return errors.Trace(err) } - continue MainLoop - } - sri.rpcCtx = rpcCtx - - requestID := allocID() - - extraOp := kvrpcpb.ExtraOp_Noop - if s.enableOldValue { - extraOp = kvrpcpb.ExtraOp_ReadOldValue + continue } + s.addStream(rpcCtx.Addr, stream, streamCancel) - regionID := rpcCtx.Meta.GetId() - req := &cdcpb.ChangeDataRequest{ - Header: &cdcpb.Header{ - ClusterId: s.client.clusterID, - TicdcVersion: version.ReleaseSemver(), - }, - RegionId: regionID, - RequestId: requestID, - RegionEpoch: rpcCtx.Meta.RegionEpoch, - CheckpointTs: sri.ts, - StartKey: sri.span.Start, - EndKey: sri.span.End, - ExtraOp: extraOp, - } + limiter := s.client.getRegionLimiter(regionID) + g.Go(func() error { + if !s.enableKVClientV2 { + return s.receiveFromStream(ctx, g, rpcCtx.Addr, getStoreID(rpcCtx), stream, pendingRegions, limiter) + } + return s.receiveFromStreamV2(ctx, g, rpcCtx.Addr, getStoreID(rpcCtx), stream, pendingRegions, limiter) + }) + } - failpoint.Inject("kvClientPendingRegionDelay", nil) + state := newRegionFeedState(sri, requestID) + pendingRegions.insert(requestID, state) - // each TiKV store has an independent pendingRegions. - var pendingRegions *syncRegionFeedStateMap + logReq := log.Debug + if s.isPullerInit.IsInitialized() { + logReq = log.Info + } + logReq("start new request", zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr)) - stream, ok := s.getStream(rpcCtx.Addr) - if ok { - var ok bool - pendingRegions, ok = storePendingRegions[rpcCtx.Addr] - if !ok { - // Should never happen - log.Panic("pending regions is not found for store", zap.String("store", rpcCtx.Addr)) - } - } else { - // when a new stream is established, always create a new pending - // regions map, the old map will be used in old `receiveFromStream` - // and won't be deleted until that goroutine exits. - pendingRegions = newSyncRegionFeedStateMap() - storePendingRegions[rpcCtx.Addr] = pendingRegions - storeID := rpcCtx.Peer.GetStoreId() - log.Info("creating new stream to store to send request", - zap.Uint64("regionID", sri.verID.GetID()), - zap.Uint64("requestID", requestID), - zap.Uint64("storeID", storeID), - zap.String("addr", rpcCtx.Addr)) - streamCtx, streamCancel := context.WithCancel(ctx) - _ = streamCancel // to avoid possible context leak warning from govet - stream, err = s.client.newStream(streamCtx, rpcCtx.Addr, storeID) - if err != nil { - // if get stream failed, maybe the store is down permanently, we should try to relocate the active store - log.Warn("get grpc stream client failed", - zap.Uint64("regionID", sri.verID.GetID()), - zap.Uint64("requestID", requestID), - zap.Uint64("storeID", storeID), - zap.String("error", err.Error())) - if cerror.ErrVersionIncompatible.Equal(err) { - // It often occurs on rolling update. Sleep 20s to reduce logs. 
- delay := 20 * time.Second - failpoint.Inject("kvClientDelayWhenIncompatible", func() { - delay = 100 * time.Millisecond - }) - time.Sleep(delay) - } - bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) - s.client.regionCache.OnSendFail(bo, rpcCtx, regionScheduleReload, err) - continue - } - s.addStream(rpcCtx.Addr, stream, streamCancel) + err = stream.Send(req) - limiter := s.client.getRegionLimiter(regionID) - g.Go(func() error { - if !s.enableKVClientV2 { - return s.receiveFromStream(ctx, g, rpcCtx.Addr, getStoreID(rpcCtx), stream, pendingRegions, limiter) - } - return s.receiveFromStreamV2(ctx, g, rpcCtx.Addr, getStoreID(rpcCtx), stream, pendingRegions, limiter) - }) + // If Send error, the receiver should have received error too or will receive error soon. So we doesn't need + // to do extra work here. + if err != nil { + log.Error("send request to stream failed", + zap.String("addr", rpcCtx.Addr), + zap.Uint64("storeID", getStoreID(rpcCtx)), + zap.Uint64("regionID", sri.verID.GetID()), + zap.Uint64("requestID", requestID), + zap.Error(err)) + err1 := stream.CloseSend() + if err1 != nil { + log.Error("failed to close stream", zap.Error(err1)) } - - state := newRegionFeedState(sri, requestID) - pendingRegions.insert(requestID, state) - - logReq := log.Debug - if s.isPullerInit.IsInitialized() { - logReq = log.Info + // Delete the stream from the map so that the next time the store is accessed, the stream will be + // re-established. + s.deleteStream(rpcCtx.Addr) + // Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is + // requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all + // pending regions, the new pending regions that are requested after reconnecting won't be stopped + // incorrectly. + delete(storePendingRegions, rpcCtx.Addr) + + // Remove the region from pendingRegions. If it's already removed, it should be already retried by + // `receiveFromStream`, so no need to retry here. + _, ok := pendingRegions.take(requestID) + if !ok { + continue } - logReq("start new request", zap.Reflect("request", req), zap.String("addr", rpcCtx.Addr)) - err = stream.Send(req) - - // If Send error, the receiver should have received error too or will receive error soon. So we doesn't need - // to do extra work here. + // Wait for a while and retry sending the request + time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) + err = s.onRegionFail(ctx, regionErrorInfo{ + singleRegionInfo: sri, + err: &sendRequestToStoreErr{}, + }, false /* revokeToken */) if err != nil { + return errors.Trace(err) + } + } else { + s.regionRouter.Acquire(rpcCtx.Addr) + } + } +} - log.Error("send request to stream failed", - zap.String("addr", rpcCtx.Addr), - zap.Uint64("storeID", getStoreID(rpcCtx)), - zap.Uint64("regionID", sri.verID.GetID()), - zap.Uint64("requestID", requestID), - zap.Error(err)) - err1 := stream.CloseSend() - if err1 != nil { - log.Error("failed to close stream", zap.Error(err1)) - } - // Delete the stream from the map so that the next time the store is accessed, the stream will be - // re-established. - s.deleteStream(rpcCtx.Addr) - // Delete `pendingRegions` from `storePendingRegions` so that the next time a region of this store is - // requested, it will create a new one. So if the `receiveFromStream` goroutine tries to stop all - // pending regions, the new pending regions that are requested after reconnecting won't be stopped - // incorrectly. 
- delete(storePendingRegions, rpcCtx.Addr) - - // Remove the region from pendingRegions. If it's already removed, it should be already retried by - // `receiveFromStream`, so no need to retry here. - _, ok := pendingRegions.take(requestID) - if !ok { - break - } +// dispatchRequest manages a set of streams and dispatch event feed requests +// to these streams. Streams to each store will be created on need. After +// establishing new stream, a goroutine will be spawned to handle events from +// the stream. +// Regions from `regionCh` will be connected. If any error happens to a +// region, the error will be send to `errCh` and the receiver of `errCh` is +// responsible for handling the error. +func (s *eventFeedSession) dispatchRequest( + ctx context.Context, + g *errgroup.Group, +) error { + for { + // Note that when a region is received from the channel, it's range has been already locked. + var sri singleRegionInfo + select { + case <-ctx.Done(): + return ctx.Err() + case sri = <-s.regionCh: + s.regionChSizeGauge.Dec() + } - // Wait for a while and retry sending the request - time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) - // Break if ctx has been canceled. - select { - case <-ctx.Done(): - return ctx.Err() - default: - } + log.Debug("dispatching region", zap.Uint64("regionID", sri.verID.GetID())) - continue + rpcCtx, err := s.getRPCContextForRegion(ctx, sri.verID) + if err != nil { + return errors.Trace(err) + } + if rpcCtx == nil { + // The region info is invalid. Retry the span. + log.Info("cannot get rpcCtx, retry span", + zap.Uint64("regionID", sri.verID.GetID()), + zap.Stringer("span", sri.span)) + err = s.onRegionFail(ctx, regionErrorInfo{ + singleRegionInfo: sri, + err: &rpcCtxUnavailableErr{ + verID: sri.verID, + }, + }, false /* revokeToken */) + if err != nil { + return errors.Trace(err) } - - break + continue } + sri.rpcCtx = rpcCtx + s.regionRouter.AddRegion(sri) } } @@ -933,7 +978,8 @@ func (s *eventFeedSession) partialRegionFeed( }() ts := state.sri.ts - maxTs, err := s.singleEventFeed(ctx, state.sri.verID.GetID(), state.sri.span, state.sri.ts, receiver) + maxTs, err := s.singleEventFeed(ctx, state.sri.verID.GetID(), state.sri.span, + state.sri.ts, state.sri.rpcCtx.Addr, receiver) log.Debug("singleEventFeed quit") if err == nil || errors.Cause(err) == context.Canceled { @@ -994,10 +1040,11 @@ func (s *eventFeedSession) partialRegionFeed( } } + revokeToken := !state.initialized return s.onRegionFail(ctx, regionErrorInfo{ singleRegionInfo: state.sri, err: err, - }) + }, revokeToken) } // divideAndSendEventFeedToRegions split up the input span into spans aligned @@ -1114,6 +1161,10 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI metricFeedRPCCtxUnavailable.Inc() s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.ts) return nil + case *connectToStoreErr: + metricConnectToStoreErr.Inc() + case *sendRequestToStoreErr: + metricStoreSendRequestErr.Inc() default: bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) if errInfo.rpcCtx.Meta != nil { @@ -1157,7 +1208,7 @@ func (s *eventFeedSession) receiveFromStream( err := s.onRegionFail(ctx, regionErrorInfo{ singleRegionInfo: state.sri, err: cerror.ErrPendingRegionCancel.GenWithStackByArgs(), - }) + }, true /* revokeToken */) if err != nil { // The only possible is that the ctx is cancelled. Simply return. 
 				return
@@ -1360,6 +1411,7 @@ func (s *eventFeedSession) singleEventFeed(
 	regionID uint64,
 	span regionspan.ComparableSpan,
 	startTs uint64,
+	storeAddr string,
 	receiverCh <-chan *regionEvent,
 ) (uint64, error) {
 	captureAddr := util.CaptureAddrFromCtx(ctx)
@@ -1500,12 +1552,13 @@ func (s *eventFeedSession) singleEventFeed(
 			switch entry.Type {
 			case cdcpb.Event_INITIALIZED:
 				if time.Since(startFeedTime) > 20*time.Second {
-					log.Warn("The time cost of initializing is too mush",
+					log.Warn("The time cost of initializing is too much",
 						zap.Duration("timeCost", time.Since(startFeedTime)),
 						zap.Uint64("regionID", regionID))
 				}
 				metricPullEventInitializedCounter.Inc()
 				initialized = true
+				s.regionRouter.Release(storeAddr)
 				cachedEvents := matcher.matchCachedRow()
 				for _, cachedEvent := range cachedEvents {
 					revent, err := assembleRowEvent(regionID, cachedEvent, s.enableOldValue)
@@ -1675,6 +1728,14 @@ func (e *rpcCtxUnavailableErr) Error() string {
 		e.verID.GetID(), e.verID.GetVer(), e.verID.GetConfVer())
 }
 
+type connectToStoreErr struct{}
+
+func (e *connectToStoreErr) Error() string { return "connect to store error" }
+
+type sendRequestToStoreErr struct{}
+
+func (e *sendRequestToStoreErr) Error() string { return "send request to store error" }
+
 func getStoreID(rpcCtx *tikv.RPCContext) uint64 {
 	if rpcCtx != nil && rpcCtx.Peer != nil {
 		return rpcCtx.Peer.GetStoreId()
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index 3ad3a6d18c5..b3755f6bc7d 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -1479,7 +1479,7 @@ ReceiveLoop:
 	}
 }
 
-// TestStreamSendWithErrorNormal mainly tests the scenario that the `Recv` call
+// TestStreamRecvWithErrorNormal mainly tests the scenario that the `Recv` call
 // of a gPRC stream in kv client meets a **logical related** error, and kv client
 // logs the error and re-establish new request.
 func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) {
@@ -1497,7 +1497,7 @@ func (s *etcdSuite) TestStreamRecvWithErrorNormal(c *check.C) {
 	s.testStreamRecvWithError(c, "1*return(\"injected stream recv error\")")
 }
 
-// TestStreamSendWithErrorIOEOF mainly tests the scenario that the `Recv` call
+// TestStreamRecvWithErrorIOEOF mainly tests the scenario that the `Recv` call
 // of a gPRC stream in kv client meets error io.EOF, and kv client logs the error
 // and re-establish new request
 func (s *etcdSuite) TestStreamRecvWithErrorIOEOF(c *check.C) {
@@ -2422,7 +2422,7 @@ func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) {
 	c.Assert(sri.span.String(), check.Equals, "[61, 63)")
 	c.Assert(sri2.ts, check.Equals, uint64(2000))
 	c.Assert(sri2.span.String(), check.Equals, "[61, 62)")
-	c.Assert(sri2.rpcCtx, check.IsNil)
+	c.Assert(sri2.rpcCtx, check.DeepEquals, &tikv.RPCContext{})
 }
 
 // TestResolveLockNoCandidate tests the resolved ts manager can work normally
diff --git a/cdc/kv/client_v2.go b/cdc/kv/client_v2.go
index 4eb18d121fa..3a792581471 100644
--- a/cdc/kv/client_v2.go
+++ b/cdc/kv/client_v2.go
@@ -178,7 +178,7 @@ func (s *eventFeedSession) receiveFromStreamV2(
 			err := s.onRegionFail(ctx, regionErrorInfo{
 				singleRegionInfo: state.sri,
 				err:              cerror.ErrPendingRegionCancel.GenWithStackByArgs(),
-			})
+			}, true /* revokeToken */)
 			if err != nil {
 				// The only possible is that the ctx is cancelled. Simply return.
 				return
diff --git a/cdc/kv/metrics.go b/cdc/kv/metrics.go
index bacfe8d0489..b9fe36c12c3 100644
--- a/cdc/kv/metrics.go
+++ b/cdc/kv/metrics.go
@@ -67,6 +67,13 @@ var (
 			Name:      "channel_size",
 			Help:      "size of each channel in kv client",
 		}, []string{"id", "channel"})
+	clientRegionTokenSize = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "ticdc",
+			Subsystem: "kvclient",
+			Name:      "region_token",
+			Help:      "size of region token in kv client",
+		}, []string{"store", "table", "changefeed"})
 	batchResolvedEventSize = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
@@ -93,6 +100,7 @@ func InitMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(pullEventCounter)
 	registry.MustRegister(sendEventCounter)
 	registry.MustRegister(clientChannelSize)
+	registry.MustRegister(clientRegionTokenSize)
 	registry.MustRegister(batchResolvedEventSize)
 	registry.MustRegister(etcdRequestCounter)
 }
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index 8e2dd09c121..7ab47f67220 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -199,10 +199,11 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s
 		}
 	})
 
+	revokeToken := !state.initialized
 	return w.session.onRegionFail(ctx, regionErrorInfo{
 		singleRegionInfo: state.sri,
 		err:              err,
-	})
+	}, revokeToken)
 }
 
 func (w *regionWorker) checkUnInitRegions(ctx context.Context) error {
@@ -452,7 +453,7 @@ func (w *regionWorker) handleEventEntry(
 		switch entry.Type {
 		case cdcpb.Event_INITIALIZED:
 			if time.Since(state.startFeedTime) > 20*time.Second {
-				log.Warn("The time cost of initializing is too mush",
+				log.Warn("The time cost of initializing is too much",
 					zap.Duration("timeCost", time.Since(state.startFeedTime)),
 					zap.Uint64("regionID", regionID))
 			}
@@ -468,7 +469,8 @@
 			metricPullEventInitializedCounter.Inc()
 			state.initialized = true
+			w.session.regionRouter.Release(state.sri.rpcCtx.Addr)
 			cachedEvents := state.matcher.matchCachedRow()
 			for _, cachedEvent := range cachedEvents {
 				revent, err := assembleRowEvent(regionID, cachedEvent, w.enableOldValue)
@@ -605,13 +607,14 @@ func (w *regionWorker) evictAllRegions(ctx context.Context) error {
 			if state.lastResolvedTs > singleRegionInfo.ts {
 				singleRegionInfo.ts = state.lastResolvedTs
 			}
+			revokeToken := !state.initialized
 			state.lock.Unlock()
 			err = w.session.onRegionFail(ctx, regionErrorInfo{
 				singleRegionInfo: singleRegionInfo,
 				err: &rpcCtxUnavailableErr{
 					verID: singleRegionInfo.verID,
 				},
-			})
+			}, revokeToken)
 			return err == nil
 		})
 	}
diff --git a/cdc/kv/token_region.go b/cdc/kv/token_region.go
new file mode 100644
index 00000000000..fee545ce375
--- /dev/null
+++ b/cdc/kv/token_region.go
@@ -0,0 +1,163 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kv
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/ticdc/pkg/util"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+	// buffer size for ranged region consumer
+	regionRouterChanSize = 16
+	// sizedRegionRouter checks region buffer every 100ms
+	sizedRegionCheckInterval = 100 * time.Millisecond
+)
+
+// LimitRegionRouter defines an interface that can buffer singleRegionInfo
+// and provide token based consumption
+type LimitRegionRouter interface {
+	// Chan returns a singleRegionInfo channel that can be consumed from
+	Chan() <-chan singleRegionInfo
+	// AddRegion adds a singleRegionInfo to the buffer, this function is thread-safe
+	AddRegion(task singleRegionInfo)
+	// Acquire acquires one token
+	Acquire(id string)
+	// Release gives back one token, this function is thread-safe
+	Release(id string)
+	// Run runs in the background and schedules buffered regions when tokens are available
+	Run(ctx context.Context) error
+}
+
+type srrMetrics struct {
+	changefeed string
+	table      string
+	tokens     map[string]prometheus.Gauge
+}
+
+func newSrrMetrics(ctx context.Context) *srrMetrics {
+	changefeed := util.ChangefeedIDFromCtx(ctx)
+	_, table := util.TableIDFromCtx(ctx)
+	return &srrMetrics{
+		changefeed: changefeed,
+		table:      table,
+		tokens:     make(map[string]prometheus.Gauge),
+	}
+}
+
+type sizedRegionRouter struct {
+	buffer    map[string][]singleRegionInfo
+	output    chan singleRegionInfo
+	lock      sync.Mutex
+	metrics   *srrMetrics
+	tokens    map[string]int
+	sizeLimit int
+}
+
+// NewSizedRegionRouter creates a new sizedRegionRouter
+func NewSizedRegionRouter(ctx context.Context, sizeLimit int) *sizedRegionRouter {
+	return &sizedRegionRouter{
+		buffer:    make(map[string][]singleRegionInfo),
+		output:    make(chan singleRegionInfo, regionRouterChanSize),
+		sizeLimit: sizeLimit,
+		tokens:    make(map[string]int),
+		metrics:   newSrrMetrics(ctx),
+	}
+}
+
+func (r *sizedRegionRouter) Chan() <-chan singleRegionInfo {
+	return r.output
+}
+
+func (r *sizedRegionRouter) AddRegion(sri singleRegionInfo) {
+	r.lock.Lock()
+	var id string
+	// if rpcCtx is not provided, use the default "" bucket
+	if sri.rpcCtx != nil {
+		id = sri.rpcCtx.Addr
+	}
+	if r.sizeLimit > r.tokens[id] && len(r.output) < regionRouterChanSize {
+		r.output <- sri
+	} else {
+		r.buffer[id] = append(r.buffer[id], sri)
+	}
+	r.lock.Unlock()
+}
+
+func (r *sizedRegionRouter) Acquire(id string) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	r.tokens[id]++
+	if _, ok := r.metrics.tokens[id]; !ok {
+		r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.table, r.metrics.changefeed)
+	}
+	r.metrics.tokens[id].Inc()
+}
+
+func (r *sizedRegionRouter) Release(id string) {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	r.tokens[id]--
+	if _, ok := r.metrics.tokens[id]; !ok {
+		r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.table, r.metrics.changefeed)
+	}
+	r.metrics.tokens[id].Dec()
+}
+
+func (r *sizedRegionRouter) Run(ctx context.Context) error {
+	ticker := time.NewTicker(sizedRegionCheckInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return errors.Trace(ctx.Err())
+		case <-ticker.C:
+			r.lock.Lock()
+			for id, buf := range r.buffer {
+				available := r.sizeLimit - r.tokens[id]
+				// the tokens in use could exceed the size limit, since we have
+				// a sized channel as a level1 cache
+				if available <= 0 {
+					continue
+				}
+				if available > len(buf) {
+					available = len(buf)
+				}
+				// send only as many regions as fit into the output channel without blocking:
+				// blocking here while holding the lock could deadlock, because consumers of the
+				// output channel may call onRegionFail, which needs the lock to decrease tokens.
+				if available > regionRouterChanSize-len(r.output) {
+					available = regionRouterChanSize - len(r.output)
+				}
+				if available == 0 {
+					continue
+				}
+				for i := 0; i < available; i++ {
+					select {
+					case <-ctx.Done():
+						r.lock.Unlock()
+						return errors.Trace(ctx.Err())
+					case r.output <- buf[i]:
+					}
+				}
+				r.buffer[id] = r.buffer[id][available:]
+			}
+			r.lock.Unlock()
+		}
+	}
+}
diff --git a/cdc/kv/token_region_test.go b/cdc/kv/token_region_test.go
new file mode 100644
index 00000000000..8545ccdf721
--- /dev/null
+++ b/cdc/kv/token_region_test.go
@@ -0,0 +1,181 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kv
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+	"time"
+
+	"github.com/pingcap/check"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/ticdc/pkg/util/testleak"
+	"github.com/pingcap/tidb/store/tikv"
+	"golang.org/x/sync/errgroup"
+)
+
+type tokenRegionSuite struct {
+}
+
+var _ = check.Suite(&tokenRegionSuite{})
+
+func (s *tokenRegionSuite) TestRouter(c *check.C) {
+	defer testleak.AfterTest(c)()
+	store := "store-1"
+	limit := 10
+	r := NewSizedRegionRouter(context.Background(), limit)
+	for i := 0; i < limit; i++ {
+		r.AddRegion(singleRegionInfo{ts: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}})
+	}
+	regions := make([]singleRegionInfo, 0, limit)
+	// limit is less than regionScanLimitPerTable
+	for i := 0; i < limit; i++ {
+		select {
+		case sri := <-r.Chan():
+			c.Assert(sri.ts, check.Equals, uint64(i))
+			r.Acquire(store)
+			regions = append(regions, sri)
+		default:
+			c.Error("expect region info from router")
+		}
+	}
+	c.Assert(r.tokens[store], check.Equals, limit)
+	for range regions {
+		r.Release(store)
+	}
+	c.Assert(r.tokens[store], check.Equals, 0)
+}
+
+func (s *tokenRegionSuite) TestRouterWithFastConsumer(c *check.C) {
+	defer testleak.AfterTest(c)()
+	s.testRouterWithConsumer(c, func() {})
+}
+
+func (s *tokenRegionSuite) TestRouterWithSlowConsumer(c *check.C) {
+	defer testleak.AfterTest(c)()
+	s.testRouterWithConsumer(c, func() { time.Sleep(time.Millisecond * 15) })
+}
+
+func (s *tokenRegionSuite) testRouterWithConsumer(c *check.C, funcDoSth func()) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	store := "store-1"
+	limit := 20
+	r := NewSizedRegionRouter(context.Background(), limit)
+	for i := 0; i < limit*2; i++ {
+		r.AddRegion(singleRegionInfo{ts: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}})
+	}
+	received := uint64(0)
+	for i := 0; i < regionRouterChanSize; i++ {
+		<-r.Chan()
+		atomic.AddUint64(&received, 1)
+		r.Acquire(store)
+	}
+
+	wg, ctx := errgroup.WithContext(ctx)
+	wg.Go(func() error {
+		return r.Run(ctx)
+	})
+
+	wg.Go(func() error {
+		for i := 0; i < regionRouterChanSize; i++ {
+			r.Release(store)
+		}
+		return nil
+	})
+
+	wg.Go(func() error {
+		for {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-r.Chan():
+				r.Acquire(store)
+				atomic.AddUint64(&received, 1)
+				r.Release(store)
+				funcDoSth()
+				if atomic.LoadUint64(&received) == uint64(limit*4) {
+					cancel()
+				}
+			}
+		}
+	})
+
+	for i := 0; i < limit*2; i++ {
+		r.AddRegion(singleRegionInfo{ts: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}})
+	}
+
+	err := wg.Wait()
+	c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+	c.Assert(r.tokens[store], check.Equals, 0)
+}
+
+func (s *tokenRegionSuite) TestRouterWithMultiStores(c *check.C) {
+	defer testleak.AfterTest(c)()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	storeN := 5
+	stores := make([]string, 0, storeN)
+	for i := 0; i < storeN; i++ {
+		stores = append(stores, fmt.Sprintf("store-%d", i))
+	}
+	limit := 20
+	r := NewSizedRegionRouter(context.Background(), limit)
+
+	for _, store := range stores {
+		for j := 0; j < limit*2; j++ {
+			r.AddRegion(singleRegionInfo{ts: uint64(j), rpcCtx: &tikv.RPCContext{Addr: store}})
+		}
+	}
+	received := uint64(0)
+	wg, ctx := errgroup.WithContext(ctx)
+	wg.Go(func() error {
+		return r.Run(ctx)
+	})
+
+	for _, store := range stores {
+		store := store
+		wg.Go(func() error {
+			for {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-r.Chan():
+					r.Acquire(store)
+					atomic.AddUint64(&received, 1)
+					r.Release(store)
+					if atomic.LoadUint64(&received) == uint64(limit*4*storeN) {
+						cancel()
+					}
+				}
+			}
+		})
+	}
+
+	for _, store := range stores {
+		for i := 0; i < limit*2; i++ {
+			r.AddRegion(singleRegionInfo{ts: uint64(i), rpcCtx: &tikv.RPCContext{Addr: store}})
+		}
+	}
+
+	err := wg.Wait()
+	c.Assert(errors.Cause(err), check.Equals, context.Canceled)
+	for _, store := range stores {
+		c.Assert(r.tokens[store], check.Equals, 0)
+	}
+}
diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go
index 6a2a0b6da5a..504ca22da14 100644
--- a/cdc/processor/pipeline/puller.go
+++ b/cdc/processor/pipeline/puller.go
@@ -77,8 +77,9 @@ func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
 	enableOldValue := ctx.Vars().Config.EnableOldValue
 	ctxC, cancel := stdContext.WithCancel(ctx.StdContext())
 	ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName)
+	// TODO: also put the changefeed ID into ctxC, so that the kv client region-token metrics carry the changefeed label.
 	plr := puller.NewPuller(ctxC, ctx.Vars().PDClient, n.credential, n.kvStorage,
 		n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, enableOldValue)
 	n.wg.Go(func() error {
 		ctx.Throw(errors.Trace(plr.Run(ctxC)))
 		return nil
diff --git a/cmd/server_test.go b/cmd/server_test.go
index ef0bc440cc9..99b4dec8538 100644
--- a/cmd/server_test.go
+++ b/cmd/server_test.go
@@ -117,6 +117,11 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) {
 			CertAllowedCN: []string{"dd", "ee"},
 		},
 		PerTableMemoryQuota: 20 * 1024 * 1024, // 20M
+		KVClient: &config.KVClientConfig{
+			WorkerConcurrent: 8,
+			WorkerPoolSize:   0,
+			RegionScanLimit:  40,
+		},
 	})
 
 	// test decode config file
@@ -169,6 +174,11 @@ sort-dir = "/tmp/just_a_test"
 		},
 		Security:            &config.SecurityConfig{},
 		PerTableMemoryQuota: 20 * 1024 * 1024, // 20M
+		KVClient: &config.KVClientConfig{
+			WorkerConcurrent: 8,
+			WorkerPoolSize:   0,
+			RegionScanLimit:  40,
+		},
 	})
 
 	configContent = configContent + `
@@ -222,5 +232,10 @@ cert-allowed-cn = ["dd","ee"]
 			CertAllowedCN: []string{"dd", "ee"},
 		},
 		PerTableMemoryQuota: 20 * 1024 * 1024, // 20M
+		KVClient: &config.KVClientConfig{
+			WorkerConcurrent: 8,
+			WorkerPoolSize:   0,
+			RegionScanLimit:  40,
+		},
 	})
 }
diff --git a/pkg/config/config.go b/pkg/config/config.go
index ba7c7ad8f9a..a4641e5aa00 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -157,6 +157,11 @@ var defaultServerConfig = &ServerConfig{
 	},
 	Security:            &SecurityConfig{},
 	PerTableMemoryQuota: 20 * 1024 * 1024, // 20MB
+	KVClient: &KVClientConfig{
+		WorkerConcurrent: 8,
+		WorkerPoolSize:   0, // 0 will use NumCPU() * 2
+		RegionScanLimit:  40,
+	},
 }
 
 // ServerConfig represents a config for server
@@ -283,6 +288,16 @@ func (c *ServerConfig) ValidateAndAdjust() error {
 		return cerror.ErrInvalidServerOption.GenWithStackByArgs("per-table-memory-quota should be at least 6MB")
 	}
 
+	if c.KVClient == nil {
+		c.KVClient = defaultServerConfig.KVClient
+	}
+	if c.KVClient.WorkerConcurrent <= 0 {
+		return cerror.ErrInvalidServerOption.GenWithStackByArgs("worker-concurrent should be at least 1")
+	}
+	if c.KVClient.RegionScanLimit <= 0 {
+		return cerror.ErrInvalidServerOption.GenWithStackByArgs("region-scan-limit should be at least 1")
+	}
+
 	return nil
 }
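[Reviewer note — illustrative only, not part of the patch] The new ValidateAndAdjust checks above can be exercised with a small test like the sketch below. It assumes the existing GetDefaultServerConfig helper in pkg/config; the test name and placement are illustrative.

package config

import "testing"

// TestKVClientValidation is a sketch: a non-positive region-scan-limit
// must now be rejected by ValidateAndAdjust.
func TestKVClientValidation(t *testing.T) {
	conf := GetDefaultServerConfig()
	// replace the nested config so the shared default is not mutated
	conf.KVClient = &KVClientConfig{WorkerConcurrent: 8, WorkerPoolSize: 0, RegionScanLimit: 0}
	if err := conf.ValidateAndAdjust(); err == nil {
		t.Fatal("expected an invalid-server-option error when region-scan-limit is 0")
	}
}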
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 0c5869ea8ca..70461f10214 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -86,9 +86,9 @@ func (s *serverConfigSuite) TestMarshal(c *check.C) {
 	conf.Sorter.ChunkSizeLimit = 999
 	b, err := conf.Marshal()
 	c.Assert(err, check.IsNil)
-	c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)
+	c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}`)
 	conf2 := new(ServerConfig)
-	err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`))
+	err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}`))
 	c.Assert(err, check.IsNil)
 	c.Assert(conf2, check.DeepEquals, conf)
 }
diff --git a/pkg/config/kvclient.go b/pkg/config/kvclient.go
new file mode 100644
--- /dev/null
+++ b/pkg/config/kvclient.go
@@ -0,0 +1,24 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+// KVClientConfig represents config for kv client
+type KVClientConfig struct {
+	// how many workers will be used for a single region worker
+	WorkerConcurrent int `toml:"worker-concurrent" json:"worker-concurrent"`
+	// background workerpool size, the workerpool is shared by all goroutines in cdc server
+	WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"`
+	// region incremental scan limit for one table in a single store
+	RegionScanLimit int `toml:"region-scan-limit" json:"region-scan-limit"`
+}
diff --git a/tests/move_table/conf/workload b/tests/move_table/conf/workload
index 5b9ca3189fc..128a6404b53 100644
--- a/tests/move_table/conf/workload
+++ b/tests/move_table/conf/workload
@@ -1,5 +1,5 @@
 threadcount=10
-recordcount=60000
+recordcount=6000
 operationcount=0
 workload=core
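[Reviewer note — illustrative only, not part of the patch] To summarize the token flow this change introduces, here is a small sketch in one place. The router API (AddRegion / Chan / Acquire / Release / Run) is the one defined in cdc/kv/token_region.go above; the helper function and its wiring are hypothetical, 40 mirrors the default region-scan-limit from pkg/config, and each region is assumed to already carry a resolved rpcCtx.

package kv

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// tokenFlowSketch shows how the sized region router is meant to be used:
// regions are buffered with AddRegion, consumed from Chan, and every in-flight
// uninitialized region holds one per-store token between Acquire and Release.
func tokenFlowSketch(ctx context.Context, regions []singleRegionInfo) error {
	router := NewSizedRegionRouter(ctx, 40)
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error { return router.Run(ctx) })

	// Producer side (dispatchRequest): buffer regions once their rpcCtx is resolved.
	for _, sri := range regions {
		router.AddRegion(sri)
	}

	// Consumer side (requestRegionToStore): take one token per region that has
	// been sent to TiKV but is not initialized yet.
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case sri := <-router.Chan():
				addr := sri.rpcCtx.Addr
				router.Acquire(addr) // taken after the ChangeDataRequest is sent successfully
				// ... once Event_INITIALIZED arrives, or onRegionFail runs with
				// revokeToken == true, the token is handed back:
				router.Release(addr)
			}
		}
	})
	return g.Wait()
}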