diff --git a/server/api/min_resolved_ts.go b/server/api/min_resolved_ts.go index c367aabdd1f..a6af3ea3735 100644 --- a/server/api/min_resolved_ts.go +++ b/server/api/min_resolved_ts.go @@ -48,7 +48,7 @@ type minResolvedTS struct { // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /min-resolved-ts [get] func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) { - c := h.svr.GetRaftCluster() + c := getCluster(r) value := c.GetMinResolvedTS() persistInterval := c.GetOpts().GetPDServerConfig().MinResolvedTSPersistenceInterval h.rd.JSON(w, http.StatusOK, minResolvedTS{ diff --git a/server/api/router.go b/server/api/router.go index 523d7a3e94a..18cb9ae42e4 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -327,7 +327,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { // min resolved ts API minResolvedTSHandler := newMinResolvedTSHandler(svr, rd) - registerFunc(apiRouter, "/min-resolved-ts", minResolvedTSHandler.GetMinResolvedTS, setMethods("GET")) + registerFunc(clusterRouter, "/min-resolved-ts", minResolvedTSHandler.GetMinResolvedTS, setMethods("GET")) // unsafe admin operation API unsafeOperationHandler := newUnsafeOperationHandler(svr, rd) diff --git a/server/grpc_service.go b/server/grpc_service.go index fc82aeddd8d..acefd95f566 100755 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1210,7 +1210,6 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB if rc == nil { return &pdpb.ReportBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil } - _, err := rc.HandleBatchReportSplit(request) if err != nil { return &pdpb.ReportBatchSplitResponse{ @@ -1697,6 +1696,9 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S return rsp.(*pdpb.SplitAndScatterRegionsResponse), err } rc := s.GetRaftCluster() + if rc == nil { + return &pdpb.SplitAndScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil + } splitFinishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit())) scatterFinishedPercentage, err := scatterRegions(rc, newRegionIDs, request.GetGroup(), int(request.GetRetryLimit())) if err != nil { diff --git a/server/handler.go b/server/handler.go index 08419a05c11..0fd112c6eb2 100644 --- a/server/handler.go +++ b/server/handler.go @@ -925,14 +925,20 @@ func (h *Handler) ResetTS(ts uint64) error { // SetStoreLimitScene sets the limit values for different scenes func (h *Handler) SetStoreLimitScene(scene *storelimit.Scene, limitType storelimit.Type) { - cluster := h.s.GetRaftCluster() - cluster.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType) + rc := h.s.GetRaftCluster() + if rc == nil { + return + } + rc.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType) } // GetStoreLimitScene returns the limit values for different scenes func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scene { - cluster := h.s.GetRaftCluster() - return cluster.GetStoreLimiter().StoreLimitScene(limitType) + rc := h.s.GetRaftCluster() + if rc == nil { + return nil + } + return rc.GetStoreLimiter().StoreLimitScene(limitType) } // GetProgressByID returns the progress details for a given store ID. diff --git a/server/server.go b/server/server.go index 2d95a54cf0f..3463b3c6e4a 100644 --- a/server/server.go +++ b/server/server.go @@ -891,18 +891,18 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { } old := s.persistOptions.GetReplicationConfig() if cfg.EnablePlacementRules != old.EnablePlacementRules { - raftCluster := s.GetRaftCluster() - if raftCluster == nil { + rc := s.GetRaftCluster() + if rc == nil { return errs.ErrNotBootstrapped.GenWithStackByArgs() } if cfg.EnablePlacementRules { // initialize rule manager. - if err := raftCluster.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { + if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil { return err } } else { // NOTE: can be removed after placement rules feature is enabled by default. - for _, s := range raftCluster.GetStores() { + for _, s := range rc.GetStores() { if !s.IsRemoved() && s.IsTiFlash() { return errors.New("cannot disable placement rules with TiFlash nodes") } @@ -912,8 +912,12 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { var rule *placement.Rule if cfg.EnablePlacementRules { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } // replication.MaxReplicas won't work when placement rule is enabled and not only have one default rule. - defaultRule := s.GetRaftCluster().GetRuleManager().GetRule("pd", "default") + defaultRule := rc.GetRuleManager().GetRule("pd", "default") CheckInDefaultRule := func() error { // replication config won't work when placement rule is enabled and exceeds one default rule @@ -939,7 +943,11 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { if rule != nil { rule.Count = int(cfg.MaxReplicas) rule.LocationLabels = cfg.LocationLabels - if err := s.GetRaftCluster().GetRuleManager().SetRule(rule); err != nil { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if err := rc.GetRuleManager().SetRule(rule); err != nil { log.Error("failed to update rule count", errs.ZapError(err)) return err @@ -951,7 +959,11 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error { s.persistOptions.SetReplicationConfig(old) if rule != nil { rule.Count = int(old.MaxReplicas) - if e := s.GetRaftCluster().GetRuleManager().SetRule(rule); e != nil { + rc := s.GetRaftCluster() + if rc == nil { + return errs.ErrNotBootstrapped.GenWithStackByArgs() + } + if e := rc.GetRuleManager().SetRule(rule); e != nil { log.Error("failed to roll back count of rule when update replication config", errs.ZapError(e)) } } @@ -1136,18 +1148,18 @@ func (s *Server) GetServerOption() *config.PersistOptions { // GetMetaRegions gets meta regions from cluster. func (s *Server) GetMetaRegions() []*metapb.Region { - cluster := s.GetRaftCluster() - if cluster != nil { - return cluster.GetMetaRegions() + rc := s.GetRaftCluster() + if rc != nil { + return rc.GetMetaRegions() } return nil } // GetRegions gets regions from cluster. func (s *Server) GetRegions() []*core.RegionInfo { - cluster := s.GetRaftCluster() - if cluster != nil { - return cluster.GetRegions() + rc := s.GetRaftCluster() + if rc != nil { + return rc.GetRegions() } return nil } @@ -1253,9 +1265,9 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) erro } log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) - cluster := s.GetRaftCluster() - if cluster != nil { - err := cluster.GetReplicationMode().UpdateConfig(cfg) + rc := s.GetRaftCluster() + if rc != nil { + err := rc.GetReplicationMode().UpdateConfig(cfg) if err != nil { log.Warn("failed to update replication mode", errs.ZapError(err)) // revert to old config diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index b947d7f0f04..8102532102f 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -742,6 +742,46 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) { c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"), IsNil) } +func (s *testProgressSuite) TestSendApiWhenRestartRaftCluster(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) { + conf.Replication.MaxReplicas = 1 + }) + c.Assert(err, IsNil) + defer cluster.Destroy() + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + leader := cluster.GetServer(cluster.WaitLeader()) + + grpcPDClient := testutil.MustNewGrpcClient(c, leader.GetAddr()) + clusterID := leader.GetClusterID() + req := &pdpb.BootstrapRequest{ + Header: testutil.NewRequestHeader(clusterID), + Store: &metapb.Store{Id: 1, Address: "127.0.0.1:0"}, + Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}}, + } + resp, err := grpcPDClient.Bootstrap(context.Background(), req) + c.Assert(err, IsNil) + c.Assert(resp.GetHeader().GetError(), IsNil) + + // Mock restart raft cluster + rc := leader.GetRaftCluster() + c.Assert(rc, NotNil) + rc.Stop() + + // Mock client-go will still send request + output := sendRequest(c, leader.GetAddr()+"/pd/api/v1/min-resolved-ts", http.MethodGet, http.StatusInternalServerError) + + c.Assert(strings.Contains(string(output), "TiKV cluster not bootstrapped, please start TiKV first"), IsTrue) + + err = rc.Start(leader.GetServer()) + c.Assert(err, IsNil) + rc = leader.GetRaftCluster() + c.Assert(rc, NotNil) +} + func sendRequest(c *C, url string, method string, statusCode int) []byte { req, _ := http.NewRequest(method, url, nil) resp, err := dialClient.Do(req)