From 2bda2064c49b1f2d0e7e19e341b45e92cba696fd Mon Sep 17 00:00:00 2001
From: odacremolbap <odacremolbap@gmail.com>
Date: Wed, 1 May 2019 01:03:31 +0200
Subject: [PATCH 1/8] manage leader metricset so that followers don't report
 errors nor events

---
 metricbeat/module/etcd/leader/leader.go | 56 +++++++++++++++++++++----
 1 file changed, 49 insertions(+), 7 deletions(-)

diff --git a/metricbeat/module/etcd/leader/leader.go b/metricbeat/module/etcd/leader/leader.go
index 51c100075150..5b1fdb080bcb 100644
--- a/metricbeat/module/etcd/leader/leader.go
+++ b/metricbeat/module/etcd/leader/leader.go
@@ -18,6 +18,11 @@
 package leader
 
 import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+
 	"github.com/pkg/errors"
 
 	"github.com/elastic/beats/libbeat/common"
@@ -30,6 +35,10 @@ const (
 	defaultScheme = "http"
 	defaultPath   = "/v2/stats/leader"
 	apiVersion    = "2"
+
+	// returned JSON management
+	msgElement        = "message"
+	msgValueNonLeader = "not current leader"
 )
 
 var (
@@ -71,14 +80,47 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
 // format. It publishes the event which is then forwarded to the output. In case
 // of an error set the Error field of mb.Event or simply call report.Error().
 func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
-	content, err := m.http.FetchContent()
+	res, err := m.http.FetchResponse()
+	if err != nil {
+		return errors.Wrap(err, "error fetching response")
+	}
+	defer res.Body.Close()
+
+	content, err := ioutil.ReadAll(res.Body)
 	if err != nil {
-		return errors.Wrap(err, "error in http fetch")
+		return errors.Wrapf(err, "error reading response from %q",
+			m.FullyQualifiedName())
+	}
+
+	if res.StatusCode == http.StatusOK {
+		reporter.Event(mb.Event{
+			MetricSetFields: eventMapping(content),
+			ModuleFields:    common.MapStr{"api_version": apiVersion},
+		})
+		return nil
+	}
+
+	// Errors might be reported as {"message":"<error message>"}
+	// let's look for that structure
+	var jsonResponse map[string]interface{}
+	if err = json.Unmarshal(content, &jsonResponse); err == nil {
+		if retMessage := jsonResponse[msgElement]; retMessage != "" {
+			// there is an error message element, let's use it
+
+			// If a 403 is returned and {"message":"not current leader"}
+			// do not consider this an error
+			// do not report events since this is not a leader
+			if res.StatusCode == http.StatusForbidden &&
+				retMessage == msgValueNonLeader {
+				return nil
+			}
+
+			return fmt.Errorf("fetching HTTP response for returned status code %d: %s",
+				res.StatusCode, retMessage)
+		}
 	}
 
-	reporter.Event(mb.Event{
-		MetricSetFields: eventMapping(content),
-		ModuleFields:    common.MapStr{"api_version": apiVersion},
-	})
-	return nil
+	// no message in the JSON payload, return standard error
+	return fmt.Errorf("fetching HTTP response returned status code %d", res.StatusCode)
+
 }

From 937bc741bd79f23ca9aee9442802253ed0daec95 Mon Sep 17 00:00:00 2001
From: odacremolbap <odacremolbap@gmail.com>
Date: Wed, 1 May 2019 01:36:52 +0200
Subject: [PATCH 2/8] no need to add metricset info to error, it is already
 added by an upstream error wrapper

---
 metricbeat/module/etcd/leader/leader.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/metricbeat/module/etcd/leader/leader.go b/metricbeat/module/etcd/leader/leader.go
index 5b1fdb080bcb..1c20ddaa0560 100644
--- a/metricbeat/module/etcd/leader/leader.go
+++ b/metricbeat/module/etcd/leader/leader.go
@@ -88,8 +88,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
 
 	content, err := ioutil.ReadAll(res.Body)
 	if err != nil {
-		return errors.Wrapf(err, "error reading response from %q",
-			m.FullyQualifiedName())
+		return errors.Wrapf(err, "error reading body response")
 	}
 
 	if res.StatusCode == http.StatusOK {

From cf6445d058da9eef2eded75f11a69a984f1082ab Mon Sep 17 00:00:00 2001
From: odacremolbap <odacremolbap@gmail.com>
Date: Wed, 1 May 2019 10:51:02 +0200
Subject: [PATCH 3/8] fix message typo

---
 metricbeat/module/etcd/leader/leader.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metricbeat/module/etcd/leader/leader.go b/metricbeat/module/etcd/leader/leader.go
index 1c20ddaa0560..bfe59b09cdf6 100644
--- a/metricbeat/module/etcd/leader/leader.go
+++ b/metricbeat/module/etcd/leader/leader.go
@@ -114,7 +114,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
 				return nil
 			}
 
-			return fmt.Errorf("fetching HTTP response for returned status code %d: %s",
+			return fmt.Errorf("fetching HTTP response returned status code %d: %s",
 				res.StatusCode, retMessage)
 		}
 	}

From f2863079a0711e39fb7eb2a3426a66d519d294cd Mon Sep 17 00:00:00 2001
From: odacremolbap <odacremolbap@gmail.com>
Date: Wed, 1 May 2019 10:52:53 +0200
Subject: [PATCH 4/8] migrate to test case table. increase coverage for etcd
 leader tests

---
 .../etcd/_meta/test/leaderstats_empty.json    |   0
 .../etcd/_meta/test/leaderstats_follower.json |   1 +
 .../_meta/test/leaderstats_internalerror.json |   1 +
 metricbeat/module/etcd/leader/leader_test.go  | 110 ++++++++++++++----
 4 files changed, 90 insertions(+), 22 deletions(-)
 create mode 100644 metricbeat/module/etcd/_meta/test/leaderstats_empty.json
 create mode 100644 metricbeat/module/etcd/_meta/test/leaderstats_follower.json
 create mode 100644 metricbeat/module/etcd/_meta/test/leaderstats_internalerror.json

diff --git a/metricbeat/module/etcd/_meta/test/leaderstats_empty.json b/metricbeat/module/etcd/_meta/test/leaderstats_empty.json
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/metricbeat/module/etcd/_meta/test/leaderstats_follower.json b/metricbeat/module/etcd/_meta/test/leaderstats_follower.json
new file mode 100644
index 000000000000..c64ace3c384e
--- /dev/null
+++ b/metricbeat/module/etcd/_meta/test/leaderstats_follower.json
@@ -0,0 +1 @@
+{"message":"not current leader"}
\ No newline at end of file
diff --git a/metricbeat/module/etcd/_meta/test/leaderstats_internalerror.json b/metricbeat/module/etcd/_meta/test/leaderstats_internalerror.json
new file mode 100644
index 000000000000..af1c2edca60a
--- /dev/null
+++ b/metricbeat/module/etcd/_meta/test/leaderstats_internalerror.json
@@ -0,0 +1 @@
+{"message":"random error message"}
\ No newline at end of file
diff --git a/metricbeat/module/etcd/leader/leader_test.go b/metricbeat/module/etcd/leader/leader_test.go
index 990c0bca43f1..31dfa2e718ea 100644
--- a/metricbeat/module/etcd/leader/leader_test.go
+++ b/metricbeat/module/etcd/leader/leader_test.go
@@ -22,6 +22,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"path/filepath"
+	"regexp"
 
 	"github.com/stretchr/testify/assert"
 
@@ -40,29 +41,94 @@ func TestEventMapping(t *testing.T) {
 }
 
 func TestFetchEventContent(t *testing.T) {
-	absPath, err := filepath.Abs("../_meta/test/")
-	assert.NoError(t, err)
 
-	response, err := ioutil.ReadFile(absPath + "/leaderstats.json")
-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		w.WriteHeader(200)
-		w.Header().Set("Content-Type", "application/json;")
-		w.Write([]byte(response))
-	}))
-	defer server.Close()
-
-	config := map[string]interface{}{
-		"module":     "etcd",
-		"metricsets": []string{"leader"},
-		"hosts":      []string{server.URL},
-	}
+	const (
+		module              = "etcd"
+		metricset           = "leader"
+		mockedFetchLocation = "../_meta/test/"
+	)
 
-	f := mbtest.NewReportingMetricSetV2Error(t, config)
-	events, errs := mbtest.ReportingFetchV2Error(f)
-	if len(errs) > 0 {
-		t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
-	}
-	assert.NotEmpty(t, events)
+	var testcases = []struct {
+		name            string
+		mockedFetchFile string
+		httpCode        int
+
+		expectedFetchErrorRegexp string
+		expectedNumEvents        int
+	}{
+		{
+			name:              "Leader member stats",
+			mockedFetchFile:   "/leaderstats.json",
+			httpCode:          http.StatusOK,
+			expectedNumEvents: 1,
+		},
+		{
+			name:              "Follower member",
+			mockedFetchFile:   "/leaderstats_follower.json",
+			httpCode:          http.StatusForbidden,
+			expectedNumEvents: 0,
+		},
+		{
+			name:                     "Simulating credentials issue",
+			mockedFetchFile:          "/leaderstats_empty.json",
+			httpCode:                 http.StatusForbidden,
+			expectedFetchErrorRegexp: "fetching HTTP response returned status code 403",
+			expectedNumEvents:        0,
+		},
+		{
+			name:                     "Simulating failure message",
+			mockedFetchFile:          "/leaderstats_internalerror.json",
+			httpCode:                 http.StatusInternalServerError,
+			expectedFetchErrorRegexp: "fetching HTTP response returned status code 500:.+",
+			expectedNumEvents:        0,
+		}}
+
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+
+			absPath, err := filepath.Abs(mockedFetchLocation + tc.mockedFetchFile)
+			assert.NoError(t, err)
+
+			response, err := ioutil.ReadFile(absPath)
+			assert.NoError(t, err)
 
-	t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), events[0])
+			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(tc.httpCode)
+				w.Header().Set("Content-Type", "application/json;")
+				w.Write([]byte(response))
+			}))
+			defer server.Close()
+
+			config := map[string]interface{}{
+				"module":     module,
+				"metricsets": []string{metricset},
+				"hosts":      []string{server.URL},
+			}
+
+			f := mbtest.NewReportingMetricSetV2Error(t, config)
+			events, errs := mbtest.ReportingFetchV2Error(f)
+
+			if tc.expectedFetchErrorRegexp != "" {
+				for _, err := range errs {
+					if match, _ := regexp.MatchString(tc.expectedFetchErrorRegexp, err.Error()); match {
+						// found expected fetch error, no need for further checks
+						return
+					}
+				}
+				t.Fatalf("Expected fetch error not found:\n Expected:%s\n Got: %+v",
+					tc.expectedFetchErrorRegexp,
+					errs)
+			}
+
+			if len(errs) > 0 {
+				t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
+			}
+
+			assert.Equal(t, tc.expectedNumEvents, len(events))
+
+			for i := range events {
+				t.Logf("%s/%s event[%d]: %+v", f.Module().Name(), f.Name(), i, events[i])
+			}
+		})
+	}
 }

From a1f9882e2e203bc565d9b7ced56d3d08311d8f72 Mon Sep 17 00:00:00 2001
From: odacremolbap <odacremolbap@gmail.com>
Date: Thu, 2 May 2019 21:42:35 +0200
Subject: [PATCH 5/8] add debug message when skipping leader events from non
 leader members

---
 metricbeat/module/etcd/leader/leader.go | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/metricbeat/module/etcd/leader/leader.go b/metricbeat/module/etcd/leader/leader.go
index bfe59b09cdf6..1590e751750c 100644
--- a/metricbeat/module/etcd/leader/leader.go
+++ b/metricbeat/module/etcd/leader/leader.go
@@ -26,6 +26,8 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+
 	"github.com/elastic/beats/metricbeat/helper"
 	"github.com/elastic/beats/metricbeat/mb"
 	"github.com/elastic/beats/metricbeat/mb/parse"
@@ -39,6 +41,8 @@ const (
 	// returned JSON management
 	msgElement        = "message"
 	msgValueNonLeader = "not current leader"
+
+	logSelector = "etcd.leader"
 )
 
 var (
@@ -55,11 +59,15 @@ func init() {
 	)
 }
 
+// Metricset for etcd.leader
 type MetricSet struct {
 	mb.BaseMetricSet
-	http *helper.HTTP
+	http         *helper.HTTP
+	logger       *logp.Logger
+	debugEnabled bool
 }
 
+// New etcd.leader metricset object
 func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
 	config := struct{}{}
 	if err := base.Module().UnpackConfig(&config); err != nil {
@@ -73,6 +81,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
 	return &MetricSet{
 		base,
 		http,
+		logp.NewLogger(logSelector),
+		logp.IsDebug(logSelector),
 	}, nil
 }
 
@@ -111,6 +121,9 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
 			// do not report events since this is not a leader
 			if res.StatusCode == http.StatusForbidden &&
 				retMessage == msgValueNonLeader {
+				if m.debugEnabled {
+					m.logger.Debugf("skipping event for non leader member %q", m.Host())
+				}
 				return nil
 			}
 

From 4b33792d5f827d172a61f40f109972f3dd0fac70 Mon Sep 17 00:00:00 2001
From: odacremolbap <odacremolbap@gmail.com>
Date: Thu, 2 May 2019 21:44:21 +0200
Subject: [PATCH 6/8] thanks @houndci-bot for pointing me to the right
 capitalization

---
 metricbeat/module/etcd/leader/leader.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/metricbeat/module/etcd/leader/leader.go b/metricbeat/module/etcd/leader/leader.go
index 1590e751750c..16a635554109 100644
--- a/metricbeat/module/etcd/leader/leader.go
+++ b/metricbeat/module/etcd/leader/leader.go
@@ -59,7 +59,7 @@ func init() {
 	)
 }
 
-// Metricset for etcd.leader
+// MetricSet for etcd.leader
 type MetricSet struct {
 	mb.BaseMetricSet
 	http         *helper.HTTP

From 37818fdc012c21becf2e789a9bfbff76ace3f87e Mon Sep 17 00:00:00 2001
From: odacremolbap <odacremolbap@gmail.com>
Date: Mon, 6 May 2019 16:24:54 +0200
Subject: [PATCH 7/8] add CHANGELOG entry

---
 CHANGELOG.next.asciidoc | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 97cd4ab4d705..f3b2e8f83a03 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -156,6 +156,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Add Filebeat envoyproxy module. {pull}11700[11700]
 - Add apache2(httpd) log path (`/var/log/httpd`) to make apache2 module work out of the box on Redhat-family OSes. {issue}11887[11887] {pull}11888[11888]
 - Add support to new MongoDB additional diagnostic information {pull}11952[11952]
+- 
 
 *Heartbeat*
 
@@ -178,6 +179,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Add check on object name in the counter path if the instance name is missing {issue}6528[6528] {pull}11878[11878]
 - Add AWS cloudwatch metricset. {pull}11798[11798] {issue}11734[11734]
 - Add `regions` in aws module config to specify target regions for querying cloudwatch metrics. {issue}11932[11932] {pull}11956[11956]
+- Keep `etcd` followers members from reporting `leader` metricset events {pull}12004[12004]
 
 *Packetbeat*
 

From 9942f7342d44568926c50d4b1950809a22508055 Mon Sep 17 00:00:00 2001
From: odacremolbap <odacremolbap@gmail.com>
Date: Mon, 6 May 2019 16:45:52 +0200
Subject: [PATCH 8/8] remove unwanted extra line

---
 CHANGELOG.next.asciidoc | 1 -
 1 file changed, 1 deletion(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index f3b2e8f83a03..2880a8408057 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -156,7 +156,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Add Filebeat envoyproxy module. {pull}11700[11700]
 - Add apache2(httpd) log path (`/var/log/httpd`) to make apache2 module work out of the box on Redhat-family OSes. {issue}11887[11887] {pull}11888[11888]
 - Add support to new MongoDB additional diagnostic information {pull}11952[11952]
-- 
 
 *Heartbeat*