Skip to content
This repository has been archived by the owner on Oct 28, 2024. It is now read-only.

Refactoring, context propagation, graceful stop #177

Merged
merged 4 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 84 additions & 61 deletions benchmark/benchmark.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package benchmark

import (
"context"
"fmt"
"log"
"math"
"time"

Expand All @@ -25,83 +27,105 @@ const (
//
// Regression checks accept an error margin and are not aware of apm-server versions, only URLs.
// apm-server must be started independently with -E apm-server.expvar.enabled=true
func Run(input models.Input) error {
func Run(ctx context.Context, input models.Input) error {
conn, err := es.NewConnection(input.ElasticsearchUrl, input.ElasticsearchAuth)
if err != nil {
return errors.Wrap(err, "Elasticsearch not reachable, won't be able to index a report")
}

if err == nil {
fmt.Println("deleting previous APM indices...")
err = es.DeleteAPMIndices(conn)
log.Printf("Deleting previous APM event documents...")
if err := es.DeleteAPMEvents(conn); err != nil {
return err
}
if err := warmUp(ctx, input); err != nil {
return err
}

tests := defineTests(input)
reports, err := tests.run(ctx)
if err != nil {
return err
}
if err := verifyReports(reports, conn, input.RegressionMargin, input.RegressionDays); err != nil {
return err
}
return nil
}

func defineTests(input models.Input) tests {
var t tests
t.add("transactions only", input.WithTransactions(math.MaxInt32, time.Millisecond*5))
t.add("transactions only", input.WithTransactions(math.MaxInt32, time.Millisecond*5))
t.add("small transactions", input.WithTransactions(math.MaxInt32, time.Millisecond*5).WithSpans(10))
t.add("large transactions", input.WithTransactions(math.MaxInt32, time.Millisecond*5).WithSpans(40))
t.add("small errors only", input.WithErrors(math.MaxInt32, time.Millisecond).WithFrames(10))
t.add("very large errors only", input.WithErrors(math.MaxInt32, time.Millisecond).WithFrames(500))
t.add("transactions only very high load", input.WithTransactions(math.MaxInt32, time.Microsecond*100))
t.add("transactions, spans and errors high load", input.WithTransactions(math.MaxInt32, time.Millisecond*5).WithSpans(10).WithErrors(math.MaxInt32, time.Millisecond).WithFrames(50))
return t
}

warmUp(input)

run := runner(conn, input.RegressionMargin, input.RegressionDays)
run("transactions only", models.Wrap{input}.
WithTransactions(math.MaxInt32, time.Millisecond*5).
Input)
run("small transactions", models.Wrap{input}.
WithTransactions(math.MaxInt32, time.Millisecond*5).
WithSpans(10).
Input)
run("large transactions", models.Wrap{input}.
WithTransactions(math.MaxInt32, time.Millisecond*5).
WithSpans(40).
Input)
run("small errors only", models.Wrap{input}.
WithErrors(math.MaxInt32, time.Millisecond).
WithFrames(10).
Input)
run("very large errors only", models.Wrap{input}.
WithErrors(math.MaxInt32, time.Millisecond).
WithFrames(500).
Input)
run("transactions only very high load", models.Wrap{input}.
WithTransactions(math.MaxInt32, time.Microsecond*100).
Input)
err = run("transactions, spans and errors high load", models.Wrap{input}.
WithTransactions(math.MaxInt32, time.Millisecond*5).
WithSpans(10).
WithErrors(math.MaxInt32, time.Millisecond).
WithFrames(50).
Input)

return err
type test struct {
name string
input models.Input
}

// Runner keeps track of errors during successive calls, returning the last one.
func runner(conn es.Connection, margin float64, days string) func(name string, input models.Input) error {
var err error
return func(name string, input models.Input) error {
fmt.Println("running benchmark with " + name)
report, e := worker.Run(input, name)
if e == nil {
e = verify(conn, report, margin, days)
type tests []test

func (t *tests) add(name string, input models.Input) {
*t = append(*t, test{name: name, input: input})
}

func (t *tests) run(ctx context.Context) ([]models.Report, error) {
reports := make([]models.Report, len(*t))
for i, test := range *t {
log.Printf("running benchmark %q", test.name)
report, err := worker.Run(ctx, test.input, test.name, nil /*stop*/)
if err != nil {
return nil, err
}
if e != nil {
fmt.Println(e)
err = e
if err := coolDown(ctx); err != nil {
return nil, err
}
return err
reports[i] = report
}
return reports, nil
}

func verifyReports(reports []models.Report, conn es.Connection, margin float64, days string) error {
var lastErr error
for _, report := range reports {
if err := verify(conn, report, margin, days); err != nil {
fmt.Println(err)
lastErr = err
}
}
return lastErr
}

// warmUp sends a moderate load to apm-server without saving a report.
func warmUp(input models.Input) {
input = models.Wrap{input}.WithErrors(math.MaxInt16, time.Millisecond).Input
func warmUp(ctx context.Context, input models.Input) error {
input = input.WithErrors(math.MaxInt16, time.Millisecond)
input.RunTimeout = warm
input.SkipIndexReport = true
fmt.Println(fmt.Sprintf("warming up %.1f seconds...", warm.Seconds()))
worker.Run(input, "warm up")
coolDown()
log.Printf("warming up %.1f seconds...", warm.Seconds())
if _, err := worker.Run(ctx, input, "warm up", nil); err != nil {
return err
}
return coolDown(ctx)
}

// coolDown waits an arbitrary time for events in elasticsearch be flushed, heap be freed, etc.
func coolDown() {
fmt.Println(fmt.Sprintf("cooling down %.1f seconds... ", cool.Seconds()))
time.Sleep(cool)
func coolDown(ctx context.Context) error {
log.Printf("cooling down %.1f seconds... ", cool.Seconds())
timer := time.NewTimer(cool)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}

// verify asserts there are no performance regressions for a given workload.
Expand All @@ -110,9 +134,8 @@ func coolDown() {
// returns an error if connection can't be established,
// or performance decreased by a margin larger than specified
func verify(conn es.Connection, report models.Report, margin float64, days string) error {
coolDown()
if report.EventsIndexed < 100 {
return errors.New(fmt.Sprintf("not enough events indexed: %d", report.EventsIndexed))
return fmt.Errorf("not enough events indexed: %d", report.EventsIndexed)
}

inputMap := conv.ToMap(report.Input)
Expand Down Expand Up @@ -152,7 +175,7 @@ func verify(conn es.Connection, report models.Report, margin float64, days strin
}

func newRegression(r1, r2 models.Report) error {
return errors.New(fmt.Sprintf(`test report with doc id %s was expected to show same or better
return fmt.Errorf(`test report with doc id %s was expected to show same or better
performance as %s, however %.2f is lower than %.2f`,
r1.ReportId, r2.ReportId, r1.Performance(), r2.Performance()))
r1.ReportId, r2.ReportId, r1.Performance(), r2.Performance())
}
3 changes: 2 additions & 1 deletion es/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package es
import (
"encoding/json"
"fmt"

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esutil"
"github.com/elastic/hey-apm/models"
Expand Down Expand Up @@ -101,7 +102,7 @@ func Count(conn Connection, index string) uint64 {
return 0
}

func DeleteAPMIndices(conn Connection) error {
func DeleteAPMEvents(conn Connection) error {
body := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/prometheus/procfs v0.0.11 // indirect
github.com/stretchr/testify v1.4.0
go.elastic.co/apm v1.8.0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd // indirect
howett.net/plist v0.0.0-20200225050739-77e249a2e2ba // indirect
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go.elastic.co/apm v1.8.0/go.mod h1:tCw6CkOJgkWnzEthFN9HUP1uL3Gjc/Ur6m7gRPLaoH0=
go.elastic.co/fastjson v1.0.0 h1:ooXV/ABvf+tBul26jcVViPT3sBir0PvXgibYB1IQQzg=
go.elastic.co/fastjson v1.0.0/go.mod h1:PmeUOMMtLHQr9ZS9J9owrAVg0FkaZDRZJEFTTGHtchs=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
Loading