From 2a7f38edff1aed0f31d52b8c5d9b9073b372a9a6 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Mon, 1 Nov 2021 12:36:31 -0700 Subject: [PATCH] Add a ring-buffer reporter to libbeat Add a ring buffer reporter that when enabled will store configured namespaces in a buffer to allow operators to view recent metrics history. Defaults are to gather the stats namespace every 10s for 10m. Must be explicitly enabled, along with monitoring, and the HTTP endpoint. The buffer endpoint is intended to be used for diagnostics reporting. --- libbeat/api/server.go | 19 +++ libbeat/api/server_test.go | 36 ++++++ libbeat/cmd/instance/beat.go | 18 ++- libbeat/monitoring/monitoring.go | 15 +++ libbeat/monitoring/monitoring_test.go | 64 ++++++++++ libbeat/monitoring/report/buffer/buffer.go | 65 ++++++++++ .../monitoring/report/buffer/buffer_test.go | 84 +++++++++++++ libbeat/monitoring/report/buffer/config.go | 37 ++++++ libbeat/monitoring/report/buffer/reporter.go | 119 ++++++++++++++++++ 9 files changed, 456 insertions(+), 1 deletion(-) create mode 100644 libbeat/monitoring/monitoring_test.go create mode 100644 libbeat/monitoring/report/buffer/buffer.go create mode 100644 libbeat/monitoring/report/buffer/buffer_test.go create mode 100644 libbeat/monitoring/report/buffer/config.go create mode 100644 libbeat/monitoring/report/buffer/reporter.go diff --git a/libbeat/api/server.go b/libbeat/api/server.go index 0b7e6d022dc8..4bd279a880c2 100644 --- a/libbeat/api/server.go +++ b/libbeat/api/server.go @@ -72,6 +72,25 @@ func (s *Server) Stop() error { return s.l.Close() } +// AttachHandler will attach a handler at the specified route and return an error instead of panicing. +func (s *Server) AttachHandler(route string, h http.Handler) (err error) { + defer func() { + if r := recover(); r != nil { + switch r.(type) { + case error: + err = r.(error) + case string: + err = fmt.Errorf(r.(string)) + default: + err = fmt.Errorf("handle attempted to panic with %v", r) + } + } + }() + s.log.Infof("Attempting to attach %q to server.", route) + s.mux.Handle(route, h) + return +} + func parse(host string, port int) (string, string, error) { url, err := url.Parse(host) if err != nil { diff --git a/libbeat/api/server_test.go b/libbeat/api/server_test.go index e4df0f5b4e37..51117f5fa187 100644 --- a/libbeat/api/server_test.go +++ b/libbeat/api/server_test.go @@ -184,3 +184,39 @@ func simpleMux() *http.ServeMux { }) return mux } + +func TestAttachHandler(t *testing.T) { + url := "http://localhost:0" + + cfg := common.MustNewConfigFrom(map[string]interface{}{ + "host": url, + }) + + s, err := New(nil, simpleMux(), cfg) + require.NoError(t, err) + go s.Start() + defer s.Stop() + + h := &testHandler{} + + err = s.AttachHandler("/test", h) + require.NoError(t, err) + + r, err := http.Get("http://" + s.l.Addr().String() + "/test") + require.NoError(t, err) + defer r.Body.Close() + + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + + assert.Equal(t, "test!", string(body)) + + err = s.AttachHandler("/test", h) + assert.NotNil(t, err) +} + +type testHandler struct{} + +func (t *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "test!") +} diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 3e1da0f62fb3..bcd55499413a 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -63,6 +63,7 @@ import ( "github.com/elastic/beats/v7/libbeat/metric/system/host" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/monitoring/report" + "github.com/elastic/beats/v7/libbeat/monitoring/report/buffer" "github.com/elastic/beats/v7/libbeat/monitoring/report/log" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" @@ -106,6 +107,7 @@ type beatConfig struct { // beat internal components configurations HTTP *common.Config `config:"http"` HTTPPprof *common.Config `config:"http.pprof"` + BufferConfig *common.Config `config:"http.buffer"` Path paths.Path `config:"path"` Logging *common.Config `config:"logging"` MetricLogging *common.Config `config:"logging.metrics"` @@ -440,8 +442,9 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { // Start the API Server before the Seccomp lock down, we do this so we can create the unix socket // set the appropriate permission on the unix domain file without having to whitelist anything // that would be set at runtime. + var s *api.Server // buffer reporter may need to attach to the server. if b.Config.HTTP.Enabled() { - s, err := api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, monitoring.GetNamespace) + s, err = api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, monitoring.GetNamespace) if err != nil { return errw.Wrap(err, "could not start the HTTP server for the API") } @@ -477,6 +480,19 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { defer reporter.Stop() } + // only collect into a ring buffer if HTTP, and the ring buffer are explicitly enabled + if b.Config.HTTP.Enabled() && monitoring.IsBufferEnabled(b.Config.BufferConfig) { + buffReporter, err := buffer.MakeReporter(b.Info, b.Config.BufferConfig) + if err != nil { + return err + } + defer buffReporter.Stop() + + if err := s.AttachHandler("/buffer", buffReporter); err != nil { + return err + } + } + ctx, cancel := context.WithCancel(context.Background()) var stopBeat = func() { b.Instrumentation.Tracer().Close() diff --git a/libbeat/monitoring/monitoring.go b/libbeat/monitoring/monitoring.go index 2fb587510ee5..34afe1a12f28 100644 --- a/libbeat/monitoring/monitoring.go +++ b/libbeat/monitoring/monitoring.go @@ -108,3 +108,18 @@ func IsEnabled(monitoringCfg *common.Config) bool { return monitoringCfg.Enabled() } + +// IsBufferEnabled will check if the monitoring buffer is explicitly enabled. +func IsBufferEnabled(monitoringCfg *common.Config) bool { + if monitoringCfg == nil { + return false + } + fields := monitoringCfg.GetFields() + for _, field := range fields { + if field == "enabled" { + // default Enabled will return true, so we only return the value if it's defined. + return monitoringCfg.Enabled() + } + } + return false +} diff --git a/libbeat/monitoring/monitoring_test.go b/libbeat/monitoring/monitoring_test.go new file mode 100644 index 000000000000..e2b3623e7e2b --- /dev/null +++ b/libbeat/monitoring/monitoring_test.go @@ -0,0 +1,64 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitoring + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestIsBufferEnabled(t *testing.T) { + tests := []struct { + name string + input map[string]interface{} + expect bool + }{{ + name: "enabled", + input: map[string]interface{}{ + "enabled": true, + }, + expect: true, + }, { + name: "disabled", + input: map[string]interface{}{ + "enabled": false, + }, + expect: false, + }, { + name: "missing", + input: map[string]interface{}{ + "size": 10, + }, + expect: false, + }, { + name: "nil", + input: nil, + expect: false, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg, err := common.NewConfigFrom(tt.input) + require.NoError(t, err) + assert.Equal(t, tt.expect, IsBufferEnabled(cfg)) + }) + } +} diff --git a/libbeat/monitoring/report/buffer/buffer.go b/libbeat/monitoring/report/buffer/buffer.go new file mode 100644 index 000000000000..defd822b2470 --- /dev/null +++ b/libbeat/monitoring/report/buffer/buffer.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package buffer + +import "sync" + +// ringBuffer is a buffer with a fixed number of items that can be tracked. +// +// we assume that the size of the buffer is greater than one. +// the buffer should be thread-safe. +type ringBuffer struct { + entries []interface{} + i int + full bool + mu sync.Mutex +} + +// newBuffer returns a reference to a new ringBuffer with set size. +func newBuffer(size int) *ringBuffer { + return &ringBuffer{ + entries: make([]interface{}, size), + } +} + +// add will add the passed entry to the buffer. +func (r *ringBuffer) add(entry interface{}) { + r.mu.Lock() + defer r.mu.Unlock() + r.entries[r.i] = entry + r.i = (r.i + 1) % len(r.entries) + if r.i == 0 { + r.full = true + } +} + +// getAll returns all entries in the buffer in order +func (r *ringBuffer) getAll() []interface{} { + r.mu.Lock() + defer r.mu.Unlock() + if r.i == 0 && !r.full { + return []interface{}{} + } + if !r.full { + return r.entries[:r.i] + } + if r.full && r.i == 0 { + return r.entries + } + return append(r.entries[r.i:], r.entries[:r.i]...) +} diff --git a/libbeat/monitoring/report/buffer/buffer_test.go b/libbeat/monitoring/report/buffer/buffer_test.go new file mode 100644 index 000000000000..5c9252aa2e6b --- /dev/null +++ b/libbeat/monitoring/report/buffer/buffer_test.go @@ -0,0 +1,84 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package buffer + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// TODO benchmarks? + +func Test_ringBuffer(t *testing.T) { + t.Run("Len 2 buffer", func(t *testing.T) { + r := newBuffer(2) + assert.Equal(t, 2, len(r.entries)) + assert.False(t, r.full) + assert.Equal(t, 0, r.i) + + assert.Empty(t, r.getAll()) + + r.add("1") + assert.False(t, r.full) + assert.Equal(t, 1, r.i) + assert.Equal(t, r.entries[0], "1") + assert.ElementsMatch(t, []string{"1"}, r.getAll()) + + r.add("2") + assert.True(t, r.full) + assert.Equal(t, 0, r.i) + assert.Equal(t, r.entries[1], "2") + assert.ElementsMatch(t, []string{"1", "2"}, r.getAll()) + + r.add("3") + assert.True(t, r.full) + assert.Equal(t, 1, r.i) + assert.Equal(t, r.entries[0], "3") + assert.ElementsMatch(t, []string{"2", "3"}, r.getAll()) + + r.add("4") + assert.True(t, r.full) + assert.Equal(t, 0, r.i) + assert.Equal(t, r.entries[1], "4") + assert.ElementsMatch(t, []string{"3", "4"}, r.getAll()) + }) + + t.Run("Len 3 buffer", func(t *testing.T) { + r := newBuffer(3) + assert.Empty(t, r.getAll()) + + r.add("1") + assert.ElementsMatch(t, []string{"1"}, r.getAll()) + + r.add("2") + assert.ElementsMatch(t, []string{"1", "2"}, r.getAll()) + + r.add("3") + assert.ElementsMatch(t, []string{"1", "2", "3"}, r.getAll()) + + r.add("4") + assert.ElementsMatch(t, []string{"2", "3", "4"}, r.getAll()) + + r.add("5") + assert.ElementsMatch(t, []string{"3", "4", "5"}, r.getAll()) + + r.add("6") + assert.ElementsMatch(t, []string{"4", "5", "6"}, r.getAll()) + }) +} diff --git a/libbeat/monitoring/report/buffer/config.go b/libbeat/monitoring/report/buffer/config.go new file mode 100644 index 000000000000..099ff7df50dc --- /dev/null +++ b/libbeat/monitoring/report/buffer/config.go @@ -0,0 +1,37 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package buffer + +import ( + "time" +) + +type config struct { + Period time.Duration `config:"period"` + Size int `config:"size" validate:"min=2"` + Namespaces []string `config:"namespaces"` +} + +// defaultConfig will gather 10m of data (every 10s) for the stats registry. +func defaultConfig() config { + return config{ + Period: 10 * time.Second, + Size: 60, + Namespaces: []string{"stats"}, + } +} diff --git a/libbeat/monitoring/report/buffer/reporter.go b/libbeat/monitoring/report/buffer/reporter.go new file mode 100644 index 000000000000..91dc1eac28d8 --- /dev/null +++ b/libbeat/monitoring/report/buffer/reporter.go @@ -0,0 +1,119 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package buffer + +import ( + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/monitoring" +) + +// reporter is a struct that will fill a ring buffer for each monitored registry. +type reporter struct { + config + wg sync.WaitGroup + done chan struct{} + registries map[string]*monitoring.Registry + + // ring buffers for namespaces + entries map[string]*ringBuffer +} + +// MakeReporter creates and starts a reporter with the given config. +func MakeReporter(beat beat.Info, cfg *common.Config) (*reporter, error) { + config := defaultConfig() + if cfg != nil { + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + } + + r := &reporter{ + config: config, + done: make(chan struct{}), + registries: map[string]*monitoring.Registry{}, + entries: map[string]*ringBuffer{}, + } + + for _, ns := range r.config.Namespaces { + reg := monitoring.GetNamespace(ns).GetRegistry() + r.registries[ns] = reg + r.entries[ns] = newBuffer(r.config.Size) + } + + r.wg.Add(1) + go func() { + defer r.wg.Done() + r.snapshotLoop() + }() + return r, nil +} + +// Stop will stop the reporter from collecting new information. +// It will not clear any previously collected data. +func (r *reporter) Stop() { + close(r.done) + r.wg.Wait() + // Clear entries? +} + +// snapshotLoop will collect a snapshot for each monitored registry for the configured period and store them in the correct buffer. +func (r *reporter) snapshotLoop() { + ticker := time.NewTicker(r.config.Period) + defer ticker.Stop() + + for { + var ts time.Time + select { + case <-r.done: + return + case ts = <-ticker.C: + } + + for name, reg := range r.registries { + snap := monitoring.CollectStructSnapshot(reg, monitoring.Full, false) + if _, ok := snap["@timestamp"]; !ok { + snap["@timestamp"] = ts.UTC() + } + r.entries[name].add(snap) + } + } +} + +// ServeHTTP is an http.Handler that will respond with the monitored registries buffer's contents in JSON. +func (r *reporter) ServeHTTP(w http.ResponseWriter, req *http.Request) { + resp := make(map[string][]interface{}, len(r.entries)) + for name, entries := range r.entries { + resp[name] = entries.getAll() + } + + p, err := json.Marshal(resp) + if err != nil { + w.WriteHeader(500) + fmt.Fprintf(w, "Unable to encode JSON response: %v", err) + return + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.Write(p) +}