Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: support multi-keyspace, fault injection and keyspace-name in pd-tso-bench #6608

Merged
merged 4 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,14 @@ func (c *tsoServiceDiscovery) updateMember() error {
}
}

oldGroupID := c.GetKeyspaceGroupID()
if oldGroupID != keyspaceGroup.Id {
log.Info("[tso] the keyspace group changed",
zap.Uint32("keyspace-id", keyspaceGroup.Id),
zap.Uint32("new-keyspace-group-id", keyspaceGroup.Id),
zap.Uint32("old-keyspace-group-id", oldGroupID))
}

// Initialize the serving addresses from the returned keyspace group info.
primaryAddr := ""
secondaryAddrs := make([]string, 0)
Expand Down
157 changes: 111 additions & 46 deletions tools/pd-tso-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"flag"
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"os"
Expand All @@ -43,20 +44,25 @@ const (
)

var (
pdAddrs = flag.String("pd", "127.0.0.1:2379", "pd address")
clientNumber = flag.Int("client", 1, "the number of pd clients involved in each benchmark")
concurrency = flag.Int("c", 1000, "concurrency")
count = flag.Int("count", 1, "the count number that the test will run")
duration = flag.Duration("duration", 60*time.Second, "how many seconds the test will last")
dcLocation = flag.String("dc", "global", "which dc-location this bench will request")
verbose = flag.Bool("v", false, "output statistics info every interval and output metrics info at the end")
interval = flag.Duration("interval", time.Second, "interval to output the statistics")
caPath = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs")
certPath = flag.String("cert", "", "path of file that contains X509 certificate in PEM format")
keyPath = flag.String("key", "", "path of file that contains X509 key in PEM format")
maxBatchWaitInterval = flag.Duration("batch-interval", 0, "the max batch wait interval")
enableTSOFollowerProxy = flag.Bool("enable-tso-follower-proxy", false, "whether enable the TSO Follower Proxy")
wg sync.WaitGroup
pdAddrs = flag.String("pd", "127.0.0.1:2379", "pd address")
clientNumber = flag.Int("client", 1, "the number of pd clients involved in each benchmark")
concurrency = flag.Int("c", 1000, "concurrency")
count = flag.Int("count", 1, "the count number that the test will run")
duration = flag.Duration("duration", 60*time.Second, "how many seconds the test will last")
dcLocation = flag.String("dc", "global", "which dc-location this bench will request")
verbose = flag.Bool("v", false, "output statistics info every interval and output metrics info at the end")
interval = flag.Duration("interval", time.Second, "interval to output the statistics")
caPath = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs")
certPath = flag.String("cert", "", "path of file that contains X509 certificate in PEM format")
keyPath = flag.String("key", "", "path of file that contains X509 key in PEM format")
maxBatchWaitInterval = flag.Duration("batch-interval", 0, "the max batch wait interval")
enableTSOFollowerProxy = flag.Bool("enable-tso-follower-proxy", false, "whether enable the TSO Follower Proxy")
enableFaultInjection = flag.Bool("enable-fault-injection", false, "whether enable fault injection")
faultInjectionRate = flag.Float64("fault-injection-rate", 0.01, "the failure rate [0.0001, 1]. 0.01 means 1% failure rate")
maxTSOSendIntervalMilliseconds = flag.Int("max-send-interval-ms", 0, "max tso send interval in milliseconds, 60s by default")
keyspaceID = flag.Uint("keyspace-id", 0, "the id of the keyspace to access")
keyspaceName = flag.String("keyspace-name", "", "the name of the keyspace to access")
wg sync.WaitGroup
)

var promServer *httptest.Server
Expand Down Expand Up @@ -97,26 +103,7 @@ func bench(mainCtx context.Context) {
fmt.Printf("Create %d client(s) for benchmark\n", *clientNumber)
pdClients := make([]pd.Client, *clientNumber)
for idx := range pdClients {
var (
pdCli pd.Client
err error
)

opt := pd.WithGRPCDialOptions(
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: keepaliveTime,
Timeout: keepaliveTimeout,
}),
)

pdCli, err = pd.NewClientWithContext(mainCtx, []string{*pdAddrs}, pd.SecurityOption{
CAPath: *caPath,
CertPath: *certPath,
KeyPath: *keyPath,
}, opt)

pdCli.UpdateOption(pd.MaxTSOBatchWaitInterval, *maxBatchWaitInterval)
pdCli.UpdateOption(pd.EnableTSOFollowerProxy, *enableTSOFollowerProxy)
pdCli, err := createPDClient(mainCtx)
if err != nil {
log.Fatal(fmt.Sprintf("create pd client #%d failed: %v", idx, err))
}
Expand All @@ -134,10 +121,18 @@ func bench(mainCtx context.Context) {

durCh := make(chan time.Duration, 2*(*concurrency)*(*clientNumber))

wg.Add((*concurrency) * (*clientNumber))
for _, pdCli := range pdClients {
for i := 0; i < *concurrency; i++ {
go reqWorker(ctx, pdCli, durCh)
if *enableFaultInjection {
fmt.Printf("Enable fault injection, failure rate: %f\n", *faultInjectionRate)
wg.Add(*clientNumber)
for i := 0; i < *clientNumber; i++ {
go reqWorker(ctx, pdClients, i, durCh)
}
} else {
wg.Add((*concurrency) * (*clientNumber))
for i := 0; i < *clientNumber; i++ {
for j := 0; j < *concurrency; j++ {
go reqWorker(ctx, pdClients, i, durCh)
}
}
}

Expand Down Expand Up @@ -352,22 +347,51 @@ func (s *stats) calculate(count int) float64 {
return float64(count) * 100 / float64(s.count)
}

func reqWorker(ctx context.Context, pdCli pd.Client, durCh chan time.Duration) {
func reqWorker(ctx context.Context, pdClients []pd.Client, clientIdx int, durCh chan time.Duration) {
defer wg.Done()

reqCtx, cancel := context.WithCancel(ctx)
defer cancel()
var (
err error
maxRetryTime int = 120
sleepIntervalOnFailure time.Duration = 1000 * time.Millisecond
totalSleepBeforeGetTS time.Duration
)
pdCli := pdClients[clientIdx]

for {
if pdCli == nil || (*enableFaultInjection && shouldInjectFault()) {
if pdCli != nil {
pdCli.Close()
}
pdCli, err = createPDClient(ctx)
if err != nil {
log.Error(fmt.Sprintf("re-create pd client #%d failed: %v", clientIdx, err))
select {
case <-reqCtx.Done():
case <-time.After(100 * time.Millisecond):
}
continue
}
pdClients[clientIdx] = pdCli
}

totalSleepBeforeGetTS = 0
start := time.Now()

var (
i int32
err error
maxRetryTime int32 = 50
sleepIntervalOnFailure time.Duration = 100 * time.Millisecond
)
i := 0
for ; i < maxRetryTime; i++ {
if *maxTSOSendIntervalMilliseconds > 0 {
sleepBeforeGetTS := time.Duration(rand.Intn(*maxTSOSendIntervalMilliseconds)) * time.Millisecond
ticker := time.NewTicker(sleepBeforeGetTS)
defer ticker.Stop()
select {
case <-reqCtx.Done():
case <-ticker.C:
totalSleepBeforeGetTS += sleepBeforeGetTS
}
}
_, _, err = pdCli.GetLocalTS(reqCtx, *dcLocation)
if errors.Cause(err) == context.Canceled {
return
Expand All @@ -381,7 +405,7 @@ func reqWorker(ctx context.Context, pdCli pd.Client, durCh chan time.Duration) {
if err != nil {
log.Fatal(fmt.Sprintf("%v", err))
}
dur := time.Since(start) - time.Duration(i)*sleepIntervalOnFailure
dur := time.Since(start) - time.Duration(i)*sleepIntervalOnFailure - totalSleepBeforeGetTS

select {
case <-reqCtx.Done():
Expand All @@ -390,3 +414,44 @@ func reqWorker(ctx context.Context, pdCli pd.Client, durCh chan time.Duration) {
}
}
}

// createPDClient builds a PD client configured from the command-line flags.
//
// The client always gets gRPC keepalive dial options and the TLS material named
// by the -cacert/-cert/-key flags. When -keyspace-name is non-empty the client
// is created through an API v2 context for that name; otherwise it is created
// for the numeric -keyspace-id. After a successful creation the max TSO batch
// wait interval and the TSO follower proxy switch are applied from their flags.
//
// On failure it returns a nil client together with the creation error.
func createPDClient(ctx context.Context) (pd.Client, error) {
	// Shared by both constructor variants below.
	security := pd.SecurityOption{
		CAPath:   *caPath,
		CertPath: *certPath,
		KeyPath:  *keyPath,
	}
	endpoints := []string{*pdAddrs}
	clientOpts := []pd.ClientOption{
		pd.WithGRPCDialOptions(
			grpc.WithKeepaliveParams(keepalive.ClientParameters{
				Time:    keepaliveTime,
				Timeout: keepaliveTimeout,
			}),
		),
	}

	var (
		client pd.Client
		err    error
	)
	if name := *keyspaceName; name != "" {
		// A keyspace name takes precedence over the keyspace id.
		client, err = pd.NewClientWithAPIContext(ctx, pd.NewAPIContextV2(name), endpoints, security, clientOpts...)
	} else {
		client, err = pd.NewClientWithKeyspace(ctx, uint32(*keyspaceID), endpoints, security, clientOpts...)
	}
	if err != nil {
		return nil, err
	}

	client.UpdateOption(pd.MaxTSOBatchWaitInterval, *maxBatchWaitInterval)
	client.UpdateOption(pd.EnableTSOFollowerProxy, *enableTSOFollowerProxy)
	return client, nil
}

// shouldInjectFault reports whether a fault should be injected on this call.
//
// It draws a uniform value in [0, 1) and compares it with the
// -fault-injection-rate flag, so a rate of 0 (or below) never injects and a
// rate of 1 (or above) always injects. Comparing in floating point avoids
// truncating the rate to a whole number of ten-thousandths, which could round
// a configured rate down and silently treated any rate below 0.0001 as zero.
func shouldInjectFault() bool {
	return rand.Float64() < *faultInjectionRate
}