From aaa1bb75f565cbc7b71a597f87bb2087e253e3a8 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 27 Jul 2020 09:51:12 +0800 Subject: [PATCH 1/4] worker: simplify code - remove "throttle", use time.Ticker channel directly - simplify worker.flush by using context - create spans directly, rather than in a short-lived goroutine which is waited upon --- worker/work.go | 70 +++++++++++++------------------------------------- 1 file changed, 18 insertions(+), 52 deletions(-) diff --git a/worker/work.go b/worker/work.go index 7bb4df92..2d8fd59f 100644 --- a/worker/work.go +++ b/worker/work.go @@ -8,14 +8,12 @@ import ( "os" "os/signal" "strconv" - "sync" "time" "github.com/elastic/hey-apm/internal/heptio/workgroup" "github.com/elastic/hey-apm/agent" - "go.elastic.co/apm" "go.elastic.co/apm/stacktrace" ) @@ -56,23 +54,18 @@ func (w *worker) work() (Result, error) { // flush ensures that the entire workload defined is pushed to the apm-server, within the worker timeout limit. func (w *worker) flush() { - flushed := make(chan struct{}) - go func() { - w.Flush(nil) - close(flushed) - }() - - flushWait := time.After(w.FlushTimeout) - if w.FlushTimeout == 0 { - flushWait = make(<-chan time.Time) + defer w.Close() + + ctx := context.Background() + if w.FlushTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, w.FlushTimeout) + defer cancel() } - select { - case <-flushed: - case <-flushWait: - // give up waiting for flush + w.Flush(ctx.Done()) + if ctx.Err() != nil { w.Errorf("timed out waiting for flush to complete") } - w.Close() } type generatedErr struct { @@ -104,18 +97,16 @@ func (w *worker) addErrors(frequency time.Duration, limit, framesMin, framesMax if limit <= 0 { return } - t := throttle(time.NewTicker(frequency).C) w.Add(func(done <-chan struct{}) error { - var count int - for count < limit { + ticker := time.NewTicker(frequency) + defer ticker.Stop() + for i := 0; i < limit; i++ { select { case <-done: return nil - case <-t: + 
case <-ticker.C: } - w.Tracer.NewError(&generatedErr{frames: rand.Intn(framesMax-framesMin+1) + framesMin}).Send() - count++ } return nil }) @@ -125,36 +116,22 @@ func (w *worker) addTransactions(frequency time.Duration, limit, spanMin, spanMa if limit <= 0 { return } - t := throttle(time.NewTicker(frequency).C) - generateSpan := func(ctx context.Context) { - span, ctx := apm.StartSpan(ctx, "I'm a span", "gen.era.ted") - span.End() - } - generator := func(done <-chan struct{}) error { - var count int - for count < limit { + ticker := time.NewTicker(frequency) + defer ticker.Stop() + for i := 0; i < limit; i++ { select { case <-done: return nil - case <-t: + case <-ticker.C: } - tx := w.Tracer.StartTransaction("generated", "gen") - ctx := apm.ContextWithTransaction(context.Background(), tx) - var wg sync.WaitGroup spanCount := rand.Intn(spanMax-spanMin+1) + spanMin for i := 0; i < spanCount; i++ { - wg.Add(1) - go func() { - generateSpan(ctx) - wg.Done() - }() + tx.StartSpan("I'm a span", "gen.era.ted", nil).End() } - wg.Wait() tx.Context.SetTag("spans", strconv.Itoa(spanCount)) tx.End() - count++ } return nil } @@ -173,14 +150,3 @@ func (w *worker) addSignalHandling() { } }) } - -// throttle converts a time ticker to a channel of things. -func throttle(c <-chan time.Time) chan interface{} { - throttle := make(chan interface{}) - go func() { - for range c { - throttle <- struct{}{} - } - }() - return throttle -} From dc1fef3fa30e463bf764157a811220674ae2e8ab Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 27 Jul 2020 10:51:37 +0800 Subject: [PATCH 2/4] worker: simplify worker to single loop Sending events should be fast enough that there's no need for parallelism, and the concurrency is complicating things. 
--- internal/heptio/workgroup/LICENSE | 201 ----------------------------- internal/heptio/workgroup/group.go | 61 --------- worker/run.go | 16 ++- worker/work.go | 151 ++++++++++++---------- 4 files changed, 94 insertions(+), 335 deletions(-) delete mode 100644 internal/heptio/workgroup/LICENSE delete mode 100644 internal/heptio/workgroup/group.go diff --git a/internal/heptio/workgroup/LICENSE b/internal/heptio/workgroup/LICENSE deleted file mode 100644 index 5e0fd33c..00000000 --- a/internal/heptio/workgroup/LICENSE +++ /dev/null @@ -1,201 +0,0 @@ -Apache License -Version 2.0, January 2004 -http://www.apache.org/licenses/ - -TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - -1. Definitions. - -"License" shall mean the terms and conditions for use, reproduction, -and distribution as defined by Sections 1 through 9 of this document. - -"Licensor" shall mean the copyright owner or entity authorized by -the copyright owner that is granting the License. - -"Legal Entity" shall mean the union of the acting entity and all -other entities that control, are controlled by, or are under common -control with that entity. For the purposes of this definition, -"control" means (i) the power, direct or indirect, to cause the -direction or management of such entity, whether by contract or -otherwise, or (ii) ownership of fifty percent (50%) or more of the -outstanding shares, or (iii) beneficial ownership of such entity. - -"You" (or "Your") shall mean an individual or Legal Entity -exercising permissions granted by this License. - -"Source" form shall mean the preferred form for making modifications, -including but not limited to software source code, documentation -source, and configuration files. - -"Object" form shall mean any form resulting from mechanical -transformation or translation of a Source form, including but -not limited to compiled object code, generated documentation, -and conversions to other media types. 
- -"Work" shall mean the work of authorship, whether in Source or -Object form, made available under the License, as indicated by a -copyright notice that is included in or attached to the work -(an example is provided in the Appendix below). - -"Derivative Works" shall mean any work, whether in Source or Object -form, that is based on (or derived from) the Work and for which the -editorial revisions, annotations, elaborations, or other modifications -represent, as a whole, an original work of authorship. For the purposes -of this License, Derivative Works shall not include works that remain -separable from, or merely link (or bind by name) to the interfaces of, -the Work and Derivative Works thereof. - -"Contribution" shall mean any work of authorship, including -the original version of the Work and any modifications or additions -to that Work or Derivative Works thereof, that is intentionally -submitted to Licensor for inclusion in the Work by the copyright owner -or by an individual or Legal Entity authorized to submit on behalf of -the copyright owner. For the purposes of this definition, "submitted" -means any form of electronic, verbal, or written communication sent -to the Licensor or its representatives, including but not limited to -communication on electronic mailing lists, source code control systems, -and issue tracking systems that are managed by, or on behalf of, the -Licensor for the purpose of discussing and improving the Work, but -excluding communication that is conspicuously marked or otherwise -designated in writing by the copyright owner as "Not a Contribution." - -"Contributor" shall mean Licensor and any individual or Legal Entity -on behalf of whom a Contribution has been received by Licensor and -subsequently incorporated within the Work. - -2. Grant of Copyright License. 
Subject to the terms and conditions of -this License, each Contributor hereby grants to You a perpetual, -worldwide, non-exclusive, no-charge, royalty-free, irrevocable -copyright license to reproduce, prepare Derivative Works of, -publicly display, publicly perform, sublicense, and distribute the -Work and such Derivative Works in Source or Object form. - -3. Grant of Patent License. Subject to the terms and conditions of -this License, each Contributor hereby grants to You a perpetual, -worldwide, non-exclusive, no-charge, royalty-free, irrevocable -(except as stated in this section) patent license to make, have made, -use, offer to sell, sell, import, and otherwise transfer the Work, -where such license applies only to those patent claims licensable -by such Contributor that are necessarily infringed by their -Contribution(s) alone or by combination of their Contribution(s) -with the Work to which such Contribution(s) was submitted. If You -institute patent litigation against any entity (including a -cross-claim or counterclaim in a lawsuit) alleging that the Work -or a Contribution incorporated within the Work constitutes direct -or contributory patent infringement, then any patent licenses -granted to You under this License for that Work shall terminate -as of the date such litigation is filed. - -4. Redistribution. 
You may reproduce and distribute copies of the -Work or Derivative Works thereof in any medium, with or without -modifications, and in Source or Object form, provided that You -meet the following conditions: - -(a) You must give any other recipients of the Work or -Derivative Works a copy of this License; and - -(b) You must cause any modified files to carry prominent notices -stating that You changed the files; and - -(c) You must retain, in the Source form of any Derivative Works -that You distribute, all copyright, patent, trademark, and -attribution notices from the Source form of the Work, -excluding those notices that do not pertain to any part of -the Derivative Works; and - -(d) If the Work includes a "NOTICE" text file as part of its -distribution, then any Derivative Works that You distribute must -include a readable copy of the attribution notices contained -within such NOTICE file, excluding those notices that do not -pertain to any part of the Derivative Works, in at least one -of the following places: within a NOTICE text file distributed -as part of the Derivative Works; within the Source form or -documentation, if provided along with the Derivative Works; or, -within a display generated by the Derivative Works, if and -wherever such third-party notices normally appear. The contents -of the NOTICE file are for informational purposes only and -do not modify the License. You may add Your own attribution -notices within Derivative Works that You distribute, alongside -or as an addendum to the NOTICE text from the Work, provided -that such additional attribution notices cannot be construed -as modifying the License. 
- -You may add Your own copyright statement to Your modifications and -may provide additional or different license terms and conditions -for use, reproduction, or distribution of Your modifications, or -for any such Derivative Works as a whole, provided Your use, -reproduction, and distribution of the Work otherwise complies with -the conditions stated in this License. - -5. Submission of Contributions. Unless You explicitly state otherwise, -any Contribution intentionally submitted for inclusion in the Work -by You to the Licensor shall be under the terms and conditions of -this License, without any additional terms or conditions. -Notwithstanding the above, nothing herein shall supersede or modify -the terms of any separate license agreement you may have executed -with Licensor regarding such Contributions. - -6. Trademarks. This License does not grant permission to use the trade -names, trademarks, service marks, or product names of the Licensor, -except as required for reasonable and customary use in describing the -origin of the Work and reproducing the content of the NOTICE file. - -7. Disclaimer of Warranty. Unless required by applicable law or -agreed to in writing, Licensor provides the Work (and each -Contributor provides its Contributions) on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -implied, including, without limitation, any warranties or conditions -of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A -PARTICULAR PURPOSE. You are solely responsible for determining the -appropriateness of using or redistributing the Work and assume any -risks associated with Your exercise of permissions under this License. - -8. Limitation of Liability. 
In no event and under no legal theory, -whether in tort (including negligence), contract, or otherwise, -unless required by applicable law (such as deliberate and grossly -negligent acts) or agreed to in writing, shall any Contributor be -liable to You for damages, including any direct, indirect, special, -incidental, or consequential damages of any character arising as a -result of this License or out of the use or inability to use the -Work (including but not limited to damages for loss of goodwill, -work stoppage, computer failure or malfunction, or any and all -other commercial damages or losses), even if such Contributor -has been advised of the possibility of such damages. - -9. Accepting Warranty or Additional Liability. While redistributing -the Work or Derivative Works thereof, You may choose to offer, -and charge a fee for, acceptance of support, warranty, indemnity, -or other liability obligations and/or rights consistent with this -License. However, in accepting such obligations, You may act only -on Your own behalf and on Your sole responsibility, not on behalf -of any other Contributor, and only if You agree to indemnify, -defend, and hold each Contributor harmless for any liability -incurred by, or claims asserted against, such Contributor by reason -of your accepting any such warranty or additional liability. - -END OF TERMS AND CONDITIONS - -APPENDIX: How to apply the Apache License to your work. - -To apply the Apache License to your work, attach the following -boilerplate notice, with the fields enclosed by brackets "{}" -replaced with your own identifying information. (Don't include -the brackets!) The text should be enclosed in the appropriate -comment syntax for the file format. We also recommend that a -file or class name and description of purpose be included on the -same "printed page" as the copyright notice for easier -identification within third-party archives. 
- -Copyright {yyyy} {name of copyright owner} - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. diff --git a/internal/heptio/workgroup/group.go b/internal/heptio/workgroup/group.go deleted file mode 100644 index 0d45a7bf..00000000 --- a/internal/heptio/workgroup/group.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright © 2017 Heptio -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// workgroup provides a mechanism for controlling the lifetime -// of a set of related goroutines. -package workgroup - -import "sync" - -// A Group manages a set of goroutines with related lifetimes. -// The zero value for a Group is fully usable without initalisation. -type Group struct { - fn []func(<-chan struct{}) error -} - -// Add adds a function to the Group. -// The function will be exectuted in its own goroutine when Run is called. -// Add must be called before Run. 
-func (g *Group) Add(fn func(<-chan struct{}) error) { - g.fn = append(g.fn, fn) -} - -// Run exectues each function registered via Add in its own goroutine. -// Run blocks until all functions have returned. -// The first function to return will trigger the closure of the channel -// passed to each function, who should in turn, return. -// The return value from the first function to exit will be returned to -// the caller of Run. -func (g *Group) Run() error { - - // if there are no registered functions, return immediately. - if len(g.fn) < 1 { - return nil - } - - var wg sync.WaitGroup - wg.Add(len(g.fn)) - - stop := make(chan struct{}) - result := make(chan error, len(g.fn)) - for _, fn := range g.fn { - go func(fn func(<-chan struct{}) error) { - defer wg.Done() - result <- fn(stop) - }(fn) - } - - defer wg.Wait() - defer close(stop) - return <-result -} diff --git a/worker/run.go b/worker/run.go index 9b2b3398..96826dc8 100644 --- a/worker/run.go +++ b/worker/run.go @@ -1,6 +1,7 @@ package worker import ( + "context" "fmt" "log" "math/rand" @@ -32,7 +33,7 @@ func Run(input models.Input, testName string) (models.Report, error) { logger := worker.Logger initialStatus := server.GetStatus(logger, input.ApmServerSecret, input.ApmServerUrl, testNode) - result, err := worker.work() + result, err := worker.work(context.Background()) if err != nil { logger.Println(err.Error()) return models.Report{}, err @@ -92,10 +93,17 @@ func prepareWork(input models.Input) (worker, error) { Tracer: tracer, RunTimeout: input.RunTimeout, FlushTimeout: input.FlushTimeout, + + TransactionFrequency: input.TransactionFrequency, + TransactionLimit: input.TransactionLimit, + SpanMinLimit: input.SpanMinLimit, + SpanMaxLimit: input.SpanMaxLimit, + + ErrorFrequency: input.ErrorFrequency, + ErrorLimit: input.ErrorLimit, + ErrorFrameMinLimit: input.ErrorFrameMinLimit, + ErrorFrameMaxLimit: input.ErrorFrameMaxLimit, } - w.addErrors(input.ErrorFrequency, input.ErrorLimit, input.ErrorFrameMinLimit, 
input.ErrorFrameMaxLimit) - w.addTransactions(input.TransactionFrequency, input.TransactionLimit, input.SpanMinLimit, input.SpanMaxLimit) - w.addSignalHandling() return w, nil } diff --git a/worker/work.go b/worker/work.go index 2d8fd59f..a2082f18 100644 --- a/worker/work.go +++ b/worker/work.go @@ -10,8 +10,6 @@ import ( "strconv" "time" - "github.com/elastic/hey-apm/internal/heptio/workgroup" - "github.com/elastic/hey-apm/agent" "go.elastic.co/apm/stacktrace" @@ -20,36 +18,94 @@ import ( type worker struct { *apmLogger *agent.Tracer + + ErrorFrequency time.Duration + ErrorLimit int + ErrorFrameMinLimit int + ErrorFrameMaxLimit int + + TransactionFrequency time.Duration + TransactionLimit int + SpanMinLimit int + SpanMaxLimit int + RunTimeout time.Duration FlushTimeout time.Duration - - // not to be modified concurrently - workgroup.Group } // work uses the Go agent API to generate events and send them to apm-server. -func (w *worker) work() (Result, error) { +func (w *worker) work(ctx context.Context) (Result, error) { + var runTimerC <-chan time.Time if w.RunTimeout > 0 { - w.Add(func(done <-chan struct{}) error { - select { - case <-done: - return nil - case <-time.After(w.RunTimeout): - return nil // time expired + runTimer := time.NewTimer(w.RunTimeout) + defer runTimer.Stop() + runTimerC = runTimer.C + } + + var errorTicker, transactionTicker maybeTicker + if w.ErrorFrequency > 0 && w.ErrorLimit > 0 { + errorTicker.Start(w.ErrorFrequency) + defer errorTicker.Stop() + } + if w.TransactionFrequency > 0 && w.TransactionLimit > 0 { + transactionTicker.Start(w.TransactionFrequency) + defer transactionTicker.Stop() + } + + // TODO(axw) do this outside work, cancel context on signal + signalC := make(chan os.Signal, 1) + signal.Notify(signalC, os.Interrupt) + + result := Result{Start: time.Now()} + var done bool + for !done { + select { + case <-ctx.Done(): + return Result{}, ctx.Err() + case sig := <-signalC: + return Result{}, errors.New(sig.String()) + case 
<-runTimerC: + done = true + case <-errorTicker.C: + w.sendError() + w.ErrorLimit-- + if w.ErrorLimit == 0 { + errorTicker.Stop() } - }) + case <-transactionTicker.C: + w.sendTransaction() + w.TransactionLimit-- + if w.TransactionLimit == 0 { + transactionTicker.Stop() + } + } } - result := Result{} - result.Start = time.Now() - err := w.Run() result.End = time.Now() w.flush() result.Flushed = time.Now() result.TracerStats = w.Stats() result.TransportStats = *w.TransportStats + return result, nil +} - return result, err +func (w *worker) sendError() { + err := &generatedErr{frames: randRange(w.ErrorFrameMinLimit, w.ErrorFrameMaxLimit)} + w.Tracer.NewError(err).Send() +} + +func (w *worker) sendTransaction() { + tx := w.Tracer.StartTransaction("generated", "gen") + defer tx.End() + spanCount := randRange(w.SpanMinLimit, w.SpanMaxLimit) + for i := 0; i < spanCount; i++ { + tx.StartSpan("I'm a span", "gen.era.ted", nil).End() + } + tx.Context.SetTag("spans", strconv.Itoa(spanCount)) +} + +func randRange(min, max int) int { + return min + rand.Intn(max-min+1) } // flush ensures that the entire workload defined is pushed to the apm-server, within the worker timeout limit. 
@@ -93,60 +149,17 @@ func (e *generatedErr) StackTrace() []stacktrace.Frame { return st } -func (w *worker) addErrors(frequency time.Duration, limit, framesMin, framesMax int) { - if limit <= 0 { - return - } - w.Add(func(done <-chan struct{}) error { - ticker := time.NewTicker(frequency) - defer ticker.Stop() - for i := 0; i < limit; i++ { - select { - case <-done: - return nil - case <-ticker.C: - } - w.Tracer.NewError(&generatedErr{frames: rand.Intn(framesMax-framesMin+1) + framesMin}).Send() - } - return nil - }) +type maybeTicker struct { + ticker *time.Ticker + C <-chan time.Time } -func (w *worker) addTransactions(frequency time.Duration, limit, spanMin, spanMax int) { - if limit <= 0 { - return - } - generator := func(done <-chan struct{}) error { - ticker := time.NewTicker(frequency) - defer ticker.Stop() - for i := 0; i < limit; i++ { - select { - case <-done: - return nil - case <-ticker.C: - } - tx := w.Tracer.StartTransaction("generated", "gen") - spanCount := rand.Intn(spanMax-spanMin+1) + spanMin - for i := 0; i < spanCount; i++ { - tx.StartSpan("I'm a span", "gen.era.ted", nil).End() - } - tx.Context.SetTag("spans", strconv.Itoa(spanCount)) - tx.End() - } - return nil - } - w.Add(generator) +func (t *maybeTicker) Start(d time.Duration) { + t.ticker = time.NewTicker(d) + t.C = t.ticker.C } -func (w *worker) addSignalHandling() { - w.Add(func(done <-chan struct{}) error { - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - select { - case <-done: - return nil - case sig := <-c: - return errors.New(sig.String()) - } - }) +func (t *maybeTicker) Stop() { + t.ticker.Stop() + t.C = nil } From 0b0ffa2bb56595cd9faff3e973f708a7997f0155 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 27 Jul 2020 12:04:35 +0800 Subject: [PATCH 3/4] models: remove "Wrap" type Add "With" methods to Input directly, for simpler and more idiomatic code. 
--- models/input.go | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/models/input.go b/models/input.go index f74c3430..63b9cf78 100644 --- a/models/input.go +++ b/models/input.go @@ -62,30 +62,26 @@ type Input struct { ErrorFrameMinLimit int `json:"error_generation_frames_min_limit"` } -type Wrap struct { - Input +func (in Input) WithErrors(limit int, freq time.Duration) Input { + in.ErrorLimit = limit + in.ErrorFrequency = freq + return in } -func (w Wrap) WithErrors(limit int, freq time.Duration) Wrap { - w.ErrorLimit = limit - w.ErrorFrequency = freq - return w +func (in Input) WithFrames(f int) Input { + in.ErrorFrameMaxLimit = f + in.ErrorFrameMinLimit = f + return in } -func (w Wrap) WithFrames(f int) Wrap { - w.ErrorFrameMaxLimit = f - w.ErrorFrameMinLimit = f - return w +func (in Input) WithTransactions(limit int, freq time.Duration) Input { + in.TransactionLimit = limit + in.TransactionFrequency = freq + return in } -func (w Wrap) WithTransactions(limit int, freq time.Duration) Wrap { - w.TransactionLimit = limit - w.TransactionFrequency = freq - return w -} - -func (w Wrap) WithSpans(s int) Wrap { - w.SpanMaxLimit = s - w.SpanMinLimit = s - return w +func (in Input) WithSpans(s int) Input { + in.SpanMaxLimit = s + in.SpanMinLimit = s + return in } From fa250c8a33f78f3f0fcd09fc556a75022bb0baec Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 27 Jul 2020 12:05:50 +0800 Subject: [PATCH 4/4] all: improve interrupt signal handling Previously, interrupt (Ctrl+C) was not handled well. For example, while running benchmarks, the remaining benchmarks would continue to run, and you would need to interrupt every individual benchmark. With these changes we abort all benchmarks upon interrupt, as they would no longer be useful for comparisons. Non-benchmark load generation is gracefully stopped, and stats are gathered as usual without error. 
--- benchmark/benchmark.go | 145 ++++++++++++++++++++++++----------------- es/api.go | 3 +- go.mod | 1 + go.sum | 1 + main.go | 58 ++++++++++++----- worker/run.go | 29 +++++---- worker/work.go | 30 ++++----- 7 files changed, 156 insertions(+), 111 deletions(-) diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 0ba0dfe7..c3318e0b 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -1,7 +1,9 @@ package benchmark import ( + "context" "fmt" + "log" "math" "time" @@ -25,83 +27,105 @@ const ( // // Regression checks accept an error margin and are not aware of apm-server versions, only URLs. // apm-server must be started independently with -E apm-server.expvar.enabled=true -func Run(input models.Input) error { +func Run(ctx context.Context, input models.Input) error { conn, err := es.NewConnection(input.ElasticsearchUrl, input.ElasticsearchAuth) if err != nil { return errors.Wrap(err, "Elasticsearch not reachable, won't be able to index a report") } - if err == nil { - fmt.Println("deleting previous APM indices...") - err = es.DeleteAPMIndices(conn) + log.Printf("Deleting previous APM event documents...") + if err := es.DeleteAPMEvents(conn); err != nil { + return err + } + if err := warmUp(ctx, input); err != nil { + return err + } + + tests := defineTests(input) + reports, err := tests.run(ctx) + if err != nil { + return err + } + if err := verifyReports(reports, conn, input.RegressionMargin, input.RegressionDays); err != nil { + return err } + return nil +} + +func defineTests(input models.Input) tests { + var t tests + t.add("transactions only", input.WithTransactions(math.MaxInt32, time.Millisecond*5)) + t.add("transactions only", input.WithTransactions(math.MaxInt32, time.Millisecond*5)) + t.add("small transactions", input.WithTransactions(math.MaxInt32, time.Millisecond*5).WithSpans(10)) + t.add("large transactions", input.WithTransactions(math.MaxInt32, time.Millisecond*5).WithSpans(40)) + t.add("small errors only", 
input.WithErrors(math.MaxInt32, time.Millisecond).WithFrames(10)) + t.add("very large errors only", input.WithErrors(math.MaxInt32, time.Millisecond).WithFrames(500)) + t.add("transactions only very high load", input.WithTransactions(math.MaxInt32, time.Microsecond*100)) + t.add("transactions, spans and errors high load", input.WithTransactions(math.MaxInt32, time.Millisecond*5).WithSpans(10).WithErrors(math.MaxInt32, time.Millisecond).WithFrames(50)) + return t +} - warmUp(input) - - run := runner(conn, input.RegressionMargin, input.RegressionDays) - run("transactions only", models.Wrap{input}. - WithTransactions(math.MaxInt32, time.Millisecond*5). - Input) - run("small transactions", models.Wrap{input}. - WithTransactions(math.MaxInt32, time.Millisecond*5). - WithSpans(10). - Input) - run("large transactions", models.Wrap{input}. - WithTransactions(math.MaxInt32, time.Millisecond*5). - WithSpans(40). - Input) - run("small errors only", models.Wrap{input}. - WithErrors(math.MaxInt32, time.Millisecond). - WithFrames(10). - Input) - run("very large errors only", models.Wrap{input}. - WithErrors(math.MaxInt32, time.Millisecond). - WithFrames(500). - Input) - run("transactions only very high load", models.Wrap{input}. - WithTransactions(math.MaxInt32, time.Microsecond*100). - Input) - err = run("transactions, spans and errors high load", models.Wrap{input}. - WithTransactions(math.MaxInt32, time.Millisecond*5). - WithSpans(10). - WithErrors(math.MaxInt32, time.Millisecond). - WithFrames(50). - Input) - - return err +type test struct { + name string + input models.Input } -// Runner keeps track of errors during successive calls, returning the last one. 
-func runner(conn es.Connection, margin float64, days string) func(name string, input models.Input) error { - var err error - return func(name string, input models.Input) error { - fmt.Println("running benchmark with " + name) - report, e := worker.Run(input, name) - if e == nil { - e = verify(conn, report, margin, days) +type tests []test + +func (t *tests) add(name string, input models.Input) { + *t = append(*t, test{name: name, input: input}) +} + +func (t *tests) run(ctx context.Context) ([]models.Report, error) { + reports := make([]models.Report, len(*t)) + for i, test := range *t { + log.Printf("running benchmark %q", test.name) + report, err := worker.Run(ctx, test.input, test.name, nil /*stop*/) + if err != nil { + return nil, err } - if e != nil { - fmt.Println(e) - err = e + if err := coolDown(ctx); err != nil { + return nil, err } - return err + reports[i] = report } + return reports, nil +} + +func verifyReports(reports []models.Report, conn es.Connection, margin float64, days string) error { + var lastErr error + for _, report := range reports { + if err := verify(conn, report, margin, days); err != nil { + fmt.Println(err) + lastErr = err + } + } + return lastErr } // warmUp sends a moderate load to apm-server without saving a report. -func warmUp(input models.Input) { - input = models.Wrap{input}.WithErrors(math.MaxInt16, time.Millisecond).Input +func warmUp(ctx context.Context, input models.Input) error { + input = input.WithErrors(math.MaxInt16, time.Millisecond) input.RunTimeout = warm input.SkipIndexReport = true - fmt.Println(fmt.Sprintf("warming up %.1f seconds...", warm.Seconds())) - worker.Run(input, "warm up") - coolDown() + log.Printf("warming up %.1f seconds...", warm.Seconds()) + if _, err := worker.Run(ctx, input, "warm up", nil); err != nil { + return err + } + return coolDown(ctx) } // coolDown waits an arbitrary time for events in elasticsearch be flushed, heap be freed, etc. 
-func coolDown() { - fmt.Println(fmt.Sprintf("cooling down %.1f seconds... ", cool.Seconds())) - time.Sleep(cool) +func coolDown(ctx context.Context) error { + log.Printf("cooling down %.1f seconds... ", cool.Seconds()) + timer := time.NewTimer(cool) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } } // verify asserts there are no performance regressions for a given workload. @@ -110,9 +134,8 @@ func coolDown() { // returns an error if connection can't be established, // or performance decreased by a margin larger than specified func verify(conn es.Connection, report models.Report, margin float64, days string) error { - coolDown() if report.EventsIndexed < 100 { - return errors.New(fmt.Sprintf("not enough events indexed: %d", report.EventsIndexed)) + return fmt.Errorf("not enough events indexed: %d", report.EventsIndexed) } inputMap := conv.ToMap(report.Input) @@ -152,7 +175,7 @@ func verify(conn es.Connection, report models.Report, margin float64, days strin } func newRegression(r1, r2 models.Report) error { - return errors.New(fmt.Sprintf(`test report with doc id %s was expected to show same or better + return fmt.Errorf(`test report with doc id %s was expected to show same or better performance as %s, however %.2f is lower than %.2f`, - r1.ReportId, r2.ReportId, r1.Performance(), r2.Performance())) + r1.ReportId, r2.ReportId, r1.Performance(), r2.Performance()) } diff --git a/es/api.go b/es/api.go index e686dc6f..dbfec93f 100644 --- a/es/api.go +++ b/es/api.go @@ -3,6 +3,7 @@ package es import ( "encoding/json" "fmt" + "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esutil" "github.com/elastic/hey-apm/models" @@ -101,7 +102,7 @@ func Count(conn Connection, index string) uint64 { return 0 } -func DeleteAPMIndices(conn Connection) error { +func DeleteAPMEvents(conn Connection) error { body := map[string]interface{}{ "query": map[string]interface{}{ "bool": 
map[string]interface{}{ diff --git a/go.mod b/go.mod index b1712cc3..6a9c843d 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/prometheus/procfs v0.0.11 // indirect github.com/stretchr/testify v1.4.0 go.elastic.co/apm v1.8.0 + golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd // indirect howett.net/plist v0.0.0-20200225050739-77e249a2e2ba // indirect ) diff --git a/go.sum b/go.sum index 4b58655c..6af29cb2 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,7 @@ go.elastic.co/apm v1.8.0/go.mod h1:tCw6CkOJgkWnzEthFN9HUP1uL3Gjc/Ur6m7gRPLaoH0= go.elastic.co/fastjson v1.0.0 h1:ooXV/ABvf+tBul26jcVViPT3sBir0PvXgibYB1IQQzg= go.elastic.co/fastjson v1.0.0/go.mod h1:PmeUOMMtLHQr9ZS9J9owrAVg0FkaZDRZJEFTTGHtchs= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/main.go b/main.go index 1c060d78..1d816927 100644 --- a/main.go +++ b/main.go @@ -1,16 +1,19 @@ package main import ( + "context" "flag" "fmt" + "log" "math" "math/rand" "os" + "os/signal" "strconv" - "sync" "time" "go.elastic.co/apm" + "golang.org/x/sync/errgroup" "github.com/elastic/hey-apm/benchmark" "github.com/elastic/hey-apm/models" @@ -23,35 +26,56 @@ func init() { } func main() { + if err := Main(); err != nil { + log.Fatal(err) + } +} - var err error - +func Main() error { + signalC := make(chan os.Signal, 1) + signal.Notify(signalC, os.Interrupt) input := parseFlags() if input.IsBenchmark { - if err = benchmark.Run(input); err != nil { - os.Exit(1) + ctx, 
cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + // Ctrl+C when running benchmarks causes them to be + // aborted, as the results are not meaningful for + // comparison. + defer cancel() + <-signalC + log.Printf("Interrupt signal received, aborting benchmarks...") + }() + if err := benchmark.Run(ctx, input); err != nil { + return err } - return + return nil } - runWorkers(input) + + stopChan := make(chan struct{}) + go func() { + // Ctrl+C when running load generation gracefully stops the + // workers and prints the statistics. + defer close(stopChan) + <-signalC + log.Printf("Interrupt signal received, stopping load generator...") + }() + return runWorkers(input, stopChan) } -func runWorkers(input models.Input) { - var wg sync.WaitGroup +func runWorkers(input models.Input, stop <-chan struct{}) error { + g, ctx := errgroup.WithContext(context.Background()) for i := 0; i < input.Instances; i++ { idx := i - wg.Add(1) - go func() { - defer wg.Done() + g.Go(func() error { randomDelay := time.Duration(rand.Intn(input.DelayMillis)) * time.Millisecond fmt.Println(fmt.Sprintf("--- Starting instance (%v) in %v milliseconds", idx, randomDelay)) time.Sleep(randomDelay) - if _, err := worker.Run(input, ""); err != nil { - os.Exit(1) - } - }() + _, err := worker.Run(ctx, input, "", stop) + return err + }) } - wg.Wait() + return g.Wait() } func parseFlags() models.Input { diff --git a/worker/run.go b/worker/run.go index 96826dc8..cc0ff448 100644 --- a/worker/run.go +++ b/worker/run.go @@ -20,20 +20,23 @@ const quiesceTimeout = 5 * time.Minute // Run executes a load test work with the given input, prints the results, // indexes a performance report, and returns it along any error. -func Run(input models.Input, testName string) (models.Report, error) { +// +// If the context is cancelled, the worker exits with the context's error. +// If the stop channel is signalled, the worker exits gracefully with no error. 
+func Run(ctx context.Context, input models.Input, testName string, stop <-chan struct{}) (models.Report, error) { testNode, err := es.NewConnection(input.ApmElasticsearchUrl, input.ApmElasticsearchAuth) if err != nil { return models.Report{}, errors.Wrap(err, "Elasticsearch used by APM Server not known or reachable") } - worker, err := prepareWork(input) + worker, err := newWorker(input, stop) if err != nil { return models.Report{}, err } - logger := worker.Logger + logger := worker.logger.Logger initialStatus := server.GetStatus(logger, input.ApmServerSecret, input.ApmServerUrl, testNode) - result, err := worker.work(context.Background()) + result, err := worker.work(ctx) if err != nil { logger.Println(err.Error()) return models.Report{}, err @@ -80,17 +83,17 @@ func Run(input models.Input, testName string) (models.Report, error) { return report, err } -// prepareWork returns a worker with with a workload defined by the input. -func prepareWork(input models.Input) (worker, error) { - +// newWorker returns a new worker with with a workload defined by the input. 
+func newWorker(input models.Input, stop <-chan struct{}) (*worker, error) { logger := newApmLogger(log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lshortfile)) tracer, err := agent.NewTracer(logger, input.ApmServerUrl, input.ApmServerSecret, input.APIKey, input.ServiceName, input.SpanMaxLimit) if err != nil { - return worker{}, err + return nil, err } - w := worker{ - apmLogger: logger, - Tracer: tracer, + return &worker{ + stop: stop, + logger: logger, + tracer: tracer, RunTimeout: input.RunTimeout, FlushTimeout: input.FlushTimeout, @@ -103,9 +106,7 @@ func prepareWork(input models.Input) (worker, error) { ErrorLimit: input.ErrorLimit, ErrorFrameMinLimit: input.ErrorFrameMinLimit, ErrorFrameMaxLimit: input.ErrorFrameMaxLimit, - } - - return w, nil + }, nil } func createReport(input models.Input, testName string, result Result, initialStatus, finalStatus server.Status) models.Report { diff --git a/worker/work.go b/worker/work.go index a2082f18..8964965c 100644 --- a/worker/work.go +++ b/worker/work.go @@ -2,11 +2,8 @@ package worker import ( "context" - "errors" "fmt" "math/rand" - "os" - "os/signal" "strconv" "time" @@ -16,8 +13,9 @@ import ( ) type worker struct { - *apmLogger - *agent.Tracer + stop <-chan struct{} // graceful shutdown + logger *apmLogger + tracer *agent.Tracer ErrorFrequency time.Duration ErrorLimit int @@ -52,18 +50,14 @@ func (w *worker) work(ctx context.Context) (Result, error) { defer transactionTicker.Stop() } - // TODO(axw) do this outside work, cancel context on signal - signalC := make(chan os.Signal, 1) - signal.Notify(signalC, os.Interrupt) - result := Result{Start: time.Now()} var done bool for !done { select { case <-ctx.Done(): return Result{}, ctx.Err() - case sig := <-signalC: - return Result{}, errors.New(sig.String()) + case <-w.stop: + done = true case <-runTimerC: done = true case <-errorTicker.C: @@ -84,18 +78,18 @@ func (w *worker) work(ctx context.Context) (Result, error) { result.End = time.Now() w.flush() result.Flushed = 
time.Now() - result.TracerStats = w.Stats() - result.TransportStats = *w.TransportStats + result.TracerStats = w.tracer.Stats() + result.TransportStats = *w.tracer.TransportStats return result, nil } func (w *worker) sendError() { err := &generatedErr{frames: randRange(w.ErrorFrameMinLimit, w.ErrorFrameMaxLimit)} - w.Tracer.NewError(err).Send() + w.tracer.NewError(err).Send() } func (w *worker) sendTransaction() { - tx := w.Tracer.StartTransaction("generated", "gen") + tx := w.tracer.StartTransaction("generated", "gen") defer tx.End() spanCount := randRange(w.SpanMinLimit, w.SpanMaxLimit) for i := 0; i < spanCount; i++ { @@ -110,7 +104,7 @@ func randRange(min, max int) int { // flush ensures that the entire workload defined is pushed to the apm-server, within the worker timeout limit. func (w *worker) flush() { - defer w.Close() + defer w.tracer.Close() ctx := context.Background() if w.FlushTimeout > 0 { @@ -118,9 +112,9 @@ func (w *worker) flush() { ctx, cancel = context.WithTimeout(ctx, w.FlushTimeout) defer cancel() } - w.Flush(ctx.Done()) + w.tracer.Flush(ctx.Done()) if ctx.Err() != nil { - w.Errorf("timed out waiting for flush to complete") + w.logger.Errorf("timed out waiting for flush to complete") } }