Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infosync: integrate PD HTTP client into the placement manager #48858

Merged
merged 2 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7158,13 +7158,13 @@ def go_deps():
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "440821579da980d0405695b463da892608a59252a296cd7e52b4f97881c5fdb7",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231121080541-8919bc11f770",
sha256 = "5232ba0bba677a6d4614ae2cc102554d59cd00d473d9138739508d6f25169f02",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231127075044-9f4803d8bd05",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (p *PdController) getRegionCountWith(
}
var err error
for _, addr := range p.getAllPDAddrs() {
v, e := get(ctx, addr, pdhttp.RegionStatsByKeyRange(start, end), p.cli, http.MethodGet, nil)
v, e := get(ctx, addr, pdhttp.RegionStatsByKeyRange(pdhttp.NewKeyRange(start, end)), p.cli, http.MethodGet, nil)
if e != nil {
err = e
continue
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ require (
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173
github.com/tikv/pd/client v0.0.0-20231121080541-8919bc11f770
github.com/tikv/pd/client v0.0.0-20231127075044-9f4803d8bd05
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966
github.com/twmb/murmur3 v1.1.6
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -992,8 +992,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173 h1:lmJzX0kqrV7kO21wrZPbtjkidzwbDCfXeQrhDWEi5dE=
github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173/go.mod h1:BOGTSZtbMHEnGC4HOpbONdnTQF+E9nb2Io7c3P9sb7g=
github.com/tikv/pd/client v0.0.0-20231121080541-8919bc11f770 h1:YSXDKT9+KngRSAShoSQVKD/CK1kR4X/9hutKkSK9gn0=
github.com/tikv/pd/client v0.0.0-20231121080541-8919bc11f770/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/tikv/pd/client v0.0.0-20231127075044-9f4803d8bd05 h1:87NPUfzaVrO5MTBwVCPQ/FlJGpFnHi6WFYHDYD3n3Zc=
github.com/tikv/pd/client v0.0.0-20231127075044-9f4803d8bd05/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/placement/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/tablecodec",
"//pkg/util/codec",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_pd_client//http",
"@in_gopkg_yaml_v2//:yaml_v2",
],
)
Expand Down Expand Up @@ -45,5 +46,6 @@ go_test(
"//pkg/util/codec",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//http",
],
)
86 changes: 39 additions & 47 deletions pkg/ddl/placement/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,13 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client/http"
"gopkg.in/yaml.v2"
)

// Refer to https://github.com/tikv/pd/issues/2701 .
// IMO, it is indeed not bad to have a copy of definition.
// After all, placement rules are communicated using an HTTP API. Loose
// coupling is a good feature.

// Bundle is a group of all rules and configurations. It is used to support rule cache.
type Bundle struct {
ID string `json:"group_id"`
Index int `json:"group_index"`
Override bool `json:"group_override"`
Rules []*Rule `json:"rules"`
}
// Alias `pd.GroupBundle` is to wrap more methods.
type Bundle pd.GroupBundle

// NewBundle will create a bundle with the provided ID.
// Note that you should never pass negative id.
Expand All @@ -70,15 +62,15 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
explicitFollowerCount := options.Followers
explicitLearnerCount := options.Learners

rules := []*Rule{}
rules := []*pd.Rule{}
commonConstraints, err := NewConstraintsFromYaml([]byte(constraints))
if err != nil {
// If it's not in array format, attempt to parse it as a dictionary for more detailed definitions.
// The dictionary format specifies details for each replica. Constraints are used to define normal
// replicas that should act as voters.
// For example: CONSTRAINTS='{ "+region=us-east-1":2, "+region=us-east-2": 2, "+region=us-west-1": 1}'
normalReplicasRules, err := NewRuleBuilder().
SetRole(Voter).
SetRole(pd.Voter).
SetConstraintStr(constraints).
BuildRulesWithDictConstraintsOnly()
if err != nil {
Expand All @@ -92,7 +84,7 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
return nil, fmt.Errorf("%w: 'LeaderConstraints' should be [constraint1, ...] or any yaml compatible array representation", err)
}
for _, cnst := range commonConstraints {
if err := leaderConstraints.Add(cnst); err != nil {
if err := AddConstraint(&leaderConstraints, cnst); err != nil {
return nil, fmt.Errorf("%w: LeaderConstraints conflicts with Constraints", err)
}
}
Expand All @@ -101,7 +93,7 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
followerReplicas = explicitFollowerCount
}
if !needCreateDefault {
if len(leaderConstraints) == 0 {
if len(leaderConst) == 0 {
leaderReplicas = 0
}
if len(followerConstraints) == 0 {
Expand All @@ -115,15 +107,15 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
// create leader rule.
// if no constraints, we need create default leader rule.
if leaderReplicas > 0 {
leaderRule := NewRule(Leader, leaderReplicas, leaderConstraints)
leaderRule := NewRule(pd.Leader, leaderReplicas, leaderConstraints)
rules = append(rules, leaderRule)
}

// create follower rules.
// if no constraints, we need create default follower rules.
if followerReplicas > 0 {
builder := NewRuleBuilder().
SetRole(Voter).
SetRole(pd.Voter).
SetReplicasNum(followerReplicas).
SetSkipCheckReplicasConsistent(needCreateDefault && (explicitFollowerCount == 0)).
SetConstraintStr(followerConstraints)
Expand All @@ -133,7 +125,7 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
}
for _, followerRule := range followerRules {
for _, cnst := range commonConstraints {
if err := followerRule.Constraints.Add(cnst); err != nil {
if err := AddConstraint(&followerRule.LabelConstraints, cnst); err != nil {
return nil, fmt.Errorf("%w: FollowerConstraints conflicts with Constraints", err)
}
}
Expand All @@ -143,7 +135,7 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,

// create learner rules.
builder := NewRuleBuilder().
SetRole(Learner).
SetRole(pd.Learner).
SetReplicasNum(explicitLearnerCount).
SetConstraintStr(learnerConstraints)
learnerRules, err := builder.BuildRules()
Expand All @@ -152,7 +144,7 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
}
for _, rule := range learnerRules {
for _, cnst := range commonConstraints {
if err := rule.Constraints.Add(cnst); err != nil {
if err := AddConstraint(&rule.LabelConstraints, cnst); err != nil {
return nil, fmt.Errorf("%w: LearnerConstraints conflicts with Constraints", err)
}
}
Expand Down Expand Up @@ -194,7 +186,7 @@ func NewBundleFromSugarOptions(options *model.PlacementSettings) (*Bundle, error
}
schedule := options.Schedule

var rules []*Rule
var rules []*pd.Rule

locationLabels, err := newLocationLabelsFromSurvivalPreferences(options.SurvivalPreferences)
if err != nil {
Expand All @@ -203,7 +195,7 @@ func NewBundleFromSugarOptions(options *model.PlacementSettings) (*Bundle, error

// in case empty primaryRegion and regions, just return an empty bundle
if primaryRegion == "" && len(regions) == 0 {
rules = append(rules, NewRule(Voter, followers+1, NewConstraintsDirect()))
rules = append(rules, NewRule(pd.Voter, followers+1, NewConstraintsDirect()))
for _, rule := range rules {
rule.LocationLabels = locationLabels
}
Expand All @@ -230,17 +222,17 @@ func NewBundleFromSugarOptions(options *model.PlacementSettings) (*Bundle, error
return nil, fmt.Errorf("%w: unsupported schedule %s", ErrInvalidPlacementOptions, schedule)
}

rules = append(rules, NewRule(Leader, 1, NewConstraintsDirect(NewConstraintDirect("region", In, primaryRegion))))
rules = append(rules, NewRule(pd.Leader, 1, NewConstraintsDirect(NewConstraintDirect("region", pd.In, primaryRegion))))
if primaryCount > 1 {
rules = append(rules, NewRule(Voter, primaryCount-1, NewConstraintsDirect(NewConstraintDirect("region", In, primaryRegion))))
rules = append(rules, NewRule(pd.Voter, primaryCount-1, NewConstraintsDirect(NewConstraintDirect("region", pd.In, primaryRegion))))
}
if cnt := followers + 1 - primaryCount; cnt > 0 {
// delete primary from regions
regions = regions[:primaryIndex+copy(regions[primaryIndex:], regions[primaryIndex+1:])]
if len(regions) > 0 {
rules = append(rules, NewRule(Voter, cnt, NewConstraintsDirect(NewConstraintDirect("region", In, regions...))))
rules = append(rules, NewRule(pd.Voter, cnt, NewConstraintsDirect(NewConstraintDirect("region", pd.In, regions...))))
} else {
rules = append(rules, NewRule(Voter, cnt, NewConstraintsDirect()))
rules = append(rules, NewRule(pd.Voter, cnt, NewConstraintsDirect()))
}
}

Expand Down Expand Up @@ -332,8 +324,8 @@ func (b *Bundle) Tidy() error {
// refer to tidb#22065.
// add -engine=tiflash to every rule to avoid schedules to tiflash instances.
// placement rules in SQL is not compatible with `set tiflash replica` yet
err := rule.Constraints.Add(Constraint{
Op: NotIn,
err := AddConstraint(&rule.LabelConstraints, pd.LabelConstraint{
Op: pd.NotIn,
Key: EngineLabelKey,
Values: []string{EngineLabelTiFlash},
})
Expand All @@ -348,10 +340,10 @@ func (b *Bundle) Tidy() error {
groups := make(map[string]*constraintsGroup)
finalRules := tempRules[:0]
for _, rule := range tempRules {
key := rule.Constraints.FingerPrint()
key := ConstraintsFingerPrint(&rule.LabelConstraints)
existing, ok := groups[key]
if !ok {
groups[key] = &constraintsGroup{rules: []*Rule{rule}}
groups[key] = &constraintsGroup{rules: []*pd.Rule{rule}}
continue
}
existing.rules = append(existing.rules, rule)
Expand All @@ -375,7 +367,7 @@ func (b *Bundle) Tidy() error {

// constraintsGroup is a group of rules with the same constraints.
type constraintsGroup struct {
rules []*Rule
rules []*pd.Rule
// canBecameLeader means the group has leader/voter role,
// it's valid if it has leader.
canBecameLeader bool
Expand Down Expand Up @@ -411,16 +403,16 @@ func transformableLeaderConstraint(groups map[string]*constraintsGroup) error {
// MergeRulesByRole merges the rules with the same role.
func (c *constraintsGroup) MergeRulesByRole() {
// Create a map to store rules by role
rulesByRole := make(map[PeerRoleType][]*Rule)
rulesByRole := make(map[pd.PeerRoleType][]*pd.Rule)

// Iterate through each rule
for _, rule := range c.rules {
// Add the rule to the map based on its role
rulesByRole[rule.Role] = append(rulesByRole[rule.Role], rule)
if rule.Role == Leader || rule.Role == Voter {
if rule.Role == pd.Leader || rule.Role == pd.Voter {
c.canBecameLeader = true
}
if rule.Role == Leader {
if rule.Role == pd.Leader {
c.isLeaderGroup = true
}
}
Expand Down Expand Up @@ -449,11 +441,11 @@ func (c *constraintsGroup) MergeTransformableRoles() {
if len(c.rules) == 0 || len(c.rules) == 1 {
return
}
var mergedRule *Rule
newRules := make([]*Rule, 0, len(c.rules))
var mergedRule *pd.Rule
newRules := make([]*pd.Rule, 0, len(c.rules))
for _, rule := range c.rules {
// Learner is not transformable, it should be promote by PD.
if rule.Role == Learner {
if rule.Role == pd.Learner {
newRules = append(newRules, rule)
continue
}
Expand All @@ -467,7 +459,7 @@ func (c *constraintsGroup) MergeTransformableRoles() {
}
}
if mergedRule != nil {
mergedRule.Role = Voter
mergedRule.Role = pd.Voter
newRules = append(newRules, mergedRule)
}
c.rules = newRules
Expand All @@ -491,7 +483,7 @@ func (b *Bundle) RebuildForRange(rangeName string, policyName string) *Bundle {
}

b.Override = true
newRules := make([]*Rule, 0, len(rule))
newRules := make([]*pd.Rule, 0, len(rule))
for i, r := range b.Rules {
cp := r.Clone()
cp.ID = fmt.Sprintf("%s_rule_%d", strings.ToLower(policyName), i)
Expand All @@ -508,7 +500,7 @@ func (b *Bundle) RebuildForRange(rangeName string, policyName string) *Bundle {
// Reset resets the bundle ID and keyrange of all rules.
func (b *Bundle) Reset(ruleIndex int, newIDs []int64) *Bundle {
// eliminate the redundant rules.
var basicRules []*Rule
var basicRules []*pd.Rule
if len(b.Rules) != 0 {
// Make priority for rules with RuleIndexTable cause of duplication rules existence with RuleIndexPartition.
// If RuleIndexTable doesn't exist, bundle itself is a independent series of rules for a partition.
Expand All @@ -526,7 +518,7 @@ func (b *Bundle) Reset(ruleIndex int, newIDs []int64) *Bundle {
b.ID = GroupID(newIDs[0])
b.Index = ruleIndex
b.Override = true
newRules := make([]*Rule, 0, len(basicRules)*len(newIDs))
newRules := make([]*pd.Rule, 0, len(basicRules)*len(newIDs))
for i, newID := range newIDs {
// rule.id should be distinguished with each other, otherwise it will be de-duplicated in pd http api.
var ruleID string
Expand Down Expand Up @@ -566,7 +558,7 @@ func (b *Bundle) Clone() *Bundle {
newBundle := &Bundle{}
*newBundle = *b
if len(b.Rules) > 0 {
newBundle.Rules = make([]*Rule, 0, len(b.Rules))
newBundle.Rules = make([]*pd.Rule, 0, len(b.Rules))
for i := range b.Rules {
newBundle.Rules = append(newBundle.Rules, b.Rules[i].Clone())
}
Expand Down Expand Up @@ -595,10 +587,10 @@ func (b *Bundle) ObjectID() (int64, error) {
return id, nil
}

func isValidLeaderRule(rule *Rule, dcLabelKey string) bool {
if rule.Role == Leader && rule.Count == 1 {
for _, con := range rule.Constraints {
if con.Op == In && con.Key == dcLabelKey && len(con.Values) == 1 {
func isValidLeaderRule(rule *pd.Rule, dcLabelKey string) bool {
if rule.Role == pd.Leader && rule.Count == 1 {
for _, con := range rule.LabelConstraints {
if con.Op == pd.In && con.Key == dcLabelKey && len(con.Values) == 1 {
return true
}
}
Expand All @@ -610,7 +602,7 @@ func isValidLeaderRule(rule *Rule, dcLabelKey string) bool {
func (b *Bundle) GetLeaderDC(dcLabelKey string) (string, bool) {
for _, rule := range b.Rules {
if isValidLeaderRule(rule, dcLabelKey) {
return rule.Constraints[0].Values[0], true
return rule.LabelConstraints[0].Values[0], true
}
}
return "", false
Expand Down
Loading
Loading