Skip to content

Commit

Permalink
Add summary to journeys which don't emit journey:end (early node subp…
Browse files Browse the repository at this point in the history
…rocess exits) (#29606) (#29812)

* update link to beats developer guide

* fix: add summary to journeys which don't emit journey:end [fixes #28770]

* fix: avoid cmd/status when journey has already finished

(cherry picked from commit 3270ae1)

Co-authored-by: Lucas F. da Costa <lucas@lucasfcosta.com>
  • Loading branch information
mergify[bot] and lucasfcosta authored Jan 14, 2022
1 parent c0d9dbf commit dffe56c
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Heartbeat*

- Only add monitor.status to browser events when summary. {pull}29460[29460]
- Also add summary to journeys for which the synthetics runner crashes. {pull}29606[29606]

*Metricbeat*

Expand Down
2 changes: 1 addition & 1 deletion libbeat/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ If you want to create a new project that reads some sort of operational data
and ships it to Elasticsearch, we suggest you make use of this library. Please
start by reading our [CONTRIBUTING](../CONTRIBUTING.md) file. We also have a
[developer
guide](https://www.elastic.co/guide/en/beats/libbeat/current/new-beat.html) to
guide](https://www.elastic.co/guide/en/beats/devguide/master/index.html) to
help you with the creation of new Beats.

Please also open a topic on the [forums](https://discuss.elastic.co/c/beats/libbeat) and
Expand Down
41 changes: 25 additions & 16 deletions x-pack/heartbeat/monitors/browser/synthexec/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e
}

switch se.Type {
case "cmd/status":
// If a command failed _after_ the journey was complete, as it happens
// when an `afterAll` hook fails, for example, we don't wan't to include
// a summary in the cmd/status event.
if !je.journeyComplete {
je.end = event.Timestamp
return je.createSummary(event)
}
case "journey/end":
je.journeyComplete = true
return je.createSummary(event)
Expand Down Expand Up @@ -155,23 +163,24 @@ func (je *journeyEnricher) createSummary(event *beat.Event) error {
down = 0
}

if je.journeyComplete {
eventext.MergeEventFields(event, common.MapStr{
"url": je.urlFields,
"synthetics": common.MapStr{
"type": "heartbeat/summary",
"journey": je.journey,
},
"monitor": common.MapStr{
"duration": common.MapStr{
"us": int64(je.end.Sub(je.start) / time.Microsecond),
},
},
"summary": common.MapStr{
"up": up,
"down": down,
eventext.MergeEventFields(event, common.MapStr{
"url": je.urlFields,
"synthetics": common.MapStr{
"type": "heartbeat/summary",
"journey": je.journey,
},
"monitor": common.MapStr{
"duration": common.MapStr{
"us": int64(je.end.Sub(je.start) / time.Microsecond),
},
})
},
"summary": common.MapStr{
"up": up,
"down": down,
},
})

if je.journeyComplete {
return je.firstError
}

Expand Down
113 changes: 102 additions & 11 deletions x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ import (
"github.com/elastic/go-lookslike/testslike"
)

func makeStepEvent(typ string, ts float64, name string, index int, status string, urlstr string, err *SynthError) *SynthEvent {
return &SynthEvent{
Type: typ,
TimestampEpochMicros: 1000 + ts,
PackageVersion: "1.0.0",
Step: &Step{Name: name, Index: index, Status: status},
Error: err,
Payload: common.MapStr{},
URL: urlstr,
}
}

func TestJourneyEnricher(t *testing.T) {
journey := &Journey{
Name: "A Journey Name",
Expand Down Expand Up @@ -50,17 +62,6 @@ func TestJourneyEnricher(t *testing.T) {
Journey: journey,
Payload: common.MapStr{},
}
makeStepEvent := func(typ string, ts float64, name string, index int, status string, urlstr string, err *SynthError) *SynthEvent {
return &SynthEvent{
Type: typ,
TimestampEpochMicros: 1000 + ts,
PackageVersion: "1.0.0",
Step: &Step{Name: name, Index: index, Status: status},
Error: err,
Payload: common.MapStr{},
URL: urlstr,
}
}
url1 := "http://example.net/url1"
url2 := "http://example.net/url2"
url3 := "http://example.net/url3"
Expand Down Expand Up @@ -121,6 +122,24 @@ func TestEnrichSynthEvent(t *testing.T) {
wantErr bool
check func(t *testing.T, e *beat.Event, je *journeyEnricher)
}{
{
"cmd/status",
&journeyEnricher{},
&SynthEvent{
Type: "cmd/status",
Error: &SynthError{Name: "cmdexit", Message: "cmd err msg"},
},
true,
func(t *testing.T, e *beat.Event, je *journeyEnricher) {
v := lookslike.MustCompile(map[string]interface{}{
"summary": map[string]int{
"up": 0,
"down": 1,
},
})
testslike.Test(t, v, e.Fields)
},
},
{
"journey/end",
&journeyEnricher{},
Expand Down Expand Up @@ -195,3 +214,75 @@ func TestEnrichSynthEvent(t *testing.T) {
})
}
}

func TestNoSummaryOnAfterHook(t *testing.T) {
journey := &Journey{
Name: "A journey that fails after completing",
Id: "my-bad-after-all-hook",
}
journeyStart := &SynthEvent{
Type: "journey/start",
TimestampEpochMicros: 1000,
PackageVersion: "1.0.0",
Journey: journey,
Payload: common.MapStr{},
}
syntherr := &SynthError{
Message: "my-errmsg",
Name: "my-errname",
Stack: "my\nerr\nstack",
}
journeyEnd := &SynthEvent{
Type: "journey/end",
TimestampEpochMicros: 2000,
PackageVersion: "1.0.0",
Journey: journey,
Payload: common.MapStr{},
}
cmdStatus := &SynthEvent{
Type: "cmd/status",
Error: &SynthError{Name: "cmdexit", Message: "cmd err msg"},
TimestampEpochMicros: 3000,
}

badStepUrl := "https://example.com/bad-step"
synthEvents := []*SynthEvent{
journeyStart,
makeStepEvent("step/start", 10, "Step1", 1, "", "", nil),
makeStepEvent("step/end", 20, "Step1", 1, "failed", badStepUrl, syntherr),
journeyEnd,
cmdStatus,
}

je := &journeyEnricher{}

for idx, se := range synthEvents {
e := &beat.Event{}
t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) {
enrichErr := je.enrich(e, se)

if se != nil && se.Type == "cmd/status" {
t.Run("no summary in cmd/status", func(t *testing.T) {
require.NotContains(t, e.Fields, "summary")
})
}

// Only the journey/end event should get a summary when
// it's emitted before the cmd/status (when an afterX hook fails).
if se != nil && se.Type == "journey/end" {
require.Equal(t, stepError(syntherr), enrichErr)

u, _ := url.Parse(badStepUrl)
t.Run("summary in journey/end", func(t *testing.T) {
v := lookslike.MustCompile(common.MapStr{
"synthetics.type": "heartbeat/summary",
"url": wrappers.URLFields(u),
"monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond),
})

testslike.Test(t, v, e.Fields)
})
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (e ExecMultiplexer) writeSynthEvent(se *SynthEvent) {
e.eventCounter.Store(-1)
}
hasCurrentJourney := e.currentJourney.Load()
if se.Type == "journey/end" {
if se.Type == "journey/end" || se.Type == "cmd/status" {
e.currentJourney.Store(false)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestExecMultiplexer(t *testing.T) {
var testJourneys []*Journey
var testEvents []*SynthEvent
time := float64(0)
for jIdx := 0; jIdx < 3; jIdx++ {
for jIdx := 0; jIdx < 4; jIdx++ {
time++ // fake time to make events seem spaced out
journey := &Journey{
Name: fmt.Sprintf("J%d", jIdx),
Expand All @@ -45,11 +45,20 @@ func TestExecMultiplexer(t *testing.T) {
})
}

testEvents = append(testEvents, &SynthEvent{
Journey: journey,
Type: "journey/end",
TimestampEpochMicros: time,
})
// We want one of the test journeys to end with a cmd/status indicating it failed
if jIdx != 4 {
testEvents = append(testEvents, &SynthEvent{
Journey: journey,
Type: "journey/end",
TimestampEpochMicros: time,
})
} else {
testEvents = append(testEvents, &SynthEvent{
Journey: journey,
Type: "cmd/status",
TimestampEpochMicros: time,
})
}
}

// Write the test events in another go routine since writes block
Expand Down Expand Up @@ -77,7 +86,7 @@ Loop:
i := 0 // counter for index, resets on journey change
for _, se := range results {
require.Equal(t, i, se.index)
if se.Type == "journey/end" {
if se.Type == "journey/end" || se.Type == "cmd/status" {
i = 0
} else {
i++
Expand Down
7 changes: 4 additions & 3 deletions x-pack/heartbeat/monitors/browser/synthexec/synthexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,9 @@ func runCmd(
if err != nil {
str := fmt.Sprintf("command exited with status %d: %s", cmd.ProcessState.ExitCode(), err)
mpx.writeSynthEvent(&SynthEvent{
Type: "cmd/status",
Error: &SynthError{Name: "cmdexit", Message: str},
Type: "cmd/status",
Error: &SynthError{Name: "cmdexit", Message: str},
TimestampEpochMicros: float64(time.Now().UnixMicro()),
})
logp.Warn("Error executing command '%s' (%d): %s", loggableCmd.String(), cmd.ProcessState.ExitCode(), err)
}
Expand Down Expand Up @@ -243,7 +244,7 @@ func lineToSynthEventFactory(typ string) func(bytes []byte, text string) (res *S
logp.Info("%s: %s", typ, text)
return &SynthEvent{
Type: typ,
TimestampEpochMicros: float64(time.Now().UnixNano() / int64(time.Millisecond)),
TimestampEpochMicros: float64(time.Now().UnixMicro()),
Payload: map[string]interface{}{
"message": text,
},
Expand Down

0 comments on commit dffe56c

Please sign in to comment.