From 1148decba653e6c5f286a3cdf0221d9fb38aa8db Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Wed, 5 Jul 2023 00:09:43 -0700 Subject: [PATCH] tso: support multi-keyspace, fault injection and keyspace-name in pd-tso-bench (#6608) ref tikv/pd#5895 support multi-keyspace, fault injection and keyspace-name in pd-tso-bench Signed-off-by: Bin Shi Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/tso_service_discovery.go | 8 ++ tools/pd-tso-bench/main.go | 157 ++++++++++++++++++++++---------- 2 files changed, 119 insertions(+), 46 deletions(-) diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index c35809a347b3..2aeb49e15230 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -469,6 +469,14 @@ func (c *tsoServiceDiscovery) updateMember() error { } } + oldGroupID := c.GetKeyspaceGroupID() + if oldGroupID != keyspaceGroup.Id { + log.Info("[tso] the keyspace group changed", + zap.Uint32("keyspace-id", keyspaceGroup.Id), + zap.Uint32("new-keyspace-group-id", keyspaceGroup.Id), + zap.Uint32("old-keyspace-group-id", oldGroupID)) + } + // Initialize the serving addresses from the returned keyspace group info. primaryAddr := "" secondaryAddrs := make([]string, 0) diff --git a/tools/pd-tso-bench/main.go b/tools/pd-tso-bench/main.go index e82a9dcabba4..236e78c78088 100644 --- a/tools/pd-tso-bench/main.go +++ b/tools/pd-tso-bench/main.go @@ -19,6 +19,7 @@ import ( "flag" "fmt" "io" + "math/rand" "net/http" "net/http/httptest" "os" @@ -43,20 +44,25 @@ const ( ) var ( - pdAddrs = flag.String("pd", "127.0.0.1:2379", "pd address") - clientNumber = flag.Int("client", 1, "the number of pd clients involved in each benchmark") - concurrency = flag.Int("c", 1000, "concurrency") - count = flag.Int("count", 1, "the count number that the test will run") - duration = flag.Duration("duration", 60*time.Second, "how many seconds the test will last") - dcLocation = flag.String("dc", "global", "which dc-location this bench will request") - verbose = flag.Bool("v", false, "output statistics info every interval and output metrics info at the end") - interval = flag.Duration("interval", time.Second, "interval to output the statistics") - caPath = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs") - certPath = flag.String("cert", "", "path of file that contains X509 certificate in PEM format") - keyPath = flag.String("key", "", "path of file that contains X509 key in PEM format") - maxBatchWaitInterval = flag.Duration("batch-interval", 0, "the max batch wait interval") - enableTSOFollowerProxy = flag.Bool("enable-tso-follower-proxy", false, "whether enable the TSO Follower Proxy") - wg sync.WaitGroup + pdAddrs = flag.String("pd", "127.0.0.1:2379", "pd address") + clientNumber = flag.Int("client", 1, "the number of pd clients involved in each benchmark") + concurrency = flag.Int("c", 1000, "concurrency") + count = flag.Int("count", 1, "the count number that the test will run") + duration = flag.Duration("duration", 60*time.Second, "how many seconds the test will last") + dcLocation = flag.String("dc", "global", "which dc-location this bench will request") + verbose = flag.Bool("v", false, "output statistics info every interval and output metrics info at the end") + interval = flag.Duration("interval", time.Second, "interval to output the statistics") + caPath = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs") + certPath = flag.String("cert", "", "path of file that contains X509 certificate in PEM format") + keyPath = flag.String("key", "", "path of file that contains X509 key in PEM format") + maxBatchWaitInterval = flag.Duration("batch-interval", 0, "the max batch wait interval") + enableTSOFollowerProxy = flag.Bool("enable-tso-follower-proxy", false, "whether enable the TSO Follower Proxy") + enableFaultInjection = flag.Bool("enable-fault-injection", false, "whether enable fault injection") + faultInjectionRate = flag.Float64("fault-injection-rate", 0.01, "the failure rate [0.0001, 1]. 0.01 means 1% failure rate") + maxTSOSendIntervalMilliseconds = flag.Int("max-send-interval-ms", 0, "max tso send interval in milliseconds, 60s by default") + keyspaceID = flag.Uint("keyspace-id", 0, "the id of the keyspace to access") + keyspaceName = flag.String("keyspace-name", "", "the name of the keyspace to access") + wg sync.WaitGroup ) var promServer *httptest.Server @@ -97,26 +103,7 @@ func bench(mainCtx context.Context) { fmt.Printf("Create %d client(s) for benchmark\n", *clientNumber) pdClients := make([]pd.Client, *clientNumber) for idx := range pdClients { - var ( - pdCli pd.Client - err error - ) - - opt := pd.WithGRPCDialOptions( - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: keepaliveTime, - Timeout: keepaliveTimeout, - }), - ) - - pdCli, err = pd.NewClientWithContext(mainCtx, []string{*pdAddrs}, pd.SecurityOption{ - CAPath: *caPath, - CertPath: *certPath, - KeyPath: *keyPath, - }, opt) - - pdCli.UpdateOption(pd.MaxTSOBatchWaitInterval, *maxBatchWaitInterval) - pdCli.UpdateOption(pd.EnableTSOFollowerProxy, *enableTSOFollowerProxy) + pdCli, err := createPDClient(mainCtx) if err != nil { log.Fatal(fmt.Sprintf("create pd client #%d failed: %v", idx, err)) } @@ -134,10 +121,18 @@ func bench(mainCtx context.Context) { durCh := make(chan time.Duration, 2*(*concurrency)*(*clientNumber)) - wg.Add((*concurrency) * (*clientNumber)) - for _, pdCli := range pdClients { - for i := 0; i < *concurrency; i++ { - go reqWorker(ctx, pdCli, durCh) + if *enableFaultInjection { + fmt.Printf("Enable fault injection, failure rate: %f\n", *faultInjectionRate) + wg.Add(*clientNumber) + for i := 0; i < *clientNumber; i++ { + go reqWorker(ctx, pdClients, i, durCh) + } + } else { + wg.Add((*concurrency) * (*clientNumber)) + for i := 0; i < *clientNumber; i++ { + for j := 0; j < *concurrency; j++ { + go reqWorker(ctx, pdClients, i, durCh) + } } } @@ -352,22 +347,51 @@ func (s *stats) calculate(count int) float64 { return float64(count) * 100 / float64(s.count) } -func reqWorker(ctx context.Context, pdCli pd.Client, durCh chan time.Duration) { +func reqWorker(ctx context.Context, pdClients []pd.Client, clientIdx int, durCh chan time.Duration) { defer wg.Done() reqCtx, cancel := context.WithCancel(ctx) defer cancel() + var ( + err error + maxRetryTime int = 120 + sleepIntervalOnFailure time.Duration = 1000 * time.Millisecond + totalSleepBeforeGetTS time.Duration + ) + pdCli := pdClients[clientIdx] for { + if pdCli == nil || (*enableFaultInjection && shouldInjectFault()) { + if pdCli != nil { + pdCli.Close() + } + pdCli, err = createPDClient(ctx) + if err != nil { + log.Error(fmt.Sprintf("re-create pd client #%d failed: %v", clientIdx, err)) + select { + case <-reqCtx.Done(): + case <-time.After(100 * time.Millisecond): + } + continue + } + pdClients[clientIdx] = pdCli + } + + totalSleepBeforeGetTS = 0 start := time.Now() - var ( - i int32 - err error - maxRetryTime int32 = 50 - sleepIntervalOnFailure time.Duration = 100 * time.Millisecond - ) + i := 0 for ; i < maxRetryTime; i++ { + if *maxTSOSendIntervalMilliseconds > 0 { + sleepBeforeGetTS := time.Duration(rand.Intn(*maxTSOSendIntervalMilliseconds)) * time.Millisecond + ticker := time.NewTicker(sleepBeforeGetTS) + defer ticker.Stop() + select { + case <-reqCtx.Done(): + case <-ticker.C: + totalSleepBeforeGetTS += sleepBeforeGetTS + } + } _, _, err = pdCli.GetLocalTS(reqCtx, *dcLocation) if errors.Cause(err) == context.Canceled { return @@ -381,7 +405,7 @@ func reqWorker(ctx context.Context, pdCli pd.Client, durCh chan time.Duration) { if err != nil { log.Fatal(fmt.Sprintf("%v", err)) } - dur := time.Since(start) - time.Duration(i)*sleepIntervalOnFailure + dur := time.Since(start) - time.Duration(i)*sleepIntervalOnFailure - totalSleepBeforeGetTS select { case <-reqCtx.Done(): @@ -390,3 +414,44 @@ func reqWorker(ctx context.Context, pdCli pd.Client, durCh chan time.Duration) { } } } + +func createPDClient(ctx context.Context) (pd.Client, error) { + var ( + pdCli pd.Client + err error + ) + + opts := make([]pd.ClientOption, 0) + opts = append(opts, pd.WithGRPCDialOptions( + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: keepaliveTime, + Timeout: keepaliveTimeout, + }), + )) + + if len(*keyspaceName) > 0 { + apiCtx := pd.NewAPIContextV2(*keyspaceName) + pdCli, err = pd.NewClientWithAPIContext(ctx, apiCtx, []string{*pdAddrs}, pd.SecurityOption{ + CAPath: *caPath, + CertPath: *certPath, + KeyPath: *keyPath, + }, opts...) + } else { + pdCli, err = pd.NewClientWithKeyspace(ctx, uint32(*keyspaceID), []string{*pdAddrs}, pd.SecurityOption{ + CAPath: *caPath, + CertPath: *certPath, + KeyPath: *keyPath, + }, opts...) + } + if err != nil { + return nil, err + } + + pdCli.UpdateOption(pd.MaxTSOBatchWaitInterval, *maxBatchWaitInterval) + pdCli.UpdateOption(pd.EnableTSOFollowerProxy, *enableTSOFollowerProxy) + return pdCli, err +} + +func shouldInjectFault() bool { + return rand.Intn(10000) < int(*faultInjectionRate*10000) +}