Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_manager: add service mode server #5994

Merged
merged 28 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ const (
add actionType = 0
modify actionType = 1
groupSettingsPathPrefix = "resource_group/settings"
// errNotLeaderMsg is returned when the requested server is not the leader.
errNotLeaderMsg = "not leader"
// errNotPrimary is returned when the requested server is not primary.
errNotPrimary = "not primary"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please explain a little about what "primary" means?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For microservices that do not contain an embedded etcd, we will use primary to mark the node that is currently providing the service.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it affect the error-handling behavior of the client? We need to confirm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future, this will rely on the client's service discovery to get the primary node directly; the current approach is only a transition for now.

)

// ResourceManagerClient manages resource group info and token request.
Expand All @@ -58,7 +58,7 @@ func (c *client) resourceManagerClient() rmpb.ResourceManagerClient {

// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service.
func (c *client) gRPCErrorHandler(err error) {
if strings.Contains(err.Error(), errNotLeaderMsg) {
if strings.Contains(err.Error(), errNotPrimary) {
c.ScheduleCheckLeader()
Comment on lines +61 to 62
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we unify the terms leader and primary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two functions will be removed after the client work is finished.

}
}
Expand Down
25 changes: 22 additions & 3 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/tikv/pd/pkg/autoscaling"
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/errs"
resource_manager "github.com/tikv/pd/pkg/mcs/resource_manager/server"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/swaggerserver"
"github.com/tikv/pd/pkg/utils/configutil"
Expand Down Expand Up @@ -78,14 +79,15 @@ func main() {
// NewServiceCommand returns the service command.
func NewServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "service <tso>",
Short: "Run a service",
Use: "service <mode>",
Short: "Run a service, for example, tso, resource_manager",
}
cmd.AddCommand(NewTSOServiceCommand())
cmd.AddCommand(NewResourceManagerServiceCommand())
return cmd
}

// NewTSOServiceCommand returns the unsafe remove failed stores command.
// NewTSOServiceCommand returns the tso service command.
func NewTSOServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "tso",
Expand All @@ -102,6 +104,23 @@ func NewTSOServiceCommand() *cobra.Command {
return cmd
}

// NewResourceManagerServiceCommand returns the resource manager service command.
//
// The command runs resource_manager.CreateServerWrapper and registers the
// flags for version output, the config file, the backend endpoints, the
// listen address and the TLS material.
func NewResourceManagerServiceCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "resource_manager",
		Short: "Run the resource manager service",
		Run:   resource_manager.CreateServerWrapper,
	}
	cmd.Flags().BoolP("version", "V", false, "print version information and exit")
	cmd.Flags().StringP("config", "", "", "config file")
	cmd.Flags().StringP("backend-endpoints", "", "", "url for etcd client")
	// The help text previously said "tso service", which was wrong for this command.
	cmd.Flags().StringP("listen-addr", "", "", "listen address for resource manager service")
	cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
	cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
	cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
	return cmd
}

func createServerWrapper(cmd *cobra.Command, args []string) {
schedulers.Register()
cfg := config.NewConfig()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ require (
github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/stretchr/objx v0.5.0 // indirect
github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
Expand Down
10 changes: 4 additions & 6 deletions pkg/basicserver/basic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"net/http"

"github.com/tikv/pd/pkg/member"
"go.etcd.io/etcd/clientv3"
)

Expand All @@ -38,9 +37,8 @@ type Server interface {
GetHTTPClient() *http.Client
// AddStartCallback adds a callback in the startServer phase.
AddStartCallback(callbacks ...func())
// TODO: replace these two methods with `primary` function without etcd server dependency.
// GetMember returns the member information.
GetMember() *member.Member
// AddLeaderCallback adds a callback in the leader campaign phase.
AddLeaderCallback(callbacks ...func(context.Context))
// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
IsServing() bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this name is kind of confusing, since "serving" may refer to whether a server is running rather than whether it is the one providing the service to the outside.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsServiceProvider?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reference; no strong opinion.

// AddServiceReadyCallback adds the callback function when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
AddServiceReadyCallback(callbacks ...func(context.Context))
}
170 changes: 170 additions & 0 deletions pkg/mcs/resource_manager/server/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"go.uber.org/zap"
)

const (
	// defaultName is the prefix of the generated server name; Adjust joins it
	// with the hostname when no name is configured.
	defaultName = "Resource Manager"
	// defaultBackendEndpoints is the fallback handed to configutil.AdjustString
	// for BackendEndpoints in Adjust.
	defaultBackendEndpoints = "127.0.0.1:2379"
	// defaultListenAddr is the fallback handed to configutil.AdjustString for
	// ListenAddr in Adjust.
	defaultListenAddr = "127.0.0.1:3380"
	// defaultEnableGRPCGateway is applied by Adjust when "enable-grpc-gateway"
	// is not defined in the config metadata.
	defaultEnableGRPCGateway = true

	// defaultLogFormat is applied by Adjust when the log format is empty.
	defaultLogFormat = "text"
	// defaultDisableErrorVerbose is applied by adjustLog when
	// "disable-error-verbose" is not defined in the log metadata.
	defaultDisableErrorVerbose = true
)

// Config is the configuration for the resource manager.
type Config struct {
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"`
ListenAddr string `toml:"listen-addr" json:"listen-addr"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is DataDir used for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L162 log path?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log.File.Filename does the same thing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be done in another PR.

EnableGRPCGateway bool `json:"enable-grpc-gateway"`

Metric metricutil.MetricConfig `toml:"metric" json:"metric"`

// Log related config.
Log log.Config `toml:"log" json:"log"`

Logger *zap.Logger
LogProps *log.ZapProperties
Security configutil.SecurityConfig `toml:"security" json:"security"`
}

// NewConfig returns a pointer to a zero-valued Config.
// No defaults are applied here; Adjust fills them in later.
func NewConfig() *Config {
	cfg := new(Config)
	return cfg
}

// Parse populates the config from the given flag set.
//
// If the "config" flag names a file, that file is loaded first through
// configutil.ConfigFromFile. Each string flag listed below is then passed to
// configutil.AdjustCommandlineString together with the field it targets, and
// finally Adjust is invoked with the metadata of the loaded file (nil when no
// file was given). The returned error comes from loading the file or from
// Adjust.
func (c *Config) Parse(flagSet *pflag.FlagSet) error {
	var meta *toml.MetaData

	// Load config file if specified; the flag lookup error is deliberately dropped.
	configFile, _ := flagSet.GetString("config")
	if configFile != "" {
		loaded, err := configutil.ConfigFromFile(c, configFile)
		if err != nil {
			return err
		}
		meta = loaded
	}

	// Ignore the error check here.
	overrides := []struct {
		target *string
		flag   string
	}{
		{&c.Log.Level, "log-level"},
		{&c.Log.File.Filename, "log-file"},
		{&c.Metric.PushAddress, "metrics-addr"},
		{&c.Security.CAPath, "cacert"},
		{&c.Security.CertPath, "cert"},
		{&c.Security.KeyPath, "key"},
		{&c.BackendEndpoints, "backend-endpoints"},
		{&c.ListenAddr, "listen-addr"},
	}
	for _, o := range overrides {
		configutil.AdjustCommandlineString(flagSet, o.target, o.flag)
	}

	return c.Adjust(meta, false)
}

// Adjust fills in defaults for the resource manager configuration and
// validates it.
//
// meta is the metadata of the decoded config file and may be nil. reloading is
// accepted for interface compatibility but is not read by this method.
//
// Steps, in order: print the result of the undecoded-key check to stdout,
// derive Name from the hostname when it is empty, default and absolutize
// DataDir, run Validate, then default the endpoints, the gRPC gateway switch,
// the log options and the log format. An error is returned when the hostname
// cannot be read or when Validate fails.
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
	md := configutil.NewConfigMetadata(meta)

	// A CheckUndecoded failure is only reported, never returned.
	warnings := []string{}
	if undecodedErr := md.CheckUndecoded(); undecodedErr != nil {
		warnings = append(warnings, undecodedErr.Error())
	}
	configutil.PrintConfigCheckMsg(os.Stdout, warnings)

	if c.Name == "" {
		host, err := os.Hostname()
		if err != nil {
			return err
		}
		generatedName := fmt.Sprintf("%s-%s", defaultName, host)
		configutil.AdjustString(&c.Name, generatedName)
	}

	// The data directory fallback depends on the (possibly generated) name.
	dataDirFallback := fmt.Sprintf("default.%s", c.Name)
	configutil.AdjustString(&c.DataDir, dataDirFallback)
	adjustPath(&c.DataDir)

	if err := c.Validate(); err != nil {
		return err
	}

	configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints)
	configutil.AdjustString(&c.ListenAddr, defaultListenAddr)

	if gatewayDefined := md.IsDefined("enable-grpc-gateway"); !gatewayDefined {
		c.EnableGRPCGateway = defaultEnableGRPCGateway
	}

	c.adjustLog(md.Child("log"))
	c.Security.Encryption.Adjust()

	if c.Log.Format == "" {
		c.Log.Format = defaultLogFormat
	}

	return nil
}

// adjustPath rewrites *p in place with its absolute form as computed by
// filepath.Abs. When filepath.Abs fails, *p is left untouched.
func adjustPath(p *string) {
	resolved, err := filepath.Abs(*p)
	if err != nil {
		return
	}
	*p = resolved
}

// adjustLog applies the default for Log.DisableErrorVerbose unless
// "disable-error-verbose" is defined in the given log metadata.
func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
	if meta.IsDefined("disable-error-verbose") {
		return
	}
	c.Log.DisableErrorVerbose = defaultDisableErrorVerbose
}

// GetTLSConfig returns a pointer to the TLS settings held in the security
// section of this config. The pointer refers to the config's own field, not a copy.
func (c *Config) GetTLSConfig() *grpcutil.TLSConfig {
	tlsCfg := &c.Security.TLSConfig
	return tlsCfg
}

// Validate checks that the log directory is not located inside the data
// directory.
//
// Both DataDir and the directory of Log.File.Filename are made absolute and
// the log directory is expressed relative to the data directory. The
// configuration is accepted only when that relative path leaves the data
// directory, i.e. when its first component is "..". An error is returned
// otherwise, or when either path cannot be made absolute or relative.
func (c *Config) Validate() error {
	dataDir, err := filepath.Abs(c.DataDir)
	if err != nil {
		return errors.WithStack(err)
	}
	logFile, err := filepath.Abs(c.Log.File.Filename)
	if err != nil {
		return errors.WithStack(err)
	}
	rel, err := filepath.Rel(dataDir, filepath.Dir(logFile))
	if err != nil {
		return errors.WithStack(err)
	}
	// Match ".." as a whole path component. A plain prefix test would also
	// accept a child directory whose name merely begins with ".." (for
	// example "..logs"), which is still inside the data directory.
	outside := rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator))
	if !outside {
		return errors.New("log directory shouldn't be the subdirectory of data directory")
	}

	return nil
}
16 changes: 8 additions & 8 deletions pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,16 @@ func (s *Service) GetManager() *Manager {
return s.manager
}

func (s *Service) checkLeader() error {
if !s.manager.member.IsLeader() {
func (s *Service) checkServing() error {
if !s.manager.srv.IsServing() {
return errNotLeader
}
return nil
}

// GetResourceGroup implements ResourceManagerServer.GetResourceGroup.
func (s *Service) GetResourceGroup(ctx context.Context, req *rmpb.GetResourceGroupRequest) (*rmpb.GetResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
rg := s.manager.GetResourceGroup(req.ResourceGroupName)
Expand All @@ -107,7 +107,7 @@ func (s *Service) GetResourceGroup(ctx context.Context, req *rmpb.GetResourceGro

// ListResourceGroups implements ResourceManagerServer.ListResourceGroups.
func (s *Service) ListResourceGroups(ctx context.Context, req *rmpb.ListResourceGroupsRequest) (*rmpb.ListResourceGroupsResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
groups := s.manager.GetResourceGroupList()
Expand All @@ -122,7 +122,7 @@ func (s *Service) ListResourceGroups(ctx context.Context, req *rmpb.ListResource

// AddResourceGroup implements ResourceManagerServer.AddResourceGroup.
func (s *Service) AddResourceGroup(ctx context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
rg := FromProtoResourceGroup(req.GetGroup())
Expand All @@ -135,7 +135,7 @@ func (s *Service) AddResourceGroup(ctx context.Context, req *rmpb.PutResourceGro

// DeleteResourceGroup implements ResourceManagerServer.DeleteResourceGroup.
func (s *Service) DeleteResourceGroup(ctx context.Context, req *rmpb.DeleteResourceGroupRequest) (*rmpb.DeleteResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
err := s.manager.DeleteResourceGroup(req.ResourceGroupName)
Expand All @@ -147,7 +147,7 @@ func (s *Service) DeleteResourceGroup(ctx context.Context, req *rmpb.DeleteResou

// ModifyResourceGroup implements ResourceManagerServer.ModifyResourceGroup.
func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResourceGroupRequest) (*rmpb.PutResourceGroupResponse, error) {
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return nil, err
}
err := s.manager.ModifyResourceGroup(req.GetGroup())
Expand All @@ -172,7 +172,7 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
if err != nil {
return errors.WithStack(err)
}
if err := s.checkLeader(); err != nil {
if err := s.checkServing(); err != nil {
return err
}
targetPeriodMs := request.GetTargetRequestPeriodMs()
Expand Down
10 changes: 4 additions & 6 deletions pkg/mcs/resource_manager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"go.uber.org/zap"
Expand All @@ -38,7 +37,7 @@ const defaultConsumptionChanSize = 1024
// Manager is the manager of resource group.
type Manager struct {
sync.RWMutex
member *member.Member
srv bs.Server
groups map[string]*ResourceGroup
storage endpoint.ResourceGroupStorage
// consumptionChan is used to send the consumption
Expand All @@ -52,7 +51,6 @@ type Manager struct {
// NewManager returns a new Manager.
func NewManager(srv bs.Server) *Manager {
m := &Manager{
member: &member.Member{},
groups: make(map[string]*ResourceGroup),
consumptionDispatcher: make(chan struct {
resourceGroupName string
Expand All @@ -66,10 +64,10 @@ func NewManager(srv bs.Server) *Manager {
kv.NewEtcdKVBase(srv.GetClient(), "resource_group"),
nil,
)
m.member = srv.GetMember()
m.srv = srv
})
// The second initialization after the leader is elected.
srv.AddLeaderCallback(m.Init)
// The second initialization after becoming serving.
srv.AddServiceReadyCallback(m.Init)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we merge AddStartCallback and AddServiceReadyCallback?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will affect the legacy PD server.

return m
}

Expand Down
Loading