Skip to content

Commit

Permalink
Merge branch 'master' into update_pd_http_client_api
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Nov 21, 2023
2 parents cd5397a + 5a2a8d6 commit c10956a
Show file tree
Hide file tree
Showing 31 changed files with 201 additions and 122 deletions.
4 changes: 3 additions & 1 deletion client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ func (bo *BackOffer) Exec(
fn func() error,
) error {
if err := fn(); err != nil {
after := time.NewTimer(bo.nextInterval())
defer after.Stop()
select {
case <-ctx.Done():
case <-time.After(bo.nextInterval()):
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
})
Expand Down
20 changes: 20 additions & 0 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,23 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption {
store.lastAwakenTime = lastAwaken
}
}

// SetStoreMeta sets the meta for the store.
func SetStoreMeta(newMeta *metapb.Store) StoreCreateOption {
	return func(store *StoreInfo) {
		// Mutate a deep copy rather than the stored meta so that any
		// concurrent readers holding the old *metapb.Store are unaffected.
		cloned := typeutil.DeepClone(store.meta, StoreFactory)
		// Carry over every updatable field from the incoming meta.
		// Note: the store ID is deliberately NOT overwritten.
		cloned.State = newMeta.GetState()
		cloned.NodeState = newMeta.GetNodeState()
		cloned.Version = newMeta.GetVersion()
		cloned.GitHash = newMeta.GetGitHash()
		cloned.Address = newMeta.GetAddress()
		cloned.StatusAddress = newMeta.GetStatusAddress()
		cloned.PeerAddress = newMeta.GetPeerAddress()
		cloned.StartTimestamp = newMeta.GetStartTimestamp()
		cloned.DeployPath = newMeta.GetDeployPath()
		cloned.LastHeartbeat = newMeta.GetLastHeartbeat()
		cloned.Labels = newMeta.GetLabels()
		cloned.PhysicallyDestroyed = newMeta.GetPhysicallyDestroyed()
		// Publish the updated copy.
		store.meta = cloned
	}
}
19 changes: 1 addition & 18 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
return errors.Errorf("store %v not found", storeID)
}

nowTime := time.Now()
newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime))
newStore := store.Clone(core.SetStoreStats(stats))

if store := c.GetStore(storeID); store != nil {
statistics.UpdateStoreHeartbeatMetrics(store)
Expand Down Expand Up @@ -486,10 +485,6 @@ func (c *Cluster) collectMetrics() {

c.coordinator.GetSchedulersController().CollectSchedulerMetrics()
c.coordinator.CollectHotSpotMetrics()
c.collectClusterMetrics()
}

func (c *Cluster) collectClusterMetrics() {
if c.regionStats == nil {
return
}
Expand All @@ -501,20 +496,8 @@ func (c *Cluster) collectClusterMetrics() {

func (c *Cluster) resetMetrics() {
statistics.Reset()

schedulers.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
c.resetClusterMetrics()
}

func (c *Cluster) resetClusterMetrics() {
if c.regionStats == nil {
return
}
c.regionStats.Reset()
c.labelStats.Reset()
// reset hot cache metrics
c.hotStat.ResetMetrics()
}

// StartBackgroundJobs starts background jobs.
Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
log.Info("update scheduler config", zap.String("name", string(kv.Value)))
log.Info("update scheduler config", zap.String("name", name),
zap.String("value", string(kv.Value)))
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (w *Watcher) initializeStoreWatcher() error {
w.basicCluster.PutStore(core.NewStoreInfo(store))
return nil
}
w.basicCluster.PutStore(origin.Clone(core.SetStoreState(store.GetState(), store.GetPhysicallyDestroyed())))
w.basicCluster.PutStore(origin.Clone(core.SetStoreMeta(store)))
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
Expand Down
3 changes: 1 addition & 2 deletions pkg/schedule/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package operator
import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -514,7 +513,7 @@ func (suite *operatorTestSuite) TestOpStepTimeout() {
},
}
for i, v := range testData {
fmt.Printf("case:%d\n", i)
suite.T().Logf("case: %d", i)
for _, step := range v.step {
suite.Equal(v.expect, step.Timeout(v.regionSize))
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/schedulers/base_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (s *BaseScheduler) GetNextInterval(interval time.Duration) time.Duration {
return intervalGrow(interval, MaxScheduleInterval, exponentialGrowth)
}

// Prepare does some prepare work
func (s *BaseScheduler) Prepare(cluster sche.SchedulerCluster) error { return nil }
// PrepareConfig does some prepare work about config.
func (s *BaseScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { return nil }

// Cleanup does some cleanup work
func (s *BaseScheduler) Cleanup(cluster sche.SchedulerCluster) {}
// CleanConfig does some cleanup work about config.
func (s *BaseScheduler) CleanConfig(cluster sche.SchedulerCluster) {}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint6
}
}

func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
var res error
Expand All @@ -251,7 +251,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
return res
}

func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) {
func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
for id := range s.conf.StoreIDWithRanges {
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,15 @@ func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(s.conf)
}

func (s *evictSlowStoreScheduler) Prepare(cluster sche.SchedulerCluster) error {
func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
evictStore := s.conf.evictStore()
if evictStore != 0 {
return cluster.SlowStoreEvicted(evictStore)
}
return nil
}

func (s *evictSlowStoreScheduler) Cleanup(cluster sche.SchedulerCluster) {
func (s *evictSlowStoreScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.cleanupEvictLeader(cluster)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() {
suite.True(ok)
suite.Zero(es2.conf.evictStore())
// prepare with no evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)

es2.conf.setStoreAndPersist(1)
suite.Equal(uint64(1), es2.conf.evictStore())
suite.False(es2.conf.readyForRecovery())
// prepare with evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)
}

func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePersistFail() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,15 +270,15 @@ func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(s.conf)
}

func (s *evictSlowTrendScheduler) Prepare(cluster sche.SchedulerCluster) error {
func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
evictedStoreID := s.conf.evictedStore()
if evictedStoreID == 0 {
return nil
}
return cluster.SlowTrendEvicted(evictedStoreID)
}

func (s *evictSlowTrendScheduler) Cleanup(cluster sche.SchedulerCluster) {
func (s *evictSlowTrendScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.cleanupEvictLeader(cluster)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_trend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrendPrepare() {
suite.True(ok)
suite.Zero(es2.conf.evictedStore())
// prepare with no evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)

es2.conf.setStoreAndPersist(1)
suite.Equal(uint64(1), es2.conf.evictedStore())
// prepare with evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)
}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *grantLeaderScheduler) ReloadConfig() error {
return nil
}

func (s *grantLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
var res error
Expand All @@ -209,7 +209,7 @@ func (s *grantLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
return res
}

func (s *grantLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) {
func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
for id := range s.conf.StoreIDWithRanges {
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type Scheduler interface {
ReloadConfig() error
GetMinInterval() time.Duration
GetNextInterval(interval time.Duration) time.Duration
Prepare(cluster sche.SchedulerCluster) error
Cleanup(cluster sche.SchedulerCluster)
PrepareConfig(cluster sche.SchedulerCluster) error
CleanConfig(cluster sche.SchedulerCluster)
Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan)
IsScheduleAllowed(cluster sche.SchedulerCluster) bool
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) er
return err
}
c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args)
return nil
err := scheduler.PrepareConfig(c.cluster)
return err
}

// RemoveSchedulerHandler removes the HTTP handler for a scheduler.
Expand All @@ -183,6 +184,7 @@ func (c *Controller) RemoveSchedulerHandler(name string) error {
return err
}

s.(Scheduler).CleanConfig(c.cluster)
delete(c.schedulerHandlers, name)

return nil
Expand All @@ -198,7 +200,7 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
}

s := NewScheduleController(c.ctx, c.cluster, c.opController, scheduler)
if err := s.Scheduler.Prepare(c.cluster); err != nil {
if err := s.Scheduler.PrepareConfig(c.cluster); err != nil {
return err
}

Expand Down Expand Up @@ -343,7 +345,7 @@ func (c *Controller) IsSchedulerExisted(name string) (bool, error) {
func (c *Controller) runScheduler(s *ScheduleController) {
defer logutil.LogPanic()
defer c.wg.Done()
defer s.Scheduler.Cleanup(c.cluster)
defer s.Scheduler.CleanConfig(c.cluster)

ticker := time.NewTicker(s.GetInterval())
defer ticker.Stop()
Expand Down
4 changes: 2 additions & 2 deletions pkg/statistics/hot_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ func (w *HotCache) CollectMetrics() {
w.CheckReadAsync(newCollectMetricsTask())
}

// ResetMetrics resets the hot cache metrics.
func (w *HotCache) ResetMetrics() {
// ResetHotCacheStatusMetrics resets the hot cache metrics.
func ResetHotCacheStatusMetrics() {
hotCacheStatusGauge.Reset()
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/statistics/region_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ func (r *RegionStatistics) Collect() {
regionWitnessLeaderRegionCounter.Set(float64(len(r.stats[WitnessLeader])))
}

// Reset resets the metrics of the regions' status.
func (r *RegionStatistics) Reset() {
// ResetRegionStatsMetrics resets the metrics of the regions' status.
func ResetRegionStatsMetrics() {
regionMissPeerRegionCounter.Set(0)
regionExtraPeerRegionCounter.Set(0)
regionDownPeerRegionCounter.Set(0)
Expand Down Expand Up @@ -326,8 +326,8 @@ func (l *LabelStatistics) Collect() {
}
}

// Reset resets the metrics of the label status.
func (l *LabelStatistics) Reset() {
// ResetLabelStatsMetrics resets the metrics of the label status.
func ResetLabelStatsMetrics() {
regionLabelLevelGauge.Reset()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,7 @@ func Reset() {
storeStatusGauge.Reset()
clusterStatusGauge.Reset()
placementStatusGauge.Reset()
ResetRegionStatsMetrics()
ResetLabelStatsMetrics()
ResetHotCacheStatusMetrics()
}
4 changes: 2 additions & 2 deletions plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
return schedulers.EncodeConfig(s.conf)
}

func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
var res error
Expand All @@ -198,7 +198,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
return res
}

func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) {
func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
for id := range s.conf.StoreIDWitRanges {
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ func (c *RaftCluster) runMetricsCollectionJob() {
ticker := time.NewTicker(metricsCollectionJobInterval)
failpoint.Inject("highFrequencyClusterJobs", func() {
ticker.Stop()
ticker = time.NewTicker(time.Microsecond)
ticker = time.NewTicker(time.Millisecond)
})
defer ticker.Stop()

Expand Down Expand Up @@ -734,10 +734,10 @@ func (c *RaftCluster) Stop() {
return
}
c.running = false
c.cancel()
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
c.stopSchedulingJobs()
}
c.cancel()
c.Unlock()

c.wg.Wait()
Expand Down
Loading

0 comments on commit c10956a

Please sign in to comment.