diff --git a/pkg/core/region.go b/pkg/core/region.go
index 41bfb4d31ad..713e82cc36d 100644
--- a/pkg/core/region.go
+++ b/pkg/core/region.go
@@ -856,6 +856,24 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
 	}
 }
 
+// The following constants are the stage names of the region heartbeat process.
+const (
+	HandleStatsAsync        = "HandleStatsAsync"
+	ObserveRegionStatsAsync = "ObserveRegionStatsAsync"
+	UpdateSubTree           = "UpdateSubTree"
+	HandleOverlaps          = "HandleOverlaps"
+	CollectRegionStatsAsync = "CollectRegionStatsAsync"
+	SaveRegionToKV          = "SaveRegionToKV"
+)
+
+// ExtraTaskOpts returns the task options for the given task name.
+func ExtraTaskOpts(ctx *MetaProcessContext, name string) ratelimit.TaskOpts {
+	return ratelimit.TaskOpts{
+		TaskName: name,
+		Limit:    ctx.Limiter,
+	}
+}
+
 // RWLockStats is a read-write lock with statistics.
 type RWLockStats struct {
 	syncutil.RWMutex
@@ -1004,6 +1022,160 @@ func (r *RegionsInfo) AtomicCheckAndPutRegion(ctx *MetaProcessContext, region *R
 	return overlaps, nil
 }
 
+// CheckAndPutRootTree checks whether the region is valid to put into the root tree; if not, it returns an error.
+// Usually used together with CheckAndPutSubTree.
+func (r *RegionsInfo) CheckAndPutRootTree(ctx *MetaProcessContext, region *RegionInfo) ([]*RegionInfo, error) {
+	tracer := ctx.Tracer
+	r.t.Lock()
+	var ols []*regionItem
+	origin := r.getRegionLocked(region.GetID())
+	if origin == nil || !bytes.Equal(origin.GetStartKey(), region.GetStartKey()) || !bytes.Equal(origin.GetEndKey(), region.GetEndKey()) {
+		ols = r.tree.overlaps(&regionItem{RegionInfo: region})
+	}
+	tracer.OnCheckOverlapsFinished()
+	err := check(region, origin, convertItemsToRegions(ols))
+	if err != nil {
+		r.t.Unlock()
+		tracer.OnValidateRegionFinished()
+		return nil, err
+	}
+	tracer.OnValidateRegionFinished()
+	_, overlaps, _ := r.setRegionLocked(region, true, ols...)
+	r.t.Unlock()
+	tracer.OnSetRegionFinished()
+	return overlaps, nil
+}
+
+// CheckAndPutSubTree updates the subtrees with the latest version of the region.
+// Usually used together with CheckAndPutRootTree.
+func (r *RegionsInfo) CheckAndPutSubTree(region *RegionInfo) {
+	// Fetch the latest version of the region from the root tree again.
+	newRegion := r.GetRegion(region.GetID())
+	if newRegion == nil {
+		newRegion = region
+	}
+	r.UpdateSubTreeOrderInsensitive(newRegion)
+}
+
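Together, these two methods split the old `AtomicCheckAndPutRegion` into a synchronous root-tree write and a deferrable subtree update. A minimal sketch of the intended calling pattern, mirroring the `updateRegion` helper in the test below (it assumes the pd module is importable and that `core.ContextTODO` builds a throwaway `MetaProcessContext`, as the test does):

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/pkg/core"
)

func main() {
	regions := core.NewRegionsInfo()
	peer := &metapb.Peer{Id: 101, StoreId: 1}
	region := core.NewRegionInfo(&metapb.Region{
		Id:          1,
		StartKey:    []byte("a"),
		EndKey:      []byte("b"),
		Peers:       []*metapb.Peer{peer},
		RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
	}, peer)

	// Step 1: validate and write the root tree under its lock; a stale
	// heartbeat is rejected here before any subtree work is scheduled.
	overlaps, err := regions.CheckAndPutRootTree(core.ContextTODO(), region)
	if err != nil {
		return
	}
	// Step 2: update the subtrees. This can run in an async task because it
	// re-reads the root tree and drops stale versions on its own.
	regions.CheckAndPutSubTree(region)
	fmt.Println("overlaps:", len(overlaps))
}
```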
+// UpdateSubTreeOrderInsensitive updates the subtree.
+// It can be called concurrently, so the region version is checked
+// to keep the updates in order:
+//  1. If the version is stale, drop this update.
+//  2. If the version is the same, only some statistics need to be updated;
+//     in this situation, the order of updates is not important.
+//
+// On the other hand, the overlapping regions need to be re-checked, because the region tree and the subtree are not updated atomically.
+func (r *RegionsInfo) UpdateSubTreeOrderInsensitive(region *RegionInfo) {
+	var origin *RegionInfo
+	r.st.Lock()
+	defer r.st.Unlock()
+	originItem, ok := r.subRegions[region.GetID()]
+	if ok {
+		origin = originItem.RegionInfo
+	}
+	rangeChanged := true
+
+	if origin != nil {
+		re := region.GetRegionEpoch()
+		oe := origin.GetRegionEpoch()
+		isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()
+		if (isTermBehind || re.GetVersion() < oe.GetVersion() || re.GetConfVer() < oe.GetConfVer()) && !region.isRegionRecreated() {
+			// Region meta is stale, skip.
+			return
+		}
+		rangeChanged = !origin.rangeEqualsTo(region)
+
+		if rangeChanged || !origin.peersEqualTo(region) {
+			// If the range or peers have changed, the sub regionTree needs to be cleaned up.
+			// TODO: Improve performance by deleting only the different peers.
+			r.removeRegionFromSubTreeLocked(origin)
+		} else {
+			// The region tree and the subtree are not updated atomically, and the region tree is updated first.
+			// If two threads need to update the region tree:
+			// t1: thread-A updates the region tree
+			// t2: thread-B updates the region tree again
+			// t3: thread-B updates the subtree
+			// t4: thread-A updates the subtree
+			// To keep the region tree consistent with the subtree, we need to drop this update.
+			if tree, ok := r.subRegions[region.GetID()]; ok {
+				r.updateSubTreeStat(origin, region)
+				tree.RegionInfo = region
+			}
+			return
+		}
+	}
+
+	if rangeChanged {
+		overlaps := r.getOverlapRegionFromSubTreeLocked(region)
+		for _, re := range overlaps {
+			r.removeRegionFromSubTreeLocked(re)
+		}
+	}
+
+	item := &regionItem{region}
+	r.subRegions[region.GetID()] = item
+	// The origin has been removed, so all peer information needs to be rebuilt.
+	// Set the peers next.
+	setPeer := func(peersMap map[uint64]*regionTree, storeID uint64, item *regionItem) {
+		store, ok := peersMap[storeID]
+		if !ok {
+			store = newRegionTree()
+			peersMap[storeID] = store
+		}
+		store.update(item, false)
+	}
+
+	// Add to leaders and followers.
+	for _, peer := range region.GetVoters() {
+		storeID := peer.GetStoreId()
+		if peer.GetId() == region.leader.GetId() {
+			// Add leader peer to leaders.
+			setPeer(r.leaders, storeID, item)
+		} else {
+			// Add follower peer to followers.
+			setPeer(r.followers, storeID, item)
+		}
+	}
+
+	setPeers := func(peersMap map[uint64]*regionTree, peers []*metapb.Peer) {
+		for _, peer := range peers {
+			storeID := peer.GetStoreId()
+			setPeer(peersMap, storeID, item)
+		}
+	}
+	// Add to learners.
+	setPeers(r.learners, region.GetLearners())
+	// Add to witnesses.
+	setPeers(r.witnesses, region.GetWitnesses())
+	// Add to pending peers.
+	setPeers(r.pendingPeers, region.GetPendingPeers())
+}
+
+func (r *RegionsInfo) getOverlapRegionFromSubTreeLocked(region *RegionInfo) []*RegionInfo {
+	it := &regionItem{RegionInfo: region}
+	overlaps := make([]*RegionInfo, 0)
+	overlapsMap := make(map[uint64]struct{})
+	collectFromItemSlice := func(peersMap map[uint64]*regionTree, storeID uint64) {
+		if tree, ok := peersMap[storeID]; ok {
+			items := tree.overlaps(it)
+			for _, item := range items {
+				if _, ok := overlapsMap[item.GetID()]; !ok {
+					overlapsMap[item.GetID()] = struct{}{}
+					overlaps = append(overlaps, item.RegionInfo)
+				}
+			}
+		}
+	}
+	for _, peer := range region.GetMeta().GetPeers() {
+		storeID := peer.GetStoreId()
+		collectFromItemSlice(r.leaders, storeID)
+		collectFromItemSlice(r.followers, storeID)
+		collectFromItemSlice(r.learners, storeID)
+		collectFromItemSlice(r.witnesses, storeID)
+	}
+	return overlaps
+}
+
 // GetRelevantRegions returns the relevant regions for a given region.
 func (r *RegionsInfo) GetRelevantRegions(region *RegionInfo) (origin *RegionInfo, overlaps []*RegionInfo) {
 	r.t.RLock()
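The drop rule above is the only ordering mechanism: no lock spans both trees, so a stale subtree write has to be detectable from the region metadata alone. A self-contained illustration of that predicate as a hypothetical standalone helper (the real check also consults `isRegionRecreated`):

```go
package main

import "fmt"

// isStaleUpdate mirrors the drop rule in UpdateSubTreeOrderInsensitive:
// an update is stale if its term, version, or conf version is behind.
func isStaleUpdate(newTerm, oldTerm, newVer, oldVer, newConf, oldConf uint64) bool {
	isTermBehind := newTerm > 0 && newTerm < oldTerm
	return isTermBehind || newVer < oldVer || newConf < oldConf
}

func main() {
	// The t1..t4 interleaving from the comment above: thread-B finished both
	// trees with version 6 before thread-A reached the subtree with version 5,
	// so thread-A's subtree write must be dropped.
	fmt.Println(isStaleUpdate(3, 3, 5, 6, 2, 2)) // true: version behind
	fmt.Println(isStaleUpdate(2, 3, 6, 6, 2, 2)) // true: term behind
	fmt.Println(isStaleUpdate(3, 3, 6, 6, 2, 2)) // false: same version, stats-only update
}
```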
diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go
index ae91886369f..88683968f3f 100644
--- a/pkg/core/region_test.go
+++ b/pkg/core/region_test.go
@@ -789,14 +789,15 @@ func randomBytes(n int) []byte {
 	return bytes
 }
 
-func newRegionInfoID(idAllocator id.Allocator) *RegionInfo {
+func newRegionInfoIDRandom(idAllocator id.Allocator) *RegionInfo {
 	var (
 		peers  []*metapb.Peer
 		leader *metapb.Peer
 	)
+	storeNum := 10
 	for i := 0; i < 3; i++ {
 		id, _ := idAllocator.Alloc()
-		p := &metapb.Peer{Id: id, StoreId: id}
+		p := &metapb.Peer{Id: id, StoreId: uint64(i%storeNum + 1)}
 		if i == 0 {
 			leader = p
 		}
@@ -811,6 +812,8 @@ func newRegionInfoID(idAllocator id.Allocator) *RegionInfo {
 			Peers:    peers,
 		},
 		leader,
+		SetApproximateSize(10),
+		SetApproximateKeys(10),
 	)
 }
 
@@ -819,7 +822,7 @@ func BenchmarkAddRegion(b *testing.B) {
 	idAllocator := mockid.NewIDAllocator()
 	var items []*RegionInfo
 	for i := 0; i < 10000000; i++ {
-		items = append(items, newRegionInfoID(idAllocator))
+		items = append(items, newRegionInfoIDRandom(idAllocator))
 	}
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
@@ -858,3 +861,80 @@ func BenchmarkRegionFromHeartbeat(b *testing.B) {
 		RegionFromHeartbeat(regionReq)
 	}
 }
+
+func TestUpdateRegionEquivalence(t *testing.T) {
+	re := require.New(t)
+	regionsOld := NewRegionsInfo()
+	regionsNew := NewRegionsInfo()
+	storeNums := 5
+	items := generateTestRegions(1000, storeNums)
+
+	updateRegion := func(item *RegionInfo) {
+		// old way
+		ctx := ContextTODO()
+		regionsOld.AtomicCheckAndPutRegion(ctx, item)
+		// new way
+		ctx = ContextTODO()
+		regionsNew.CheckAndPutRootTree(ctx, item)
+		regionsNew.CheckAndPutSubTree(item)
+	}
+	checksEquivalence := func() {
+		re.Equal(regionsOld.GetRegionCount([]byte(""), []byte("")), regionsNew.GetRegionCount([]byte(""), []byte("")))
+		re.Equal(regionsOld.GetRegionSizeByRange([]byte(""), []byte("")), regionsNew.GetRegionSizeByRange([]byte(""), []byte("")))
+		checkRegions(re, regionsOld)
+		checkRegions(re, regionsNew)
+
+		for i := 1; i <= storeNums; i++ {
+			re.Equal(regionsOld.GetStoreRegionCount(uint64(i)), regionsNew.GetStoreRegionCount(uint64(i)))
+			re.Equal(regionsOld.GetStoreLeaderCount(uint64(i)), regionsNew.GetStoreLeaderCount(uint64(i)))
+			re.Equal(regionsOld.GetStorePendingPeerCount(uint64(i)), regionsNew.GetStorePendingPeerCount(uint64(i)))
+			re.Equal(regionsOld.GetStoreLearnerRegionSize(uint64(i)), regionsNew.GetStoreLearnerRegionSize(uint64(i)))
+			re.Equal(regionsOld.GetStoreRegionSize(uint64(i)), regionsNew.GetStoreRegionSize(uint64(i)))
+			re.Equal(regionsOld.GetStoreLeaderRegionSize(uint64(i)), regionsNew.GetStoreLeaderRegionSize(uint64(i)))
+			re.Equal(regionsOld.GetStoreFollowerRegionSize(uint64(i)), regionsNew.GetStoreFollowerRegionSize(uint64(i)))
+		}
+	}
+
+	// Add regions.
+	for _, item := range items {
+		updateRegion(item)
+	}
+	checksEquivalence()
+
+	// Merge regions.
+	itemA, itemB := items[10], items[11]
+	itemMergedAB := itemA.Clone(WithEndKey(itemB.GetEndKey()), WithIncVersion())
+	updateRegion(itemMergedAB)
+	checksEquivalence()
+
+	// Split regions.
+	itemA = itemA.Clone(WithIncVersion(), WithIncVersion())
+	itemB = itemB.Clone(WithIncVersion(), WithIncVersion())
+	updateRegion(itemA)
+	updateRegion(itemB)
+	checksEquivalence()
+}
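The test above drives the old and new paths sequentially; what the split must additionally tolerate is several subtree writers racing for the same region. A hedged sketch of that scenario, reusing the same assumed pkg/core APIs as the earlier example:

```go
package main

import (
	"sync"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/pkg/core"
)

func main() {
	regions := core.NewRegionsInfo()
	peer := &metapb.Peer{Id: 101, StoreId: 1}
	region := core.NewRegionInfo(&metapb.Region{
		Id:          1,
		StartKey:    []byte("a"),
		EndKey:      []byte("b"),
		Peers:       []*metapb.Peer{peer},
		RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
	}, peer)
	if _, err := regions.CheckAndPutRootTree(core.ContextTODO(), region); err != nil {
		panic(err)
	}

	// Concurrent heartbeats for the same region may race here; each call
	// re-reads the root tree, and UpdateSubTreeOrderInsensitive drops the
	// stale ones, so the final subtree state matches the root tree.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			regions.CheckAndPutSubTree(region)
		}()
	}
	wg.Wait()
}
```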
+
+func generateTestRegions(count int, storeNum int) []*RegionInfo {
+	var items []*RegionInfo
+	for i := 0; i < count; i++ {
+		peer1 := &metapb.Peer{StoreId: uint64(i%storeNum + 1), Id: uint64(i*storeNum + 1)}
+		peer2 := &metapb.Peer{StoreId: uint64((i+1)%storeNum + 1), Id: uint64(i*storeNum + 2)}
+		peer3 := &metapb.Peer{StoreId: uint64((i+2)%storeNum + 1), Id: uint64(i*storeNum + 3)}
+		if i%3 == 0 {
+			peer2.IsWitness = true
+		}
+		region := NewRegionInfo(&metapb.Region{
+			Id:          uint64(i + 1),
+			Peers:       []*metapb.Peer{peer1, peer2, peer3},
+			StartKey:    []byte(fmt.Sprintf("%20d", i*10)),
+			EndKey:      []byte(fmt.Sprintf("%20d", (i+1)*10)),
+			RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100},
+		},
+			peer1,
+			SetApproximateKeys(10),
+			SetApproximateSize(10))
+		items = append(items, region)
+	}
+	return items
+}
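The cluster changes below replace each inline `ratelimit.TaskOpts` literal with `core.ExtraTaskOpts` and the named stage constants from pkg/core. A sketch of the resulting pattern in isolation (the runner name is illustrative, and it assumes the runner tolerates a nil limiter, which the nil-token branch in runner.go suggests):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/ratelimit"
)

func main() {
	runner := ratelimit.NewConcurrentRunner("example-runner", time.Minute)
	runner.Start()

	// In processRegionHeartbeat the MetaProcessContext carries the runner and
	// limiter; ExtraTaskOpts only bundles the stage name with that limiter.
	ctx := core.ContextTODO()
	err := runner.RunTask(
		context.Background(),
		core.ExtraTaskOpts(ctx, core.HandleStatsAsync),
		func(context.Context) { fmt.Println("stage executed asynchronously") },
	)
	if err != nil {
		// Returned once the oldest pending task exceeds maxPendingDuration.
		fmt.Println("task rejected:", err)
	}
	time.Sleep(100 * time.Millisecond) // give the async runner a chance to drain
}
```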
diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go
index 42e8c3a35cb..94b24f4ca16 100644
--- a/pkg/mcs/scheduling/server/cluster.go
+++ b/pkg/mcs/scheduling/server/cluster.go
@@ -590,10 +590,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 	ctx.TaskRunner.RunTask(
 		ctx,
-		ratelimit.TaskOpts{
-			TaskName: "HandleStatsAsync",
-			Limit:    ctx.Limiter,
-		},
+		core.ExtraTaskOpts(ctx, core.HandleStatsAsync),
 		func(_ context.Context) {
 			cluster.HandleStatsAsync(c, region)
 		},
@@ -610,10 +607,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
 	if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
 		ctx.TaskRunner.RunTask(
 			ctx,
-			ratelimit.TaskOpts{
-				TaskName: "ObserveRegionStatsAsync",
-				Limit:    ctx.Limiter,
-			},
+			core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync),
 			func(_ context.Context) {
 				if c.regionStats.RegionStatsNeedUpdate(region) {
 					cluster.Collect(c, region, hasRegionStats)
@@ -632,16 +626,21 @@
 	// However, it can't solve the race condition of concurrent heartbeats from the same region.
 	// Async task in next PR.
-	if overlaps, err = c.AtomicCheckAndPutRegion(ctx, region); err != nil {
+	if overlaps, err = c.CheckAndPutRootTree(ctx, region); err != nil {
 		tracer.OnSaveCacheFinished()
 		return err
 	}
 	ctx.TaskRunner.RunTask(
 		ctx,
-		ratelimit.TaskOpts{
-			TaskName: "HandleOverlaps",
-			Limit:    ctx.Limiter,
+		core.ExtraTaskOpts(ctx, core.UpdateSubTree),
+		func(_ context.Context) {
+			c.CheckAndPutSubTree(region)
 		},
+	)
+	tracer.OnUpdateSubTreeFinished()
+	ctx.TaskRunner.RunTask(
+		ctx,
+		core.ExtraTaskOpts(ctx, core.HandleOverlaps),
 		func(_ context.Context) {
 			cluster.HandleOverlaps(c, overlaps)
 		},
@@ -651,10 +650,7 @@
 	// handle region stats
 	ctx.TaskRunner.RunTask(
 		ctx,
-		ratelimit.TaskOpts{
-			TaskName: "CollectRegionStatsAsync",
-			Limit:    ctx.Limiter,
-		},
+		core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync),
 		func(_ context.Context) {
 			cluster.Collect(c, region, hasRegionStats)
 		},
diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go
new file mode 100644
index 00000000000..3c5020554a8
--- /dev/null
+++ b/pkg/ratelimit/metrics.go
@@ -0,0 +1,55 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ratelimit
+
+import (
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const nameStr = "runner_name"
+
+var (
+	// RunnerTaskMaxWaitingDuration is the maximum waiting duration of tasks in a runner.
+	RunnerTaskMaxWaitingDuration = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "pd",
+			Subsystem: "ratelimit",
+			Name:      "runner_task_max_waiting_duration_seconds",
+			Help:      "The maximum waiting duration of tasks in the runner.",
+		}, []string{nameStr})
+
+	// RunnerTaskPendingTasks is the number of pending tasks in a runner.
+	RunnerTaskPendingTasks = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "pd",
+			Subsystem: "ratelimit",
+			Name:      "runner_task_pending_tasks",
+			Help:      "The number of pending tasks in the runner.",
+		}, []string{nameStr})
+	// RunnerTaskFailedTasks is the number of tasks rejected by a runner.
+	RunnerTaskFailedTasks = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "pd",
+			Subsystem: "ratelimit",
+			Name:      "runner_task_failed_tasks_total",
+			Help:      "The number of failed tasks in the runner.",
+		}, []string{nameStr})
+)
+
+func init() {
+	prometheus.MustRegister(RunnerTaskMaxWaitingDuration)
+	prometheus.MustRegister(RunnerTaskPendingTasks)
+	prometheus.MustRegister(RunnerTaskFailedTasks)
+}
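The collectors are registered once at package init, so callers only touch labeled children such as the runner's `failedTaskCount` below. A quick way to sanity-check a labeled child, using the real prometheus testutil helper (the runner name is an arbitrary example):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/tikv/pd/pkg/ratelimit"
)

func main() {
	// "my-runner" is a hypothetical label value; real runners label their
	// children with the name passed to NewConcurrentRunner.
	gauge := ratelimit.RunnerTaskMaxWaitingDuration.WithLabelValues("my-runner")
	gauge.Set(1.5)
	fmt.Println(testutil.ToFloat64(gauge)) // 1.5
}
```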
diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go
index 7f0ef21f791..c4f2d5bc5ac 100644
--- a/pkg/ratelimit/runner.go
+++ b/pkg/ratelimit/runner.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/pingcap/log"
+	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
 )
 
@@ -53,6 +54,8 @@ type ConcurrentRunner struct {
 	pendingMu          sync.Mutex
 	stopChan           chan struct{}
 	wg                 sync.WaitGroup
+	failedTaskCount    prometheus.Counter
+	maxWaitingDuration prometheus.Gauge
 }
 
 // NewConcurrentRunner creates a new ConcurrentRunner.
@@ -62,6 +65,8 @@ func NewConcurrentRunner(name string, maxPendingDuration time.Duration) *Concurr
 		maxPendingDuration: maxPendingDuration,
 		taskChan:           make(chan *Task),
 		pendingTasks:       make([]*Task, 0, initialCapacity),
+		failedTaskCount:    RunnerTaskFailedTasks.WithLabelValues(name),
+		maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
 	}
 	return s
 }
@@ -77,6 +82,8 @@ type TaskOpts struct {
 func (s *ConcurrentRunner) Start() {
 	s.stopChan = make(chan struct{})
 	s.wg.Add(1)
+	ticker := time.NewTicker(5 * time.Second)
 	go func() {
 		defer s.wg.Done()
+		defer ticker.Stop()
 		for {
@@ -92,8 +99,19 @@ func (s *ConcurrentRunner) Start() {
 					go s.run(task.Ctx, task.f, nil)
 				}
 			case <-s.stopChan:
+				s.pendingMu.Lock()
+				s.pendingTasks = make([]*Task, 0, initialCapacity)
+				s.pendingMu.Unlock()
 				log.Info("stopping async task runner", zap.String("name", s.name))
 				return
+			case <-ticker.C:
+				maxDuration := time.Duration(0)
+				s.pendingMu.Lock()
+				if len(s.pendingTasks) > 0 {
+					maxDuration = time.Since(s.pendingTasks[0].submittedAt)
+				}
+				s.pendingMu.Unlock()
+				s.maxWaitingDuration.Set(maxDuration.Seconds())
 			}
 		}
 	}()
@@ -144,6 +162,7 @@ func (s *ConcurrentRunner) RunTask(ctx context.Context, opt TaskOpts, f func(con
 	if len(s.pendingTasks) > 0 {
 		maxWait := time.Since(s.pendingTasks[0].submittedAt)
 		if maxWait > s.maxPendingDuration {
+			s.failedTaskCount.Inc()
 			return ErrMaxWaitingTasksExceeded
 		}
 	}
diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go
index 1f370176383..5a67a547483 100644
--- a/pkg/schedule/config/config.go
+++ b/pkg/schedule/config/config.go
@@ -52,7 +52,7 @@ const (
 	defaultEnableJointConsensus            = true
 	defaultEnableTiKVSplitRegion           = true
 	defaultEnableHeartbeatBreakdownMetrics = true
-	defaultEnableHeartbeatConcurrentRunner = false
+	defaultEnableHeartbeatConcurrentRunner = true
 	defaultEnableCrossTableMerge           = true
 	defaultEnableDiagnostic                = true
 	defaultStrictlyMatchLabel              = false
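The 5-second ticker added to runner.go reports the age of the queue head as the max waiting duration, which is sound because pendingTasks is appended in submission order; with `defaultEnableHeartbeatConcurrentRunner` flipped to true above, this sampling is now on by default. A self-contained illustration of the sampling rule (the task type is a simplified stand-in for the runner's internals):

```go
package main

import (
	"fmt"
	"time"
)

// task is a stand-in for ratelimit.Task; only submittedAt matters here.
type task struct{ submittedAt time.Time }

func main() {
	// The head of the queue is always the oldest submission, so its age is
	// the maximum waiting duration across all pending tasks.
	pending := []task{
		{submittedAt: time.Now().Add(-3 * time.Second)}, // oldest
		{submittedAt: time.Now()},
	}
	maxDuration := time.Duration(0)
	if len(pending) > 0 {
		maxDuration = time.Since(pending[0].submittedAt)
	}
	fmt.Printf("max waiting: %.0fs\n", maxDuration.Seconds()) // ~3s
}
```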
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index dd59b63240f..dbc6a6cadf3 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -1015,10 +1015,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
 		ctx.TaskRunner.RunTask(
 			ctx.Context,
-			ratelimit.TaskOpts{
-				TaskName: "HandleStatsAsync",
-				Limit:    ctx.Limiter,
-			},
+			core.ExtraTaskOpts(ctx, core.HandleStatsAsync),
 			func(_ context.Context) {
 				cluster.HandleStatsAsync(c, region)
 			},
@@ -1039,10 +1036,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
 		ctx.TaskRunner.RunTask(
 			ctx.Context,
-			ratelimit.TaskOpts{
-				TaskName: "ObserveRegionStatsAsync",
-				Limit:    ctx.Limiter,
-			},
+			core.ExtraTaskOpts(ctx, core.ObserveRegionStatsAsync),
 			func(_ context.Context) {
 				if c.regionStats.RegionStatsNeedUpdate(region) {
 					cluster.Collect(c, region, hasRegionStats)
@@ -1065,17 +1059,23 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	// check its validation again here.
 	//
 	// However, it can't solve the race condition of concurrent heartbeats from the same region.
-	if overlaps, err = c.core.AtomicCheckAndPutRegion(ctx, region); err != nil {
+	if overlaps, err = c.core.CheckAndPutRootTree(ctx, region); err != nil {
 		tracer.OnSaveCacheFinished()
 		return err
 	}
+	ctx.TaskRunner.RunTask(
+		ctx.Context,
+		core.ExtraTaskOpts(ctx, core.UpdateSubTree),
+		func(_ context.Context) {
+			c.core.CheckAndPutSubTree(region)
+		},
+	)
+	tracer.OnUpdateSubTreeFinished()
+
 	if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
 		ctx.TaskRunner.RunTask(
 			ctx.Context,
-			ratelimit.TaskOpts{
-				TaskName: "HandleOverlaps",
-				Limit:    ctx.Limiter,
-			},
+			core.ExtraTaskOpts(ctx, core.HandleOverlaps),
 			func(_ context.Context) {
 				cluster.HandleOverlaps(c, overlaps)
 			},
@@ -1088,10 +1088,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	// handle region stats
 	ctx.TaskRunner.RunTask(
 		ctx.Context,
-		ratelimit.TaskOpts{
-			TaskName: "CollectRegionStatsAsync",
-			Limit:    ctx.Limiter,
-		},
+		core.ExtraTaskOpts(ctx, core.CollectRegionStatsAsync),
 		func(_ context.Context) {
 			// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
 			// region stats needs to be collected in API mode.
@@ -1105,10 +1102,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
 	if saveKV {
 		ctx.TaskRunner.RunTask(
 			ctx.Context,
-			ratelimit.TaskOpts{
-				TaskName: "SaveRegionToKV",
-				Limit:    ctx.Limiter,
-			},
+			core.ExtraTaskOpts(ctx, core.SaveRegionToKV),
 			func(_ context.Context) {
 				// If there are concurrent heartbeats from the same region, the last write will win even if
 				// writes to storage in the critical area. So don't use mutex to protect it.