From 3310fe648494023c9bc5d95761922e2b25f29798 Mon Sep 17 00:00:00 2001 From: simitt Date: Tue, 3 Mar 2020 15:44:12 +0100 Subject: [PATCH 1/7] Remove global logger from outputs and monitoring related to #15699 --- heartbeat/monitors/active/dialchain/socks5.go | 3 +- libbeat/monitoring/adapter/go-metrics.go | 18 ++--- .../monitoring/report/elasticsearch/client.go | 24 +++--- .../report/elasticsearch/elasticsearch.go | 13 ++-- libbeat/outputs/console/console.go | 11 +-- libbeat/outputs/elasticsearch/api_test.go | 2 +- libbeat/outputs/elasticsearch/bulkapi.go | 11 +-- libbeat/outputs/elasticsearch/client.go | 77 ++++++++++--------- libbeat/outputs/elasticsearch/client_test.go | 20 ++--- .../outputs/elasticsearch/elasticsearch.go | 17 ++-- libbeat/outputs/fileout/file.go | 14 ++-- libbeat/outputs/kafka/client.go | 34 ++++---- libbeat/outputs/kafka/config.go | 6 +- libbeat/outputs/kafka/config_test.go | 3 +- libbeat/outputs/kafka/kafka.go | 10 +-- libbeat/outputs/kafka/log.go | 13 +++- libbeat/outputs/kafka/partition.go | 20 ++--- libbeat/outputs/kafka/partition_test.go | 3 +- libbeat/outputs/logstash/async.go | 18 +++-- libbeat/outputs/logstash/enc.go | 5 +- libbeat/outputs/logstash/logstash.go | 3 - libbeat/outputs/logstash/sync.go | 17 ++-- libbeat/outputs/redis/client.go | 29 +++---- libbeat/outputs/redis/redis.go | 3 - libbeat/outputs/transport/client.go | 10 ++- libbeat/outputs/transport/proxy.go | 6 +- libbeat/outputs/transport/tcp.go | 2 +- libbeat/outputs/transport/transport.go | 6 +- 28 files changed, 214 insertions(+), 184 deletions(-) diff --git a/heartbeat/monitors/active/dialchain/socks5.go b/heartbeat/monitors/active/dialchain/socks5.go index ce8987a0623..1ba3d098343 100644 --- a/heartbeat/monitors/active/dialchain/socks5.go +++ b/heartbeat/monitors/active/dialchain/socks5.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/look" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs/transport" ) @@ -38,7 +39,7 @@ func SOCKS5Layer(config *transport.ProxyConfig) Layer { return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) { var timer timer - dialer, err := transport.ProxyDialer(config, startTimerAfterDial(&timer, next)) + dialer, err := transport.ProxyDialer(logp.NewLogger("socks5Layer"), config, startTimerAfterDial(&timer, next)) if err != nil { return nil, err } diff --git a/libbeat/monitoring/adapter/go-metrics.go b/libbeat/monitoring/adapter/go-metrics.go index 27fbfbf4aff..2e67520b658 100644 --- a/libbeat/monitoring/adapter/go-metrics.go +++ b/libbeat/monitoring/adapter/go-metrics.go @@ -42,6 +42,7 @@ import ( type GoMetricsRegistry struct { mutex sync.Mutex + log *logp.Logger reg *monitoring.Registry filters *metricFilters @@ -60,20 +61,19 @@ func GetGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFil if v == nil { return NewGoMetrics(parent, name, filters...) } - - reg := v.(*monitoring.Registry) - return &GoMetricsRegistry{ - reg: reg, - shadow: metrics.NewRegistry(), - filters: makeFilters(filters...), - } + return newGoMetrics(v.(*monitoring.Registry), filters...) } // NewGoMetrics creates and registers a new GoMetricsRegistry with the parent // registry. func NewGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFilter) *GoMetricsRegistry { + return newGoMetrics(parent.NewRegistry(name, monitoring.IgnorePublishExpvar),filters...) +} + +func newGoMetrics(reg *monitoring.Registry, filters ...MetricFilter) *GoMetricsRegistry{ return &GoMetricsRegistry{ - reg: parent.NewRegistry(name, monitoring.IgnorePublishExpvar), + log: logp.NewLogger("monitoring"), + reg: reg, shadow: metrics.NewRegistry(), filters: makeFilters(filters...), } @@ -193,7 +193,7 @@ func (r *GoMetricsRegistry) UnregisterAll() { r.shadow.UnregisterAll() err := r.reg.Clear() if err != nil { - logp.Err("Failed to clear registry: %v", err) + r.log.Errorf("Failed to clear registry: %v", err) } } diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index ca3aeda9566..ff3545db1cf 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -36,6 +36,7 @@ import ( var createDocPrivAvailableESVersion = common.MustNewVersion("7.5.0") type publishClient struct { + log *logp.Logger es *esout.Client params map[string]string format report.Format @@ -47,6 +48,7 @@ func newPublishClient( format report.Format, ) (*publishClient, error) { p := &publishClient{ + log: logp.NewLogger(selector), es: es, params: params, format: format, @@ -55,7 +57,7 @@ func newPublishClient( } func (c *publishClient) Connect() error { - debugf("Monitoring client: connect.") + c.log.Debug("Monitoring client: connect.") err := c.es.Connect() if err != nil { @@ -86,11 +88,11 @@ func (c *publishClient) Connect() error { } if !resp.Features.Monitoring.Enabled { - debugf("XPack monitoring is disabled.") + c.log.Debug("XPack monitoring is disabled.") return errNoMonitoring } - debugf("XPack monitoring is enabled") + c.log.Debug("XPack monitoring is enabled") return nil } @@ -108,13 +110,13 @@ func (c *publishClient) Publish(batch publisher.Batch) error { // Extract type t, err := event.Content.Meta.GetValue("type") if err != nil { - logp.Err("Type not available in monitoring reported. Please report this error: %s", err) + c.log.Errorf("Type not available in monitoring reported. Please report this error: %s", err) continue } typ, ok := t.(string) if !ok { - logp.Err("monitoring type is not a string") + c.log.Error("monitoring type is not a string") } var params = map[string]string{} @@ -235,7 +237,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error { return err } - logBulkFailures(result, []report.Event{document}) + logBulkFailures(c.log, result, []report.Event{document}) return err } @@ -245,25 +247,25 @@ func getMonitoringIndexName() string { return fmt.Sprintf(".monitoring-beats-%v-%s", version, date) } -func logBulkFailures(result esout.BulkResult, events []report.Event) { +func logBulkFailures(log *logp.Logger, result esout.BulkResult, events []report.Event) { reader := esout.NewJSONReader(result) err := esout.BulkReadToItems(reader) if err != nil { - logp.Err("failed to parse monitoring bulk items: %v", err) + log.Errorf("failed to parse monitoring bulk items: %v", err) return } for i := range events { - status, msg, err := esout.BulkReadItemStatus(reader) + status, msg, err := esout.BulkReadItemStatus(log, reader) if err != nil { - logp.Err("failed to parse monitoring bulk item status: %v", err) + log.Errorf("failed to parse monitoring bulk item status: %v", err) return } switch { case status < 300, status == http.StatusConflict: continue default: - logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg) + log.Warnf("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg) } } } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index b84ea9e1d67..86749d2a1d6 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -59,11 +59,8 @@ type reporter struct { out []outputs.NetworkClient } - const selector = "monitoring" -var debugf = logp.MakeDebug(selector) - var errNoMonitoring = errors.New("xpack monitoring not available") // default monitoring api parameters @@ -115,7 +112,7 @@ func defaultConfig(settings report.Settings) config { } func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) (report.Reporter, error) { - log := logp.L().Named(selector) + log := logp.NewLogger(selector) config := defaultConfig(settings) if err := cfg.Unpack(&config); err != nil { return nil, err @@ -237,8 +234,8 @@ func (r *reporter) Stop() { } func (r *reporter) initLoop(c config) { - debugf("Start monitoring endpoint init loop.") - defer debugf("Finish monitoring endpoint init loop.") + r.logger.Debug("Start monitoring endpoint init loop.") + defer r.logger.Debug("Finish monitoring endpoint init loop.") log := r.logger @@ -256,7 +253,7 @@ func (r *reporter) initLoop(c config) { log.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying. Error: ", err) logged = true } - debugf("Monitoring could not connect to Elasticsearch, failed with %v", err) + r.logger.Debugf("Monitoring could not connect to Elasticsearch, failed with %v", err) } select { @@ -294,7 +291,7 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration, snapshot := makeSnapshot(monitoring.GetNamespace(namespace).GetRegistry()) if snapshot == nil { - debugf("Empty snapshot.") + log.Debug("Empty snapshot.") continue } diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 3df69f4e5ba..9a687af3f8a 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -34,6 +34,7 @@ import ( ) type console struct { + log *logp.Logger out *os.File observer outputs.Observer writer *bufio.Writer @@ -95,7 +96,7 @@ func makeConsole( } func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) { - c := &console{out: os.Stdout, codec: codec, observer: observer, index: index} + c := &console{log: logp.NewLogger("console"), out: os.Stdout, codec: codec, observer: observer, index: index} c.writer = bufio.NewWriterSize(c.out, 8*1024) return c, nil } @@ -132,20 +133,20 @@ func (c *console) publishEvent(event *publisher.Event) bool { return false } - logp.Critical("Unable to encode event: %v", err) - logp.Debug("console", "Failed event: %v", event) + c.log.Errorf("Unable to encode event: %v", err) + c.log.Debugf("Failed event: %v", event) return false } if err := c.writeBuffer(serializedEvent); err != nil { c.observer.WriteError(err) - logp.Critical("Unable to publish events to console: %v", err) + c.log.Errorf("Unable to publish events to console: %v", err) return false } if err := c.writeBuffer(nl); err != nil { c.observer.WriteError(err) - logp.Critical("Error when appending newline to event: %v", err) + c.log.Errorf("Error when appending newline to event: %v", err) return false } diff --git a/libbeat/outputs/elasticsearch/api_test.go b/libbeat/outputs/elasticsearch/api_test.go index 73eaa7708cc..e9bcd3e304a 100644 --- a/libbeat/outputs/elasticsearch/api_test.go +++ b/libbeat/outputs/elasticsearch/api_test.go @@ -188,7 +188,7 @@ func newTestClient(url string) *Client { func (r QueryResult) String() string { out, err := json.Marshal(r) if err != nil { - logp.Warn("failed to marshal QueryResult (%v): %#v", err, r) + logp.NewLogger(logSelector).Warnf("failed to marshal QueryResult (%v): %#v", err, r) return "ERROR" } return string(out) diff --git a/libbeat/outputs/elasticsearch/bulkapi.go b/libbeat/outputs/elasticsearch/bulkapi.go index 1eccdb235bc..712f85a7abd 100644 --- a/libbeat/outputs/elasticsearch/bulkapi.go +++ b/libbeat/outputs/elasticsearch/bulkapi.go @@ -26,6 +26,7 @@ import ( "strings" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" ) // MetaBuilder creates meta data for bulk requests @@ -63,7 +64,7 @@ func (conn *Connection) BulkWith( enc := conn.encoder enc.Reset() - if err := bulkEncode(enc, metaBuilder, body); err != nil { + if err := bulkEncode(conn.log, enc, metaBuilder, body); err != nil { return nil, err } @@ -92,7 +93,7 @@ func (conn *Connection) SendMonitoringBulk( enc := conn.encoder enc.Reset() - if err := bulkEncode(enc, nil, body); err != nil { + if err := bulkEncode(conn.log, enc, nil, body); err != nil { return nil, err } @@ -204,11 +205,11 @@ func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, err return status, BulkResult(resp), err } -func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error { +func bulkEncode(log *logp.Logger, out bulkWriter, metaBuilder MetaBuilder, body []interface{}) error { if metaBuilder == nil { for _, obj := range body { if err := out.AddRaw(obj); err != nil { - debugf("Failed to encode message: %s", err) + log.Debugf("Failed to encode message: %s", err) return err } } @@ -216,7 +217,7 @@ func bulkEncode(out bulkWriter, metaBuilder MetaBuilder, body []interface{}) err for _, obj := range body { meta := metaBuilder(obj) if err := out.Add(meta, obj); err != nil { - debugf("Failed to encode event (dropping event): %s", err) + log.Debugf("Failed to encode event (dropping event): %s", err) } } } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 9838f355dd5..c50cb4e606c 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -85,6 +85,8 @@ type ConnectCallback func(client *Client) error // Connection manages the connection for a given client. type Connection struct { + log *logp.Logger + URL string Username string Password string @@ -171,7 +173,8 @@ func NewClient( s.URL = u.String() } - logp.Info("Elasticsearch url: %s", s.URL) + log := logp.NewLogger(logSelector) + log.Infof("Elasticsearch url: %s", s.URL) // TODO: add socks5 proxy support var dialer, tlsDialer transport.Dialer @@ -206,6 +209,7 @@ func NewClient( client := &Client{ Connection: Connection{ + log: log, URL: s.URL, Username: s.Username, Password: s.Password, @@ -334,7 +338,7 @@ func (client *Client) publishEvents( } origCount := len(data) - data = bulkEncodePublishRequest(client.GetVersion(), body, client.index, client.pipeline, eventType, data) + data = bulkEncodePublishRequest(client.Connection.log, client.GetVersion(), body, client.index, client.pipeline, eventType, data) newCount := len(data) if st != nil && origCount > newCount { st.Dropped(origCount - newCount) @@ -347,11 +351,11 @@ func (client *Client) publishEvents( requ.Reset(body) status, result, sendErr := client.sendBulkRequest(requ) if sendErr != nil { - logp.Err("Failed to perform any bulk index operations: %s", sendErr) + client.Connection.log.Error("Failed to perform any bulk index operations: %s", sendErr) return data, sendErr } - debugf("PublishEvents: %d events have been published to elasticsearch in %v.", + client.Connection.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.", len(data), time.Now().Sub(begin)) @@ -363,7 +367,7 @@ func (client *Client) publishEvents( stats.fails = len(failedEvents) } else { client.json.init(result) - failedEvents, stats = bulkCollectPublishFails(&client.json, data) + failedEvents, stats = bulkCollectPublishFails(client.Connection.log, &client.json, data) } failed := len(failedEvents) @@ -391,6 +395,7 @@ func (client *Client) publishEvents( // fillBulkRequest encodes all bulk requests and returns slice of events // successfully added to bulk request. func bulkEncodePublishRequest( + log *logp.Logger, version common.Version, body bulkWriter, index outputs.IndexSelector, @@ -401,14 +406,14 @@ func bulkEncodePublishRequest( okEvents := data[:0] for i := range data { event := &data[i].Content - meta, err := createEventBulkMeta(version, index, pipeline, eventType, event) + meta, err := createEventBulkMeta(log, version, index, pipeline, eventType, event) if err != nil { - logp.Err("Failed to encode event meta data: %s", err) + log.Errorf("Failed to encode event meta data: %s", err) continue } if err := body.Add(meta, event); err != nil { - logp.Err("Failed to encode event: %s", err) - logp.Debug("elasticsearch", "Failed event: %v", event) + log.Errorf("Failed to encode event: %s", err) + log.Debugf("Failed event: %v", event) continue } okEvents = append(okEvents, data[i]) @@ -417,6 +422,7 @@ func bulkEncodePublishRequest( } func createEventBulkMeta( + log *logp.Logger, version common.Version, indexSel outputs.IndexSelector, pipelineSel *outil.Selector, @@ -441,7 +447,7 @@ func createEventBulkMeta( if s, ok := tmp.(string); ok { id = s } else { - logp.Err("Event ID '%v' is no string value", id) + log.Errorf("Event ID '%v' is no string value", id) } } } @@ -480,11 +486,12 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) // event failed due to some error in the event itself (e.g. does not respect mapping), // the event will be dropped. func bulkCollectPublishFails( + log *logp.Logger, reader *JSONReader, data []publisher.Event, ) ([]publisher.Event, bulkResultStats) { if err := BulkReadToItems(reader); err != nil { - logp.Err("failed to parse bulk response: %v", err.Error()) + log.Errorf("failed to parse bulk response: %v", err.Error()) return nil, bulkResultStats{} } @@ -492,7 +499,7 @@ func bulkCollectPublishFails( failed := data[:0] stats := bulkResultStats{} for i := 0; i < count; i++ { - status, msg, err := BulkReadItemStatus(reader) + status, msg, err := BulkReadItemStatus(log, reader) if err != nil { return nil, bulkResultStats{} } @@ -514,13 +521,13 @@ func bulkCollectPublishFails( stats.tooMany++ } else { // hard failure, don't collect - logp.Warn("Cannot index event %#v (status=%v): %s", data[i], status, msg) + log.Warnf("Cannot index event %#v (status=%v): %s", data[i], status, msg) stats.nonIndexable++ continue } } - debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg) + log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg) stats.fails++ failed = append(failed, data[i]) } @@ -562,7 +569,7 @@ func BulkReadToItems(reader *JSONReader) error { } // BulkReadItemStatus reads the status and error fields from the bulk item -func BulkReadItemStatus(reader *JSONReader) (int, []byte, error) { +func BulkReadItemStatus(log *logp.Logger, reader *JSONReader) (int, []byte, error) { // skip outer dictionary if err := reader.ExpectDict(); err != nil { return 0, nil, errExpectedItemObject @@ -571,38 +578,38 @@ func BulkReadItemStatus(reader *JSONReader) (int, []byte, error) { // find first field in outer dictionary (e.g. 'create') kind, _, err := reader.nextFieldName() if err != nil { - logp.Err("Failed to parse bulk response item: %s", err) + log.Errorf("Failed to parse bulk response item: %s", err) return 0, nil, err } if kind == dictEnd { err = errUnexpectedEmptyObject - logp.Err("Failed to parse bulk response item: %s", err) + log.Errorf("Failed to parse bulk response item: %s", err) return 0, nil, err } // parse actual item response code and error message - status, msg, err := itemStatusInner(reader) + status, msg, err := itemStatusInner(log, reader) if err != nil { - logp.Err("Failed to parse bulk response item: %s", err) + log.Errorf("Failed to parse bulk response item: %s", err) return 0, nil, err } // close dictionary. Expect outer dictionary to have only one element kind, _, err = reader.step() if err != nil { - logp.Err("Failed to parse bulk response item: %s", err) + log.Errorf("Failed to parse bulk response item: %s", err) return 0, nil, err } if kind != dictEnd { err = errExpectedObjectEnd - logp.Err("Failed to parse bulk response item: %s", err) + log.Errorf("Failed to parse bulk response item: %s", err) return 0, nil, err } return status, msg, nil } -func itemStatusInner(reader *JSONReader) (int, []byte, error) { +func itemStatusInner(log *logp.Logger, reader *JSONReader) (int, []byte, error) { if err := reader.ExpectDict(); err != nil { return 0, nil, errExpectedItemObject } @@ -612,7 +619,7 @@ func itemStatusInner(reader *JSONReader) (int, []byte, error) { for { kind, name, err := reader.nextFieldName() if err != nil { - logp.Err("Failed to parse bulk response item: %s", err) + log.Errorf("Failed to parse bulk response item: %s", err) } if kind == dictEnd { break @@ -622,7 +629,7 @@ func itemStatusInner(reader *JSONReader) (int, []byte, error) { case bytes.Equal(name, nameStatus): // name == "status" status, err = reader.nextInt() if err != nil { - logp.Err("Failed to parse bulk response item: %s", err) + log.Errorf("Failed to parse bulk response item: %s", err) return 0, nil, err } @@ -715,7 +722,7 @@ func (conn *Connection) Connect() error { } if version, err := common.NewVersion(versionString); err != nil { - logp.Err("Invalid version from Elasticsearch: %v", versionString) + conn.log.Errorf("Invalid version from Elasticsearch: %v", versionString) conn.version = common.Version{} } else { conn.version = *version @@ -730,11 +737,11 @@ func (conn *Connection) Connect() error { // Ping sends a GET request to the Elasticsearch. func (conn *Connection) Ping() (string, error) { - debugf("ES Ping(url=%v)", conn.URL) + conn.log.Debugf("ES Ping(url=%v)", conn.URL) status, body, err := conn.execRequest("GET", conn.URL, nil) if err != nil { - debugf("Ping request failed with: %v", err) + conn.log.Debugf("Ping request failed with: %v", err) return "", err } @@ -753,8 +760,8 @@ func (conn *Connection) Ping() (string, error) { return "", fmt.Errorf("Failed to parse JSON response: %v", err) } - debugf("Ping status code: %v", status) - logp.Info("Attempting to connect to Elasticsearch version %s", response.Version.Number) + conn.log.Debugf("Ping status code: %v", status) + conn.log.Infof("Attempting to connect to Elasticsearch version %s", response.Version.Number) return response.Version.Number, nil } @@ -772,7 +779,7 @@ func (conn *Connection) Request( ) (int, []byte, error) { url := addToURL(conn.URL, path, pipeline, params) - debugf("%s %s %s %v", method, url, pipeline, body) + conn.log.Debugf("%s %s %s %v", method, url, pipeline, body) return conn.RequestURL(method, url, body) } @@ -788,7 +795,7 @@ func (conn *Connection) RequestURL( } if err := conn.encoder.Marshal(body); err != nil { - logp.Warn("Failed to json encode body (%v): %#v", err, body) + conn.log.Warnf("Failed to json encode body (%v): %#v", err, body) return 0, nil, ErrJSONEncodeFailed } return conn.execRequest(method, url, conn.encoder.Reader()) @@ -800,7 +807,7 @@ func (conn *Connection) execRequest( ) (int, []byte, error) { req, err := http.NewRequest(method, url, body) if err != nil { - logp.Warn("Failed to create request %+v", err) + conn.log.Warnf("Failed to create request %+v", err) return 0, nil, err } if body != nil { @@ -836,7 +843,7 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) if err != nil { return 0, nil, err } - defer closing(resp.Body) + defer closing(conn.log, resp.Body) status := resp.StatusCode obj, err := ioutil.ReadAll(resp.Body) @@ -858,9 +865,9 @@ func (conn *Connection) GetVersion() common.Version { return conn.version } -func closing(c io.Closer) { +func closing(log *logp.Logger, c io.Closer) { err := c.Close() if err != nil { - logp.Warn("Close failed with: %v", err) + log.Warnf("Close failed with: %v", err) } } diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 7a0ac0e6824..9cabf8f9a6f 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -42,7 +42,7 @@ import ( func readStatusItem(in []byte) (int, string, error) { reader := NewJSONReader(in) - code, msg, err := BulkReadItemStatus(reader) + code, msg, err := BulkReadItemStatus(logp.L(), reader) return code, string(msg), err } @@ -103,7 +103,7 @@ func TestCollectPublishFailsNone(t *testing.T) { } reader := NewJSONReader(response) - res, _ := bulkCollectPublishFails(reader, events) + res, _ := bulkCollectPublishFails(logp.L(), reader, events) assert.Equal(t, 0, len(res)) } @@ -121,7 +121,7 @@ func TestCollectPublishFailMiddle(t *testing.T) { events := []publisher.Event{event, eventFail, event} reader := NewJSONReader(response) - res, stats := bulkCollectPublishFails(reader, events) + res, stats := bulkCollectPublishFails(logp.L(), reader, events) assert.Equal(t, 1, len(res)) if len(res) == 1 { assert.Equal(t, eventFail, res[0]) @@ -142,7 +142,7 @@ func TestCollectPublishFailAll(t *testing.T) { events := []publisher.Event{event, event, event} reader := NewJSONReader(response) - res, stats := bulkCollectPublishFails(reader, events) + res, stats := bulkCollectPublishFails(logp.L(), reader, events) assert.Equal(t, 3, len(res)) assert.Equal(t, events, res) assert.Equal(t, stats, bulkResultStats{fails: 3, tooMany: 3}) @@ -184,7 +184,7 @@ func TestCollectPipelinePublishFail(t *testing.T) { events := []publisher.Event{event} reader := NewJSONReader(response) - res, _ := bulkCollectPublishFails(reader, events) + res, _ := bulkCollectPublishFails(logp.L(), reader, events) assert.Equal(t, 1, len(res)) assert.Equal(t, events, res) } @@ -204,7 +204,7 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) { reader := NewJSONReader(nil) for i := 0; i < b.N; i++ { reader.init(response) - res, _ := bulkCollectPublishFails(reader, events) + res, _ := bulkCollectPublishFails(logp.L(), reader, events) if len(res) != 0 { b.Fail() } @@ -227,7 +227,7 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) { reader := NewJSONReader(nil) for i := 0; i < b.N; i++ { reader.init(response) - res, _ := bulkCollectPublishFails(reader, events) + res, _ := bulkCollectPublishFails(logp.L(), reader, events) if len(res) != 1 { b.Fail() } @@ -249,7 +249,7 @@ func BenchmarkCollectPublishFailAll(b *testing.B) { reader := NewJSONReader(nil) for i := 0; i < b.N; i++ { reader.init(response) - res, _ := bulkCollectPublishFails(reader, events) + res, _ := bulkCollectPublishFails(logp.L(), reader, events) if len(res) != 3 { b.Fail() } @@ -390,7 +390,7 @@ func TestBulkEncodeEvents(t *testing.T) { recorder := &testBulkRecorder{} - encoded := bulkEncodePublishRequest(common.Version{Major: 7, Minor: 5}, recorder, index, pipeline, test.docType, events) + encoded := bulkEncodePublishRequest(logp.L(), common.Version{Major: 7, Minor: 5}, recorder, index, pipeline, test.docType, events) assert.Equal(t, len(events), len(encoded), "all events should have been encoded") assert.False(t, recorder.inAction, "incomplete bulk") @@ -496,7 +496,7 @@ func TestBulkReadItemStatus(t *testing.T) { response := []byte(`{"create": {"status": 200}}`) reader := NewJSONReader(response) - code, _, err := BulkReadItemStatus(reader) + code, _, err := BulkReadItemStatus(logp.L(), reader) assert.NoError(t, err) assert.Equal(t, 200, code) } diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 0cdad25cbe8..d2cc2170789 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -37,9 +37,6 @@ func init() { outputs.RegisterType("elasticsearch", makeES) } -var ( - debugf = logp.MakeDebug("elasticsearch") -) var ( // ErrNotConnected indicates failure due to client having no valid connection @@ -52,6 +49,8 @@ var ( ErrResponseRead = errors.New("bulk item status parse failed") ) +const logSelector = "elasticsearch" + // Callbacks must not depend on the result of a previous one, // because the ordering is not fixed. type callbacksRegistry struct { @@ -140,6 +139,7 @@ func makeES( observer outputs.Observer, cfg *common.Config, ) (outputs.Group, error) { + log := logp.NewLogger(logSelector) if !cfg.HasField("bulk_max_size") { cfg.SetInt("bulk_max_size", -1, defaultBulkSize) } @@ -171,7 +171,7 @@ func makeES( return outputs.Fail(err) } if proxyURL != nil { - logp.Info("Using proxy URL: %s", proxyURL) + log.Infof("Using proxy URL: %s", proxyURL) } } @@ -184,7 +184,7 @@ func makeES( for i, host := range hosts { esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200) if err != nil { - logp.Err("Invalid host param set: %s, Error: %v", host, err) + log.Errorf("Invalid host param set: %s, Error: %v", host, err) return outputs.Fail(err) } @@ -258,7 +258,7 @@ func NewConnectedClient(cfg *common.Config) (*Client, error) { for _, client := range clients { err = client.Connect() if err != nil { - logp.Err("Error connecting to Elasticsearch at %v: %v", client.Connection.URL, err) + client.Connection.log.Errorf("Error connecting to Elasticsearch at %v: %v", client.Connection.URL, err) err = fmt.Errorf("Error connection to Elasticsearch %v: %v", client.Connection.URL, err) errors = append(errors, err.Error()) continue @@ -289,6 +289,7 @@ func NewElasticsearchClients(cfg *common.Config) ([]Client, error) { return nil, err } + log := logp.NewLogger(logSelector) var proxyURL *url.URL if !config.ProxyDisable { proxyURL, err = parseProxyURL(config.ProxyURL) @@ -296,7 +297,7 @@ func NewElasticsearchClients(cfg *common.Config) ([]Client, error) { return nil, err } if proxyURL != nil { - logp.Info("Using proxy URL: %s", proxyURL) + log.Infof("Using proxy URL: %s", proxyURL) } } @@ -309,7 +310,7 @@ func NewElasticsearchClients(cfg *common.Config) ([]Client, error) { for _, host := range hosts { esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200) if err != nil { - logp.Err("Invalid host param set: %s, Error: %v", host, err) + log.Errorf("Invalid host param set: %s, Error: %v", host, err) return nil, err } diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index 5080f9b87e8..7e4a4b36821 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -35,6 +35,7 @@ func init() { } type fileOutput struct { + log *logp.Logger filePath string beat beat.Info observer outputs.Observer @@ -58,6 +59,7 @@ func makeFileout( cfg.SetInt("bulk_max_size", -1, -1) fo := &fileOutput{ + log: logp.NewLogger("file"), beat: beat, observer: observer, } @@ -95,7 +97,7 @@ func (out *fileOutput) init(beat beat.Info, c config) error { return err } - logp.Info("Initialized file output. "+ + out.log.Infof("Initialized file output. "+ "path=%v max_size_bytes=%v max_backups=%v permissions=%v", path, c.RotateEveryKb*1024, c.NumberOfFiles, os.FileMode(c.Permissions)) @@ -123,11 +125,11 @@ func (out *fileOutput) Publish( serializedEvent, err := out.codec.Encode(out.beat.Beat, &event.Content) if err != nil { if event.Guaranteed() { - logp.Critical("Failed to serialize the event: %v", err) + out.log.Errorf("Failed to serialize the event: %v", err) } else { - logp.Warn("Failed to serialize the event: %v", err) + out.log.Warnf("Failed to serialize the event: %v", err) } - logp.Debug("file", "Failed event: %v", event) + out.log.Debugf( "Failed event: %v", event) dropped++ continue @@ -137,9 +139,9 @@ func (out *fileOutput) Publish( st.WriteError(err) if event.Guaranteed() { - logp.Critical("Writing event to file failed with: %v", err) + out.log.Errorf("Writing event to file failed with: %v", err) } else { - logp.Warn("Writing event to file failed with: %v", err) + out.log.Warnf("Writing event to file failed with: %v", err) } dropped++ diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index e5ab6f2ae85..b3b43839e2b 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -37,6 +37,7 @@ import ( ) type client struct { + log *logp.Logger observer outputs.Observer hosts []string topic outil.Selector @@ -75,6 +76,7 @@ func newKafkaClient( cfg *sarama.Config, ) (*client, error) { c := &client{ + log: logp.NewLogger(logSelector), observer: observer, hosts: hosts, topic: topic, @@ -90,12 +92,12 @@ func (c *client) Connect() error { c.mux.Lock() defer c.mux.Unlock() - debugf("connect: %v", c.hosts) + c.log.Debugf("connect: %v", c.hosts) // try to connect producer, err := sarama.NewAsyncProducer(c.hosts, &c.config) if err != nil { - logp.Err("Kafka connect fails with: %v", err) + c.log.Errorf("Kafka connect fails with: %v", err) return err } @@ -111,7 +113,7 @@ func (c *client) Connect() error { func (c *client) Close() error { c.mux.Lock() defer c.mux.Unlock() - debugf("closed kafka client") + c.log.Debug("closed kafka client") // producer was not created before the close() was called. if c.producer == nil { @@ -141,7 +143,7 @@ func (c *client) Publish(batch publisher.Batch) error { d := &events[i] msg, err := c.getEventMessage(d) if err != nil { - logp.Err("Dropping event: %v", err) + c.log.Errorf("Dropping event: %v", err) ref.done() c.observer.Dropped(1) continue @@ -165,8 +167,8 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { value, err := data.Cache.GetValue("partition") if err == nil { - if logp.IsDebug(debugSelector) { - debugf("got event.Meta[\"partition\"] = %v", value) + if c.log.IsDebug(){ + c.log.Debugf("got event.Meta[\"partition\"] = %v", value) } if partition, ok := value.(int32); ok { msg.partition = partition @@ -175,8 +177,8 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { value, err = data.Cache.GetValue("topic") if err == nil { - if logp.IsDebug(debugSelector) { - debugf("got event.Meta[\"topic\"] = %v", value) + if c.log.IsDebug(){ + c.log.Debugf("got event.Meta[\"topic\"] = %v", value) } if topic, ok := value.(string); ok { msg.topic = topic @@ -199,8 +201,8 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { serializedEvent, err := c.codec.Encode(c.index, event) if err != nil { - if logp.IsDebug(debugSelector) { - debugf("failed event: %v", event) + if c.log.IsDebug(){ + c.log.Debugf("failed event: %v", event) } return nil, err } @@ -225,7 +227,7 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) { defer c.wg.Done() - defer debugf("Stop kafka ack worker") + defer c.log.Debug("Stop kafka ack worker") for libMsg := range ch { msg := libMsg.Metadata.(*message) @@ -235,7 +237,7 @@ func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) { func (c *client) errorWorker(ch <-chan *sarama.ProducerError) { defer c.wg.Done() - defer debugf("Stop kafka error handler") + defer c.log.Debug("Stop kafka error handler") for errMsg := range ch { msg := errMsg.Msg.Metadata.(*message) @@ -250,11 +252,11 @@ func (r *msgRef) done() { func (r *msgRef) fail(msg *message, err error) { switch err { case sarama.ErrInvalidMessage: - logp.Err("Kafka (topic=%v): dropping invalid message", msg.topic) + r.client.log.Errorf("Kafka (topic=%v): dropping invalid message", msg.topic) r.client.observer.Dropped(1) case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize: - logp.Err("Kafka (topic=%v): dropping too large message of size %v.", + r.client.log.Errorf("Kafka (topic=%v): dropping too large message of size %v.", msg.topic, len(msg.key)+len(msg.value)) r.client.observer.Dropped(1) @@ -272,7 +274,7 @@ func (r *msgRef) dec() { return } - debugf("finished kafka batch") + r.client.log.Debug("finished kafka batch") stats := r.client.observer err := r.err @@ -286,7 +288,7 @@ func (r *msgRef) dec() { stats.Acked(success) } - debugf("Kafka publish failed with: %v", err) + r.client.log.Debugf("Kafka publish failed with: %v", err) } else { r.batch.ACK() stats.Acked(r.total) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index d663b7a6113..9f108eb0e3d 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -186,8 +186,8 @@ func (c *kafkaConfig) Validate() error { return nil } -func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { - partitioner, err := makePartitioner(config.Partition) +func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, error) { + partitioner, err := makePartitioner(log, config.Partition) if err != nil { return nil, err } @@ -283,7 +283,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { ) if err := k.Validate(); err != nil { - logp.Err("Invalid kafka configuration: %v", err) + log.Errorf("Invalid kafka configuration: %v", err) return nil, err } return k, nil diff --git a/libbeat/outputs/kafka/config_test.go b/libbeat/outputs/kafka/config_test.go index ee404666e14..8d81e5b112f 100644 --- a/libbeat/outputs/kafka/config_test.go +++ b/libbeat/outputs/kafka/config_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" ) func TestConfigAcceptValid(t *testing.T) { @@ -45,7 +46,7 @@ func TestConfigAcceptValid(t *testing.T) { if err != nil { t.Fatalf("Can not create test configuration: %v", err) } - if _, err := newSaramaConfig(cfg); err != nil { + if _, err := newSaramaConfig(logp.L(), cfg); err != nil { t.Fatalf("Failure creating sarama config: %v", err) } }) diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index a84d9790b2b..9f71edb8a68 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -38,10 +38,9 @@ const ( // not return ErrTempBulkFailure defaultMaxWaitRetry = 60 * time.Second - debugSelector = "kafka" + logSelector = "kafka" ) -var debugf = logp.MakeDebug(debugSelector) var ( errNoTopicSet = errors.New("No topic configured") @@ -49,7 +48,7 @@ var ( ) func init() { - sarama.Logger = kafkaLogger{} + sarama.Logger = kafkaLogger{log:logp.NewLogger(logSelector)} outputs.RegisterType("kafka", makeKafka) } @@ -60,7 +59,8 @@ func makeKafka( observer outputs.Observer, cfg *common.Config, ) (outputs.Group, error) { - debugf("initialize kafka output") + log := logp.NewLogger(logSelector) + log.Debug("initialize kafka output") config, err := readConfig(cfg) if err != nil { @@ -77,7 +77,7 @@ func makeKafka( return outputs.Fail(err) } - libCfg, err := newSaramaConfig(config) + libCfg, err := newSaramaConfig(log, config) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/kafka/log.go b/libbeat/outputs/kafka/log.go index 11da0e377af..81a8dd0fba1 100644 --- a/libbeat/outputs/kafka/log.go +++ b/libbeat/outputs/kafka/log.go @@ -23,7 +23,9 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) -type kafkaLogger struct{} +type kafkaLogger struct{ + log *logp.Logger +} func (kl kafkaLogger) Print(v ...interface{}) { kl.Log("kafka message: %v", v...) @@ -37,7 +39,7 @@ func (kl kafkaLogger) Println(v ...interface{}) { kl.Log("kafka message: %v", v...) } -func (kafkaLogger) Log(format string, v ...interface{}) { +func (kl kafkaLogger) Log(format string, v ...interface{}) { warn := false for _, val := range v { if err, ok := val.(sarama.KError); ok { @@ -47,9 +49,12 @@ func (kafkaLogger) Log(format string, v ...interface{}) { } } } + if kl.log == nil{ + kl.log = logp.NewLogger(logSelector) + } if warn { - logp.Warn(format, v...) + kl.log.Warnf(format, v...) } else { - logp.Info(format, v...) + kl.log.Infof(format, v...) } } diff --git a/libbeat/outputs/kafka/partition.go b/libbeat/outputs/kafka/partition.go index 399c6d9de96..57bb632b247 100644 --- a/libbeat/outputs/kafka/partition.go +++ b/libbeat/outputs/kafka/partition.go @@ -32,7 +32,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) -type partitionBuilder func(*common.Config) (func() partitioner, error) +type partitionBuilder func(*logp.Logger, *common.Config) (func() partitioner, error) type partitioner func(*message, int32) (int32, error) @@ -45,9 +45,10 @@ type messagePartitioner struct { } func makePartitioner( + log *logp.Logger, partition map[string]*common.Config, ) (sarama.PartitionerConstructor, error) { - mkStrategy, reachable, err := initPartitionStrategy(partition) + mkStrategy, reachable, err := initPartitionStrategy(log, partition) if err != nil { return nil, err } @@ -67,6 +68,7 @@ var partitioners = map[string]partitionBuilder{ } func initPartitionStrategy( + log *logp.Logger, partition map[string]*common.Config, ) (func() partitioner, bool, error) { if len(partition) == 0 { @@ -90,7 +92,7 @@ func initPartitionStrategy( if mk == nil { return nil, false, fmt.Errorf("unknown kafka partition mode %v", name) } - constr, err := mk(config) + constr, err := mk(log, config) if err != nil { return nil, false, err } @@ -136,7 +138,7 @@ func (p *messagePartitioner) Partition( return msg.partition, nil } -func cfgRandomPartitioner(config *common.Config) (func() partitioner, error) { +func cfgRandomPartitioner(_ *logp.Logger, config *common.Config) (func() partitioner, error) { cfg := struct { GroupEvents int `config:"group_events" validate:"min=1"` }{ @@ -163,7 +165,7 @@ func cfgRandomPartitioner(config *common.Config) (func() partitioner, error) { }, nil } -func cfgRoundRobinPartitioner(config *common.Config) (func() partitioner, error) { +func cfgRoundRobinPartitioner(_ *logp.Logger, config *common.Config) (func() partitioner, error) { cfg := struct { GroupEvents int `config:"group_events" validate:"min=1"` }{ @@ -191,7 +193,7 @@ func cfgRoundRobinPartitioner(config *common.Config) (func() partitioner, error) }, nil } -func cfgHashPartitioner(config *common.Config) (func() partitioner, error) { +func cfgHashPartitioner(log *logp.Logger,config *common.Config) (func() partitioner, error) { cfg := struct { Hash []string `config:"hash"` Random bool `config:"random"` @@ -207,7 +209,7 @@ func cfgHashPartitioner(config *common.Config) (func() partitioner, error) { } return func() partitioner { - return makeFieldsHashPartitioner(cfg.Hash, !cfg.Random) + return makeFieldsHashPartitioner(log, cfg.Hash, !cfg.Random) }, nil } @@ -235,7 +237,7 @@ func makeHashPartitioner() partitioner { } } -func makeFieldsHashPartitioner(fields []string, dropFail bool) partitioner { +func makeFieldsHashPartitioner(log *logp.Logger, fields []string, dropFail bool) partitioner { generator := rand.New(rand.NewSource(rand.Int63())) hasher := fnv.New32a() @@ -254,7 +256,7 @@ func makeFieldsHashPartitioner(fields []string, dropFail bool) partitioner { if err != nil { if dropFail { - logp.Err("Hashing partition key failed: %v", err) + log.Errorf("Hashing partition key failed: %v", err) return -1, err } diff --git a/libbeat/outputs/kafka/partition_test.go b/libbeat/outputs/kafka/partition_test.go index 67ea36444e7..22711117ec7 100644 --- a/libbeat/outputs/kafka/partition_test.go +++ b/libbeat/outputs/kafka/partition_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/publisher" ) @@ -196,7 +197,7 @@ func TestPartitioners(t *testing.T) { continue } - constr, err := makePartitioner(pcfg.Partition) + constr, err := makePartitioner(logp.L(), pcfg.Partition) if err != nil { t.Error(err) continue diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index b3357f72589..79b89c19ec8 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -33,6 +33,7 @@ import ( ) type asyncClient struct { + log *logp.Logger *transport.Client observer outputs.Observer client *v2.AsyncClient @@ -59,7 +60,10 @@ func newAsyncClient( observer outputs.Observer, config *Config, ) (*asyncClient, error) { + + log := logp.NewLogger("logstash") c := &asyncClient{ + log: log, Client: conn, observer: observer, } @@ -69,10 +73,10 @@ func newAsyncClient( } if config.TTL != 0 { - logp.Warn(`The async Logstash client does not support the "ttl" option`) + log.Warn(`The async Logstash client does not support the "ttl" option`) } - enc := makeLogstashEventEncoder(beat, config.EscapeHTML, config.Index) + enc := makeLogstashEventEncoder(log, beat, config.EscapeHTML, config.Index) queueSize := config.Pipelining - 1 timeout := config.Timeout @@ -112,7 +116,7 @@ func makeClientFactory( } func (c *asyncClient) Connect() error { - logp.Debug("logstash", "connect") + c.log.Debug("connect") return c.connect() } @@ -120,7 +124,7 @@ func (c *asyncClient) Close() error { c.mutex.Lock() defer c.mutex.Unlock() - logp.Debug("logstash", "close connection") + c.log.Debug("close connection") if c.client != nil { err := c.client.Close() @@ -164,7 +168,7 @@ func (c *asyncClient) Publish(batch publisher.Batch) error { n, err = c.publishWindowed(ref, events) } - debugf("%v events out of %v events sent to logstash host %s. Continue sending", + c.log.Debugf("%v events out of %v events sent to logstash host %s. Continue sending", n, len(events), c.Host()) events = events[n:] @@ -188,7 +192,7 @@ func (c *asyncClient) publishWindowed( batchSize := len(events) windowSize := c.win.get() - debugf("Try to publish %v events to logstash host %s with window size %v", + c.log.Debugf("Try to publish %v events to logstash host %s with window size %v", batchSize, c.Host(), windowSize) // prepare message payload @@ -272,5 +276,5 @@ func (r *msgRef) dec() { } r.batch.RetryEvents(r.slice) - logp.Err("Failed to publish events caused by: %v", err) + r.client.log.Errorf("Failed to publish events caused by: %v", err) } diff --git a/libbeat/outputs/logstash/enc.go b/libbeat/outputs/logstash/enc.go index 747a7d3fd5b..4e44d836d43 100644 --- a/libbeat/outputs/logstash/enc.go +++ b/libbeat/outputs/logstash/enc.go @@ -21,10 +21,11 @@ import ( "strings" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs/codec/json" ) -func makeLogstashEventEncoder(info beat.Info, escapeHTML bool, index string) func(interface{}) ([]byte, error) { +func makeLogstashEventEncoder(log *logp.Logger, info beat.Info, escapeHTML bool, index string) func(interface{}) ([]byte, error) { enc := json.New(info.Version, json.Config{ Pretty: false, EscapeHTML: escapeHTML, @@ -33,7 +34,7 @@ func makeLogstashEventEncoder(info beat.Info, escapeHTML bool, index string) fun return func(event interface{}) (d []byte, err error) { d, err = enc.Encode(index, event.(*beat.Event)) if err != nil { - debugf("Failed to encode event: %v", event) + log.Debugf("Failed to encode event: %v", event) } return } diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index b2c3bb95565..8202a0ceff7 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -21,7 +21,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/transport" ) @@ -32,8 +31,6 @@ const ( defaultPort = 5044 ) -var debugf = logp.MakeDebug("logstash") - func init() { outputs.RegisterType("logstash", makeLogstash) } diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index cd37e0bbb31..85497ae2059 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -29,6 +29,7 @@ import ( ) type syncClient struct { + log *logp.Logger *transport.Client client *v2.SyncClient observer outputs.Observer @@ -43,7 +44,9 @@ func newSyncClient( observer outputs.Observer, config *Config, ) (*syncClient, error) { + log := logp.NewLogger("logstash") c := &syncClient{ + log: log, Client: conn, observer: observer, ttl: config.TTL, @@ -57,7 +60,7 @@ func newSyncClient( } var err error - enc := makeLogstashEventEncoder(beat, config.EscapeHTML, config.Index) + enc := makeLogstashEventEncoder(log, beat, config.EscapeHTML, config.Index) c.client, err = v2.NewSyncClientWithConn(conn, v2.JSONEncoder(enc), v2.Timeout(config.Timeout), @@ -71,7 +74,7 @@ func newSyncClient( } func (c *syncClient) Connect() error { - logp.Debug("logstash", "connect") + c.log.Debug("connect") err := c.Client.Connect() if err != nil { return err @@ -87,13 +90,13 @@ func (c *syncClient) Close() error { if c.ticker != nil { c.ticker.Stop() } - logp.Debug("logstash", "close connection") + c.log.Debug("close connection") return c.Client.Close() } func (c *syncClient) reconnect() error { if err := c.Client.Close(); err != nil { - logp.Err("error closing connection to logstash host %s: %s, reconnecting...", c.Host(), err) + c.log.Errorf("error closing connection to logstash host %s: %s, reconnecting...", c.Host(), err) } return c.Client.Connect() } @@ -138,7 +141,7 @@ func (c *syncClient) Publish(batch publisher.Batch) error { n, err = c.publishWindowed(events) } - debugf("%v events out of %v events sent to logstash host %s. Continue sending", + c.log.Debugf("%v events out of %v events sent to logstash host %s. Continue sending", n, len(events), c.Host()) events = events[n:] @@ -152,7 +155,7 @@ func (c *syncClient) Publish(batch publisher.Batch) error { } _ = c.Close() - logp.Err("Failed to publish events caused by: %v", err) + c.log.Errorf("Failed to publish events caused by: %v", err) rest := len(events) st.Failed(rest) @@ -168,7 +171,7 @@ func (c *syncClient) Publish(batch publisher.Batch) error { func (c *syncClient) publishWindowed(events []publisher.Event) (int, error) { batchSize := len(events) windowSize := c.win.get() - debugf("Try to publish %v events to logstash host %s with window size %v", + c.log.Debugf("Try to publish %v events to logstash host %s with window size %v", batchSize, c.Host(), windowSize) // prepare message payload diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index df1c3b91a59..0205b3aa241 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -46,6 +46,7 @@ type publishFn func( ) ([]publisher.Event, error) type client struct { + log *logp.Logger *transport.Client observer outputs.Observer index string @@ -74,6 +75,7 @@ func newClient( index string, codec codec.Codec, ) *client { return &client{ + log: logp.NewLogger("redis"), Client: tc, observer: observer, timeout: timeout, @@ -87,7 +89,7 @@ func newClient( } func (c *client) Connect() error { - debugf("connect") + c.log.Debug("connect") err := c.Client.Connect() if err != nil { return err @@ -128,7 +130,7 @@ func initRedisConn(c redis.Conn, pwd string, db int) error { } func (c *client) Close() error { - debugf("close connection") + c.log.Debug("close connection") return c.Client.Close() } @@ -224,7 +226,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { args := make([]interface{}, 1, len(data)+1) args[0] = dest - okEvents, args := serializeEvents(args, 1, data, c.index, c.codec) + okEvents, args := serializeEvents(c.log, args, 1, data, c.index, c.codec) c.observer.Dropped(len(data) - len(okEvents)) if (len(args) - 1) == 0 { return nil, nil @@ -233,7 +235,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { // RPUSH returns total length of list -> fail and retry all on error _, err := conn.Do(command, args...) if err != nil { - logp.Err("Failed to %v to redis list with: %v", command, err) + c.log.Errorf("Failed to %v to redis list with: %v", command, err) return okEvents, err } @@ -247,7 +249,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF return func(key outil.Selector, data []publisher.Event) ([]publisher.Event, error) { var okEvents []publisher.Event serialized := make([]interface{}, 0, len(data)) - okEvents, serialized = serializeEvents(serialized, 0, data, c.index, c.codec) + okEvents, serialized = serializeEvents(c.log, serialized, 0, data, c.index, c.codec) c.observer.Dropped(len(data) - len(okEvents)) if len(serialized) == 0 { return nil, nil @@ -258,14 +260,14 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF for i, serializedEvent := range serialized { eventKey, err := key.Select(&okEvents[i].Content) if err != nil { - logp.Err("Failed to set redis key: %v", err) + c.log.Errorf("Failed to set redis key: %v", err) dropped++ continue } data = append(data, okEvents[i]) if err := conn.Send(command, eventKey, serializedEvent); err != nil { - logp.Err("Failed to execute %v: %v", command, err) + c.log.Errorf("Failed to execute %v: %v", command, err) return okEvents, err } } @@ -281,12 +283,12 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF _, err := conn.Receive() if err != nil { if _, ok := err.(redis.Error); ok { - logp.Err("Failed to %v event to list with %v", + c.log.Errorf("Failed to %v event to list with %v", command, err) failed = append(failed, data[i]) lastErr = err } else { - logp.Err("Failed to %v multiple events to list with %v", + c.log.Errorf("Failed to %v multiple events to list with %v", command, err) failed = append(failed, data[i:]...) lastErr = err @@ -301,6 +303,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF } func serializeEvents( + log *logp.Logger, to []interface{}, i int, data []publisher.Event, @@ -312,8 +315,8 @@ func serializeEvents( for _, d := range data { serializedEvent, err := codec.Encode(index, &d.Content) if err != nil { - logp.Err("Encoding event failed with error: %v", err) - logp.Debug("redis", "Failed event: %v", d.Content) + log.Errorf("Encoding event failed with error: %v", err) + log.Debugf("Failed event: %v", d.Content) goto failLoop } @@ -330,8 +333,8 @@ failLoop: for _, d := range rest { serializedEvent, err := codec.Encode(index, &d.Content) if err != nil { - logp.Err("Encoding event failed with error: %v", err) - logp.Debug("redis", "Failed event: %v", d.Content) + log.Errorf("Encoding event failed with error: %v", err) + log.Debugf("Failed event: %v", d.Content) i++ continue } diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index 7c29568123e..05e57f7ab57 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -28,7 +28,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/codec" "github.com/elastic/beats/v7/libbeat/outputs/outil" @@ -39,8 +38,6 @@ type redisOut struct { beat beat.Info } -var debugf = logp.MakeDebug("redis") - const ( defaultWaitRetry = 1 * time.Second defaultMaxWaitRetry = 60 * time.Second diff --git a/libbeat/outputs/transport/client.go b/libbeat/outputs/transport/client.go index 338801b444d..16bdee70cf5 100644 --- a/libbeat/outputs/transport/client.go +++ b/libbeat/outputs/transport/client.go @@ -23,10 +23,12 @@ import ( "sync" "time" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/testing" ) type Client struct { + log *logp.Logger dialer Dialer network string host string @@ -46,7 +48,7 @@ type Config struct { func MakeDialer(c *Config) (Dialer, error) { var err error dialer := NetDialer(c.Timeout) - dialer, err = ProxyDialer(c.Proxy, dialer) + dialer, err = ProxyDialer(logp.NewLogger(logSelector), c.Proxy, dialer) if err != nil { return nil, err } @@ -91,6 +93,7 @@ func NewClientWithDialer(d Dialer, c *Config, network, host string, defaultPort } client := &Client{ + log: logp.NewLogger(logSelector), dialer: d, network: network, host: host, @@ -128,7 +131,7 @@ func (c *Client) Close() error { defer c.mutex.Unlock() if c.conn != nil { - debugf("closing") + c.log.Debug("closing") err := c.conn.Close() c.conn = nil return err @@ -215,7 +218,8 @@ func (c *Client) SetWriteDeadline(t time.Time) error { func (c *Client) handleError(err error) error { if err != nil { - debugf("handle error: %v", err) + + c.log.Debugf("handle error: %v", err) if nerr, ok := err.(net.Error); !(ok && (nerr.Temporary() || nerr.Timeout())) { _ = c.Close() diff --git a/libbeat/outputs/transport/proxy.go b/libbeat/outputs/transport/proxy.go index a755a914a67..18aa9bc6c8d 100644 --- a/libbeat/outputs/transport/proxy.go +++ b/libbeat/outputs/transport/proxy.go @@ -53,7 +53,7 @@ func (c *ProxyConfig) Validate() error { return nil } -func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) { +func ProxyDialer(log *logp.Logger, config *ProxyConfig, forward Dialer) (Dialer, error) { if config == nil || config.URL == "" { return forward, nil } @@ -67,7 +67,7 @@ func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) { return nil, err } - logp.Info("proxy host: '%s'", url.Host) + log.Infof("proxy host: '%s'", url.Host) return DialerFunc(func(network, address string) (net.Conn, error) { var err error var addresses []string @@ -80,7 +80,7 @@ func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) { if config.LocalResolve { addresses, err = net.LookupHost(host) if err != nil { - logp.Warn(`DNS lookup failure "%s": %v`, host, err) + log.Warnf(`DNS lookup failure "%s": %v`, host, err) return nil, err } } else { diff --git a/libbeat/outputs/transport/tcp.go b/libbeat/outputs/transport/tcp.go index 8ffc70debe1..2b6eb6f4cba 100644 --- a/libbeat/outputs/transport/tcp.go +++ b/libbeat/outputs/transport/tcp.go @@ -49,7 +49,7 @@ func TestNetDialer(d testing.Driver, timeout time.Duration) Dialer { d.Fatal("dns lookup", err) d.Info("addresses", strings.Join(addresses, ", ")) if err != nil { - logp.Warn(`DNS lookup failure "%s": %v`, host, err) + logp.NewLogger(logSelector).Warnf(`DNS lookup failure "%s": %v`, host, err) return nil, err } diff --git a/libbeat/outputs/transport/transport.go b/libbeat/outputs/transport/transport.go index 7ff01c2fa30..4b9a295333f 100644 --- a/libbeat/outputs/transport/transport.go +++ b/libbeat/outputs/transport/transport.go @@ -20,8 +20,6 @@ package transport import ( "errors" "net" - - "github.com/elastic/beats/v7/libbeat/logp" ) type Dialer interface { @@ -30,10 +28,10 @@ type Dialer interface { type DialerFunc func(network, address string) (net.Conn, error) +const logSelector = "transport" + var ( ErrNotConnected = errors.New("client is not connected") - - debugf = logp.MakeDebug("transport") ) func (d DialerFunc) Dial(network, address string) (net.Conn, error) { From 7da851666f7addbc3a33947f20ce55bb94818a30 Mon Sep 17 00:00:00 2001 From: simitt Date: Tue, 3 Mar 2020 16:32:01 +0100 Subject: [PATCH 2/7] fix format --- libbeat/monitoring/adapter/go-metrics.go | 8 ++++---- libbeat/monitoring/report/elasticsearch/client.go | 4 ++-- .../monitoring/report/elasticsearch/elasticsearch.go | 1 + libbeat/outputs/elasticsearch/elasticsearch.go | 1 - libbeat/outputs/fileout/file.go | 6 +++--- libbeat/outputs/kafka/client.go | 10 +++++----- libbeat/outputs/kafka/kafka.go | 3 +-- libbeat/outputs/kafka/log.go | 4 ++-- libbeat/outputs/kafka/partition.go | 2 +- libbeat/outputs/redis/client.go | 2 +- libbeat/outputs/transport/client.go | 4 ++-- 11 files changed, 22 insertions(+), 23 deletions(-) diff --git a/libbeat/monitoring/adapter/go-metrics.go b/libbeat/monitoring/adapter/go-metrics.go index 2e67520b658..5cc703ecad4 100644 --- a/libbeat/monitoring/adapter/go-metrics.go +++ b/libbeat/monitoring/adapter/go-metrics.go @@ -42,7 +42,7 @@ import ( type GoMetricsRegistry struct { mutex sync.Mutex - log *logp.Logger + log *logp.Logger reg *monitoring.Registry filters *metricFilters @@ -67,12 +67,12 @@ func GetGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFil // NewGoMetrics creates and registers a new GoMetricsRegistry with the parent // registry. func NewGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFilter) *GoMetricsRegistry { - return newGoMetrics(parent.NewRegistry(name, monitoring.IgnorePublishExpvar),filters...) + return newGoMetrics(parent.NewRegistry(name, monitoring.IgnorePublishExpvar), filters...) } -func newGoMetrics(reg *monitoring.Registry, filters ...MetricFilter) *GoMetricsRegistry{ +func newGoMetrics(reg *monitoring.Registry, filters ...MetricFilter) *GoMetricsRegistry { return &GoMetricsRegistry{ - log: logp.NewLogger("monitoring"), + log: logp.NewLogger("monitoring"), reg: reg, shadow: metrics.NewRegistry(), filters: makeFilters(filters...), diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index ff3545db1cf..76be0d74b0c 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -36,7 +36,7 @@ import ( var createDocPrivAvailableESVersion = common.MustNewVersion("7.5.0") type publishClient struct { - log *logp.Logger + log *logp.Logger es *esout.Client params map[string]string format report.Format @@ -48,7 +48,7 @@ func newPublishClient( format report.Format, ) (*publishClient, error) { p := &publishClient{ - log: logp.NewLogger(selector), + log: logp.NewLogger(selector), es: es, params: params, format: format, diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 86749d2a1d6..1ef7430fea7 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -59,6 +59,7 @@ type reporter struct { out []outputs.NetworkClient } + const selector = "monitoring" var errNoMonitoring = errors.New("xpack monitoring not available") diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index d2cc2170789..e71e9b14a5f 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -37,7 +37,6 @@ func init() { outputs.RegisterType("elasticsearch", makeES) } - var ( // ErrNotConnected indicates failure due to client having no valid connection ErrNotConnected = errors.New("not connected") diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index 7e4a4b36821..cf897557c35 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -35,7 +35,7 @@ func init() { } type fileOutput struct { - log *logp.Logger + log *logp.Logger filePath string beat beat.Info observer outputs.Observer @@ -59,7 +59,7 @@ func makeFileout( cfg.SetInt("bulk_max_size", -1, -1) fo := &fileOutput{ - log: logp.NewLogger("file"), + log: logp.NewLogger("file"), beat: beat, observer: observer, } @@ -129,7 +129,7 @@ func (out *fileOutput) Publish( } else { out.log.Warnf("Failed to serialize the event: %v", err) } - out.log.Debugf( "Failed event: %v", event) + out.log.Debugf("Failed event: %v", event) dropped++ continue diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index b3b43839e2b..f8ebd46f179 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -37,7 +37,7 @@ import ( ) type client struct { - log *logp.Logger + log *logp.Logger observer outputs.Observer hosts []string topic outil.Selector @@ -76,7 +76,7 @@ func newKafkaClient( cfg *sarama.Config, ) (*client, error) { c := &client{ - log: logp.NewLogger(logSelector), + log: logp.NewLogger(logSelector), observer: observer, hosts: hosts, topic: topic, @@ -167,7 +167,7 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { value, err := data.Cache.GetValue("partition") if err == nil { - if c.log.IsDebug(){ + if c.log.IsDebug() { c.log.Debugf("got event.Meta[\"partition\"] = %v", value) } if partition, ok := value.(int32); ok { @@ -177,7 +177,7 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { value, err = data.Cache.GetValue("topic") if err == nil { - if c.log.IsDebug(){ + if c.log.IsDebug() { c.log.Debugf("got event.Meta[\"topic\"] = %v", value) } if topic, ok := value.(string); ok { @@ -201,7 +201,7 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) { serializedEvent, err := c.codec.Encode(c.index, event) if err != nil { - if c.log.IsDebug(){ + if c.log.IsDebug() { c.log.Debugf("failed event: %v", event) } return nil, err diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 9f71edb8a68..b8c6c4dcfff 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -41,14 +41,13 @@ const ( logSelector = "kafka" ) - var ( errNoTopicSet = errors.New("No topic configured") errNoHosts = errors.New("No hosts configured") ) func init() { - sarama.Logger = kafkaLogger{log:logp.NewLogger(logSelector)} + sarama.Logger = kafkaLogger{log: logp.NewLogger(logSelector)} outputs.RegisterType("kafka", makeKafka) } diff --git a/libbeat/outputs/kafka/log.go b/libbeat/outputs/kafka/log.go index 81a8dd0fba1..29ebf6cf869 100644 --- a/libbeat/outputs/kafka/log.go +++ b/libbeat/outputs/kafka/log.go @@ -23,7 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) -type kafkaLogger struct{ +type kafkaLogger struct { log *logp.Logger } @@ -49,7 +49,7 @@ func (kl kafkaLogger) Log(format string, v ...interface{}) { } } } - if kl.log == nil{ + if kl.log == nil { kl.log = logp.NewLogger(logSelector) } if warn { diff --git a/libbeat/outputs/kafka/partition.go b/libbeat/outputs/kafka/partition.go index 57bb632b247..654fabc66d3 100644 --- a/libbeat/outputs/kafka/partition.go +++ b/libbeat/outputs/kafka/partition.go @@ -193,7 +193,7 @@ func cfgRoundRobinPartitioner(_ *logp.Logger, config *common.Config) (func() par }, nil } -func cfgHashPartitioner(log *logp.Logger,config *common.Config) (func() partitioner, error) { +func cfgHashPartitioner(log *logp.Logger, config *common.Config) (func() partitioner, error) { cfg := struct { Hash []string `config:"hash"` Random bool `config:"random"` diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index 0205b3aa241..d97e861ba52 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -75,7 +75,7 @@ func newClient( index string, codec codec.Codec, ) *client { return &client{ - log: logp.NewLogger("redis"), + log: logp.NewLogger("redis"), Client: tc, observer: observer, timeout: timeout, diff --git a/libbeat/outputs/transport/client.go b/libbeat/outputs/transport/client.go index 16bdee70cf5..7c2c0b898ea 100644 --- a/libbeat/outputs/transport/client.go +++ b/libbeat/outputs/transport/client.go @@ -28,7 +28,7 @@ import ( ) type Client struct { - log *logp.Logger + log *logp.Logger dialer Dialer network string host string @@ -93,7 +93,7 @@ func NewClientWithDialer(d Dialer, c *Config, network, host string, defaultPort } client := &Client{ - log: logp.NewLogger(logSelector), + log: logp.NewLogger(logSelector), dialer: d, network: network, host: host, From 466f0277974b9cde7e17c79f3409957d794c5ef8 Mon Sep 17 00:00:00 2001 From: simitt Date: Tue, 3 Mar 2020 17:08:30 +0100 Subject: [PATCH 3/7] go mod tidy --- go.mod | 1 + go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/go.mod b/go.mod index de1bb8b07f9..5b050f213fa 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2 + github.com/elastic/beats v7.6.0+incompatible github.com/elastic/ecs v1.4.0 github.com/elastic/go-libaudit v0.4.0 github.com/elastic/go-licenser v0.2.1 diff --git a/go.sum b/go.sum index d95286d6c56..c4f38904244 100644 --- a/go.sum +++ b/go.sum @@ -209,6 +209,8 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2 h1:DW6WrARxK5J+o8uAKCiACi5wy9EK1UzrsCpGBPsKHAA= github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= +github.com/elastic/beats v7.6.0+incompatible h1:vUcDbZ/qsvox6pC/t/tSdSItlt1/pKbsKaHEVS4GWss= +github.com/elastic/beats v7.6.0+incompatible/go.mod h1:7cX7zGsOwJ01FLkZs9Tg5nBdnQi6XB3hYAyWekpKgeY= github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqrj3lotWinO9+jFmeDXIC4gvIQs= github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY= github.com/elastic/ecs v1.4.0 h1:BGIUwWJhThRO2IQxzm7ekV9TMUGwZoYyevT5/1xmMf0= From 4390ce8452c5f627ce5725bde20e9f9b29de2ffe Mon Sep 17 00:00:00 2001 From: simitt Date: Tue, 3 Mar 2020 17:16:22 +0100 Subject: [PATCH 4/7] revert go mod tidy changes after merging master --- go.mod | 1 - go.sum | 2 -- 2 files changed, 3 deletions(-) diff --git a/go.mod b/go.mod index 5b050f213fa..de1bb8b07f9 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,6 @@ require ( github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2 - github.com/elastic/beats v7.6.0+incompatible github.com/elastic/ecs v1.4.0 github.com/elastic/go-libaudit v0.4.0 github.com/elastic/go-licenser v0.2.1 diff --git a/go.sum b/go.sum index c4f38904244..d95286d6c56 100644 --- a/go.sum +++ b/go.sum @@ -209,8 +209,6 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2 h1:DW6WrARxK5J+o8uAKCiACi5wy9EK1UzrsCpGBPsKHAA= github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= -github.com/elastic/beats v7.6.0+incompatible h1:vUcDbZ/qsvox6pC/t/tSdSItlt1/pKbsKaHEVS4GWss= -github.com/elastic/beats v7.6.0+incompatible/go.mod h1:7cX7zGsOwJ01FLkZs9Tg5nBdnQi6XB3hYAyWekpKgeY= github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqrj3lotWinO9+jFmeDXIC4gvIQs= github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY= github.com/elastic/ecs v1.4.0 h1:BGIUwWJhThRO2IQxzm7ekV9TMUGwZoYyevT5/1xmMf0= From f706c08c4421cdbbf91bd170afd3a1139a0f7e57 Mon Sep 17 00:00:00 2001 From: simitt Date: Wed, 4 Mar 2020 15:32:49 +0100 Subject: [PATCH 5/7] Use %+v for error logging --- libbeat/monitoring/adapter/go-metrics.go | 2 +- .../monitoring/report/elasticsearch/client.go | 6 +-- .../report/elasticsearch/elasticsearch.go | 2 +- libbeat/outputs/console/console.go | 6 +-- libbeat/outputs/elasticsearch/api_test.go | 2 +- libbeat/outputs/elasticsearch/bulkapi.go | 4 +- libbeat/outputs/elasticsearch/client.go | 41 +++++++++---------- .../outputs/elasticsearch/elasticsearch.go | 6 +-- libbeat/outputs/fileout/file.go | 8 ++-- libbeat/outputs/kafka/client.go | 6 +-- libbeat/outputs/kafka/config.go | 2 +- libbeat/outputs/kafka/partition.go | 2 +- libbeat/outputs/logstash/async.go | 2 +- libbeat/outputs/logstash/sync.go | 4 +- libbeat/outputs/redis/client.go | 14 +++---- libbeat/outputs/transport/client.go | 2 +- libbeat/outputs/transport/proxy.go | 2 +- libbeat/outputs/transport/tcp.go | 2 +- 18 files changed, 56 insertions(+), 57 deletions(-) diff --git a/libbeat/monitoring/adapter/go-metrics.go b/libbeat/monitoring/adapter/go-metrics.go index 5cc703ecad4..ecc0846919b 100644 --- a/libbeat/monitoring/adapter/go-metrics.go +++ b/libbeat/monitoring/adapter/go-metrics.go @@ -193,7 +193,7 @@ func (r *GoMetricsRegistry) UnregisterAll() { r.shadow.UnregisterAll() err := r.reg.Clear() if err != nil { - r.log.Errorf("Failed to clear registry: %v", err) + r.log.Errorf("Failed to clear registry: %+v", err) } } diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 76be0d74b0c..42517219a90 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -110,7 +110,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error { // Extract type t, err := event.Content.Meta.GetValue("type") if err != nil { - c.log.Errorf("Type not available in monitoring reported. Please report this error: %s", err) + c.log.Errorf("Type not available in monitoring reported. Please report this error: %+v", err) continue } @@ -251,14 +251,14 @@ func logBulkFailures(log *logp.Logger, result esout.BulkResult, events []report. reader := esout.NewJSONReader(result) err := esout.BulkReadToItems(reader) if err != nil { - log.Errorf("failed to parse monitoring bulk items: %v", err) + log.Errorf("failed to parse monitoring bulk items: %+v", err) return } for i := range events { status, msg, err := esout.BulkReadItemStatus(log, reader) if err != nil { - log.Errorf("failed to parse monitoring bulk item status: %v", err) + log.Errorf("failed to parse monitoring bulk item status: %+v", err) return } switch { diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 1ef7430fea7..e7b0d6b1713 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -254,7 +254,7 @@ func (r *reporter) initLoop(c config) { log.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying. Error: ", err) logged = true } - r.logger.Debugf("Monitoring could not connect to Elasticsearch, failed with %v", err) + r.logger.Debugf("Monitoring could not connect to Elasticsearch, failed with %+v", err) } select { diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 9a687af3f8a..79aee6957d6 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -133,20 +133,20 @@ func (c *console) publishEvent(event *publisher.Event) bool { return false } - c.log.Errorf("Unable to encode event: %v", err) + c.log.Errorf("Unable to encode event: %+v", err) c.log.Debugf("Failed event: %v", event) return false } if err := c.writeBuffer(serializedEvent); err != nil { c.observer.WriteError(err) - c.log.Errorf("Unable to publish events to console: %v", err) + c.log.Errorf("Unable to publish events to console: %+v", err) return false } if err := c.writeBuffer(nl); err != nil { c.observer.WriteError(err) - c.log.Errorf("Error when appending newline to event: %v", err) + c.log.Errorf("Error when appending newline to event: %+v", err) return false } diff --git a/libbeat/outputs/elasticsearch/api_test.go b/libbeat/outputs/elasticsearch/api_test.go index e9bcd3e304a..3706dd64511 100644 --- a/libbeat/outputs/elasticsearch/api_test.go +++ b/libbeat/outputs/elasticsearch/api_test.go @@ -188,7 +188,7 @@ func newTestClient(url string) *Client { func (r QueryResult) String() string { out, err := json.Marshal(r) if err != nil { - logp.NewLogger(logSelector).Warnf("failed to marshal QueryResult (%v): %#v", err, r) + logp.NewLogger(logSelector).Warnf("failed to marshal QueryResult (%+v): %#v", err, r) return "ERROR" } return string(out) diff --git a/libbeat/outputs/elasticsearch/bulkapi.go b/libbeat/outputs/elasticsearch/bulkapi.go index 712f85a7abd..325b54e9bef 100644 --- a/libbeat/outputs/elasticsearch/bulkapi.go +++ b/libbeat/outputs/elasticsearch/bulkapi.go @@ -209,7 +209,7 @@ func bulkEncode(log *logp.Logger, out bulkWriter, metaBuilder MetaBuilder, body if metaBuilder == nil { for _, obj := range body { if err := out.AddRaw(obj); err != nil { - log.Debugf("Failed to encode message: %s", err) + log.Debugf("Failed to encode message: %+v", err) return err } } @@ -217,7 +217,7 @@ func bulkEncode(log *logp.Logger, out bulkWriter, metaBuilder MetaBuilder, body for _, obj := range body { meta := metaBuilder(obj) if err := out.Add(meta, obj); err != nil { - log.Debugf("Failed to encode event (dropping event): %s", err) + log.Debugf("Failed to encode event (dropping event): %+v", err) } } } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index c50cb4e606c..67b23cd699a 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -21,7 +21,6 @@ import ( "bytes" "encoding/base64" "encoding/json" - "errors" "fmt" "io" "io/ioutil" @@ -29,6 +28,8 @@ import ( "net/url" "time" + "github.com/pkg/errors" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" @@ -351,7 +352,7 @@ func (client *Client) publishEvents( requ.Reset(body) status, result, sendErr := client.sendBulkRequest(requ) if sendErr != nil { - client.Connection.log.Error("Failed to perform any bulk index operations: %s", sendErr) + client.Connection.log.Error("Failed to perform any bulk index operations: %+v", sendErr) return data, sendErr } @@ -408,11 +409,11 @@ func bulkEncodePublishRequest( event := &data[i].Content meta, err := createEventBulkMeta(log, version, index, pipeline, eventType, event) if err != nil { - log.Errorf("Failed to encode event meta data: %s", err) + log.Errorf("Failed to encode event meta data: %+v", err) continue } if err := body.Add(meta, event); err != nil { - log.Errorf("Failed to encode event: %s", err) + log.Errorf("Failed to encode event: %+v", err) log.Debugf("Failed event: %v", event) continue } @@ -491,7 +492,7 @@ func bulkCollectPublishFails( data []publisher.Event, ) ([]publisher.Event, bulkResultStats) { if err := BulkReadToItems(reader); err != nil { - log.Errorf("failed to parse bulk response: %v", err.Error()) + log.Errorf("failed to parse bulk response: %+v", err) return nil, bulkResultStats{} } @@ -501,6 +502,7 @@ func bulkCollectPublishFails( for i := 0; i < count; i++ { status, msg, err := BulkReadItemStatus(log, reader) if err != nil { + log.Error(err) return nil, bulkResultStats{} } @@ -577,33 +579,31 @@ func BulkReadItemStatus(log *logp.Logger, reader *JSONReader) (int, []byte, erro // find first field in outer dictionary (e.g. 'create') kind, _, err := reader.nextFieldName() + parserErr := func(err error) error { + return errors.Wrapf(err, "Failed to parse bulk response item") + } if err != nil { - log.Errorf("Failed to parse bulk response item: %s", err) - return 0, nil, err + return 0, nil, parserErr(err) } if kind == dictEnd { err = errUnexpectedEmptyObject - log.Errorf("Failed to parse bulk response item: %s", err) - return 0, nil, err + return 0, nil, parserErr(err) } // parse actual item response code and error message status, msg, err := itemStatusInner(log, reader) if err != nil { - log.Errorf("Failed to parse bulk response item: %s", err) - return 0, nil, err + return 0, nil, parserErr(err) } // close dictionary. Expect outer dictionary to have only one element kind, _, err = reader.step() if err != nil { - log.Errorf("Failed to parse bulk response item: %s", err) - return 0, nil, err + return 0, nil, parserErr(err) } if kind != dictEnd { err = errExpectedObjectEnd - log.Errorf("Failed to parse bulk response item: %s", err) - return 0, nil, err + return 0, nil, parserErr(err) } return status, msg, nil @@ -619,7 +619,7 @@ func itemStatusInner(log *logp.Logger, reader *JSONReader) (int, []byte, error) for { kind, name, err := reader.nextFieldName() if err != nil { - log.Errorf("Failed to parse bulk response item: %s", err) + log.Errorf("Failed to parse bulk response item: %+v", err) } if kind == dictEnd { break @@ -629,7 +629,6 @@ func itemStatusInner(log *logp.Logger, reader *JSONReader) (int, []byte, error) case bytes.Equal(name, nameStatus): // name == "status" status, err = reader.nextInt() if err != nil { - log.Errorf("Failed to parse bulk response item: %s", err) return 0, nil, err } @@ -722,7 +721,7 @@ func (conn *Connection) Connect() error { } if version, err := common.NewVersion(versionString); err != nil { - conn.log.Errorf("Invalid version from Elasticsearch: %v", versionString) + conn.log.Errorf("Invalid version from Elasticsearch: %s", versionString) conn.version = common.Version{} } else { conn.version = *version @@ -741,7 +740,7 @@ func (conn *Connection) Ping() (string, error) { status, body, err := conn.execRequest("GET", conn.URL, nil) if err != nil { - conn.log.Debugf("Ping request failed with: %v", err) + conn.log.Debugf("Ping request failed with: %+v", err) return "", err } @@ -795,7 +794,7 @@ func (conn *Connection) RequestURL( } if err := conn.encoder.Marshal(body); err != nil { - conn.log.Warnf("Failed to json encode body (%v): %#v", err, body) + conn.log.Warnf("Failed to json encode body (%+v): %#v", err, body) return 0, nil, ErrJSONEncodeFailed } return conn.execRequest(method, url, conn.encoder.Reader()) @@ -868,6 +867,6 @@ func (conn *Connection) GetVersion() common.Version { func closing(log *logp.Logger, c io.Closer) { err := c.Close() if err != nil { - log.Warnf("Close failed with: %v", err) + log.Warnf("Close failed with: %+v", err) } } diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index e71e9b14a5f..b80fd832d1f 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -183,7 +183,7 @@ func makeES( for i, host := range hosts { esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200) if err != nil { - log.Errorf("Invalid host param set: %s, Error: %v", host, err) + log.Errorf("Invalid host param set: %s, Error: %+v", host, err) return outputs.Fail(err) } @@ -257,7 +257,7 @@ func NewConnectedClient(cfg *common.Config) (*Client, error) { for _, client := range clients { err = client.Connect() if err != nil { - client.Connection.log.Errorf("Error connecting to Elasticsearch at %v: %v", client.Connection.URL, err) + client.Connection.log.Errorf("Error connecting to Elasticsearch at %v: %+v", client.Connection.URL, err) err = fmt.Errorf("Error connection to Elasticsearch %v: %v", client.Connection.URL, err) errors = append(errors, err.Error()) continue @@ -309,7 +309,7 @@ func NewElasticsearchClients(cfg *common.Config) ([]Client, error) { for _, host := range hosts { esURL, err := common.MakeURL(config.Protocol, config.Path, host, 9200) if err != nil { - log.Errorf("Invalid host param set: %s, Error: %v", host, err) + log.Errorf("Invalid host param set: %s, Error: %+v", host, err) return nil, err } diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index cf897557c35..c3f5d3c5e4e 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -125,9 +125,9 @@ func (out *fileOutput) Publish( serializedEvent, err := out.codec.Encode(out.beat.Beat, &event.Content) if err != nil { if event.Guaranteed() { - out.log.Errorf("Failed to serialize the event: %v", err) + out.log.Errorf("Failed to serialize the event: %+v", err) } else { - out.log.Warnf("Failed to serialize the event: %v", err) + out.log.Warnf("Failed to serialize the event: %+v", err) } out.log.Debugf("Failed event: %v", event) @@ -139,9 +139,9 @@ func (out *fileOutput) Publish( st.WriteError(err) if event.Guaranteed() { - out.log.Errorf("Writing event to file failed with: %v", err) + out.log.Errorf("Writing event to file failed with: %+v", err) } else { - out.log.Warnf("Writing event to file failed with: %v", err) + out.log.Warnf("Writing event to file failed with: %+v", err) } dropped++ diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index f8ebd46f179..b1920d14e99 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -97,7 +97,7 @@ func (c *client) Connect() error { // try to connect producer, err := sarama.NewAsyncProducer(c.hosts, &c.config) if err != nil { - c.log.Errorf("Kafka connect fails with: %v", err) + c.log.Errorf("Kafka connect fails with: %+v", err) return err } @@ -143,7 +143,7 @@ func (c *client) Publish(batch publisher.Batch) error { d := &events[i] msg, err := c.getEventMessage(d) if err != nil { - c.log.Errorf("Dropping event: %v", err) + c.log.Errorf("Dropping event: %+v", err) ref.done() c.observer.Dropped(1) continue @@ -288,7 +288,7 @@ func (r *msgRef) dec() { stats.Acked(success) } - r.client.log.Debugf("Kafka publish failed with: %v", err) + r.client.log.Debugf("Kafka publish failed with: %+v", err) } else { r.batch.ACK() stats.Acked(r.total) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 9f108eb0e3d..e4866bfe9c5 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -283,7 +283,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err ) if err := k.Validate(); err != nil { - log.Errorf("Invalid kafka configuration: %v", err) + log.Errorf("Invalid kafka configuration: %+v", err) return nil, err } return k, nil diff --git a/libbeat/outputs/kafka/partition.go b/libbeat/outputs/kafka/partition.go index 654fabc66d3..fb0b83a2ffe 100644 --- a/libbeat/outputs/kafka/partition.go +++ b/libbeat/outputs/kafka/partition.go @@ -256,7 +256,7 @@ func makeFieldsHashPartitioner(log *logp.Logger, fields []string, dropFail bool) if err != nil { if dropFail { - log.Errorf("Hashing partition key failed: %v", err) + log.Errorf("Hashing partition key failed: %+v", err) return -1, err } diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 79b89c19ec8..c19a6eb1073 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -276,5 +276,5 @@ func (r *msgRef) dec() { } r.batch.RetryEvents(r.slice) - r.client.log.Errorf("Failed to publish events caused by: %v", err) + r.client.log.Errorf("Failed to publish events caused by: %+v", err) } diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index 85497ae2059..649189e8c3d 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -96,7 +96,7 @@ func (c *syncClient) Close() error { func (c *syncClient) reconnect() error { if err := c.Client.Close(); err != nil { - c.log.Errorf("error closing connection to logstash host %s: %s, reconnecting...", c.Host(), err) + c.log.Errorf("error closing connection to logstash host %s: %+v, reconnecting...", c.Host(), err) } return c.Client.Connect() } @@ -155,7 +155,7 @@ func (c *syncClient) Publish(batch publisher.Batch) error { } _ = c.Close() - c.log.Errorf("Failed to publish events caused by: %v", err) + c.log.Errorf("Failed to publish events caused by: %+v", err) rest := len(events) st.Failed(rest) diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index d97e861ba52..15c07b0d5b3 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -235,7 +235,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { // RPUSH returns total length of list -> fail and retry all on error _, err := conn.Do(command, args...) if err != nil { - c.log.Errorf("Failed to %v to redis list with: %v", command, err) + c.log.Errorf("Failed to %v to redis list with: %+v", command, err) return okEvents, err } @@ -260,14 +260,14 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF for i, serializedEvent := range serialized { eventKey, err := key.Select(&okEvents[i].Content) if err != nil { - c.log.Errorf("Failed to set redis key: %v", err) + c.log.Errorf("Failed to set redis key: %+v", err) dropped++ continue } data = append(data, okEvents[i]) if err := conn.Send(command, eventKey, serializedEvent); err != nil { - c.log.Errorf("Failed to execute %v: %v", command, err) + c.log.Errorf("Failed to execute %v: %+v", command, err) return okEvents, err } } @@ -283,12 +283,12 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF _, err := conn.Receive() if err != nil { if _, ok := err.(redis.Error); ok { - c.log.Errorf("Failed to %v event to list with %v", + c.log.Errorf("Failed to %v event to list with %+v", command, err) failed = append(failed, data[i]) lastErr = err } else { - c.log.Errorf("Failed to %v multiple events to list with %v", + c.log.Errorf("Failed to %v multiple events to list with %+v", command, err) failed = append(failed, data[i:]...) lastErr = err @@ -315,7 +315,7 @@ func serializeEvents( for _, d := range data { serializedEvent, err := codec.Encode(index, &d.Content) if err != nil { - log.Errorf("Encoding event failed with error: %v", err) + log.Errorf("Encoding event failed with error: %+v", err) log.Debugf("Failed event: %v", d.Content) goto failLoop } @@ -333,7 +333,7 @@ failLoop: for _, d := range rest { serializedEvent, err := codec.Encode(index, &d.Content) if err != nil { - log.Errorf("Encoding event failed with error: %v", err) + log.Errorf("Encoding event failed with error: %+v", err) log.Debugf("Failed event: %v", d.Content) i++ continue diff --git a/libbeat/outputs/transport/client.go b/libbeat/outputs/transport/client.go index 7c2c0b898ea..af345ca7a4b 100644 --- a/libbeat/outputs/transport/client.go +++ b/libbeat/outputs/transport/client.go @@ -219,7 +219,7 @@ func (c *Client) SetWriteDeadline(t time.Time) error { func (c *Client) handleError(err error) error { if err != nil { - c.log.Debugf("handle error: %v", err) + c.log.Debugf("handle error: %+v", err) if nerr, ok := err.(net.Error); !(ok && (nerr.Temporary() || nerr.Timeout())) { _ = c.Close() diff --git a/libbeat/outputs/transport/proxy.go b/libbeat/outputs/transport/proxy.go index 18aa9bc6c8d..381aa463206 100644 --- a/libbeat/outputs/transport/proxy.go +++ b/libbeat/outputs/transport/proxy.go @@ -80,7 +80,7 @@ func ProxyDialer(log *logp.Logger, config *ProxyConfig, forward Dialer) (Dialer, if config.LocalResolve { addresses, err = net.LookupHost(host) if err != nil { - log.Warnf(`DNS lookup failure "%s": %v`, host, err) + log.Warnf(`DNS lookup failure "%s": %+v`, host, err) return nil, err } } else { diff --git a/libbeat/outputs/transport/tcp.go b/libbeat/outputs/transport/tcp.go index 2b6eb6f4cba..2b84051b7aa 100644 --- a/libbeat/outputs/transport/tcp.go +++ b/libbeat/outputs/transport/tcp.go @@ -49,7 +49,7 @@ func TestNetDialer(d testing.Driver, timeout time.Duration) Dialer { d.Fatal("dns lookup", err) d.Info("addresses", strings.Join(addresses, ", ")) if err != nil { - logp.NewLogger(logSelector).Warnf(`DNS lookup failure "%s": %v`, host, err) + logp.NewLogger(logSelector).Warnf(`DNS lookup failure "%s": %+v`, host, err) return nil, err } From 058b55d88d90cbd58e8217e870ac8e9d727c2915 Mon Sep 17 00:00:00 2001 From: simitt Date: Wed, 4 Mar 2020 15:41:19 +0100 Subject: [PATCH 6/7] Add dev changelog entry --- CHANGELOG-developer.next.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 7f96e3b10c2..d3175e1a388 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -31,6 +31,8 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Python 3 is required now to run python tests and tools. {pull}14798[14798] - The type `memqueue.Broker` is no longer exported; instead of `memqueue.NewBroker`, call `memqueue.NewQueue` (which provides the same public interface). {pull}16667[16667] - `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691] +- Require logger as first parameter for `outputs.transport.transport#ProxyDialer` and `outputs.elasticsearch.client#BulkReadItemStatus`. {pull}16761[16761] + ==== Bugfixes From 23bbf89cb5c530fb7961675d4246b061c3e1dbaf Mon Sep 17 00:00:00 2001 From: simitt Date: Thu, 5 Mar 2020 11:30:23 +0100 Subject: [PATCH 7/7] Additional changes after merging --- libbeat/common/transport/client.go | 7 +++-- libbeat/common/transport/tlscommon/tls.go | 27 +++++++++++-------- .../common/transport/tlscommon/tls_config.go | 2 +- libbeat/common/transport/transport.go | 2 -- 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/libbeat/common/transport/client.go b/libbeat/common/transport/client.go index 62fd3193d34..7027d1a3142 100644 --- a/libbeat/common/transport/client.go +++ b/libbeat/common/transport/client.go @@ -24,10 +24,12 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/testing" ) type Client struct { + log *logp.Logger dialer Dialer network string host string @@ -75,6 +77,7 @@ func NewClientWithDialer(d Dialer, c Config, network, host string, defaultPort i } client := &Client{ + log: logp.NewLogger(logSelector), dialer: d, network: network, host: host, @@ -112,7 +115,7 @@ func (c *Client) Close() error { defer c.mutex.Unlock() if c.conn != nil { - debugf("closing") + c.log.Debug("closing") err := c.conn.Close() c.conn = nil return err @@ -199,7 +202,7 @@ func (c *Client) SetWriteDeadline(t time.Time) error { func (c *Client) handleError(err error) error { if err != nil { - debugf("handle error: %v", err) + c.log.Debugf("handle error: %+v", err) if nerr, ok := err.(net.Error); !(ok && (nerr.Temporary() || nerr.Timeout())) { _ = c.Close() diff --git a/libbeat/common/transport/tlscommon/tls.go b/libbeat/common/transport/tlscommon/tls.go index 432073226ef..67661c89bd4 100644 --- a/libbeat/common/transport/tlscommon/tls.go +++ b/libbeat/common/transport/tlscommon/tls.go @@ -29,6 +29,8 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) +const logSelector = "tls" + // LoadCertificate will load a certificate from disk and return a tls.Certificate or error func LoadCertificate(config *CertificateConfig) (*tls.Certificate, error) { certificate := config.Certificate @@ -46,31 +48,33 @@ func LoadCertificate(config *CertificateConfig) (*tls.Certificate, error) { return nil, nil } - certPEM, err := ReadPEMFile(certificate, config.Passphrase) + log := logp.NewLogger(logSelector) + + certPEM, err := ReadPEMFile(log, certificate, config.Passphrase) if err != nil { - logp.Critical("Failed reading certificate file %v: %+v", certificate, err) + log.Errorf("Failed reading certificate file %v: %+v", certificate, err) return nil, fmt.Errorf("%v %v", err, certificate) } - keyPEM, err := ReadPEMFile(key, config.Passphrase) + keyPEM, err := ReadPEMFile(log, key, config.Passphrase) if err != nil { - logp.Critical("Failed reading key file %v: %+v", key, err) + log.Errorf("Failed reading key file %v: %+v", key, err) return nil, fmt.Errorf("%v %v", err, key) } cert, err := tls.X509KeyPair(certPEM, keyPEM) if err != nil { - logp.Critical("Failed loading client certificate %+v", err) + log.Errorf("Failed loading client certificate %+v", err) return nil, err } - logp.Debug("tls", "loading certificate: %v and key %v", certificate, key) + log.Debugf("tls", "loading certificate: %v and key %v", certificate, key) return &cert, nil } // ReadPEMFile reads a PEM format file on disk and decrypt it with the privided password and // return the raw content. -func ReadPEMFile(path, passphrase string) ([]byte, error) { +func ReadPEMFile(log *logp.Logger, path, passphrase string) ([]byte, error) { pass := []byte(passphrase) var blocks []*pem.Block @@ -102,7 +106,7 @@ func ReadPEMFile(path, passphrase string) ([]byte, error) { } if err != nil { - logp.Err("Dropping encrypted pem '%v' block read from %v. %v", + log.Errorf("Dropping encrypted pem '%v' block read from %v. %+v", block.Type, path, err) continue } @@ -138,21 +142,22 @@ func LoadCertificateAuthorities(CAs []string) (*x509.CertPool, []error) { return nil, nil } + log := logp.NewLogger(logSelector) roots := x509.NewCertPool() for _, path := range CAs { pemData, err := ioutil.ReadFile(path) if err != nil { - logp.Critical("Failed reading CA certificate: %v", err) + log.Errorf("Failed reading CA certificate: %+v", err) errors = append(errors, fmt.Errorf("%v reading %v", err, path)) continue } if ok := roots.AppendCertsFromPEM(pemData); !ok { - logp.Critical("Failed to add CA to the cert pool, CA is not a valid PEM file") + log.Error("Failed to add CA to the cert pool, CA is not a valid PEM file") errors = append(errors, fmt.Errorf("%v adding %v to the list of known CAs", ErrNotACertificate, path)) continue } - logp.Debug("tls", "successfully loaded CA certificate: %v", path) + log.Debugf("tls", "successfully loaded CA certificate: %v", path) } return roots, errors diff --git a/libbeat/common/transport/tlscommon/tls_config.go b/libbeat/common/transport/tlscommon/tls_config.go index 346d796ee82..7dcd2162aa1 100644 --- a/libbeat/common/transport/tlscommon/tls_config.go +++ b/libbeat/common/transport/tlscommon/tls_config.go @@ -80,7 +80,7 @@ func (c *TLSConfig) ToConfig() *tls.Config { minVersion, maxVersion := extractMinMaxVersion(c.Versions) insecure := c.Verification != VerifyFull if insecure { - logp.Warn("SSL/TLS verifications disabled.") + logp.NewLogger("tls").Warn("SSL/TLS verifications disabled.") } // When we are usign the CAsha256 pin to validate the CA used to validate the chain diff --git a/libbeat/common/transport/transport.go b/libbeat/common/transport/transport.go index 24f86dff9a8..35b397c5876 100644 --- a/libbeat/common/transport/transport.go +++ b/libbeat/common/transport/transport.go @@ -32,8 +32,6 @@ type DialerFunc func(network, address string) (net.Conn, error) var ( ErrNotConnected = errors.New("client is not connected") - - debugf = logp.MakeDebug("transport") ) func (d DialerFunc) Dial(network, address string) (net.Conn, error) {