-
Notifications
You must be signed in to change notification settings - Fork 721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
resource_manager: add service mode server #5994
Changes from 24 commits
0bebe96
e12f074
00eced3
f90a988
216b9e4
3495230
fb5ef97
136fc94
8ef16e4
fc02ece
d834b11
e51fd17
f07206e
3f8328c
c7a2e57
f144e7c
07ad1a7
ea4c5bb
f6cefeb
739ecd7
98a6fdb
5ead2d3
4c1911c
b0d7c5f
27adb0d
a9a016a
a94eaab
241bb8a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,8 +33,8 @@ const ( | |
add actionType = 0 | ||
modify actionType = 1 | ||
groupSettingsPathPrefix = "resource_group/settings" | ||
// errNotLeaderMsg is returned when the requested server is not the leader. | ||
errNotLeaderMsg = "not leader" | ||
// errNotPrimary is returned when the requested server is not primary. | ||
errNotPrimary = "not primary" | ||
) | ||
|
||
// ResourceManagerClient manages resource group info and token request. | ||
|
@@ -58,7 +58,7 @@ func (c *client) resourceManagerClient() rmpb.ResourceManagerClient { | |
|
||
// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service. | ||
func (c *client) gRPCErrorHandler(err error) { | ||
if strings.Contains(err.Error(), errNotLeaderMsg) { | ||
if strings.Contains(err.Error(), errNotPrimary) { | ||
c.ScheduleCheckLeader() | ||
Comment on lines
+61
to
62
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will we unify the terms leader and primary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These two functions will be removed after client finished |
||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ import ( | |
"context" | ||
"net/http" | ||
|
||
"github.com/tikv/pd/pkg/member" | ||
"go.etcd.io/etcd/clientv3" | ||
) | ||
|
||
|
@@ -38,9 +37,8 @@ type Server interface { | |
GetHTTPClient() *http.Client | ||
// AddStartCallback adds a callback in the startServer phase. | ||
AddStartCallback(callbacks ...func()) | ||
// TODO: replace these two methods with `primary` function without etcd server dependency. | ||
// GetMember returns the member information. | ||
GetMember() *member.Member | ||
// AddLeaderCallback adds a callback in the leader campaign phase. | ||
AddLeaderCallback(callbacks ...func(context.Context)) | ||
// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. | ||
IsServing() bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this name is kind of confusing since "serving" may refer to whether a server is running rather than it's the one who provides a service to outside. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IsServiceProvider? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for reference. no strong opinion. |
||
// AddServiceReadyCallback adds the callback function when the server becomes the leader, if there is embedded etcd, or the primary otherwise. | ||
AddServiceReadyCallback(callbacks ...func(context.Context)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,170 @@ | ||
// Copyright 2023 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package server | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
|
||
"github.com/BurntSushi/toml" | ||
"github.com/pingcap/errors" | ||
"github.com/pingcap/log" | ||
"github.com/spf13/pflag" | ||
"github.com/tikv/pd/pkg/utils/configutil" | ||
"github.com/tikv/pd/pkg/utils/grpcutil" | ||
"github.com/tikv/pd/pkg/utils/metricutil" | ||
"go.uber.org/zap" | ||
) | ||
|
||
const ( | ||
defaultName = "Resource Manager" | ||
defaultBackendEndpoints = "127.0.0.1:2379" | ||
defaultListenAddr = "127.0.0.1:3380" | ||
defaultEnableGRPCGateway = true | ||
|
||
defaultLogFormat = "text" | ||
defaultDisableErrorVerbose = true | ||
) | ||
|
||
// Config is the configuration for the resource manager. | ||
type Config struct { | ||
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"` | ||
ListenAddr string `toml:"listen-addr" json:"listen-addr"` | ||
Name string `toml:"name" json:"name"` | ||
DataDir string `toml:"data-dir" json:"data-dir"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L162 log path? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Log.File.Filename does the same thing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it will do in another pr |
||
EnableGRPCGateway bool `json:"enable-grpc-gateway"` | ||
|
||
Metric metricutil.MetricConfig `toml:"metric" json:"metric"` | ||
|
||
// Log related config. | ||
Log log.Config `toml:"log" json:"log"` | ||
|
||
Logger *zap.Logger | ||
LogProps *log.ZapProperties | ||
Security configutil.SecurityConfig `toml:"security" json:"security"` | ||
} | ||
|
||
// NewConfig creates a new config. | ||
func NewConfig() *Config { | ||
return &Config{} | ||
} | ||
|
||
// Parse parses flag definitions from the argument list. | ||
func (c *Config) Parse(flagSet *pflag.FlagSet) error { | ||
// Load config file if specified. | ||
var ( | ||
meta *toml.MetaData | ||
err error | ||
) | ||
if configFile, _ := flagSet.GetString("config"); configFile != "" { | ||
meta, err = configutil.ConfigFromFile(c, configFile) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
// Ignore the error check here | ||
configutil.AdjustCommandlineString(flagSet, &c.Log.Level, "log-level") | ||
configutil.AdjustCommandlineString(flagSet, &c.Log.File.Filename, "log-file") | ||
configutil.AdjustCommandlineString(flagSet, &c.Metric.PushAddress, "metrics-addr") | ||
configutil.AdjustCommandlineString(flagSet, &c.Security.CAPath, "cacert") | ||
configutil.AdjustCommandlineString(flagSet, &c.Security.CertPath, "cert") | ||
configutil.AdjustCommandlineString(flagSet, &c.Security.KeyPath, "key") | ||
configutil.AdjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints") | ||
configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr") | ||
|
||
return c.Adjust(meta, false) | ||
} | ||
|
||
// Adjust is used to adjust the PD configurations. | ||
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { | ||
configMetaData := configutil.NewConfigMetadata(meta) | ||
warningMsgs := make([]string, 0) | ||
if err := configMetaData.CheckUndecoded(); err != nil { | ||
warningMsgs = append(warningMsgs, err.Error()) | ||
} | ||
configutil.PrintConfigCheckMsg(os.Stdout, warningMsgs) | ||
|
||
if c.Name == "" { | ||
hostname, err := os.Hostname() | ||
if err != nil { | ||
return err | ||
} | ||
configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname)) | ||
} | ||
configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name)) | ||
adjustPath(&c.DataDir) | ||
|
||
if err := c.Validate(); err != nil { | ||
return err | ||
} | ||
|
||
configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints) | ||
configutil.AdjustString(&c.ListenAddr, defaultListenAddr) | ||
|
||
if !configMetaData.IsDefined("enable-grpc-gateway") { | ||
c.EnableGRPCGateway = defaultEnableGRPCGateway | ||
} | ||
|
||
c.adjustLog(configMetaData.Child("log")) | ||
c.Security.Encryption.Adjust() | ||
|
||
if len(c.Log.Format) == 0 { | ||
c.Log.Format = defaultLogFormat | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func adjustPath(p *string) { | ||
absPath, err := filepath.Abs(*p) | ||
if err == nil { | ||
*p = absPath | ||
} | ||
} | ||
|
||
func (c *Config) adjustLog(meta *configutil.ConfigMetaData) { | ||
if !meta.IsDefined("disable-error-verbose") { | ||
c.Log.DisableErrorVerbose = defaultDisableErrorVerbose | ||
} | ||
} | ||
|
||
// GetTLSConfig returns the TLS config. | ||
func (c *Config) GetTLSConfig() *grpcutil.TLSConfig { | ||
return &c.Security.TLSConfig | ||
} | ||
|
||
// Validate is used to validate if some configurations are right. | ||
func (c *Config) Validate() error { | ||
dataDir, err := filepath.Abs(c.DataDir) | ||
if err != nil { | ||
return errors.WithStack(err) | ||
} | ||
logFile, err := filepath.Abs(c.Log.File.Filename) | ||
if err != nil { | ||
return errors.WithStack(err) | ||
} | ||
rel, err := filepath.Rel(dataDir, filepath.Dir(logFile)) | ||
if err != nil { | ||
return errors.WithStack(err) | ||
} | ||
if !strings.HasPrefix(rel, "..") { | ||
return errors.New("log directory shouldn't be the subdirectory of data directory") | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,6 @@ import ( | |
rmpb "github.com/pingcap/kvproto/pkg/resource_manager" | ||
"github.com/pingcap/log" | ||
bs "github.com/tikv/pd/pkg/basicserver" | ||
"github.com/tikv/pd/pkg/member" | ||
"github.com/tikv/pd/pkg/storage/endpoint" | ||
"github.com/tikv/pd/pkg/storage/kv" | ||
"go.uber.org/zap" | ||
|
@@ -38,7 +37,7 @@ const defaultConsumptionChanSize = 1024 | |
// Manager is the manager of resource group. | ||
type Manager struct { | ||
sync.RWMutex | ||
member *member.Member | ||
srv bs.Server | ||
groups map[string]*ResourceGroup | ||
storage endpoint.ResourceGroupStorage | ||
// consumptionChan is used to send the consumption | ||
|
@@ -52,7 +51,6 @@ type Manager struct { | |
// NewManager returns a new Manager. | ||
func NewManager(srv bs.Server) *Manager { | ||
m := &Manager{ | ||
member: &member.Member{}, | ||
groups: make(map[string]*ResourceGroup), | ||
consumptionDispatcher: make(chan struct { | ||
resourceGroupName string | ||
|
@@ -66,10 +64,10 @@ func NewManager(srv bs.Server) *Manager { | |
kv.NewEtcdKVBase(srv.GetClient(), "resource_group"), | ||
nil, | ||
) | ||
m.member = srv.GetMember() | ||
m.srv = srv | ||
}) | ||
// The second initialization after the leader is elected. | ||
srv.AddLeaderCallback(m.Init) | ||
// The second initialization after becoming serving. | ||
srv.AddServiceReadyCallback(m.Init) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we merge There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will affect pd legacy server. |
||
return m | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please explain a little about what "primary" mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For microservices that do not contain an embedded etcd, we will use
primary
to mark the node that is currently providing the service.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether it will affect the error handling behavior of the client? we need to confirm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the future, it will rely on the client's service discovery to get the nodes of primary directly, which is only a transition for now