Skip to content

Commit

Permalink
Merge branch 'release-7.5' into cherry-pick-8017-to-release-7.5
Browse files Browse the repository at this point in the history
  • Loading branch information
lhy1024 authored May 22, 2024
2 parents 6aa987e + 00487a4 commit c4f339f
Show file tree
Hide file tree
Showing 18 changed files with 368 additions and 121 deletions.
23 changes: 15 additions & 8 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ func (lim *Limiter) AvailableTokens(now time.Time) float64 {
return tokens
}

const reserveWarnLogInterval = 10 * time.Millisecond

// reserveN is a helper method for Reserve.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation.
Expand Down Expand Up @@ -374,14 +376,19 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
lim.tokens = tokens
lim.maybeNotify()
} else {
log.Warn("[resource group controller] cannot reserve enough tokens",
zap.Duration("need-wait-duration", waitDuration),
zap.Duration("max-wait-duration", maxFutureReserve),
zap.Float64("current-ltb-tokens", lim.tokens),
zap.Float64("current-ltb-rate", float64(lim.limit)),
zap.Float64("request-tokens", n),
zap.Int64("burst", lim.burst),
zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
// print log if the limiter cannot reserve for a while.
if time.Since(lim.last) > reserveWarnLogInterval {
log.Warn("[resource group controller] cannot reserve enough tokens",
zap.Duration("need-wait-duration", waitDuration),
zap.Duration("max-wait-duration", maxFutureReserve),
zap.Float64("current-ltb-tokens", lim.tokens),
zap.Float64("current-ltb-rate", float64(lim.limit)),
zap.Float64("request-tokens", n),
zap.Float64("notify-threshold", lim.notifyThreshold),
zap.Bool("is-low-process", lim.isLowProcess),
zap.Int64("burst", lim.burst),
zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
}
lim.last = last
if lim.limit == 0 {
lim.notify()
Expand Down
8 changes: 4 additions & 4 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func NewTSOServiceCommand() *cobra.Command {
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
cmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-level", "L", "", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-file", "", "", "log file path")
return cmd
}
Expand All @@ -122,7 +122,7 @@ func NewSchedulingServiceCommand() *cobra.Command {
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
cmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-level", "L", "", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-file", "", "", "log file path")
return cmd
}
Expand All @@ -142,7 +142,7 @@ func NewResourceManagerServiceCommand() *cobra.Command {
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
cmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-level", "L", "", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-file", "", "", "log file path")
return cmd
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func addFlags(cmd *cobra.Command) {
cmd.Flags().StringP("initial-cluster", "", "", "initial cluster configuration for bootstrapping, e,g. pd=http://127.0.0.1:2380")
cmd.Flags().StringP("join", "", "", "join to an existing cluster (usage: cluster's '${advertise-client-urls}'")
cmd.Flags().StringP("metrics-addr", "", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")
cmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-level", "L", "", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-file", "", "", "log file path")
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
Expand Down
15 changes: 15 additions & 0 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -5877,6 +5877,21 @@
"intervalFactor": 1,
"legendFormat": "store-{{store}}-in",
"refId": "B"
},
{
"expr": "- sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"out\",rw=\"write\"}[1m]))by (store)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}-out",
"refId": "C",
"step": 4
},
{
"expr": "sum(delta(pd_scheduler_hot_region_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\",type=\"move-leader\",direction=\"in\",rw=\"write\"}[1m]))by (store)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "store-{{store}}-in",
"refId": "D"
}
],
"thresholds": [],
Expand Down
6 changes: 2 additions & 4 deletions pkg/mcs/resourcemanager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,6 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
c.adjustLog(configMetaData.Child("log"))
c.Security.Encryption.Adjust()

if len(c.Log.Format) == 0 {
c.Log.Format = utils.DefaultLogFormat
}

c.Controller.Adjust(configMetaData.Child("controller"))
configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease)

Expand All @@ -251,6 +247,8 @@ func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("disable-error-verbose") {
c.Log.DisableErrorVerbose = utils.DefaultDisableErrorVerbose
}
configutil.AdjustString(&c.Log.Format, utils.DefaultLogFormat)
configutil.AdjustString(&c.Log.Level, utils.DefaultLogLevel)
}

// GetTLSConfig returns the TLS config.
Expand Down
6 changes: 2 additions & 4 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,6 @@ func (c *Config) adjust(meta *toml.MetaData) error {
c.adjustLog(configMetaData.Child("log"))
c.Security.Encryption.Adjust()

if len(c.Log.Format) == 0 {
c.Log.Format = utils.DefaultLogFormat
}

configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease)

if err := c.Schedule.Adjust(configMetaData.Child("schedule"), false); err != nil {
Expand All @@ -161,6 +157,8 @@ func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("disable-error-verbose") {
c.Log.DisableErrorVerbose = utils.DefaultDisableErrorVerbose
}
configutil.AdjustString(&c.Log.Format, utils.DefaultLogFormat)
configutil.AdjustString(&c.Log.Level, utils.DefaultLogLevel)
}

// GetListenAddr returns the ListenAddr
Expand Down
6 changes: 2 additions & 4 deletions pkg/mcs/tso/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,17 +226,15 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
c.adjustLog(configMetaData.Child("log"))
c.Security.Encryption.Adjust()

if len(c.Log.Format) == 0 {
c.Log.Format = utils.DefaultLogFormat
}

return nil
}

func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("disable-error-verbose") {
c.Log.DisableErrorVerbose = utils.DefaultDisableErrorVerbose
}
configutil.AdjustString(&c.Log.Format, utils.DefaultLogFormat)
configutil.AdjustString(&c.Log.Level, utils.DefaultLogLevel)
}

// Validate is used to validate if some configurations are right.
Expand Down
2 changes: 2 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
DefaultHTTPGracefulShutdownTimeout = 5 * time.Second
// DefaultLogFormat is the default log format
DefaultLogFormat = "text"
// DefaultLogLevel is the default log level
DefaultLogLevel = "info"
// DefaultDisableErrorVerbose is the default value of DisableErrorVerbose
DefaultDisableErrorVerbose = true
// DefaultLeaderLease is the default value of LeaderLease
Expand Down
5 changes: 5 additions & 0 deletions pkg/schedule/filter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ func NewCounter(scope string) *Counter {
return &Counter{counter: counter, scope: scope}
}

// SetScope sets the scope for the counter.
func (c *Counter) SetScope(scope string) {
c.scope = scope
}

// Add adds the filter counter.
func (c *Counter) inc(action action, filterType filterType, sourceID uint64, targetID uint64) {
if _, ok := c.counter[action][filterType][sourceID]; !ok {
Expand Down
30 changes: 27 additions & 3 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,12 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) {
if !ok || op == nil {
return nil, true
}
r = oc.cluster.GetRegion(regionID)
if r == nil {
// Check the operator lightly. It cant't dispatch the op for some scenario.
var reason CancelReasonType
r, reason = oc.checkOperatorLightly(op)
if len(reason) != 0 {
_ = oc.removeOperatorLocked(op)
if op.Cancel(RegionNotFound) {
if op.Cancel(reason) {
log.Warn("remove operator because region disappeared",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
Expand Down Expand Up @@ -301,6 +303,7 @@ func (oc *Controller) AddWaitingOperator(ops ...*Operator) int {
if isMerge {
// count two merge operators as one, so wopStatus.ops[desc] should
// not be updated here
// TODO: call checkAddOperator ...
i++
added++
oc.wop.PutOperator(ops[i])
Expand Down Expand Up @@ -455,6 +458,27 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool
return reason != Expired, reason
}

// checkOperatorLightly checks whether the ops can be dispatched in Controller::pollNeedDispatchRegion.
// The operators can't be dispatched for some scenarios, such as region disappeared, region changed ...
// `region` is the target region of `op`.
func (oc *Controller) checkOperatorLightly(op *Operator) (*core.RegionInfo, CancelReasonType) {
region := oc.cluster.GetRegion(op.RegionID())
if region == nil {
operatorCounter.WithLabelValues(op.Desc(), "not-found").Inc()
return nil, RegionNotFound
}

// It may be suitable for all kinds of operator but not merge-region.
// But to be cautions, it only takes effect on merge-region currently.
// If the version of epoch is changed, the region has been splitted or merged, and the key range has been changed.
// The changing for conf_version of epoch doesn't modify the region key range, skip it.
if (op.Kind()&OpMerge != 0) && region.GetRegionEpoch().GetVersion() > op.RegionEpoch().GetVersion() {
operatorCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc()
return nil, EpochNotMatch
}
return region, ""
}

func isHigherPriorityOperator(new, old *Operator) bool {
return new.GetPriorityLevel() > old.GetPriorityLevel()
}
Expand Down
125 changes: 125 additions & 0 deletions pkg/schedule/operator/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,131 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() {
suite.False(next)
}

// issue #7992
func (suite *operatorControllerTestSuite) TestPollDispatchRegionForMergeRegion() {
re := suite.Require()
opts := mockconfig.NewTestOptions()
cluster := mockcluster.NewCluster(suite.ctx, opts)
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)
controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream)
cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})

source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1})
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(source)
target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1})
target.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(target)

ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge)
re.NoError(err)
re.Len(ops, 2)
re.Equal(2, controller.AddWaitingOperator(ops...))
// Change next push time to now, it's used to make test case faster.
controller.opNotifierQueue[0].time = time.Now()

// first poll gets source region op.
r, next := controller.pollNeedDispatchRegion()
re.True(next)
re.Equal(r, source)

// second poll gets target region op.
controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Equal(r, target)

// third poll removes the two merge-region ops.
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Nil(r)
re.Len(controller.opNotifierQueue, 1)
re.Empty(controller.operators)
re.Empty(controller.wop.ListOperator())
re.NotNil(controller.records.Get(101))
re.NotNil(controller.records.Get(102))

// fourth poll removes target region op from opNotifierQueue
controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Nil(r)
re.Empty(controller.opNotifierQueue)

// Add the two ops to waiting operators again.
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 0}
controller.records.ttl.Remove(101)
controller.records.ttl.Remove(102)
ops, err = CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge)
re.NoError(err)
re.Equal(2, controller.AddWaitingOperator(ops...))
// change the target RegionEpoch
// first poll gets source region from opNotifierQueue
target.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Equal(r, source)

r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Nil(r)
re.Len(controller.opNotifierQueue, 1)
re.Empty(controller.operators)
re.Empty(controller.wop.ListOperator())
re.NotNil(controller.records.Get(101))
re.NotNil(controller.records.Get(102))

controller.opNotifierQueue[0].time = time.Now()
r, next = controller.pollNeedDispatchRegion()
re.True(next)
re.Nil(r)
re.Empty(controller.opNotifierQueue)
}

func (suite *operatorControllerTestSuite) TestCheckOperatorLightly() {
re := suite.Require()
opts := mockconfig.NewTestOptions()
cluster := mockcluster.NewCluster(suite.ctx, opts)
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, cluster.ID, cluster, false /* no need to run */)
controller := NewController(suite.ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), stream)
cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})

source := newRegionInfo(101, "1a", "1b", 10, 10, []uint64{101, 1}, []uint64{101, 1})
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(source)
target := newRegionInfo(102, "1b", "1c", 10, 10, []uint64{101, 1}, []uint64{101, 1})
target.GetMeta().RegionEpoch = &metapb.RegionEpoch{}
cluster.PutRegion(target)

ops, err := CreateMergeRegionOperator("merge-region", cluster, source, target, OpMerge)
re.NoError(err)
re.Len(ops, 2)

// check successfully
r, reason := controller.checkOperatorLightly(ops[0])
re.Empty(reason)
re.Equal(r, source)

// check failed because of region disappeared
cluster.RemoveRegion(target)
r, reason = controller.checkOperatorLightly(ops[1])
re.Nil(r)
re.Equal(reason, RegionNotFound)

// check failed because of verions of region epoch changed
cluster.PutRegion(target)
source.GetMeta().RegionEpoch = &metapb.RegionEpoch{ConfVer: 0, Version: 1}
r, reason = controller.checkOperatorLightly(ops[0])
re.Nil(r)
re.Equal(reason, EpochNotMatch)
}

func (suite *operatorControllerTestSuite) TestStoreLimit() {
opt := mockconfig.NewTestOptions()
tc := mockcluster.NewCluster(suite.ctx, opt)
Expand Down
7 changes: 7 additions & 0 deletions pkg/schedule/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ func (l *balanceLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Reques
// BalanceLeaderCreateOption is used to create a scheduler with an option.
type BalanceLeaderCreateOption func(s *balanceLeaderScheduler)

// WithBalanceLeaderFilterCounterName sets the filter counter name for the scheduler.
func WithBalanceLeaderFilterCounterName(name string) BalanceLeaderCreateOption {
return func(s *balanceLeaderScheduler) {
s.filterCounter.SetScope(name)
}
}

// WithBalanceLeaderName sets the name for the scheduler.
func WithBalanceLeaderName(name string) BalanceLeaderCreateOption {
return func(s *balanceLeaderScheduler) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/schedule/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ func WithBalanceRegionName(name string) BalanceRegionCreateOption {
}
}

// WithBalanceRegionFilterCounterName sets the filter counter name for the scheduler.
func WithBalanceRegionFilterCounterName(name string) BalanceRegionCreateOption {
return func(s *balanceRegionScheduler) {
s.filterCounter.SetScope(name)
}
}

func (s *balanceRegionScheduler) GetName() string {
return s.conf.Name
}
Expand Down
Loading

0 comments on commit c4f339f

Please sign in to comment.