Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Feb 2, 2024
1 parent b917461 commit 8773f6a
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 6 deletions.
1 change: 1 addition & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const (
ClusterStatus = "/pd/api/v1/cluster/status"
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
operators = "/pd/api/v1/operators"
// Micro Service
microServicePrefix = "/pd/api/v2/ms"
)
Expand Down
9 changes: 9 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type Client interface {
GetPDVersion(context.Context) (string, error)
/* Micro Service interfaces */
GetMicroServiceMembers(context.Context, string) ([]string, error)
DeleteOperators(context.Context) error

/* Client-related methods */
// WithCallerID sets and returns a new client with the given caller ID.
Expand Down Expand Up @@ -879,3 +880,11 @@ func (c *client) GetPDVersion(ctx context.Context) (string, error) {
WithResp(&ver))
return ver.Version, err
}

// DeleteOperators deletes the running operators.
func (c *client) DeleteOperators(ctx context.Context) error {
return c.request(ctx, newRequestInfo().
WithName(deleteOperators).
WithURI(operators).
WithMethod(http.MethodDelete))

Check warning on line 889 in client/http/interface.go

View check run for this annotation

Codecov / codecov/patch

client/http/interface.go#L886-L889

Added lines #L886 - L889 were not covered by tests
}
1 change: 1 addition & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const (
resetBaseAllocIDName = "ResetBaseAllocID"
setSnapshotRecoveringMarkName = "SetSnapshotRecoveringMark"
deleteSnapshotRecoveringMarkName = "DeleteSnapshotRecoveringMark"
deleteOperators = "DeleteOperators"
)

type requestInfo struct {
Expand Down
65 changes: 59 additions & 6 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"context"
"crypto/tls"
"fmt"
"io"
"math/rand"
Expand All @@ -38,8 +39,11 @@ import (
"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/tikv/pd/client/grpcutil"
pdHttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/tlsutil"
"github.com/tikv/pd/pkg/codec"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/tools/pd-heartbeat-bench/config"
"go.etcd.io/etcd/pkg/report"
Expand Down Expand Up @@ -271,13 +275,13 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options) {
for _, i := range rs.updateFlow {
region := rs.regions[i]
if region.Leader.StoreId <= uint64(options.GetHotStoreCount()) {
region.BytesWritten = uint64(hotByteUnit * (1 + rand.Float64()))
region.BytesRead = uint64(hotByteUnit * (1 + rand.Float64()))
region.KeysWritten = uint64(hotKeysUint * (1 + rand.Float64()))
region.KeysRead = uint64(hotKeysUint * (1 + rand.Float64()))
region.BytesWritten = uint64(hotByteUnit * (1 + rand.Float64()) * 60)
region.BytesRead = uint64(hotByteUnit * (1 + rand.Float64()) * 10)
region.KeysWritten = uint64(hotKeysUint * (1 + rand.Float64()) * 60)
region.KeysRead = uint64(hotKeysUint * (1 + rand.Float64()) * 10)
region.QueryStats = &pdpb.QueryStats{
Get: uint64(hotQueryUnit * (1 + rand.Float64())),
Put: uint64(hotQueryUnit * (1 + rand.Float64())),
Get: uint64(hotQueryUnit * (1 + rand.Float64()) * 10),
Put: uint64(hotQueryUnit * (1 + rand.Float64()) * 60),
}
} else {
region.BytesWritten = uint64(bytesUnit * rand.Float64())
Expand Down Expand Up @@ -451,6 +455,7 @@ func pick(slice []int, total int, ratio float64) []int {

func main() {
rand.New(rand.NewSource(0)) // Ensure consistent behavior multiple times
statistics.Denoising = false
cfg := config.NewConfig()
err := cfg.Parse(os.Args[1:])
defer logutil.LogPanic()
Expand Down Expand Up @@ -491,6 +496,7 @@ func main() {
if err != nil {
log.Fatal("create client error", zap.Error(err))
}

initClusterID(ctx, cli)
go runHTTPServer(cfg, options)
regions := new(Regions)
Expand All @@ -501,6 +507,8 @@ func main() {
bootstrap(ctx, cli)
putStores(ctx, cfg, cli, stores)
log.Info("finish put stores")
httpCli := pdHttp.NewClient("tools-heartbeat-bench", []string{cfg.PDAddr}, pdHttp.WithTLSConfig(loadTLSConfig(cfg)))
go deleteOperators(ctx, httpCli)
streams := make(map[uint64]pdpb.PD_RegionHeartbeatClient, cfg.StoreCount)
for i := 1; i <= cfg.StoreCount; i++ {
streams[uint64(i)] = createHeartbeatStream(ctx, cfg)
Expand Down Expand Up @@ -555,6 +563,22 @@ func exit(code int) {
os.Exit(code)
}

func deleteOperators(ctx context.Context, httpCli pdHttp.Client) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := httpCli.DeleteOperators(ctx)
if err != nil {
log.Error("fail to delete operators", zap.Error(err))
}
}
}
}

func newReport(cfg *config.Config) report.Report {
p := "%4.4f"
if cfg.Sample {
Expand Down Expand Up @@ -633,3 +657,32 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
})
engine.Run(cfg.StatusAddr)
}

func loadTLSConfig(cfg *config.Config) *tls.Config {
if len(cfg.Security.CAPath) == 0 {
return nil
}
caData, err := os.ReadFile(cfg.Security.CAPath)
if err != nil {
log.Error("fail to read ca file", zap.Error(err))
}
certData, err := os.ReadFile(cfg.Security.CertPath)
if err != nil {
log.Error("fail to read cert file", zap.Error(err))
}
keyData, err := os.ReadFile(cfg.Security.KeyPath)
if err != nil {
log.Error("fail to read key file", zap.Error(err))
}

tlsConf, err := tlsutil.TLSConfig{
SSLCABytes: caData,
SSLCertBytes: certData,
SSLKEYBytes: keyData,
}.ToTLSConfig()
if err != nil {
log.Fatal("failed to load tlc config", zap.Error(err))
}

return tlsConf
}

0 comments on commit 8773f6a

Please sign in to comment.