diff --git a/pkg/report/report.go b/pkg/report/report.go index 4d138f9744e..945ce2787c3 100644 --- a/pkg/report/report.go +++ b/pkg/report/report.go @@ -83,7 +83,7 @@ func NewReport(precision string) Report { return newReport(precision) } func newReport(precision string) *report { r := &report{ - results: make(chan Result, 16), + results: make(chan Result, 65536), precision: precision, } r.stats.ErrorDist = make(map[string]int) diff --git a/tools/benchmark/cmd/watch_latency.go b/tools/benchmark/cmd/watch_latency.go index 67db42755a7..f63c412ce32 100644 --- a/tools/benchmark/cmd/watch_latency.go +++ b/tools/benchmark/cmd/watch_latency.go @@ -18,11 +18,11 @@ import ( "context" "fmt" "os" + "sync/atomic" "time" "github.com/cheggaaa/pb/v3" "github.com/spf13/cobra" - "golang.org/x/time/rate" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/pkg/v3/report" @@ -61,7 +61,7 @@ func init() { } func watchLatencyFunc(_ *cobra.Command, _ []string) { - key := string(mustRandBytes(watchLKeySize)) + key := "/registry/pods" value := string(mustRandBytes(watchLValueSize)) wchs := setupWatchChannels(key) putClient := mustCreateConn() @@ -69,15 +69,8 @@ func watchLatencyFunc(_ *cobra.Command, _ []string) { bar = pb.New(watchLPutTotal * len(wchs)) bar.Start() - limiter := rate.NewLimiter(rate.Limit(watchLPutRate), watchLPutRate) - - putTimes := make([]time.Time, watchLPutTotal) - eventTimes := make([][]time.Time, len(wchs)) - - for i, wch := range wchs { + for _, wch := range wchs { wch := wch - i := i - eventTimes[i] = make([]time.Time, watchLPutTotal) wg.Add(1) go func() { defer wg.Done() @@ -85,7 +78,6 @@ func watchLatencyFunc(_ *cobra.Command, _ []string) { for eventCount < watchLPutTotal { resp := <-wch for range resp.Events { - eventTimes[i][eventCount] = time.Now() eventCount++ bar.Increment() } @@ -95,40 +87,30 @@ func watchLatencyFunc(_ *cobra.Command, _ []string) { putReport := newReport() putReportResults := putReport.Run() - watchReport := newReport() - watchReportResults := watchReport.Run() - for i := 0; i < watchLPutTotal; i++ { - // limit key put as per reqRate - if err := limiter.Wait(context.TODO()); err != nil { - break - } - start := time.Now() - if _, err := putClient.Put(context.TODO(), key, value); err != nil { - fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err) - os.Exit(1) - } - end := time.Now() - putReport.Results() <- report.Result{Start: start, End: end} - putTimes[i] = end + + var putCount atomic.Uint64 + for i := 0; i < watchLPutRate; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + if putCount.Load() >= uint64(watchLPutTotal) { + return + } + start := time.Now() + if _, err := putClient.Put(context.TODO(), key, value); err != nil { + fmt.Fprintf(os.Stderr, "Failed to Put for watch latency benchmark: %v\n", err) + } + end := time.Now() + putReport.Results() <- report.Result{Start: start, End: end} + putCount.Add(1) + } + }() } wg.Wait() close(putReport.Results()) bar.Finish() fmt.Printf("\nPut summary:\n%s", <-putReportResults) - - for i := 0; i < len(wchs); i++ { - for j := 0; j < watchLPutTotal; j++ { - start := putTimes[j] - end := eventTimes[i][j] - if end.Before(start) { - start = end - } - watchReport.Results() <- report.Result{Start: start, End: end} - } - } - - close(watchReport.Results()) - fmt.Printf("\nWatch events summary:\n%s", <-watchReportResults) } func setupWatchChannels(key string) []clientv3.WatchChan {