diff --git a/.gitignore b/.gitignore
index 748d24872b6..d9e06652f27 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,5 @@ coverage.xml
 coverage
 *.txt
 go.work*
+embedded_assets_handler.go
+*.log
diff --git a/Makefile b/Makefile
index 4ef25b5d662..540a92d4c4b 100644
--- a/Makefile
+++ b/Makefile
@@ -250,7 +250,14 @@ test-tso-consistency: install-tools
 	CGO_ENABLED=1 go test -race -tags without_dashboard,tso_consistency_test,deadlock $(TSO_INTEGRATION_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
 	@$(FAILPOINT_DISABLE)
 
+REAL_CLUSTER_TEST_PATH := $(ROOT_PATH)/tests/integrations/realcluster
+
+test-real-cluster:
+	@ rm -rf ~/.tiup/data/pd_real_cluster_test
+	# testing with the real cluster...
+	cd $(REAL_CLUSTER_TEST_PATH) && $(MAKE) check
+
-.PHONY: test basic-test test-with-cover test-tso-function test-tso-consistency
+.PHONY: test basic-test test-with-cover test-tso-function test-tso-consistency test-real-cluster
 
 
 #### Daily CI coverage analyze  ####
diff --git a/client/http/interface.go b/client/http/interface.go
new file mode 100644
index 00000000000..f90ab19624f
--- /dev/null
+++ b/client/http/interface.go
@@ -0,0 +1,1026 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package http
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"strconv"
+	"strings"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/keyspacepb"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/tikv/pd/client/retry"
+)
+
+// Client is a PD (Placement Driver) HTTP client.
+type Client interface { + /* Member-related interfaces */ + GetMembers(context.Context) (*MembersInfo, error) + GetLeader(context.Context) (*pdpb.Member, error) + TransferLeader(context.Context, string) error + /* Meta-related interfaces */ + GetRegionByID(context.Context, uint64) (*RegionInfo, error) + GetRegionByKey(context.Context, []byte) (*RegionInfo, error) + GetRegions(context.Context) (*RegionsInfo, error) + GetRegionsByKeyRange(context.Context, *KeyRange, int) (*RegionsInfo, error) + GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error) + GetEmptyRegions(context.Context) (*RegionsInfo, error) + GetRegionsReplicatedStateByKeyRange(context.Context, *KeyRange) (string, error) + GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error) + GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error) + GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error) + GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error) + GetStores(context.Context) (*StoresInfo, error) + GetStore(context.Context, uint64) (*StoreInfo, error) + DeleteStore(context.Context, uint64) error + SetStoreLabels(context.Context, int64, map[string]string) error + DeleteStoreLabel(ctx context.Context, storeID int64, labelKey string) error + GetHealthStatus(context.Context) ([]Health, error) + /* Config-related interfaces */ + GetConfig(context.Context) (map[string]any, error) + SetConfig(context.Context, map[string]any, ...float64) error + GetScheduleConfig(context.Context) (map[string]any, error) + SetScheduleConfig(context.Context, map[string]any) error + GetClusterVersion(context.Context) (string, error) + GetCluster(context.Context) (*metapb.Cluster, error) + GetClusterStatus(context.Context) (*ClusterState, error) + GetStatus(context.Context) (*State, error) + GetReplicateConfig(context.Context) (map[string]any, error) + /* Scheduler-related interfaces */ + GetSchedulers(context.Context) ([]string, error) + CreateScheduler(ctx context.Context, name string, storeID uint64) error + DeleteScheduler(ctx context.Context, name string) error + SetSchedulerDelay(context.Context, string, int64) error + /* Rule-related interfaces */ + GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error) + GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error) + GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error) + GetPlacementRule(context.Context, string, string) (*Rule, error) + SetPlacementRule(context.Context, *Rule) error + SetPlacementRuleInBatch(context.Context, []*RuleOp) error + SetPlacementRuleBundles(context.Context, []*GroupBundle, bool) error + DeletePlacementRule(context.Context, string, string) error + GetAllPlacementRuleGroups(context.Context) ([]*RuleGroup, error) + GetPlacementRuleGroupByID(context.Context, string) (*RuleGroup, error) + SetPlacementRuleGroup(context.Context, *RuleGroup) error + DeletePlacementRuleGroupByID(context.Context, string) error + GetAllRegionLabelRules(context.Context) ([]*LabelRule, error) + GetRegionLabelRulesByIDs(context.Context, []string) ([]*LabelRule, error) + // `SetRegionLabelRule` sets the label rule for a region. + // When a label rule (deny scheduler) is set, + // 1. All schedulers will be disabled except for the evict-leader-scheduler. + // 2. The merge-checker will be disabled, preventing these regions from being merged. 
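As a concrete illustration of the deny behavior described in the comment above, the following is a minimal, hypothetical sketch that installs such a label rule through this client. The `schedule`/`deny` label key and value and the hex key range are assumptions following PD's region-label convention, the endpoint is a placeholder, and the `labeler.MakeKeyRanges` helper is the same server-side package the integration tests in this PR use to build the rule data.

```go
package main

import (
	"context"

	pd "github.com/tikv/pd/client/http"
	"github.com/tikv/pd/pkg/schedule/labeler"
)

func main() {
	// Assumed endpoint; replace with a real PD advertise-client-urls host:port.
	cli := pd.NewClient("deny-label-example", []string{"127.0.0.1:2379"})
	defer cli.Close()

	// A key-range label rule carrying the schedule=deny label (assumed key/value);
	// the hex-encoded keys below are placeholders.
	rule := &pd.LabelRule{
		ID:       "schedule-deny-example",
		Labels:   []pd.RegionLabel{{Key: "schedule", Value: "deny"}},
		RuleType: "key-range",
		Data:     labeler.MakeKeyRanges("7480000000000000ff01", "7480000000000000ff02"),
	}
	if err := cli.SetRegionLabelRule(context.Background(), rule); err != nil {
		panic(err)
	}
}
```

Once such a rule is in place, scheduling on the covered regions is denied except for leader eviction, which is the behavior the scheduler-controller change later in this diff relies on.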
+ SetRegionLabelRule(context.Context, *LabelRule) error + PatchRegionLabelRules(context.Context, *LabelRulePatch) error + /* Scheduling-related interfaces */ + AccelerateSchedule(context.Context, *KeyRange) error + AccelerateScheduleInBatch(context.Context, []*KeyRange) error + /* Admin-related interfaces */ + ResetTS(context.Context, uint64, bool) error + ResetBaseAllocID(context.Context, uint64) error + SetSnapshotRecoveringMark(context.Context) error + DeleteSnapshotRecoveringMark(context.Context) error + /* Other interfaces */ + GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error) + GetPDVersion(context.Context) (string, error) + /* Micro Service interfaces */ + GetMicroServiceMembers(context.Context, string) ([]MicroServiceMember, error) + GetMicroServicePrimary(context.Context, string) (string, error) + DeleteOperators(context.Context) error + + /* Keyspace interface */ + + // UpdateKeyspaceGCManagementType update the `gc_management_type` in keyspace meta config. + // If `gc_management_type` is `global_gc`, it means the current keyspace requires a tidb without 'keyspace-name' + // configured to run a global gc worker to calculate a global gc safe point. + // If `gc_management_type` is `keyspace_level_gc` it means the current keyspace can calculate gc safe point by its own. + UpdateKeyspaceGCManagementType(ctx context.Context, keyspaceName string, keyspaceGCManagementType *KeyspaceGCManagementTypeConfig) error + GetKeyspaceMetaByName(ctx context.Context, keyspaceName string) (*keyspacepb.KeyspaceMeta, error) + + /* Client-related methods */ + // WithCallerID sets and returns a new client with the given caller ID. + WithCallerID(string) Client + // WithRespHandler sets and returns a new client with the given HTTP response handler. + // This allows the caller to customize how the response is handled, including error handling logic. + // Additionally, it is important for the caller to handle the content of the response body properly + // in order to ensure that it can be read and marshaled correctly into `res`. + WithRespHandler(func(resp *http.Response, res any) error) Client + // WithBackoffer sets and returns a new client with the given backoffer. + WithBackoffer(*retry.Backoffer) Client + // WithTargetURL sets and returns a new client with the given target URL. + WithTargetURL(string) Client + // Close gracefully closes the HTTP client. + Close() +} + +var _ Client = (*client)(nil) + +// GetMembers gets the members info of PD cluster. +func (c *client) GetMembers(ctx context.Context) (*MembersInfo, error) { + var members MembersInfo + err := c.request(ctx, newRequestInfo(). + WithName(getMembersName). + WithURI(membersPrefix). + WithMethod(http.MethodGet). + WithResp(&members)) + if err != nil { + return nil, err + } + return &members, nil +} + +// GetLeader gets the leader of PD cluster. +func (c *client) GetLeader(ctx context.Context) (*pdpb.Member, error) { + var leader pdpb.Member + err := c.request(ctx, newRequestInfo(). + WithName(getLeaderName). + WithURI(leaderPrefix). + WithMethod(http.MethodGet). + WithResp(&leader)) + if err != nil { + return nil, err + } + return &leader, nil +} + +// TransferLeader transfers the PD leader. +func (c *client) TransferLeader(ctx context.Context, newLeader string) error { + return c.request(ctx, newRequestInfo(). + WithName(transferLeaderName). + WithURI(TransferLeaderByID(newLeader)). + WithMethod(http.MethodPost)) +} + +// GetRegionByID gets the region info by ID. 
+func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) { + var region RegionInfo + err := c.request(ctx, newRequestInfo(). + WithName(getRegionByIDName). + WithURI(RegionByID(regionID)). + WithMethod(http.MethodGet). + WithResp(®ion)) + if err != nil { + return nil, err + } + return ®ion, nil +} + +// GetRegionByKey gets the region info by key. +func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, error) { + var region RegionInfo + err := c.request(ctx, newRequestInfo(). + WithName(getRegionByKeyName). + WithURI(RegionByKey(key)). + WithMethod(http.MethodGet). + WithResp(®ion)) + if err != nil { + return nil, err + } + return ®ion, nil +} + +// GetRegions gets the regions info. +func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) { + var regions RegionsInfo + err := c.request(ctx, newRequestInfo(). + WithName(getRegionsName). + WithURI(Regions). + WithMethod(http.MethodGet). + WithResp(®ions)) + if err != nil { + return nil, err + } + return ®ions, nil +} + +// GetRegionsByKeyRange gets the regions info by key range. If the limit is -1, it will return all regions within the range. +// The keys in the key range should be encoded in the UTF-8 bytes format. +func (c *client) GetRegionsByKeyRange(ctx context.Context, keyRange *KeyRange, limit int) (*RegionsInfo, error) { + var regions RegionsInfo + err := c.request(ctx, newRequestInfo(). + WithName(getRegionsByKeyRangeName). + WithURI(RegionsByKeyRange(keyRange, limit)). + WithMethod(http.MethodGet). + WithResp(®ions)) + if err != nil { + return nil, err + } + return ®ions, nil +} + +// GetRegionsByStoreID gets the regions info by store ID. +func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*RegionsInfo, error) { + var regions RegionsInfo + err := c.request(ctx, newRequestInfo(). + WithName(getRegionsByStoreIDName). + WithURI(RegionsByStoreID(storeID)). + WithMethod(http.MethodGet). + WithResp(®ions)) + if err != nil { + return nil, err + } + return ®ions, nil +} + +// GetEmptyRegions gets the empty regions info. +func (c *client) GetEmptyRegions(ctx context.Context) (*RegionsInfo, error) { + var regions RegionsInfo + err := c.request(ctx, newRequestInfo(). + WithName(getEmptyRegionsName). + WithURI(EmptyRegions). + WithMethod(http.MethodGet). + WithResp(®ions)) + if err != nil { + return nil, err + } + return ®ions, nil +} + +// GetRegionsReplicatedStateByKeyRange gets the regions replicated state info by key range. +// The keys in the key range should be encoded in the hex bytes format (without encoding to the UTF-8 bytes). +func (c *client) GetRegionsReplicatedStateByKeyRange(ctx context.Context, keyRange *KeyRange) (string, error) { + var state string + err := c.request(ctx, newRequestInfo(). + WithName(getRegionsReplicatedStateByKeyRangeName). + WithURI(RegionsReplicatedByKeyRange(keyRange)). + WithMethod(http.MethodGet). + WithResp(&state)) + if err != nil { + return "", err + } + return state, nil +} + +// GetHotReadRegions gets the hot read region statistics info. +func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) { + var hotReadRegions StoreHotPeersInfos + err := c.request(ctx, newRequestInfo(). + WithName(getHotReadRegionsName). + WithURI(HotRead). + WithMethod(http.MethodGet). + WithResp(&hotReadRegions)) + if err != nil { + return nil, err + } + return &hotReadRegions, nil +} + +// GetHotWriteRegions gets the hot write region statistics info. 
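The key-range queries above take raw UTF-8 key bytes and an optional limit. A short sketch, assuming the same `a1`/`a3` keys and store ID 1 used by the integration tests in this PR:

```go
package main

import (
	"context"
	"fmt"

	pd "github.com/tikv/pd/client/http"
)

func main() {
	cli := pd.NewClient("region-query-example", []string{"127.0.0.1:2379"}) // assumed endpoint
	defer cli.Close()
	ctx := context.Background()

	// A limit of -1 returns every region in the range, per GetRegionsByKeyRange's comment.
	regions, err := cli.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), -1)
	if err != nil {
		panic(err)
	}
	fmt.Printf("regions in [a1, a3): %d\n", regions.Count)

	// Regions hosted on an assumed store ID 1.
	byStore, err := cli.GetRegionsByStoreID(ctx, 1)
	if err != nil {
		panic(err)
	}
	fmt.Printf("regions on store 1: %d\n", byStore.Count)
}
```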
+func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) { + var hotWriteRegions StoreHotPeersInfos + err := c.request(ctx, newRequestInfo(). + WithName(getHotWriteRegionsName). + WithURI(HotWrite). + WithMethod(http.MethodGet). + WithResp(&hotWriteRegions)) + if err != nil { + return nil, err + } + return &hotWriteRegions, nil +} + +// GetHistoryHotRegions gets the history hot region statistics info. +func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegionsRequest) (*HistoryHotRegions, error) { + reqJSON, err := json.Marshal(req) + if err != nil { + return nil, errors.Trace(err) + } + var historyHotRegions HistoryHotRegions + err = c.request(ctx, newRequestInfo(). + WithName(getHistoryHotRegionsName). + WithURI(HotHistory). + WithMethod(http.MethodGet). + WithBody(reqJSON). + WithResp(&historyHotRegions), + WithAllowFollowerHandle()) + if err != nil { + return nil, err + } + return &historyHotRegions, nil +} + +// GetRegionStatusByKeyRange gets the region status by key range. +// If the `onlyCount` flag is true, the result will only include the count of regions. +// The keys in the key range should be encoded in the UTF-8 bytes format. +func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange, onlyCount bool) (*RegionStats, error) { + var regionStats RegionStats + err := c.request(ctx, newRequestInfo(). + WithName(getRegionStatusByKeyRangeName). + WithURI(RegionStatsByKeyRange(keyRange, onlyCount)). + WithMethod(http.MethodGet). + WithResp(®ionStats)) + if err != nil { + return nil, err + } + return ®ionStats, nil +} + +// SetStoreLabels sets the labels of a store. +func (c *client) SetStoreLabels(ctx context.Context, storeID int64, storeLabels map[string]string) error { + jsonInput, err := json.Marshal(storeLabels) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(setStoreLabelsName). + WithURI(LabelByStoreID(storeID)). + WithMethod(http.MethodPost). + WithBody(jsonInput)) +} + +// DeleteStoreLabel deletes the labels of a store. +func (c *client) DeleteStoreLabel(ctx context.Context, storeID int64, labelKey string) error { + jsonInput, err := json.Marshal(labelKey) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(deleteStoreLabelName). + WithURI(LabelByStoreID(storeID)). + WithMethod(http.MethodDelete). + WithBody(jsonInput)) +} + +// GetHealthStatus gets the health status of the cluster. +func (c *client) GetHealthStatus(ctx context.Context) ([]Health, error) { + var healths []Health + err := c.request(ctx, newRequestInfo(). + WithName(getHealthStatusName). + WithURI(health). + WithMethod(http.MethodGet). + WithResp(&healths)) + if err != nil { + return nil, err + } + return healths, nil +} + +// GetConfig gets the configurations. +func (c *client) GetConfig(ctx context.Context) (map[string]any, error) { + var config map[string]any + err := c.request(ctx, newRequestInfo(). + WithName(getConfigName). + WithURI(Config). + WithMethod(http.MethodGet). + WithResp(&config)) + if err != nil { + return nil, err + } + return config, nil +} + +// SetConfig sets the configurations. ttlSecond is optional. 
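The optional `ttlSecond` parameter of `SetConfig` is easy to miss. A hedged sketch of both forms follows; the configuration keys are ordinary PD schedule options used here only as examples, and the endpoint is a placeholder.

```go
package main

import (
	"context"

	pd "github.com/tikv/pd/client/http"
)

func main() {
	cli := pd.NewClient("config-example", []string{"127.0.0.1:2379"}) // assumed endpoint
	defer cli.Close()
	ctx := context.Background()

	// Persistent change: no TTL argument.
	if err := cli.SetConfig(ctx, map[string]any{"schedule.max-merge-region-size": 20}); err != nil {
		panic(err)
	}

	// Temporary change: the optional ttlSecond argument lets PD expire it after 60 seconds.
	if err := cli.SetConfig(ctx, map[string]any{"schedule.leader-schedule-limit": 8}, 60); err != nil {
		panic(err)
	}
}
```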
+func (c *client) SetConfig(ctx context.Context, config map[string]any, ttlSecond ...float64) error { + configJSON, err := json.Marshal(config) + if err != nil { + return errors.Trace(err) + } + var uri string + if len(ttlSecond) > 0 { + uri = ConfigWithTTLSeconds(ttlSecond[0]) + } else { + uri = Config + } + return c.request(ctx, newRequestInfo(). + WithName(setConfigName). + WithURI(uri). + WithMethod(http.MethodPost). + WithBody(configJSON)) +} + +// GetScheduleConfig gets the schedule configurations. +func (c *client) GetScheduleConfig(ctx context.Context) (map[string]any, error) { + var config map[string]any + err := c.request(ctx, newRequestInfo(). + WithName(getScheduleConfigName). + WithURI(ScheduleConfig). + WithMethod(http.MethodGet). + WithResp(&config)) + if err != nil { + return nil, err + } + return config, nil +} + +// SetScheduleConfig sets the schedule configurations. +func (c *client) SetScheduleConfig(ctx context.Context, config map[string]any) error { + configJSON, err := json.Marshal(config) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(setScheduleConfigName). + WithURI(ScheduleConfig). + WithMethod(http.MethodPost). + WithBody(configJSON)) +} + +// GetStores gets the stores info. +func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) { + var stores StoresInfo + err := c.request(ctx, newRequestInfo(). + WithName(getStoresName). + WithURI(Stores). + WithMethod(http.MethodGet). + WithResp(&stores)) + if err != nil { + return nil, err + } + return &stores, nil +} + +// GetStore gets the store info by ID. +func (c *client) GetStore(ctx context.Context, storeID uint64) (*StoreInfo, error) { + var store StoreInfo + err := c.request(ctx, newRequestInfo(). + WithName(getStoreName). + WithURI(StoreByID(storeID)). + WithMethod(http.MethodGet). + WithResp(&store)) + if err != nil { + return nil, err + } + return &store, nil +} + +// DeleteStore deletes the store by ID. +func (c *client) DeleteStore(ctx context.Context, storeID uint64) error { + return c.request(ctx, newRequestInfo(). + WithName(deleteStoreName). + WithURI(StoreByID(storeID)). + WithMethod(http.MethodDelete)) +} + +// GetClusterVersion gets the cluster version. +func (c *client) GetClusterVersion(ctx context.Context) (string, error) { + var version string + err := c.request(ctx, newRequestInfo(). + WithName(getClusterVersionName). + WithURI(ClusterVersion). + WithMethod(http.MethodGet). + WithResp(&version)) + if err != nil { + return "", err + } + return version, nil +} + +// GetCluster gets the cluster meta information. +func (c *client) GetCluster(ctx context.Context) (*metapb.Cluster, error) { + var clusterInfo *metapb.Cluster + err := c.request(ctx, newRequestInfo(). + WithName(getClusterName). + WithURI(Cluster). + WithMethod(http.MethodGet). + WithResp(&clusterInfo)) + if err != nil { + return nil, err + } + return clusterInfo, nil +} + +// GetClusterStatus gets the cluster status. +func (c *client) GetClusterStatus(ctx context.Context) (*ClusterState, error) { + var clusterStatus *ClusterState + err := c.request(ctx, newRequestInfo(). + WithName(getClusterName). + WithURI(ClusterStatus). + WithMethod(http.MethodGet). + WithResp(&clusterStatus)) + if err != nil { + return nil, err + } + return clusterStatus, nil +} + +// GetStatus gets the status of PD. +func (c *client) GetStatus(ctx context.Context) (*State, error) { + var status *State + err := c.request(ctx, newRequestInfo(). + WithName(getStatusName). + WithURI(Status). 
+ WithMethod(http.MethodGet). + WithResp(&status), + WithAllowFollowerHandle()) + if err != nil { + return nil, err + } + return status, nil +} + +// GetReplicateConfig gets the replication configurations. +func (c *client) GetReplicateConfig(ctx context.Context) (map[string]any, error) { + var config map[string]any + err := c.request(ctx, newRequestInfo(). + WithName(getReplicateConfigName). + WithURI(ReplicateConfig). + WithMethod(http.MethodGet). + WithResp(&config)) + if err != nil { + return nil, err + } + return config, nil +} + +// GetAllPlacementRuleBundles gets all placement rules bundles. +func (c *client) GetAllPlacementRuleBundles(ctx context.Context) ([]*GroupBundle, error) { + var bundles []*GroupBundle + err := c.request(ctx, newRequestInfo(). + WithName(getAllPlacementRuleBundlesName). + WithURI(PlacementRuleBundle). + WithMethod(http.MethodGet). + WithResp(&bundles)) + if err != nil { + return nil, err + } + return bundles, nil +} + +// GetPlacementRuleBundleByGroup gets the placement rules bundle by group. +func (c *client) GetPlacementRuleBundleByGroup(ctx context.Context, group string) (*GroupBundle, error) { + var bundle GroupBundle + err := c.request(ctx, newRequestInfo(). + WithName(getPlacementRuleBundleByGroupName). + WithURI(PlacementRuleBundleByGroup(group)). + WithMethod(http.MethodGet). + WithResp(&bundle)) + if err != nil { + return nil, err + } + return &bundle, nil +} + +// GetPlacementRulesByGroup gets the placement rules by group. +func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([]*Rule, error) { + var rules []*Rule + err := c.request(ctx, newRequestInfo(). + WithName(getPlacementRulesByGroupName). + WithURI(PlacementRulesByGroup(group)). + WithMethod(http.MethodGet). + WithResp(&rules)) + if err != nil { + return nil, err + } + return rules, nil +} + +// GetPlacementRule gets the placement rule by group and ID. +func (c *client) GetPlacementRule(ctx context.Context, group, id string) (*Rule, error) { + var rule Rule + err := c.request(ctx, newRequestInfo(). + WithName(getPlacementRuleName). + WithURI(PlacementRuleByGroupAndID(group, id)). + WithMethod(http.MethodGet). + WithResp(&rule)) + if err != nil { + return nil, err + } + return &rule, nil +} + +// SetPlacementRule sets the placement rule. +func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error { + ruleJSON, err := json.Marshal(rule) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(setPlacementRuleName). + WithURI(PlacementRule). + WithMethod(http.MethodPost). + WithBody(ruleJSON)) +} + +// SetPlacementRuleInBatch sets the placement rules in batch. +func (c *client) SetPlacementRuleInBatch(ctx context.Context, ruleOps []*RuleOp) error { + ruleOpsJSON, err := json.Marshal(ruleOps) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(setPlacementRuleInBatchName). + WithURI(PlacementRulesInBatch). + WithMethod(http.MethodPost). + WithBody(ruleOpsJSON)) +} + +// SetPlacementRuleBundles sets the placement rule bundles. +// If `partial` is false, all old configurations will be over-written and dropped. +func (c *client) SetPlacementRuleBundles(ctx context.Context, bundles []*GroupBundle, partial bool) error { + bundlesJSON, err := json.Marshal(bundles) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(setPlacementRuleBundlesName). + WithURI(PlacementRuleBundleWithPartialParameter(partial)). 
+ WithMethod(http.MethodPost). + WithBody(bundlesJSON)) +} + +// DeletePlacementRule deletes the placement rule. +func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error { + return c.request(ctx, newRequestInfo(). + WithName(deletePlacementRuleName). + WithURI(PlacementRuleByGroupAndID(group, id)). + WithMethod(http.MethodDelete)) +} + +// GetAllPlacementRuleGroups gets all placement rule groups. +func (c *client) GetAllPlacementRuleGroups(ctx context.Context) ([]*RuleGroup, error) { + var ruleGroups []*RuleGroup + err := c.request(ctx, newRequestInfo(). + WithName(getAllPlacementRuleGroupsName). + WithURI(placementRuleGroups). + WithMethod(http.MethodGet). + WithResp(&ruleGroups)) + if err != nil { + return nil, err + } + return ruleGroups, nil +} + +// GetPlacementRuleGroupByID gets the placement rule group by ID. +func (c *client) GetPlacementRuleGroupByID(ctx context.Context, id string) (*RuleGroup, error) { + var ruleGroup RuleGroup + err := c.request(ctx, newRequestInfo(). + WithName(getPlacementRuleGroupByIDName). + WithURI(PlacementRuleGroupByID(id)). + WithMethod(http.MethodGet). + WithResp(&ruleGroup)) + if err != nil { + return nil, err + } + return &ruleGroup, nil +} + +// SetPlacementRuleGroup sets the placement rule group. +func (c *client) SetPlacementRuleGroup(ctx context.Context, ruleGroup *RuleGroup) error { + ruleGroupJSON, err := json.Marshal(ruleGroup) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(setPlacementRuleGroupName). + WithURI(placementRuleGroup). + WithMethod(http.MethodPost). + WithBody(ruleGroupJSON)) +} + +// DeletePlacementRuleGroupByID deletes the placement rule group by ID. +func (c *client) DeletePlacementRuleGroupByID(ctx context.Context, id string) error { + return c.request(ctx, newRequestInfo(). + WithName(deletePlacementRuleGroupByIDName). + WithURI(PlacementRuleGroupByID(id)). + WithMethod(http.MethodDelete)) +} + +// GetAllRegionLabelRules gets all region label rules. +func (c *client) GetAllRegionLabelRules(ctx context.Context) ([]*LabelRule, error) { + var labelRules []*LabelRule + err := c.request(ctx, newRequestInfo(). + WithName(getAllRegionLabelRulesName). + WithURI(RegionLabelRules). + WithMethod(http.MethodGet). + WithResp(&labelRules)) + if err != nil { + return nil, err + } + return labelRules, nil +} + +// GetRegionLabelRulesByIDs gets the region label rules by IDs. +func (c *client) GetRegionLabelRulesByIDs(ctx context.Context, ruleIDs []string) ([]*LabelRule, error) { + idsJSON, err := json.Marshal(ruleIDs) + if err != nil { + return nil, errors.Trace(err) + } + var labelRules []*LabelRule + err = c.request(ctx, newRequestInfo(). + WithName(getRegionLabelRulesByIDsName). + WithURI(RegionLabelRules). + WithMethod(http.MethodGet). + WithBody(idsJSON). + WithResp(&labelRules)) + if err != nil { + return nil, err + } + return labelRules, nil +} + +// SetRegionLabelRule sets the region label rule. +func (c *client) SetRegionLabelRule(ctx context.Context, labelRule *LabelRule) error { + labelRuleJSON, err := json.Marshal(labelRule) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(setRegionLabelRuleName). + WithURI(RegionLabelRule). + WithMethod(http.MethodPost). + WithBody(labelRuleJSON)) +} + +// PatchRegionLabelRules patches the region label rules. 
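The placement-rule methods implemented above mirror the `pd-ctl` rule workflow. A minimal sketch of adding a custom rule and reading it back, assuming the default rule group ID `"pd"` and a placeholder key range and endpoint:

```go
package main

import (
	"context"

	pd "github.com/tikv/pd/client/http"
)

func main() {
	cli := pd.NewClient("placement-rule-example", []string{"127.0.0.1:2379"}) // assumed endpoint
	defer cli.Close()
	ctx := context.Background()

	// Five voters for an assumed key range, in the default rule group.
	rule := &pd.Rule{
		GroupID:  "pd",
		ID:       "example-rule",
		Role:     pd.Voter,
		Count:    5,
		StartKey: []byte("a1"),
		EndKey:   []byte(""),
	}
	if err := cli.SetPlacementRule(ctx, rule); err != nil {
		panic(err)
	}

	// Read it back; StartKeyHex/EndKeyHex are generated by the server.
	if _, err := cli.GetPlacementRule(ctx, rule.GroupID, rule.ID); err != nil {
		panic(err)
	}
}
```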
+func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *LabelRulePatch) error { + labelRulePatchJSON, err := json.Marshal(labelRulePatch) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(patchRegionLabelRulesName). + WithURI(RegionLabelRules). + WithMethod(http.MethodPatch). + WithBody(labelRulePatchJSON)) +} + +// GetSchedulers gets the schedulers from PD cluster. +func (c *client) GetSchedulers(ctx context.Context) ([]string, error) { + var schedulers []string + err := c.request(ctx, newRequestInfo(). + WithName(getSchedulersName). + WithURI(Schedulers). + WithMethod(http.MethodGet). + WithResp(&schedulers)) + if err != nil { + return nil, err + } + return schedulers, nil +} + +// CreateScheduler creates a scheduler to PD cluster. +func (c *client) CreateScheduler(ctx context.Context, name string, storeID uint64) error { + inputJSON, err := json.Marshal(map[string]any{ + "name": name, + "store_id": storeID, + }) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(createSchedulerName). + WithURI(Schedulers). + WithMethod(http.MethodPost). + WithBody(inputJSON)) +} + +// DeleteScheduler deletes a scheduler from PD cluster. +func (c *client) DeleteScheduler(ctx context.Context, name string) error { + return c.request(ctx, newRequestInfo(). + WithName(deleteSchedulerName). + WithURI(SchedulerByName(name)). + WithMethod(http.MethodDelete)) +} + +// AccelerateSchedule accelerates the scheduling of the regions within the given key range. +// The keys in the key range should be encoded in the hex bytes format (without encoding to the UTF-8 bytes). +func (c *client) AccelerateSchedule(ctx context.Context, keyRange *KeyRange) error { + startKey, endKey := keyRange.EscapeAsHexStr() + inputJSON, err := json.Marshal(map[string]string{ + "start_key": startKey, + "end_key": endKey, + }) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(accelerateScheduleName). + WithURI(AccelerateSchedule). + WithMethod(http.MethodPost). + WithBody(inputJSON)) +} + +// AccelerateScheduleInBatch accelerates the scheduling of the regions within the given key ranges in batch. +// The keys in the key ranges should be encoded in the hex bytes format (without encoding to the UTF-8 bytes). +func (c *client) AccelerateScheduleInBatch(ctx context.Context, keyRanges []*KeyRange) error { + input := make([]map[string]string, 0, len(keyRanges)) + for _, keyRange := range keyRanges { + startKey, endKey := keyRange.EscapeAsHexStr() + input = append(input, map[string]string{ + "start_key": startKey, + "end_key": endKey, + }) + } + inputJSON, err := json.Marshal(input) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(accelerateScheduleInBatchName). + WithURI(AccelerateScheduleInBatch). + WithMethod(http.MethodPost). + WithBody(inputJSON)) +} + +// ResetTS resets the PD's TS. +func (c *client) ResetTS(ctx context.Context, ts uint64, forceUseLarger bool) error { + reqData, err := json.Marshal(struct { + Tso string `json:"tso"` + ForceUseLarger bool `json:"force-use-larger"` + }{ + Tso: strconv.FormatUint(ts, 10), + ForceUseLarger: forceUseLarger, + }) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(resetTSName). + WithURI(ResetTS). + WithMethod(http.MethodPost). + WithBody(reqData)) +} + +// ResetBaseAllocID resets the PD's base alloc ID. 
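`AccelerateScheduleInBatch` above hex-escapes the raw key bytes before sending them, so callers can pass plain byte slices. A brief sketch using the same adjacent ranges as the integration test, with a placeholder endpoint:

```go
package main

import (
	"context"

	pd "github.com/tikv/pd/client/http"
)

func main() {
	cli := pd.NewClient("accelerate-example", []string{"127.0.0.1:2379"}) // assumed endpoint
	defer cli.Close()

	// Ask PD to prioritize scheduling for two adjacent key ranges.
	err := cli.AccelerateScheduleInBatch(context.Background(), []*pd.KeyRange{
		pd.NewKeyRange([]byte("a1"), []byte("a2")),
		pd.NewKeyRange([]byte("a2"), []byte("a3")),
	})
	if err != nil {
		panic(err)
	}
}
```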
+func (c *client) ResetBaseAllocID(ctx context.Context, id uint64) error { + reqData, err := json.Marshal(struct { + ID string `json:"id"` + }{ + ID: strconv.FormatUint(id, 10), + }) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(resetBaseAllocIDName). + WithURI(BaseAllocID). + WithMethod(http.MethodPost). + WithBody(reqData)) +} + +// SetSnapshotRecoveringMark sets the snapshot recovering mark. +func (c *client) SetSnapshotRecoveringMark(ctx context.Context) error { + return c.request(ctx, newRequestInfo(). + WithName(setSnapshotRecoveringMarkName). + WithURI(SnapshotRecoveringMark). + WithMethod(http.MethodPost)) +} + +// DeleteSnapshotRecoveringMark deletes the snapshot recovering mark. +func (c *client) DeleteSnapshotRecoveringMark(ctx context.Context) error { + return c.request(ctx, newRequestInfo(). + WithName(deleteSnapshotRecoveringMarkName). + WithURI(SnapshotRecoveringMark). + WithMethod(http.MethodDelete)) +} + +// SetSchedulerDelay sets the delay of given scheduler. +func (c *client) SetSchedulerDelay(ctx context.Context, scheduler string, delaySec int64) error { + m := map[string]int64{ + "delay": delaySec, + } + inputJSON, err := json.Marshal(m) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(setSchedulerDelayName). + WithURI(SchedulerByName(scheduler)). + WithMethod(http.MethodPost). + WithBody(inputJSON)) +} + +// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs. +// - When storeIDs has zero length, it will return (cluster-level's min_resolved_ts, nil, nil) when no error. +// - When storeIDs is {"cluster"}, it will return (cluster-level's min_resolved_ts, stores_min_resolved_ts, nil) when no error. +// - When storeID is specified to ID lists, it will return (min_resolved_ts of given stores, stores_min_resolved_ts, nil) when no error. +func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) { + uri := MinResolvedTSPrefix + // scope is an optional parameter, it can be `cluster` or specified store IDs. + // - When no scope is given, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be nil. + // - When scope is `cluster`, cluster-level's min_resolved_ts will be returned and storesMinResolvedTS will be filled. + // - When scope given a list of stores, min_resolved_ts will be provided for each store + // and the scope-specific min_resolved_ts will be returned. + if len(storeIDs) != 0 { + storeIDStrs := make([]string, len(storeIDs)) + for idx, id := range storeIDs { + storeIDStrs[idx] = fmt.Sprintf("%d", id) + } + uri = fmt.Sprintf("%s?scope=%s", uri, strings.Join(storeIDStrs, ",")) + } + resp := struct { + MinResolvedTS uint64 `json:"min_resolved_ts"` + IsRealTime bool `json:"is_real_time,omitempty"` + StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"` + }{} + err := c.request(ctx, newRequestInfo(). + WithName(getMinResolvedTSByStoresIDsName). + WithURI(uri). + WithMethod(http.MethodGet). + WithResp(&resp)) + if err != nil { + return 0, nil, err + } + if !resp.IsRealTime { + return 0, nil, errors.Trace(errors.New("min resolved ts is not enabled")) + } + return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil +} + +// GetMicroServiceMembers gets the members of the microservice. 
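The scope rules documented above for `GetMinResolvedTSByStoresIDs` are worth a small sketch. Store IDs 1 and 2 and the endpoint are assumptions; the calls themselves match the method as defined above.

```go
package main

import (
	"context"
	"fmt"

	pd "github.com/tikv/pd/client/http"
)

func main() {
	cli := pd.NewClient("min-ts-example", []string{"127.0.0.1:2379"}) // assumed endpoint
	defer cli.Close()
	ctx := context.Background()

	// No store IDs: only the cluster-level min_resolved_ts is returned.
	clusterTS, _, err := cli.GetMinResolvedTSByStoresIDs(ctx, nil)
	if err != nil {
		panic(err)
	}

	// Explicit store IDs: the per-store map is filled and the returned timestamp
	// is scoped to the requested stores.
	scopedTS, perStore, err := cli.GetMinResolvedTSByStoresIDs(ctx, []uint64{1, 2})
	if err != nil {
		panic(err)
	}
	fmt.Println(clusterTS, scopedTS, perStore)
}
```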
+func (c *client) GetMicroServiceMembers(ctx context.Context, service string) ([]MicroServiceMember, error) { + var members []MicroServiceMember + err := c.request(ctx, newRequestInfo(). + WithName(getMicroServiceMembersName). + WithURI(MicroServiceMembers(service)). + WithMethod(http.MethodGet). + WithResp(&members)) + if err != nil { + return nil, err + } + return members, nil +} + +// GetMicroServicePrimary gets the primary of the microservice. +func (c *client) GetMicroServicePrimary(ctx context.Context, service string) (string, error) { + var primary string + err := c.request(ctx, newRequestInfo(). + WithName(getMicroServicePrimaryName). + WithURI(MicroServicePrimary(service)). + WithMethod(http.MethodGet). + WithResp(&primary)) + return primary, err +} + +// GetPDVersion gets the release version of the PD binary. +func (c *client) GetPDVersion(ctx context.Context) (string, error) { + var ver struct { + Version string `json:"version"` + } + err := c.request(ctx, newRequestInfo(). + WithName(getPDVersionName). + WithURI(Version). + WithMethod(http.MethodGet). + WithResp(&ver)) + return ver.Version, err +} + +// DeleteOperators deletes the running operators. +func (c *client) DeleteOperators(ctx context.Context) error { + return c.request(ctx, newRequestInfo(). + WithName(deleteOperators). + WithURI(operators). + WithMethod(http.MethodDelete)) +} + +// UpdateKeyspaceGCManagementType patches the keyspace config. +func (c *client) UpdateKeyspaceGCManagementType(ctx context.Context, keyspaceName string, keyspaceGCmanagementType *KeyspaceGCManagementTypeConfig) error { + keyspaceConfigPatchJSON, err := json.Marshal(keyspaceGCmanagementType) + if err != nil { + return errors.Trace(err) + } + return c.request(ctx, newRequestInfo(). + WithName(UpdateKeyspaceGCManagementTypeName). + WithURI(GetUpdateKeyspaceConfigURL(keyspaceName)). + WithMethod(http.MethodPatch). + WithBody(keyspaceConfigPatchJSON)) +} + +// GetKeyspaceMetaByName get the given keyspace meta. +func (c *client) GetKeyspaceMetaByName(ctx context.Context, keyspaceName string) (*keyspacepb.KeyspaceMeta, error) { + var ( + tempKeyspaceMeta tempKeyspaceMeta + keyspaceMetaPB keyspacepb.KeyspaceMeta + ) + err := c.request(ctx, newRequestInfo(). + WithName(GetKeyspaceMetaByNameName). + WithURI(GetKeyspaceMetaByNameURL(keyspaceName)). + WithMethod(http.MethodGet). + WithResp(&tempKeyspaceMeta)) + + if err != nil { + return nil, err + } + + keyspaceState, err := stringToKeyspaceState(tempKeyspaceMeta.State) + if err != nil { + return nil, err + } + + keyspaceMetaPB = keyspacepb.KeyspaceMeta{ + Name: tempKeyspaceMeta.Name, + Id: tempKeyspaceMeta.ID, + Config: tempKeyspaceMeta.Config, + CreatedAt: tempKeyspaceMeta.CreatedAt, + StateChangedAt: tempKeyspaceMeta.StateChangedAt, + State: keyspaceState, + } + return &keyspaceMetaPB, nil +} diff --git a/client/http/request_info.go b/client/http/request_info.go new file mode 100644 index 00000000000..783220bcc60 --- /dev/null +++ b/client/http/request_info.go @@ -0,0 +1,173 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package http + +import ( + "fmt" + + "github.com/tikv/pd/client/retry" + "go.uber.org/zap" +) + +// The following constants are the names of the requests. +const ( + getMembersName = "GetMembers" + getLeaderName = "GetLeader" + transferLeaderName = "TransferLeader" + getRegionByIDName = "GetRegionByID" + getRegionByKeyName = "GetRegionByKey" + getRegionsName = "GetRegions" + getRegionsByKeyRangeName = "GetRegionsByKeyRange" + getRegionsByStoreIDName = "GetRegionsByStoreID" + getEmptyRegionsName = "GetEmptyRegions" + getRegionsReplicatedStateByKeyRangeName = "GetRegionsReplicatedStateByKeyRange" + getHotReadRegionsName = "GetHotReadRegions" + getHotWriteRegionsName = "GetHotWriteRegions" + getHistoryHotRegionsName = "GetHistoryHotRegions" + getRegionStatusByKeyRangeName = "GetRegionStatusByKeyRange" + getStoresName = "GetStores" + getStoreName = "GetStore" + deleteStoreName = "DeleteStore" + setStoreLabelsName = "SetStoreLabels" + deleteStoreLabelName = "DeleteStoreLabel" + getHealthStatusName = "GetHealthStatus" + getConfigName = "GetConfig" + setConfigName = "SetConfig" + getScheduleConfigName = "GetScheduleConfig" + setScheduleConfigName = "SetScheduleConfig" + getClusterVersionName = "GetClusterVersion" + getClusterName = "GetCluster" + getClusterStatusName = "GetClusterStatus" + getStatusName = "GetStatus" + getReplicateConfigName = "GetReplicateConfig" + getSchedulersName = "GetSchedulers" + createSchedulerName = "CreateScheduler" + deleteSchedulerName = "DeleteScheduler" + setSchedulerDelayName = "SetSchedulerDelay" + getAllPlacementRuleBundlesName = "GetAllPlacementRuleBundles" + getPlacementRuleBundleByGroupName = "GetPlacementRuleBundleByGroup" + getPlacementRulesByGroupName = "GetPlacementRulesByGroup" + getPlacementRuleName = "GetPlacementRule" + setPlacementRuleName = "SetPlacementRule" + setPlacementRuleInBatchName = "SetPlacementRuleInBatch" + setPlacementRuleBundlesName = "SetPlacementRuleBundles" + deletePlacementRuleName = "DeletePlacementRule" + getAllPlacementRuleGroupsName = "GetAllPlacementRuleGroups" + getPlacementRuleGroupByIDName = "GetPlacementRuleGroupByID" + setPlacementRuleGroupName = "SetPlacementRuleGroup" + deletePlacementRuleGroupByIDName = "DeletePlacementRuleGroupByID" + getAllRegionLabelRulesName = "GetAllRegionLabelRules" + getRegionLabelRulesByIDsName = "GetRegionLabelRulesByIDs" + setRegionLabelRuleName = "SetRegionLabelRule" + patchRegionLabelRulesName = "PatchRegionLabelRules" + accelerateScheduleName = "AccelerateSchedule" + accelerateScheduleInBatchName = "AccelerateScheduleInBatch" + getMinResolvedTSByStoresIDsName = "GetMinResolvedTSByStoresIDs" + getMicroServiceMembersName = "GetMicroServiceMembers" + getMicroServicePrimaryName = "GetMicroServicePrimary" + getPDVersionName = "GetPDVersion" + resetTSName = "ResetTS" + resetBaseAllocIDName = "ResetBaseAllocID" + setSnapshotRecoveringMarkName = "SetSnapshotRecoveringMark" + deleteSnapshotRecoveringMarkName = "DeleteSnapshotRecoveringMark" + deleteOperators = "DeleteOperators" + UpdateKeyspaceGCManagementTypeName = "UpdateKeyspaceGCManagementType" + GetKeyspaceMetaByNameName = "GetKeyspaceMetaByName" +) + +type requestInfo struct { + callerID string + name string + uri string + method string + body []byte + res any + respHandler respHandleFunc + bo *retry.Backoffer + targetURL string +} + +// newRequestInfo creates a new request info. 
+func newRequestInfo() *requestInfo { + return &requestInfo{} +} + +// WithCallerID sets the caller ID of the request. +func (ri *requestInfo) WithCallerID(callerID string) *requestInfo { + ri.callerID = callerID + return ri +} + +// WithName sets the name of the request. +func (ri *requestInfo) WithName(name string) *requestInfo { + ri.name = name + return ri +} + +// WithURI sets the URI of the request. +func (ri *requestInfo) WithURI(uri string) *requestInfo { + ri.uri = uri + return ri +} + +// WithMethod sets the method of the request. +func (ri *requestInfo) WithMethod(method string) *requestInfo { + ri.method = method + return ri +} + +// WithBody sets the body of the request. +func (ri *requestInfo) WithBody(body []byte) *requestInfo { + ri.body = body + return ri +} + +// WithResp sets the response struct of the request. +func (ri *requestInfo) WithResp(res any) *requestInfo { + ri.res = res + return ri +} + +// WithRespHandler sets the response handle function of the request. +func (ri *requestInfo) WithRespHandler(respHandler respHandleFunc) *requestInfo { + ri.respHandler = respHandler + return ri +} + +// WithBackoffer sets the backoffer of the request. +func (ri *requestInfo) WithBackoffer(bo *retry.Backoffer) *requestInfo { + ri.bo = bo + return ri +} + +// WithTargetURL sets the target URL of the request. +func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo { + ri.targetURL = targetURL + return ri +} + +func (ri *requestInfo) getURL(addr string) string { + return fmt.Sprintf("%s%s", addr, ri.uri) +} + +func (ri *requestInfo) logFields() []zap.Field { + return []zap.Field{ + zap.String("caller-id", ri.callerID), + zap.String("name", ri.name), + zap.String("uri", ri.uri), + zap.String("method", ri.method), + zap.String("target-url", ri.targetURL), + } +} diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 28bb5c96c03..8b78bb45b7d 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ b/pkg/schedule/schedulers/scheduler_controller.go @@ -448,6 +448,11 @@ func (s *ScheduleController) Stop() { // Schedule tries to create some operators. func (s *ScheduleController) Schedule(diagnosable bool) []*operator.Operator { +<<<<<<< HEAD +======= + _, isEvictLeaderScheduler := s.Scheduler.(*evictLeaderScheduler) +retry: +>>>>>>> 26e90e9ff (scheduler: skip evict-leader-scheduler when setting schedule deny label (#8303)) for i := 0; i < maxScheduleRetries; i++ { // no need to retry if schedule should stop to speed exit select { @@ -483,7 +488,17 @@ func (s *ScheduleController) Schedule(diagnosable bool) []*operator.Operator { if foundDisabled { continue } +<<<<<<< HEAD return ops +======= + + // If the evict-leader-scheduler is disabled, it will obstruct the restart operation of tikv by the operator. 
+ // Refer: https://docs.pingcap.com/tidb-in-kubernetes/stable/restart-a-tidb-cluster#perform-a-graceful-restart-to-a-single-tikv-pod + if labelMgr.ScheduleDisabled(region) && !isEvictLeaderScheduler { + denySchedulersByLabelerCounter.Inc() + continue retry + } +>>>>>>> 26e90e9ff (scheduler: skip evict-leader-scheduler when setting schedule deny label (#8303)) } } s.nextInterval = s.Scheduler.GetNextInterval(s.nextInterval) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c11cead61f7..e423c54617b 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1372,6 +1372,9 @@ func (c *RaftCluster) DeleteStoreLabel(storeID uint64, labelKey string) error { if store == nil { return errs.ErrInvalidStoreID.FastGenByArgs(storeID) } + if len(store.GetLabels()) == 0 { + return errors.Errorf("the label key %s does not exist", labelKey) + } newStore := typeutil.DeepClone(store.GetMeta(), core.StoreFactory) labels := make([]*metapb.StoreLabel, 0, len(newStore.GetLabels())-1) for _, label := range newStore.GetLabels() { diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go new file mode 100644 index 00000000000..1d7d4488692 --- /dev/null +++ b/tests/integrations/client/http_client_test.go @@ -0,0 +1,831 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client_test + +import ( + "context" + "math" + "net/http" + "net/url" + "sort" + "strings" + "sync" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + pd "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/retry" + "github.com/tikv/pd/pkg/core" + sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/tsoutil" + "github.com/tikv/pd/pkg/versioninfo" + "github.com/tikv/pd/tests" +) + +type httpClientTestSuite struct { + suite.Suite + // 1. Using `NewClient` will create a `DefaultPDServiceDiscovery` internal. + // 2. Using `NewClientWithServiceDiscovery` will need a `PDServiceDiscovery` to be passed in. 
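The `RaftCluster.DeleteStoreLabel` guard added above means that deleting a label from a store that has no labels now fails with "the label key ... does not exist" instead of silently succeeding. A hedged client-side sketch of the set-then-delete flow, assuming store ID 1 and a placeholder endpoint and label:

```go
package main

import (
	"context"

	pd "github.com/tikv/pd/client/http"
)

func main() {
	cli := pd.NewClient("store-label-example", []string{"127.0.0.1:2379"}) // assumed endpoint
	defer cli.Close()
	ctx := context.Background()

	const storeID = int64(1) // assumed store ID

	// Attach a label first; deleting from a label-less store now returns an error.
	if err := cli.SetStoreLabels(ctx, storeID, map[string]string{"zone": "z1"}); err != nil {
		panic(err)
	}
	if err := cli.DeleteStoreLabel(ctx, storeID, "zone"); err != nil {
		panic(err)
	}
}
```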
+ withServiceDiscovery bool + ctx context.Context + cancelFunc context.CancelFunc + cluster *tests.TestCluster + endpoints []string + client pd.Client +} + +func TestHTTPClientTestSuite(t *testing.T) { + suite.Run(t, &httpClientTestSuite{ + withServiceDiscovery: false, + }) +} + +func TestHTTPClientTestSuiteWithServiceDiscovery(t *testing.T) { + suite.Run(t, &httpClientTestSuite{ + withServiceDiscovery: true, + }) +} + +func (suite *httpClientTestSuite) SetupSuite() { + re := suite.Require() + suite.ctx, suite.cancelFunc = context.WithCancel(context.Background()) + + cluster, err := tests.NewTestCluster(suite.ctx, 2) + re.NoError(err) + + err = cluster.RunInitialServers() + re.NoError(err) + leader := cluster.WaitLeader() + re.NotEmpty(leader) + leaderServer := cluster.GetLeaderServer() + + err = leaderServer.BootstrapCluster() + // Add 2 more stores to the cluster. + for i := 2; i <= 4; i++ { + tests.MustPutStore(re, cluster, &metapb.Store{ + Id: uint64(i), + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }) + } + re.NoError(err) + for _, region := range []*core.RegionInfo{ + core.NewTestRegionInfo(10, 1, []byte("a1"), []byte("a2")), + core.NewTestRegionInfo(11, 1, []byte("a2"), []byte("a3")), + } { + err := leaderServer.GetRaftCluster().HandleRegionHeartbeat(region) + re.NoError(err) + } + var ( + testServers = cluster.GetServers() + endpoints = make([]string, 0, len(testServers)) + ) + for _, s := range testServers { + addr := s.GetConfig().AdvertiseClientUrls + url, err := url.Parse(addr) + re.NoError(err) + endpoints = append(endpoints, url.Host) + } + suite.endpoints = endpoints + suite.cluster = cluster + + if suite.withServiceDiscovery { + // Run test with specific service discovery. + cli := setupCli(suite.ctx, re, suite.endpoints) + sd := cli.GetServiceDiscovery() + suite.client = pd.NewClientWithServiceDiscovery("pd-http-client-it-grpc", sd) + } else { + // Run test with default service discovery. 
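The two construction modes exercised by `SetupSuite` can be summarized outside the test harness as follows. This is a sketch only: the endpoint is a placeholder, and the RPC-client constructor `NewClientWithContext` and its `SecurityOption` argument are assumed from the `github.com/tikv/pd/client` package rather than shown in this diff.

```go
package main

import (
	"context"

	rpc "github.com/tikv/pd/client"
	pdhttp "github.com/tikv/pd/client/http"
)

func main() {
	endpoints := []string{"127.0.0.1:2379"} // assumed PD endpoint
	ctx := context.Background()

	// Mode 1: let the HTTP client build its own service discovery internally.
	httpOnly := pdhttp.NewClient("http-only-caller", endpoints)
	defer httpOnly.Close()

	// Mode 2: reuse the service discovery of an existing RPC client, as SetupSuite does.
	rpcCli, err := rpc.NewClientWithContext(ctx, endpoints, rpc.SecurityOption{})
	if err != nil {
		panic(err)
	}
	defer rpcCli.Close()
	shared := pdhttp.NewClientWithServiceDiscovery("shared-sd-caller", rpcCli.GetServiceDiscovery())
	defer shared.Close()
}
```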
+ suite.client = pd.NewClient("pd-http-client-it-http", suite.endpoints) + } +} + +func (suite *httpClientTestSuite) TearDownSuite() { + suite.cancelFunc() + suite.client.Close() + suite.cluster.Destroy() +} + +func (suite *httpClientTestSuite) TestMeta() { + re := suite.Require() + client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + replicateConfig, err := client.GetReplicateConfig(ctx) + re.NoError(err) + re.Equal(3.0, replicateConfig["max-replicas"]) + region, err := client.GetRegionByID(ctx, 10) + re.NoError(err) + re.Equal(int64(10), region.ID) + re.Equal(core.HexRegionKeyStr([]byte("a1")), region.StartKey) + re.Equal(core.HexRegionKeyStr([]byte("a2")), region.EndKey) + region, err = client.GetRegionByKey(ctx, []byte("a2")) + re.NoError(err) + re.Equal(int64(11), region.ID) + re.Equal(core.HexRegionKeyStr([]byte("a2")), region.StartKey) + re.Equal(core.HexRegionKeyStr([]byte("a3")), region.EndKey) + regions, err := client.GetRegions(ctx) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regions, err = client.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), -1) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regions, err = client.GetRegionsByStoreID(ctx, 1) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + regions, err = client.GetEmptyRegions(ctx) + re.NoError(err) + re.Equal(int64(2), regions.Count) + re.Len(regions.Regions, 2) + state, err := client.GetRegionsReplicatedStateByKeyRange(ctx, pd.NewKeyRange([]byte("a1"), []byte("a3"))) + re.NoError(err) + re.Equal("INPROGRESS", state) + regionStats, err := client.GetRegionStatusByKeyRange(ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), false) + re.NoError(err) + re.Positive(regionStats.Count) + re.NotEmpty(regionStats.StoreLeaderCount) + regionStats, err = client.GetRegionStatusByKeyRange(ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), true) + re.NoError(err) + re.Positive(regionStats.Count) + re.Empty(regionStats.StoreLeaderCount) + hotReadRegions, err := client.GetHotReadRegions(ctx) + re.NoError(err) + re.Len(hotReadRegions.AsPeer, 4) + re.Len(hotReadRegions.AsLeader, 4) + hotWriteRegions, err := client.GetHotWriteRegions(ctx) + re.NoError(err) + re.Len(hotWriteRegions.AsPeer, 4) + re.Len(hotWriteRegions.AsLeader, 4) + historyHorRegions, err := client.GetHistoryHotRegions(ctx, &pd.HistoryHotRegionsRequest{ + StartTime: 0, + EndTime: time.Now().AddDate(0, 0, 1).UnixNano() / int64(time.Millisecond), + }) + re.NoError(err) + re.Empty(historyHorRegions.HistoryHotRegion) + stores, err := client.GetStores(ctx) + re.NoError(err) + re.Equal(4, stores.Count) + re.Len(stores.Stores, 4) + storeID := uint64(stores.Stores[0].Store.ID) // TODO: why type is different? 
+ store2, err := client.GetStore(ctx, storeID) + re.NoError(err) + re.EqualValues(storeID, store2.Store.ID) + version, err := client.GetClusterVersion(ctx) + re.NoError(err) + re.Equal("1.0.0", version) + rgs, _ := client.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte("a"), []byte("a1")), 100) + re.Equal(int64(0), rgs.Count) + rgs, _ = client.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte("a1"), []byte("a3")), 100) + re.Equal(int64(2), rgs.Count) + rgs, _ = client.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte("a2"), []byte("b")), 100) + re.Equal(int64(1), rgs.Count) + rgs, _ = client.GetRegionsByKeyRange(ctx, pd.NewKeyRange([]byte(""), []byte("")), 100) + re.Equal(int64(2), rgs.Count) + // store 2 origin status:offline + err = client.DeleteStore(ctx, 2) + re.NoError(err) + store2, err = client.GetStore(ctx, 2) + re.NoError(err) + re.Equal(int64(metapb.StoreState_Offline), store2.Store.State) +} + +func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() { + re := suite.Require() + client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + testMinResolvedTS := tsoutil.TimeToTS(time.Now()) + raftCluster := suite.cluster.GetLeaderServer().GetRaftCluster() + err := raftCluster.SetMinResolvedTS(1, testMinResolvedTS) + re.NoError(err) + // Make sure the min resolved TS is updated. + testutil.Eventually(re, func() bool { + minResolvedTS, _ := raftCluster.CheckAndUpdateMinResolvedTS() + return minResolvedTS == testMinResolvedTS + }) + // Wait for the cluster-level min resolved TS to be initialized. + minResolvedTS, storeMinResolvedTSMap, err := client.GetMinResolvedTSByStoresIDs(ctx, nil) + re.NoError(err) + re.Equal(testMinResolvedTS, minResolvedTS) + re.Empty(storeMinResolvedTSMap) + // Get the store-level min resolved TS. + minResolvedTS, storeMinResolvedTSMap, err = client.GetMinResolvedTSByStoresIDs(ctx, []uint64{1}) + re.NoError(err) + re.Equal(testMinResolvedTS, minResolvedTS) + re.Len(storeMinResolvedTSMap, 1) + re.Equal(minResolvedTS, storeMinResolvedTSMap[1]) + // Get the store-level min resolved TS with an invalid store ID. + minResolvedTS, storeMinResolvedTSMap, err = client.GetMinResolvedTSByStoresIDs(ctx, []uint64{1, 2}) + re.NoError(err) + re.Equal(testMinResolvedTS, minResolvedTS) + re.Len(storeMinResolvedTSMap, 2) + re.Equal(minResolvedTS, storeMinResolvedTSMap[1]) + re.Equal(uint64(math.MaxUint64), storeMinResolvedTSMap[2]) +} + +func (suite *httpClientTestSuite) TestRule() { + re := suite.Require() + client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + bundles, err := client.GetAllPlacementRuleBundles(ctx) + re.NoError(err) + re.Len(bundles, 1) + re.Equal(placement.DefaultGroupID, bundles[0].ID) + bundle, err := client.GetPlacementRuleBundleByGroup(ctx, placement.DefaultGroupID) + re.NoError(err) + re.Equal(bundles[0], bundle) + // Check if we have the default rule. + suite.checkRuleResult(ctx, re, &pd.Rule{ + GroupID: placement.DefaultGroupID, + ID: placement.DefaultRuleID, + Role: pd.Voter, + Count: 3, + StartKey: []byte{}, + EndKey: []byte{}, + }, 1, true) + // Should be the same as the rules in the bundle. 
+ suite.checkRuleResult(ctx, re, bundle.Rules[0], 1, true) + testRule := &pd.Rule{ + GroupID: placement.DefaultGroupID, + ID: "test", + Role: pd.Voter, + Count: 3, + StartKey: []byte{}, + EndKey: []byte{}, + } + err = client.SetPlacementRule(ctx, testRule) + re.NoError(err) + suite.checkRuleResult(ctx, re, testRule, 2, true) + err = client.DeletePlacementRule(ctx, placement.DefaultGroupID, "test") + re.NoError(err) + suite.checkRuleResult(ctx, re, testRule, 1, false) + testRuleOp := &pd.RuleOp{ + Rule: testRule, + Action: pd.RuleOpAdd, + } + err = client.SetPlacementRuleInBatch(ctx, []*pd.RuleOp{testRuleOp}) + re.NoError(err) + suite.checkRuleResult(ctx, re, testRule, 2, true) + testRuleOp = &pd.RuleOp{ + Rule: testRule, + Action: pd.RuleOpDel, + } + err = client.SetPlacementRuleInBatch(ctx, []*pd.RuleOp{testRuleOp}) + re.NoError(err) + suite.checkRuleResult(ctx, re, testRule, 1, false) + err = client.SetPlacementRuleBundles(ctx, []*pd.GroupBundle{ + { + ID: placement.DefaultGroupID, + Rules: []*pd.Rule{testRule}, + }, + }, true) + re.NoError(err) + suite.checkRuleResult(ctx, re, testRule, 1, true) + ruleGroups, err := client.GetAllPlacementRuleGroups(ctx) + re.NoError(err) + re.Len(ruleGroups, 1) + re.Equal(placement.DefaultGroupID, ruleGroups[0].ID) + ruleGroup, err := client.GetPlacementRuleGroupByID(ctx, placement.DefaultGroupID) + re.NoError(err) + re.Equal(ruleGroups[0], ruleGroup) + testRuleGroup := &pd.RuleGroup{ + ID: "test-group", + Index: 1, + Override: true, + } + err = client.SetPlacementRuleGroup(ctx, testRuleGroup) + re.NoError(err) + ruleGroup, err = client.GetPlacementRuleGroupByID(ctx, testRuleGroup.ID) + re.NoError(err) + re.Equal(testRuleGroup, ruleGroup) + err = client.DeletePlacementRuleGroupByID(ctx, testRuleGroup.ID) + re.NoError(err) + ruleGroup, err = client.GetPlacementRuleGroupByID(ctx, testRuleGroup.ID) + re.ErrorContains(err, http.StatusText(http.StatusNotFound)) + re.Empty(ruleGroup) + // Test the start key and end key. + testRule = &pd.Rule{ + GroupID: placement.DefaultGroupID, + ID: "test", + Role: pd.Voter, + Count: 5, + StartKey: []byte("a1"), + EndKey: []byte(""), + } + err = client.SetPlacementRule(ctx, testRule) + re.NoError(err) + suite.checkRuleResult(ctx, re, testRule, 1, true) +} + +func (suite *httpClientTestSuite) checkRuleResult( + ctx context.Context, re *require.Assertions, + rule *pd.Rule, totalRuleCount int, exist bool, +) { + client := suite.client + if exist { + got, err := client.GetPlacementRule(ctx, rule.GroupID, rule.ID) + re.NoError(err) + // skip comparison of the generated field + got.StartKeyHex = rule.StartKeyHex + got.EndKeyHex = rule.EndKeyHex + re.Equal(rule, got) + } else { + _, err := client.GetPlacementRule(ctx, rule.GroupID, rule.ID) + re.ErrorContains(err, http.StatusText(http.StatusNotFound)) + } + // Check through the `GetPlacementRulesByGroup` API. + rules, err := client.GetPlacementRulesByGroup(ctx, rule.GroupID) + re.NoError(err) + checkRuleFunc(re, rules, rule, totalRuleCount, exist) + // Check through the `GetPlacementRuleBundleByGroup` API. 
+ bundle, err := client.GetPlacementRuleBundleByGroup(ctx, rule.GroupID)
+ re.NoError(err)
+ checkRuleFunc(re, bundle.Rules, rule, totalRuleCount, exist)
+}
+
+func checkRuleFunc(
+ re *require.Assertions,
+ rules []*pd.Rule, rule *pd.Rule, totalRuleCount int, exist bool,
+) {
+ re.Len(rules, totalRuleCount)
+ for _, r := range rules {
+ if r.ID != rule.ID {
+ continue
+ }
+ re.Equal(rule.GroupID, r.GroupID)
+ re.Equal(rule.ID, r.ID)
+ re.Equal(rule.Role, r.Role)
+ re.Equal(rule.Count, r.Count)
+ re.Equal(rule.StartKey, r.StartKey)
+ re.Equal(rule.EndKey, r.EndKey)
+ return
+ }
+ if exist {
+ re.Failf("Failed to check the rule", "rule %+v not found", rule)
+ }
+}
+
+func (suite *httpClientTestSuite) TestRegionLabel() {
+ re := suite.Require()
+ client := suite.client
+ ctx, cancel := context.WithCancel(suite.ctx)
+ defer cancel()
+ labelRules, err := client.GetAllRegionLabelRules(ctx)
+ re.NoError(err)
+ re.Len(labelRules, 1)
+ re.Equal("keyspaces/0", labelRules[0].ID)
+ // Set a new region label rule.
+ labelRule := &pd.LabelRule{
+ ID: "rule1",
+ Labels: []pd.RegionLabel{{Key: "k1", Value: "v1"}},
+ RuleType: "key-range",
+ Data: labeler.MakeKeyRanges("1234", "5678"),
+ }
+ err = client.SetRegionLabelRule(ctx, labelRule)
+ re.NoError(err)
+ labelRules, err = client.GetAllRegionLabelRules(ctx)
+ re.NoError(err)
+ re.Len(labelRules, 2)
+ sort.Slice(labelRules, func(i, j int) bool {
+ return labelRules[i].ID < labelRules[j].ID
+ })
+ re.Equal(labelRule.ID, labelRules[1].ID)
+ re.Equal(labelRule.Labels, labelRules[1].Labels)
+ re.Equal(labelRule.RuleType, labelRules[1].RuleType)
+ // Patch the region label rule.
+ labelRule = &pd.LabelRule{
+ ID: "rule2",
+ Labels: []pd.RegionLabel{{Key: "k2", Value: "v2"}},
+ RuleType: "key-range",
+ Data: labeler.MakeKeyRanges("ab12", "cd12"),
+ }
+ patch := &pd.LabelRulePatch{
+ SetRules: []*pd.LabelRule{labelRule},
+ DeleteRules: []string{"rule1"},
+ }
+ err = client.PatchRegionLabelRules(ctx, patch)
+ re.NoError(err)
+ allLabelRules, err := client.GetAllRegionLabelRules(ctx)
+ re.NoError(err)
+ re.Len(allLabelRules, 2)
+ sort.Slice(allLabelRules, func(i, j int) bool {
+ return allLabelRules[i].ID < allLabelRules[j].ID
+ })
+ re.Equal(labelRule.ID, allLabelRules[1].ID)
+ re.Equal(labelRule.Labels, allLabelRules[1].Labels)
+ re.Equal(labelRule.RuleType, allLabelRules[1].RuleType)
+ labelRules, err = client.GetRegionLabelRulesByIDs(ctx, []string{"keyspaces/0", "rule2"})
+ re.NoError(err)
+ sort.Slice(labelRules, func(i, j int) bool {
+ return labelRules[i].ID < labelRules[j].ID
+ })
+ re.Equal(allLabelRules, labelRules)
+}
+
+func (suite *httpClientTestSuite) TestAccelerateSchedule() {
+ re := suite.Require()
+ client := suite.client
+ ctx, cancel := context.WithCancel(suite.ctx)
+ defer cancel()
+ raftCluster := suite.cluster.GetLeaderServer().GetRaftCluster()
+ suspectRegions := raftCluster.GetSuspectRegions()
+ re.Empty(suspectRegions)
+ err := client.AccelerateSchedule(ctx, pd.NewKeyRange([]byte("a1"), []byte("a2")))
+ re.NoError(err)
+ suspectRegions = raftCluster.GetSuspectRegions()
+ re.Len(suspectRegions, 1)
+ raftCluster.ClearSuspectRegions()
+ suspectRegions = raftCluster.GetSuspectRegions()
+ re.Empty(suspectRegions)
+ err = client.AccelerateScheduleInBatch(ctx, []*pd.KeyRange{
+ pd.NewKeyRange([]byte("a1"), []byte("a2")),
+ pd.NewKeyRange([]byte("a2"), []byte("a3")),
+ })
+ re.NoError(err)
+ suspectRegions = raftCluster.GetSuspectRegions()
+ re.Len(suspectRegions, 2)
+}
+
+func (suite *httpClientTestSuite) TestConfig() {
+ re := suite.Require()
+
client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + config, err := client.GetConfig(ctx) + re.NoError(err) + re.Equal(float64(4), config["schedule"].(map[string]any)["leader-schedule-limit"]) + + newConfig := map[string]any{ + "schedule.leader-schedule-limit": float64(8), + } + err = client.SetConfig(ctx, newConfig) + re.NoError(err) + + config, err = client.GetConfig(ctx) + re.NoError(err) + re.Equal(float64(8), config["schedule"].(map[string]any)["leader-schedule-limit"]) + + // Test the config with TTL. + newConfig = map[string]any{ + "schedule.leader-schedule-limit": float64(16), + } + err = client.SetConfig(ctx, newConfig, 5) + re.NoError(err) + resp, err := suite.cluster.GetEtcdClient().Get(ctx, sc.TTLConfigPrefix+"/schedule.leader-schedule-limit") + re.NoError(err) + re.Equal([]byte("16"), resp.Kvs[0].Value) + // delete the config with TTL. + err = client.SetConfig(ctx, newConfig, 0) + re.NoError(err) + resp, err = suite.cluster.GetEtcdClient().Get(ctx, sc.TTLConfigPrefix+"/schedule.leader-schedule-limit") + re.NoError(err) + re.Empty(resp.Kvs) + + // Test the config with TTL for storing float64 as uint64. + newConfig = map[string]any{ + "schedule.max-pending-peer-count": uint64(math.MaxInt32), + } + err = client.SetConfig(ctx, newConfig, 4) + re.NoError(err) + c := suite.cluster.GetLeaderServer().GetRaftCluster().GetOpts().GetMaxPendingPeerCount() + re.Equal(uint64(math.MaxInt32), c) + + err = client.SetConfig(ctx, newConfig, 0) + re.NoError(err) + resp, err = suite.cluster.GetEtcdClient().Get(ctx, sc.TTLConfigPrefix+"/schedule.max-pending-peer-count") + re.NoError(err) + re.Empty(resp.Kvs) +} + +func (suite *httpClientTestSuite) TestScheduleConfig() { + re := suite.Require() + client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + config, err := client.GetScheduleConfig(ctx) + re.NoError(err) + re.Equal(float64(4), config["hot-region-schedule-limit"]) + re.Equal(float64(2048), config["region-schedule-limit"]) + config["hot-region-schedule-limit"] = float64(8) + err = client.SetScheduleConfig(ctx, config) + re.NoError(err) + config, err = client.GetScheduleConfig(ctx) + re.NoError(err) + re.Equal(float64(8), config["hot-region-schedule-limit"]) + re.Equal(float64(2048), config["region-schedule-limit"]) +} + +func (suite *httpClientTestSuite) TestSchedulers() { + re := suite.Require() + client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + schedulers, err := client.GetSchedulers(ctx) + re.NoError(err) + const schedulerName = "evict-leader-scheduler" + re.NotContains(schedulers, schedulerName) + + err = client.CreateScheduler(ctx, schedulerName, 1) + re.NoError(err) + schedulers, err = client.GetSchedulers(ctx) + re.NoError(err) + re.Contains(schedulers, schedulerName) + err = client.SetSchedulerDelay(ctx, schedulerName, 100) + re.NoError(err) + err = client.SetSchedulerDelay(ctx, "not-exist", 100) + re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message + + re.NoError(client.DeleteScheduler(ctx, schedulerName)) + schedulers, err = client.GetSchedulers(ctx) + re.NoError(err) + re.NotContains(schedulers, schedulerName) +} + +func (suite *httpClientTestSuite) TestStoreLabels() { + re := suite.Require() + client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + resp, err := client.GetStores(ctx) + re.NoError(err) + re.NotEmpty(resp.Stores) + firstStore := resp.Stores[0] + re.Empty(firstStore.Store.Labels, nil) 
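+ // Attach a "zone" label to the first store, read it back via GetStore, and then delete it again.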
+ storeLabels := map[string]string{ + "zone": "zone1", + } + err = client.SetStoreLabels(ctx, firstStore.Store.ID, storeLabels) + re.NoError(err) + + getResp, err := client.GetStore(ctx, uint64(firstStore.Store.ID)) + re.NoError(err) + + labelsMap := make(map[string]string) + for _, label := range getResp.Store.Labels { + re.NotNil(label) + labelsMap[label.Key] = label.Value + } + + for key, value := range storeLabels { + re.Equal(value, labelsMap[key]) + } + + re.NoError(client.DeleteStoreLabel(ctx, firstStore.Store.ID, "zone")) + store, err := client.GetStore(ctx, uint64(firstStore.Store.ID)) + re.NoError(err) + re.Empty(store.Store.Labels) +} + +func (suite *httpClientTestSuite) TestTransferLeader() { + re := suite.Require() + client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + members, err := client.GetMembers(ctx) + re.NoError(err) + re.Len(members.Members, 2) + + leader, err := client.GetLeader(ctx) + re.NoError(err) + + // Transfer leader to another pd + for _, member := range members.Members { + if member.GetName() != leader.GetName() { + err = client.TransferLeader(ctx, member.GetName()) + re.NoError(err) + break + } + } + + newLeader := suite.cluster.WaitLeader() + re.NotEmpty(newLeader) + re.NoError(err) + re.NotEqual(leader.GetName(), newLeader) + // Force to update the members info. + testutil.Eventually(re, func() bool { + leader, err = client.GetLeader(ctx) + re.NoError(err) + return newLeader == leader.GetName() + }) + members, err = client.GetMembers(ctx) + re.NoError(err) + re.Len(members.Members, 2) + re.Equal(leader.GetName(), members.Leader.GetName()) +} + +func (suite *httpClientTestSuite) TestVersion() { + re := suite.Require() + ver, err := suite.client.GetPDVersion(suite.ctx) + re.NoError(err) + re.Equal(versioninfo.PDReleaseVersion, ver) +} + +func (suite *httpClientTestSuite) TestStatus() { + re := suite.Require() + status, err := suite.client.GetStatus(suite.ctx) + re.NoError(err) + re.Equal(versioninfo.PDReleaseVersion, status.Version) + re.Equal(versioninfo.PDGitHash, status.GitHash) + re.Equal(versioninfo.PDBuildTS, status.BuildTS) + re.GreaterOrEqual(time.Now().Unix(), status.StartTimestamp) +} + +func (suite *httpClientTestSuite) TestAdmin() { + re := suite.Require() + client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + err := client.SetSnapshotRecoveringMark(ctx) + re.NoError(err) + err = client.ResetTS(ctx, 123, true) + re.NoError(err) + err = client.ResetBaseAllocID(ctx, 456) + re.NoError(err) + err = client.DeleteSnapshotRecoveringMark(ctx) + re.NoError(err) +} + +func (suite *httpClientTestSuite) TestWithBackoffer() { + re := suite.Require() + client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + // Should return with 404 error without backoffer. + rule, err := client.GetPlacementRule(ctx, "non-exist-group", "non-exist-rule") + re.ErrorContains(err, http.StatusText(http.StatusNotFound)) + re.Nil(rule) + // Should return with 404 error even with an infinite backoffer. + rule, err = client. + WithBackoffer(retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)). 
+ GetPlacementRule(ctx, "non-exist-group", "non-exist-rule") + re.ErrorContains(err, http.StatusText(http.StatusNotFound)) + re.Nil(rule) +} + +func (suite *httpClientTestSuite) TestRedirectWithMetrics() { + re := suite.Require() + + cli := setupCli(suite.ctx, re, suite.endpoints) + defer cli.Close() + sd := cli.GetServiceDiscovery() + + metricCnt := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "check", + }, []string{"name", ""}) + // 1. Test all followers failed, need to send all followers. + httpClient := pd.NewHTTPClientWithRequestChecker(func(req *http.Request) error { + if req.URL.Path == pd.Schedulers { + return errors.New("mock error") + } + return nil + }) + c := pd.NewClientWithServiceDiscovery("pd-http-client-it", sd, pd.WithHTTPClient(httpClient), pd.WithMetrics(metricCnt, nil)) + c.CreateScheduler(context.Background(), "test", 0) + var out dto.Metric + failureCnt, err := metricCnt.GetMetricWithLabelValues([]string{"CreateScheduler", "network error"}...) + re.NoError(err) + failureCnt.Write(&out) + re.Equal(float64(2), out.Counter.GetValue()) + c.Close() + + leader := sd.GetServingURL() + httpClient = pd.NewHTTPClientWithRequestChecker(func(req *http.Request) error { + // mock leader success. + if !strings.Contains(leader, req.Host) { + return errors.New("mock error") + } + return nil + }) + c = pd.NewClientWithServiceDiscovery("pd-http-client-it", sd, pd.WithHTTPClient(httpClient), pd.WithMetrics(metricCnt, nil)) + c.CreateScheduler(context.Background(), "test", 0) + successCnt, err := metricCnt.GetMetricWithLabelValues([]string{"CreateScheduler", ""}...) + re.NoError(err) + successCnt.Write(&out) + re.Equal(float64(1), out.Counter.GetValue()) + c.Close() + + httpClient = pd.NewHTTPClientWithRequestChecker(func(req *http.Request) error { + // mock leader success. + if strings.Contains(leader, req.Host) { + return errors.New("mock error") + } + return nil + }) + c = pd.NewClientWithServiceDiscovery("pd-http-client-it", sd, pd.WithHTTPClient(httpClient), pd.WithMetrics(metricCnt, nil)) + c.CreateScheduler(context.Background(), "test", 0) + successCnt, err = metricCnt.GetMetricWithLabelValues([]string{"CreateScheduler", ""}...) + re.NoError(err) + successCnt.Write(&out) + re.Equal(float64(2), out.Counter.GetValue()) + failureCnt, err = metricCnt.GetMetricWithLabelValues([]string{"CreateScheduler", "network error"}...) + re.NoError(err) + failureCnt.Write(&out) + re.Equal(float64(3), out.Counter.GetValue()) + c.Close() +} + +func (suite *httpClientTestSuite) TestUpdateKeyspaceGCManagementType() { + re := suite.Require() + client := suite.client + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + + keyspaceName := "DEFAULT" + expectGCManagementType := "keyspace_level_gc" + + keyspaceSafePointVersionConfig := pd.KeyspaceGCManagementTypeConfig{ + Config: pd.KeyspaceGCManagementType{ + GCManagementType: expectGCManagementType, + }, + } + err := client.UpdateKeyspaceGCManagementType(ctx, keyspaceName, &keyspaceSafePointVersionConfig) + re.NoError(err) + + keyspaceMetaRes, err := client.GetKeyspaceMetaByName(ctx, keyspaceName) + re.NoError(err) + val, ok := keyspaceMetaRes.Config["gc_management_type"] + + // Check it can get expect key and value in keyspace meta config. 
+ re.True(ok) + re.Equal(expectGCManagementType, val) +} + +func (suite *httpClientTestSuite) TestGetHealthStatus() { + re := suite.Require() + healths, err := suite.client.GetHealthStatus(suite.ctx) + re.NoError(err) + re.Len(healths, 2) + sort.Slice(healths, func(i, j int) bool { + return healths[i].Name < healths[j].Name + }) + re.Equal("pd1", healths[0].Name) + re.Equal("pd2", healths[1].Name) + re.True(healths[0].Health && healths[1].Health) +} + +func (suite *httpClientTestSuite) TestRetryOnLeaderChange() { + re := suite.Require() + ctx, cancel := context.WithCancel(suite.ctx) + defer cancel() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0) + client := suite.client.WithBackoffer(bo) + for { + healths, err := client.GetHealthStatus(ctx) + if err != nil && strings.Contains(err.Error(), "context canceled") { + return + } + re.NoError(err) + re.Len(healths, 2) + select { + case <-ctx.Done(): + return + default: + } + } + }() + + leader := suite.cluster.GetLeaderServer() + re.NotNil(leader) + for i := 0; i < 3; i++ { + leader.ResignLeader() + re.NotEmpty(suite.cluster.WaitLeader()) + leader = suite.cluster.GetLeaderServer() + re.NotNil(leader) + } + + // Cancel the context to stop the goroutine. + cancel() + wg.Wait() +} diff --git a/tests/integrations/realcluster/deploy.sh b/tests/integrations/realcluster/deploy.sh new file mode 100755 index 00000000000..f6f567314f0 --- /dev/null +++ b/tests/integrations/realcluster/deploy.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# deploy `tiup playground` + +set -x + +TIUP_BIN_DIR=$HOME/.tiup/bin/tiup +CUR_PATH=$(pwd) + +# See https://misc.flogisoft.com/bash/tip_colors_and_formatting. +color-green() { # Green + echo -e "\x1B[1;32m${*}\x1B[0m" +} + +# Install TiUP +color-green "install TiUP..." +curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh +$TIUP_BIN_DIR update playground + +cd ../../.. +if [ ! -d "bin" ] || [ ! -e "bin/tikv-server" ] && [ ! -e "bin/tidb-server" ] && [ ! -e "bin/tiflash" ]; then + color-green "downloading binaries..." + color-green "this may take a few minutes, you can also download them manually and put them in the bin directory." + make pd-server WITH_RACE=1 + $TIUP_BIN_DIR playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 --without-monitor --tag pd_real_cluster_test \ + --pd.binpath ./bin/pd-server --pd.config ./tests/integrations/realcluster/pd.toml \ + > $CUR_PATH/playground.log 2>&1 & +else + # CI will download the binaries in the prepare phase. + # ref https://github.com/PingCAP-QE/ci/blob/387e9e533b365174962ccb1959442a7070f9cd66/pipelines/tikv/pd/latest/pull_integration_realcluster_test.groovy#L55-L68 + color-green "using existing binaries..." 
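+  # Reuse the prebuilt binaries under ./bin and start a playground (3 PD, 3 TiKV, 1 TiDB, 1 TiFlash) tagged pd_real_cluster_test; all output is redirected to playground.log.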
+ $TIUP_BIN_DIR playground nightly --kv 3 --tiflash 1 --db 1 --pd 3 --without-monitor \ + --pd.binpath ./bin/pd-server --kv.binpath ./bin/tikv-server --db.binpath ./bin/tidb-server \ + --tiflash.binpath ./bin/tiflash --tag pd_real_cluster_test --pd.config ./tests/integrations/realcluster/pd.toml \ + > $CUR_PATH/playground.log 2>&1 & +fi + +cd $CUR_PATH diff --git a/tests/integrations/realcluster/pd.toml b/tests/integrations/realcluster/pd.toml new file mode 100644 index 00000000000..876c7f13af2 --- /dev/null +++ b/tests/integrations/realcluster/pd.toml @@ -0,0 +1,5 @@ +[schedule] +patrol-region-interval = "100ms" + +[log] +level = "debug" diff --git a/tests/integrations/realcluster/reboot_pd_test.go b/tests/integrations/realcluster/reboot_pd_test.go new file mode 100644 index 00000000000..14c86f2dedb --- /dev/null +++ b/tests/integrations/realcluster/reboot_pd_test.go @@ -0,0 +1,77 @@ +// Copyright 2023 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package realcluster + +import ( + "context" + "os/exec" + "testing" + + "github.com/pingcap/log" + "github.com/stretchr/testify/require" +) + +func restartTiUP() { + log.Info("start to restart TiUP") + cmd := exec.Command("make", "deploy") + err := cmd.Run() + if err != nil { + panic(err) + } + log.Info("TiUP restart success") +} + +// https://github.com/tikv/pd/issues/6467 +func TestReloadLabel(t *testing.T) { + re := require.New(t) + ctx := context.Background() + + resp, err := pdHTTPCli.GetStores(ctx) + re.NoError(err) + re.NotEmpty(resp.Stores) + firstStore := resp.Stores[0] + // TiFlash labels will be ["engine": "tiflash"] + // So we need to merge the labels + storeLabels := map[string]string{ + "zone": "zone1", + } + for _, label := range firstStore.Store.Labels { + storeLabels[label.Key] = label.Value + } + re.NoError(pdHTTPCli.SetStoreLabels(ctx, firstStore.Store.ID, storeLabels)) + defer func() { + re.NoError(pdHTTPCli.DeleteStoreLabel(ctx, firstStore.Store.ID, "zone")) + }() + + checkLabelsAreEqual := func() { + resp, err := pdHTTPCli.GetStore(ctx, uint64(firstStore.Store.ID)) + re.NoError(err) + + labelsMap := make(map[string]string) + for _, label := range resp.Store.Labels { + re.NotNil(label) + labelsMap[label.Key] = label.Value + } + + for key, value := range storeLabels { + re.Equal(value, labelsMap[key]) + } + } + // Check the label is set + checkLabelsAreEqual() + // Restart TiUP to reload the label + restartTiUP() + checkLabelsAreEqual() +} diff --git a/tests/integrations/realcluster/scheduler_test.go b/tests/integrations/realcluster/scheduler_test.go new file mode 100644 index 00000000000..0ed6f6c6b76 --- /dev/null +++ b/tests/integrations/realcluster/scheduler_test.go @@ -0,0 +1,188 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package realcluster + +import ( + "context" + "fmt" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/testutil" + "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/schedule/schedulers" +) + +// https://github.com/tikv/pd/issues/6988#issuecomment-1694924611 +// https://github.com/tikv/pd/issues/6897 +func TestTransferLeader(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resp, err := pdHTTPCli.GetLeader(ctx) + re.NoError(err) + oldLeader := resp.Name + + var newLeader string + for i := 0; i < 2; i++ { + if resp.Name != fmt.Sprintf("pd-%d", i) { + newLeader = fmt.Sprintf("pd-%d", i) + } + } + + // record scheduler + re.NoError(pdHTTPCli.CreateScheduler(ctx, schedulers.EvictLeaderName, 1)) + defer func() { + re.NoError(pdHTTPCli.DeleteScheduler(ctx, schedulers.EvictLeaderName)) + }() + res, err := pdHTTPCli.GetSchedulers(ctx) + re.NoError(err) + oldSchedulersLen := len(res) + + re.NoError(pdHTTPCli.TransferLeader(ctx, newLeader)) + // wait for transfer leader to new leader + time.Sleep(1 * time.Second) + resp, err = pdHTTPCli.GetLeader(ctx) + re.NoError(err) + re.Equal(newLeader, resp.Name) + + res, err = pdHTTPCli.GetSchedulers(ctx) + re.NoError(err) + re.Len(res, oldSchedulersLen) + + // transfer leader to old leader + re.NoError(pdHTTPCli.TransferLeader(ctx, oldLeader)) + // wait for transfer leader + time.Sleep(1 * time.Second) + resp, err = pdHTTPCli.GetLeader(ctx) + re.NoError(err) + re.Equal(oldLeader, resp.Name) + + res, err = pdHTTPCli.GetSchedulers(ctx) + re.NoError(err) + re.Len(res, oldSchedulersLen) +} + +func TestRegionLabelDenyScheduler(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + re.GreaterOrEqual(len(regions.Regions), 1) + region1 := regions.Regions[0] + + err = pdHTTPCli.DeleteScheduler(ctx, schedulers.BalanceLeaderName) + if err == nil { + defer func() { + pdHTTPCli.CreateScheduler(ctx, schedulers.BalanceLeaderName, 0) + }() + } + + re.NoError(pdHTTPCli.CreateScheduler(ctx, schedulers.GrantLeaderName, uint64(region1.Leader.StoreID))) + defer func() { + pdHTTPCli.DeleteScheduler(ctx, schedulers.GrantLeaderName) + }() + + // wait leader transfer + testutil.Eventually(re, func() bool { + regions, err := pdHTTPCli.GetRegions(ctx) + re.NoError(err) + for _, region := range regions.Regions { + if region.Leader.StoreID != region1.Leader.StoreID { + return false + } + } + return true + }, testutil.WithWaitFor(time.Minute)) + + // disable schedule for region1 + labelRule := &pd.LabelRule{ + ID: "rule1", + Labels: []pd.RegionLabel{{Key: "schedule", Value: "deny"}}, + RuleType: "key-range", + Data: labeler.MakeKeyRanges(region1.StartKey, region1.EndKey), + } + re.NoError(pdHTTPCli.SetRegionLabelRule(ctx, labelRule)) + defer func() { + pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}}) + }() + labelRules, err := 
pdHTTPCli.GetAllRegionLabelRules(ctx)
+ re.NoError(err)
+ re.Len(labelRules, 2)
+ sort.Slice(labelRules, func(i, j int) bool {
+ return labelRules[i].ID < labelRules[j].ID
+ })
+ re.Equal(labelRule.ID, labelRules[1].ID)
+ re.Equal(labelRule.Labels, labelRules[1].Labels)
+ re.Equal(labelRule.RuleType, labelRules[1].RuleType)
+
+ // enable evict leader scheduler, and check it works
+ re.NoError(pdHTTPCli.DeleteScheduler(ctx, schedulers.GrantLeaderName))
+ re.NoError(pdHTTPCli.CreateScheduler(ctx, schedulers.EvictLeaderName, uint64(region1.Leader.StoreID)))
+ defer func() {
+ pdHTTPCli.DeleteScheduler(ctx, schedulers.EvictLeaderName)
+ }()
+ testutil.Eventually(re, func() bool {
+ regions, err := pdHTTPCli.GetRegions(ctx)
+ re.NoError(err)
+ for _, region := range regions.Regions {
+ if region.Leader.StoreID == region1.Leader.StoreID {
+ return false
+ }
+ }
+ return true
+ }, testutil.WithWaitFor(time.Minute))
+
+ re.NoError(pdHTTPCli.DeleteScheduler(ctx, schedulers.EvictLeaderName))
+ re.NoError(pdHTTPCli.CreateScheduler(ctx, schedulers.GrantLeaderName, uint64(region1.Leader.StoreID)))
+ defer func() {
+ pdHTTPCli.DeleteScheduler(ctx, schedulers.GrantLeaderName)
+ }()
+ testutil.Eventually(re, func() bool {
+ regions, err := pdHTTPCli.GetRegions(ctx)
+ re.NoError(err)
+ for _, region := range regions.Regions {
+ if region.ID == region1.ID {
+ continue
+ }
+ if region.Leader.StoreID != region1.Leader.StoreID {
+ return false
+ }
+ }
+ return true
+ }, testutil.WithWaitFor(time.Minute))
+
+ pdHTTPCli.PatchRegionLabelRules(ctx, &pd.LabelRulePatch{DeleteRules: []string{labelRule.ID}})
+ labelRules, err = pdHTTPCli.GetAllRegionLabelRules(ctx)
+ re.NoError(err)
+ re.Len(labelRules, 1)
+
+ testutil.Eventually(re, func() bool {
+ regions, err := pdHTTPCli.GetRegions(ctx)
+ re.NoError(err)
+ for _, region := range regions.Regions {
+ if region.Leader.StoreID != region1.Leader.StoreID {
+ return false
+ }
+ }
+ return true
+ }, testutil.WithWaitFor(time.Minute))
+}
diff --git a/tests/integrations/realcluster/wait_tiup.sh b/tests/integrations/realcluster/wait_tiup.sh
new file mode 100755
index 00000000000..3a8c02a969e
--- /dev/null
+++ b/tests/integrations/realcluster/wait_tiup.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+# Wait until the `tiup playground` command runs successfully.
+
+TIUP_BIN_DIR=$HOME/.tiup/bin/tiup
+INTERVAL=$1
+MAX_TIMES=$2
+
+if ([ -z "${INTERVAL}" ] || [ -z "${MAX_TIMES}" ]); then
+ echo "Usage: command <interval> <max_times>"
+ exit 1
+fi
+
+for ((i=0; i<${MAX_TIMES}; i++)); do
+ sleep ${INTERVAL}
+ $TIUP_BIN_DIR playground display --tag pd_real_cluster_test
+ if [ $? -eq 0 ]; then
+ exit 0
+ fi
+ cat ./playground.log
+done
+
+exit 1
\ No newline at end of file
diff --git a/tests/integrations/tso/Makefile b/tests/integrations/tso/Makefile
index e353f686fe7..9c9548e8c30 100644
--- a/tests/integrations/tso/Makefile
+++ b/tests/integrations/tso/Makefile
@@ -34,8 +34,39 @@ test: failpoint-enable
 CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover || { $(MAKE) failpoint-disable && exit 1; }
 $(MAKE) failpoint-disable
 
+<<<<<<< HEAD:tests/integrations/tso/Makefile
 ci-test-job:
 CGO_ENABLED=1 go test ./... -v -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=$(ROOT_PATH)/... github.com/tikv/pd/tests/integrations/tso
+=======
+deploy: kill_cluster
+ @ echo "deploying..."
+ ./deploy.sh
+ @ echo "waiting for the cluster to be ready..."
+ ./wait_tiup.sh 15 20
+ @ echo "check cluster status..."
+ @ pid=$$(ps -ef | grep 'playground' | grep -v grep | awk '{print $$2}' | head -n 1); \
+ echo $$pid;
+
+kill_cluster:
+ @ echo "kill cluster..."
+ @ pid=$$(ps -ef | grep 'playground' | grep -v grep | awk '{print $$2}' | head -n 1); \
+ if [ ! -z "$$pid" ]; then \
+ echo $$pid; \
+ kill $$pid; \
+ echo "waiting for cluster to exit..."; \
+ sleep 30; \
+ fi
+
+test:
+ CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover || (\
+ echo "following is the pd-0 log\n" ; \
+ cat ~/.tiup/data/pd_real_cluster_test/pd-0/pd.log ; \
+ echo "following is the pd-1 log\n" ; \
+ cat ~/.tiup/data/pd_real_cluster_test/pd-1/pd.log ; \
+ echo "following is the pd-2 log\n" ; \
+ cat ~/.tiup/data/pd_real_cluster_test/pd-2/pd.log ; \
+ exit 1)
+>>>>>>> 26e90e9ff (scheduler: skip evict-leader-scheduler when setting schedule deny label (#8303)):tests/integrations/realcluster/Makefile
 
 install-tools:
 cd $(ROOT_PATH) && $(MAKE) install-tools
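For local runs, the realcluster targets added above are typically driven in this order (a minimal sketch; it assumes the commands are issued from tests/integrations/realcluster and that TiUP has been installed by deploy.sh):

    # kill any previous playground, redeploy it via deploy.sh, and poll readiness with wait_tiup.sh 15 20
    make deploy
    # run the real-cluster Go tests; on failure the pd-0/1/2 logs under ~/.tiup/data/pd_real_cluster_test are dumped
    make test
    # tear the playground down again
    make kill_cluster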