Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_manager: add service mode server #5994

Merged
merged 28 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ const (
add actionType = 0
modify actionType = 1
groupSettingsPathPrefix = "resource_group/settings"
// errNotLeaderMsg is returned when the requested server is not the leader.
errNotLeaderMsg = "not leader"
// errNotPrimary is returned when the requested server is not primary.
errNotPrimary = "not primary"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please explain a little about what "primary" mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For microservices that do not contain an embedded etcd, we will use primary to mark the node that is currently providing the service.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether it will affect the error handling behavior of the client? we need to confirm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future, it will rely on the client's service discovery to get the nodes of primary directly, which is only a transition for now

)

// ResourceManagerClient manages resource group info and token request.
Expand All @@ -58,7 +58,7 @@ func (c *client) resourceManagerClient() rmpb.ResourceManagerClient {

// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service.
func (c *client) gRPCErrorHandler(err error) {
if strings.Contains(err.Error(), errNotLeaderMsg) {
if strings.Contains(err.Error(), errNotPrimary) {
c.ScheduleCheckLeader()
Comment on lines +61 to 62
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we unify the terms leader and primary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two functions will be removed after client finished

}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ require (
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/stretchr/objx v0.5.0 // indirect
github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
Expand Down
10 changes: 4 additions & 6 deletions pkg/basicserver/basic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"net/http"

"github.com/tikv/pd/pkg/member"
"go.etcd.io/etcd/clientv3"
)

Expand All @@ -38,9 +37,8 @@ type Server interface {
GetHTTPClient() *http.Client
// AddStartCallback adds a callback in the startServer phase.
AddStartCallback(callbacks ...func())
// TODO: replace these two methods with `primary` function without etcd server dependency.
// GetMember returns the member information.
GetMember() *member.Member
// AddLeaderCallback adds a callback in the leader campaign phase.
AddLeaderCallback(callbacks ...func(context.Context))
// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
IsServing() bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this name is kind of confusing since "serving" may refer to whether a server is running rather than it's the one who provides a service to outside.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsServiceProvider?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for reference. no strong opinion.

// AddServiceReadyCallback adds the callback function when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
AddServiceReadyCallback(callbacks ...func(context.Context))
}
92 changes: 92 additions & 0 deletions pkg/mcs/resource_manager/server/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2022 TiKV Project Authors.
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/metricutil"

lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
"go.uber.org/zap"
)

// Config is the configuration for the TSO.
type Config struct {
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"`
ListenAddr string `toml:"listen-addr" json:"listen-addr"`

Metric metricutil.MetricConfig `toml:"metric" json:"metric"`

// Log related config.
Log log.Config `toml:"log" json:"log"`

Logger *zap.Logger
LogProps *log.ZapProperties

Security SecurityConfig `toml:"security" json:"security"`
}

// NewConfig creates a new config.
func NewConfig() *Config {
return &Config{}
}

// Parse parses flag definitions from the argument list.
func (c *Config) Parse(flagSet *pflag.FlagSet) error {
// Load config file if specified.
if configFile, _ := flagSet.GetString("config"); configFile != "" {
_, err := c.configFromFile(configFile)
if err != nil {
return err
}
}

// ignore the error check here
adjustCommandlineString(flagSet, &c.Log.Level, "log-level")
adjustCommandlineString(flagSet, &c.Log.File.Filename, "log-file")
adjustCommandlineString(flagSet, &c.Metric.PushAddress, "metrics-addr")
adjustCommandlineString(flagSet, &c.Security.CAPath, "cacert")
adjustCommandlineString(flagSet, &c.Security.CertPath, "cert")
adjustCommandlineString(flagSet, &c.Security.KeyPath, "key")
adjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints")
adjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr")

// TODO: Implement the main function body
return nil
}

// configFromFile loads config from file.
func (c *Config) configFromFile(path string) (*toml.MetaData, error) {
meta, err := toml.DecodeFile(path, c)
return &meta, errors.WithStack(err)
}

// SecurityConfig indicates the security configuration for pd server
type SecurityConfig struct {
grpcutil.TLSConfig
// RedactInfoLog indicates that whether enabling redact log
RedactInfoLog bool `toml:"redact-info-log" json:"redact-info-log"`
Encryption encryption.Config `toml:"encryption" json:"encryption"`
}

func adjustCommandlineString(flagSet *pflag.FlagSet, v *string, name string) {
if value, _ := flagSet.GetString(name); value != "" {
*v = value
}
}
16 changes: 8 additions & 8 deletions pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,16 @@ func (s *Service) GetManager() *Manager {
return s.manager
}

func (s *Service) checkLeader() error {
if !s.manager.member.IsLeader() {
func (s *Service) checkServing() error {
if !s.manager.srv.IsServing() {
return errNotLeader
}
return nil
}

// GetResourceGroup implements ResourceManagerServer.GetResourceGroup.
func (s *Service) GetResourceGroup(ctx context.Context, req *rmpb.GetResourceGroupRequest) (*rmpb.GetResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
rg := s.manager.GetResourceGroup(req.ResourceGroupName)
Expand All @@ -107,7 +107,7 @@ func (s *Service) GetResourceGroup(ctx context.Context, req *rmpb.GetResourceGro

// ListResourceGroups implements ResourceManagerServer.ListResourceGroups.
func (s *Service) ListResourceGroups(ctx context.Context, req *rmpb.ListResourceGroupsRequest) (*rmpb.ListResourceGroupsResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
groups := s.manager.GetResourceGroupList()
Expand All @@ -122,7 +122,7 @@ func (s *Service) ListResourceGroups(ctx context.Context, req *rmpb.ListResource

// AddResourceGroup implements ResourceManagerServer.AddResourceGroup.
func (s *Service) AddResourceGroup(ctx context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
rg := FromProtoResourceGroup(req.GetGroup())
Expand All @@ -135,7 +135,7 @@ func (s *Service) AddResourceGroup(ctx context.Context, req *rmpb.PutResourceGro

// DeleteResourceGroup implements ResourceManagerServer.DeleteResourceGroup.
func (s *Service) DeleteResourceGroup(ctx context.Context, req *rmpb.DeleteResourceGroupRequest) (*rmpb.DeleteResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
err := s.manager.DeleteResourceGroup(req.ResourceGroupName)
Expand All @@ -147,7 +147,7 @@ func (s *Service) DeleteResourceGroup(ctx context.Context, req *rmpb.DeleteResou

// ModifyResourceGroup implements ResourceManagerServer.ModifyResourceGroup.
func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
err := s.manager.ModifyResourceGroup(req.GetGroup())
Expand All @@ -172,7 +172,7 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
if err != nil {
return errors.WithStack(err)
}
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return err
}
targetPeriodMs := request.GetTargetRequestPeriodMs()
Expand Down
10 changes: 4 additions & 6 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"go.uber.org/zap"
Expand All @@ -38,7 +37,7 @@ const defaultConsumptionChanSize = 1024
// Manager is the manager of resource group.
type Manager struct {
sync.RWMutex
member *member.Member
srv bs.Server
groups map[string]*ResourceGroup
storage endpoint.ResourceGroupStorage
// consumptionChan is used to send the consumption
Expand All @@ -52,7 +51,6 @@ type Manager struct {
// NewManager returns a new Manager.
func NewManager(srv bs.Server) *Manager {
m := &Manager{
member: &member.Member{},
groups: make(map[string]*ResourceGroup),
consumptionDispatcher: make(chan struct {
resourceGroupName string
Expand All @@ -66,10 +64,10 @@ func NewManager(srv bs.Server) *Manager {
kv.NewEtcdKVBase(srv.GetClient(), "resource_group"),
nil,
)
m.member = srv.GetMember()
m.srv = srv
})
// The second initialization after the leader is elected.
srv.AddLeaderCallback(m.Init)
// The second initialization after becoming serving.
srv.AddServiceReadyCallback(m.Init)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we merge AddStartCallback and AddServiceReadyCallback?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will affect pd legacy server.

return m
}

Expand Down
Loading