Skip to content

Commit

Permalink
some fixes to release 3.0 (#1612)
Browse files Browse the repository at this point in the history
* schedulers: fix the issue about the limit of the hot region (#1552)

Signed-off-by: nolouch <nolouch@gmail.com>

* config: add a switch about grpc gateway (#1596)

Signed-off-by: nolouch <nolouch@gmail.com>

* statistics: add the missing schedule config items (#1601)

Signed-off-by: nolouch <nolouch@gmail.com>

* update changelog

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch authored and disksing committed Jul 3, 2019
1 parent bfbaa06 commit 0f2c85e
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 16 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# PD Change Log

## Unreleased

+ Fix the issue about the limit of the hot region [#1552](https://github.com/pingcap/pd/pull/1552)
+ Add a option about grpc gateway [#1596](https://github.com/pingcap/pd/pull/1596)
+ Add the missing schedule config items [#1601](https://github.com/pingcap/pd/pull/1601)

## v3.0.0

+ Support re-creating a cluster from a single node
Expand Down
51 changes: 51 additions & 0 deletions server/api/etcd_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"encoding/json"
"net/http"
"strings"

. "github.com/pingcap/check"
)

var _ = Suite(&testEtcdAPISuite{})

type testEtcdAPISuite struct {
hc *http.Client
}

func (s *testEtcdAPISuite) SetUpSuite(c *C) {
s.hc = newHTTPClient()
}

func (s *testEtcdAPISuite) TestGRPCGateway(c *C) {
svr, clean := mustNewServer(c)
defer clean()

addr := svr.GetConfig().ClientUrls + "/v3/kv/put"
putKey := map[string]string{"key": "Zm9v", "value": "YmFy"}
v, _ := json.Marshal(putKey)
err := postJSON(addr, v)
c.Assert(err, IsNil)
addr = svr.GetConfig().ClientUrls + "/v3/kv/range"
getKey := map[string]string{"key": "Zm9v"}
v, _ = json.Marshal(getKey)
err = postJSON(addr, v, func(res []byte) bool {
c.Assert(strings.Contains(string(res), "Zm9v"), IsTrue)
return true
})
c.Assert(err, IsNil)
}
11 changes: 9 additions & 2 deletions server/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,26 @@ func readJSON(r io.ReadCloser, data interface{}) error {
return nil
}

func postJSON(url string, data []byte) error {
func postJSON(url string, data []byte, checkOpts ...func(res []byte) bool) error {
resp, err := dialClient.Post(url, "application/json", bytes.NewBuffer(data))
if err != nil {
return errors.WithStack(err)
}
res, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
defer resp.Body.Close()

if err != nil {
return err
}

if resp.StatusCode != http.StatusOK {
return errors.New(string(res))
}
for _, opt := range checkOpts {
if !opt(res) {
return errors.New("check failed")
}
}
return nil
}

Expand Down
12 changes: 9 additions & 3 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ type Config struct {
AdvertiseClientUrls string `toml:"advertise-client-urls" json:"advertise-client-urls"`
AdvertisePeerUrls string `toml:"advertise-peer-urls" json:"advertise-peer-urls"`

Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
ForceNewCluster bool `json:"force-new-cluster"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
ForceNewCluster bool `json:"force-new-cluster"`
EnableGRPCGateway bool `json:"enable-grpc-gateway"`

InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
InitialClusterState string `toml:"initial-cluster-state" json:"initial-cluster-state"`
Expand Down Expand Up @@ -199,6 +200,7 @@ const (

defaultUseRegionStorage = true
defaultStrictlyMatchLabel = false
defaultEnableGRPCGateway = true
)

func adjustString(v *string, defValue string) {
Expand Down Expand Up @@ -427,6 +429,9 @@ func (c *Config) Adjust(meta *toml.MetaData) error {
if !configMetaData.IsDefined("enable-prevote") {
c.PreVote = true
}
if !configMetaData.IsDefined("enable-grpc-gateway") {
c.EnableGRPCGateway = defaultEnableGRPCGateway
}
return nil
}

Expand Down Expand Up @@ -872,6 +877,7 @@ func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) {
cfg.PeerTLSInfo.KeyFile = c.Security.KeyPath
cfg.ForceNewCluster = c.ForceNewCluster
cfg.ZapLoggerBuilder = embed.NewZapCoreLoggerBuilder(c.logger, c.logger.Core(), c.logProps.Syncer)
cfg.EnableGRPCGateway = c.EnableGRPCGateway
cfg.Logger = "zap"
var err error

Expand Down
3 changes: 3 additions & 0 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,9 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) {
opt.HotRegionScheduleLimit = mockoption.NewScheduleOptions().HotRegionScheduleLimit
opt.RegionScheduleLimit = 0
c.Assert(hb.Schedule(tc), HasLen, 1)
// Always produce operator
c.Assert(hb.Schedule(tc), HasLen, 1)
c.Assert(hb.Schedule(tc), HasLen, 1)

//| region_id | leader_store | follower_store | follower_store | written_bytes |
//|-----------|--------------|----------------|----------------|---------------|
Expand Down
14 changes: 4 additions & 10 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,13 @@ func (h *balanceHotRegionsScheduler) IsScheduleAllowed(cluster schedule.Cluster)
return h.allowBalanceLeader(cluster) || h.allowBalanceRegion(cluster)
}

func min(a, b uint64) uint64 {
if a < b {
return a
}
return b
}

func (h *balanceHotRegionsScheduler) allowBalanceLeader(cluster schedule.Cluster) bool {
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) &&
return h.opController.OperatorCount(schedule.OpHotRegion) < minUint64(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) &&
h.opController.OperatorCount(schedule.OpLeader) < cluster.GetLeaderScheduleLimit()
}

func (h *balanceHotRegionsScheduler) allowBalanceRegion(cluster schedule.Cluster) bool {
return h.opController.OperatorCount(schedule.OpHotRegion) < min(h.peerLimit, cluster.GetHotRegionScheduleLimit())
return h.opController.OperatorCount(schedule.OpHotRegion) < minUint64(h.peerLimit, cluster.GetHotRegionScheduleLimit())
}

func (h *balanceHotRegionsScheduler) Schedule(cluster schedule.Cluster) []*schedule.Operator {
Expand Down Expand Up @@ -443,7 +436,8 @@ func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesSt

avgRegionCount := hotRegionTotalCount / float64(len(storesStat))
// Multiplied by hotRegionLimitFactor to avoid transfer back and forth
return uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor)
limit := uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor)
return maxUint64(limit, 1)
}

func (h *balanceHotRegionsScheduler) GetHotReadStatus() *statistics.StoreHotRegionInfos {
Expand Down
22 changes: 21 additions & 1 deletion server/statistics/store_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,35 @@ const (
)

// ScheduleOptions is an interface to access configurations.
// TODO: merge the Options to schedule.Options
type ScheduleOptions interface {
GetLocationLabels() []string
GetMaxStoreDownTime() time.Duration

GetLowSpaceRatio() float64
GetHighSpaceRatio() float64
GetTolerantSizeRatio() float64
GetStoreBalanceRate() float64

GetSchedulerMaxWaitingOperator() uint64
GetLeaderScheduleLimit(name string) uint64
GetRegionScheduleLimit(name string) uint64
GetReplicaScheduleLimit(name string) uint64
GetMergeScheduleLimit(name string) uint64
GetHotRegionScheduleLimit(name string) uint64
GetMaxReplicas(name string) int
GetHotRegionCacheHitsThreshold() int
GetMaxSnapshotCount() uint64
GetMaxPendingPeerCount() uint64
GetMaxMergeRegionSize() uint64
GetMaxMergeRegionKeys() uint64

IsRaftLearnerEnabled() bool
IsMakeUpReplicaEnabled() bool
IsRemoveExtraReplicaEnabled() bool
IsRemoveDownReplicaEnabled() bool
IsReplaceOfflineReplicaEnabled() bool

GetMaxStoreDownTime() time.Duration
}

type storeStatistics struct {
Expand Down Expand Up @@ -160,6 +173,13 @@ func (s *storeStatistics) Collect() {
configs["high_space_ratio"] = float64(s.opt.GetHighSpaceRatio())
configs["low_space_ratio"] = float64(s.opt.GetLowSpaceRatio())
configs["tolerant_size_ratio"] = float64(s.opt.GetTolerantSizeRatio())
configs["store-balance-rate"] = float64(s.opt.GetStoreBalanceRate())
configs["hot-region-schedule-limit"] = float64(s.opt.GetHotRegionScheduleLimit(s.namespace))
configs["hot-region-cache-hits-threshold"] = float64(s.opt.GetHotRegionCacheHitsThreshold())
configs["max-pending-peer-count"] = float64(s.opt.GetMaxPendingPeerCount())
configs["max-snapshot-count"] = float64(s.opt.GetMaxSnapshotCount())
configs["max-merge-region-size"] = float64(s.opt.GetMaxMergeRegionSize())
configs["max-merge-region-keys"] = float64(s.opt.GetMaxMergeRegionKeys())

var disableMakeUpReplica, disableLearner, disableRemoveDownReplica, disableRemoveExtraReplica, disableReplaceOfflineReplica float64
if !s.opt.IsMakeUpReplicaEnabled() {
Expand Down

0 comments on commit 0f2c85e

Please sign in to comment.