From dffe56cb07c69aaa6f8ceb9ef083242b477ab832 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 14 Jan 2022 10:58:43 +0000 Subject: [PATCH] Add summary to journeys which don't emit journey:end (early node subprocess exits) (#29606) (#29812) * update link to beats developer guide * fix: add summary to journeys which don't emit journey:end [fixes #28770] * fix: avoid cmd/status when journey has already finished (cherry picked from commit 3270ae1ab631a156f8f7913ca3a794fe6bd80b2f) Co-authored-by: Lucas F. da Costa --- CHANGELOG.next.asciidoc | 1 + libbeat/README.md | 2 +- .../monitors/browser/synthexec/enrich.go | 41 ++++--- .../monitors/browser/synthexec/enrich_test.go | 113 ++++++++++++++++-- .../browser/synthexec/execmultiplexer.go | 2 +- .../browser/synthexec/execmultiplexer_test.go | 23 ++-- .../monitors/browser/synthexec/synthexec.go | 7 +- 7 files changed, 150 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 027fc1515247..3ade1e986a75 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -19,6 +19,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Heartbeat* - Only add monitor.status to browser events when summary. {pull}29460[29460] +- Also add summary to journeys for which the synthetics runner crashes. {pull}29606[29606] *Metricbeat* diff --git a/libbeat/README.md b/libbeat/README.md index cf99987c5a3b..06a22219dfe3 100644 --- a/libbeat/README.md +++ b/libbeat/README.md @@ -8,7 +8,7 @@ If you want to create a new project that reads some sort of operational data and ships it to Elasticsearch, we suggest you make use of this library. Please start by reading our [CONTRIBUTING](../CONTRIBUTING.md) file. We also have a [developer -guide](https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html) to +guide](https://www.elastic.co/guide/en/beats/devguide/master/index.html) to help you with the creation of new Beats. Please also open a topic on the [forums](https://discuss.elastic.co/c/beats/libbeat) and diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index c3ab6c76faab..bec551a7947e 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -112,6 +112,14 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e } switch se.Type { + case "cmd/status": + // If a command failed _after_ the journey was complete, as it happens + // when an `afterAll` hook fails, for example, we don't wan't to include + // a summary in the cmd/status event. + if !je.journeyComplete { + je.end = event.Timestamp + return je.createSummary(event) + } case "journey/end": je.journeyComplete = true return je.createSummary(event) @@ -155,23 +163,24 @@ func (je *journeyEnricher) createSummary(event *beat.Event) error { down = 0 } - if je.journeyComplete { - eventext.MergeEventFields(event, common.MapStr{ - "url": je.urlFields, - "synthetics": common.MapStr{ - "type": "heartbeat/summary", - "journey": je.journey, - }, - "monitor": common.MapStr{ - "duration": common.MapStr{ - "us": int64(je.end.Sub(je.start) / time.Microsecond), - }, - }, - "summary": common.MapStr{ - "up": up, - "down": down, + eventext.MergeEventFields(event, common.MapStr{ + "url": je.urlFields, + "synthetics": common.MapStr{ + "type": "heartbeat/summary", + "journey": je.journey, + }, + "monitor": common.MapStr{ + "duration": common.MapStr{ + "us": int64(je.end.Sub(je.start) / time.Microsecond), }, - }) + }, + "summary": common.MapStr{ + "up": up, + "down": down, + }, + }) + + if je.journeyComplete { return je.firstError } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index 629454f34c0e..f2c8ba25dca9 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -21,6 +21,18 @@ import ( "github.com/elastic/go-lookslike/testslike" ) +func makeStepEvent(typ string, ts float64, name string, index int, status string, urlstr string, err *SynthError) *SynthEvent { + return &SynthEvent{ + Type: typ, + TimestampEpochMicros: 1000 + ts, + PackageVersion: "1.0.0", + Step: &Step{Name: name, Index: index, Status: status}, + Error: err, + Payload: common.MapStr{}, + URL: urlstr, + } +} + func TestJourneyEnricher(t *testing.T) { journey := &Journey{ Name: "A Journey Name", @@ -50,17 +62,6 @@ func TestJourneyEnricher(t *testing.T) { Journey: journey, Payload: common.MapStr{}, } - makeStepEvent := func(typ string, ts float64, name string, index int, status string, urlstr string, err *SynthError) *SynthEvent { - return &SynthEvent{ - Type: typ, - TimestampEpochMicros: 1000 + ts, - PackageVersion: "1.0.0", - Step: &Step{Name: name, Index: index, Status: status}, - Error: err, - Payload: common.MapStr{}, - URL: urlstr, - } - } url1 := "http://example.net/url1" url2 := "http://example.net/url2" url3 := "http://example.net/url3" @@ -121,6 +122,24 @@ func TestEnrichSynthEvent(t *testing.T) { wantErr bool check func(t *testing.T, e *beat.Event, je *journeyEnricher) }{ + { + "cmd/status", + &journeyEnricher{}, + &SynthEvent{ + Type: "cmd/status", + Error: &SynthError{Name: "cmdexit", Message: "cmd err msg"}, + }, + true, + func(t *testing.T, e *beat.Event, je *journeyEnricher) { + v := lookslike.MustCompile(map[string]interface{}{ + "summary": map[string]int{ + "up": 0, + "down": 1, + }, + }) + testslike.Test(t, v, e.Fields) + }, + }, { "journey/end", &journeyEnricher{}, @@ -195,3 +214,75 @@ func TestEnrichSynthEvent(t *testing.T) { }) } } + +func TestNoSummaryOnAfterHook(t *testing.T) { + journey := &Journey{ + Name: "A journey that fails after completing", + Id: "my-bad-after-all-hook", + } + journeyStart := &SynthEvent{ + Type: "journey/start", + TimestampEpochMicros: 1000, + PackageVersion: "1.0.0", + Journey: journey, + Payload: common.MapStr{}, + } + syntherr := &SynthError{ + Message: "my-errmsg", + Name: "my-errname", + Stack: "my\nerr\nstack", + } + journeyEnd := &SynthEvent{ + Type: "journey/end", + TimestampEpochMicros: 2000, + PackageVersion: "1.0.0", + Journey: journey, + Payload: common.MapStr{}, + } + cmdStatus := &SynthEvent{ + Type: "cmd/status", + Error: &SynthError{Name: "cmdexit", Message: "cmd err msg"}, + TimestampEpochMicros: 3000, + } + + badStepUrl := "https://example.com/bad-step" + synthEvents := []*SynthEvent{ + journeyStart, + makeStepEvent("step/start", 10, "Step1", 1, "", "", nil), + makeStepEvent("step/end", 20, "Step1", 1, "failed", badStepUrl, syntherr), + journeyEnd, + cmdStatus, + } + + je := &journeyEnricher{} + + for idx, se := range synthEvents { + e := &beat.Event{} + t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { + enrichErr := je.enrich(e, se) + + if se != nil && se.Type == "cmd/status" { + t.Run("no summary in cmd/status", func(t *testing.T) { + require.NotContains(t, e.Fields, "summary") + }) + } + + // Only the journey/end event should get a summary when + // it's emitted before the cmd/status (when an afterX hook fails). + if se != nil && se.Type == "journey/end" { + require.Equal(t, stepError(syntherr), enrichErr) + + u, _ := url.Parse(badStepUrl) + t.Run("summary in journey/end", func(t *testing.T) { + v := lookslike.MustCompile(common.MapStr{ + "synthetics.type": "heartbeat/summary", + "url": wrappers.URLFields(u), + "monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), + }) + + testslike.Test(t, v, e.Fields) + }) + } + }) + } +} diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go index 57b423626d15..07de0143c38b 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go @@ -29,7 +29,7 @@ func (e ExecMultiplexer) writeSynthEvent(se *SynthEvent) { e.eventCounter.Store(-1) } hasCurrentJourney := e.currentJourney.Load() - if se.Type == "journey/end" { + if se.Type == "journey/end" || se.Type == "cmd/status" { e.currentJourney.Store(false) } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go index 56af4c94d9d1..ec85a6b52229 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go @@ -18,7 +18,7 @@ func TestExecMultiplexer(t *testing.T) { var testJourneys []*Journey var testEvents []*SynthEvent time := float64(0) - for jIdx := 0; jIdx < 3; jIdx++ { + for jIdx := 0; jIdx < 4; jIdx++ { time++ // fake time to make events seem spaced out journey := &Journey{ Name: fmt.Sprintf("J%d", jIdx), @@ -45,11 +45,20 @@ func TestExecMultiplexer(t *testing.T) { }) } - testEvents = append(testEvents, &SynthEvent{ - Journey: journey, - Type: "journey/end", - TimestampEpochMicros: time, - }) + // We want one of the test journeys to end with a cmd/status indicating it failed + if jIdx != 4 { + testEvents = append(testEvents, &SynthEvent{ + Journey: journey, + Type: "journey/end", + TimestampEpochMicros: time, + }) + } else { + testEvents = append(testEvents, &SynthEvent{ + Journey: journey, + Type: "cmd/status", + TimestampEpochMicros: time, + }) + } } // Write the test events in another go routine since writes block @@ -77,7 +86,7 @@ Loop: i := 0 // counter for index, resets on journey change for _, se := range results { require.Equal(t, i, se.index) - if se.Type == "journey/end" { + if se.Type == "journey/end" || se.Type == "cmd/status" { i = 0 } else { i++ diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go index e5bcd1332f5f..039a3480c800 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go @@ -196,8 +196,9 @@ func runCmd( if err != nil { str := fmt.Sprintf("command exited with status %d: %s", cmd.ProcessState.ExitCode(), err) mpx.writeSynthEvent(&SynthEvent{ - Type: "cmd/status", - Error: &SynthError{Name: "cmdexit", Message: str}, + Type: "cmd/status", + Error: &SynthError{Name: "cmdexit", Message: str}, + TimestampEpochMicros: float64(time.Now().UnixMicro()), }) logp.Warn("Error executing command '%s' (%d): %s", loggableCmd.String(), cmd.ProcessState.ExitCode(), err) } @@ -243,7 +244,7 @@ func lineToSynthEventFactory(typ string) func(bytes []byte, text string) (res *S logp.Info("%s: %s", typ, text) return &SynthEvent{ Type: typ, - TimestampEpochMicros: float64(time.Now().UnixNano() / int64(time.Millisecond)), + TimestampEpochMicros: float64(time.Now().UnixMicro()), Payload: map[string]interface{}{ "message": text, },