Skip to content

Commit

Permalink
Merge branch 'master' into cleanup-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Nov 20, 2023
2 parents 04caaa6 + 89c8374 commit a35f679
Show file tree
Hide file tree
Showing 26 changed files with 467 additions and 87 deletions.
20 changes: 20 additions & 0 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,23 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption {
store.lastAwakenTime = lastAwaken
}
}

// SetStoreMeta returns a StoreCreateOption that refreshes a store's meta from newMeta.
//
// When applied, the option clones the store's current meta with
// typeutil.DeepClone and overwrites only the fields assigned below with the
// values read from newMeta; every other field keeps the value it had in the
// clone. Labels is assigned straight from newMeta.GetLabels(), so the slice is
// shared with newMeta rather than copied.
func SetStoreMeta(newMeta *metapb.Store) StoreCreateOption {
	return func(s *StoreInfo) {
		updated := typeutil.DeepClone(s.meta, StoreFactory)

		// Lifecycle state.
		updated.State = newMeta.GetState()
		updated.NodeState = newMeta.GetNodeState()
		updated.PhysicallyDestroyed = newMeta.GetPhysicallyDestroyed()

		// Network endpoints.
		updated.Address = newMeta.GetAddress()
		updated.StatusAddress = newMeta.GetStatusAddress()
		updated.PeerAddress = newMeta.GetPeerAddress()

		// Build and deployment information.
		updated.Version = newMeta.GetVersion()
		updated.GitHash = newMeta.GetGitHash()
		updated.DeployPath = newMeta.GetDeployPath()

		// Timestamps and labels.
		updated.StartTimestamp = newMeta.GetStartTimestamp()
		updated.LastHeartbeat = newMeta.GetLastHeartbeat()
		updated.Labels = newMeta.GetLabels()

		// Swap in the refreshed copy; the previous meta object is not mutated here.
		s.meta = updated
	}
}
167 changes: 165 additions & 2 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package apis
import (
"encoding/hex"
"net/http"
"net/url"
"strconv"
"sync"

Expand Down Expand Up @@ -191,12 +192,22 @@ func (s *Service) RegisterConfigRouter() {
placementRule := router.Group("placement-rule")
placementRule.GET("", getPlacementRules)
placementRule.GET("/:group", getPlacementRuleByGroup)

regionLabel := router.Group("region-label")
regionLabel.GET("/rules", getAllRegionLabelRules)
regionLabel.GET("/rules/ids", getRegionLabelRulesByIDs)
regionLabel.GET("/rules/:id", getRegionLabelRuleByID)

regions := router.Group("regions")
regions.GET("/:id/label/:key", getRegionLabelByKey)
regions.GET("/:id/labels", getRegionLabels)
}

// @Tags admin
// @Summary Change the log level.
// @Produce json
// @Success 200 {string} string "The log level is updated."
// @Failure 400 {string} string "The input is invalid."
// @Router /admin/log [put]
func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
Expand Down Expand Up @@ -230,6 +241,7 @@ func getConfig(c *gin.Context) {
// @Summary Drop all regions from cache.
// @Produce json
// @Success 200 {string} string "All regions are removed from server cache."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /admin/cache/regions [delete]
func deleteAllRegionCache(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
Expand All @@ -248,6 +260,7 @@ func deleteAllRegionCache(c *gin.Context) {
// @Produce json
// @Success 200 {string} string "The region is removed from server cache."
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /admin/cache/regions/{id} [delete]
func deleteRegionCacheByID(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
Expand Down Expand Up @@ -683,8 +696,6 @@ func getHotBuckets(c *gin.Context) {
// @Accept json
// @Produce json
// @Success 200 {object} storage.HistoryHotRegions
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/regions/history [get]
func getHistoryHotRegions(c *gin.Context) {
// TODO: support history hotspot in scheduling server with stateless in the future.
Expand Down Expand Up @@ -955,3 +966,155 @@ func getPlacementRuleByGroup(c *gin.Context) {
group := manager.GetGroupBundle(g)
c.IndentedJSON(http.StatusOK, group)
}

// getRegionLabelByKey writes the value of one label of a region as JSON.
//
// @Tags region_label
// @Summary Get label of a region.
// @Param id path integer true "Region Id"
// @Param key path string true "Label key"
// @Produce json
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 404 {string} string "The region does not exist."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/regions/{id}/label/{key} [get]
func getRegionLabelByKey(c *gin.Context) {
	h := c.MustGet(handlerKey).(*handler.Handler)

	// A non-numeric region ID is a client error.
	regionID, err := strconv.ParseUint(c.Param("id"), 10, 64)
	if err != nil {
		c.String(http.StatusBadRequest, err.Error())
		return
	}

	// A lookup error is reported as 500; a nil region as 404.
	region, err := h.GetRegion(regionID)
	switch {
	case err != nil:
		c.String(http.StatusInternalServerError, err.Error())
		return
	case region == nil:
		c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs().Error())
		return
	}

	regionLabeler, err := h.GetRegionLabeler()
	if err != nil {
		c.String(http.StatusInternalServerError, err.Error())
		return
	}

	key := c.Param("key") // TODO: test https://github.com/tikv/pd/pull/4004
	c.IndentedJSON(http.StatusOK, regionLabeler.GetRegionLabel(region, key))
}

// getRegionLabels writes all labels of a region as JSON.
//
// The handler answers 400 when the region ID cannot be parsed, 500 when the
// region lookup or the labeler lookup returns an error, and 404 when the
// lookup yields a nil region. The 500 response was missing from the
// annotations below even though the handler emits it on two paths.
//
// @Tags region_label
// @Summary Get labels of a region.
// @Param id path integer true "Region Id"
// @Produce json
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 404 {string} string "The region does not exist."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/regions/{id}/labels [get]
func getRegionLabels(c *gin.Context) {
	// Named h so the local does not shadow the imported handler package.
	h := c.MustGet(handlerKey).(*handler.Handler)

	id, err := strconv.ParseUint(c.Param("id"), 10, 64)
	if err != nil {
		c.String(http.StatusBadRequest, err.Error())
		return
	}

	region, err := h.GetRegion(id)
	if err != nil {
		c.String(http.StatusInternalServerError, err.Error())
		return
	}
	if region == nil {
		c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs().Error())
		return
	}

	l, err := h.GetRegionLabeler()
	if err != nil {
		c.String(http.StatusInternalServerError, err.Error())
		return
	}
	c.IndentedJSON(http.StatusOK, l.GetRegionLabels(region))
}

// getAllRegionLabelRules writes every label rule known to the region labeler as JSON.
//
// @Tags region_label
// @Summary List all label rules of cluster.
// @Produce json
// @Success 200 {array} labeler.LabelRule
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/region-label/rules [get]
func getAllRegionLabelRules(c *gin.Context) {
	h := c.MustGet(handlerKey).(*handler.Handler)

	regionLabeler, err := h.GetRegionLabeler()
	if err != nil {
		// The labeler could not be obtained; report it as a server-side failure.
		c.String(http.StatusInternalServerError, err.Error())
		return
	}

	c.IndentedJSON(http.StatusOK, regionLabeler.GetAllLabelRules())
}

// getRegionLabelRulesByIDs writes the label rules matching the IDs given in
// the JSON request body.
//
// The handler answers 500 when the labeler cannot be obtained or the rule
// lookup fails, and 400 when the body is not a JSON array of strings.
//
// @Tags region_label
// @Summary Get label rules of cluster by ids.
// @Param body body []string true "IDs of query rules"
// @Produce json
// @Success 200 {array} labeler.LabelRule
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/region-label/rules/ids [get]
func getRegionLabelRulesByIDs(c *gin.Context) {
	// Named h so the local does not shadow the imported handler package.
	h := c.MustGet(handlerKey).(*handler.Handler)
	l, err := h.GetRegionLabeler()
	if err != nil {
		c.String(http.StatusInternalServerError, err.Error())
		return
	}

	// ShouldBindJSON only reports the error. BindJSON would additionally abort
	// the request and write the 400 header itself before this handler gets the
	// chance to write its own response, so the Content-Type set by c.String
	// below would come too late.
	var ids []string
	if err := c.ShouldBindJSON(&ids); err != nil {
		c.String(http.StatusBadRequest, err.Error())
		return
	}

	rules, err := l.GetLabelRules(ids)
	if err != nil {
		c.String(http.StatusInternalServerError, err.Error())
		return
	}
	c.IndentedJSON(http.StatusOK, rules)
}

// getRegionLabelRuleByID writes the label rule with the given ID as JSON.
//
// The handler answers 400 when the ID path segment cannot be unescaped, 500
// when the labeler cannot be obtained, and 404 when the labeler returns a nil
// rule. The 400 response was missing from the annotations below even though
// the handler emits it.
//
// @Tags region_label
// @Summary Get label rule of cluster by id.
// @Param id path string true "Rule Id"
// @Produce json
// @Success 200 {object} labeler.LabelRule
// @Failure 400 {string} string "The input is invalid."
// @Failure 404 {string} string "The rule does not exist."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/region-label/rules/{id} [get]
func getRegionLabelRuleByID(c *gin.Context) {
	// Named h so the local does not shadow the imported handler package.
	h := c.MustGet(handlerKey).(*handler.Handler)

	id, err := url.PathUnescape(c.Param("id"))
	if err != nil {
		c.String(http.StatusBadRequest, err.Error())
		return
	}

	l, err := h.GetRegionLabeler()
	if err != nil {
		c.String(http.StatusInternalServerError, err.Error())
		return
	}

	rule := l.GetLabelRule(id)
	if rule == nil {
		c.String(http.StatusNotFound, errs.ErrRegionRuleNotFound.FastGenByArgs().Error())
		return
	}
	c.IndentedJSON(http.StatusOK, rule)
}
3 changes: 1 addition & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq
return errors.Errorf("store %v not found", storeID)
}

nowTime := time.Now()
newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime))
newStore := store.Clone(core.SetStoreStats(stats))

if store := c.GetStore(storeID); store != nil {
statistics.UpdateStoreHeartbeatMetrics(store)
Expand Down
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
log.Info("update scheduler config", zap.String("name", string(kv.Value)))
log.Info("update scheduler config", zap.String("name", name),
zap.String("value", string(kv.Value)))
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (w *Watcher) initializeStoreWatcher() error {
w.basicCluster.PutStore(core.NewStoreInfo(store))
return nil
}
w.basicCluster.PutStore(origin.Clone(core.SetStoreState(store.GetState(), store.GetPhysicallyDestroyed())))
w.basicCluster.PutStore(origin.Clone(core.SetStoreMeta(store)))
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
Expand Down
19 changes: 19 additions & 0 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scatter"
Expand Down Expand Up @@ -1063,6 +1064,24 @@ func (h *Handler) GetHotBuckets(regionIDs ...uint64) (HotBucketsResponse, error)
return ret, nil
}

// GetRegion returns the region with the given ID from the current cluster.
// It returns ErrNotBootstrapped when the handler has no cluster. Otherwise the
// result of the cluster's GetRegion is passed through unchanged with a nil
// error, so callers should check the returned region for nil (presumably
// meaning the region is unknown; confirm against the cluster implementation).
func (h *Handler) GetRegion(id uint64) (*core.RegionInfo, error) {
	if cluster := h.GetCluster(); cluster != nil {
		return cluster.GetRegion(id), nil
	}
	return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}

// GetRegionLabeler returns the region labeler of the current cluster.
// It returns ErrNotBootstrapped when the handler has no cluster or when the
// cluster has no region labeler.
func (h *Handler) GetRegionLabeler() (*labeler.RegionLabeler, error) {
	c := h.GetCluster()
	if c == nil {
		return nil, errs.ErrNotBootstrapped
	}
	// Read the labeler once so the value that passed the nil check is the
	// same value handed back to the caller.
	regionLabeler := c.GetRegionLabeler()
	if regionLabeler == nil {
		return nil, errs.ErrNotBootstrapped
	}
	return regionLabeler, nil
}

// GetRuleManager returns the rule manager.
func (h *Handler) GetRuleManager() (*placement.RuleManager, error) {
c := h.GetCluster()
Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/schedulers/base_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (s *BaseScheduler) GetNextInterval(interval time.Duration) time.Duration {
return intervalGrow(interval, MaxScheduleInterval, exponentialGrowth)
}

// Prepare does some prepare work
func (s *BaseScheduler) Prepare(cluster sche.SchedulerCluster) error { return nil }
// PrepareConfig is the BaseScheduler default for config preparation: it does
// nothing with cluster and always returns nil.
func (s *BaseScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { return nil }

// Cleanup does some cleanup work
func (s *BaseScheduler) Cleanup(cluster sche.SchedulerCluster) {}
// CleanConfig is the BaseScheduler default for config cleanup: it does nothing
// with cluster.
func (s *BaseScheduler) CleanConfig(cluster sche.SchedulerCluster) {}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint6
}
}

func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
var res error
Expand All @@ -251,7 +251,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
return res
}

func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) {
func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
for id := range s.conf.StoreIDWithRanges {
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,15 @@ func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(s.conf)
}

func (s *evictSlowStoreScheduler) Prepare(cluster sche.SchedulerCluster) error {
// PrepareConfig re-applies the eviction recorded in the scheduler config.
// When the config holds a non-zero evicted store ID, it calls
// cluster.SlowStoreEvicted with that ID and returns the result; otherwise it
// returns nil without touching the cluster.
func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
	if id := s.conf.evictStore(); id != 0 {
		return cluster.SlowStoreEvicted(id)
	}
	return nil
}

func (s *evictSlowStoreScheduler) Cleanup(cluster sche.SchedulerCluster) {
// CleanConfig delegates to s.cleanupEvictLeader with the given cluster.
func (s *evictSlowStoreScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.cleanupEvictLeader(cluster)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() {
suite.True(ok)
suite.Zero(es2.conf.evictStore())
// prepare with no evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)

es2.conf.setStoreAndPersist(1)
suite.Equal(uint64(1), es2.conf.evictStore())
suite.False(es2.conf.readyForRecovery())
// prepare with evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)
}

func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePersistFail() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,15 +270,15 @@ func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(s.conf)
}

func (s *evictSlowTrendScheduler) Prepare(cluster sche.SchedulerCluster) error {
// PrepareConfig re-applies the eviction recorded in the scheduler config.
// When the config holds a non-zero evicted store ID, it calls
// cluster.SlowTrendEvicted with that ID and returns the result; otherwise it
// returns nil without touching the cluster.
func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
	if id := s.conf.evictedStore(); id != 0 {
		return cluster.SlowTrendEvicted(id)
	}
	return nil
}

func (s *evictSlowTrendScheduler) Cleanup(cluster sche.SchedulerCluster) {
// CleanConfig delegates to s.cleanupEvictLeader with the given cluster.
func (s *evictSlowTrendScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.cleanupEvictLeader(cluster)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_trend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrendPrepare() {
suite.True(ok)
suite.Zero(es2.conf.evictedStore())
// prepare with no evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)

es2.conf.setStoreAndPersist(1)
suite.Equal(uint64(1), es2.conf.evictedStore())
// prepare with evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)
}
Loading

0 comments on commit a35f679

Please sign in to comment.