Skip to content

Commit

Permalink
Refactoring: use new Fetch interface that automatically reports and l…
Browse files Browse the repository at this point in the history
…ogs errors (#11763)

Refactors code in the `kibana` Metricbeat module to use the new `Fetch` interface introduced in #10727.

Note that x-pack code paths in this module were not refactored to use the new interface as we don't want errors from those code paths to be reported into `metricbeat-*` indices, only logged to Metricbeat logs.

Related: #11767.
  • Loading branch information
ycombinator authored Apr 12, 2019
1 parent 6f1f994 commit 462f798
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 46 deletions.
8 changes: 2 additions & 6 deletions metricbeat/module/kibana/stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,12 @@ func eventMapping(r mb.ReporterV2, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Kibana Stats API response")
r.Error(err)
return err
return errors.Wrap(err, "failure parsing Kibana Stats API response")
}

dataFields, err := schema.Apply(data)
if err != nil {
err = errors.Wrap(err, "failure to apply stats schema")
r.Error(err)
return err
return errors.Wrap(err, "failure to apply stats schema")
}

var event mb.Event
Expand Down
29 changes: 17 additions & 12 deletions metricbeat/module/kibana/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/kibana"
Expand Down Expand Up @@ -122,36 +121,42 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right format
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
now := time.Now()

m.fetchStats(r, now)
err := m.fetchStats(r, now)
if err != nil {
return err
}

if m.XPackEnabled {
m.fetchSettings(r, now)
}

return nil
}

func (m *MetricSet) fetchStats(r mb.ReporterV2, now time.Time) {
func (m *MetricSet) fetchStats(r mb.ReporterV2, now time.Time) error {
content, err := m.statsHTTP.FetchContent()
if err != nil {
elastic.ReportAndLogError(err, r, m.Logger())
return
return err
}

if m.XPackEnabled {
intervalMs := m.calculateIntervalMs()
err = eventMappingStatsXPack(r, intervalMs, now, content)
if err != nil {
// Since this is an x-pack code path, we log the error but don't
// return it. Otherwise it would get reported into `metricbeat-*`
// indices.
m.Logger().Error(err)
return
return nil
}
} else {
err = eventMapping(r, content)
if err != nil {
elastic.ReportAndLogError(err, r, m.Logger())
return
}
return eventMapping(r, content)
}

return nil
}

func (m *MetricSet) fetchSettings(r mb.ReporterV2, now time.Time) {
Expand Down
8 changes: 4 additions & 4 deletions metricbeat/module/kibana/stats/stats_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func TestFetch(t *testing.T) {
t.Skip("Kibana stats API is not available until 6.4.0")
}

f := mbtest.NewReportingMetricSetV2(t, config)
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)

assert.Empty(t, errs)
if !assert.NotEmpty(t, events) {
Expand Down Expand Up @@ -85,8 +85,8 @@ func TestData(t *testing.T) {
t.Skip("Kibana stats API is not available until 6.4.0")
}

f := mbtest.NewReportingMetricSetV2(t, config)
err = mbtest.WriteEventsReporterV2(f, t, "")
f := mbtest.NewReportingMetricSetV2Error(t, config)
err = mbtest.WriteEventsReporterV2Error(f, t, "")
if err != nil {
t.Fatal("write", err)
}
Expand Down
16 changes: 4 additions & 12 deletions metricbeat/module/kibana/status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,34 +60,26 @@ func eventMapping(r mb.ReporterV2, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
event.Error = errors.Wrap(err, "failure parsing Kibana Status API response")
r.Event(event)
return event.Error
return errors.Wrap(err, "failure parsing Kibana Status API response")
}

dataFields, err := schema.Apply(data)
if err != nil {
event.Error = errors.Wrap(err, "failure to apply status schema")
r.Event(event)
return event.Error
return errors.Wrap(err, "failure to apply status schema")
}

// Set service ID
uuid, err := dataFields.GetValue("uuid")
if err != nil {
event.Error = elastic.MakeErrorForMissingField("uuid", elastic.Kibana)
r.Event(event)
return event.Error
return elastic.MakeErrorForMissingField("uuid", elastic.Kibana)
}
event.RootFields.Put("service.id", uuid)
dataFields.Delete("uuid")

// Set service version
version, err := dataFields.GetValue("version.number")
if err != nil {
event.Error = elastic.MakeErrorForMissingField("version.number", elastic.Kibana)
r.Event(event)
return event.Error
return elastic.MakeErrorForMissingField("version.number", elastic.Kibana)
}
event.RootFields.Put("service.version", version)
dataFields.Delete("version")
Expand Down
13 changes: 3 additions & 10 deletions metricbeat/module/kibana/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package status

import (
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/kibana"
Expand Down Expand Up @@ -69,17 +68,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right format
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
content, err := m.http.FetchContent()
if err != nil {
elastic.ReportAndLogError(err, r, m.Logger())
return
return err
}

err = eventMapping(r, content)

if err != nil {
m.Logger().Error(err)
return
}
return eventMapping(r, content)
}
4 changes: 2 additions & 2 deletions metricbeat/module/kibana/status/status_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUpWithTimeout(t, 600, "elasticsearch", "kibana")

f := mbtest.NewReportingMetricSetV2(t, mtest.GetConfig("status"))
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, mtest.GetConfig("status"))
events, errs := mbtest.ReportingFetchV2Error(f)

assert.Empty(t, errs)
if !assert.NotEmpty(t, events) {
Expand Down

0 comments on commit 462f798

Please sign in to comment.