From 155122c05e92398fac2c77cc0ae4c72531f27a64 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Fri, 9 Aug 2024 17:38:37 +0200 Subject: [PATCH] rate limit Elasticsearch client indexing error logs Use `periodic.Doer` to rate limit indexing error logs. This does not affects the logs sent to the event logger. --- CHANGELOG.next.asciidoc | 1 + NOTICE.txt | 4 +-- go.mod | 2 +- go.sum | 4 +-- libbeat/outputs/elasticsearch/client.go | 40 +++++++++++++++++++++---- 5 files changed, 40 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2555fdf2c011..8dc59db33ebd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix FQDN being lowercased when used as `host.hostname` {issue}39993[39993] - Beats won't log start up information when running under the Elastic Agent {40390}40390[40390] - Filebeat now needs `dup3`, `faccessat2`, `prctl` and `setrlimit` syscalls to run the journald input. If this input is not being used, the syscalls are not needed. All Beats have those syscalls allowed now because the default seccomp policy is global to all Beats. {pull}40061[40061] +- Beats will rate limit the logs about errors when indexing events on Elasticsearch, logging a summary every 10s. The logs sent to the event log is unchanged. {issue}40157[40157] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index a4d77fac8092..4534c6fd177c 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -13000,11 +13000,11 @@ SOFTWARE -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-libs -Version: v0.9.15 +Version: v0.10.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.9.15/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.10.0/LICENSE: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index a621e7507223..f97a5dc549dc 100644 --- a/go.mod +++ b/go.mod @@ -193,7 +193,7 @@ require ( github.com/elastic/bayeux v1.0.5 github.com/elastic/ebpfevents v0.6.0 github.com/elastic/elastic-agent-autodiscover v0.8.1 - github.com/elastic/elastic-agent-libs v0.9.15 + github.com/elastic/elastic-agent-libs v0.10.0 github.com/elastic/elastic-agent-system-metrics v0.11.0 github.com/elastic/go-elasticsearch/v8 v8.14.0 github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff diff --git a/go.sum b/go.sum index d67c89724a54..c124b93cfa4e 100644 --- a/go.sum +++ b/go.sum @@ -560,8 +560,8 @@ github.com/elastic/elastic-agent-autodiscover v0.8.1 h1:u6TWqh7wfevu6S4GUq4SIxYB github.com/elastic/elastic-agent-autodiscover v0.8.1/go.mod h1:0gzGsaDCAqBfUZjuCqqWsSI60eaZ778A5tQZV72rPV0= github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE= github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI= -github.com/elastic/elastic-agent-libs v0.9.15 h1:WCLtuErafUxczT/rXJa4Vr6mxwC8dgtqMbEq+qWGD4M= -github.com/elastic/elastic-agent-libs v0.9.15/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8= +github.com/elastic/elastic-agent-libs v0.10.0 h1:W7uvay0UYdLPtauXGsMD8Xfoe4qtcVWQR4icBgf/26Q= +github.com/elastic/elastic-agent-libs v0.10.0/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8= github.com/elastic/elastic-agent-system-metrics v0.11.0 h1:/bWrgTsHZWLUhdT7WPNuQDFkrSfm+A4qf6QDQnZo9d8= github.com/elastic/elastic-agent-system-metrics v0.11.0/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 933d04c789ca..4ffd025e8cab 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -35,6 +35,7 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/periodic" "github.com/elastic/elastic-agent-libs/testing" "github.com/elastic/elastic-agent-libs/version" ) @@ -58,7 +59,10 @@ type Client struct { // forwarded to this index. Otherwise, they will be dropped. deadLetterIndex string - log *logp.Logger + log *logp.Logger + rlLogIndex *periodic.Doer + rlLogIndexTryDeadLetter *periodic.Doer + rlLogDeadLetter *periodic.Doer } // clientSettings contains the settings for a client. @@ -154,12 +158,33 @@ func NewClient( return nil } - // Make sure there's a non-nil obser + // Make sure there's a non-nil observer observer := s.observer if observer == nil { observer = outputs.NewNilObserver() } + log := logp.NewLogger("elasticsearch") + + pLogDeadLetter := periodic.NewDoer(10*time.Second, + func(count uint64, d time.Duration) { + log.Errorf( + "Failed to deliver to dead letter index %d events in last %s. Look at the event log to view the event and cause.", count, d) + }) + pLogIndex := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) { + log.Warnf( + "Failed to index %d events in last %s: events were dropped! Look at the event log to view the event and cause.", + count, d) + }) + pLogIndexTryDeadLetter := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) { + log.Warnf( + "Failed to index %d events in last %s: tried dead letter index. Look at the event log to view the event and cause.", + count, d) + }) + + pLogDeadLetter.Start() + pLogIndex.Start() + pLogIndexTryDeadLetter.Start() client := &Client{ conn: *conn, indexSelector: s.indexSelector, @@ -167,7 +192,10 @@ func NewClient( observer: observer, deadLetterIndex: s.deadLetterIndex, - log: logp.NewLogger("elasticsearch"), + log: log, + rlLogDeadLetter: pLogDeadLetter, + rlLogIndex: pLogIndex, + rlLogIndexTryDeadLetter: pLogIndexTryDeadLetter, } return client, nil @@ -478,14 +506,14 @@ func (client *Client) applyItemStatus( if encodedEvent.deadLetter { // Fatal error while sending an already-failed event to the dead letter // index, drop. - client.log.Errorf("Can't deliver to dead letter index event (status=%v). Look at the event log to view the event and cause.", itemStatus) + client.rlLogDeadLetter.Add() client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event %#v (status=%v): %s", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) stats.nonIndexable++ return false } if client.deadLetterIndex == "" { // Fatal error and no dead letter index, drop. - client.log.Warnf("Cannot index event (status=%v): dropping event! Look at the event log to view the event and cause.", itemStatus) + client.rlLogIndex.Add() client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, dropping event!", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) stats.nonIndexable++ return false @@ -494,7 +522,7 @@ func (client *Client) applyItemStatus( // We count this as a "retryable failure", and then if the dead letter // ingestion succeeds it is counted in the "deadLetter" counter // rather than the "acked" counter. - client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Look at the event log to view the event and cause.", itemStatus) + client.rlLogIndexTryDeadLetter.Add() client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, trying dead letter index", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) encodedEvent.setDeadLetter(client.deadLetterIndex, itemStatus, string(itemMessage)) }