From 5cf2c43adc71cc287ed0a9e4fc861ede9edd5f44 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 11 Oct 2023 11:42:25 +0800 Subject: [PATCH 01/11] scheduler: fix scheduler save config (#7108) (#7164) close tikv/pd#6897 Signed-off-by: husharp Co-authored-by: husharp --- server/cluster/coordinator.go | 4 + server/cluster/coordinator_test.go | 3 +- server/schedule/scheduler.go | 13 ++- tests/server/api/testutil.go | 59 ++++++++++++++ tests/server/cluster/cluster_test.go | 118 +++++++++++++++++++++++++++ 5 files changed, 189 insertions(+), 8 deletions(-) create mode 100644 tests/server/api/testutil.go diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index a6b0783d93d..d62de672896 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -641,6 +641,10 @@ func (c *coordinator) addScheduler(scheduler schedule.Scheduler, args ...string) c.wg.Add(1) go c.runScheduler(s) c.schedulers[s.GetName()] = s + if err := schedule.SaveSchedulerConfig(c.cluster.storage, scheduler); err != nil { + log.Error("can not save scheduler config", zap.String("scheduler-name", scheduler.GetName()), errs.ZapError(err)) + return err + } c.cluster.opt.AddSchedulerCfg(s.GetType(), args) return nil } diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index 20ab1f4f8fa..c15302015f7 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -749,8 +749,9 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { // whether the schedulers added or removed in dynamic way are recorded in opt _, newOpt, err := newTestScheduleConfig() c.Assert(err, IsNil) - _, err = schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null"))) + shuffle, err := schedule.CreateScheduler(schedulers.ShuffleRegionType, oc, storage, schedule.ConfigJSONDecoder([]byte("null"))) c.Assert(err, IsNil) + c.Assert(co.addScheduler(shuffle), IsNil) // suppose we 
add a new default enable scheduler config.DefaultSchedulers = append(config.DefaultSchedulers, config.SchedulerConfig{Type: "shuffle-region"}) defer func() { diff --git a/server/schedule/scheduler.go b/server/schedule/scheduler.go index d440326967c..7a5d3c02ad6 100644 --- a/server/schedule/scheduler.go +++ b/server/schedule/scheduler.go @@ -117,17 +117,16 @@ func CreateScheduler(typ string, opController *OperatorController, storage endpo if !ok { return nil, errs.ErrSchedulerCreateFuncNotRegistered.FastGenByArgs(typ) } + return fn(opController, storage, dec) +} - s, err := fn(opController, storage, dec) - if err != nil { - return nil, err - } +// SaveSchedulerConfig saves the config of the specified scheduler. +func SaveSchedulerConfig(storage endpoint.ConfigStorage, s Scheduler) error { data, err := s.EncodeConfig() if err != nil { - return nil, err + return err } - err = storage.SaveScheduleConfig(s.GetName(), data) - return s, err + return storage.SaveScheduleConfig(s.GetName(), data) } // FindSchedulerTypeByName finds the type of the specified name. diff --git a/tests/server/api/testutil.go b/tests/server/api/testutil.go new file mode 100644 index 00000000000..37f4be0bc16 --- /dev/null +++ b/tests/server/api/testutil.go @@ -0,0 +1,59 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/pingcap/check" +) + +const schedulersPrefix = "/pd/api/v1/schedulers" + +// dialClient used to dial http request. +var dialClient = &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, +} + +// MustAddScheduler adds a scheduler with HTTP API. +func MustAddScheduler( + c *check.C, serverAddr string, + schedulerName string, args map[string]interface{}, +) { + request := map[string]interface{}{ + "name": schedulerName, + } + for arg, val := range args { + request[arg] = val + } + data, err := json.Marshal(request) + c.Assert(err, check.IsNil) + + httpReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s%s", serverAddr, schedulersPrefix), bytes.NewBuffer(data)) + c.Assert(err, check.IsNil) + // Send request. + resp, err := dialClient.Do(httpReq) + c.Assert(err, check.IsNil) + defer resp.Body.Close() + _, err = io.ReadAll(resp.Body) + c.Assert(err, check.IsNil) + c.Assert(resp.StatusCode, check.Equals, http.StatusOK) +} diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index bbb6da37391..7acff3f2194 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -41,8 +41,10 @@ import ( "github.com/tikv/pd/server/id" syncer "github.com/tikv/pd/server/region_syncer" "github.com/tikv/pd/server/schedule/operator" + "github.com/tikv/pd/server/schedulers" "github.com/tikv/pd/server/storage" "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/server/api" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -1420,3 +1422,119 @@ func (s *clusterTestSuite) TestTransferLeaderBack(c *C) { c.Assert(rc.GetMetaCluster(), DeepEquals, meta) c.Assert(rc.GetStoreCount(), Equals, 3) } + +func (s *clusterTestSuite) TestTransferLeaderForScheduler(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + 
c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker", `return(true)`), IsNil) + tc, err := tests.NewTestCluster(ctx, 2) + defer tc.Destroy() + c.Assert(err, IsNil) + err = tc.RunInitialServers() + c.Assert(err, IsNil) + tc.WaitLeader() + // start + leaderServer := tc.GetServer(tc.GetLeader()) + c.Assert(leaderServer.BootstrapCluster(), IsNil) + rc := leaderServer.GetServer().GetRaftCluster() + c.Assert(rc, NotNil) + + storesNum := 2 + grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr()) + for i := 1; i <= storesNum; i++ { + store := &metapb.Store{ + Id: uint64(i), + Address: "127.0.0.1:" + strconv.Itoa(i), + } + resp, err := putStore(grpcPDClient, leaderServer.GetClusterID(), store) + c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError().GetType(), Equals, pdpb.ErrorType_OK) + } + // region heartbeat + id := leaderServer.GetAllocator() + s.putRegionWithLeader(c, rc, id, 1) + + time.Sleep(time.Second) + c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue) + + // Add evict leader scheduler + api.MustAddScheduler(c, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 1, + }) + api.MustAddScheduler(c, leaderServer.GetAddr(), schedulers.EvictLeaderName, map[string]interface{}{ + "store_id": 2, + }) + // Check scheduler updated. + c.Assert(len(rc.GetSchedulers()), Equals, 5) + checkEvictLeaderSchedulerExist(c, rc, true) + checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2}) + + // transfer PD leader to another PD + tc.ResignLeader() + rc.Stop() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + rc1 := leaderServer.GetServer().GetRaftCluster() + rc1.Start(leaderServer.GetServer()) + c.Assert(err, IsNil) + c.Assert(rc1, NotNil) + // region heartbeat + id = leaderServer.GetAllocator() + s.putRegionWithLeader(c, rc1, id, 1) + time.Sleep(time.Second) + c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue) + // Check scheduler updated. 
+ c.Assert(len(rc.GetSchedulers()), Equals, 5) + checkEvictLeaderSchedulerExist(c, rc, true) + checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2}) + + // transfer PD leader back to the previous PD + tc.ResignLeader() + rc1.Stop() + tc.WaitLeader() + leaderServer = tc.GetServer(tc.GetLeader()) + rc = leaderServer.GetServer().GetRaftCluster() + rc.Start(leaderServer.GetServer()) + c.Assert(rc, NotNil) + // region heartbeat + id = leaderServer.GetAllocator() + s.putRegionWithLeader(c, rc, id, 1) + time.Sleep(time.Second) + c.Assert(leaderServer.GetRaftCluster().IsPrepared(), IsTrue) + // Check scheduler updated + c.Assert(len(rc.GetSchedulers()), Equals, 5) + checkEvictLeaderSchedulerExist(c, rc, true) + checkEvictLeaderStoreIDs(c, rc, []uint64{1, 2}) + + c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/changeCoordinatorTicker"), IsNil) +} + +func checkEvictLeaderSchedulerExist(c *C, rc *cluster.RaftCluster, exist bool) { + isExistScheduler := func(rc *cluster.RaftCluster, name string) bool { + s := rc.GetSchedulers() + for _, scheduler := range s { + if scheduler == name { + return true + } + } + return false + } + + testutil.WaitUntil(c, func() bool { + return isExistScheduler(rc, schedulers.EvictLeaderName) == exist + }) +} + +func checkEvictLeaderStoreIDs(c *C, rc *cluster.RaftCluster, expected []uint64) { + handler, ok := rc.GetSchedulerHandlers()[schedulers.EvictLeaderName] + c.Assert(ok, IsTrue) + h, ok := handler.(interface { + EvictStoreIDs() []uint64 + }) + c.Assert(ok, IsTrue) + var evictStoreIDs []uint64 + testutil.WaitUntil(c, func() bool { + evictStoreIDs = h.EvictStoreIDs() + return len(evictStoreIDs) == len(expected) + }) +} From b092996ad7eed3e749522d10e9384c69fdd1792f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 22 Nov 2023 10:56:40 +0800 Subject: [PATCH 02/11] Scatter: make peer scatter logic same with the leader (#6965) (#7026) close tikv/pd#6962 In the past, PD accounted for the influence of peer distribution across different groups by using
`TotalCountByStore`, but did not do the same for the leader distribution. The most common use case is partitioned tables. After this PR, TiDB calls the scatter API with the same group rather than with different groups. ref: https://github.com/tikv/pd/pull/3422 https://github.com/pingcap/tidb/pull/46156 Signed-off-by: ti-chi-bot Signed-off-by: bufferflies <1045931706@qq.com> Co-authored-by: buffer <1045931706@qq.com> Co-authored-by: bufferflies <1045931706@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/schedule/region_scatterer.go | 110 +++++++++-------------- server/schedule/region_scatterer_test.go | 50 ++--------- 2 files changed, 52 insertions(+), 108 deletions(-) diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go index bf519b71bec..ae3d62e1731 100644 --- a/server/schedule/region_scatterer.go +++ b/server/schedule/region_scatterer.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "strconv" "time" "github.com/pingcap/errors" @@ -86,26 +87,6 @@ func (s *selectedStores) GetGroupDistribution(group string) (map[uint64]uint64, return s.getDistributionByGroupLocked(group) } -// TotalCountByStore counts the total count by store -func (s *selectedStores) TotalCountByStore(storeID uint64) uint64 { - s.mu.RLock() - defer s.mu.RUnlock() - groups := s.groupDistribution.GetAllID() - totalCount := uint64(0) - for _, group := range groups { - storeDistribution, ok := s.getDistributionByGroupLocked(group) - if !ok { - continue - } - count, ok := storeDistribution[storeID] - if !ok { - continue - } - totalCount += count - } - return totalCount -} - // getDistributionByGroupLocked should be called with lock func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]uint64, bool) { if result, ok := s.groupDistribution.Get(group); ok { @@ -309,6 +290,10 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) * selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) //
selected StoreID set leaderCandidateStores := make([]uint64, 0, len(region.GetPeers())) // StoreID allowed to become Leader scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer + filterLen := len(context.filters) + 2 + filters := make([]filter.Filter, filterLen) + copy(filters, context.filters) + filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores) for _, peer := range peers { if _, ok := selectedStores[peer.GetStoreId()]; ok { if allowLeader(oldFit, peer) { @@ -317,9 +302,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) * // It is both sourcePeer and targetPeer itself, no need to select. continue } + sourceStore := r.cluster.GetStore(peer.GetStoreId()) + if sourceStore == nil { + log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore)) + continue + } + filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore) for { - candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context) - newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context) + newPeer := r.selectNewPeer(context, group, peer, filters) targetPeers[newPeer.GetStoreId()] = newPeer selectedStores[newPeer.GetStoreId()] = struct{}{} // If the selected peer is a peer other than origin peer in this region, @@ -340,7 +330,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) * // FIXME: target leader only considers the ordinary stores, maybe we need to consider the // special engine stores if the engine supports to become a leader. But now there is only // one engine, tiflash, which does not support the leader, so don't consider it for now. 
- targetLeader := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine) + targetLeader, leaderStorePickedCount := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine) if targetLeader == 0 { scatterCounter.WithLabelValues("no-leader", "").Inc() return nil @@ -373,6 +363,8 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) * if op != nil { scatterCounter.WithLabelValues("success", "").Inc() r.Put(targetPeers, targetLeader, group) + op.AdditionalInfos["group"] = group + op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10) op.SetPriorityLevel(core.HighPriority) } return op @@ -404,24 +396,18 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb. return region.GetLeader().GetStoreId() == targetLeader } -func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 { - sourceStore := r.cluster.GetStore(sourceStoreID) - if sourceStore == nil { - log.Error("failed to get the store", zap.Uint64("store-id", sourceStoreID), errs.ZapError(errs.ErrGetSourceStore)) - return nil - } - filters := []filter.Filter{ - filter.NewExcludedFilter(r.name, nil, selectedStores), - } - scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore) - filters = append(filters, context.filters...) - filters = append(filters, scoreGuard) +// selectNewPeer return the new peer which pick the fewest picked count. +// it keeps the origin peer if the origin store's pick count is equal the fewest pick. +// it can be diveded into three steps: +// 1. found the max pick count and the min pick count. +// 2. if max pick count equals min pick count, it means all store picked count are some, return the origin peer. +// 3. 
otherwise, select the store which pick count is the min pick count and pass all filter. +func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer { stores := r.cluster.GetStores() - candidates := make([]uint64, 0) maxStoreTotalCount := uint64(0) minStoreTotalCount := uint64(math.MaxUint64) - for _, store := range r.cluster.GetStores() { - count := context.selectedPeer.TotalCountByStore(store.GetID()) + for _, store := range stores { + count := context.selectedPeer.Get(store.GetID(), group) if count > maxStoreTotalCount { maxStoreTotalCount = count } @@ -429,42 +415,33 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreI minStoreTotalCount = count } } + + var newPeer *metapb.Peer + minCount := uint64(math.MaxUint64) + originStorePickedCount := uint64(math.MaxUint64) for _, store := range stores { - storeCount := context.selectedPeer.TotalCountByStore(store.GetID()) + storeCount := context.selectedPeer.Get(store.GetID(), group) + if store.GetID() == peer.GetStoreId() { + originStorePickedCount = storeCount + } // If storeCount is equal to the maxStoreTotalCount, we should skip this store as candidate. // If the storeCount are all the same for the whole cluster(maxStoreTotalCount == minStoreTotalCount), any store // could be selected as candidate. 
if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount { if filter.Target(r.cluster.GetOpts(), store, filters) { - candidates = append(candidates, store.GetID()) + if storeCount < minCount { + minCount = storeCount + newPeer = &metapb.Peer{ + StoreId: store.GetID(), + Role: peer.GetRole(), + } + } } } } - return candidates -} - -func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceStoreID uint64, candidates []uint64, context engineContext) *metapb.Peer { - if len(candidates) < 1 { + if originStorePickedCount <= minCount { return peer } - var newPeer *metapb.Peer - minCount := uint64(math.MaxUint64) - for _, storeID := range candidates { - count := context.selectedPeer.Get(storeID, group) - if count < minCount { - minCount = count - newPeer = &metapb.Peer{ - StoreId: storeID, - Role: peer.GetRole(), - } - } - } - // if the source store have the least count, we don't need to scatter this peer - for _, storeID := range candidates { - if storeID == sourceStoreID && context.selectedPeer.Get(sourceStoreID, group) <= minCount { - return peer - } - } if newPeer == nil { return peer } @@ -473,11 +450,12 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto // selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by // the existed peers store depended on the leader counts in the group level. Please use this func before scatter spacial engines. 
-func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) uint64 { +func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, + leaderCandidateStores []uint64, context engineContext) (leaderID uint64, leaderStorePickedCount uint64) { sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId()) if sourceStore == nil { log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore)) - return 0 + return 0, 0 } minStoreGroupLeader := uint64(math.MaxUint64) id := uint64(0) @@ -492,7 +470,7 @@ func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core. id = storeID } } - return id + return id, minStoreGroupLeader } // Put put the final distribution in the context no matter the operator was created diff --git a/server/schedule/region_scatterer_test.go b/server/schedule/region_scatterer_test.go index 36307741e23..b3f6ec8c5ff 100644 --- a/server/schedule/region_scatterer_test.go +++ b/server/schedule/region_scatterer_test.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "math" - "math/rand" "strconv" "testing" "time" @@ -485,47 +484,11 @@ func (s *testScatterRegionSuite) TestSelectedStoreGC(c *C) { c.Assert(ok, IsFalse) } -// TestRegionFromDifferentGroups test the multi regions. each region have its own group. -// After scatter, the distribution for the whole cluster should be well. -func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - opt := config.NewTestOptions() - tc := mockcluster.NewCluster(ctx, opt) - stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) - oc := NewOperatorController(ctx, tc, stream) - // Add 6 stores. 
- storeCount := 6 - for i := uint64(1); i <= uint64(storeCount); i++ { - tc.AddRegionStore(i, 0) - } - scatterer := NewRegionScatterer(ctx, tc, oc) - regionCount := 50 - for i := 1; i <= regionCount; i++ { - p := rand.Perm(storeCount) - scatterer.scatterRegion(tc.AddLeaderRegion(uint64(i), uint64(p[0])+1, uint64(p[1])+1, uint64(p[2])+1), fmt.Sprintf("t%d", i)) - } - check := func(ss *selectedStores) { - max := uint64(0) - min := uint64(math.MaxUint64) - for i := uint64(1); i <= uint64(storeCount); i++ { - count := ss.TotalCountByStore(i) - if count > max { - max = count - } - if count < min { - min = count - } - } - c.Assert(max-min, LessEqual, uint64(2)) - } - check(scatterer.ordinaryEngine.selectedPeer) -} - func TestRegionHasLearner(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + group := "group" opt := config.NewTestOptions() tc := mockcluster.NewCluster(ctx, opt) stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false) @@ -568,14 +531,14 @@ func TestRegionHasLearner(t *testing.T) { scatterer := NewRegionScatterer(ctx, tc, oc) regionCount := 50 for i := 1; i <= regionCount; i++ { - _, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), "group") + _, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), group) re.NoError(err) } check := func(ss *selectedStores) { max := uint64(0) min := uint64(math.MaxUint64) for i := uint64(1); i <= max; i++ { - count := ss.TotalCountByStore(i) + count := ss.Get(i, group) if count > max { max = count } @@ -590,7 +553,7 @@ func TestRegionHasLearner(t *testing.T) { max := uint64(0) min := uint64(math.MaxUint64) for i := uint64(1); i <= voterCount; i++ { - count := ss.TotalCountByStore(i) + count := ss.Get(i, group) if count > max { max = count } @@ -601,7 +564,7 @@ func TestRegionHasLearner(t *testing.T) { re.LessOrEqual(max-2, 
uint64(regionCount)/voterCount) re.LessOrEqual(min-1, uint64(regionCount)/voterCount) for i := voterCount + 1; i <= storeCount; i++ { - count := ss.TotalCountByStore(i) + count := ss.Get(i, group) re.LessOrEqual(count, uint64(0)) } } @@ -641,6 +604,9 @@ func (s *testScatterRegionSuite) TestSelectedStores(c *C) { region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2) op := scatterer.scatterRegion(region, group) c.Assert(isPeerCountChanged(op), IsFalse) + if op != nil { + c.Assert(group, Equals, op.AdditionalInfos["group"]) + } } } From 63bf9703075813792f7a9db1d8a6db9659d3bfa8 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 26 Feb 2024 15:52:59 +0800 Subject: [PATCH 03/11] checker: fix unhealth region skip the rule check (#6427) (#7822) close tikv/pd#6426 Allow regions labeled `schedule=deny` to still go through the rule constraint check. Signed-off-by: ti-chi-bot Signed-off-by: nolouch Co-authored-by: ShuNing Co-authored-by: nolouch --- server/cluster/cluster.go | 2 + server/cluster/coordinator.go | 17 +++++- server/cluster/coordinator_test.go | 52 ++++++++++++++++++- server/schedule/checker/checker_controller.go | 18 ++++--- server/schedule/metrics.go | 10 ++++ server/schedule/operator_controller.go | 9 ---- server/schedule/operator_controller_test.go | 20 +------ 7 files changed, 91 insertions(+), 37 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 24eb2ad7047..19132757751 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -63,6 +63,8 @@ import ( var ( // DefaultMinResolvedTSPersistenceInterval is the default value of min resolved ts persistence interval. DefaultMinResolvedTSPersistenceInterval = 10 * time.Second + + denySchedulersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("schedulers", "deny") ) // regionLabelGCInterval is the interval to run region-label's GC work.
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index d62de672896..4a236aea665 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -893,8 +893,23 @@ func (s *scheduleController) Schedule() []*operator.Operator { } cacheCluster := newCacheCluster(s.cluster) // If we have schedule, reset interval to the minimal interval. - if ops := s.Scheduler.Schedule(cacheCluster); len(ops) > 0 { + ops := s.Scheduler.Schedule(cacheCluster) + foundDisabled := false + for _, op := range ops { + if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil { + if labelMgr.ScheduleDisabled(s.cluster.GetRegion(op.RegionID())) { + denySchedulersByLabelerCounter.Inc() + foundDisabled = true + break + } + } + } + if len(ops) > 0 { s.nextInterval = s.Scheduler.GetMinInterval() + // try regenerating operators + if foundDisabled { + continue + } return ops } } diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index c15302015f7..199ccaa38ba 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -373,11 +373,22 @@ func (s *testCoordinatorSuite) TestCheckRegionWithScheduleDeny(c *C) { Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}}, }) + // should allow to do rule checker c.Assert(labelerManager.ScheduleDisabled(region), IsTrue) - s.checkRegion(c, tc, co, 1, 0) + s.checkRegion(c, tc, co, 1, 1) + // should not allow to merge + tc.opt.SetSplitMergeInterval(time.Duration(0)) + + c.Assert(tc.addLeaderRegion(2, 2, 3, 4), IsNil) + c.Assert(tc.addLeaderRegion(3, 2, 3, 4), IsNil) + region = tc.GetRegion(2) + c.Assert(labelerManager.ScheduleDisabled(region), IsTrue) + s.checkRegion(c, tc, co, 2, 0) + + // delete label rule, should allow to do merge labelerManager.DeleteLabelRule("schedulelabel") c.Assert(labelerManager.ScheduleDisabled(region), IsFalse) - s.checkRegion(c, tc, co, 1, 1) + s.checkRegion(c, tc, co, 2, 2) } func (s *testCoordinatorSuite) 
TestCheckerIsBusy(c *C) { @@ -864,6 +875,43 @@ func (s *testCoordinatorSuite) TestRemoveScheduler(c *C) { co.wg.Wait() } +func (s *testCoordinatorSuite) TestDenyScheduler(c *C) { + tc, co, cleanup := prepare(nil, nil, func(co *coordinator) { + labelerManager := co.cluster.GetRegionLabeler() + labelerManager.SetLabelRule(&labeler.LabelRule{ + ID: "schedulelabel", + Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}}, + RuleType: labeler.KeyRange, + Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}}, + }) + co.run() + }, c) + defer cleanup() + + c.Assert(len(co.schedulers), Equals, len(config.DefaultSchedulers)) + + // Transfer peer from store 4 to store 1 if not set deny. + c.Assert(tc.addRegionStore(4, 40), IsNil) + c.Assert(tc.addRegionStore(3, 30), IsNil) + c.Assert(tc.addRegionStore(2, 20), IsNil) + c.Assert(tc.addRegionStore(1, 10), IsNil) + c.Assert(tc.addLeaderRegion(1, 2, 3, 4), IsNil) + + // Transfer leader from store 4 to store 2 if not set deny. + c.Assert(tc.updateLeaderCount(4, 1000), IsNil) + c.Assert(tc.updateLeaderCount(3, 50), IsNil) + c.Assert(tc.updateLeaderCount(2, 20), IsNil) + c.Assert(tc.updateLeaderCount(1, 10), IsNil) + c.Assert(tc.addLeaderRegion(2, 4, 3, 2), IsNil) + + // there should no balance leader/region operator + for i := 0; i < 10; i++ { + c.Assert(co.opController.GetOperator(1), IsNil) + c.Assert(co.opController.GetOperator(2), IsNil) + time.Sleep(10 * time.Millisecond) + } +} + func (s *testCoordinatorSuite) TestRestart(c *C) { tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { // Turn off balance, we test add replica only. diff --git a/server/schedule/checker/checker_controller.go b/server/schedule/checker/checker_controller.go index 4e7e28334d0..f88bb626ee9 100644 --- a/server/schedule/checker/checker_controller.go +++ b/server/schedule/checker/checker_controller.go @@ -32,6 +32,8 @@ import ( // DefaultCacheSize is the default length of waiting list. 
const DefaultCacheSize = 1000 +var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("checkers", "deny") + // Controller is used to manage all checkers. type Controller struct { cluster schedule.Cluster @@ -80,13 +82,6 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator { return []*operator.Operator{op} } - if cl, ok := c.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok { - l := cl.GetRegionLabeler() - if l.ScheduleDisabled(region) { - return nil - } - } - if op := c.splitChecker.Check(region); op != nil { return []*operator.Operator{op} } @@ -112,6 +107,15 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator { c.regionWaitingList.Put(region.GetID(), nil) } } + // skip the joint checker, split checker and rule checker when region label is set to "schedule=deny". + // those checkers is help to make region health, it's necessary to skip them when region is set to deny. + if cl, ok := c.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok { + l := cl.GetRegionLabeler() + if l.ScheduleDisabled(region) { + denyCheckersByLabelerCounter.Inc() + return nil + } + } if c.mergeChecker != nil { allowed := opController.OperatorCount(operator.OpMerge) < c.opts.GetMergeScheduleLimit() diff --git a/server/schedule/metrics.go b/server/schedule/metrics.go index 89dabf8e74e..79e789075fe 100644 --- a/server/schedule/metrics.go +++ b/server/schedule/metrics.go @@ -83,6 +83,15 @@ var ( Name: "scatter_distribution", Help: "Counter of the distribution in scatter.", }, []string{"store", "is_leader", "engine"}) + + // LabelerEventCounter is a counter of the scheduler labeler system. 
+ LabelerEventCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "labeler_event_counter", + Help: "Counter of the scheduler label.", + }, []string{"type", "event"}) ) func init() { @@ -94,4 +103,5 @@ func init() { prometheus.MustRegister(scatterCounter) prometheus.MustRegister(scatterDistributionCounter) prometheus.MustRegister(operatorSizeHist) + prometheus.MustRegister(LabelerEventCounter) } diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 513abb6222c..c5d58cd7c1f 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -32,7 +32,6 @@ import ( "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/core/storelimit" "github.com/tikv/pd/server/schedule/hbstream" - "github.com/tikv/pd/server/schedule/labeler" "github.com/tikv/pd/server/schedule/operator" "go.uber.org/zap" ) @@ -421,14 +420,6 @@ func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operato if op.SchedulerKind() == operator.OpAdmin || op.IsLeaveJointStateOperator() { continue } - if cl, ok := oc.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok { - l := cl.GetRegionLabeler() - if l.ScheduleDisabled(region) { - log.Debug("schedule disabled", zap.Uint64("region-id", op.RegionID())) - operatorWaitCounter.WithLabelValues(op.Desc(), "schedule-disabled").Inc() - return false - } - } } expired := false for _, op := range ops { diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index 5629f4724f1..5c4f1dddd5f 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -752,23 +752,7 @@ func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) { RuleType: labeler.KeyRange, Data: []interface{}{map[string]interface{}{"start_key": "1a", "end_key": "1b"}}, }) - c.Assert(labelerManager.ScheduleDisabled(source), 
IsTrue) - // add operator should be failed since it is labeled with `schedule=deny`. - c.Assert(controller.AddWaitingOperator(ops...), Equals, 0) - - // add operator should be success without `schedule=deny` - labelerManager.DeleteLabelRule("schedulelabel") - labelerManager.ScheduleDisabled(source) - c.Assert(labelerManager.ScheduleDisabled(source), IsFalse) - // now there is one operator being allowed to add, if it is a merge operator - // both of the pair are allowed - ops, err = operator.CreateMergeRegionOperator("merge-region", cluster, source, target, operator.OpMerge) - c.Assert(err, IsNil) - c.Assert(ops, HasLen, 2) - c.Assert(controller.AddWaitingOperator(ops...), Equals, 2) - c.Assert(controller.AddWaitingOperator(ops...), Equals, 0) - - // no space left, new operator can not be added. - c.Assert(controller.AddWaitingOperator(addPeerOp(0)), Equals, 0) + // add operator should be success since it is not check in addWaitingOperator + c.Assert(2, Equals, controller.AddWaitingOperator(ops...)) } From a2f526da5ef77104a3bfa5c9ff32f06ea3848d24 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 28 Feb 2024 11:54:30 +0800 Subject: [PATCH 04/11] core: batch get region size (#7252) (#7693) close tikv/pd#7248 Signed-off-by: ti-chi-bot Signed-off-by: Ryan Leung Co-authored-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/cluster/cluster.go | 2 +- server/core/basic_cluster.go | 31 +++++++++- server/core/region.go | 13 ---- server/core/region_test.go | 113 +++++++++++++++++++++++++++++++++++ tests/server/api/api_test.go | 6 +- 5 files changed, 145 insertions(+), 20 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 19132757751..efc6be2ae81 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1453,8 +1453,8 @@ func (c *RaftCluster) checkStores() { } } else if c.IsPrepared() { threshold := c.getThreshold(stores, store) - log.Debug("store serving 
threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold)) regionSize := float64(store.GetRegionSize()) + log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize)) if regionSize >= threshold { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index eee97c11d11..d82852c6bca 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -26,6 +26,8 @@ import ( "go.uber.org/zap" ) +var scanRegionLimit = 1000 + // BasicCluster provides basic data member and interface for a tikv cluster. type BasicCluster struct { Stores struct { @@ -384,9 +386,32 @@ func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo { // GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. func (bc *BasicCluster) GetRegionSizeByRange(startKey, endKey []byte) int64 { - bc.Regions.mu.RLock() - defer bc.Regions.mu.RUnlock() - return bc.Regions.GetRegionSizeByRange(startKey, endKey) + var size int64 + for { + bc.Regions.mu.RLock() + var cnt int + bc.Regions.tree.scanRange(startKey, func(region *RegionInfo) bool { + if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { + return false + } + if cnt >= scanRegionLimit { + return false + } + cnt++ + startKey = region.GetEndKey() + size += region.GetApproximateSize() + return true + }) + bc.Regions.mu.RUnlock() + if cnt == 0 { + break + } + if len(startKey) == 0 { + break + } + } + + return size } func (bc *BasicCluster) getWriteRate( diff --git a/server/core/region.go b/server/core/region.go index e43ddd032ad..f97c6aea7e6 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -1191,19 +1191,6 @@ func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(regio r.tree.scanRange(startKey, iterator) } 
-// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. -func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { - var size int64 - r.tree.scanRange(startKey, func(region *RegionInfo) bool { - if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { - return false - } - size += region.GetApproximateSize() - return true - }) - return size -} - // GetAdjacentRegions returns region's info that is adjacent with specific region func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) { p, n := r.tree.getAdjacentRegions(region) diff --git a/server/core/region_test.go b/server/core/region_test.go index c1ed83b7f46..8717b5d6be3 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -21,10 +21,12 @@ import ( "strconv" "strings" "testing" + "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/server/id" ) @@ -662,6 +664,117 @@ func BenchmarkRandomRegion(b *testing.B) { } } +func BenchmarkRandomSetRegion(b *testing.B) { + cluster := NewBasicCluster() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + cluster.Regions.SetRegion(region) + items = append(items, region) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + item.approximateSize = int64(20) + cluster.Regions.SetRegion(item) + } +} + +func TestGetRegionSizeByRange(t *testing.T) { + cluster := NewBasicCluster() + nums := 1000010 + for i := 0; i < nums; i++ { + peer := &metapb.Peer{StoreId: 1, Id: 
uint64(i + 1)} + endKey := []byte(fmt.Sprintf("%20d", i+1)) + if i == nums-1 { + endKey = []byte("") + } + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: endKey, + }, peer, SetApproximateSize(10)) + cluster.Regions.SetRegion(region) + } + totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte("")) + require.Equal(t, int64(nums*10), totalSize) + for i := 1; i < 10; i++ { + verifyNum := nums / i + endKey := fmt.Sprintf("%20d", verifyNum) + totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte(endKey)) + require.Equal(t, int64(verifyNum*10), totalSize) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) { + cluster := NewBasicCluster() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer, SetApproximateSize(10)) + cluster.Regions.SetRegion(region) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + cluster.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + cluster.Regions.SetRegion(item) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) { + cluster := NewBasicCluster() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + cluster.Regions.SetRegion(region) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + 
cluster.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + + b.RunParallel( + func(pb *testing.PB) { + for pb.Next() { + item := items[rand.Intn(len(items))] + n := item.Clone(SetApproximateSize(20)) + cluster.Regions.SetRegion(n) + } + }, + ) +} + const keyLength = 100 func randomBytes(n int) []byte { diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 8102532102f..ab96e312962 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -686,7 +686,7 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) { pdctl.MustPutStore(c, leader.GetServer(), store) } for i := 0; i < 100; i++ { - pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10)) + pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10)) } // no store preparing output := sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound) @@ -713,8 +713,8 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) { c.Assert(p.LeftSeconds, Equals, math.MaxFloat64) // update size - pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10)) - pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40)) + pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10)) + pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40)) time.Sleep(2 * time.Second) output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) 
c.Assert(json.Unmarshal(output, &p), IsNil) From 75b257efd11db978f5e2a572fabf239b5b204508 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 11 Jun 2024 11:05:59 +0800 Subject: [PATCH 05/11] chore: add prow OWNERS files to control the approvals for critical configuration files (#8218) (#8248) close tikv/pd#8167 Signed-off-by: wuhuizuo Co-authored-by: wuhuizuo Co-authored-by: husharp --- OWNERS_ALIASES | 6 ++++++ client/resource_group/controller/OWNERS | 7 +++++++ client/tlsutil/OWNERS | 7 +++++++ conf/OWNERS | 7 +++++++ pkg/encryption/OWNERS | 7 +++++++ pkg/mcs/resourcemanager/server/OWNERS | 7 +++++++ pkg/mcs/scheduling/server/config/OWNERS | 7 +++++++ pkg/mcs/tso/server/OWNERS | 7 +++++++ pkg/schedule/config/OWNERS | 7 +++++++ pkg/schedule/schedulers/OWNERS | 7 +++++++ server/config/OWNERS | 7 +++++++ 11 files changed, 76 insertions(+) create mode 100644 OWNERS_ALIASES create mode 100644 client/resource_group/controller/OWNERS create mode 100644 client/tlsutil/OWNERS create mode 100644 conf/OWNERS create mode 100644 pkg/encryption/OWNERS create mode 100644 pkg/mcs/resourcemanager/server/OWNERS create mode 100644 pkg/mcs/scheduling/server/config/OWNERS create mode 100644 pkg/mcs/tso/server/OWNERS create mode 100644 pkg/schedule/config/OWNERS create mode 100644 pkg/schedule/schedulers/OWNERS create mode 100644 server/config/OWNERS diff --git a/OWNERS_ALIASES b/OWNERS_ALIASES new file mode 100644 index 00000000000..516a466c91e --- /dev/null +++ b/OWNERS_ALIASES @@ -0,0 +1,6 @@ +# Sort the member alphabetically. 
+aliases: + sig-critical-approvers-config: + - easonn7 + - kevin-xianliu + - niubell diff --git a/client/resource_group/controller/OWNERS b/client/resource_group/controller/OWNERS new file mode 100644 index 00000000000..aa02465dbd9 --- /dev/null +++ b/client/resource_group/controller/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/client/tlsutil/OWNERS b/client/tlsutil/OWNERS new file mode 100644 index 00000000000..211db06feee --- /dev/null +++ b/client/tlsutil/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|tlsconfig\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/conf/OWNERS b/conf/OWNERS new file mode 100644 index 00000000000..1a435c49089 --- /dev/null +++ b/conf/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.toml)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/encryption/OWNERS b/pkg/encryption/OWNERS new file mode 100644 index 00000000000..aa02465dbd9 --- /dev/null +++ b/pkg/encryption/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/mcs/resourcemanager/server/OWNERS b/pkg/mcs/resourcemanager/server/OWNERS new file mode 100644 index 00000000000..aa02465dbd9 --- /dev/null +++ b/pkg/mcs/resourcemanager/server/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/mcs/scheduling/server/config/OWNERS b/pkg/mcs/scheduling/server/config/OWNERS new file mode 100644 index 
00000000000..aa02465dbd9 --- /dev/null +++ b/pkg/mcs/scheduling/server/config/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/mcs/tso/server/OWNERS b/pkg/mcs/tso/server/OWNERS new file mode 100644 index 00000000000..aa02465dbd9 --- /dev/null +++ b/pkg/mcs/tso/server/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/schedule/config/OWNERS b/pkg/schedule/config/OWNERS new file mode 100644 index 00000000000..ce5d15ddc19 --- /dev/null +++ b/pkg/schedule/config/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|(config|store_config)\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/pkg/schedule/schedulers/OWNERS b/pkg/schedule/schedulers/OWNERS new file mode 100644 index 00000000000..ae96e4f1f42 --- /dev/null +++ b/pkg/schedule/schedulers/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|hot_region_config\\.go)$": + approvers: + - sig-critical-approvers-config diff --git a/server/config/OWNERS b/server/config/OWNERS new file mode 100644 index 00000000000..179de4843e6 --- /dev/null +++ b/server/config/OWNERS @@ -0,0 +1,7 @@ +# See the OWNERS docs at https://go.k8s.io/owners +options: + no_parent_owners: true +filters: + "(OWNERS|(config|service_middleware_config)\\.go)$": + approvers: + - sig-critical-approvers-config From 1d2b8913a350e9084a8c8a4f61104a53526f8bee Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 11 Jun 2024 11:45:00 +0800 Subject: [PATCH 06/11] OWNERS: Auto Sync OWNERS files from community membership (#8163) (#8271) Signed-off-by: Ti Chi Robot --- OWNERS | 26 
++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 OWNERS diff --git a/OWNERS b/OWNERS new file mode 100644 index 00000000000..5911dfd3b66 --- /dev/null +++ b/OWNERS @@ -0,0 +1,26 @@ +# See the OWNERS docs at https://go.k8s.io/owners +approvers: + - AndreMouche + - binshi-bing + - bufferflies + - CabinfeverB + - Connor1996 + - disksing + - huachaohuang + - HunDunDM + - HuSharp + - JmPotato + - lhy1024 + - nolouch + - overvenus + - qiuyesuifeng + - rleungx + - siddontang + - Yisaer + - zhouqiang-cl +reviewers: + - BusyJay + - howardlau1999 + - Luffbee + - shafreeck + - xhebox From ea57f5e25bca8986d6adfd37273e6996fc35f3e6 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 26 Jun 2024 10:07:22 +0800 Subject: [PATCH 07/11] config: fix the panic caused by zero RegionSplitSizeMB (#8324) (#8329) close tikv/pd#8323 Bypass the case that `RegionSplitSizeMB` might be zero in `(*StoreConfig) CheckRegionSize`. Signed-off-by: ti-chi-bot Signed-off-by: JmPotato Co-authored-by: JmPotato --- pkg/typeutil/size_test.go | 17 +++++++++++++---- server/config/store_config.go | 9 +++++++-- server/config/store_config_test.go | 4 ++++ 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/pkg/typeutil/size_test.go b/pkg/typeutil/size_test.go index eae092cdb5c..5d66f43d5a4 100644 --- a/pkg/typeutil/size_test.go +++ b/pkg/typeutil/size_test.go @@ -17,6 +17,7 @@ package typeutil import ( "encoding/json" + "github.com/docker/go-units" . 
"github.com/pingcap/check" ) @@ -41,23 +42,31 @@ func (s *testSizeSuite) TestJSON(c *C) { } func (s *testSizeSuite) TestParseMbFromText(c *C) { + const defaultValue = 2 + testdata := []struct { body []string size uint64 }{{ body: []string{"10Mib", "10MiB", "10M", "10MB"}, - size: uint64(10), + size: 10, }, { body: []string{"10GiB", "10Gib", "10G", "10GB"}, - size: uint64(10 * 1024), + size: 10 * units.GiB / units.MiB, + }, { + body: []string{"1024KiB", "1048576"}, + size: 1, + }, { + body: []string{"100KiB", "1023KiB", "1048575", "0"}, + size: 0, }, { body: []string{"10yiB", "10aib"}, - size: uint64(1), + size: defaultValue, }} for _, t := range testdata { for _, b := range t.body { - c.Assert(int(ParseMBFromText(b, 1)), Equals, int(t.size)) + c.Assert(ParseMBFromText(b, defaultValue), Equals, t.size) } } } diff --git a/server/config/store_config.go b/server/config/store_config.go index 27fc456dd08..12445378192 100644 --- a/server/config/store_config.go +++ b/server/config/store_config.go @@ -129,9 +129,14 @@ func (c *StoreConfig) CheckRegionSize(size, mergeSize uint64) error { if size < c.GetRegionMaxSize() { return nil } - + // This could happen when the region split size is set to a value less than 1MiB, + // which is a very extreme case, we just pass the check here to prevent panic. + regionSplitSize := c.GetRegionSplitSize() + if regionSplitSize == 0 { + return nil + } // the smallest of the split regions can not be merge again, so it's size should less merge size. 
- if smallSize := size % c.GetRegionSplitSize(); smallSize <= mergeSize && smallSize != 0 { + if smallSize := size % regionSplitSize; smallSize <= mergeSize && smallSize != 0 { log.Debug("region size is too small", zap.Uint64("size", size), zap.Uint64("merge-size", mergeSize), zap.Uint64("small-size", smallSize)) return errs.ErrCheckerMergeAgain.FastGenByArgs("the smallest region of the split regions is less than max-merge-region-size, " + "it will be merged again") diff --git a/server/config/store_config_test.go b/server/config/store_config_test.go index 478e1ebb3d7..821d5825610 100644 --- a/server/config/store_config_test.go +++ b/server/config/store_config_test.go @@ -147,4 +147,8 @@ func (t *testTiKVConfigSuite) TestMergeCheck(c *C) { c.Assert(config.CheckRegionKeys(v.keys, v.mergeKeys), NotNil) } } + // Test CheckRegionSize when the region split size is 0. + config.RegionSplitSize = "100KiB" + c.Assert(config.GetRegionSplitSize(), Equals, uint64(0)) + c.Assert(config.CheckRegionSize(defaultRegionMaxSize, 50), IsNil) } From 064986b06c52e963380d9e51bd51854f5a7b4e8b Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 28 Aug 2024 15:46:48 +0800 Subject: [PATCH 08/11] server: skip the engine key when match store label (#8486) (#8573) close tikv/pd#8480 Signed-off-by: ti-chi-bot Signed-off-by: Ryan Leung Co-authored-by: Ryan Leung --- server/api/label_test.go | 35 +++++++++++++++++++++++++++++++++-- server/cluster/cluster.go | 3 +++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/server/api/label_test.go b/server/api/label_test.go index 9acddae8436..2fc2e5eef8f 100644 --- a/server/api/label_test.go +++ b/server/api/label_test.go @@ -17,6 +17,7 @@ package api import ( "context" "fmt" + "strings" . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -24,6 +25,7 @@ import ( tu "github.com/tikv/pd/pkg/testutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/config" + "github.com/tikv/pd/server/core" ) var _ = Suite(&testLabelsStoreSuite{}) @@ -264,6 +266,30 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { valid: false, expectError: "key matching the label was not found", }, + { + store: &metapb.Store{ + Id: 3, + Address: "tiflash1", + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "zone", + Value: "us-west-1", + }, + { + Key: "disk", + Value: "ssd", + }, + { + Key: core.EngineKey, + Value: core.EngineTiFlash, + }, + }, + Version: "3.0.0", + }, + valid: true, + expectError: "placement rules is disabled", + }, } for _, t := range cases { @@ -271,12 +297,16 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()}, Store: &metapb.Store{ Id: t.store.Id, - Address: fmt.Sprintf("tikv%d", t.store.Id), + Address: t.store.Address, State: t.store.State, Labels: t.store.Labels, Version: t.store.Version, }, }) + if t.store.Address == "tiflash1" { + c.Assert(strings.Contains(resp.GetHeader().GetError().String(), t.expectError), IsTrue) + continue + } if t.valid { c.Assert(err, IsNil) } else { @@ -291,12 +321,13 @@ func (s *testStrictlyLabelsStoreSuite) TestStoreMatch(c *C) { Header: &pdpb.RequestHeader{ClusterId: s.svr.ClusterID()}, Store: &metapb.Store{ Id: t.store.Id, - Address: fmt.Sprintf("tikv%d", t.store.Id), + Address: t.store.Address, State: t.store.State, Labels: t.store.Labels, Version: t.store.Version, }, }) + if t.valid { c.Assert(err, IsNil) } else { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index efc6be2ae81..a97d4408536 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1130,6 +1130,9 @@ func (c *RaftCluster) checkStoreLabels(s *core.StoreInfo) error { } for _, label := range 
s.GetLabels() { key := label.GetKey() + if key == core.EngineKey { + continue + } if _, ok := keysSet[key]; !ok { log.Warn("not found the key match with the store label", zap.Stringer("store", s.GetMeta()), From e878b626f436cf83d9549a5e56bf8db8cef4218a Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 10 Sep 2024 15:38:30 +0800 Subject: [PATCH 09/11] schedule: fix panic when switching placement rules (#7415) (#7422) close tikv/pd#7414 Signed-off-by: ti-chi-bot Signed-off-by: Ryan Leung Co-authored-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/schedule/checker/rule_checker.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/schedule/checker/rule_checker.go b/server/schedule/checker/rule_checker.go index ea665c1faa9..ac24aaea50b 100644 --- a/server/schedule/checker/rule_checker.go +++ b/server/schedule/checker/rule_checker.go @@ -102,6 +102,11 @@ func (c *RuleChecker) CheckWithFit(region *core.RegionInfo, fit *placement.Regio panic("cached should be used") }) + // the placement rule is disabled + if fit == nil { + return + } + // If the fit is calculated by FitRegion, which means we get a new fit result, thus we should // invalid the cache if it exists c.ruleManager.InvalidCache(region.GetID()) From 5c8d3f3b6fd0794d60c14be27dd989518d27d227 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 10 Sep 2024 16:37:14 +0800 Subject: [PATCH 10/11] *: fix sync isolation level to default placement rule (#7122) (#7125) close tikv/pd#7121 Signed-off-by: ti-chi-bot Signed-off-by: Ryan Leung Co-authored-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mock/mockcluster/mockcluster.go | 2 +- server/api/operator_test.go | 4 +- server/cluster/cluster.go | 2 +- server/cluster/cluster_test.go | 10 ++--- server/config/persist_options.go | 7 ++++ server/schedule/placement/rule_manager.go | 3 +- .../schedule/placement/rule_manager_test.go | 6 +-- 
server/server.go | 11 ++--- server/statistics/region_collection_test.go | 2 +- tests/pdctl/config/config_test.go | 40 +++++++++++++++++-- 10 files changed, 65 insertions(+), 22 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index a285e4a8cbf..b2f5bbae66b 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -188,7 +188,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { func (mc *Cluster) initRuleManager() { if mc.RuleManager == nil { mc.RuleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), mc, mc.GetOpts()) - mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels) + mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel) } } diff --git a/server/api/operator_test.go b/server/api/operator_test.go index 86d99c5e726..9f5d3167476 100644 --- a/server/api/operator_test.go +++ b/server/api/operator_test.go @@ -364,7 +364,9 @@ func (s *testTransferRegionOperatorSuite) TestTransferRegionWithPlacementRule(c if tc.placementRuleEnable { err := s.svr.GetRaftCluster().GetRuleManager().Initialize( s.svr.GetRaftCluster().GetOpts().GetMaxReplicas(), - s.svr.GetRaftCluster().GetOpts().GetLocationLabels()) + s.svr.GetRaftCluster().GetOpts().GetLocationLabels(), + s.svr.GetRaftCluster().GetOpts().GetIsolationLevel(), + ) c.Assert(err, IsNil) } if len(tc.rules) > 0 { diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a97d4408536..082d5851a22 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -253,7 +253,7 @@ func (c *RaftCluster) Start(s Server) error { c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts()) if c.opt.IsPlacementRulesEnabled() { - err = c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels()) + err = 
c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel()) if err != nil { return err } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index cf13b32e46f..728ff399972 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -222,7 +222,7 @@ func (s *testClusterInfoSuite) TestSetOfflineStore(c *C) { cluster.coordinator = newCoordinator(s.ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -393,7 +393,7 @@ func (s *testClusterInfoSuite) TestUpStore(c *C) { cluster.coordinator = newCoordinator(s.ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -488,7 +488,7 @@ func (s *testClusterInfoSuite) TestDeleteStoreUpdatesClusterVersion(c *C) { cluster.coordinator = newCoordinator(s.ctx, cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -1118,7 +1118,7 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) { cluster.coordinator = newCoordinator(s.ctx, 
cluster, nil) cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { - err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } @@ -1705,7 +1705,7 @@ func newTestRaftCluster( rc.InitCluster(id, opt, s, basicCluster) rc.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), rc, opt) if opt.IsPlacementRulesEnabled() { - err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels()) + err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { panic(err) } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index fe7203722c2..fc7835e6ae1 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -260,6 +260,13 @@ func (o *PersistOptions) SetSplitMergeInterval(splitMergeInterval time.Duration) o.SetScheduleConfig(v) } +// SetMaxStoreDownTime to set the max store down time. It's only used to test. +func (o *PersistOptions) SetMaxStoreDownTime(time time.Duration) { + v := o.GetScheduleConfig().Clone() + v.MaxStoreDownTime = typeutil.NewDuration(time) + o.SetScheduleConfig(v) +} + // SetMaxMergeRegionSize sets the max merge region size. func (o *PersistOptions) SetMaxMergeRegionSize(maxMergeRegionSize uint64) { v := o.GetScheduleConfig().Clone() diff --git a/server/schedule/placement/rule_manager.go b/server/schedule/placement/rule_manager.go index 04b2d96d9f1..ac5585fdbe0 100644 --- a/server/schedule/placement/rule_manager.go +++ b/server/schedule/placement/rule_manager.go @@ -63,7 +63,7 @@ func NewRuleManager(storage endpoint.RuleStorage, storeSetInformer core.StoreSet // Initialize loads rules from storage. 
If Placement Rules feature is never enabled, it creates default rule that is // compatible with previous configuration. -func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error { +func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolationLevel string) error { m.Lock() defer m.Unlock() if m.initialized { @@ -84,6 +84,7 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error Role: Voter, Count: maxReplica, LocationLabels: locationLabels, + IsolationLevel: isolationLevel, } if err := m.storage.SaveRule(defaultRule.StoreKey(), defaultRule); err != nil { return err diff --git a/server/schedule/placement/rule_manager_test.go b/server/schedule/placement/rule_manager_test.go index ae750fe5f9b..7e19942e488 100644 --- a/server/schedule/placement/rule_manager_test.go +++ b/server/schedule/placement/rule_manager_test.go @@ -36,7 +36,7 @@ func (s *testManagerSuite) SetUpTest(c *C) { s.store = storage.NewStorageWithMemoryBackend() var err error s.manager = NewRuleManager(s.store, nil, nil) - err = s.manager.Initialize(3, []string{"zone", "rack", "host"}) + err = s.manager.Initialize(3, []string{"zone", "rack", "host"}, "") c.Assert(err, IsNil) } @@ -113,7 +113,7 @@ func (s *testManagerSuite) TestSaveLoad(c *C) { } m2 := NewRuleManager(s.store, nil, nil) - err := m2.Initialize(3, []string{"no", "labels"}) + err := m2.Initialize(3, []string{"no", "labels"}, "") c.Assert(err, IsNil) c.Assert(m2.GetAllRules(), HasLen, 3) c.Assert(m2.GetRule("pd", "default").String(), Equals, rules[0].String()) @@ -128,7 +128,7 @@ func (s *testManagerSuite) TestSetAfterGet(c *C) { s.manager.SetRule(rule) m2 := NewRuleManager(s.store, nil, nil) - err := m2.Initialize(100, []string{}) + err := m2.Initialize(100, []string{}, "") c.Assert(err, IsNil) rule = m2.GetRule("pd", "default") c.Assert(rule.Count, Equals, 1) diff --git a/server/server.go b/server/server.go index 3463b3c6e4a..43bdc458494 100644 --- a/server/server.go +++ 
b/server/server.go @@ -897,7 +897,7 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { } if cfg.EnablePlacementRules { // initialize rule manager. - if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { + if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel); err != nil { return err } } else { @@ -920,19 +920,19 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { defaultRule := rc.GetRuleManager().GetRule("pd", "default") CheckInDefaultRule := func() error { - // replication config won't work when placement rule is enabled and exceeds one default rule + // replication config won't work when placement rule is enabled and exceeds one default rule if !(defaultRule != nil && len(defaultRule.StartKey) == 0 && len(defaultRule.EndKey) == 0) { - return errors.New("cannot update MaxReplicas or LocationLabels when placement rules feature is enabled and not only default rule exists, please update rule instead") + return errors.New("cannot update MaxReplicas, LocationLabels or IsolationLevel when placement rules feature is enabled and not only default rule exists, please update rule instead") } - if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.StringsEqual(defaultRule.LocationLabels, []string(old.LocationLabels))) { + if !(defaultRule.Count == int(old.MaxReplicas) && typeutil.StringsEqual(defaultRule.LocationLabels, []string(old.LocationLabels)) && defaultRule.IsolationLevel == old.IsolationLevel) { return errors.New("cannot to update replication config, the default rules do not consistent with replication config, please update rule instead") } return nil } - if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.StringsEqual(cfg.LocationLabels, old.LocationLabels)) { + if !(cfg.MaxReplicas == old.MaxReplicas && typeutil.StringsEqual(cfg.LocationLabels, old.LocationLabels) && cfg.IsolationLevel == old.IsolationLevel) { if 
err := CheckInDefaultRule(); err != nil { return err } @@ -943,6 +943,7 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { if rule != nil { rule.Count = int(cfg.MaxReplicas) rule.LocationLabels = cfg.LocationLabels + rule.IsolationLevel = cfg.IsolationLevel rc := s.GetRaftCluster() if rc == nil { return errs.ErrNotBootstrapped.GenWithStackByArgs() diff --git a/server/statistics/region_collection_test.go b/server/statistics/region_collection_test.go index eb100e958fd..f9d1193d700 100644 --- a/server/statistics/region_collection_test.go +++ b/server/statistics/region_collection_test.go @@ -42,7 +42,7 @@ func (t *testRegionStatisticsSuite) SetUpTest(c *C) { t.store = storage.NewStorageWithMemoryBackend() var err error t.manager = placement.NewRuleManager(t.store, nil, nil) - err = t.manager.Initialize(3, []string{"zone", "rack", "host"}) + err = t.manager.Initialize(3, []string{"zone", "rack", "host"}, "") c.Assert(err, IsNil) } diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 297cc538606..b15bfd5e7d9 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -637,7 +637,7 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(replicationCfg.MaxReplicas, Equals, expect) } - checkLocaltionLabels := func(expect int) { + checkLocationLabels := func(expect int) { args := []string{"-u", pdAddr, "config", "show", "replication"} output, err := pdctl.ExecuteCommand(cmd, args...) c.Assert(err, IsNil) @@ -646,6 +646,15 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(replicationCfg.LocationLabels, HasLen, expect) } + checkIsolationLevel := func(expect string) { + args := []string{"-u", pdAddr, "config", "show", "replication"} + output, err := pdctl.ExecuteCommand(cmd, args...) 
+ c.Assert(err, IsNil) + replicationCfg := config.ReplicationConfig{} + c.Assert(json.Unmarshal(output, &replicationCfg), IsNil) + c.Assert(replicationCfg.IsolationLevel, Equals, expect) + } + checkRuleCount := func(expect int) { args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", "pd", "--id", "default"} output, err := pdctl.ExecuteCommand(cmd, args...) @@ -664,6 +673,15 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(rule.LocationLabels, HasLen, expect) } + checkRuleIsolationLevel := func(expect string) { + args := []string{"-u", pdAddr, "config", "placement-rules", "show", "--group", "pd", "--id", "default"} + output, err := pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + rule := placement.Rule{} + c.Assert(json.Unmarshal(output, &rule), IsNil) + c.Assert(rule.IsolationLevel, Equals, expect) + } + // update successfully when placement rules is not enabled. output, err := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "max-replicas", "2") c.Assert(err, IsNil) @@ -672,8 +690,13 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "location-labels", "zone,host") c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "Success!"), IsTrue) - checkLocaltionLabels(2) + output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "isolation-level", "zone") + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "Success!"), IsTrue) + checkLocationLabels(2) checkRuleLocationLabels(2) + checkIsolationLevel("zone") + checkRuleIsolationLevel("zone") // update successfully when only one default rule exists. 
output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "placement-rules", "enable") @@ -686,11 +709,18 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { checkMaxReplicas(3) checkRuleCount(3) + // We need to change isolation first because we will validate + // if the location label contains the isolation level when setting location labels. + output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "isolation-level", "host") + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "Success!"), IsTrue) output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "config", "set", "location-labels", "host") c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "Success!"), IsTrue) - checkLocaltionLabels(1) + checkLocationLabels(1) checkRuleLocationLabels(1) + checkIsolationLevel("host") + checkRuleIsolationLevel("host") // update unsuccessfully when many rule exists. f, _ := os.CreateTemp("/tmp", "pd_tests") @@ -720,8 +750,10 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(err, IsNil) checkMaxReplicas(4) checkRuleCount(4) - checkLocaltionLabels(1) + checkLocationLabels(1) checkRuleLocationLabels(1) + checkIsolationLevel("host") + checkRuleIsolationLevel("host") } func (s *configTestSuite) TestPDServerConfig(c *C) { From d76f55de05d608f099e915bfde5d9beabf762fce Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 11 Sep 2024 13:17:21 +0800 Subject: [PATCH 11/11] election: fix the keep alive worker (#6925) (#6939) close tikv/pd#6926 Signed-off-by: Ryan Leung Co-authored-by: Ryan Leung --- server/election/lease.go | 5 ++++- server/election/lease_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/server/election/lease.go b/server/election/lease.go index 4e368b98361..f3c00f47089 100644 --- a/server/election/lease.go +++ b/server/election/lease.go @@ -152,8 +152,11 @@ func (l *lease) keepAliveWorker(ctx context.Context, interval 
time.Duration) <-c expire := start.Add(time.Duration(res.TTL) * time.Second) select { case ch <- expire: - case <-ctx1.Done(): + // Here we don't use `ctx1.Done()` because we want to make sure if the keep alive success, we can update the expire time. + case <-ctx.Done(): } + } else { + log.Error("keep alive response ttl is zero", zap.String("purpose", l.Purpose)) } }() diff --git a/server/election/lease_test.go b/server/election/lease_test.go index 0c0aa3c1687..0161e7f3d27 100644 --- a/server/election/lease_test.go +++ b/server/election/lease_test.go @@ -16,9 +16,11 @@ package election import ( "context" + "testing" "time" . "github.com/pingcap/check" + "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/etcdutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" @@ -104,3 +106,34 @@ func (s *testLeaseSuite) TestLease(c *C) { time.Sleep((defaultLeaseTimeout + 1) * time.Second) c.Check(lease1.IsExpired(), IsTrue) } + +func TestLeaseKeepAlive(t *testing.T) { + re := require.New(t) + cfg := etcdutil.NewTestSingleConfig() + etcd, err := embed.StartEtcd(cfg) + defer func() { + etcd.Close() + }() + re.NoError(err) + + ep := cfg.LCUrls[0].String() + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + re.NoError(err) + + <-etcd.Server.ReadyNotify() + + // Create the lease. + lease := &lease{ + Purpose: "test_lease", + client: client, + lease: clientv3.NewLease(client), + } + + re.NoError(lease.Grant(defaultLeaseTimeout)) + ch := lease.keepAliveWorker(context.Background(), 2*time.Second) + time.Sleep(2 * time.Second) + <-ch + re.NoError(lease.Close()) +}