Skip to content

Commit

Permalink
tools: support tls and refactor (#7745)
Browse files Browse the repository at this point in the history
ref #7703

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] authored Jan 24, 2024
1 parent 1a38582 commit ca8fd3d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 81 deletions.
6 changes: 3 additions & 3 deletions tools/pd-heartbeat-bench/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ key-length = 56
replica = 3

leader-update-ratio = 0.06
epoch-update-ratio = 0.04
space-update-ratio = 0.15
flow-update-ratio = 0.35
epoch-update-ratio = 0.0
space-update-ratio = 0.0
flow-update-ratio = 0.0
no-update-ratio = 0.0

sample = false
51 changes: 26 additions & 25 deletions tools/pd-heartbeat-bench/config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package config

import (
"math"
"sync/atomic"

"github.com/BurntSushi/toml"
Expand All @@ -21,7 +20,7 @@ const (
defaultEpochUpdateRatio = 0.04
defaultSpaceUpdateRatio = 0.15
defaultFlowUpdateRatio = 0.35
defaultNoUpdateRatio = 0
defaultReportRatio = 1
defaultRound = 0
defaultSample = false

Expand All @@ -39,6 +38,8 @@ type Config struct {
Logger *zap.Logger
LogProps *log.ZapProperties

Security configutil.SecurityConfig `toml:"security" json:"security"`

StoreCount int `toml:"store-count" json:"store-count"`
RegionCount int `toml:"region-count" json:"region-count"`
KeyLength int `toml:"key-length" json:"key-length"`
Expand All @@ -47,7 +48,7 @@ type Config struct {
EpochUpdateRatio float64 `toml:"epoch-update-ratio" json:"epoch-update-ratio"`
SpaceUpdateRatio float64 `toml:"space-update-ratio" json:"space-update-ratio"`
FlowUpdateRatio float64 `toml:"flow-update-ratio" json:"flow-update-ratio"`
NoUpdateRatio float64 `toml:"no-update-ratio" json:"no-update-ratio"`
ReportRatio float64 `toml:"report-ratio" json:"report-ratio"`
Sample bool `toml:"sample" json:"sample"`
Round int `toml:"round" json:"round"`
}
Expand All @@ -62,6 +63,9 @@ func NewConfig() *Config {
fs.StringVar(&cfg.PDAddr, "pd-endpoints", "127.0.0.1:2379", "pd address")
fs.StringVar(&cfg.Log.File.Filename, "log-file", "", "log file path")
fs.StringVar(&cfg.StatusAddr, "status-addr", "127.0.0.1:20180", "status address")
fs.StringVar(&cfg.Security.CAPath, "cacert", "", "path of file that contains list of trusted TLS CAs")
fs.StringVar(&cfg.Security.CertPath, "cert", "", "path of file that contains X509 certificate in PEM format")
fs.StringVar(&cfg.Security.KeyPath, "key", "", "path of file that contains X509 key in PEM format")

return cfg
}
Expand Down Expand Up @@ -133,8 +137,8 @@ func (c *Config) Adjust(meta *toml.MetaData) {
if !meta.IsDefined("flow-update-ratio") {
configutil.AdjustFloat64(&c.FlowUpdateRatio, defaultFlowUpdateRatio)
}
if !meta.IsDefined("no-update-ratio") {
configutil.AdjustFloat64(&c.NoUpdateRatio, defaultNoUpdateRatio)
if !meta.IsDefined("report-ratio") {
configutil.AdjustFloat64(&c.ReportRatio, defaultReportRatio)
}
if !meta.IsDefined("sample") {
c.Sample = defaultSample
Expand All @@ -143,24 +147,20 @@ func (c *Config) Adjust(meta *toml.MetaData) {

// Validate is used to validate configurations
func (c *Config) Validate() error {
if c.LeaderUpdateRatio < 0 || c.LeaderUpdateRatio > 1 {
return errors.Errorf("leader-update-ratio must be in [0, 1]")
}
if c.EpochUpdateRatio < 0 || c.EpochUpdateRatio > 1 {
return errors.Errorf("epoch-update-ratio must be in [0, 1]")
if c.ReportRatio < 0 || c.ReportRatio > 1 {
return errors.Errorf("report-ratio must be in [0, 1]")
}
if c.SpaceUpdateRatio < 0 || c.SpaceUpdateRatio > 1 {
return errors.Errorf("space-update-ratio must be in [0, 1]")
if c.LeaderUpdateRatio > c.ReportRatio || c.LeaderUpdateRatio < 0 {
return errors.Errorf("leader-update-ratio can not be negative or larger than report-ratio")
}
if c.FlowUpdateRatio < 0 || c.FlowUpdateRatio > 1 {
return errors.Errorf("flow-update-ratio must be in [0, 1]")
if c.EpochUpdateRatio > c.ReportRatio || c.EpochUpdateRatio < 0 {
return errors.Errorf("epoch-update-ratio can not be negative or larger than report-ratio")
}
if c.NoUpdateRatio < 0 || c.NoUpdateRatio > 1 {
return errors.Errorf("no-update-ratio must be in [0, 1]")
if c.SpaceUpdateRatio > c.ReportRatio || c.SpaceUpdateRatio < 0 {
return errors.Errorf("space-update-ratio can not be negative or larger than report-ratio")
}
max := math.Max(c.LeaderUpdateRatio, math.Max(c.EpochUpdateRatio, math.Max(c.SpaceUpdateRatio, c.FlowUpdateRatio)))
if max+c.NoUpdateRatio > 1 {
return errors.Errorf("sum of update-ratio must be in [0, 1]")
if c.FlowUpdateRatio > c.ReportRatio || c.FlowUpdateRatio < 0 {
return errors.Errorf("flow-update-ratio can not be negative or larger than report-ratio")
}
return nil
}
Expand All @@ -174,11 +174,12 @@ func (c *Config) Clone() *Config {

// Options is the option of the heartbeat-bench.
type Options struct {
ReportRatio atomic.Value

LeaderUpdateRatio atomic.Value
EpochUpdateRatio atomic.Value
SpaceUpdateRatio atomic.Value
FlowUpdateRatio atomic.Value
NoUpdateRatio atomic.Value
}

// NewOptions creates a new option.
Expand All @@ -188,7 +189,7 @@ func NewOptions(cfg *Config) *Options {
o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio)
o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio)
o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio)
o.NoUpdateRatio.Store(cfg.NoUpdateRatio)
o.ReportRatio.Store(cfg.ReportRatio)
return o
}

Expand All @@ -212,9 +213,9 @@ func (o *Options) GetFlowUpdateRatio() float64 {
return o.FlowUpdateRatio.Load().(float64)
}

// GetNoUpdateRatio returns the no update ratio.
func (o *Options) GetNoUpdateRatio() float64 {
return o.NoUpdateRatio.Load().(float64)
// GetReportRatio returns the report ratio.
func (o *Options) GetReportRatio() float64 {
return o.ReportRatio.Load().(float64)
}

// SetOptions sets the option.
Expand All @@ -223,5 +224,5 @@ func (o *Options) SetOptions(cfg *Config) {
o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio)
o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio)
o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio)
o.NoUpdateRatio.Store(cfg.NoUpdateRatio)
o.ReportRatio.Store(cfg.ReportRatio)
}
98 changes: 45 additions & 53 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net/http"
"os"
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
Expand All @@ -38,12 +37,12 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/tools/pd-heartbeat-bench/config"
"go.etcd.io/etcd/pkg/report"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const (
Expand All @@ -57,19 +56,16 @@ const (

var clusterID uint64

func trimHTTPPrefix(str string) string {
str = strings.TrimPrefix(str, "http://")
str = strings.TrimPrefix(str, "https://")
return str
}

func newClient(cfg *config.Config) pdpb.PDClient {
addr := trimHTTPPrefix(cfg.PDAddr)
cc, err := grpc.Dial(addr, grpc.WithInsecure())
func newClient(ctx context.Context, cfg *config.Config) (pdpb.PDClient, error) {
tlsConfig, err := cfg.Security.ToTLSConfig()
if err != nil {
log.Fatal("failed to create gRPC connection", zap.Error(err))
return nil, err
}
return pdpb.NewPDClient(cc)
cc, err := grpcutil.GetClientConn(ctx, cfg.PDAddr, tlsConfig)
if err != nil {
return nil, err
}
return pdpb.NewPDClient(cc), nil
}

func initClusterID(ctx context.Context, cli pdpb.PDClient) {
Expand Down Expand Up @@ -255,31 +251,32 @@ func (rs *Regions) init(cfg *config.Config, options *config.Options) []int {
func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes []int) {
rs.updateRound += 1

rs.updateLeader = pick(indexes, cfg, options.GetLeaderUpdateRatio())
rs.updateEpoch = pick(indexes, cfg, options.GetEpochUpdateRatio())
rs.updateSpace = pick(indexes, cfg, options.GetSpaceUpdateRatio())
rs.updateFlow = pick(indexes, cfg, options.GetFlowUpdateRatio())
updatedRegionsMap := make(map[int]*pdpb.RegionHeartbeatRequest)
var awakenRegions []*pdpb.RegionHeartbeatRequest
reportRegions := pick(indexes, cfg.RegionCount, options.GetReportRatio())
reportCount := len(reportRegions)
rs.updateLeader = pick(reportRegions, reportCount, options.GetLeaderUpdateRatio())
rs.updateEpoch = pick(reportRegions, reportCount, options.GetEpochUpdateRatio())
rs.updateSpace = pick(reportRegions, reportCount, options.GetSpaceUpdateRatio())
rs.updateFlow = pick(reportRegions, reportCount, options.GetFlowUpdateRatio())
var (
updatedStatisticsMap = make(map[int]*pdpb.RegionHeartbeatRequest)
awakenRegions []*pdpb.RegionHeartbeatRequest
)

// update leader
for _, i := range rs.updateLeader {
region := rs.regions[i]
region.Leader = region.Region.Peers[rs.updateRound%cfg.Replica]
updatedRegionsMap[i] = region
}
// update epoch
for _, i := range rs.updateEpoch {
region := rs.regions[i]
region.Region.RegionEpoch.Version += 1
updatedRegionsMap[i] = region
}
// update space
for _, i := range rs.updateSpace {
region := rs.regions[i]
region.ApproximateSize = uint64(bytesUnit * rand.Float64())
region.ApproximateKeys = uint64(keysUint * rand.Float64())
updatedRegionsMap[i] = region
}
// update flow
for _, i := range rs.updateFlow {
Expand All @@ -292,25 +289,34 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes [
Get: uint64(queryUnit * rand.Float64()),
Put: uint64(queryUnit * rand.Float64()),
}
updatedRegionsMap[i] = region
updatedStatisticsMap[i] = region
}
// update interval
for _, region := range rs.regions {
region.Interval.StartTimestamp = region.Interval.EndTimestamp
region.Interval.EndTimestamp = region.Interval.StartTimestamp + regionReportInterval
}
for _, region := range updatedRegionsMap {
for _, i := range reportRegions {
region := rs.regions[i]
// reset the statistics of the region which is not updated
if _, exist := updatedStatisticsMap[i]; !exist {
region.BytesWritten = 0
region.BytesRead = 0
region.KeysWritten = 0
region.KeysRead = 0
region.QueryStats = &pdpb.QueryStats{}
}
awakenRegions = append(awakenRegions, region)
}
noUpdatedRegions := pickNoUpdatedRegions(indexes, cfg, options.GetNoUpdateRatio(), updatedRegionsMap)
for _, i := range noUpdatedRegions {
awakenRegions = append(awakenRegions, rs.regions[i])
}

rs.awakenRegions.Store(awakenRegions)
}

func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_RegionHeartbeatClient {
cli := newClient(cfg)
cli, err := newClient(ctx, cfg)
if err != nil {
log.Fatal("create client error", zap.Error(err))
}
stream, err := cli.RegionHeartbeat(ctx)
if err != nil {
log.Fatal("create stream error", zap.Error(err))
Expand Down Expand Up @@ -359,7 +365,7 @@ func (rs *Regions) handleRegionHeartbeat(wg *sync.WaitGroup, stream pdpb.PD_Regi
return
}
}
log.Info("store finish one round region heartbeat", zap.Uint64("store-id", storeID), zap.Duration("cost-time", time.Since(start)))
log.Info("store finish one round region heartbeat", zap.Uint64("store-id", storeID), zap.Duration("cost-time", time.Since(start)), zap.Int("reported-region-count", len(regions)))
}

// Stores contains store stats with lock.
Expand Down Expand Up @@ -425,28 +431,11 @@ func (s *Stores) update(rs *Regions) {
}
}

func pick(slice []int, cfg *config.Config, ratio float64) []int {
rand.Shuffle(cfg.RegionCount, func(i, j int) {
func pick(slice []int, total int, ratio float64) []int {
rand.Shuffle(total, func(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
})
return append(slice[:0:0], slice[0:int(float64(cfg.RegionCount)*ratio)]...)
}

func pickNoUpdatedRegions(slice []int, cfg *config.Config, ratio float64, updatedMap map[int]*pdpb.RegionHeartbeatRequest) []int {
if ratio == 0 {
return nil
}
rand.Shuffle(cfg.RegionCount, func(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
})
NoUpdatedRegionsNum := int(float64(cfg.RegionCount) * ratio)
res := make([]int, 0, NoUpdatedRegionsNum)
for i := 0; len(res) < NoUpdatedRegionsNum; i++ {
if _, ok := updatedMap[slice[i]]; !ok {
res = append(res, slice[i])
}
}
return res
return append(slice[:0:0], slice[0:int(float64(total)*ratio)]...)
}

func main() {
Expand Down Expand Up @@ -487,7 +476,10 @@ func main() {
sig = <-sc
cancel()
}()
cli := newClient(cfg)
cli, err := newClient(ctx, cfg)
if err != nil {
log.Fatal("create client error", zap.Error(err))
}
initClusterID(ctx, cli)
go runHTTPServer(cfg, options)
regions := new(Regions)
Expand Down Expand Up @@ -604,7 +596,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
newCfg.LeaderUpdateRatio = options.GetLeaderUpdateRatio()
newCfg.EpochUpdateRatio = options.GetEpochUpdateRatio()
newCfg.SpaceUpdateRatio = options.GetSpaceUpdateRatio()
newCfg.NoUpdateRatio = options.GetNoUpdateRatio()
newCfg.ReportRatio = options.GetReportRatio()
if err := c.BindJSON(&newCfg); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
Expand All @@ -622,7 +614,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
output.LeaderUpdateRatio = options.GetLeaderUpdateRatio()
output.EpochUpdateRatio = options.GetEpochUpdateRatio()
output.SpaceUpdateRatio = options.GetSpaceUpdateRatio()
output.NoUpdateRatio = options.GetNoUpdateRatio()
output.ReportRatio = options.GetReportRatio()

c.IndentedJSON(http.StatusOK, output)
})
Expand Down

0 comments on commit ca8fd3d

Please sign in to comment.