*: check raftcluster nil (#7054) #7071

Merged
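Every hunk below applies the same hardening: `Server.GetRaftCluster()` can return nil whenever the raft cluster is stopped or restarting (the new test at the bottom exercises exactly this window), so each call site now fetches the cluster once, nil-checks it, and reuses that local handle instead of dereferencing the results of repeated `GetRaftCluster()` calls. A minimal self-contained sketch of the pattern — the `Server`/`RaftCluster` types here are illustrative stand-ins, not pd's real ones:

```go
package main

import (
	"fmt"
	"sync"
)

// Illustrative stand-ins for pd's Server and RaftCluster.
type RaftCluster struct{ externalTS uint64 }

func (c *RaftCluster) GetExternalTS() uint64 { return c.externalTS }

type Server struct {
	mu sync.RWMutex
	rc *RaftCluster // nil while the raft cluster is stopped
}

func (s *Server) GetRaftCluster() *RaftCluster {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.rc
}

// GetExternalTS mirrors the fixed call sites: fetch once, guard, reuse.
// Two separate GetRaftCluster() calls could straddle a concurrent stop
// and panic on a nil dereference; a single guarded fetch cannot.
func (s *Server) GetExternalTS() uint64 {
	rc := s.GetRaftCluster()
	if rc == nil {
		return 0 // same fallback the diff uses in server.go
	}
	return rc.GetExternalTS()
}

func main() {
	s := &Server{rc: &RaftCluster{externalTS: 42}}
	fmt.Println(s.GetExternalTS()) // 42

	s.mu.Lock()
	s.rc = nil // mock a stopped raft cluster, like the new test does
	s.mu.Unlock()
	fmt.Println(s.GetExternalTS()) // 0
}
```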
server/api/min_resolved_ts.go (4 changes: 2 additions & 2 deletions)
@@ -51,7 +51,7 @@
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts/{store_id} [get]
func (h *minResolvedTSHandler) GetStoreMinResolvedTS(w http.ResponseWriter, r *http.Request) {
-	c := h.svr.GetRaftCluster()
+	c := getCluster(r)
	idStr := mux.Vars(r)["store_id"]
	storeID, err := strconv.ParseUint(idStr, 10, 64)
	if err != nil {
@@ -74,7 +74,7 @@
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /min-resolved-ts [get]
func (h *minResolvedTSHandler) GetMinResolvedTS(w http.ResponseWriter, r *http.Request) {
-	c := h.svr.GetRaftCluster()
+	c := getCluster(r)
	value := c.GetMinResolvedTS()
	persistInterval := c.GetPDServerConfig().MinResolvedTSPersistenceInterval
	h.rd.JSON(w, http.StatusOK, minResolvedTS{
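Switching from `h.svr.GetRaftCluster()` to `getCluster(r)` delegates the nil handling to the API layer's shared request plumbing, which rejects requests with HTTP 500 and the not-bootstrapped message when the cluster is down (the new test below asserts exactly that response). A hedged sketch of that style of per-request guard — `withCluster` and `clusterCtxKey` are hypothetical names, since the real middleware is not part of this diff:

```go
package api

import (
	"context"
	"net/http"
)

type RaftCluster struct{} // stand-in for pd's cluster type

type ctxKey int

const clusterCtxKey ctxKey = 0 // hypothetical context key, for illustration

// withCluster resolves the cluster once per request and rejects the
// request up front when it is nil, so handlers never dereference nil.
func withCluster(get func() *RaftCluster, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		rc := get()
		if rc == nil {
			http.Error(w, "TiKV cluster not bootstrapped, please start TiKV first",
				http.StatusInternalServerError)
			return
		}
		ctx := context.WithValue(r.Context(), clusterCtxKey, rc)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

// getCluster then only ever sees a non-nil cluster.
func getCluster(r *http.Request) *RaftCluster {
	return r.Context().Value(clusterCtxKey).(*RaftCluster)
}
```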
server/cluster/cluster.go (6 changes: 5 additions & 1 deletion)
@@ -2518,7 +2518,11 @@
func (c *RaftCluster) GetStoreMinResolvedTS(storeID uint64) uint64 {
	c.RLock()
	defer c.RUnlock()
-	if !c.isInitialized() || !core.IsAvailableForMinResolvedTS(c.GetStore(storeID)) {
+	store := c.GetStore(storeID)
+	if store == nil {
+		return math.MaxUint64
+	}
+	if !c.isInitialized() || !core.IsAvailableForMinResolvedTS(store) {
		return math.MaxUint64
	}
	return c.GetStore(storeID).GetMinResolvedTS()
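Returning `math.MaxUint64` for a missing store is consistent with the existing unavailable-store branch: MaxUint64 is the identity element of `min`, so when per-store values are folded into a cluster-wide minimum, a store with no usable value simply drops out. A toy illustration of why the sentinel composes safely (the aggregation loop is assumed for illustration, not pd's actual code):

```go
package main

import (
	"fmt"
	"math"
)

// minResolvedTS folds per-store values with min, treating
// math.MaxUint64 as "no data": it can never lower the result.
func minResolvedTS(perStore []uint64) uint64 {
	min := uint64(math.MaxUint64)
	for _, ts := range perStore {
		if ts < min {
			min = ts
		}
	}
	return min // still MaxUint64 if every store was unavailable
}

func main() {
	fmt.Println(minResolvedTS([]uint64{417, math.MaxUint64, 512})) // 417
	fmt.Println(minResolvedTS(nil))                                // 18446744073709551615
}
```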
server/grpc_service.go (4 changes: 3 additions & 1 deletion)
@@ -1107,7 +1106,6 @@
	if rc == nil {
		return &pdpb.ReportBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
	}
-
	_, err := rc.HandleBatchReportSplit(request)
	if err != nil {
		return &pdpb.ReportBatchSplitResponse{
@@ -1585,6 +1584,9 @@
		return rsp.(*pdpb.SplitAndScatterRegionsResponse), err
	}
	rc := s.GetRaftCluster()
+	if rc == nil {
+		return &pdpb.SplitAndScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil
+	}
	splitFinishedPercentage, newRegionIDs := rc.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit()))
	scatterFinishedPercentage, err := scatterRegions(rc, newRegionIDs, request.GetGroup(), int(request.GetRetryLimit()))
	if err != nil {
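Note the gRPC convention here: a nil cluster is reported inside the pdpb response header (via `notBootstrappedHeader`) with a nil Go error, so clients can distinguish "PD is up but the cluster is not ready" from a transport-level RPC failure. A sketch of that shape with mocked message types — the real definitions live in pdpb, and the error type string here is an assumption:

```go
package grpcguard

// Mocked stand-ins for the pdpb response types, illustration only.
type Error struct{ Type, Message string }

type ResponseHeader struct {
	ClusterId uint64
	Error     *Error
}

type SplitAndScatterRegionsResponse struct{ Header *ResponseHeader }

type RaftCluster struct{}

// notBootstrappedHeader mirrors the helper used in grpc_service.go:
// the failure rides in the header, and the RPC itself still succeeds.
func notBootstrappedHeader(clusterID uint64) *ResponseHeader {
	return &ResponseHeader{
		ClusterId: clusterID,
		Error: &Error{
			Type:    "NOT_BOOTSTRAPPED",
			Message: "TiKV cluster not bootstrapped, please start TiKV first",
		},
	}
}

func splitAndScatter(rc *RaftCluster, clusterID uint64) (*SplitAndScatterRegionsResponse, error) {
	if rc == nil {
		return &SplitAndScatterRegionsResponse{Header: notBootstrappedHeader(clusterID)}, nil
	}
	// ... real splitting and scattering elided ...
	return &SplitAndScatterRegionsResponse{Header: &ResponseHeader{ClusterId: clusterID}}, nil
}
```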
server/handler.go (14 changes: 10 additions & 4 deletions)
@@ -943,14 +943,20 @@

// SetStoreLimitScene sets the limit values for different scenes
func (h *Handler) SetStoreLimitScene(scene *storelimit.Scene, limitType storelimit.Type) {
-	cluster := h.s.GetRaftCluster()
-	cluster.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
+	rc := h.s.GetRaftCluster()
+	if rc == nil {
+		return
+	}
+	rc.GetStoreLimiter().ReplaceStoreLimitScene(scene, limitType)
}

// GetStoreLimitScene returns the limit values for different scenes
func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scene {
-	cluster := h.s.GetRaftCluster()
-	return cluster.GetStoreLimiter().StoreLimitScene(limitType)
+	rc := h.s.GetRaftCluster()
+	if rc == nil {
+		return nil
+	}
+	return rc.GetStoreLimiter().StoreLimitScene(limitType)
}

// GetProgressByID returns the progress details for a given store ID.
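With these guards, `GetStoreLimitScene` can legitimately return nil while the cluster is down, so its callers must treat nil as "not available" rather than a bug. A small caller-side sketch under that assumption (the types are mocked, not pd's):

```go
package handlerguard

// Scene mocks storelimit.Scene for illustration.
type Scene struct{ Idle, Low, Normal, High int }

// sceneOrDefault shows the caller-side consequence of the new guard:
// fall back to a default instead of dereferencing a nil Scene.
func sceneOrDefault(got *Scene, def Scene) Scene {
	if got != nil {
		return *got
	}
	return def // cluster not running; use the default scene
}
```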
server/server.go (60 changes: 40 additions & 20 deletions)
@@ -963,18 +963,18 @@
	}
	old := s.persistOptions.GetReplicationConfig()
	if cfg.EnablePlacementRules != old.EnablePlacementRules {
-		raftCluster := s.GetRaftCluster()
-		if raftCluster == nil {
+		rc := s.GetRaftCluster()
+		if rc == nil {
			return errs.ErrNotBootstrapped.GenWithStackByArgs()
		}
		if cfg.EnablePlacementRules {
			// initialize rule manager.
-			if err := raftCluster.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil {
+			if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels); err != nil {
				return err
			}
		} else {
			// NOTE: can be removed after placement rules feature is enabled by default.
-			for _, s := range raftCluster.GetStores() {
+			for _, s := range rc.GetStores() {
				if !s.IsRemoved() && s.IsTiFlash() {
					return errors.New("cannot disable placement rules with TiFlash nodes")
				}
@@ -984,8 +984,12 @@

	var rule *placement.Rule
	if cfg.EnablePlacementRules {
+		rc := s.GetRaftCluster()
+		if rc == nil {
+			return errs.ErrNotBootstrapped.GenWithStackByArgs()
+		}
		// replication.MaxReplicas won't work when placement rule is enabled and not only have one default rule.
-		defaultRule := s.GetRaftCluster().GetRuleManager().GetRule("pd", "default")
+		defaultRule := rc.GetRuleManager().GetRule("pd", "default")

		CheckInDefaultRule := func() error {
			// replication config won't work when placement rule is enabled and exceeds one default rule
@@ -1011,7 +1015,11 @@
	if rule != nil {
		rule.Count = int(cfg.MaxReplicas)
		rule.LocationLabels = cfg.LocationLabels
-		if err := s.GetRaftCluster().GetRuleManager().SetRule(rule); err != nil {
+		rc := s.GetRaftCluster()
+		if rc == nil {
+			return errs.ErrNotBootstrapped.GenWithStackByArgs()
+		}
+		if err := rc.GetRuleManager().SetRule(rule); err != nil {
			log.Error("failed to update rule count",
				errs.ZapError(err))
			return err
@@ -1023,7 +1031,11 @@
		s.persistOptions.SetReplicationConfig(old)
		if rule != nil {
			rule.Count = int(old.MaxReplicas)
-			if e := s.GetRaftCluster().GetRuleManager().SetRule(rule); e != nil {
+			rc := s.GetRaftCluster()
+			if rc == nil {
+				return errs.ErrNotBootstrapped.GenWithStackByArgs()
+			}
+			if e := rc.GetRuleManager().SetRule(rule); e != nil {
log.Error("failed to roll back count of rule when update replication config", errs.ZapError(e))
}
}
@@ -1262,18 +1274,18 @@

// GetMetaRegions gets meta regions from cluster.
func (s *Server) GetMetaRegions() []*metapb.Region {
-	cluster := s.GetRaftCluster()
-	if cluster != nil {
-		return cluster.GetMetaRegions()
+	rc := s.GetRaftCluster()
+	if rc != nil {
+		return rc.GetMetaRegions()
	}
	return nil
}

// GetRegions gets regions from cluster.
func (s *Server) GetRegions() []*core.RegionInfo {
-	cluster := s.GetRaftCluster()
-	if cluster != nil {
-		return cluster.GetRegions()
+	rc := s.GetRaftCluster()
+	if rc != nil {
+		return rc.GetRegions()
	}
	return nil
}
@@ -1394,9 +1406,9 @@
	}
	log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))

-	cluster := s.GetRaftCluster()
-	if cluster != nil {
-		err := cluster.GetReplicationMode().UpdateConfig(cfg)
+	rc := s.GetRaftCluster()
+	if rc != nil {
+		err := rc.GetReplicationMode().UpdateConfig(cfg)
		if err != nil {
			log.Warn("failed to update replication mode", errs.ZapError(err))
			// revert to old config
@@ -1917,7 +1929,11 @@

// GetExternalTS returns external timestamp.
func (s *Server) GetExternalTS() uint64 {
-	return s.GetRaftCluster().GetExternalTS()
+	rc := s.GetRaftCluster()
+	if rc == nil {
+		return 0
+	}
+	return rc.GetExternalTS()
}

// SetExternalTS returns external timestamp.
@@ -1927,14 +1943,18 @@
log.Error(desc, zap.Uint64("request timestamp", externalTS), zap.Uint64("global ts", globalTS))
return errors.New(desc)
}
currentExternalTS := s.GetRaftCluster().GetExternalTS()
c := s.GetRaftCluster()
if c == nil {
return errs.ErrNotBootstrapped.FastGenByArgs()

+	}
+	currentExternalTS := c.GetExternalTS()
	if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 {
		desc := "the external timestamp should be larger than current external timestamp"
		log.Error(desc, zap.Uint64("request", externalTS), zap.Uint64("current", currentExternalTS))
		return errors.New(desc)
	}
-	s.GetRaftCluster().SetExternalTS(externalTS)
-	return nil
+	return c.SetExternalTS(externalTS)
}

// IsLocalTSOEnabled returns if the local TSO is enabled.
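The `SetExternalTS` path keeps its strict monotonicity check: `tsoutil.CompareTimestampUint64` must return 1, i.e. the incoming timestamp must be strictly greater than the current external TS. For intuition, PD's TSO packs a physical millisecond clock and a logical counter into one uint64 (physical shifted left by the logical bit width), so plain integer comparison orders timestamps correctly — the sketch below assumes that encoding for illustration:

```go
package main

import "fmt"

const logicalBits = 18 // assumed TSO layout: physical(ms) << 18 | logical

func composeTS(physicalMS, logical int64) uint64 {
	return uint64(physicalMS)<<logicalBits | uint64(logical)
}

// compareTS mimics a three-way compare like tsoutil.CompareTimestampUint64:
// -1 if a < b, 0 if equal, 1 if a > b.
func compareTS(a, b uint64) int {
	switch {
	case a < b:
		return -1
	case a > b:
		return 1
	}
	return 0
}

func main() {
	cur := composeTS(1_700_000_000_000, 5)
	next := composeTS(1_700_000_000_000, 6)
	// SetExternalTS requires compare(request, current) == 1.
	fmt.Println(compareTS(next, cur)) // 1
	fmt.Println(compareTS(cur, cur))  // 0 -> rejected
}
```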
tests/server/api/api_test.go (40 changes: 40 additions & 0 deletions)
@@ -793,6 +793,46 @@ func TestRemovingProgress(t *testing.T) {
	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
}

+func TestSendApiWhenRestartRaftCluster(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, serverName string) {
+		conf.Replication.MaxReplicas = 1
+	})
+	re.NoError(err)
+	defer cluster.Destroy()
+
+	err = cluster.RunInitialServers()
+	re.NoError(err)
+	leader := cluster.GetServer(cluster.WaitLeader())
+
+	grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
+	clusterID := leader.GetClusterID()
+	req := &pdpb.BootstrapRequest{
+		Header: testutil.NewRequestHeader(clusterID),
+		Store:  &metapb.Store{Id: 1, Address: "127.0.0.1:0"},
+		Region: &metapb.Region{Id: 2, Peers: []*metapb.Peer{{Id: 3, StoreId: 1, Role: metapb.PeerRole_Voter}}},
+	}
+	resp, err := grpcPDClient.Bootstrap(context.Background(), req)
+	re.NoError(err)
+	re.Nil(resp.GetHeader().GetError())
+
+	// Mock restart raft cluster
+	rc := leader.GetRaftCluster()
+	re.NotNil(rc)
+	rc.Stop()
+
+	// Mock client-go will still send request
+	output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/min-resolved-ts", http.MethodGet, http.StatusInternalServerError)
+	re.Contains(string(output), "TiKV cluster not bootstrapped, please start TiKV first")
+
+	err = rc.Start(leader.GetServer())
+	re.NoError(err)
+	rc = leader.GetRaftCluster()
+	re.NotNil(rc)
+}

func TestPreparingProgress(t *testing.T) {
	re := require.New(t)
	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
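The regression test mirrors the original report: bootstrap a live PD leader, stop the raft cluster in place (rather than shutting the whole server down), and verify that an HTTP API call now degrades to a clean 500 with the not-bootstrapped message instead of crashing the server on a nil dereference; restarting the cluster afterwards confirms the server stays healthy across the stop/start cycle.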