From 8b1e141a67e3976e1cd565981058fbf76d20568a Mon Sep 17 00:00:00 2001 From: Hu# Date: Mon, 11 Sep 2023 15:42:13 +0800 Subject: [PATCH] This is an automated cherry-pick of #7054 close tikv/pd#7053 Signed-off-by: ti-chi-bot --- server/api/min_resolved_ts.go | 127 +++++++++++++++ server/grpc_service.go | 53 +++++- server/handler.go | 14 +- server/server.go | 238 +++++++++++++++++++++++++-- tests/server/api/api_test.go | 295 ++++++++++++++++++++++++++++++++++ 5 files changed, 707 insertions(+), 20 deletions(-) create mode 100644 server/api/min_resolved_ts.go diff --git a/server/api/min_resolved_ts.go b/server/api/min_resolved_ts.go new file mode 100644 index 000000000000..1edf924370f5 --- /dev/null +++ b/server/api/min_resolved_ts.go @@ -0,0 +1,127 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "net/http" + "strconv" + "strings" + + "github.com/gorilla/mux" + "github.com/tikv/pd/pkg/utils/typeutil" + "github.com/tikv/pd/server" + "github.com/unrolled/render" +) + +type minResolvedTSHandler struct { + svr *server.Server + rd *render.Render +} + +func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolvedTSHandler { + return &minResolvedTSHandler{ + svr: svr, + rd: rd, + } +} + +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +type minResolvedTS struct { + IsRealTime bool `json:"is_real_time,omitempty"` + MinResolvedTS uint64 `json:"min_resolved_ts"` + PersistInterval typeutil.Duration `json:"persist_interval,omitempty"` + StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"` +} + +// @Tags min_store_resolved_ts +// @Summary Get store-level min resolved ts. +// @Produce json +// @Success 200 {array} minResolvedTS +// @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /min-resolved-ts/{store_id} [get] +func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *http.Request) { + c := getCluster(r) + idStr := mux.Vars(r)["store_id"] + storeID, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + value := c.GetStoreMinResolvedTS(storeID) + persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval + h.rd.JSON(w, http.StatusOK, minResolvedTS{ + MinResolvedTS: value, + PersistInterval: persistInterval, + IsRealTime: persistInterval.Duration != 0, + }) +} + +// @Tags min_resolved_ts +// @Summary Get cluster-level min resolved ts and optionally store-level min resolved ts. +// @Description Optionally, we support a query parameter `scope` +// to get store-level min resolved ts by specifying a list of store IDs. +// - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil. +// - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled. 
+// - When scope given a list of stores, min_resolved_ts will be provided for each store +// and the scope-specific min_resolved_ts will be returned. +// +// @Produce json +// @Param scope query string false "Scope of the min resolved ts: comma-separated list of store IDs (e.g., '1,2,3')." default(cluster) +// @Success 200 {array} minResolvedTS +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /min-resolved-ts [get] +func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) { + c := getCluster(r) + scopeMinResolvedTS := c.GetMinResolvedTS() + persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval + + var storesMinResolvedTS map[uint64]uint64 + if scopeStr := r.URL.Query().Get("scope"); len(scopeStr) > 0 { + // scope is an optional parameter, it can be `cluster` or specified store IDs. + // - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil. + // - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled. + // - When scope given a list of stores, min_resolved_ts will be provided for each store + // and the scope-specific min_resolved_ts will be returned. + if scopeStr == "cluster" { + stores := c.GetMetaStores() + ids := make([]uint64, len(stores)) + for i, store := range stores { + ids[i] = store.GetId() + } + // use cluster-level min_resolved_ts as the scope-specific min_resolved_ts. + _, storesMinResolvedTS = c.GetMinResolvedTSByStoreIDs(ids) + } else { + scopeIDs := strings.Split(scopeStr, ",") + ids := make([]uint64, len(scopeIDs)) + for i, idStr := range scopeIDs { + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + ids[i] = id + } + scopeMinResolvedTS, storesMinResolvedTS = c.GetMinResolvedTSByStoreIDs(ids) + } + } + + h.rd.JSON(w, http.StatusOK, minResolvedTS{ + MinResolvedTS: scopeMinResolvedTS, + PersistInterval: persistInterval, + IsRealTime: persistInterval.Duration != 0, + StoresMinResolvedTS: storesMinResolvedTS, + }) +} diff --git a/server/grpc_service.go b/server/grpc_service.go index 71bcc8bba97b..52a83d88d5da 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1062,7 +1062,6 @@ func (s *Server) ReportBatchSplit(ctx context.Context, request *pdpb.ReportBatch if rc == nil { return &pdpb.ReportBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil } - _, err := rc.HandleBatchReportSplit(request) if err != nil { return nil, status.Errorf(codes.Unknown, err.Error()) @@ -1551,6 +1550,58 @@ func (s *Server) SplitRegions(ctx context.Context, request *pdpb.SplitRegionsReq }, nil } +<<<<<<< HEAD +======= +// SplitAndScatterRegions split regions by the given split keys, and scatter regions. +// Only regions which splited successfully will be scattered. +// scatterFinishedPercentage indicates the percentage of successfully splited regions that are scattered. 
+func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.SplitAndScatterRegionsRequest) (*pdpb.SplitAndScatterRegionsResponse, error) {
+ fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
+ return pdpb.NewPDClient(client).SplitAndScatterRegions(ctx, request)
+ }
+ if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil {
+ return nil, err
+ } else if rsp != nil {
+ return rsp.(*pdpb.SplitAndScatterRegionsResponse), err
+ }
+ rc := s.GetRaftCluster()
+ if rc == nil {
+ return &pdpb.SplitAndScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil
+ }
+ splitFinishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit()))
+ scatterFinishedPercentage, err := scatterRegions(rc, newRegionIDs, request.GetGroup(), int(request.GetRetryLimit()), false)
+ if err != nil {
+ return nil, err
+ }
+ return &pdpb.SplitAndScatterRegionsResponse{
+ Header: s.header(),
+ RegionsId: newRegionIDs,
+ SplitFinishedPercentage: uint64(splitFinishedPercentage),
+ ScatterFinishedPercentage: uint64(scatterFinishedPercentage),
+ }, nil
+}
+
+// scatterRegions adds operators to scatter regions and returns the processed percentage and error
+func scatterRegions(cluster *cluster.RaftCluster, regionsID []uint64, group string, retryLimit int, skipStoreLimit bool) (int, error) {
+ opsCount, failures, err := cluster.GetRegionScatter().ScatterRegionsByID(regionsID, group, retryLimit, skipStoreLimit)
+ if err != nil {
+ return 0, err
+ }
+ percentage := 100
+ if len(failures) > 0 {
+ percentage = 100 - 100*len(failures)/(opsCount+len(failures))
+ log.Debug("scatter regions", zap.Errors("failures", func() []error {
+ r := make([]error, 0, len(failures))
+ for _, err := range failures {
+ r = append(r, err)
+ }
+ return r
+ }()))
+ }
+ return percentage, nil
+}
+
+>>>>>>> d03f485c9 (*: check raftcluster nil (#7054))
 // GetDCLocationInfo gets the dc-location info of the given dc-location from PD leader's TSO allocator manager.
func (s *Server) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCLocationInfoRequest) (*pdpb.GetDCLocationInfoResponse, error) { var err error diff --git a/server/handler.go b/server/handler.go index 39303f4fc458..cfd48849c5fd 100644 --- a/server/handler.go +++ b/server/handler.go @@ -884,14 +884,20 @@ func (h *Handler) ResetTS(ts uint64) error { // SetStoreLimitScene sets the limit values for different scenes func (h *Handler) SetStoreLimitScene(scene *storelimit.Scene, limitType storelimit.Type) { - cluster := h.s.GetRaftCluster() - cluster.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType) + rc := h.s.GetRaftCluster() + if rc == nil { + return + } + rc.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType) } // GetStoreLimitScene returns the limit values for different scenes func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scene { - cluster := h.s.GetRaftCluster() - return cluster.GetStoreLimiter().StoreLimitScene(limitType) + rc := h.s.GetRaftCluster() + if rc == nil { + return nil + } + return rc.GetStoreLimiter().StoreLimitScene(limitType) } // PluginLoad loads the plugin referenced by the pluginPath diff --git a/server/server.go b/server/server.go index 69cb827bcda0..bb223ddb91d7 100644 --- a/server/server.go +++ b/server/server.go @@ -845,19 +845,24 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { } old := s.persistOptions.GetReplicationConfig() if cfg.EnablePlacementRules != old.EnablePlacementRules { - raftCluster := s.GetRaftCluster() - if raftCluster == nil { + rc := s.GetRaftCluster() + if rc == nil { return errs.ErrNotBootstrapped.GenWithStackByArgs() } if cfg.EnablePlacementRules { // initialize rule manager. - if err := raftCluster.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { + if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { return err } } else { // NOTE: can be removed after placement rules feature is enabled by default. +<<<<<<< HEAD for _, s := range raftCluster.GetStores() { if !s.IsTombstone() && core.IsStoreContainLabel(s.GetMeta(), core.EngineKey, core.EngineTiFlash) { +======= + for _, s := range rc.GetStores() { + if !s.IsRemoved() && s.IsTiFlash() { +>>>>>>> d03f485c9 (*: check raftcluster nil (#7054)) return errors.New("cannot disable placement rules with TiFlash nodes") } } @@ -866,8 +871,12 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { var rule *placement.Rule if cfg.EnablePlacementRules { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } // replication.MaxReplicas won't work when placement rule is enabled and not only have one default rule. 
- defaultRule := s.GetRaftCluster().GetRuleManager().GetRule("pd", "default") + defaultRule := rc.GetRuleManager().GetRule("pd", "default") CheckInDefaultRule := func() error { // replication config won't work when placement rule is enabled and exceeds one default rule @@ -893,7 +902,11 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { if rule != nil { rule.Count = int(cfg.MaxReplicas) rule.LocationLabels = cfg.LocationLabels - if err := s.GetRaftCluster().GetRuleManager().SetRule(rule); err != nil { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if err := rc.GetRuleManager().SetRule(rule); err != nil { log.Error("failed to update rule count", errs.ZapError(err)) return err @@ -905,7 +918,11 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { s.persistOptions.SetReplicationConfig(old) if rule != nil { rule.Count = int(old.MaxReplicas) - if e := s.GetRaftCluster().GetRuleManager().SetRule(rule); e != nil { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if e := rc.GetRuleManager().SetRule(rule); e != nil { log.Error("failed to roll back count of rule when update replication config", errs.ZapError(e)) } } @@ -1079,18 +1096,18 @@ func (s *Server) GetServerOption() *config.PersistOptions { // GetMetaRegions gets meta regions from cluster. func (s *Server) GetMetaRegions() []*metapb.Region { - cluster := s.GetRaftCluster() - if cluster != nil { - return cluster.GetMetaRegions() + rc := s.GetRaftCluster() + if rc != nil { + return rc.GetMetaRegions() } return nil } // GetRegions gets regions from cluster. func (s *Server) GetRegions() []*core.RegionInfo { - cluster := s.GetRaftCluster() - if cluster != nil { - return cluster.GetRegions() + rc := s.GetRaftCluster() + if rc != nil { + return rc.GetRegions() } return nil } @@ -1145,9 +1162,9 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) erro } log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) - cluster := s.GetRaftCluster() - if cluster != nil { - err := cluster.GetReplicationMode().UpdateConfig(cfg) + rc := s.GetRaftCluster() + if rc != nil { + err := rc.GetReplicationMode().UpdateConfig(cfg) if err != nil { log.Warn("failed to update replication mode", errs.ZapError(err)) // revert to old config @@ -1422,3 +1439,194 @@ func (s *Server) SaveTTLConfig(data map[string]interface{}, ttl time.Duration) e } return nil } +<<<<<<< HEAD +======= + +// IsTTLConfigExist returns true if the ttl config is existed for a given config. +func (s *Server) IsTTLConfigExist(key string) bool { + if config.IsSupportedTTLConfig(key) { + if _, ok := s.persistOptions.GetTTLData(key); ok { + return true + } + } + return false +} + +// MarkSnapshotRecovering mark pd that we're recovering +// tikv will get this state during BR EBS restore. +// we write this info into etcd for simplicity, the key only stays inside etcd temporary +// during BR EBS restore in which period the cluster is not able to serve request. +// and is deleted after BR EBS restore is done. +func (s *Server) MarkSnapshotRecovering() error { + log.Info("mark snapshot recovering") + markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath) + // the value doesn't matter, set to a static string + _, err := kv.NewSlowLogTxn(s.client). + If(clientv3.Compare(clientv3.CreateRevision(markPath), "=", 0)). + Then(clientv3.OpPut(markPath, "on")). 
+ Commit() + // if other client already marked, return success too + return err +} + +// IsSnapshotRecovering check whether recovering-mark marked +func (s *Server) IsSnapshotRecovering(ctx context.Context) (bool, error) { + markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath) + resp, err := s.client.Get(ctx, markPath) + if err != nil { + return false, err + } + return len(resp.Kvs) > 0, nil +} + +// UnmarkSnapshotRecovering unmark recovering mark +func (s *Server) UnmarkSnapshotRecovering(ctx context.Context) error { + log.Info("unmark snapshot recovering") + markPath := endpoint.AppendToRootPath(s.rootPath, recoveringMarkPath) + _, err := s.client.Delete(ctx, markPath) + // if other client already unmarked, return success too + return err +} + +// GetServicePrimaryAddr returns the primary address for a given service. +// Note: This function will only return primary address without judging if it's alive. +func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) { + ticker := time.NewTicker(retryIntervalGetServicePrimary) + defer ticker.Stop() + for i := 0; i < maxRetryTimesGetServicePrimary; i++ { + if v, ok := s.servicePrimaryMap.Load(serviceName); ok { + return v.(string), true + } + select { + case <-s.ctx.Done(): + return "", false + case <-ctx.Done(): + return "", false + case <-ticker.C: + } + } + return "", false +} + +// SetServicePrimaryAddr sets the primary address directly. +// Note: This function is only used for test. +func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { + s.servicePrimaryMap.Store(serviceName, addr) +} + +func (s *Server) initTSOPrimaryWatcher() { + serviceName := mcs.TSOServiceName + tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID) + tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID) + putFn := func(kv *mvccpb.KeyValue) error { + primary := &tsopb.Participant{} // TODO: use Generics + if err := proto.Unmarshal(kv.Value, primary); err != nil { + return err + } + listenUrls := primary.GetListenUrls() + if len(listenUrls) > 0 { + // listenUrls[0] is the primary service endpoint of the keyspace group + s.servicePrimaryMap.Store(serviceName, listenUrls[0]) + log.Info("update tso primary", zap.String("primary", listenUrls[0])) + } + return nil + } + deleteFn := func(kv *mvccpb.KeyValue) error { + var oldPrimary string + v, ok := s.servicePrimaryMap.Load(serviceName) + if ok { + oldPrimary = v.(string) + } + log.Info("delete tso primary", zap.String("old-primary", oldPrimary)) + s.servicePrimaryMap.Delete(serviceName) + return nil + } + s.tsoPrimaryWatcher = etcdutil.NewLoopWatcher( + s.serverLoopCtx, + &s.serverLoopWg, + s.client, + "tso-primary-watcher", + tsoServicePrimaryKey, + putFn, + deleteFn, + func() error { return nil }, + ) +} + +// RecoverAllocID recover alloc id. set current base id to input id +func (s *Server) RecoverAllocID(ctx context.Context, id uint64) error { + return s.idAllocator.SetBase(id) +} + +// GetExternalTS returns external timestamp. +func (s *Server) GetExternalTS() uint64 { + rc := s.GetRaftCluster() + if rc == nil { + return 0 + } + return rc.GetExternalTS() +} + +// SetExternalTS returns external timestamp. 
+func (s *Server) SetExternalTS(externalTS, globalTS uint64) error { + if tsoutil.CompareTimestampUint64(externalTS, globalTS) == 1 { + desc := "the external timestamp should not be larger than global ts" + log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS)) + return errors.New(desc) + } + c := s.GetRaftCluster() + if c == nil { + return errs.ErrNotBootstrapped.FastGenByArgs() + } + currentExternalTS := c.GetExternalTS() + if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 { + desc := "the external timestamp should be larger than current external timestamp" + log.Error(desc, zap.Uint64("request", externalTS), zap.Uint64("current", currentExternalTS)) + return errors.New(desc) + } + + return c.SetExternalTS(externalTS) +} + +// IsLocalTSOEnabled returns if the local TSO is enabled. +func (s *Server) IsLocalTSOEnabled() bool { + return s.cfg.IsLocalTSOEnabled() +} + +// GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings. +// If the value is negative, there is no limit. +func (s *Server) GetMaxConcurrentTSOProxyStreamings() int { + return s.cfg.GetMaxConcurrentTSOProxyStreamings() +} + +// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client. +func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration { + return s.cfg.GetTSOProxyRecvFromClientTimeout() +} + +// GetLeaderLease returns the leader lease. +func (s *Server) GetLeaderLease() int64 { + return s.cfg.GetLeaderLease() +} + +// GetTSOSaveInterval returns TSO save interval. +func (s *Server) GetTSOSaveInterval() time.Duration { + return s.cfg.GetTSOSaveInterval() +} + +// GetTSOUpdatePhysicalInterval returns TSO update physical interval. +func (s *Server) GetTSOUpdatePhysicalInterval() time.Duration { + return s.cfg.GetTSOUpdatePhysicalInterval() +} + +// GetMaxResetTSGap gets the max gap to reset the tso. +func (s *Server) GetMaxResetTSGap() time.Duration { + return s.persistOptions.GetMaxResetTSGap() +} + +// SetClient sets the etcd client. +// Notes: it is only used for test. 
+func (s *Server) SetClient(client *clientv3.Client) { + s.client = client +} +>>>>>>> d03f485c9 (*: check raftcluster nil (#7054)) diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index b563a1bcc1ae..543637b64d37 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -213,3 +213,298 @@ func mustRequestSuccess(c *C, s *server.Server) http.Header { c.Assert(resp.StatusCode, Equals, http.StatusOK) return resp.Header } +<<<<<<< HEAD +======= + +func TestRemovingProgress(t *testing.T) { + re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, serverName string) { + conf.Replication.MaxReplicas = 1 + }) + re.NoError(err) + defer cluster.Destroy() + + err = cluster.RunInitialServers() + re.NoError(err) + + cluster.WaitLeader() + leader := cluster.GetServer(cluster.GetLeader()) + grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr()) + clusterID := leader.GetClusterID() + req := &pdpb.BootstrapRequest{ + Header: testutil.NewRequestHeader(clusterID), + Store: &metapb.Store{Id: 1, Address: "127.0.0.1:0"}, + Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}}, + } + resp, err := grpcPDClient.Bootstrap(context.Background(), req) + re.NoError(err) + re.Nil(resp.GetHeader().GetError()) + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 3, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + } + + for _, store := range stores { + pdctl.MustPutStore(re, leader.GetServer(), store) + } + pdctl.MustPutRegion(re, cluster, 1000, 1, []byte("a"), []byte("b"), core.SetApproximateSize(60)) + pdctl.MustPutRegion(re, cluster, 1001, 2, []byte("c"), []byte("d"), core.SetApproximateSize(30)) + pdctl.MustPutRegion(re, cluster, 1002, 1, []byte("e"), []byte("f"), core.SetApproximateSize(50)) + pdctl.MustPutRegion(re, cluster, 1003, 2, []byte("g"), []byte("h"), core.SetApproximateSize(40)) + + // no store removing + output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=removing", http.MethodGet, http.StatusNotFound) + re.Contains((string(output)), "no progress found for the action") + output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?id=2", http.MethodGet, http.StatusNotFound) + re.Contains((string(output)), "no progress found for the given store ID") + + // remove store 1 and store 2 + _ = sendRequest(re, leader.GetAddr()+"/pd/api/v1/store/1", http.MethodDelete, http.StatusOK) + _ = sendRequest(re, leader.GetAddr()+"/pd/api/v1/store/2", http.MethodDelete, http.StatusOK) + + // size is not changed. 
+ output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=removing", http.MethodGet, http.StatusOK)
+ var p api.Progress
+ re.NoError(json.Unmarshal(output, &p))
+ re.Equal("removing", p.Action)
+ re.Equal(0.0, p.Progress)
+ re.Equal(0.0, p.CurrentSpeed)
+ re.Equal(math.MaxFloat64, p.LeftSeconds)
+
+ // update size
+ pdctl.MustPutRegion(re, cluster, 1000, 1, []byte("a"), []byte("b"), core.SetApproximateSize(20))
+ pdctl.MustPutRegion(re, cluster, 1001, 2, []byte("c"), []byte("d"), core.SetApproximateSize(10))
+
+ // is not prepared
+ time.Sleep(2 * time.Second)
+ output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=removing", http.MethodGet, http.StatusOK)
+ re.NoError(json.Unmarshal(output, &p))
+ re.Equal("removing", p.Action)
+ re.Equal(0.0, p.Progress)
+ re.Equal(0.0, p.CurrentSpeed)
+ re.Equal(math.MaxFloat64, p.LeftSeconds)
+
+ leader.GetRaftCluster().SetPrepared()
+ time.Sleep(2 * time.Second)
+ output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=removing", http.MethodGet, http.StatusOK)
+ re.NoError(json.Unmarshal(output, &p))
+ re.Equal("removing", p.Action)
+ // store 1: (60-20)/(60+50) ~= 0.36
+ // store 2: (30-10)/(30+40) ~= 0.28
+ // average progress ~= (0.36+0.28)/2 = 0.32
+ re.Equal("0.32", fmt.Sprintf("%.2f", p.Progress))
+ // store 1: 40/10s = 4
+ // store 2: 20/10s = 2
+ // average speed = (2+4)/2 = 3
+ re.Equal(3.0, p.CurrentSpeed)
+ // store 1: (20+50)/4 = 17.5s
+ // store 2: (10+40)/2 = 25s
+ // average time = (17.5+25)/2 = 21.25s
+ re.Equal(21.25, p.LeftSeconds)
+
+ output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?id=2", http.MethodGet, http.StatusOK)
+ re.NoError(json.Unmarshal(output, &p))
+ re.Equal("removing", p.Action)
+ // store 2: (30-10)/(30+40) ~= 0.285
+ re.Equal("0.29", fmt.Sprintf("%.2f", p.Progress))
+ // store 2: 20/10s = 2
+ re.Equal(2.0, p.CurrentSpeed)
+ // store 2: (10+40)/2 = 25s
+ re.Equal(25.0, p.LeftSeconds)
+
+ re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
+}
+
+func TestSendApiWhenRestartRaftCluster(t *testing.T) {
+ re := require.New(t)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) {
+ conf.Replication.MaxReplicas = 1
+ })
+ re.NoError(err)
+ defer cluster.Destroy()
+
+ err = cluster.RunInitialServers()
+ re.NoError(err)
+ leader := cluster.GetServer(cluster.WaitLeader())
+
+ grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
+ clusterID := leader.GetClusterID()
+ req := &pdpb.BootstrapRequest{
+ Header: testutil.NewRequestHeader(clusterID),
+ Store: &metapb.Store{Id: 1, Address: "127.0.0.1:0"},
+ Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}},
+ }
+ resp, err := grpcPDClient.Bootstrap(context.Background(), req)
+ re.NoError(err)
+ re.Nil(resp.GetHeader().GetError())
+
+ // Mock restart raft cluster
+ rc := leader.GetRaftCluster()
+ re.NotNil(rc)
+ rc.Stop()
+
+ // Mock client-go will still send request
+ output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/min-resolved-ts", http.MethodGet, http.StatusInternalServerError)
+ re.Contains(string(output), "TiKV cluster not bootstrapped, please start TiKV first")
+
+ err = rc.Start(leader.GetServer())
+ re.NoError(err)
+ rc = leader.GetRaftCluster()
+ re.NotNil(rc)
+}
+
+func TestPreparingProgress(t *testing.T) {
+ re := require.New(t)
+ re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, serverName string) {
+ conf.Replication.MaxReplicas = 1
+ })
+ re.NoError(err)
+ defer cluster.Destroy()
+
+ err = cluster.RunInitialServers()
+ re.NoError(err)
+
+ cluster.WaitLeader()
+ leader := cluster.GetServer(cluster.GetLeader())
+ grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
+ clusterID := leader.GetClusterID()
+ req := &pdpb.BootstrapRequest{
+ Header: testutil.NewRequestHeader(clusterID),
+ Store: &metapb.Store{Id: 1, Address: "127.0.0.1:0"},
+ Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}},
+ }
+ resp, err := grpcPDClient.Bootstrap(context.Background(), req)
+ re.NoError(err)
+ re.Nil(resp.GetHeader().GetError())
+ stores := []*metapb.Store{
+ {
+ Id: 1,
+ State: metapb.StoreState_Up,
+ NodeState: metapb.NodeState_Serving,
+ LastHeartbeat: time.Now().UnixNano(),
+ StartTimestamp: time.Now().UnixNano() - 100,
+ },
+ {
+ Id: 2,
+ State: metapb.StoreState_Up,
+ NodeState: metapb.NodeState_Serving,
+ LastHeartbeat: time.Now().UnixNano(),
+ StartTimestamp: time.Now().UnixNano() - 100,
+ },
+ {
+ Id: 3,
+ State: metapb.StoreState_Up,
+ NodeState: metapb.NodeState_Serving,
+ LastHeartbeat: time.Now().UnixNano(),
+ StartTimestamp: time.Now().UnixNano() - 100,
+ },
+ {
+ Id: 4,
+ State: metapb.StoreState_Up,
+ NodeState: metapb.NodeState_Preparing,
+ LastHeartbeat: time.Now().UnixNano(),
+ StartTimestamp: time.Now().UnixNano() - 100,
+ },
+ {
+ Id: 5,
+ State: metapb.StoreState_Up,
+ NodeState: metapb.NodeState_Preparing,
+ LastHeartbeat: time.Now().UnixNano(),
+ StartTimestamp: time.Now().UnixNano() - 100,
+ },
+ }
+
+ for _, store := range stores {
+ pdctl.MustPutStore(re, leader.GetServer(), store)
+ }
+ for i := 0; i < 100; i++ {
+ pdctl.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10))
+ }
+ // no store preparing
+ output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound)
+ re.Contains((string(output)), "no progress found for the action")
+ output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?id=4", http.MethodGet, http.StatusNotFound)
+ re.Contains((string(output)), "no progress found for the given store ID")
+
+ // is not prepared
+ time.Sleep(2 * time.Second)
+ output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound)
+ re.Contains((string(output)), "no progress found for the action")
+ output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?id=4", http.MethodGet, http.StatusNotFound)
+ re.Contains((string(output)), "no progress found for the given store ID")
+
+ // size is not changed.
+ leader.GetRaftCluster().SetPrepared() + time.Sleep(2 * time.Second) + output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) + var p api.Progress + re.NoError(json.Unmarshal(output, &p)) + re.Equal("preparing", p.Action) + re.Equal(0.0, p.Progress) + re.Equal(0.0, p.CurrentSpeed) + re.Equal(math.MaxFloat64, p.LeftSeconds) + + // update size + pdctl.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10)) + pdctl.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40)) + time.Sleep(2 * time.Second) + output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) + re.NoError(json.Unmarshal(output, &p)) + re.Equal("preparing", p.Action) + // store 4: 10/(210*0.9) ~= 0.05 + // store 5: 40/(210*0.9) ~= 0.21 + // average progress ~= (0.05+0.21)/2 = 0.13 + re.Equal("0.13", fmt.Sprintf("%.2f", p.Progress)) + // store 4: 10/10s = 1 + // store 5: 40/10s = 4 + // average speed = (1+4)/2 = 2.5 + re.Equal(2.5, p.CurrentSpeed) + // store 4: 179/1 ~= 179 + // store 5: 149/4 ~= 37.25 + // average time ~= (179+37.25)/2 = 108.125 + re.Equal(108.125, p.LeftSeconds) + + output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?id=4", http.MethodGet, http.StatusOK) + re.NoError(json.Unmarshal(output, &p)) + re.Equal("preparing", p.Action) + re.Equal("0.05", fmt.Sprintf("%.2f", p.Progress)) + re.Equal(1.0, p.CurrentSpeed) + re.Equal(179.0, p.LeftSeconds) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) +} + +func sendRequest(re *require.Assertions, url string, method string, statusCode int) []byte { + req, _ := http.NewRequest(method, url, nil) + resp, err := dialClient.Do(req) + re.NoError(err) + re.Equal(statusCode, resp.StatusCode) + output, err := io.ReadAll(resp.Body) + re.NoError(err) + resp.Body.Close() + return output +} +>>>>>>> d03f485c9 (*: check raftcluster nil (#7054))
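
Note: the recurring change in this patch is guarding every access to the RaftCluster against nil, since GetRaftCluster returns nil before bootstrap or while the cluster is stopped (as exercised by TestSendApiWhenRestartRaftCluster above). The following is a minimal, self-contained sketch of that guard pattern using hypothetical stand-in types; it is not PD's real API, and the exact return value on the nil path varies per handler in the actual patch (some return a zero value, others errs.ErrNotBootstrapped or a not-bootstrapped response header).

package main

import (
	"errors"
	"fmt"
)

// errNotBootstrapped mirrors the message asserted in the restart test above.
var errNotBootstrapped = errors.New("TiKV cluster not bootstrapped, please start TiKV first")

// raftCluster is a stand-in for server/cluster.RaftCluster.
type raftCluster struct {
	externalTS uint64
}

func (c *raftCluster) GetExternalTS() uint64 { return c.externalTS }

// server is a stand-in for server.Server; GetRaftCluster may return nil
// before bootstrap or while the cluster is restarting.
type server struct {
	rc *raftCluster
}

func (s *server) GetRaftCluster() *raftCluster { return s.rc }

// GetExternalTS applies the guard: fetch the cluster once, check for nil,
// and fail cleanly instead of dereferencing a nil pointer.
func (s *server) GetExternalTS() (uint64, error) {
	rc := s.GetRaftCluster()
	if rc == nil {
		return 0, errNotBootstrapped
	}
	return rc.GetExternalTS(), nil
}

func main() {
	s := &server{} // RaftCluster not started yet
	if _, err := s.GetExternalTS(); err != nil {
		fmt.Println("handled gracefully:", err)
	}
	s.rc = &raftCluster{externalTS: 42}
	ts, _ := s.GetExternalTS()
	fmt.Println("external ts:", ts)
}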
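
For the min-resolved-ts endpoint added in server/api/min_resolved_ts.go, a hedged client-side sketch of how the scope parameter might be consumed is shown below. The URL assumes a local PD listening on 127.0.0.1:2379, and PersistInterval is read as a plain string here for simplicity (the server marshals a typeutil.Duration); both are assumptions for illustration only.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// minResolvedTS mirrors the JSON fields served by the new handler.
type minResolvedTS struct {
	IsRealTime          bool              `json:"is_real_time,omitempty"`
	MinResolvedTS       uint64            `json:"min_resolved_ts"`
	PersistInterval     string            `json:"persist_interval,omitempty"`
	StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"`
}

func main() {
	// scope=1,2,3 requests per-store values for stores 1, 2 and 3;
	// scope=cluster would fill stores_min_resolved_ts for all stores.
	resp, err := http.Get("http://127.0.0.1:2379/pd/api/v1/min-resolved-ts?scope=1,2,3")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var ts minResolvedTS
	if err := json.NewDecoder(resp.Body).Decode(&ts); err != nil {
		panic(err)
	}
	fmt.Printf("min_resolved_ts=%d per-store=%v real_time=%v\n",
		ts.MinResolvedTS, ts.StoresMinResolvedTS, ts.IsRealTime)
}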